# SPDX-FileCopyrightText: 2022 Luca Beltrame
# SPDX-License-Identifier: AGPL-3.0-or-later

"""Maubot plugin to query build results and trigger rebuilds on OBS."""

import asyncio
from dataclasses import dataclass, field
from time import sleep
from typing import Any, Dict, List, Optional, Tuple, Type

import aiohttp
import cryptocode
import more_itertools as mit
from lxml import objectify
from jinja2 import BaseLoader, Environment
from maubot import Plugin, MessageEvent
from maubot.handlers import command
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper


# Limits applied by the "status" command so that a single request cannot
# flood a room.
MAX_REPOSITORIES = 50
MAX_PACKAGES_PER_REPOSITORY = 1000
PACKAGES_PER_MESSAGE = 100
# Pause between consecutive messages, to avoid triggering rate limiting.
MESSAGE_DELAY_SECONDS = 0.2

# The templates are rendered as Markdown, which is line-sensitive: headings
# and list items must each start on their own line.
HEADER_TEMPLATE = """
### Package status

{% if state %}
Only showing packages with {{ state }} state.
{% endif %}"""

STATUS_TEMPLATE = """
{% for package in packages %}
{%if package.status != "disabled" %}
{% set build_log_url = "{0}/package/live_build_log/{1}/{2}/{3}/{4}/".format(
    base_url, project, package.name, repo.name, repo.arch) %}
- {{ package.name }} - *[{{ package.status }}]({{ build_log_url }})*
{% else %}
- {{ package.name }} - *{{ package.status }}*
{% endif %}
{% endfor %}
"""

ALIAS_TEMPLATE = """
### Configured aliases

{% for alias, contents in repo_aliases.items() %}
#### {{ alias }}

- project: {{ contents.project }}
- repository: {{ contents.repository }}
- package(s): {{ contents.package }}
- architecture(s): {{ contents.arch }}
- state: {{ contents.state }}

{% endfor %}
"""


@dataclass
class BuildResult:
    """Build status of a single package."""

    name: str
    status: str


@dataclass
class BuildRepository:
    """A repository/architecture pair and the package results it holds."""

    name: str
    arch: str
    packages: List[BuildResult] = field(default_factory=list)


class Config(BaseProxyConfig):
    """Plugin configuration."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Copy the known keys, encrypting the password when needed."""
        helper.copy("api_url")
        helper.copy("instance_url")
        helper.copy("rebuild_token")
        helper.copy("trigger_token")
        helper.copy("secret")

        password = self["password"]
        # NOTE(review): the length/suffix test presumably tells a plaintext
        # password apart from one already encrypted by cryptocode - confirm.
        if len(password) < 91 and not password.endswith("=="):
            encrypted_password = cryptocode.encrypt(password, self["secret"])
            helper.base["password"] = encrypted_password
        else:
            helper.copy("password")

        helper.copy("username")
        helper.copy("repo_aliases")
        helper.copy("acl")


class OSCBot(Plugin):
    """Bot exposing the "osc" command family (status, rebuilds, aliases)."""

    http: aiohttp.ClientSession

    async def start(self) -> None:
        """Load the configuration and set up the HTTP session/templates."""
        await super().start()
        self.config.load_and_update()
        self.http = self.client.api.session
        # Templates are only ever built with from_string(), so a bare
        # loader instance is sufficient.
        self.template = Environment(loader=BaseLoader(), lstrip_blocks=True,
                                    trim_blocks=True)

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        return Config

    def _repo_aliases(self) -> Dict[str, Any]:
        """Return the configured aliases, or an empty dict if there are none.

        Lets callers run membership tests without special-casing a
        configuration where "repo_aliases" is unset.
        """
        return self.config["repo_aliases"] or {}

    def get_alias(
        self, project_alias: str
    ) -> Tuple[str, Optional[str], Optional[str], Optional[str],
               Optional[str]]:
        """Expand an alias into its stored query parameters.

        Returns a ``(project, package, repository, state, arch)`` tuple in
        which every field stored as "all" (except the project) is None.
        """
        data = self.config["repo_aliases"][project_alias]

        def unless_all(key: str) -> Optional[str]:
            # There is no concept of non-positional arguments in maubot
            # So we just use "all" in case we want to skip something
            value = data[key]
            return None if value == "all" else value

        return (data["project"], unless_all("package"),
                unless_all("repository"), unless_all("state"),
                unless_all("arch"))

    def check_acl(self, user_id: str, need_admin: bool = False) -> bool:
        """Tell whether *user_id* may perform an operation.

        Admins are always allowed. Users in the "user" ACL are allowed only
        when *need_admin* is False. Denials are logged at debug level.
        """
        acls = self.config["acl"]
        allowed = user_id in (acls["admin"] or ())
        if not allowed and not need_admin:
            # Test membership explicitly: checking only the truthiness of
            # the user list would let everybody in once it is non-empty.
            allowed = user_id in (acls["user"] or ())

        if not allowed:
            self.log.debug(f"Denied operation as {user_id}")
        return allowed

    async def parse_rebuilpac(
            self,
            project: str,
            package: Optional[str] = None,
            repo: Optional[str] = None,
            arch: Optional[str] = None) -> Tuple[bool, str]:
        """Trigger a rebuild and report the outcome.

        Returns ``(success, summary)``: *success* is True on HTTP 200 and
        *summary* is the text of the ``summary`` element of the reply.
        """
        trigger_url = f"{self.config['api_url']}/trigger/rebuild"
        header = {"Authorization": f"Token {self.config['rebuild_token']}"}
        params = {"project": project}

        # Only send the filters that were actually requested.
        if package:
            params["package"] = package
        if repo:
            params["repository"] = repo
        if arch:
            params["arch"] = arch

        async with self.http.post(trigger_url, params=params,
                                  headers=header) as response:
            status = objectify.fromstring(await response.text())
            return (response.status == 200, status.summary.text)

    async def parse_status(
            self,
            project: str,
            package: Optional[str] = None,
            state: Optional[str] = None,
            repo: Optional[str] = None,
            arch: Optional[str] = None) -> List[BuildRepository]:
        """Fetch the build results of *project*.

        *package* is passed to the API as a query parameter, while *state*,
        *repo* and *arch* are applied here on the parsed reply. Repositories
        left without any package after filtering are dropped. An empty list
        is returned when the API does not answer with HTTP 200.
        """
        username = self.config["username"]
        password = cryptocode.decrypt(self.config["password"],
                                      self.config["secret"])
        api_url = self.config["api_url"]
        api_call = f"{api_url}/build/{project}/_result"
        auth = aiohttp.BasicAuth(username, password)

        params = {}
        if package:
            params["package"] = package

        # The context manager releases the connection on every path,
        # including the early return below.
        async with self.http.get(api_call, auth=auth,
                                 params=params) as response:
            if response.status != 200:
                self.log.error(f"Unexpected status: got {response.status}")
                return []
            response_text = await response.text()

        parsed = objectify.fromstring(response_text)
        results = []

        # iterchildren() yields nothing when there is no matching child,
        # whereas attribute access would raise AttributeError.
        for child in parsed.iterchildren("result"):
            repository_name = child.get("repository")
            repo_arch = child.get("arch")

            if repo and repo != repository_name:
                self.log.debug(f"Skipping {repository_name}, not matching")
                continue

            if arch and arch != repo_arch:
                self.log.debug(f"Skipping {repo_arch} ({repository_name}), "
                               " not matching")
                continue

            packages = []
            for status in child.iterchildren("status"):
                package_name = status.get("package")
                package_status = status.get("code")

                if state and state != package_status:
                    self.log.debug(f"Skipping {package_name},"
                                   f" unwanted state {package_status}")
                    continue

                packages.append(
                    BuildResult(name=package_name, status=package_status))

            if not packages:
                continue

            results.append(
                BuildRepository(name=repository_name, arch=repo_arch,
                                packages=packages))

        return results

    @command.new(name="osc", help="Manage the bot", require_subcommand=True)
    async def osc(self) -> None:
        pass

    @osc.subcommand("alias", help="Manage aliases")
    async def alias(self) -> None:
        pass

    @alias.subcommand("list", help="List configured aliases")
    async def list_aliases(self, evt: MessageEvent) -> None:
        """Reply with every configured alias and its parameters."""
        if not self.check_acl(evt.sender, need_admin=False):
            await evt.reply("You are not authorized to perform this action.")
            return

        aliases = self._repo_aliases()
        if not aliases:
            await evt.reply("No aliases defined.")
            return

        body = self.template.from_string(ALIAS_TEMPLATE)
        message = body.render(repo_aliases=aliases)
        await evt.respond(message, markdown=True)

    @alias.subcommand("edit", help="Edit configured aliases")
    @command.argument("alias", "alias name")
    @command.argument("project", "project name")
    @command.argument("package", "package name", required=False)
    @command.argument("repository", "repository", required=False)
    @command.argument("arch", "architecture", required=False)
    @command.argument("state", "state", required=False)
    async def edit_alias(self, evt: MessageEvent, alias: str,
                         project: Optional[str] = None,
                         package: Optional[str] = None,
                         repository: Optional[str] = None,
                         arch: Optional[str] = None,
                         state: Optional[str] = None) -> None:
        """Update the given fields of an existing alias (admin only).

        Fields that are not supplied keep their current value.
        """
        if not self.check_acl(evt.sender, need_admin=True):
            await evt.reply("You are not authorized to perform this action.")
            return

        aliases = self._repo_aliases()
        if alias not in aliases:
            await evt.respond(f"Unknown alias {alias}")
            return

        alias_data = aliases[alias]
        updates = {
            "project": project,
            "repository": repository,
            "package": package,
            "arch": arch,
            "state": state,
        }
        for key, value in updates.items():
            if value:
                alias_data[key] = value

        aliases[alias] = alias_data
        self.config.save()
        await evt.reply(f"Alias {alias} successfully changed.")

    @alias.subcommand("create", help="Create a new alias")
    @command.argument("alias", "alias name")
    @command.argument("project", "project name")
    @command.argument("package", "package name", required=False)
    @command.argument("repository", "repository", required=False)
    @command.argument("arch", "architecture", required=False)
    @command.argument("state", "state", required=False)
    async def create_alias(self, evt: MessageEvent, alias: str,
                           project: Optional[str] = None,
                           package: Optional[str] = None,
                           repository: Optional[str] = None,
                           arch: Optional[str] = None,
                           state: Optional[str] = None) -> None:
        """Store a new alias (admin only).

        Optional fields that are not supplied are stored as "all".
        """
        if not self.check_acl(evt.sender, need_admin=True):
            await evt.reply("You are not authorized to perform this action.")
            return

        alias_data = {
            "project": project,
            "package": package or "all",
            "repository": repository or "all",
            "arch": arch or "all",
            "state": state or "all",
        }

        # Make sure there is a mapping to write into when no alias has been
        # configured yet.
        if self.config["repo_aliases"] is None:
            self.config["repo_aliases"] = {}
        self.config["repo_aliases"][alias] = alias_data
        self.config.save()
        await evt.reply(f"Alias {alias} successfully created.")

    @alias.subcommand("delete", help="Delete an alias", aliases=("rm", ))
    @command.argument("alias", "alias name")
    async def delete_alias(self, evt: MessageEvent, alias: str) -> None:
        """Remove an existing alias (admin only)."""
        if not self.check_acl(evt.sender, need_admin=True):
            await evt.reply("You are not authorized to perform this action.")
            return

        aliases = self._repo_aliases()
        if alias not in aliases:
            await evt.respond(f"Unknown alias {alias}")
            return

        del aliases[alias]
        self.config.save()
        await evt.reply(f"Alias {alias} successfully deleted.")

    @osc.subcommand(
        "rebuildpac", aliases=("rb",),
        help="Rebuild a package or all packages in the repositories")
    @command.argument("project", "project name/alias")
    @command.argument("package", "package name (or \"all\" for all packages)")
    @command.argument("repository", "repository (optional)", required=False)
    @command.argument("arch", "architecture (optional)", required=False)
    async def rebuildpac(self, evt: MessageEvent, project: str,
                         package: Optional[str] = None,
                         repository: Optional[str] = None,
                         arch: Optional[str] = None) -> None:
        """Trigger a rebuild for a project or alias (admin only)."""
        if not self.check_acl(evt.sender, need_admin=True):
            await evt.reply("You are not authorized to perform this action.")
            return

        if project in self._repo_aliases():
            # We're not interested in state and we query package explicitly
            project, _, repository, _, arch = self.get_alias(project)
        else:
            repository = None if repository == "all" else repository
            arch = None if arch == "all" else arch

        package = None if package == "all" else package
        result, status = await self.parse_rebuilpac(project, package,
                                                    repository, arch)

        if not result:
            message = f"Error received from OBS: {status}"
        else:
            message = f"Rebuild triggered {status}"

        await evt.reply(message, markdown=True)

    @osc.subcommand(
        "servicerun",
        help="Trigger source services for a package")
    @command.argument("project", "project name")
    @command.argument("package", "package name")
    async def servicerun(self, evt: MessageEvent, project: str,
                         package: str) -> None:
        """Trigger the source services of a package (admin only)."""
        if not self.check_acl(evt.sender, need_admin=True):
            await evt.reply("You are not authorized to perform this action.")
            return

        token = self.config["trigger_token"]
        trigger_url = f"{self.config['api_url']}/trigger/runservice"
        params = {"project": project, "package": package}
        header = {"Authorization": f"Token {token}"}

        async with self.http.post(trigger_url, params=params,
                                  headers=header) as response:
            status = objectify.fromstring(await response.text())
            succeeded = response.status == 200

        if succeeded:
            message = f"Service triggered: {status.summary.text}"
        else:
            message = f"Error running service: {status.summary.text}"

        await evt.reply(message, markdown=True)

    @osc.subcommand("status", aliases=("st",),
                    help="Check status for package and repository")
    @command.argument("project", "project name")
    @command.argument("package", "package name | all", required=False)
    @command.argument("state", "build state | all", required=False)
    @command.argument("repository", "repository | all)", required=False)
    @command.argument("arch", "architecture state | all", required=False)
    async def status(self, evt: MessageEvent, project: str,
                     package: Optional[str] = None,
                     state: Optional[str] = None,
                     repository: Optional[str] = None,
                     arch: Optional[str] = None) -> None:
        """Report the build status of a project or alias.

        When *project* names a configured alias, the alias supplies every
        filter and the remaining arguments are ignored.
        """
        if not self.check_acl(evt.sender, need_admin=False):
            await evt.reply("You are not authorized to perform this action.")
            return

        if project in self._repo_aliases():
            project, package, repository, state, arch = self.get_alias(
                project)
        else:
            # There is no concept of non-positional arguments in maubot
            # So we just use "all" in case we want to skip something
            package = None if package == "all" else package
            state = None if state == "all" else state
            repository = None if repository == "all" else repository
            arch = None if arch == "all" else arch

        response = await self.parse_status(project, package, state=state,
                                           repo=repository, arch=arch)

        if not response:
            await evt.reply("No results found.")
            return

        header = self.template.from_string(HEADER_TEMPLATE)
        message = header.render(state=state)
        await evt.reply(message, markdown=True)

        base_url = self.config["instance_url"]

        if len(response) > MAX_REPOSITORIES:
            await evt.respond(f"Too many repositories ({len(response)}) "
                              f"to display in project {project}, "
                              "please narrow down your search.")
            return

        # The template does not depend on the data, so build it only once.
        body = self.template.from_string(STATUS_TEMPLATE)

        for repo_result in response:
            await evt.respond(
                f"#### {repo_result.name} - {repo_result.arch}",
                markdown=True)

            # Just so that someone doesn't flood by listing the whole
            # openSUSE:Factory
            if len(repo_result.packages) > MAX_PACKAGES_PER_REPOSITORY:
                await evt.respond(
                    f"Too many packages ({len(repo_result.packages)}) "
                    "to display, please narrow down your search.")
                continue

            # To avoid creating too large messages, we chunk packages in
            # groups of PACKAGES_PER_MESSAGE
            for packagelist in mit.chunked(repo_result.packages,
                                           PACKAGES_PER_MESSAGE):
                message = body.render(packages=packagelist,
                                      base_url=base_url,
                                      project=project, repo=repo_result)
                await evt.respond(message, markdown=True)
                # Pause without blocking the event loop, to avoid
                # triggering rate limiting
                await asyncio.sleep(MESSAGE_DELAY_SECONDS)