maubot-osc/oscbot/__init__.py
Luca Beltrame 32876558ae
First working version
Only checks status for now.
2022-01-23 11:30:54 +01:00

175 lines
5.1 KiB
Python

# (C)
import asyncio
import sys
from dataclasses import dataclass
from typing import Optional, List, Type
from urllib.parse import quote

import aiohttp
from jinja2 import BaseLoader, Environment
from lxml import objectify
from maubot import Plugin, MessageEvent
from maubot.handlers import command
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
# Base URL of the build API; request paths such as
# ``/build/<project>/_result`` are appended to it.
API_URL = "https://api.opensuse.org"
# Jinja2 template for the heading of a status report. It is rendered with a
# single variable, ``state``; the "Only showing ..." line is emitted only when
# ``state`` is truthy (i.e. a state filter is active).
HEADER_TEMPLATE = """
### Package status
{% if state %}
Only showing packages with {{ state }} state.
{% endif %}"""
# Jinja2 template for one repository/architecture section of a status report.
# It is rendered with ``repo``, an object exposing ``name``, ``arch`` and an
# iterable ``packages`` whose items expose ``name`` and ``status``.
REPO_TEMPLATE = """
#### {{ repo.name }} - {{ repo.arch }}
{% for package in repo.packages %}
- {{ package.name }} - *{{ package.status }}*
{% endfor %}
"""
@dataclass
class BuildResult:
    """Build status of a single package inside one repository."""

    # Package name, taken from the ``package`` attribute of a ``<status>``
    # element in the API answer.
    name: str
    # Build state, taken from the ``code`` attribute of the same element.
    status: str
@dataclass
class BuildRepository:
    """Build results of one repository/architecture pair."""

    # Repository name, taken from the ``repository`` attribute of a
    # ``<result>`` element in the API answer.
    name: str
    # Architecture, taken from the ``arch`` attribute of the same element.
    arch: str
    # Per-package results collected for this repository and architecture.
    packages: List[BuildResult]
class Config(BaseProxyConfig):
    """Plugin configuration holding the credentials used for API calls."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Copy the ``username`` and ``password`` keys through *helper*."""
        for key in ("username", "password"):
            helper.copy(key)
class OSCBot(Plugin):
    """Maubot plugin reporting package build status from the build API.

    The plugin exposes an ``osc`` chat command whose ``status`` (alias
    ``st``) subcommand queries ``API_URL`` with the configured credentials
    and answers with a markdown report, one message per repository.
    """

    http: aiohttp.ClientSession
    template: Environment

    async def start(self) -> None:
        """Load the configuration and prepare the HTTP session and templates."""
        await super().start()
        self.config.load_and_update()
        # Reuse the session of the Matrix client instead of creating one.
        self.http = self.client.api.session
        # Templates are only ever built with ``from_string``; the loader is
        # still passed as an instance, which is what ``Environment`` expects.
        self.template = Environment(loader=BaseLoader(),
                                    lstrip_blocks=True,
                                    trim_blocks=True)

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the configuration class used by this plugin."""
        return Config

    async def parse_status(
            self,
            project: str,
            package: Optional[str] = None,
            state: Optional[str] = None,
            repo: Optional[str] = None,
            arch: Optional[str] = None) -> List[BuildRepository]:
        """Fetch and filter the build results of *project*.

        Args:
            project: Project whose ``_result`` resource is requested.
            package: When given, sent to the API as the ``package`` query
                parameter.
            state: When given, keep only packages whose status code equals
                this value.
            repo: When given, keep only the repository with this name.
            arch: When given, keep only results for this architecture.

        Returns:
            One ``BuildRepository`` per repository/architecture pair that
            still has at least one package after filtering. An empty list
            is returned when nothing matches or when the API does not
            answer with HTTP 200 (the status is logged in that case).
        """
        auth = aiohttp.BasicAuth(self.config["username"],
                                 self.config["password"])
        # ``project`` comes straight from a chat message: quote it so that
        # characters such as "/", "?" or "#" cannot alter the request path
        # or query. ":" is kept as is because it is legal in a path segment.
        api_call = f"{API_URL}/build/{quote(project, safe=':')}/_result"
        params = {"package": package} if package else {}

        # ``async with`` releases the connection on every exit path,
        # including the early return below where the body is never read.
        async with self.http.get(api_call, auth=auth,
                                 params=params) as response:
            if response.status != 200:
                self.log.error(f"Unexpected status: got {response.status}")
                return []
            # Hand raw bytes to lxml: it refuses ``str`` input that carries
            # an XML encoding declaration.
            payload = await response.read()

        return self._parse_results(payload, state=state, repo=repo, arch=arch)

    def _parse_results(
            self,
            payload: bytes,
            state: Optional[str] = None,
            repo: Optional[str] = None,
            arch: Optional[str] = None) -> List[BuildRepository]:
        """Convert a ``_result`` XML document into filtered build records."""
        parsed = objectify.fromstring(payload)
        results = []
        # ``iterchildren`` simply yields nothing when no such child exists,
        # whereas objectify attribute access (``parsed.result``) would raise
        # AttributeError on an empty result list.
        for child in parsed.iterchildren("result"):
            repository_name = child.get("repository")
            repo_arch = child.get("arch")
            if repo and repo != repository_name:
                self.log.debug(f"Skipping {repository_name}, not matching")
                continue
            if arch and arch != repo_arch:
                self.log.debug(f"Skipping {repo_arch} ({repository_name}), "
                               " not matching")
                continue

            packages = []
            for status in child.iterchildren("status"):
                package_name = status.get("package")
                package_status = status.get("code")
                if state and state != package_status:
                    self.log.debug(f"Skipping {package_name},"
                                   f" unwanted state {package_status}")
                    continue
                packages.append(BuildResult(name=package_name,
                                            status=package_status))

            # Repositories left without packages are not reported at all.
            if not packages:
                continue
            results.append(BuildRepository(name=repository_name,
                                           arch=repo_arch,
                                           packages=packages))
        return results

    @command.new(name="osc", help="Manage the bot",
                 require_subcommand=True)
    async def osc(self) -> None:
        """Root of the ``osc`` command; all work happens in subcommands."""

    @osc.subcommand("status", aliases=("st",),
                    help="Check status for package and repository")
    @command.argument("project", "project name")
    @command.argument("package", "package name (optional)", required=False)
    @command.argument("state", "build state (optional)", required=False)
    @command.argument("repository", "repository (optional)", required=False)
    @command.argument("arch", "architecture state (optional)", required=False)
    async def status(self, evt: MessageEvent,
                     project: str,
                     package: Optional[str] = None,
                     state: Optional[str] = None,
                     repository: Optional[str] = None,
                     arch: Optional[str] = None) -> None:
        """Reply to *evt* with the build status of *project*.

        ``package`` and ``state`` accept the literal ``all`` as a placeholder
        meaning "do not filter", which allows later positional arguments to
        be given without restricting the earlier ones.
        """
        if state == "all":
            state = None
        if package == "all":
            package = None

        response = await self.parse_status(project, package, state=state,
                                           repo=repository, arch=arch)
        if not response:
            await evt.reply("No results found.")
            return

        header = self.template.from_string(HEADER_TEMPLATE)
        await evt.reply(header.render(state=state), markdown=True)

        # Compile the per-repository template once rather than per message.
        body = self.template.from_string(REPO_TEMPLATE)
        for build_repo in response:
            await evt.respond(body.render(repo=build_repo), markdown=True)