1
0
Fork 0
scripts/tripit_add.py
Luca Beltrame e27f33b2d0
Make sure the trip is properly merged
Clean up and make the code more readable.
2020-02-16 10:10:36 +01:00

154 lines
4.5 KiB
Python

#!/usr/bin/python3
from collections import defaultdict
from datetime import datetime
from typing import List, Tuple
import lxml.etree as etree
import lxml.builder as builder
import more_itertools as mlt
from pytz import timezone
from requests_oauthlib import OAuth1Session
import simplejson as json
E = builder.ElementMaker()
Request = E.Request
Segment = E.Segment
StartDateTime = E.StartDateTime
EndDateTime = E.EndDateTime
RailObject = E.RailObject
date_ = E.date
time_ = E.time
def get_tokens(filename: str) -> Tuple[str]:
with open(filename) as handle:
tokens = json.load(handle)
return (tokens["client_key"], tokens["client_secret"],
tokens["resource_owner_key"], tokens["resource_owner_secret"])
def add_train_segment(start_station: str,
end_station: str,
start_time: str,
end_time: str,
trip_date: str,
train: str = None,
number: int = None,
carrier: str = None) -> Segment:
start_station_name = E.start_station_name
end_station_name = E.end_station_name
train_type = E.train_type
train_number = E.train_number
carrier_name = E.carrier_name
data = []
data.append(StartDateTime(date_(trip_date), time_(start_time)))
data.append(EndDateTime(date_(trip_date), time_(end_time)))
data.append(start_station_name(start_station))
data.append(end_station_name(end_station))
if number:
data.append(train_number(number))
if carrier:
data.append(carrier_name(carrier))
if train_type:
data.append(train_type(train))
doc = Segment(*data)
return doc
def _convert_datetimes(segment: int) -> List[str]:
date_time = datetime.fromtimestamp(segment,
timezone("Japan"))
date = date_time.strftime("%Y-%m-%d")
time = date_time.strftime("%H:%M:00")
return date, time
def _parse_record(segment: dict) -> Tuple[str]:
start_station = segment["start_station"].title()
start_date, start_time = _convert_datetimes(segment["start_time"])
end_station = segment["end_station"].title()
end_date, end_time = _convert_datetimes(segment["end_time"])
train_type = segment["train_name"]
train_number = ("" if segment.get("train_number") is None
else segment["train_number"])
return (start_station, start_date, start_time, end_station, end_date,
end_time, train_type, train_number)
def parse_json(json_data: dict, carrier: str = None,
result_number: int = None) -> str:
jdoc = json_data
result = jdoc["result"]
record = result[result_number - 1]
segments = list()
iter_records = mlt.peekable(record["steps"])
previous_record = None
for segment in iter_records:
parsed = _parse_record(segment)
start_station, start_date, start_time, *rest = parsed
end_station, end_date, end_time, *rest = rest
train_type, train_number = rest
# FIXME: Impossible to extract it from the current Hyperdia data
carrier = "" if carrier is None else carrier
next_record = iter_records.peek(None)
if next_record is not None:
if segment["go_through"] and next_record["go_through"]:
# Nothing to be done here
continue
if not segment["go_through"] and next_record["go_through"]:
# Next one is going through: set the start,
# but don't add anything
previous_record = segment
continue
elif not next_record["go_through"] and segment["go_through"]:
# The next one is a "real" one, set the end, and add
# use the existing record to extract the data
parsed = _parse_record(previous_record)
start_station, start_date, start_time, *_ = parsed
*_, train_type, train_number = parsed
previous_record = None
subdoc = add_train_segment(start_station,
end_station,
start_time,
end_time,
start_date,
train=train_type,
number=train_number,
carrier=carrier)
segments.append(subdoc)
rail_object = RailObject(*segments)
request_object = Request(rail_object)
return etree.tounicode(request_object)