#!/usr/bin/env python3
"""Query the Hyperdia route search and parse the HTML it returns."""

from dataclasses import dataclass
from datetime import datetime
from itertools import zip_longest
from typing import NamedTuple, Optional
from urllib.parse import urlparse, urlencode, urlunparse

from bs4 import BeautifulSoup
import more_itertools as mlt
import pytz
import requests

HYPERDIA_CGI = "http://www.hyperdia.com/en/cgi/search/en/hyperdia2.cgi"
HYPERDIA_SEARCH = "http://www.hyperdia.com/en/cgi/en/search.html"

# Form fields posted to the search CGI. The empty entries are filled in per
# request by get_hyperdia_data(); the rest are fixed search options.
HYPERDIA_PARAMS = {
    "dep_node": "",
    "arv_node": "",
    "year": "",
    "month": "",
    "day": "",
    "hour": "",
    "minute": "",
    "search_type": "0",
    "transtime": "undefined",
    "max_route": "5",
    "sort": "0",
    "faretype": "0",
    "ship": "off",
    "lmlimit": None,
    "sum_target": "7",
    "facility": "reserved",
    "search_target": "route",
    "sprexprs": "on",
    "sprnozomi": "on",
    "slpexprs": "on",
    "jr": "on",
    "privately": "on",
    "search_way": ""
}

HEADERS = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:71.0) Gecko/20100101 Firefox/71.0',
    'Content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
    'Host': 'www.hyperdia.com',
    'Origin': 'http://www.hyperdia.com'
}

# Form field names for the optional intermediate ("via") stations, in order.
_VIA_NODE_KEYS = ("via_node01", "via_node02", "via_node03")


@dataclass
class HyperdiaStep:
    """One step of a route result.

    Attributes:
        start_station: Name of the station where the step begins.
        end_station: Name of the station where the step ends.
        start_time: Departure time of the step.
        end_time: Arrival time of the step.
        duration: Length of the step; parse_hyperdia_table() stores a
            number of minutes here.
        train_name: Name of the train (or other means) used for the step.
        is_transfer: Whether the step is a transfer rather than a ride.
    """

    start_station: str
    end_station: str
    start_time: datetime
    end_time: datetime
    duration: Optional[float] = None
    train_name: Optional[str] = None
    is_transfer: Optional[bool] = False


def pairwise(iterable):
    """Yield consecutive non-overlapping pairs.

    s -> (s0, s1), (s2, s3), (s4, s5), ...

    A trailing unpaired element is dropped. Note this is not the sliding
    behaviour of ``itertools.pairwise``.
    """
    a = iter(iterable)
    return zip(a, a)


def grouped(iterable, n):
    """Yield consecutive non-overlapping tuples of ``n`` elements.

    s -> (s0,s1,s2,...sn-1), (sn,sn+1,sn+2,...s2n-1),
    (s2n,s2n+1,s2n+2,...s3n-1), ...

    Trailing elements that do not fill a whole group are dropped.
    """
    # The same iterator object repeated n times makes zip pull n items in a row.
    return zip(*[iter(iterable)] * n)


def get_hyperdia_data(start_station, end_station, hour, minute, day="15",
                      month="08", year="2020", via=None):
    """Run a route search on Hyperdia and return the raw HTTP response.

    Args:
        start_station: Departure station, sent as ``dep_node``.
        end_station: Arrival station, sent as ``arv_node``.
        hour: Hour of the search time, sent as-is in the form.
        minute: Minute of the search time, sent as-is in the form.
        day: Day of the search date.
        month: Month of the search date.
        year: Year of the search date.
        via: Optional iterable of up to three intermediate stations. They
            fill ``via_node01``..``via_node03`` in order; unused slots are
            sent empty.

    Returns:
        The ``requests.Response`` of the POST to ``HYPERDIA_CGI``.

    Raises:
        ValueError: If more than three ``via`` stations are given.
    """
    via_stations = [] if via is None else list(via)
    if len(via_stations) > len(_VIA_NODE_KEYS):
        raise ValueError("Only up to three through stations are allowed")

    post_params = HYPERDIA_PARAMS.copy()
    post_params["dep_node"] = start_station
    post_params["arv_node"] = end_station
    post_params["year"] = year
    post_params["day"] = day
    post_params["month"] = month
    post_params["hour"] = hour
    post_params["minute"] = minute

    # Field name -> station; slots without a station are posted as "".
    for key, station in zip_longest(_VIA_NODE_KEYS, via_stations,
                                    fillvalue=""):
        post_params[key] = station

    # The Referer is the search page URL with the same query the form posts.
    referer = urlparse(HYPERDIA_SEARCH)._replace(query=urlencode(post_params))
    headers = HEADERS.copy()
    headers["Referer"] = urlunparse(referer)

    # Close the session (and its pooled connections) once the response is in.
    with requests.Session() as session:
        session.headers.update(headers)
        return session.post(HYPERDIA_CGI, data=post_params)


# TODO: Adjust this, use the Firefox inspector
# For now, keep this in mind:
# Odd rows per result: stations
# Even rows: Train names, transfers...
+ + +def parse_hyperdia_heading(soup, fare_number=1): + + data = dict() + + mapping = {1: "total_time", 2: "transfer_num", 3: "total_distance"} + + counter = 1 + + for element in soup.find_all("span", class_="text_blue"): + + if counter > 3: + break + + data[mapping[counter]] = element.text + counter += 1 + + fare = soup.find("span", {"class": "text_blue", + "id": f"fare_total{fare_number}"}) + fare = int(fare.text.replace(",", "")) + data["total_fare"] = fare + + return data + + +def parse_station_time(element, year, month, day, start=True): + + times = list(element.stripped_strings) + # The first element if it's a transfer (arrival time; we ignore walking) + # Otherwise we get the only item + + current_time = times[-1] if start else times[0] + station_time = datetime(year, month, day, int(current_time.split(":")[0]), + int(current_time.split(":")[1]), + tzinfo=pytz.timezone("Japan")) + + return station_time + + +def parse_train_name(element): + + # Trains are in a list with only one element, inside a span + selected_item = element.select("td > ul > li > span")[0] + + return list(selected_item.stripped_strings)[0] + + +def parse_hyperdia_table(soup, year, month, day): + + data = list() + + previous = 0 + + # Skip the heading and the row immediately afterwards (commuter pass) + for group in mlt.windowed(soup.find_all("tr")[2:], n=3, step=2): + + # Groups of 3 elements: + # First row: start station (time in first column, station in column 3) + # Second row: train information (duration in column 1, name in column 3) + # Third row: arrival time(s) (same format as first row) + # Times might be repeated more than once if it's a transfer + + start_info, journey_info, end_info = group + startdata = start_info.find_all("td")[0:3] + traindata = journey_info.find_all("td")[2] + enddata = end_info.find_all("td")[0:3] + # Ignore "add to favorities" + start_station_name = list(startdata[2].stripped_strings)[0] + start_station_time = parse_station_time(startdata[0], year, month, 
day, + start=True) + train_name = parse_train_name(traindata) + end_station_name = list(enddata[2].stripped_strings)[0] + end_station_time = parse_station_time(enddata[0], year, month, day, + start=False) + + is_transfer = True if train_name == "Walk" else False + duration = ((end_station_time - start_station_time).seconds / 60) % 60 + + entry = HyperdiaStep( + start_station=start_station_name, + end_station=end_station_name, + start_time=start_station_time, + end_time=end_station_time, + train_name=train_name, + is_transfer=is_transfer, + duration=duration) + + data.append(entry) + + return data + + +def parse_hyperdia_html(soup): + + tables = soup.find_all("table", {"class": "table"}) + titles = soup.find_all("div", {"class": "title2"}) + + results = list() + for data in tables: + properties = {} + extracted = data.find_all( + "span", {"class": ["text_16", + "text_blue_l", "text_blue_p"]}) + parsed = list(pairwise(extracted)) + start = parsed[0] + end = parsed[-1] + + properties["start"] = start[1].text + properties["starttime"] = start[0].text + properties["end"] = end[1].text + properties["endtime"] = end[0].text.strip() + results.append(properties) + return results