#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2021 Luca Beltrame
# SPDX-License-Identifier: BSD-3-clause

"""Trigger OBS source-service runs for packages whose KDE git branch moved.

For every OBS repository listed in the mapping file, the tip of each
package's upstream branch on invent.kde.org is compared with the hash stored
in a JSON cache.  Packages whose hash differs get their OBS source service
triggered; the cache is updated on success, optionally committed to a git
repository, and a report is sent through matrix-commander.
"""

import argparse
from datetime import date
from functools import lru_cache
import logging
import json
from pathlib import Path
import subprocess
from typing import Union, Dict, List, Tuple
from urllib.parse import quote

import git
from git.repo import Repo
from jinja2 import BaseLoader, Environment
import requests
import sarge


API_URL = "https://invent.kde.org/api/v4/projects/"
OBS_URL = "https://api.opensuse.org/trigger/runservice"
MATRIX_COMMANDER = "/usr/local/bin/matrix-commander.py"
CREDENTIALS_FILE = "/etc/matrix-commander/credentials.json"
STORE_DIR = "/var/lib/matrix-commander/store"
REPO_TEMPLATE = "https://invent.kde.org/{}"
# Seconds to wait for an HTTP response; without a timeout, requests can block
# forever on an unresponsive server.
REQUEST_TIMEOUT = 60

MESSAGE_TEMPLATE = """
## OBS package update report

Updated at {{ date.today().strftime('%Y-%m-%d') }}

{% for repository, update in repositories.items() %}
### {{ repository }}

Examined {{ update | length }} packages changed upstream:
{% for package, remote, state in update %}
{% if state != "error" %}
- {{ package }} updated to [{{ state[0:8] }}](https://commits.kde.org/{{ remote }}/{{ state }})
{% else %}
- {{ package }} not updated (source service error)
{% endif %}
{% endfor %}

{% endfor %}
"""


def project_exists(project: str) -> bool:
    """Return True if *project* answers successfully on the KDE GitLab API.

    Args:
        project: Project path such as ``"frameworks/attica"``.

    Raises:
        requests.RequestException: On transport failures (including timeout).
    """
    # We want / to get quoted, so put safe to ""
    project_name = quote(project, safe="")
    response = requests.get(API_URL + project_name, timeout=REQUEST_TIMEOUT)
    # Response.ok is what the truthiness of a Response evaluates to.
    return response.ok


def get_remote_hash(url: str, branch: str = "master") -> str:
    """Return the commit hash at the tip of *branch* in the remote at *url*.

    Args:
        url: URL of the remote git repository.
        branch: Branch name, looked up as ``refs/heads/<branch>``.

    Raises:
        ValueError: If the remote reports no ref for the requested branch.
    """
    gitcmd = git.cmd.Git()
    revision = gitcmd.ls_remote(url, f"refs/heads/{branch}", quiet=True,
                                refs=True)
    lines = revision.splitlines()
    if not lines:
        # ls-remote prints nothing when the pattern matches no ref.
        raise ValueError(f"Branch {branch!r} not found in {url}")
    # Each output line is "<hash>\t<ref>"; only the hash is of interest.
    git_hash, _, _ = lines[0].partition("\t")
    return git_hash


def trigger_update(repository: str, package_name: str,
                   token: str) -> Union[requests.Response, bool]:
    """Ask OBS to run the source service of one package.

    Args:
        repository: OBS project the package lives in.
        package_name: Name of the package to update.
        token: OBS authorization token (the token itself, not a file path).

    Returns:
        The response object on success, ``False`` if the request could not
        be made or the server answered with an error status.
    """
    header = {"Authorization": f"Token {token}"}
    parameters = {"project": repository, "package": package_name}

    logging.info("Updating package %s", package_name)
    try:
        result = requests.post(OBS_URL, params=parameters, headers=header,
                               timeout=REQUEST_TIMEOUT)
    except requests.RequestException as exc:
        # A transport failure is reported like any other failed service run
        # so that one bad request does not abort the remaining packages.
        logging.error("Error during service run, package %s: %s",
                      package_name, exc)
        return False

    if not result:
        logging.error(
            "Error during service run, package %s, error code %s, url %s",
            package_name, result.status_code, result.url)
        return False

    logging.debug("Package %s complete", package_name)
    return result


class RepoUpdater:
    """Track upstream hashes per OBS repository and trigger package updates.

    The cache maps ``repository -> {kde project path -> last built hash}``
    and the configuration maps
    ``repository -> {package -> {"kde": ..., "branch": ...}}``.
    """

    def __init__(self, config: str, cache_file: str, token_file: str) -> None:
        """Load the JSON mapping and, when present, the JSON hash cache.

        Args:
            config: Path of the KDE:OBS repository mapping file.
            cache_file: Path of the hash cache; it may not exist yet.
            token_file: Value sent to OBS as the authorization token.  The
                caller in this file passes the token's *contents*, despite
                the parameter name.
        """
        self.cache = cache_file
        self.token = token_file

        if not Path(cache_file).exists():
            logging.debug("File cache not found, not loading")
            self._data: Dict[str, Dict[str, str]] = dict()
        else:
            with open(cache_file) as handle:
                self._data = json.load(handle)

        with open(config, "r") as mapping:
            repo_data = json.load(mapping)
            self.config = repo_data

    @property
    def repositories(self) -> List[str]:
        """Names of the OBS repositories defined in the mapping file.

        The configuration, not the cache, is the source of truth: a
        repository without cached hashes (for instance on the first run)
        must still be examined.
        """
        return list(self.config)

    def update_repository(self, repository) -> List[Tuple[str, str, str]]:
        """Trigger updates for every changed package of *repository*.

        Returns:
            One ``(package, kde project, state)`` tuple per changed package,
            where *state* is the new upstream hash or ``"error"`` when the
            service run failed.  Empty when nothing changed.
        """
        if self._data.get(repository) is None:
            logging.debug("No prior data - initializing empty")
            self._data[repository] = dict()

        logging.debug("Updating list of repositories")

        to_update = self.get_updated_remotes(repository)

        if not to_update:
            logging.debug("Nothing to update for %s", repository)
            # Always return a list: the report template takes its length.
            return []

        logging.info("Found %s updated repositories", len(to_update))

        updated = list()
        logging.info("Updating packages for %s", repository)

        for package, remote_hash in to_update.items():
            remote_name = self.config[repository][package]["kde"]
            if trigger_update(repository, package, self.token):
                self._data[repository][remote_name] = remote_hash
                # Persist after each success so that an interrupted run does
                # not trigger the same packages again.
                self.save_cache()
                updated.append((package, remote_name, remote_hash))
            else:
                updated.append((package, remote_name, "error"))

        return updated

    # Deliberately not memoized: the result depends on the mutable cache and
    # on the remote state, and caching a method would also pin the instance.
    def get_updated_remotes(self, repository: str) -> Dict[str, str]:
        """Map each package whose upstream hash changed to its new hash.

        Packages whose KDE project or branch cannot be found are skipped
        with a warning.
        """
        to_update = dict()

        repodata = self.config[repository]
        # update_repository() creates the entry first; direct callers may not.
        known_hashes = self._data.get(repository) or {}

        for repo, data in repodata.items():
            kde_name = data["kde"]
            branch = data["branch"]
            url = REPO_TEMPLATE.format(kde_name)

            if not project_exists(kde_name):
                logging.warning("Repository %s not found, skipping", kde_name)
                continue

            local_hash = known_hashes.get(kde_name, "")
            try:
                remote_hash = get_remote_hash(url, branch)
            except ValueError:
                logging.warning("Branch %s of %s not found, skipping",
                                branch, kde_name)
                continue

            if local_hash != remote_hash:
                logging.debug("Hash doesn't match, marking as changed")
                to_update[repo] = remote_hash

        return to_update

    def save_cache(self) -> None:
        """Write the hash cache to ``self.cache`` as indented JSON."""
        logging.debug("Saving JSON cache")
        with open(self.cache, "w") as handle:
            json.dump(self._data, handle, indent=4)


def notify_matrix(
        update_data: Dict[str, List[Tuple[str, str, str]]]) -> None:
    """Render the update report and send it with matrix-commander.

    Args:
        update_data: ``repository -> [(package, kde project, state), ...]``
            as produced by ``RepoUpdater.update_repository``.
    """
    # The loader is never consulted for from_string(), but Environment
    # expects a loader instance rather than the class.
    template = Environment(
        loader=BaseLoader(), lstrip_blocks=True,
        trim_blocks=True).from_string(MESSAGE_TEMPLATE)
    message = template.render(repositories=update_data, date=date)

    cmd = [MATRIX_COMMANDER, "--markdown", "--log-level", "ERROR",
           "--credentials", CREDENTIALS_FILE, "--store", STORE_DIR,
           "-m", message]
    logging.debug("Sending Matrix notification")
    sarge.run(cmd, stdout=subprocess.DEVNULL)


def commit_changes(cache_file: str, repo_home: str) -> None:
    """Commit *cache_file* in the repository at *repo_home* and push it.

    The commit is made first, then ``origin`` is pulled with rebase and
    pushed.
    """
    repo = Repo(repo_home)
    repo.index.add([cache_file])
    repo.index.commit("[OBS unstable update bot] Update caches")
    origin = repo.remotes["origin"]
    origin.pull(rebase=True)
    origin.push()


def main() -> None:
    """Parse the command line, update all repositories and report."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--cache-file", help="Location of the cache file",
        default=Path.home() / ".local/share/obs_repo_cache.json")
    parser.add_argument(
        "--repo-root", help="Location of the git repository root")
    parser.add_argument("mapping_file",
                        help="KDE:OBS repository mapping file")
    parser.add_argument("token", help="Authorization token file")
    parser.add_argument("--debug", help="Show debugging output",
                        action="store_true")

    options = parser.parse_args()

    level = logging.INFO if not options.debug else logging.DEBUG
    logging.basicConfig(format='%(levelname)s - %(message)s', level=level)

    # The default is a Path; normalize so every consumer receives a str.
    cache_file = str(options.cache_file)

    with open(options.token) as handle:
        token = handle.read().strip()

    updater = RepoUpdater(options.mapping_file, cache_file, token)

    updated_data: Dict[str, List[Tuple[str, str, str]]] = dict()

    for repo in updater.repositories:
        updated_data[repo] = updater.update_repository(repo)

    if options.repo_root is not None:
        logging.info("Committing changes")
        commit_changes(cache_file, options.repo_root)

    notify_matrix(updated_data)
    logging.info("Complete")


if __name__ == "__main__":
    main()