matrix-ops-bot/ops_bot/matrix.py

182 lines
5.2 KiB
Python
Raw Permalink Normal View History

import json
import logging
import os
import pathlib
import sys
from typing import List, Optional, Protocol
from markdown import markdown
from nio import AsyncClient, AsyncClientConfig, LoginResponse
from pydantic import BaseModel
from pydantic_settings import BaseSettings
class ClientCredentials(BaseModel):
homeserver: str
user_id: str
device_id: str
access_token: str
class CredentialStorage(Protocol):
def save_config(self, config: ClientCredentials) -> None:
"""Save config"""
2026-03-05 16:08:31 +01:00
...
def read_config(self) -> ClientCredentials:
"""Load config"""
2026-03-05 16:08:31 +01:00
...
class LocalCredentialStore:
def __init__(self, config_file_path: pathlib.Path):
self.credential_file: pathlib.Path = config_file_path
def save(self, config: ClientCredentials) -> None:
with self.credential_file.open(mode="w") as f:
json.dump(config.dict(), f)
def read(self) -> ClientCredentials:
with self.credential_file.open(mode="r") as f:
return ClientCredentials(**json.load(f))
def exists(self) -> bool:
return self.credential_file.exists()
class MatrixClientSettings(BaseSettings):
homeserver: str
user_id: str
password: str
device_name: str
store_path: str
verify_ssl: Optional[bool] = True
class Config:
env_prefix = "MATRIX_"
case_sensitive = False
class MatrixClient:
2022-12-01 16:31:04 +00:00
def __init__(self, settings: MatrixClientSettings, join_rooms: List[str]):
self.settings = settings
self.store_path = pathlib.Path(settings.store_path)
2022-12-01 16:31:04 +00:00
self.join_rooms = join_rooms
self.credential_store = LocalCredentialStore(
self.store_path.joinpath("credentials.json")
)
2026-03-05 16:08:31 +01:00
self.client: Optional[AsyncClient] = None
self.client_config = AsyncClientConfig(
max_limit_exceeded=0,
max_timeouts=0,
store_sync_tokens=True,
encryption_enabled=True,
)
self.greeting_sent = False
if self.store_path and not os.path.isdir(self.store_path):
os.mkdir(self.store_path)
async def start(self) -> None:
await self.login()
2026-03-05 16:08:31 +01:00
if self.client is None:
raise RuntimeError("Matrix client failed to initialize")
2026-03-05 16:08:31 +01:00
client = self.client
if client.should_upload_keys:
await client.keys_upload()
2022-12-01 16:31:04 +00:00
for room in self.join_rooms:
2026-03-05 16:08:31 +01:00
await client.join(room)
2026-03-05 16:08:31 +01:00
await client.joined_rooms()
await client.sync_forever(timeout=300000, full_state=True)
def save_credentials(self, resp: LoginResponse, homeserver: str) -> None:
credentials = ClientCredentials(
homeserver=homeserver,
user_id=resp.user_id,
device_id=resp.device_id,
access_token=resp.access_token,
)
self.credential_store.save(credentials)
async def login_fresh(self) -> None:
self.client = AsyncClient(
homeserver=self.settings.homeserver,
user=self.settings.user_id,
store_path=str(self.settings.store_path),
config=self.client_config,
ssl=self.settings.verify_ssl,
)
response = await self.client.login(
password=self.settings.password, device_name=self.settings.device_name
)
if isinstance(response, LoginResponse):
self.save_credentials(response, self.settings.homeserver)
else:
logging.error(
f'Login for "{self.settings.user_id}" via homeserver="{self.settings.homeserver}"'
)
logging.info(f"Login failure response: {response}")
sys.exit(1)
async def login_with_credentials(self) -> None:
credentials = self.credential_store.read()
self.client = AsyncClient(
homeserver=credentials.homeserver,
user=credentials.user_id,
device_id=credentials.device_id,
store_path=str(self.store_path),
config=self.client_config,
ssl=True,
)
self.client.restore_login(
user_id=credentials.user_id,
device_id=credentials.device_id,
access_token=credentials.access_token,
)
async def login(self) -> None:
if self.credential_store.exists():
await self.login_with_credentials()
else:
await self.login_fresh()
async def room_send(
self,
room: str,
message: str,
message_formatted: Optional[str] = None,
) -> None:
2026-03-05 16:08:31 +01:00
if self.client is None:
raise RuntimeError("Matrix client failed to initialize")
content = {
"msgtype": "m.text",
"body": f"{message}",
}
if message_formatted is not None:
content["format"] = "org.matrix.custom.html"
content["formatted_body"] = markdown(
message_formatted, extensions=["extra"]
)
await self.client.room_send(
room_id=room,
message_type="m.room.message",
content=content,
ignore_unverified_devices=True,
)
async def shutdown(self) -> None:
2026-03-05 16:08:31 +01:00
if self.client is not None:
await self.client.close()