From 31faf22fd59019b908da70d91bb43903c4aed29c Mon Sep 17 00:00:00 2001 From: Darren Clarke Date: Thu, 5 Sep 2024 10:03:55 +0200 Subject: [PATCH] Signal attachment updates --- apps/bridge-worker/crontab | 2 +- .../tasks/fetch-signal-messages.ts | 154 ++++++++++++++---- packages/signal-api/apis/AttachmentsApi.ts | 10 +- 3 files changed, 128 insertions(+), 38 deletions(-) diff --git a/apps/bridge-worker/crontab b/apps/bridge-worker/crontab index 0db6c6b..cb76da9 100644 --- a/apps/bridge-worker/crontab +++ b/apps/bridge-worker/crontab @@ -1 +1 @@ -*/1 * * * * fetch-signal-messages ?max=1 +*/1 * * * * fetch-signal-messages ?max=1&id=fetchSignalMessagesCron {"scheduleTasks": "true"} diff --git a/apps/bridge-worker/tasks/fetch-signal-messages.ts b/apps/bridge-worker/tasks/fetch-signal-messages.ts index 4b1df6a..ad49476 100644 --- a/apps/bridge-worker/tasks/fetch-signal-messages.ts +++ b/apps/bridge-worker/tasks/fetch-signal-messages.ts @@ -1,41 +1,135 @@ import { db, getWorkerUtils } from "@link-stack/bridge-common"; import * as signalApi from "@link-stack/signal-api"; -const { Configuration, MessagesApi } = signalApi; -const fetchSignalMessagesTask = async (): Promise => { +const { Configuration, MessagesApi, AttachmentsApi } = signalApi; +const config = new Configuration({ + basePath: process.env.BRIDGE_SIGNAL_URL, +}); + +const fetchAttachments = async (attachments: any[] | undefined) => { + const formattedAttachments = []; + + if (attachments) { + const attachmentsClient = new AttachmentsApi(config); + + for (const att of attachments) { + const { id, contentType, filename: name } = att; + + const blob = await attachmentsClient.v1AttachmentsAttachmentGet({ + attachment: id, + }); + const arrayBuffer = await blob.arrayBuffer(); + const base64Attachment = Buffer.from(arrayBuffer).toString("base64"); + + const formattedAttachment = { + filename: name, + mimeType: contentType, + attachment: base64Attachment, + }; + + formattedAttachments.push(formattedAttachment); + } + } + + return formattedAttachments; +}; + +type ProcessMessageArgs = { + id: string; + phoneNumber: string; + message: any; +}; + +const processMessage = async ({ + id, + phoneNumber, + message: msg, +}: ProcessMessageArgs): Promise[]> => { + const { envelope } = msg; + console.log(envelope); + const { source, sourceUuid, dataMessage } = envelope; + const { attachments } = dataMessage; + const rawTimestamp = dataMessage?.timestamp; + const timestamp = new Date(rawTimestamp); + + const formattedAttachments = await fetchAttachments(attachments); + const primaryAttachment = formattedAttachments[0] ?? {}; + const additionalAttachments = formattedAttachments.slice(1); + const primaryMessage = { + token: id, + to: phoneNumber, + from: source, + messageId: `${sourceUuid}-${rawTimestamp}`, + message: dataMessage?.message, + sentAt: timestamp.toISOString(), + attachment: primaryAttachment.attachment, + filename: primaryAttachment.filename, + mimeType: primaryAttachment.mimeType, + }; + const formattedMessages = [primaryMessage]; + + let count = 1; + for (const attachment of additionalAttachments) { + const additionalMessage = { + ...primaryMessage, + ...attachment, + message: attachment.filename, + messageId: `${sourceUuid}-${count}-${rawTimestamp}`, + }; + formattedMessages.push(additionalMessage); + count++; + } + + return formattedMessages; +}; + +interface FetchSignalMessagesTaskOptions { + scheduleTasks: string; +} + +const fetchSignalMessagesTask = async ({ + scheduleTasks = "false", +}: FetchSignalMessagesTaskOptions): Promise => { const worker = await getWorkerUtils(); - const rows = await db.selectFrom("SignalBot").selectAll().execute(); - const config = new Configuration({ - basePath: process.env.BRIDGE_SIGNAL_URL, - }); + + if (scheduleTasks === "true") { + // because cron only has minimum 1 minute resolution + for (const offset of [15000, 30000, 45000]) { + await worker.addJob( + "fetch-signal-messages", + { scheduleTasks: "false" }, + { + maxAttempts: 1, + runAt: new Date(Date.now() + offset), + jobKey: `fetchSignalMessages-${offset}`, + }, + ); + } + } + const messagesClient = new MessagesApi(config); + const rows = await db.selectFrom("SignalBot").selectAll().execute(); for (const row of rows) { - const { id, phoneNumber: number } = row; - const messages = await messagesClient.v1ReceiveNumberGet({ number }); + const { id, phoneNumber } = row; + const messages = await messagesClient.v1ReceiveNumberGet({ + number: phoneNumber, + }); - for (const msg of messages) { - const { envelope } = msg as any; - const { source, sourceUuid, dataMessage } = envelope; - const message = dataMessage?.message; - const rawTimestamp = dataMessage?.timestamp; - const timestamp = new Date(rawTimestamp); - const messageId = `${sourceUuid}-${rawTimestamp}`; - const attachment = undefined; - const mimeType = undefined; - const filename = undefined; - if (source !== number && message) { - await worker.addJob("signal/receive-signal-message", { - token: id, - to: number, - from: source, - messageId, - message, - sentAt: timestamp.toISOString(), - attachment, - filename, - mimeType, - }); + for (const message of messages) { + const formattedMessages = await processMessage({ + id, + phoneNumber, + message, + }); + console.log({ formattedMessages }); + for (const formattedMessage of formattedMessages) { + if (formattedMessage.to !== formattedMessage.from) { + await worker.addJob( + "signal/receive-signal-message", + formattedMessage, + ); + } } } } diff --git a/packages/signal-api/apis/AttachmentsApi.ts b/packages/signal-api/apis/AttachmentsApi.ts index af326f7..d7ccf00 100644 --- a/packages/signal-api/apis/AttachmentsApi.ts +++ b/packages/signal-api/apis/AttachmentsApi.ts @@ -89,7 +89,7 @@ export class AttachmentsApi extends runtime.BaseAPI { async v1AttachmentsAttachmentGetRaw( requestParameters: V1AttachmentsAttachmentGetRequest, initOverrides?: RequestInit | runtime.InitOverrideFunction, - ): Promise> { + ): Promise> { if (requestParameters["attachment"] == null) { throw new runtime.RequiredError( "attachment", @@ -114,11 +114,7 @@ export class AttachmentsApi extends runtime.BaseAPI { initOverrides, ); - if (this.isJsonMime(response.headers.get("content-type"))) { - return new runtime.JSONApiResponse(response); - } else { - return new runtime.TextApiResponse(response) as any; - } + return new runtime.BlobApiResponse(response) as any; } /** @@ -128,7 +124,7 @@ export class AttachmentsApi extends runtime.BaseAPI { async v1AttachmentsAttachmentGet( requestParameters: V1AttachmentsAttachmentGetRequest, initOverrides?: RequestInit | runtime.InitOverrideFunction, - ): Promise { + ): Promise { const response = await this.v1AttachmentsAttachmentGetRaw( requestParameters, initOverrides,