All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
It's easier then to check the package and what not and the full log is one click away anyway.
294 lines
10 KiB
Python
294 lines
10 KiB
Python
# SPDX-FileCopyrightText: 2022 Luca Beltrame <lbeltrame@kde.org>
|
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional, List, Type, Tuple
|
|
|
|
import aiohttp
|
|
import cryptocode
|
|
from lxml import objectify
|
|
from jinja2 import BaseLoader, Environment
|
|
|
|
from maubot import Plugin, MessageEvent
|
|
from maubot.handlers import command
|
|
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
|
|
|
|
|
|
HEADER_TEMPLATE = """
|
|
### Package status
|
|
|
|
{% if state %}
|
|
Only showing packages with {{ state }} state.
|
|
|
|
{% endif %}"""
|
|
|
|
REPO_TEMPLATE = """
|
|
#### {{ repo.name }} - {{ repo.arch }}
|
|
|
|
{% for package in repo.packages %}
|
|
{%if package.status != "disabled" %}
|
|
{% set build_log_url = "{0}/package/live_build_log/{1}/{2}/{3}/{4}/".format(
|
|
base_url, project, package.name, repo.name, repo.arch) %}
|
|
- {{ package.name }} - *[{{ package.status }}]({{ build_log_url }})*
|
|
{% else %}
|
|
- {{ package.name }} - *{{ package.status }}*
|
|
{% endif %}
|
|
{% endfor %}
|
|
"""
|
|
|
|
|
|
@dataclass
|
|
class BuildResult:
|
|
name: str
|
|
status: str
|
|
|
|
|
|
@dataclass
|
|
class BuildRepository:
|
|
name: str
|
|
arch: str
|
|
packages: List[BuildResult] = field(default_factory=list)
|
|
|
|
|
|
class Config(BaseProxyConfig):
|
|
def do_update(self, helper: ConfigUpdateHelper) -> None:
|
|
|
|
helper.copy("api_url")
|
|
helper.copy("instance_url")
|
|
helper.copy("rebuild_token")
|
|
helper.copy("trigger_token")
|
|
helper.copy("secret")
|
|
|
|
password = self["password"]
|
|
if len(password) < 91 and not password.endswith("=="):
|
|
encrypted_password = cryptocode.encrypt(password, self["secret"])
|
|
helper.base["password"] = encrypted_password
|
|
else:
|
|
helper.copy("password")
|
|
|
|
helper.copy("username")
|
|
helper.copy("repo_aliases")
|
|
|
|
|
|
class OSCBot(Plugin):
|
|
|
|
http: aiohttp.ClientSession
|
|
|
|
async def start(self) -> None:
|
|
await super().start()
|
|
self.config.load_and_update()
|
|
self.http = self.client.api.session
|
|
self.template = Environment(loader=BaseLoader,
|
|
lstrip_blocks=True,
|
|
trim_blocks=True)
|
|
|
|
@classmethod
|
|
def get_config_class(cls) -> Type[BaseProxyConfig]:
|
|
return Config
|
|
|
|
def get_alias(self, project_alias: str) -> Tuple[str, str, str, str, str]:
|
|
data = self.config["repo_aliases"][project_alias]
|
|
# There is no concept of non-positional arguments in maubot
|
|
# So we just use "all" in case we want to skip something
|
|
package = data["package"] if data["package"] != "all" else None
|
|
repository = (data["repository"] if data["repository"] != "all"
|
|
else None)
|
|
arch = data["arch"] if data["arch"] != "all" else None
|
|
project = data["project"]
|
|
state = data["state"] if data["state"] != "all" else None
|
|
|
|
return (project, package, repository, state, arch)
|
|
|
|
async def parse_rebuilpac(
|
|
self,
|
|
project: str,
|
|
package: Optional[str] = None,
|
|
repo: Optional[str] = None,
|
|
arch: Optional[str] = None) -> Tuple[bool, str]:
|
|
|
|
trigger_url = f"{self.config['api_url']}/trigger/rebuild"
|
|
header = {"Authorization": f"Token {self.config['rebuild_token']}"}
|
|
params = {"project": project}
|
|
|
|
if package:
|
|
params["package"] = package
|
|
if repo:
|
|
params["repository"] = repo
|
|
if arch:
|
|
params["arch"] = arch
|
|
|
|
response = await self.http.post(trigger_url, params=params,
|
|
headers=header)
|
|
|
|
status = objectify.fromstring(await response.text())
|
|
if response.status == 200:
|
|
return (True, status.summary.text)
|
|
else:
|
|
return (False, status.summary.text)
|
|
|
|
async def parse_status(
|
|
self,
|
|
project: str,
|
|
package: Optional[str] = None,
|
|
state: Optional[str] = None,
|
|
repo: Optional[str] = None,
|
|
arch: Optional[str] = None) -> List[BuildRepository]:
|
|
|
|
username = self.config["username"]
|
|
password = cryptocode.decrypt(self.config["password"],
|
|
self.config["secret"])
|
|
|
|
api_url = self.config["api_url"]
|
|
api_call = f"{api_url}/build/{project}/_result"
|
|
|
|
auth = aiohttp.BasicAuth(username, password)
|
|
|
|
params = {}
|
|
|
|
if package:
|
|
params["package"] = package
|
|
|
|
response = await self.http.get(api_call, auth=auth, params=params)
|
|
|
|
if response.status != 200:
|
|
self.log.error(f"Unexpected status: got {response.status}")
|
|
return []
|
|
|
|
response_text = await response.text()
|
|
parsed = objectify.fromstring(response_text)
|
|
|
|
results = list()
|
|
|
|
for child in parsed.result:
|
|
repository_name = child.get("repository")
|
|
repo_arch = child.get("arch")
|
|
|
|
if repo and repo != repository_name:
|
|
self.log.debug(f"Skipping {repository_name}, not matching")
|
|
continue
|
|
|
|
if arch and arch != repo_arch:
|
|
self.log.debug(f"Skipping {repo_arch} ({repository_name}), "
|
|
" not matching")
|
|
continue
|
|
|
|
packages = list()
|
|
for status in child.status:
|
|
package_name = status.get("package")
|
|
package_status = status.get("code")
|
|
|
|
if state and state != package_status:
|
|
self.log.debug(f"Skipping {package_name},"
|
|
f" unwanted state {package_status}")
|
|
continue
|
|
|
|
result = BuildResult(name=package_name, status=package_status)
|
|
packages.append(result)
|
|
|
|
if not packages:
|
|
continue
|
|
|
|
repository = BuildRepository(name=repository_name,
|
|
arch=repo_arch,
|
|
packages=packages)
|
|
results.append(repository)
|
|
|
|
return results
|
|
|
|
@command.new(name="osc", help="Manage the bot",
|
|
require_subcommand=True)
|
|
async def osc(self) -> None:
|
|
pass
|
|
|
|
@osc.subcommand(
|
|
"rebuildpac", aliases=("rb",),
|
|
help="Rebuild a package or all packages in the repositories")
|
|
@command.argument("project", "project name/alias")
|
|
@command.argument("package", "package name (or \"all\" for all packages)")
|
|
@command.argument("repository", "repository (optional)", required=False)
|
|
@command.argument("arch", "architecture (optional)", required=False)
|
|
async def rebuildpac(self, evt: MessageEvent,
|
|
project: str,
|
|
package: Optional[str] = None,
|
|
repository: Optional[str] = None,
|
|
arch: Optional[str] = None) -> None:
|
|
|
|
package = None if package == "all" else package
|
|
repository = None if repository == "all" else repository
|
|
arch = None if arch == "all" else arch
|
|
|
|
result, status = await self.parse_rebuilpac(project, package,
|
|
repository,
|
|
arch)
|
|
if not result:
|
|
message = f"Error received from OBS: {status}"
|
|
else:
|
|
message = f"Rebuild triggered {status}"
|
|
|
|
await evt.reply(message, markdown=True)
|
|
|
|
@osc.subcommand(
|
|
"servicerun",
|
|
help="Trigger source services for a package")
|
|
@command.argument("project", "project name")
|
|
@command.argument("package", "package name")
|
|
async def servicerun(self, evt: MessageEvent,
|
|
project: str,
|
|
package: str) -> None:
|
|
|
|
token = self.config["trigger_token"]
|
|
trigger_url = f"{self.config['api_url']}/trigger/runservice"
|
|
params = {"project": project, "package": package}
|
|
header = {"Authorization": f"Token {token}"}
|
|
|
|
response = await self.http.post(trigger_url, params=params,
|
|
headers=header)
|
|
|
|
status = objectify.fromstring(await response.text())
|
|
if response.status == 200:
|
|
message = f"Service triggered: {status.summary.text}"
|
|
else:
|
|
message = f"Error running service: {status.summary.text}"
|
|
await evt.reply(message, markdown=True)
|
|
|
|
@osc.subcommand("status", aliases=("st",),
|
|
help="Check status for package and repository")
|
|
@command.argument("project", "project name")
|
|
@command.argument("package", "package name | all", required=False)
|
|
@command.argument("state", "build state | all", required=False)
|
|
@command.argument("repository", "repository | all)", required=False)
|
|
@command.argument("arch", "architecture state | all", required=False)
|
|
async def status(self, evt: MessageEvent,
|
|
project: str,
|
|
package: Optional[str] = None,
|
|
state: Optional[str] = None,
|
|
repository: Optional[str] = None,
|
|
arch: Optional[str] = None) -> None:
|
|
|
|
if project in self.config["repo_aliases"]:
|
|
project, package, repository, state, arch = self.get_alias(project)
|
|
else:
|
|
# There is no concept of non-positional arguments in maubot
|
|
# So we just use "all" in case we want to skip something
|
|
package = None if package == "all" else package
|
|
repository = None if repository == "all" else repository
|
|
arch = None if arch == "all" else arch
|
|
|
|
response = await self.parse_status(project, package, state=state,
|
|
repo=repository, arch=arch)
|
|
|
|
if not response:
|
|
await evt.reply("No results found.")
|
|
return
|
|
|
|
header = self.template.from_string(HEADER_TEMPLATE)
|
|
message = header.render(state=state)
|
|
await evt.reply(message, markdown=True)
|
|
|
|
base_url = self.config["instance_url"]
|
|
|
|
for repository in response:
|
|
body = self.template.from_string(REPO_TEMPLATE)
|
|
message = body.render(repo=repository, project=project,
|
|
base_url=base_url)
|
|
await evt.respond(message, markdown=True)
|