#!/usr/bin/python3
"""Build an XML ``Request`` document of rail segments from route JSON.

The route data is expected to look like the Hyperdia-derived structure read
by :func:`parse_json`: a ``"result"`` list whose entries each carry a
``"steps"`` list describing the individual legs of a trip.
"""

from collections import defaultdict
from datetime import datetime
from typing import List, Optional, Tuple

import lxml.etree as etree
import lxml.builder as builder
import more_itertools as mlt
from pytz import timezone
from requests_oauthlib import OAuth1Session
import simplejson as json

# Element factories shared by every function in this module.
E = builder.ElementMaker()
Request = E.Request
Segment = E.Segment
StartDateTime = E.StartDateTime
EndDateTime = E.EndDateTime
RailObject = E.RailObject
date_ = E.date
time_ = E.time


def get_tokens(filename: str) -> Tuple[str, str, str, str]:
    """Read the four credential values stored in a JSON file.

    Args:
        filename: Path to a JSON file containing the keys ``client_key``,
            ``client_secret``, ``resource_owner_key`` and
            ``resource_owner_secret``.

    Returns:
        The four values, in the order listed above.

    Raises:
        KeyError: If one of the four keys is missing from the file.
    """
    with open(filename) as handle:
        tokens = json.load(handle)

    return (tokens["client_key"], tokens["client_secret"],
            tokens["resource_owner_key"], tokens["resource_owner_secret"])


def add_train_segment(start_station: str, end_station: str, start_time: str,
                      end_time: str, trip_date: str,
                      train: Optional[str] = None,
                      number: Optional[int] = None,
                      carrier: Optional[str] = None,
                      end_date: Optional[str] = None) -> Segment:
    """Build a single ``Segment`` element describing one train leg.

    Args:
        start_station: Text of the ``start_station_name`` element.
        end_station: Text of the ``end_station_name`` element.
        start_time: Text of the ``time`` element inside ``StartDateTime``.
        end_time: Text of the ``time`` element inside ``EndDateTime``.
        trip_date: Text of the ``date`` element inside ``StartDateTime``.
        train: Optional text of the ``train_type`` element.
        number: Optional train number; written as text in ``train_number``.
        carrier: Optional text of the ``carrier_name`` element.
        end_date: Optional text of the ``date`` element inside
            ``EndDateTime``. Falls back to ``trip_date`` when omitted, so
            existing callers keep their previous output.

    Returns:
        The assembled ``Segment`` element. Optional children are left out
        when their value is falsy (``None``, ``""`` or ``0``).
    """
    start_station_name = E.start_station_name
    end_station_name = E.end_station_name
    train_type = E.train_type
    train_number = E.train_number
    carrier_name = E.carrier_name

    if end_date is None:
        end_date = trip_date

    data = [
        StartDateTime(date_(trip_date), time_(start_time)),
        EndDateTime(date_(end_date), time_(end_time)),
        start_station_name(start_station),
        end_station_name(end_station),
    ]

    if number:
        # ElementMaker only accepts text children, so coerce integers.
        data.append(train_number(str(number)))
    if carrier:
        data.append(carrier_name(carrier))
    # Test the value, not the ``train_type`` factory (which is always truthy).
    if train:
        data.append(train_type(train))

    return Segment(*data)


def _convert_datetimes(segment: int) -> Tuple[str, str]:
    """Split a POSIX timestamp into date and time strings in Japan time.

    Args:
        segment: Timestamp in seconds, as accepted by
            ``datetime.fromtimestamp``.

    Returns:
        A ``(date, time)`` pair formatted as ``YYYY-MM-DD`` and ``HH:MM:00``
        (seconds are always written as ``00``).
    """
    date_time = datetime.fromtimestamp(segment, timezone("Japan"))
    return date_time.strftime("%Y-%m-%d"), date_time.strftime("%H:%M:00")


def parse_json(json_data: dict, carrier: Optional[str] = None,
               result_number: Optional[int] = None) -> str:
    """Convert one route of ``json_data`` into an XML ``Request`` string.

    Args:
        json_data: Mapping with a ``"result"`` list; the selected entry must
            have a ``"steps"`` list. Each step is read for the keys
            ``start_station``, ``end_station``, ``start_time``, ``end_time``,
            ``train_name``, ``go_through`` and (optionally) ``train_number``.
        carrier: Carrier name applied to every segment; ``None`` means no
            carrier element is written.
        result_number: 1-based index of the route to convert. ``None``
            selects the first route.

    Returns:
        The serialised ``Request`` element, containing one ``RailObject``
        with a ``Segment`` per emitted leg.
    """
    if result_number is None:
        # The declared default used to crash on ``None - 1``.
        result_number = 1

    record = json_data["result"][result_number - 1]

    # FIXME: Impossible to extract it from the current Hyperdia data
    carrier = "" if carrier is None else carrier

    segments = []
    iter_records = mlt.peekable(record["steps"])
    # Start details remembered while a run of "go through" steps is merged.
    previous_record = {}

    for segment in iter_records:
        start_station = segment["start_station"].title()
        start_date, start_time = _convert_datetimes(segment["start_time"])
        end_station = segment["end_station"].title()
        end_date, end_time = _convert_datetimes(segment["end_time"])

        train_type = segment["train_name"]
        train_number = segment.get("train_number")
        if train_number is None:
            train_number = ""

        next_record = iter_records.peek(None)

        # NOTE(review): when the last step has go_through set, or when two
        # consecutive steps both have it set, neither branch below fires and
        # the step is emitted with its own start data - confirm whether that
        # is intended.
        if next_record is not None:
            if not segment["go_through"] and next_record["go_through"]:
                # Next one is going through: set the start,
                # but don't add anything
                previous_record["start_station"] = start_station
                previous_record["start_time"] = start_time
                previous_record["start_date"] = start_date
                previous_record["train_name"] = train_type
                continue
            elif not next_record["go_through"] and segment["go_through"]:
                # The next one is a "real" one, set the end, and add
                # the record
                start_station = previous_record["start_station"]
                start_time = previous_record["start_time"]
                start_date = previous_record["start_date"]
                train_type = previous_record["train_name"]
                # Keep the keys but blank the values for the next run.
                previous_record = dict.fromkeys(previous_record, None)

        subdoc = add_train_segment(start_station, end_station, start_time,
                                   end_time, start_date, train=train_type,
                                   number=train_number, carrier=carrier,
                                   end_date=end_date)
        segments.append(subdoc)

    rail_object = RailObject(*segments)
    request_object = Request(rail_object)

    return etree.tounicode(request_object)