import { db, getWorkerUtils } from "@link-stack/bridge-common"; import * as signalApi from "@link-stack/signal-api"; const { Configuration, MessagesApi, AttachmentsApi } = signalApi; const config = new Configuration({ basePath: process.env.BRIDGE_SIGNAL_URL, }); const fetchAttachments = async (attachments: any[] | undefined) => { const formattedAttachments = []; if (attachments) { const attachmentsClient = new AttachmentsApi(config); for (const att of attachments) { const { id, contentType, filename: name } = att; const blob = await attachmentsClient.v1AttachmentsAttachmentGet({ attachment: id, }); const arrayBuffer = await blob.arrayBuffer(); const base64Attachment = Buffer.from(arrayBuffer).toString("base64"); const formattedAttachment = { filename: name, mimeType: contentType, attachment: base64Attachment, }; formattedAttachments.push(formattedAttachment); } } return formattedAttachments; }; type ProcessMessageArgs = { id: string; phoneNumber: string; message: any; }; const processMessage = async ({ id, phoneNumber, message: msg, }: ProcessMessageArgs): Promise[]> => { const { envelope } = msg; const { source, sourceUuid, dataMessage } = envelope; const isGroup = !!( dataMessage?.groupV2 || dataMessage?.groupContext || dataMessage?.groupInfo ); if (!dataMessage) return []; const { attachments } = dataMessage; const rawTimestamp = dataMessage?.timestamp; // Debug logging for group detection console.log(`[fetch-signal-messages] Processing message:`, { sourceUuid, source, rawTimestamp, hasGroupV2: !!dataMessage?.groupV2, hasGroupContext: !!dataMessage?.groupContext, hasGroupInfo: !!dataMessage?.groupInfo, isGroup, groupV2Id: dataMessage?.groupV2?.id, groupContextType: dataMessage?.groupContext?.type, groupInfoType: dataMessage?.groupInfo?.type, }); const timestamp = new Date(rawTimestamp); const formattedAttachments = await fetchAttachments(attachments); const primaryAttachment = formattedAttachments[0] ?? {}; const additionalAttachments = formattedAttachments.slice(1); const groupId = dataMessage?.groupV2?.id || dataMessage?.groupContext?.id || dataMessage?.groupInfo?.groupId; const toRecipient = groupId ? `group.${Buffer.from(groupId).toString("base64")}` : phoneNumber; const primaryMessage = { token: id, to: toRecipient, from: source, messageId: `${sourceUuid}-${rawTimestamp}`, message: dataMessage?.message, sentAt: timestamp.toISOString(), attachment: primaryAttachment.attachment, filename: primaryAttachment.filename, mimeType: primaryAttachment.mimeType, isGroup, }; const formattedMessages = [primaryMessage]; let count = 1; for (const attachment of additionalAttachments) { const additionalMessage = { ...primaryMessage, ...attachment, message: attachment.filename, messageId: `${sourceUuid}-${count}-${rawTimestamp}`, }; formattedMessages.push(additionalMessage); count++; } return formattedMessages; }; interface FetchSignalMessagesTaskOptions { scheduleTasks: string; } const fetchSignalMessagesTask = async ({ scheduleTasks = "false", }: FetchSignalMessagesTaskOptions): Promise => { const worker = await getWorkerUtils(); if (scheduleTasks === "true") { // because cron only has minimum 1 minute resolution for (const offset of [15000, 30000, 45000]) { await worker.addJob( "fetch-signal-messages", { scheduleTasks: "false" }, { maxAttempts: 1, runAt: new Date(Date.now() + offset), jobKey: `fetchSignalMessages-${offset}`, }, ); } } const messagesClient = new MessagesApi(config); const rows = await db.selectFrom("SignalBot").selectAll().execute(); for (const row of rows) { const { id, phoneNumber } = row; const messages = await messagesClient.v1ReceiveNumberGet({ number: phoneNumber, }); console.log( `[fetch-signal-messages] Fetching messages for bot ${id} (${phoneNumber})`, ); for (const message of messages) { const formattedMessages = await processMessage({ id, phoneNumber, message, }); for (const formattedMessage of formattedMessages) { if (formattedMessage.to !== formattedMessage.from) { console.log(`[fetch-signal-messages] Creating job for message:`, { messageId: formattedMessage.messageId, from: formattedMessage.from, to: formattedMessage.to, isGroup: formattedMessage.isGroup, hasMessage: !!formattedMessage.message, hasAttachment: !!formattedMessage.attachment, }); await worker.addJob( "signal/receive-signal-message", formattedMessage, ); } } } } }; export default fetchSignalMessagesTask;