import { Server } from "@hapi/hapi";
import { Service } from "@hapipal/schmervice";
import { promises as fs } from "node:fs";
import {
  SignaldAPI,
  SendResponsev1,
  IncomingMessagev1,
  ClientMessageWrapperv1,
} from "@digiresilience/node-signald";
import { SavedSignalBot as Bot } from "@digiresilience/metamigo-db";
import workerUtils from "../../worker-utils.js";

/**
 * Service wrapping a signald client: manages the connection and account
 * subscriptions, exposes CRUD helpers for signal bots stored in the database,
 * and forwards incoming messages to the worker queue.
 *
 * The signald client is only created when `config().signald.enabled` is
 * truthy; when disabled, `signald` and `subscriptions` stay unset.
 */
export default class SignaldService extends Service {
  signald: SignaldAPI;
  subscriptions: Set<string>;

  constructor(server: Server, options: never) {
    super(server, options);

    if (this.server.config().signald.enabled) {
      this.signald = new SignaldAPI();
      // Route the client's log output through the server logger.
      this.signald.setLogger((level, msg, extra?) => {
        this.server.logger[level]({ extra }, msg);
      });
      this.subscriptions = new Set();
    }
  }

  /** Registers the event listeners and starts connecting, when enabled. */
  async initialize(): Promise<void> {
    if (this.server.config().signald.enabled && this.signald) {
      this.setupListeners();
      // Not awaited (as in the original) — presumably so a slow or retried
      // connection does not block startup; TODO confirm.
      this.connect();
    }
  }

  /** Disconnects the signald client, when enabled. */
  async teardown(): Promise<void> {
    if (this.server.config().signald.enabled && this.signald)
      this.signald.disconnect();
  }

  /** Starts connecting to the configured signald socket; no-op when disabled. */
  private connect() {
    const { enabled, socket } = this.server.config().signald;
    if (!enabled) return;
    this.signald.connectWithBackoff(socket);
  }

  /** Subscribes to every known account once the transport is connected. */
  private async onConnected() {
    await this.subscribeAll();
  }

  /**
   * Wires up the transport event handlers.
   *
   * Nothing awaits these handlers here, so async work inside them is awaited
   * and wrapped in try/catch: a failure is logged instead of surfacing as an
   * unhandled promise rejection.
   */
  private setupListeners() {
    this.signald.on("transport_error", async (error) => {
      this.server.logger.info({ error }, "signald transport error");
    });

    this.signald.on("transport_connected", async () => {
      try {
        await this.onConnected();
      } catch (error) {
        this.server.logger.error(
          { error },
          "signald subscribe after connect failed"
        );
      }
    });

    this.signald.on(
      "transport_received_payload",
      async (payload: ClientMessageWrapperv1) => {
        this.server.logger.debug({ payload }, "signald payload received");
        if (payload.type === "IncomingMessage") {
          try {
            await this.receiveMessage(payload.data);
          } catch (error) {
            this.server.logger.error(
              { error },
              "signald incoming message handling failed"
            );
          }
        }
      }
    );

    this.signald.on("transport_sent_payload", async (payload) => {
      this.server.logger.debug({ payload }, "signald payload sent");
    });
  }

  /** Subscribes to every account signald lists and records each one. */
  private async subscribeAll() {
    const result = await this.signald.listAccounts();
    const accounts = result.accounts.map((account) => account.address.number);
    await Promise.all(
      accounts.map(async (account) => {
        await this.signald.subscribe(account);
        this.subscriptions.add(account);
      })
    );
  }

  /** Unsubscribes from every recorded account and forgets it. */
  private async unsubscribeAll() {
    await Promise.all(
      [...this.subscriptions].map(async (account) => {
        await this.signald.unsubscribe(account);
        this.subscriptions.delete(account);
      })
    );
  }

  /**
   * Inserts a new signal bot owned by the user with the given email.
   *
   * @throws Error when no user matches `email`.
   */
  async create(
    phoneNumber: string,
    description: string,
    email: string
  ): Promise<Bot> {
    const db = this.server.db();
    const user = await db.users.findBy({ email });
    if (!user) {
      // Fail with a clear message rather than a TypeError on `user.id`.
      throw new Error(`no user found with email ${email}`);
    }

    const row = await db.signalBots.insert({
      phoneNumber,
      description,
      userId: user.id,
    });
    return row;
  }

  /** Returns all stored signal bots. */
  async findAll(): Promise<Bot[]> {
    const db = this.server.db();
    return db.signalBots.findAll();
  }

  /** Looks up a signal bot by id. */
  async findById(id: string): Promise<Bot> {
    const db = this.server.db();
    return db.signalBots.findById({ id });
  }

  /** Looks up a signal bot by token. */
  async findByToken(token: string): Promise<Bot> {
    const db = this.server.db();
    return db.signalBots.findBy({ token });
  }

  /**
   * Verifies the bot's phone number with signald using `code`, then stores
   * the returned address uuid for the bot.
   */
  async register(bot: Bot, code: string): Promise<void> {
    const address = await this.signald.verify(bot.phoneNumber, code);
    // Awaited so a failed database update rejects this call instead of
    // being lost as a floating promise.
    await this.server.db().signalBots.updateAuthInfo(bot, address.address.uuid);
  }

  /** Sends `message` from the bot's number to `phoneNumber`. */
  async send(
    bot: Bot,
    phoneNumber: string,
    message: string
  ): Promise<SendResponsev1> {
    this.server.logger.debug(
      { us: bot.phoneNumber, them: phoneNumber, message },
      "signald send"
    );
    return this.signald.send(
      bot.phoneNumber,
      { number: phoneNumber },
      undefined,
      message
    );
  }

  /** Resets the bot's session with `phoneNumber`. */
  async resetSession(bot: Bot, phoneNumber: string): Promise<SendResponsev1> {
    return this.signald.resetSession(bot.phoneNumber, {
      number: phoneNumber,
    });
  }

  /** Calls signald `register` for the bot's number with the voice flag set to `true`. */
  async requestVoiceVerification(bot: Bot, captcha?: string): Promise<void> {
    this.server.logger.debug(
      { number: bot.phoneNumber, captcha },
      "requesting voice verification for"
    );
    await this.signald.register(bot.phoneNumber, true, captcha);
  }

  /** Calls signald `register` for the bot's number with the voice flag set to `false`. */
  async requestSMSVerification(bot: Bot, captcha?: string): Promise<void> {
    this.server.logger.debug(
      { number: bot.phoneNumber, captcha },
      "requesting sms verification for"
    );
    await this.signald.register(bot.phoneNumber, false, captcha);
  }

  /**
   * Resolves the bot an incoming message is addressed to and queues the
   * message for it. Messages without an account, or for an unknown bot, are
   * logged and dropped.
   */
  private async receiveMessage(message: IncomingMessagev1) {
    const { account } = message;
    if (!account) {
      this.server.logger.debug({ message }, "invalid message received");
      this.server.logger.error("invalid message received");
      // Without an account there is no bot to look up.
      return;
    }

    const bot = await this.server
      .db()
      .signalBots.findBy({ phoneNumber: account });
    if (!bot) {
      // Context object first, message second — same order as every other
      // logger call in this class.
      this.server.logger.info(
        { account, message },
        "message received for unknown bot"
      );
      return;
    }

    await this.queueMessage(bot, message);
  }

  /**
   * Reads the first attachment (if any) from disk and returns it base64
   * encoded with its mimetype and filename. Only the first attachment is
   * handled. All three fields are `undefined` when there is no attachment.
   */
  // NOTE(review): callers pass `message.data_message` here, so the parameter
  // type probably should be the data-message type — confirm against
  // node-signald's typings.
  private async getAttachmentInfo(dataMessage: IncomingMessagev1) {
    const attachmentInfo = dataMessage.attachments?.[0];
    if (!attachmentInfo) {
      return { attachment: undefined, mimetype: undefined, filename: undefined };
    }

    const buffer = await fs.readFile(attachmentInfo.storedFilename);
    const attachment = buffer.toString("base64");
    const mimetype = attachmentInfo.contentType ?? "application/octet-stream";
    const filename = attachmentInfo.customFilename ?? "unknown-filename";
    return { attachment, mimetype, filename };
  }

  /**
   * Adds a `signald-message` job for the bot carrying the message and its
   * first attachment. Messages with neither body nor attachments, or missing
   * a timestamp/account, are logged and skipped.
   */
  private async queueMessage(bot: Bot, message: IncomingMessagev1) {
    const { timestamp, account, data_message: dataMessage } = message;
    if (!dataMessage?.body && !dataMessage?.attachments) {
      this.server.logger.info({ message }, "message received with no content");
      return;
    }

    if (!timestamp || !account) {
      this.server.logger.debug({ message }, "invalid message received");
      // The job key is built from the timestamp, so do not enqueue a job
      // keyed on `undefined`.
      return;
    }

    const { attachment, mimetype, filename } = await this.getAttachmentInfo(
      dataMessage
    );

    const receivedMessage = {
      message,
      botId: bot.id,
      botPhoneNumber: bot.phoneNumber,
      attachment,
      mimetype,
      filename,
    };

    // Awaited so enqueue failures propagate to the caller.
    await workerUtils.addJob("signald-message", receivedMessage, {
      jobKey: `signal-bot-${bot.id}-${timestamp}`,
      queueName: `signal-bot-${bot.id}`,
    });
  }
}