import { db, getWorkerUtils } from "@link-stack/bridge-common"; import * as signalApi from "@link-stack/signal-api"; const { Configuration, MessagesApi, AttachmentsApi } = signalApi; const config = new Configuration({ basePath: process.env.BRIDGE_SIGNAL_URL, }); const fetchAttachments = async (attachments: any[] | undefined) => { const formattedAttachments = []; if (attachments) { const attachmentsClient = new AttachmentsApi(config); for (const att of attachments) { const { id, contentType, filename: name } = att; const blob = await attachmentsClient.v1AttachmentsAttachmentGet({ attachment: id, }); const arrayBuffer = await blob.arrayBuffer(); const base64Attachment = Buffer.from(arrayBuffer).toString("base64"); const formattedAttachment = { filename: name, mimeType: contentType, attachment: base64Attachment, }; formattedAttachments.push(formattedAttachment); } } return formattedAttachments; }; type ProcessMessageArgs = { id: string; phoneNumber: string; message: any; }; const processMessage = async ({ id, phoneNumber, message: msg, }: ProcessMessageArgs): Promise[]> => { const { envelope } = msg; console.log(envelope); const { source, sourceUuid, dataMessage } = envelope; const { attachments } = dataMessage; const rawTimestamp = dataMessage?.timestamp; const timestamp = new Date(rawTimestamp); const formattedAttachments = await fetchAttachments(attachments); const primaryAttachment = formattedAttachments[0] ?? {}; const additionalAttachments = formattedAttachments.slice(1); const primaryMessage = { token: id, to: phoneNumber, from: source, messageId: `${sourceUuid}-${rawTimestamp}`, message: dataMessage?.message, sentAt: timestamp.toISOString(), attachment: primaryAttachment.attachment, filename: primaryAttachment.filename, mimeType: primaryAttachment.mimeType, }; const formattedMessages = [primaryMessage]; let count = 1; for (const attachment of additionalAttachments) { const additionalMessage = { ...primaryMessage, ...attachment, message: attachment.filename, messageId: `${sourceUuid}-${count}-${rawTimestamp}`, }; formattedMessages.push(additionalMessage); count++; } return formattedMessages; }; interface FetchSignalMessagesTaskOptions { scheduleTasks: string; } const fetchSignalMessagesTask = async ({ scheduleTasks = "false", }: FetchSignalMessagesTaskOptions): Promise => { const worker = await getWorkerUtils(); if (scheduleTasks === "true") { // because cron only has minimum 1 minute resolution for (const offset of [15000, 30000, 45000]) { await worker.addJob( "fetch-signal-messages", { scheduleTasks: "false" }, { maxAttempts: 1, runAt: new Date(Date.now() + offset), jobKey: `fetchSignalMessages-${offset}`, }, ); } } const messagesClient = new MessagesApi(config); const rows = await db.selectFrom("SignalBot").selectAll().execute(); for (const row of rows) { const { id, phoneNumber } = row; const messages = await messagesClient.v1ReceiveNumberGet({ number: phoneNumber, }); for (const message of messages) { const formattedMessages = await processMessage({ id, phoneNumber, message, }); console.log({ formattedMessages }); for (const formattedMessage of formattedMessages) { if (formattedMessage.to !== formattedMessage.from) { await worker.addJob( "signal/receive-signal-message", formattedMessage, ); } } } } }; export default fetchSignalMessagesTask;