matrix-ops-bot/ops_bot/matrix.py

165 lines
4.7 KiB
Python

import json
import logging
import pathlib
import sys
from typing import List, Optional, Protocol
from markdown import markdown
from nio import AsyncClient, AsyncClientConfig, LoginResponse
from pydantic import BaseModel, BaseSettings
class ClientCredentials(BaseModel):
homeserver: str
user_id: str
device_id: str
access_token: str
class CredentialStorage(Protocol):
def save_config(self, config: ClientCredentials) -> None:
"""Save config"""
def read_config(self) -> ClientCredentials:
"""Load config"""
class LocalCredentialStore:
def __init__(self, config_file_path: pathlib.Path):
self.credential_file: pathlib.Path = config_file_path
def save(self, config: ClientCredentials) -> None:
with self.credential_file.open(mode="w") as f:
json.dump(config.dict(), f)
def read(self) -> ClientCredentials:
with self.credential_file.open(mode="r") as f:
return ClientCredentials(**json.load(f))
def exists(self) -> bool:
return self.credential_file.exists()
class MatrixClientSettings(BaseSettings):
homeserver: str
user_id: str
password: str
device_name: str
store_path: str
verify_ssl: Optional[bool] = True
class Config:
env_prefix = "MATRIX_"
case_sensitive = False
class MatrixClient:
def __init__(self, settings: MatrixClientSettings, join_rooms: List[str]):
self.settings = settings
self.store_path = pathlib.Path(settings.store_path)
self.join_rooms = join_rooms
self.credential_store = LocalCredentialStore(
self.store_path.joinpath("credentials.json")
)
self.client: AsyncClient = None
self.client_config = AsyncClientConfig(
max_limit_exceeded=0,
max_timeouts=0,
store_sync_tokens=True,
encryption_enabled=True,
)
self.greeting_sent = False
async def start(self) -> None:
await self.login()
if self.client.should_upload_keys:
await self.client.keys_upload()
for room in self.join_rooms:
await self.client.join(room)
await self.client.joined_rooms()
await self.client.sync_forever(timeout=300000, full_state=True)
def save_credentials(self, resp: LoginResponse, homeserver: str) -> None:
credentials = ClientCredentials(
homeserver=homeserver,
user_id=resp.user_id,
device_id=resp.device_id,
access_token=resp.access_token,
)
self.credential_store.save(credentials)
async def login_fresh(self) -> None:
self.client = AsyncClient(
self.settings.homeserver,
self.settings.user_id,
self.settings.store_path,
config=self.client_config,
ssl=self.settings.verify_ssl,
)
response = await self.client.login(
password=self.settings.password, device_name=self.settings.device_name
)
if isinstance(response, LoginResponse):
self.save_credentials(response, self.settings.homeserver)
else:
logging.error(
f'Login for "{self.settings.user_id}" via homeserver="{self.settings.homeserver}"'
)
logging.info(f"Login failure response: {response}")
sys.exit(1)
async def login_with_credentials(self) -> None:
credentials = self.credential_store.read()
self.client = AsyncClient(
credentials.homeserver,
credentials.user_id,
device_id=credentials.device_id,
store_path=self.store_path,
config=self.client_config,
ssl=True,
)
self.client.restore_login(
user_id=credentials.user_id,
device_id=credentials.device_id,
access_token=credentials.access_token,
)
async def login(self) -> None:
if self.credential_store.exists():
await self.login_with_credentials()
else:
await self.login_fresh()
async def room_send(
self,
room: str,
message: str,
message_formatted: Optional[str] = None,
) -> None:
content = {
"msgtype": "m.text",
"body": f"{message}",
}
if message_formatted is not None:
content["format"] = "org.matrix.custom.html"
content["formatted_body"] = markdown(
message_formatted, extensions=["extra"]
)
await self.client.room_send(
room_id=room,
message_type="m.room.message",
content=content,
ignore_unverified_devices=True,
)
async def shutdown(self) -> None:
await self.client.close()