198 lines
		
	
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			198 lines
		
	
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
#!/usr/bin/python3
 | 
						|
 | 
						|
# SPDX-FileCopyrightText: 2021 Luca Beltrame <lbeltrame@kde.org>
 | 
						|
#
 | 
						|
# SPDX-License-Identifier: BSD-3-Clause
 | 
						|
 | 
						|
import argparse
 | 
						|
from collections import defaultdict
 | 
						|
from datetime import datetime
 | 
						|
from typing import List, Tuple
 | 
						|
import sys
 | 
						|
 | 
						|
import lxml.etree as etree
 | 
						|
import lxml.builder as builder
 | 
						|
import more_itertools as mlt
 | 
						|
from pytz import timezone
 | 
						|
from requests_oauthlib import OAuth1Session
 | 
						|
import simplejson as json
 | 
						|
 | 
						|
E = builder.ElementMaker()
 | 
						|
 | 
						|
Request = E.Request
 | 
						|
Segment = E.Segment
 | 
						|
StartDateTime = E.StartDateTime
 | 
						|
EndDateTime = E.EndDateTime
 | 
						|
RailObject = E.RailObject
 | 
						|
date_ = E.date
 | 
						|
time_ = E.time
 | 
						|
 | 
						|
TRIPIT_CREATE_URL = "https://api.tripit.com/v1/create"
 | 
						|
TOKEN = "tripit.credentials.json"
 | 
						|
 | 
						|
 | 
						|
def get_tokens(filename: str) -> Tuple[str]:
 | 
						|
 | 
						|
    with open(filename) as handle:
 | 
						|
        tokens = json.load(handle)
 | 
						|
 | 
						|
    return (tokens["client_key"], tokens["client_secret"],
 | 
						|
            tokens["resource_owner_key"], tokens["resource_owner_secret"])
 | 
						|
 | 
						|
 | 
						|
def add_train_segment(start_station: str,
 | 
						|
                      end_station: str,
 | 
						|
                      start_time: str,
 | 
						|
                      end_time: str,
 | 
						|
                      trip_date: str,
 | 
						|
                      train: str = None,
 | 
						|
                      number: int = None,
 | 
						|
                      carrier: str = None) -> Segment:
 | 
						|
 | 
						|
    start_station_name = E.start_station_name
 | 
						|
    end_station_name = E.end_station_name
 | 
						|
    train_type = E.train_type
 | 
						|
    train_number = E.train_number
 | 
						|
    carrier_name = E.carrier_name
 | 
						|
 | 
						|
    data = []
 | 
						|
 | 
						|
    data.append(StartDateTime(date_(trip_date), time_(start_time)))
 | 
						|
    data.append(EndDateTime(date_(trip_date), time_(end_time)))
 | 
						|
    data.append(start_station_name(start_station))
 | 
						|
    data.append(end_station_name(end_station))
 | 
						|
 | 
						|
    if number:
 | 
						|
        data.append(train_number(number))
 | 
						|
 | 
						|
    if carrier:
 | 
						|
        data.append(carrier_name(carrier))
 | 
						|
 | 
						|
    if train_type:
 | 
						|
        data.append(train_type(train))
 | 
						|
 | 
						|
    doc = Segment(*data)
 | 
						|
 | 
						|
    return doc
 | 
						|
 | 
						|
 | 
						|
def _convert_datetimes(segment: int) -> List[str]:
 | 
						|
 | 
						|
    date_time = datetime.fromtimestamp(segment,
 | 
						|
                                       timezone("Japan"))
 | 
						|
 | 
						|
    date = date_time.strftime("%Y-%m-%d")
 | 
						|
    time = date_time.strftime("%H:%M:00")
 | 
						|
 | 
						|
    return date, time
 | 
						|
 | 
						|
 | 
						|
def _parse_record(segment: dict) -> Tuple[str]:
 | 
						|
 | 
						|
    start_station = segment["start_station"].title()
 | 
						|
    start_date, start_time = _convert_datetimes(segment["start_time"])
 | 
						|
 | 
						|
    end_station = segment["end_station"].title()
 | 
						|
    end_date, end_time = _convert_datetimes(segment["end_time"])
 | 
						|
    train_type = segment["train_name"]
 | 
						|
    train_number = ("" if segment.get("train_number") is None
 | 
						|
                    else segment["train_number"])
 | 
						|
 | 
						|
    return (start_station, start_date, start_time, end_station, end_date,
 | 
						|
            end_time, train_type, train_number)
 | 
						|
 | 
						|
 | 
						|
def parse_json(json_data: dict, carrier: str = None,
 | 
						|
               result_number: int = None) -> str:
 | 
						|
 | 
						|
    jdoc = json_data
 | 
						|
 | 
						|
    result = jdoc["result"]
 | 
						|
    record = result[result_number - 1]
 | 
						|
    segments = list()
 | 
						|
 | 
						|
    iter_records = mlt.peekable(record["steps"])
 | 
						|
 | 
						|
    previous_record = None
 | 
						|
 | 
						|
    for segment in iter_records:
 | 
						|
 | 
						|
        parsed = _parse_record(segment)
 | 
						|
 | 
						|
        start_station, start_date, start_time, *rest = parsed
 | 
						|
        end_station, end_date, end_time, *rest = rest
 | 
						|
        train_type, train_number = rest
 | 
						|
 | 
						|
        # FIXME: Impossible to extract it from the current Hyperdia data
 | 
						|
        carrier = "" if carrier is None else carrier
 | 
						|
        next_record = iter_records.peek(None)
 | 
						|
 | 
						|
        if next_record is not None:
 | 
						|
 | 
						|
            if segment["go_through"] and next_record["go_through"]:
 | 
						|
                # Nothing to be done here
 | 
						|
                continue
 | 
						|
 | 
						|
            if not segment["go_through"] and next_record["go_through"]:
 | 
						|
                # Next one is going through: set the start,
 | 
						|
                # but don't add anything
 | 
						|
                previous_record = segment
 | 
						|
                continue
 | 
						|
 | 
						|
            elif not next_record["go_through"] and segment["go_through"]:
 | 
						|
                # The next one is a "real" one, set the end, and add
 | 
						|
                # use the existing record to extract the data
 | 
						|
                parsed = _parse_record(previous_record)
 | 
						|
                start_station, start_date, start_time, *_ = parsed
 | 
						|
                *_, train_type, train_number = parsed
 | 
						|
                previous_record = None
 | 
						|
 | 
						|
        subdoc = add_train_segment(start_station,
 | 
						|
                                   end_station,
 | 
						|
                                   start_time,
 | 
						|
                                   end_time,
 | 
						|
                                   start_date,
 | 
						|
                                   train=train_type,
 | 
						|
                                   number=train_number,
 | 
						|
                                   carrier=carrier)
 | 
						|
        segments.append(subdoc)
 | 
						|
 | 
						|
    rail_object = RailObject(*segments)
 | 
						|
    request_object = Request(rail_object)
 | 
						|
 | 
						|
    return etree.tounicode(request_object)
 | 
						|
 | 
						|
 | 
						|
def main():
 | 
						|
 | 
						|
    parser = argparse.ArgumentParser()
 | 
						|
 | 
						|
    parser.add_argument("-c", "--carrier", default="Japan Rail",
 | 
						|
                        help="Carrier name")
 | 
						|
    parser.add_argument("-n", "--number", default=1, type=int,
 | 
						|
                        help="Route number to insert into Tripit")
 | 
						|
    parser.add_argument("source", type=argparse.FileType("r"),
 | 
						|
                        default=sys.stdin, nargs="?",
 | 
						|
                        help="JSON file produced by hyperdia.py")
 | 
						|
 | 
						|
    options = parser.parse_args()
 | 
						|
 | 
						|
    json_data = json.load(options.source)
 | 
						|
 | 
						|
    client_key, client_secret, resource_key, resource_secret = get_tokens(
 | 
						|
        TOKEN)
 | 
						|
 | 
						|
    session = OAuth1Session(client_key, client_secret, resource_key,
 | 
						|
                            resource_secret)
 | 
						|
 | 
						|
    result = parse_json(json_data, carrier=options.carrier,
 | 
						|
                        result_number=options.number)
 | 
						|
 | 
						|
    response = session.post(TRIPIT_CREATE_URL, data={"xml": result})
 | 
						|
 | 
						|
    if response.ok:
 | 
						|
        print("Trip added to Tripit")
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    main()
 |