maubot-osc/oscbot/__init__.py
Luca Beltrame 2c5fbcc2fd
Encrypt the OBS password on first run
This makes sure it is at least not stored in plaintext somewhere.
2022-01-29 09:52:08 +01:00

294 lines
10 KiB
Python

# SPDX-FileCopyrightText: 2022 Luca Beltrame <lbeltrame@kde.org>
# SPDX-License-Identifier: AGPL-3.0-or-later
from dataclasses import dataclass, field
from typing import Optional, List, Type, Tuple
import aiohttp
import cryptocode
from lxml import objectify
from jinja2 import BaseLoader, Environment
from maubot import Plugin, MessageEvent
from maubot.handlers import command
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
HEADER_TEMPLATE = """
### Package status
{% if state %}
Only showing packages with {{ state }} state.
{% endif %}"""
REPO_TEMPLATE = """
#### {{ repo.name }} - {{ repo.arch }}
{% for package in repo.packages %}
{%if package.status != "disabled" %}
{% set build_log_url = "{0}/public/build/{1}/{2}/{3}/{4}/_log".format(
base_url,project, repo.name, repo.arch, package.name) %}
- {{ package.name }} - *[{{ package.status }}]({{ build_log_url }})*
{% else %}
- {{ package.name }} - *{{ package.status }}*
{% endif %}
{% endfor %}
"""
@dataclass
class BuildResult:
name: str
status: str
@dataclass
class BuildRepository:
name: str
arch: str
packages: List[BuildResult] = field(default_factory=list)
class Config(BaseProxyConfig):
def do_update(self, helper: ConfigUpdateHelper) -> None:
helper.copy("api_url")
helper.copy("instance_url")
helper.copy("rebuild_token")
helper.copy("trigger_token")
helper.copy("secret")
password = self["password"]
if len(password) < 91 and not password.endswith("=="):
encrypted_password = cryptocode.encrypt(password, self["secret"])
helper.base["password"] = encrypted_password
else:
helper.copy("password")
helper.copy("username")
helper.copy("repo_aliases")
class OSCBot(Plugin):
http: aiohttp.ClientSession
async def start(self) -> None:
await super().start()
self.config.load_and_update()
self.http = self.client.api.session
self.template = Environment(loader=BaseLoader,
lstrip_blocks=True,
trim_blocks=True)
@classmethod
def get_config_class(cls) -> Type[BaseProxyConfig]:
return Config
def get_alias(self, project_alias: str) -> Tuple[str, str, str, str, str]:
data = self.config["repo_aliases"][project_alias]
# There is no concept of non-positional arguments in maubot
# So we just use "all" in case we want to skip something
package = data["package"] if data["package"] != "all" else None
repository = (data["repository"] if data["repository"] != "all"
else None)
arch = data["arch"] if data["arch"] != "all" else None
project = data["project"]
state = data["state"] if data["state"] != "all" else None
return (project, package, repository, state, arch)
async def parse_rebuilpac(
self,
project: str,
package: Optional[str] = None,
repo: Optional[str] = None,
arch: Optional[str] = None) -> Tuple[bool, str]:
trigger_url = f"{self.config['api_url']}/trigger/rebuild"
header = {"Authorization": f"Token {self.config['rebuild_token']}"}
params = {"project": project}
if package:
params["package"] = package
if repo:
params["repository"] = repo
if arch:
params["arch"] = arch
response = await self.http.post(trigger_url, params=params,
headers=header)
status = objectify.fromstring(await response.text())
if response.status == 200:
return (True, status.summary.text)
else:
return (False, status.summary.text)
async def parse_status(
self,
project: str,
package: Optional[str] = None,
state: Optional[str] = None,
repo: Optional[str] = None,
arch: Optional[str] = None) -> List[BuildRepository]:
username = self.config["username"]
password = cryptocode.decrypt(self.config["password"],
self.config["secret"])
api_url = self.config["api_url"]
api_call = f"{api_url}/build/{project}/_result"
auth = aiohttp.BasicAuth(username, password)
params = {}
if package:
params["package"] = package
response = await self.http.get(api_call, auth=auth, params=params)
if response.status != 200:
self.log.error(f"Unexpected status: got {response.status}")
return []
response_text = await response.text()
parsed = objectify.fromstring(response_text)
results = list()
for child in parsed.result:
repository_name = child.get("repository")
repo_arch = child.get("arch")
if repo and repo != repository_name:
self.log.debug(f"Skipping {repository_name}, not matching")
continue
if arch and arch != repo_arch:
self.log.debug(f"Skipping {repo_arch} ({repository_name}), "
" not matching")
continue
packages = list()
for status in child.status:
package_name = status.get("package")
package_status = status.get("code")
if state and state != package_status:
self.log.debug(f"Skipping {package_name},"
f" unwanted state {package_status}")
continue
result = BuildResult(name=package_name, status=package_status)
packages.append(result)
if not packages:
continue
repository = BuildRepository(name=repository_name,
arch=repo_arch,
packages=packages)
results.append(repository)
return results
@command.new(name="osc", help="Manage the bot",
require_subcommand=True)
async def osc(self) -> None:
pass
@osc.subcommand(
"rebuildpac", aliases=("rb",),
help="Rebuild a package or all packages in the repositories")
@command.argument("project", "project name/alias")
@command.argument("package", "package name (or \"all\" for all packages)")
@command.argument("repository", "repository (optional)", required=False)
@command.argument("arch", "architecture (optional)", required=False)
async def rebuildpac(self, evt: MessageEvent,
project: str,
package: Optional[str] = None,
repository: Optional[str] = None,
arch: Optional[str] = None) -> None:
package = None if package == "all" else package
repository = None if repository == "all" else repository
arch = None if arch == "all" else arch
result, status = await self.parse_rebuilpac(project, package,
repository,
arch)
if not result:
message = f"Error received from OBS: {status}"
else:
message = f"Rebuild triggered {status}"
await evt.reply(message, markdown=True)
@osc.subcommand(
"servicerun",
help="Trigger source services for a package")
@command.argument("project", "project name")
@command.argument("package", "package name")
async def servicerun(self, evt: MessageEvent,
project: str,
package: str) -> None:
token = self.config["trigger_token"]
trigger_url = f"{self.config['api_url']}/trigger/runservice"
params = {"project": project, "package": package}
header = {"Authorization": f"Token {token}"}
response = await self.http.post(trigger_url, params=params,
headers=header)
status = objectify.fromstring(await response.text())
if response.status == 200:
message = f"Service triggered: {status.summary.text}"
else:
message = f"Error running service: {status.summary.text}"
await evt.reply(message, markdown=True)
@osc.subcommand("status", aliases=("st",),
help="Check status for package and repository")
@command.argument("project", "project name")
@command.argument("package", "package name | all", required=False)
@command.argument("state", "build state | all", required=False)
@command.argument("repository", "repository | all)", required=False)
@command.argument("arch", "architecture state | all", required=False)
async def status(self, evt: MessageEvent,
project: str,
package: Optional[str] = None,
state: Optional[str] = None,
repository: Optional[str] = None,
arch: Optional[str] = None) -> None:
if project in self.config["repo_aliases"]:
project, package, repository, state, arch = self.get_alias(project)
else:
# There is no concept of non-positional arguments in maubot
# So we just use "all" in case we want to skip something
package = None if package == "all" else package
repository = None if repository == "all" else repository
arch = None if arch == "all" else arch
response = await self.parse_status(project, package, state=state,
repo=repository, arch=arch)
if not response:
await evt.reply("No results found.")
return
header = self.template.from_string(HEADER_TEMPLATE)
message = header.render(state=state)
await evt.reply(message, markdown=True)
base_url = self.config["instance_url"]
for repository in response:
body = self.template.from_string(REPO_TEMPLATE)
message = body.render(repo=repository, project=project,
base_url=base_url)
await evt.respond(message, markdown=True)