#!/usr/bin/env python3

from dataclasses import dataclass
from datetime import datetime
from itertools import zip_longest
from typing import NamedTuple, Optional
from urllib.parse import urlparse, urlencode, urlunparse

from bs4 import BeautifulSoup
import more_itertools as mlt
import pytz
import requests

HYPERDIA_CGI = "http://www.hyperdia.com/en/cgi/search/en/hyperdia2.cgi"
HYPERDIA_SEARCH = "http://www.hyperdia.com/en/cgi/en/search.html"

# Template of the search form fields; the empty entries are filled in per
# request by get_hyperdia_data().
HYPERDIA_PARAMS = {
    "dep_node": "",
    "arv_node": "",
    "year": "",
    "month": "",
    "day": "",
    "hour": "",
    "minute": "",
    "search_type": "0",
    "transtime": "undefined",
    "max_route": "5",
    "sort": "0",
    "faretype": "0",
    "ship": "off",
    "lmlimit": None,
    "sum_target": "7",
    "facility": "reserved",
    "search_target": "route",
    "sprexprs": "on",
    "sprnozomi": "on",
    "slpexprs": "on",
    "jr": "on",
    "privately": "on",
    "search_way": ""
}

# Headers sent with every search request.
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:71.0) Gecko/20100101 Firefox/71.0',
    'Content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
    'Host': 'www.hyperdia.com',
    'Origin': 'http://www.hyperdia.com'
}

# Form fields that carry the optional intermediate ("via") stations.
_VIA_NODE_KEYS = ("via_node01", "via_node02", "via_node03")


@dataclass
class HyperdiaStep:
    """One leg of a route, from a departure station to an arrival station."""

    start_station: str
    end_station: str
    start_time: datetime
    end_time: datetime
    # Length of the leg in minutes, as computed by parse_hyperdia_table().
    duration: Optional[float] = None
    train_name: Optional[str] = None
    # True when the leg is a walking transfer rather than a train ride.
    is_transfer: Optional[bool] = False


def pairwise(iterable):
    "s -> (s0, s1), (s2, s3), (s4, s5), ..."
    # Both zip() arguments are the same iterator, so each tuple consumes two
    # consecutive items; a trailing unpaired item is dropped.
    a = iter(iterable)
    return zip(a, a)


def grouped(iterable, n):
    """s -> (s0,s1,s2,...sn-1), (sn,sn+1,sn+2,...s2n-1),
    (s2n,s2n+1,s2n+2,...s3n-1), ..."""
    # n references to one iterator: zip() pulls n consecutive items per tuple.
    return zip(*[iter(iterable)] * n)


def get_hyperdia_data(start_station, end_station, hour, minute, day="15",
                      month="08", year="2020", via=None):
    """Submit a route search to Hyperdia and return the HTTP response.

    Args:
        start_station: Value for the ``dep_node`` form field.
        end_station: Value for the ``arv_node`` form field.
        hour: Value for the ``hour`` form field.
        minute: Value for the ``minute`` form field.
        day: Value for the ``day`` form field.
        month: Value for the ``month`` form field.
        year: Value for the ``year`` form field.
        via: Optional sequence of at most three intermediate stations,
            assigned in order to ``via_node01`` .. ``via_node03``.

    Returns:
        The ``requests.Response`` of the POST to ``HYPERDIA_CGI``.

    Raises:
        ValueError: If more than three ``via`` stations are given.
    """
    via_stations = () if via is None else via
    # Validate before any network resource is created.
    if len(via_stations) > len(_VIA_NODE_KEYS):
        raise ValueError("Only up to three through stations are allowed")

    post_params = HYPERDIA_PARAMS.copy()
    headers = HEADERS.copy()

    post_params["dep_node"] = start_station
    post_params["arv_node"] = end_station
    post_params["year"] = year
    post_params["day"] = day
    post_params["month"] = month
    post_params["hour"] = hour
    post_params["minute"] = minute

    # Form field names come first so each key receives its station; unused
    # via slots are sent as empty strings.
    for node, station in zip_longest(_VIA_NODE_KEYS, via_stations,
                                     fillvalue=""):
        post_params[node] = station

    # The Referer is the search page URL carrying the same parameters as its
    # query string (index 4 of the parsed URL tuple).
    referer = list(urlparse(HYPERDIA_SEARCH))
    referer[4] = urlencode(post_params)
    headers["Referer"] = urlunparse(referer)

    # The context manager releases the session's connections once the
    # response has been received.
    with requests.Session() as session:
        session.headers.update(headers)
        return session.post(HYPERDIA_CGI, data=post_params)


# TODO: Adjust this, use the Firefox inspector
# For now, keep this in mind:
# Odd rows per result: stations
# Even rows: Train names, transfers...
def parse_hyperdia_heading(soup, fare_number=1):
    """Extract the summary values shown in a result heading.

    Args:
        soup: Parsed HTML of one result heading.
        fare_number: Number used to build the ``fare_total<N>`` element id
            from which the total fare is read.

    Returns:
        A dict with ``total_time``, ``transfer_num`` and ``total_distance``
        (text of the first three ``span.text_blue`` elements, in document
        order) and ``total_fare`` (an int, thousands separators removed).
    """
    fields = ("total_time", "transfer_num", "total_distance")
    # zip() stops after the three named fields, ignoring any further spans.
    data = {
        field: element.text
        for field, element in zip(fields,
                                  soup.find_all("span", class_="text_blue"))
    }

    fare = soup.find("span", {"class": "text_blue",
                              "id": f"fare_total{fare_number}"})
    data["total_fare"] = int(fare.text.replace(",", ""))

    return data


def parse_station_time(element, year, month, day, start=True):
    """Build a Japan-local, timezone-aware datetime from a time cell.

    Args:
        element: Table cell whose text fragments are ``HH:MM`` times.
        year: Year of the journey (int or numeric string).
        month: Month of the journey (int or numeric string).
        day: Day of the journey (int or numeric string).
        start: When true use the last time listed in the cell, otherwise
            the first one.

    Returns:
        A ``datetime`` localized to the "Japan" timezone.
    """
    times = list(element.stripped_strings)
    # A transfer cell lists more than one time (we ignore walking): the
    # departure is the last entry and the arrival is the first. Otherwise
    # the cell holds a single time.
    current_time = times[-1] if start else times[0]
    parts = current_time.split(":")

    naive_time = datetime(int(year), int(month), int(day),
                          int(parts[0]), int(parts[1]))
    # pytz zones must be attached with localize(); passing them as tzinfo=
    # selects the zone's historical local-mean-time offset instead of JST.
    return pytz.timezone("Japan").localize(naive_time)


def parse_train_name(element):
    """Return the train name found in a journey-information cell."""
    # Trains are in a list with only one element, inside a span
    selected_item = element.select("td > ul > li > span")[0]
    return list(selected_item.stripped_strings)[0]


def parse_hyperdia_table(soup, year, month, day):
    """Parse the rows of one result table into a list of route steps.

    Args:
        soup: Parsed HTML of one result table.
        year: Year of the journey, forwarded to parse_station_time().
        month: Month of the journey, forwarded to parse_station_time().
        day: Day of the journey, forwarded to parse_station_time().

    Returns:
        A list of ``HyperdiaStep`` objects in table order.
    """
    data = []

    # Skip the heading and the row immediately afterwards (commuter pass)
    rows = soup.find_all("tr")[2:]

    for group in mlt.windowed(rows, n=3, step=2):
        # windowed() pads a trailing incomplete window with None; such a
        # window does not describe a full step.
        if any(row is None for row in group):
            continue

        # Groups of 3 elements:
        # First row: start station (time in first column, station in column 3)
        # Second row: train information (duration in column 1, name in column 3)
        # Third row: arrival time(s) (same format as first row)
        # Times might be repeated more than once if it's a transfer
        # Consecutive groups overlap by one row: the arrival row of a step is
        # the departure row of the next one.
        start_info, journey_info, end_info = group
        startdata = start_info.find_all("td")[0:3]
        traindata = journey_info.find_all("td")[2]
        enddata = end_info.find_all("td")[0:3]

        # Ignore "add to favorites"
        start_station_name = list(startdata[2].stripped_strings)[0]
        start_station_time = parse_station_time(startdata[0], year, month,
                                                day, start=True)
        train_name = parse_train_name(traindata)
        end_station_name = list(enddata[2].stripped_strings)[0]
        end_station_time = parse_station_time(enddata[0], year, month, day,
                                              start=False)

        # Duration in minutes. Both times carry the same calendar date, so
        # an arrival past midnight yields a negative timedelta whose
        # .seconds field still holds the elapsed time within a day.
        duration = (end_station_time - start_station_time).seconds / 60

        entry = HyperdiaStep(
            start_station=start_station_name,
            end_station=end_station_name,
            start_time=start_station_time,
            end_time=end_station_time,
            train_name=train_name,
            is_transfer=train_name == "Walk",
            duration=duration)

        data.append(entry)

    return data


def parse_hyperdia_html(soup):
    """Summarise every result table of a search page.

    Args:
        soup: Parsed HTML of the search result page.

    Returns:
        One dict per ``table.table`` element that contains at least one
        (time, station) pair, with the keys ``start``, ``starttime``,
        ``end`` and ``endtime``.
    """
    results = []

    for table in soup.find_all("table", {"class": "table"}):
        extracted = table.find_all(
            "span", {"class": ["text_16", "text_blue_l", "text_blue_p"]})
        # A table without a complete (time, station) pair has no route to
        # report.
        if len(extracted) < 2:
            continue

        # Spans alternate time, station name.
        parsed = list(pairwise(extracted))
        start_time, start_name = parsed[0]
        end_time, end_name = parsed[-1]

        results.append({
            "start": start_name.text,
            "starttime": start_time.text,
            "end": end_name.text,
            "endtime": end_time.text.strip(),
        })

    return results