# (C)

import asyncio
from dataclasses import dataclass
import sys
from typing import Optional, List, Type

import aiohttp
from lxml import objectify
from jinja2 import BaseLoader, Environment
from maubot import Plugin, MessageEvent
from maubot.handlers import command
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper

# Markdown header sent once per status query. Rendered with `state`, the
# optional build-state filter requested by the user.
HEADER_TEMPLATE = """
### Package status

{% if state %}
Only showing packages with {{ state }} state.
{% endif %}"""

# Markdown body sent once per repository/architecture pair. Rendered with
# `repo`, a BuildRepository instance.
REPO_TEMPLATE = """
#### {{ repo.name }} - {{ repo.arch }}

{% for package in repo.packages %}
- {{ package.name }} - *{{ package.status }}*
{% endfor %}

"""


@dataclass
class BuildResult:
    """Build status of a single package.

    Attributes:
        name: value of the ``package`` attribute of a ``status`` element.
        status: value of the ``code`` attribute of a ``status`` element.
    """

    name: str
    status: str


@dataclass
class BuildRepository:
    """Packages reported for one repository/architecture pair.

    Attributes:
        name: value of the ``repository`` attribute of a ``result`` element.
        arch: value of the ``arch`` attribute of a ``result`` element.
        packages: build results that passed the requested filters.
    """

    name: str
    arch: str
    packages: List[BuildResult]


class Config(BaseProxyConfig):
    """Plugin configuration: API credentials and endpoint."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Carry the user-provided settings over into the updated config."""
        helper.copy("username")
        helper.copy("password")
        # parse_status() reads "api_url"; without copying it here a value
        # set by the user would be replaced by the base default on update.
        helper.copy("api_url")


class OSCBot(Plugin):
    """Bot plugin that reports package build status from a build API."""

    http: aiohttp.ClientSession

    async def start(self) -> None:
        """Load the configuration and prepare the HTTP session and templates."""
        await super().start()
        self.config.load_and_update()
        # Reuse the Matrix client's HTTP session instead of opening a new one.
        self.http = self.client.api.session
        # Templates are only ever built with from_string(), so a plain
        # BaseLoader instance is sufficient.
        self.template = Environment(loader=BaseLoader(),
                                    lstrip_blocks=True,
                                    trim_blocks=True)

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the configuration class used by this plugin."""
        return Config

    def _filter_packages(self, child,
                         state: Optional[str] = None) -> List[BuildResult]:
        """Collect the package results of one ``result`` element.

        Args:
            child: a parsed ``result`` element.
            state: when set, only packages whose ``code`` equals it are kept.

        Returns:
            The matching build results; empty when nothing matches or the
            element has no ``status`` children.
        """
        packages = []
        # Attribute lookup on an objectified element raises AttributeError
        # when no such child exists, hence the getattr() default.
        for status in getattr(child, "status", ()):
            package_name = status.get("package")
            package_status = status.get("code")

            if state and state != package_status:
                self.log.debug(f"Skipping {package_name},"
                               f" unwanted state {package_status}")
                continue

            packages.append(BuildResult(name=package_name,
                                        status=package_status))
        return packages

    async def parse_status(
            self,
            project: str,
            package: Optional[str] = None,
            state: Optional[str] = None,
            repo: Optional[str] = None,
            arch: Optional[str] = None) -> List[BuildRepository]:
        """Fetch and filter the build results of a project.

        Performs a GET on ``{api_url}/build/{project}/_result`` using HTTP
        basic authentication with the configured credentials.

        Args:
            project: project whose results are requested.
            package: when set, sent to the API as the ``package`` parameter.
            state: when set, only packages with this status code are kept.
            repo: when set, only this repository is kept.
            arch: when set, only this architecture is kept.

        Returns:
            One BuildRepository per repository/architecture pair that still
            has packages after filtering. Empty when the API does not answer
            with status 200 or nothing matches.
        """
        username = self.config["username"]
        password = self.config["password"]
        api_url = self.config["api_url"]

        api_call = f"{api_url}/build/{project}/_result"
        auth = aiohttp.BasicAuth(username, password)

        params = {}
        if package:
            params["package"] = package

        # The context manager releases the connection on every path,
        # including the early return on an unexpected status.
        async with self.http.get(api_call, auth=auth,
                                 params=params) as response:
            if response.status != 200:
                self.log.error(f"Unexpected status: got {response.status}")
                return []
            # Hand raw bytes to lxml: it rejects str input that carries an
            # XML encoding declaration.
            payload = await response.read()

        parsed = objectify.fromstring(payload)

        results = []
        # A result list without any "result" child yields no repositories
        # instead of raising AttributeError.
        for child in getattr(parsed, "result", ()):
            repository_name = child.get("repository")
            repo_arch = child.get("arch")

            if repo and repo != repository_name:
                self.log.debug(f"Skipping {repository_name}, not matching")
                continue

            if arch and arch != repo_arch:
                self.log.debug(f"Skipping {repo_arch} ({repository_name}), "
                               " not matching")
                continue

            packages = self._filter_packages(child, state)
            if not packages:
                # Nothing left to show for this repository.
                continue

            results.append(BuildRepository(name=repository_name,
                                           arch=repo_arch,
                                           packages=packages))

        return results

    @command.new(name="osc", help="Manage the bot", require_subcommand=True)
    async def osc(self) -> None:
        """Root command; it only exists to host the subcommands."""
        pass

    @osc.subcommand("status", aliases=("st",),
                    help="Check status for package and repository")
    @command.argument("project", "project name")
    @command.argument("package", "package name (optional)", required=False)
    @command.argument("state", "build state (optional)", required=False)
    @command.argument("repository", "repository (optional)", required=False)
    @command.argument("arch", "architecture state (optional)", required=False)
    async def status(self, evt: MessageEvent, project: str,
                     package: Optional[str] = None,
                     state: Optional[str] = None,
                     repository: Optional[str] = None,
                     arch: Optional[str] = None) -> None:
        """Reply with the build status of a project.

        Sends a header as a reply to the triggering event, followed by one
        message per matching repository. Replies "No results found." when
        parse_status() returns nothing.

        Args:
            evt: the message event that triggered the command.
            project: project to query.
            package: package filter, or "all" to skip it.
            state: build-state filter, or "all" to skip it.
            repository: repository filter, or "all" to skip it.
            arch: architecture filter, or "all" to skip it.
        """
        # There is no concept of non-positional arguments in maubot
        # So we just use "all" in case we want to skip something
        if state == "all":
            state = None

        if package == "all":
            package = None

        if repository == "all":
            repository = None

        if arch == "all":
            arch = None

        response = await self.parse_status(project, package, state=state,
                                           repo=repository, arch=arch)

        if not response:
            await evt.reply("No results found.")
            return

        header = self.template.from_string(HEADER_TEMPLATE)
        message = header.render(state=state)
        await evt.reply(message, markdown=True)

        # The repository template does not change between iterations, so it
        # is compiled once.
        body = self.template.from_string(REPO_TEMPLATE)
        for repository in response:
            message = body.render(repo=repository)
            await evt.respond(message, markdown=True)