#!/usr/bin/env python3 from dataclasses import dataclass from datetime import datetime from itertools import zip_longest import re from typing import NamedTuple, Optional, List from urllib.parse import urlparse, urlencode, urlunparse from bs4 import BeautifulSoup import more_itertools as mlt import pandas as pd import pytz import requests from tabulate import tabulate HYPERDIA_CGI = "http://www.hyperdia.com/en/cgi/search/en/hyperdia2.cgi" HYPERDIA_SEARCH = "http://www.hyperdia.com/en/cgi/en/search.html" GROUP_MATCHER = re.compile(r".*No\.(?P[0-9]{1,}).*") HYPERDIA_PARAMS = { "dep_node": "", "arv_node": "", "year": "", "month": "", "day": "", "hour": "", "minute": "", "search_type": "0", "transtime": "undefined", "max_route": "5", "sort": "0", "faretype": "0", "ship": "off", "lmlimit": None, "sum_target": "7", "facility": "reserved", "search_target": "route", "sprexprs": "on", "sprnozomi": "on", "slpexprs": "on", "jr": "on", "privately": "on", "search_way": "" } HEADERS = { 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:71.0) Gecko/20100101 Firefox/71.0', 'Content-type': 'application/x-www-form-urlencoded; charset=UTF-8', 'Host': 'www.hyperdia.com', 'Origin': 'http://www.hyperdia.com' } @dataclass class HyperdiaStep: start_station: str end_station: str start_time: datetime end_time: datetime duration: Optional[str] = None train_name: Optional[str] = None is_transfer: Optional[bool] = False start_track_number: Optional[int] = None end_track_number: Optional[int] = None @dataclass class HyperdiaTrip: steps: List[HyperdiaStep] total_distance: int total_time: int total_cost: int transfers: int def get_hyperdia_data(start_station, end_station, hour, minute, day="15", month="08", year="2020", max_route=5, via=None): session = requests.Session() post_params = HYPERDIA_PARAMS.copy() headers = HEADERS.copy() post_params["dep_node"] = start_station post_params["arv_node"] = end_station post_params["year"] = year post_params["day"] = day post_params["month"] = month post_params["hour"] = hour post_params["minute"] = minute post_params["max_route"] = max_route if via is None: for element in ("via_node01", "via_node02", "via_node03"): post_params[element] = "" else: if len(via) > 3: raise ValueError("Only up to three through stations are allowed") for node, station in zip_longest( via, ("via_node01", "via_node02", "via_node03"), fill_value=""): post_params[node] = station referer = list(urlparse(HYPERDIA_SEARCH)) referer[4] = urlencode(post_params) referer = urlunparse(referer) headers["Referer"] = referer session.headers.update(headers) result = session.post(HYPERDIA_CGI, data=post_params) return result def parse_hyperdia_heading(soup): # Heading (div class="title_r") with this structure: # First span: total time in minutes # Second span: number of transfers # Third span: total distance in Km # Fourth span: total cost in JPY elements = soup.select("span")[0:4] total_time, transfers, distance, cost = [item.text.strip() for item in elements] cost = int(cost.replace(",", "")) return {"total_time": total_time, "transfers": transfers, "total_distance": distance, "total_cost": cost} def parse_station_time(element, year, month, day, start=True): times = list(element.stripped_strings) # The first element if it's a transfer (arrival time; we ignore walking) # Otherwise we get the only item current_time = times[-1] if start else times[0] hour, minutes = current_time.split(":") station_time = datetime(year, int(month), int(day), int(hour), int(minutes), tzinfo=pytz.timezone("Japan")) return station_time def parse_train_name(element): # Trains are in a list with only one element, inside a span selected_item = element.select("td > ul > li > span")[0] return list(selected_item.stripped_strings)[0] def parse_track_number(element): # Second span in the station name column contains the track number # if applicable (if not, it's empty) track_data = element.select("span")[1].text if not track_data: return None track_number = int(GROUP_MATCHER.search(track_data)["tracknum"]) return track_number def parse_hyperdia_table(soup, year, month, day): data = list() previous = 0 # Skip the heading and the row immediately afterwards (commuter pass) for group in mlt.windowed(soup.find_all("tr")[2:], n=3, step=2): # Groups of 3 elements: # First row: start station (time in first column, station in column 3) # Second row: train information (duration in column 1, name in column 3) # Third row: arrival time(s) (same format as first row) # Times might be repeated more than once if it's a transfer start_info, journey_info, end_info = group startdata = start_info.find_all("td")[0:3] traindata = journey_info.find_all("td")[2] enddata = end_info.find_all("td")[0:3] # Ignore "add to favorities" start_station_name = list(startdata[2].stripped_strings)[0] # Second span in the station name column contains the track number # if applicable (if not, it's empty) start_track_number = parse_track_number(startdata[2]) end_track_number = parse_track_number(enddata[2]) start_station_time = parse_station_time(startdata[0], year, month, day, start=True) train_name = parse_train_name(traindata) end_station_name = list(enddata[2].stripped_strings)[0] end_station_time = parse_station_time(enddata[0], year, month, day, start=False) is_transfer = True if train_name == "Walk" else False duration = ((end_station_time - start_station_time).seconds / 60) % 60 entry = HyperdiaStep( start_station=start_station_name, end_station=end_station_name, start_time=start_station_time, end_time=end_station_time, train_name=train_name, is_transfer=is_transfer, duration=duration, start_track_number=start_track_number, end_track_number=end_track_number) data.append(entry) return data def parse_hyperdia_html(soup, *args, **kwargs): tables = soup.find_all("table", {"class": "table"}) headings = soup.find_all("div", {"class": "title_r"}) results = list() for heading, table in zip(headings, tables): parsed_heading = parse_hyperdia_heading(heading) parsed_table = parse_hyperdia_table(table, *args, **kwargs) trip = HyperdiaTrip(steps=parsed_table, **parsed_heading) results.append(trip) return results def convert_trip_to_table(trip: HyperdiaTrip) -> pd.DataFrame: columns = ["From", "Departure time", "Departure track", "To", "Arrival time", "Arrival track", "Duration", "Train / Transfer"] rows = list() for element in trip.steps: start_track_number = ("-" if not element.start_track_number else f"{element.start_track_number:.0f}") end_track_number = ("-" if not element.end_track_number else f"{element.end_track_number:.0f}") row = (element.start_station, f"{element.start_time: %H:%M}", start_track_number, element.end_station, f"{element.end_time: %H:%M}", end_track_number, f"{element.duration:.0f} minutes", element.train_name) rows.append(row) df = pd.DataFrame.from_records(rows, columns=columns) df = df.fillna("-") return df def trip_summary(trip: HyperdiaTrip) -> str: table = convert_trip_to_table(trip) table = tabulate(table, tablefmt="github", headers="keys", showindex=False) summary = (f"Total time: {trip.total_time} minutes," f" Total distance: {trip.total_distance}," f" Total cost {trip.total_cost} JPY") return table + "\n\n" + summary + "\n\n" def hyperdia_search(start_station: str, end_station: str, hour: int, minute: int, day: int = "15", month: str = "08", year: int = 2020, max_route: int = 5, via: List[str] = None) -> List[str]: raw_result = get_hyperdia_data(start_station, end_station, hour, minute, day, month, year, max_route, via) soup = BeautifulSoup(raw_result.text, "html.parser") results = parse_hyperdia_html(soup, year=year, month=month, day=day) for trip in results: print(trip_summary(trip))