import json
import logging
import os
import pathlib
import sys
from typing import List, Optional, Protocol

from markdown import markdown
from nio import AsyncClient, AsyncClientConfig, LoginResponse
from pydantic import BaseModel
from pydantic_settings import BaseSettings


class ClientCredentials(BaseModel):
    """Login state persisted after a successful password login."""

    homeserver: str
    user_id: str
    device_id: str
    access_token: str


class CredentialStorage(Protocol):
    """Structural interface for anything that can persist credentials."""

    def save_config(self, config: ClientCredentials) -> None:
        """Save config"""
        ...

    def read_config(self) -> ClientCredentials:
        """Load config"""
        ...


class LocalCredentialStore:
    """Store ``ClientCredentials`` as a JSON document in a local file."""

    def __init__(self, config_file_path: pathlib.Path):
        """Remember the file the credentials are written to / read from."""
        self.credential_file: pathlib.Path = config_file_path

    def save(self, config: ClientCredentials) -> None:
        """Serialize *config* to the credential file, overwriting it."""
        with self.credential_file.open(mode="w", encoding="utf-8") as f:
            # model_dump() is the pydantic v2 replacement for the deprecated
            # .dict(); pydantic_settings already requires pydantic v2.
            json.dump(config.model_dump(), f)

    def read(self) -> ClientCredentials:
        """Load and validate the credentials stored in the credential file."""
        with self.credential_file.open(mode="r", encoding="utf-8") as f:
            return ClientCredentials(**json.load(f))

    def exists(self) -> bool:
        """Return True when the credential file is present on disk."""
        return self.credential_file.exists()

    # The two methods below make this class structurally satisfy the
    # ``CredentialStorage`` protocol, whose method names differ from
    # ``save`` / ``read``.
    def save_config(self, config: ClientCredentials) -> None:
        """Save config (``CredentialStorage`` name for :meth:`save`)."""
        self.save(config)

    def read_config(self) -> ClientCredentials:
        """Load config (``CredentialStorage`` name for :meth:`read`)."""
        return self.read()


class MatrixClientSettings(BaseSettings):
    """Connection settings, populated by pydantic-settings.

    The nested ``Config`` sets the ``MATRIX_`` prefix and case-insensitive
    matching for the setting names.
    """

    homeserver: str
    user_id: str
    password: str
    device_name: str
    store_path: str
    verify_ssl: Optional[bool] = True

    class Config:
        env_prefix = "MATRIX_"
        case_sensitive = False


class MatrixClient:
    """Thin wrapper around ``nio.AsyncClient`` with persisted credentials."""

    def __init__(self, settings: MatrixClientSettings, join_rooms: List[str]):
        """Prepare client configuration and make sure the store dir exists.

        Args:
            settings: Homeserver, account and storage settings.
            join_rooms: Rooms joined by :meth:`start` before syncing.
        """
        self.settings = settings
        self.store_path = pathlib.Path(settings.store_path)
        self.join_rooms = join_rooms
        self.credential_store = LocalCredentialStore(
            self.store_path.joinpath("credentials.json")
        )
        # Created lazily by login_fresh() / login_with_credentials().
        self.client: Optional[AsyncClient] = None
        self.client_config = AsyncClientConfig(
            max_limit_exceeded=0,
            max_timeouts=0,
            store_sync_tokens=True,
            encryption_enabled=True,
        )
        self.greeting_sent = False

        # A Path object is always truthy, so the previous
        # ``if self.store_path and not isdir(...)`` guard reduced to the isdir
        # check. mkdir(parents=True, exist_ok=True) also creates missing parent
        # directories and tolerates the directory already existing.
        self.store_path.mkdir(parents=True, exist_ok=True)

    async def start(self) -> None:
        """Log in, upload keys if needed, join rooms and sync forever.

        Raises:
            RuntimeError: If no client object exists after the login step.
        """
        await self.login()
        if self.client is None:
            raise RuntimeError("Matrix client failed to initialize")
        client = self.client

        if client.should_upload_keys:
            await client.keys_upload()

        for room in self.join_rooms:
            await client.join(room)

        await client.joined_rooms()
        # Does not return under normal operation; see the nio documentation
        # for the unit of ``timeout``.
        await client.sync_forever(timeout=300000, full_state=True)

    def save_credentials(self, resp: LoginResponse, homeserver: str) -> None:
        """Persist the identifiers and access token from a login response."""
        credentials = ClientCredentials(
            homeserver=homeserver,
            user_id=resp.user_id,
            device_id=resp.device_id,
            access_token=resp.access_token,
        )
        self.credential_store.save(credentials)

    async def login_fresh(self) -> None:
        """Log in with the configured password and store the credentials.

        Exits the process with status 1 when the homeserver does not answer
        with a ``LoginResponse``.
        """
        self.client = AsyncClient(
            homeserver=self.settings.homeserver,
            user=self.settings.user_id,
            store_path=str(self.settings.store_path),
            config=self.client_config,
            ssl=self.settings.verify_ssl,
        )
        response = await self.client.login(
            password=self.settings.password,
            device_name=self.settings.device_name,
        )

        if isinstance(response, LoginResponse):
            self.save_credentials(response, self.settings.homeserver)
            return

        # Say explicitly that the login failed, and log the server's answer at
        # ERROR level so the reason is visible with default logging settings.
        logging.error(
            f'Login failed for "{self.settings.user_id}" '
            f'via homeserver="{self.settings.homeserver}"'
        )
        logging.error(f"Login failure response: {response}")
        sys.exit(1)

    async def login_with_credentials(self) -> None:
        """Resume a session from the stored credentials, without a password."""
        credentials = self.credential_store.read()
        self.client = AsyncClient(
            homeserver=credentials.homeserver,
            user=credentials.user_id,
            device_id=credentials.device_id,
            store_path=str(self.store_path),
            config=self.client_config,
            # Honour the configured TLS verification like login_fresh() does;
            # this was hard-coded to True, which broke restarts for
            # deployments running with verify_ssl=False.
            ssl=self.settings.verify_ssl,
        )
        self.client.restore_login(
            user_id=credentials.user_id,
            device_id=credentials.device_id,
            access_token=credentials.access_token,
        )

    async def login(self) -> None:
        """Reuse stored credentials when present, else do a password login."""
        if self.credential_store.exists():
            await self.login_with_credentials()
        else:
            await self.login_fresh()

    async def room_send(
        self,
        room: str,
        message: str,
        message_formatted: Optional[str] = None,
    ) -> None:
        """Send a text message to *room*.

        Args:
            room: Passed to the client as ``room_id``.
            message: Plain-text body of the message.
            message_formatted: Optional Markdown source; when given it is
                rendered to HTML and sent as the formatted body.

        Raises:
            RuntimeError: If called before a client has been created.
        """
        if self.client is None:
            raise RuntimeError("Matrix client failed to initialize")

        content = {
            "msgtype": "m.text",
            "body": f"{message}",
        }
        if message_formatted is not None:
            content["format"] = "org.matrix.custom.html"
            content["formatted_body"] = markdown(
                message_formatted, extensions=["extra"]
            )

        await self.client.room_send(
            room_id=room,
            message_type="m.room.message",
            content=content,
            ignore_unverified_devices=True,
        )

    async def shutdown(self) -> None:
        """Close the underlying client if one was created."""
        if self.client is not None:
            await self.client.close()