Hyperdia also lists the stations where the train runs through and only the line name changes, so you are not required to change trains. This marks them as such in both the JSON and the Markdown output.
445 lines
14 KiB
Python
Executable file
445 lines
14 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from itertools import zip_longest
|
|
import re
|
|
from typing import Optional, List
|
|
from urllib.parse import urlparse, urlencode, urlunparse
|
|
|
|
from bs4 import BeautifulSoup
|
|
import more_itertools as mlt
|
|
import pandas as pd
|
|
import pytz
|
|
import requests
|
|
import simplejson as json
|
|
from tabulate import tabulate
|
|
|
|
# Endpoint that get_hyperdia_data() POSTs the route-search form to.
HYPERDIA_CGI = "http://www.hyperdia.com/en/cgi/search/en/hyperdia2.cgi"
# Search page URL; used as the base of the Referer header sent with the POST.
HYPERDIA_SEARCH = "http://www.hyperdia.com/en/cgi/en/search.html"
# Captures, as the "tracknum" group, the digits that follow "No." in the
# track text read by parse_track_number().
GROUP_MATCHER = re.compile(r".*No\.(?P<tracknum>[0-9]{1,}).*")
|
|
|
|
# Template of the form fields sent with a route search. get_hyperdia_data()
# copies this dict and overwrites the station, date, time and max_route
# entries (and the two Shinkansen switches when Shinkansen is disabled).
# NOTE(review): the meaning of the fields without an inline comment is defined
# by the remote service and is not visible here.
HYPERDIA_PARAMS = {
    "dep_node": "",
    "arv_node": "",
    "year": "",
    "month": "",
    "day": "",
    "hour": "",
    "minute": "",
    "search_type": "0",
    "transtime": "undefined",
    "max_route": "5",
    "sort": "0",
    "faretype": "0",
    "ship": "off",
    "lmlimit": None,
    "sum_target": "7",
    "facility": "reserved",
    "search_target": "route",
    "sprexprs": "on",  # Shinkansen
    "sprnozomi": "on",  # Shinkansen plus Nozomi/Mizuho
    "slpexprs": "on",  # Limited express (tokkyu)
    "jr": "on",  # JR lines
    "privately": "on",  # Non-JR lines
    "search_way": ""
}
|
|
|
|
# Base HTTP headers for the search request. get_hyperdia_data() copies this
# dict, adds a Referer header and installs the result on its session.
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:71.0) Gecko/20100101 Firefox/71.0',
    'Content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
    'Host': 'www.hyperdia.com',
    'Origin': 'http://www.hyperdia.com'
}
|
|
|
|
|
|
def required_length(nmin, nmax):
    """Build an argparse action that enforces a value-count range.

    Args:
        nmin: Minimum number of values the option accepts (inclusive).
        nmax: Maximum number of values the option accepts (inclusive).

    Returns:
        An ``argparse.Action`` subclass. When invoked it stores the parsed
        values on the namespace under the option's ``dest``.

    Raises:
        argparse.ArgumentError: raised by the returned action when the number
            of values is outside ``[nmin, nmax]``; ``parse_args`` turns it
            into the usual usage message and exit.
    """

    class RequiredLength(argparse.Action):
        """Store the values after checking how many were supplied."""

        def __call__(self, parser, args, values, option_string=None):
            if not nmin <= len(values) <= nmax:
                # ArgumentError is the exception argparse catches around
                # action calls; ArgumentTypeError is only handled during
                # ``type=`` conversion and would surface as a traceback here.
                msg = f'requires between {nmin} and {nmax} arguments'
                raise argparse.ArgumentError(self, msg)
            setattr(args, self.dest, values)

    return RequiredLength
|
|
|
|
|
|
@dataclass
class HyperdiaStep:
    """One leg of a trip: a ride (or walk) between two stations."""

    # Name of the station where this leg starts.
    start_station: str
    # Name of the station where this leg ends.
    end_station: str
    # Departure time at start_station.
    start_time: datetime
    # Arrival time at end_station.
    end_time: datetime
    # Length of the leg; parse_hyperdia_table() stores whole minutes here.
    duration: Optional[int] = None
    # Train/line name, "Walk" for walking legs, or the go-through marker text.
    train_name: Optional[str] = None
    # True when the leg is a walking transfer (train_name == "Walk").
    is_transfer: Optional[bool] = False
    # True when the train continues and only the line name changes.
    go_through: Optional[bool] = False
    # Track number at departure, or None when not listed.
    start_track_number: Optional[int] = None
    # Track number at arrival, or None when not listed.
    end_track_number: Optional[int] = None
|
|
|
|
|
|
@dataclass
class HyperdiaTrip:
    """One route proposed by the search: its legs plus the heading totals."""

    # NOTE(review): parse_hyperdia_heading() supplies total_time, transfers and
    # total_distance as stripped strings; only total_cost is converted to int.
    # Confirm whether the int annotations below are the intended contract.

    # Legs of the trip, in travel order.
    steps: List[HyperdiaStep]
    # Total distance (Km, per the heading parser's comments).
    total_distance: int
    # Total travel time (minutes, per the heading parser's comments).
    total_time: int
    # Total cost (JPY, per the heading parser's comments).
    total_cost: int
    # Number of transfers.
    transfers: int
    # 1-based position of this route in the results; set by hyperdia_search().
    result_number: Optional[int] = None
    # Date in format ISO (YYYY-MM-DD)
    travel_date: Optional[str] = None
|
|
|
|
|
|
def _serialize(trip: HyperdiaTrip) -> dict:
|
|
|
|
structure = dict()
|
|
structure["steps"] = list()
|
|
|
|
for attrib in ("total_distance", "total_time", "total_cost", "transfers",
|
|
"result_number", "travel_date"):
|
|
structure[attrib] = getattr(trip, attrib)
|
|
|
|
for step in trip.steps:
|
|
|
|
subdict = dict()
|
|
|
|
subdict["start_station"] = step.start_station
|
|
subdict["end_station"] = step.end_station
|
|
subdict["start_time"] = step.start_time.timestamp()
|
|
subdict["end_time"] = step.end_time.timestamp()
|
|
subdict["duration"] = step.duration
|
|
subdict["train_name"] = step.train_name
|
|
subdict["is_transfer"] = step.is_transfer
|
|
subdict["go_through"] = step.go_through
|
|
subdict["start_track_number"] = step.start_track_number
|
|
subdict["end_track_number"] = step.end_track_number
|
|
|
|
structure["steps"].append(subdict)
|
|
|
|
return structure
|
|
|
|
|
|
def get_hyperdia_data(start_station, end_station, hour, minute, day="15",
                      month="08", year="2020", max_route=5, via=None,
                      use_shinkansen=True, timeout=30):
    """POST a route search to Hyperdia and return the raw HTTP response.

    Args:
        start_station: Departure station name.
        end_station: Arrival station name.
        hour: Hour of travel.
        minute: Minute of travel.
        day: Day of travel.
        month: Month of travel.
        year: Year of travel.
        max_route: Maximum number of routes to request.
        via: Optional sequence of up to three stations to route through.
        use_shinkansen: When False, the two Shinkansen form switches are
            set to "off".
        timeout: Seconds to wait for the server before giving up; passed
            to ``requests``. ``None`` waits indefinitely.

    Returns:
        The ``requests`` response object of the POST.

    Raises:
        ValueError: If more than three ``via`` stations are given.
    """

    via_fields = ("via_node01", "via_node02", "via_node03")
    via_stations = () if via is None else via

    if len(via_stations) > len(via_fields):
        raise ValueError("Only up to three through stations are allowed")

    post_params = HYPERDIA_PARAMS.copy()
    post_params["dep_node"] = start_station
    post_params["arv_node"] = end_station
    post_params["year"] = year
    post_params["day"] = day
    post_params["month"] = month
    post_params["hour"] = hour
    post_params["minute"] = minute
    post_params["max_route"] = max_route

    if not use_shinkansen:
        post_params["sprexprs"] = "off"
        post_params["sprnozomi"] = "off"

    # Unused via slots are sent as empty strings.
    for station, node in zip_longest(via_stations, via_fields, fillvalue=""):
        post_params[node] = station

    # The Referer mimics the search page URL carrying the same parameters.
    referer = list(urlparse(HYPERDIA_SEARCH))
    referer[4] = urlencode(post_params)

    headers = HEADERS.copy()
    headers["Referer"] = urlunparse(referer)

    # The context manager releases the session's pooled connections; the
    # response body has already been read by the time post() returns.
    with requests.Session() as session:
        session.headers.update(headers)
        return session.post(HYPERDIA_CGI, data=post_params, timeout=timeout)
|
|
|
|
|
|
def parse_hyperdia_heading(soup):
    """Extract the route totals from a heading (div class="title_r").

    The first four spans are read in order: total time in minutes, number
    of transfers, total distance in Km, total cost in JPY. Only the cost is
    converted to ``int`` (thousands separators removed); the other values
    are returned as stripped text.
    """

    first_four = soup.select("span")[:4]
    total_time, transfers, distance, cost = (span.text.strip()
                                             for span in first_four)

    return dict(total_time=total_time,
                transfers=transfers,
                total_distance=distance,
                total_cost=int(cost.replace(",", "")))
|
|
|
|
|
|
def parse_station_time(element, year, month, day, start=True):
    """Build a Japan-local datetime from a timetable time cell.

    Args:
        element: Cell whose stripped strings are "HH:MM" times. A transfer
            cell may hold more than one time.
        year: Year of travel (int or numeric string).
        month: Month of travel (int or numeric string).
        day: Day of travel (int or numeric string).
        start: When True use the last time in the cell (departure);
            otherwise use the first one (arrival).

    Returns:
        A timezone-aware ``datetime`` in the "Japan" zone.
    """

    times = list(element.stripped_strings)

    # A transfer cell lists the arrival first and the departure last; a
    # regular cell has a single entry, so either index selects it.
    current_time = times[-1] if start else times[0]

    hour, minutes = current_time.split(":")

    naive_time = datetime(int(year), int(month), int(day),
                          int(hour), int(minutes))

    # pytz zones must be attached with localize(): passing them as tzinfo=
    # to the constructor picks the zone's historical LMT offset instead of
    # the current UTC offset.
    return pytz.timezone("Japan").localize(naive_time)
|
|
|
|
|
|
def parse_train_name(element):
    """Return the train name found in a journey-information cell.

    The name is the first stripped string of the first span matched by
    "td > ul > li > span", with newlines and tabs removed (long line names
    are wrapped in the page markup).
    """

    # Trains are in a list with only one element, inside a span
    name_span = element.select("td > ul > li > span")[0]
    first_text = list(name_span.stripped_strings)[0]

    return first_text.translate(str.maketrans("", "", "\n\t"))
|
|
|
|
|
|
def parse_track_number(element):
    """Return the track number shown in a station-name cell, if any.

    Args:
        element: Station-name cell; its second span holds the track text
            (empty when no track is listed).

    Returns:
        The track number as ``int``, or ``None`` when the span is empty or
        its text contains no "No.<digits>" pattern.
    """

    # Second span in the station name column contains the track number
    # if applicable (if not, it's empty)
    track_data = element.select("span")[1].text

    if not track_data:
        return None

    # Text that does not match (e.g. whitespace only) yields no track number
    # rather than failing on a None match object.
    match = GROUP_MATCHER.search(track_data)
    if match is None:
        return None

    return int(match["tracknum"])
|
|
|
|
|
|
def parse_hyperdia_table(soup, year, month, day):
    """Parse one route table into a list of HyperdiaStep objects.

    Rows are consumed in overlapping groups of three (departure row, train
    row, arrival row); the arrival row of one leg is the departure row of
    the next. ``year``, ``month`` and ``day`` are forwarded to
    parse_station_time() to build the leg times.
    """

    steps = list()

    # Set when the arrival row of the previous leg carried the
    # "icon_choku.gif" icon: the next leg is then the same train running
    # through under a different line name.
    continues_previous_train = False

    # Skip the heading and the row immediately afterwards (commuter pass)
    rows = soup.find_all("tr")[2:]

    for departure_row, train_row, arrival_row in mlt.windowed(rows, n=3,
                                                              step=2):

        # Departure/arrival rows: time in the first column, station in the
        # third. Train row: name in the third column.
        departure_cells = departure_row.find_all("td")[0:3]
        train_cell = train_row.find_all("td")[2]
        arrival_cells = arrival_row.find_all("td")[0:3]

        # Only the first string is the name (ignore "add to favorites")
        start_station_name = list(departure_cells[2].stripped_strings)[0]

        icon_src = arrival_cells[1].next_element.get("src")

        start_track_number = parse_track_number(departure_cells[2])
        end_track_number = parse_track_number(arrival_cells[2])

        departure_time = parse_station_time(departure_cells[0], year, month,
                                            day, start=True)

        go_through = continues_previous_train
        if go_through:
            train_name = "Line name change, train goes through"
        else:
            train_name = parse_train_name(train_cell)

        # Remember for the next leg whether this one ends in a direct
        # connection.
        continues_previous_train = (icon_src is not None
                                    and "icon_choku.gif" in icon_src)

        end_station_name = list(arrival_cells[2].stripped_strings)[0]
        arrival_time = parse_station_time(arrival_cells[0], year, month, day,
                                          start=False)

        steps.append(HyperdiaStep(
            start_station=start_station_name,
            end_station=end_station_name,
            start_time=departure_time,
            end_time=arrival_time,
            train_name=train_name,
            is_transfer=(train_name == "Walk"),
            duration=(arrival_time - departure_time).seconds // 60,
            start_track_number=start_track_number,
            end_track_number=end_track_number,
            go_through=go_through))

    return steps
|
|
|
|
|
|
def parse_hyperdia_html(soup, *args, **kwargs):
    """Parse a results page into a list of HyperdiaTrip objects.

    Args:
        soup: Parsed results page.
        *args, **kwargs: Forwarded to parse_hyperdia_table(). ``year``,
            ``month`` and ``day`` must be given as keyword arguments because
            they are also used to build each trip's travel date.

    Returns:
        One HyperdiaTrip per (heading, table) pair found in the page, with
        ``travel_date`` in ISO format (YYYY-MM-DD).
    """

    tables = soup.find_all("table", {"class": "table"})
    headings = soup.find_all("div", {"class": "title_r"})

    results = []

    for heading, table in zip(headings, tables):

        parsed_heading = parse_hyperdia_heading(heading)
        parsed_table = parse_hyperdia_table(table, *args, **kwargs)

        # Zero-pad month and day so the date is valid ISO 8601 whether the
        # caller passed ints or (possibly unpadded) strings.
        travel_date = (f'{int(kwargs["year"]):04d}-'
                       f'{int(kwargs["month"]):02d}-'
                       f'{int(kwargs["day"]):02d}')

        trip = HyperdiaTrip(steps=parsed_table, travel_date=travel_date,
                            **parsed_heading)
        results.append(trip)

    return results
|
|
|
|
|
|
def _format_track(track_number) -> str:
    """Render a track number for display, using "-" when it is missing."""
    # Compare against None explicitly: track number 0 is falsy but valid.
    return "-" if track_number is None else f"{track_number:.0f}"


def convert_trip_to_table(trip: HyperdiaTrip) -> pd.DataFrame:
    """Build a display table with one row per leg of the trip.

    Args:
        trip: Trip whose ``steps`` are tabulated.

    Returns:
        A DataFrame with the columns From, Departure time, Departure track,
        To, Arrival time, Arrival track, Duration and Train / Transfer.
        Missing values are shown as "-".
    """

    columns = ["From", "Departure time", "Departure track",
               "To", "Arrival time", "Arrival track", "Duration",
               "Train / Transfer"]

    # The time format spec keeps its leading space, so the rendered times
    # are unchanged from previous output.
    rows = [
        (element.start_station,
         f"{element.start_time: %H:%M}",
         _format_track(element.start_track_number),
         element.end_station,
         f"{element.end_time: %H:%M}",
         _format_track(element.end_track_number),
         f"{element.duration:.0f} minutes",
         element.train_name)
        for element in trip.steps
    ]

    df = pd.DataFrame.from_records(rows, columns=columns)
    df = df.fillna("-")

    return df
|
|
|
|
|
|
def trip_summary(trip: HyperdiaTrip) -> str:
    """Render a trip as a GitHub-style Markdown table plus a totals line.

    The table of legs comes first, then a blank line, then the total time,
    distance and cost, followed by a trailing blank line.
    """

    steps_table = tabulate(convert_trip_to_table(trip), tablefmt="github",
                           headers="keys", showindex=False)

    totals = "".join((
        f"Total time: {trip.total_time} minutes,",
        f" Total distance: {trip.total_distance} Km,",
        f" Total cost: {trip.total_cost} JPY",
    ))

    return f"{steps_table}\n\n{totals}\n\n"
|
|
|
|
|
|
def hyperdia_search(start_station: str, end_station: str, hour: int,
                    minute: int, day: int = "15", month: str = "08",
                    year: int = 2020, max_route: int = 5,
                    via: List[str] = None, output_type: str = "md",
                    use_shinkansen: bool = True):
    """Run a route search and print the results to standard output.

    The page returned by get_hyperdia_data() is parsed into trips, which
    are numbered from 1. With ``output_type="md"`` every trip is printed
    as a Markdown section; with ``output_type="json"`` all trips are printed
    as a single JSON document under the "result" key. Any other value
    prints nothing.
    """

    # TODO: Error checking
    response = get_hyperdia_data(start_station, end_station,
                                 hour, minute, day, month, year, max_route,
                                 via, use_shinkansen)
    page = BeautifulSoup(response.text, "html.parser")
    trips = parse_hyperdia_html(page, year=year, month=month, day=day)

    for number, trip in enumerate(trips, start=1):
        trip.result_number = number

    if output_type == "md":
        for trip in trips:
            print(f"##### Route {trip.result_number}", end="\n\n")
            print(trip_summary(trip))
    elif output_type == "json":
        document = {"result": [_serialize(trip) for trip in trips]}
        print(json.dumps(document, indent=2))
|
|
|
|
|
|
def main():
    """Command-line entry point: parse the arguments and run the search.

    The time is given as HH.MM (``-t``) and the date as YYYY-MM-DD
    (``-d``); both are required because the search cannot be built without
    them.
    """

    parser = argparse.ArgumentParser()
    parser.add_argument("-t", "--time", help="Hour of travel", required=True,
                        type=lambda d: datetime.strptime(d, '%H.%M').time())
    parser.add_argument("-d", "--date", help="Date of travel", required=True,
                        type=lambda d: datetime.strptime(d, "%Y-%m-%d").date())
    # Default matches hyperdia_search(); without it an omitted option would
    # overwrite the form default with None.
    parser.add_argument("--max-routes", help="Maximum number of routes",
                        type=int, default=5)
    # Stored under the positive name: True unless the flag is given.
    parser.add_argument("--no-shinkansen", action="store_false",
                        dest="use_shinkansen",
                        help="Do not use shinkansen routes")
    parser.add_argument("--via", nargs='+', action=required_length(1, 3),
                        help="Stations to force route through (min 1, max 3)")
    parser.add_argument("--output-type", choices=("md", "json"), default="md",
                        help="Output type (markdown or JSON)")
    parser.add_argument("start_station", help="Start station")
    parser.add_argument("end_station", help="End station")

    options = parser.parse_args()

    hour, minute = options.time.hour, options.time.minute
    day, year = options.date.day, options.date.year

    # Add "0" in front of single-digit months
    month = f"{options.date.month:02d}"

    hyperdia_search(options.start_station, options.end_station, hour, minute,
                    day, month, year, options.max_routes, via=options.via,
                    output_type=options.output_type,
                    use_shinkansen=options.use_shinkansen)
|
|
|
|
|
|
# Run the command-line interface only when the file is executed as a script.
if __name__ == "__main__":
    main()
|