import json
import logging
import pathlib
import sys
from typing import List, Optional, Protocol

from markdown import markdown
from nio import AsyncClient, AsyncClientConfig, LoginResponse
from pydantic import BaseModel, BaseSettings


class ClientCredentials(BaseModel):
    """Session data persisted after a successful login."""

    homeserver: str
    user_id: str
    device_id: str
    access_token: str


class CredentialStorage(Protocol):
    """Structural interface for anything that can persist credentials."""

    def save_config(self, config: ClientCredentials) -> None:
        """Save config"""

    def read_config(self) -> ClientCredentials:
        """Load config"""


class LocalCredentialStore:
    """Stores ``ClientCredentials`` as JSON in a single local file."""

    def __init__(self, config_file_path: pathlib.Path):
        self.credential_file: pathlib.Path = config_file_path

    def save(self, config: ClientCredentials) -> None:
        """Write *config* to the credential file, replacing any old content."""
        # Opening for writing fails if the parent directory is missing, which
        # is the normal situation on the very first run.
        self.credential_file.parent.mkdir(parents=True, exist_ok=True)
        with self.credential_file.open(mode="w", encoding="utf-8") as f:
            json.dump(config.dict(), f)

    def read(self) -> ClientCredentials:
        """Load and validate the credentials stored in the credential file."""
        with self.credential_file.open(mode="r", encoding="utf-8") as f:
            return ClientCredentials(**json.load(f))

    def exists(self) -> bool:
        """Return True if the credential file is present on disk."""
        return self.credential_file.exists()

    # The two methods below make this class satisfy ``CredentialStorage``,
    # which names its operations ``save_config`` / ``read_config``.
    def save_config(self, config: ClientCredentials) -> None:
        """Save config (``CredentialStorage`` name for :meth:`save`)."""
        self.save(config)

    def read_config(self) -> ClientCredentials:
        """Load config (``CredentialStorage`` name for :meth:`read`)."""
        return self.read()


class MatrixClientSettings(BaseSettings):
    """Client settings, read from ``MATRIX_``-prefixed environment variables."""

    homeserver: str
    user_id: str
    password: str
    device_name: str
    store_path: str
    join_rooms: Optional[List[str]] = None
    verify_ssl: Optional[bool] = True

    class Config:
        env_prefix = "MATRIX_"
        case_sensitive = False


class MatrixClient:
    """Wrapper around nio's ``AsyncClient`` that handles login and sending."""

    def __init__(self, settings: MatrixClientSettings):
        self.settings = settings
        self.store_path = pathlib.Path(settings.store_path)
        self.credential_store = LocalCredentialStore(
            self.store_path.joinpath("credentials.json")
        )
        # Created by login(); stays None until then.
        self.client: Optional[AsyncClient] = None
        self.client_config = AsyncClientConfig(
            max_limit_exceeded=0,
            max_timeouts=0,
            store_sync_tokens=True,
            encryption_enabled=True,
        )
        self.greeting_sent = False

    async def start(self) -> None:
        """Log in, upload keys if needed, join configured rooms, then sync."""
        await self.login()
        if self.client.should_upload_keys:
            await self.client.keys_upload()
        if self.settings.join_rooms:
            for room in self.settings.join_rooms:
                await self.client.join(room)
        # NOTE(review): the response is discarded here; confirm whether this
        # call is still needed.
        await self.client.joined_rooms()
        await self.client.sync_forever(timeout=300000, full_state=True)

    def save_credentials(self, resp: LoginResponse, homeserver: str) -> None:
        """Persist the session returned by a successful login."""
        credentials = ClientCredentials(
            homeserver=homeserver,
            user_id=resp.user_id,
            device_id=resp.device_id,
            access_token=resp.access_token,
        )
        self.credential_store.save(credentials)

    async def login_fresh(self) -> None:
        """Log in with the configured password and store the new session.

        Exits the process with status 1 if the homeserver rejects the login.
        """
        self.client = AsyncClient(
            self.settings.homeserver,
            self.settings.user_id,
            # Must be a keyword: the third positional parameter of
            # AsyncClient is device_id, not store_path.
            store_path=self.settings.store_path,
            config=self.client_config,
            ssl=self.settings.verify_ssl,
        )
        response = await self.client.login(
            password=self.settings.password,
            device_name=self.settings.device_name,
        )
        if isinstance(response, LoginResponse):
            self.save_credentials(response, self.settings.homeserver)
            return

        logging.error(
            'Login for "%s" via homeserver="%s" failed',
            self.settings.user_id,
            self.settings.homeserver,
        )
        logging.error("Login failure response: %s", response)
        # Release the HTTP session before terminating the process.
        await self.client.close()
        sys.exit(1)

    async def login_with_credentials(self) -> None:
        """Resume the session described by the stored credentials."""
        credentials = self.credential_store.read()
        self.client = AsyncClient(
            credentials.homeserver,
            credentials.user_id,
            device_id=credentials.device_id,
            store_path=self.settings.store_path,
            config=self.client_config,
            # Honour the configured TLS verification, same as login_fresh().
            ssl=self.settings.verify_ssl,
        )
        self.client.restore_login(
            user_id=credentials.user_id,
            device_id=credentials.device_id,
            access_token=credentials.access_token,
        )

    async def login(self) -> None:
        """Reuse stored credentials when present, otherwise log in afresh."""
        # Both the nio store and credentials.json live in this directory.
        self.store_path.mkdir(parents=True, exist_ok=True)
        if self.credential_store.exists():
            await self.login_with_credentials()
        else:
            await self.login_fresh()

    async def room_send(
        self,
        room: str,
        message: str,
        message_formatted: Optional[str] = None,
    ) -> None:
        """Send an ``m.text`` message to *room*.

        Args:
            room: ID of the room to send to.
            message: Plain-text body of the message.
            message_formatted: Optional Markdown source; when given it is
                rendered to HTML and sent as the formatted body.
        """
        content = {
            "msgtype": "m.text",
            "body": f"{message}",
        }
        if message_formatted is not None:
            content["format"] = "org.matrix.custom.html"
            content["formatted_body"] = markdown(
                message_formatted, extensions=["extra"]
            )
        await self.client.room_send(
            room_id=room,
            message_type="m.room.message",
            content=content,
            ignore_unverified_devices=True,
        )

    async def shutdown(self) -> None:
        """Close the underlying client; a no-op if login never happened."""
        if self.client is not None:
            await self.client.close()