import { db, getWorkerUtils } from "@link-stack/bridge-common";
import { createLogger } from "@link-stack/logger";
import * as signalApi from "@link-stack/signal-api";

const logger = createLogger("fetch-signal-messages");

const { Configuration, MessagesApi, AttachmentsApi } = signalApi;

// Shared client configuration; the base URL is read from BRIDGE_SIGNAL_URL.
const config = new Configuration({
  basePath: process.env.BRIDGE_SIGNAL_URL,
});

/**
 * Downloads every attachment referenced by a message and converts it to a
 * base64 payload.
 *
 * Attachments are fetched one at a time, in the order given.
 *
 * @param attachments - Attachment descriptors taken from a data message; each
 *   is read for `id`, `contentType` and `filename`. May be `undefined`.
 * @returns One `{ filename, mimeType, attachment }` object per input
 *   attachment (`attachment` is the base64 content); an empty array when
 *   `attachments` is missing.
 */
const fetchAttachments = async (attachments: any[] | undefined) => {
  const formattedAttachments = [];

  if (attachments) {
    const attachmentsClient = new AttachmentsApi(config);

    for (const att of attachments) {
      const { id, contentType, filename: name } = att;

      const blob = await attachmentsClient.v1AttachmentsAttachmentGet({
        attachment: id,
      });
      const arrayBuffer = await blob.arrayBuffer();
      const base64Attachment = Buffer.from(arrayBuffer).toString("base64");

      // Generate default filename if not provided by Signal API
      let defaultFilename = name;
      if (!defaultFilename) {
        // Check if id already has an extension
        const hasExtension = id.includes(".");
        if (hasExtension) {
          // ID already includes extension
          defaultFilename = id;
        } else {
          // Add extension based on content type
          const extension = contentType?.split("/")[1] || "bin";
          defaultFilename = `${id}.${extension}`;
        }
      }

      const formattedAttachment = {
        filename: defaultFilename,
        mimeType: contentType,
        attachment: base64Attachment,
      };

      formattedAttachments.push(formattedAttachment);
    }
  }

  return formattedAttachments;
};

/** Arguments for {@link processMessage}. */
type ProcessMessageArgs = {
  /** Id of the SignalBot row; used as `token` / `backendId` in queued jobs. */
  id: string;
  /** Phone number of the bot; used as recipient for non-group messages. */
  phoneNumber: string;
  /** Raw received message; must carry an `envelope` property. */
  message: any;
};

/**
 * Converts one received Signal message into zero or more job payloads.
 *
 * Side effects: logs the envelope, and when the data message carries a
 * `groupInfo` of type "JOIN" or "JOINED" with a group id, queues a
 * "common/notify-webhooks" job describing the join.
 *
 * @returns An empty array when the envelope has no `dataMessage`. Otherwise
 *   the first element carries the message text plus the first attachment (if
 *   any), and each further attachment becomes its own element whose `message`
 *   is the attachment filename and whose `messageId` includes a running
 *   counter.
 */
const processMessage = async ({
  id,
  phoneNumber,
  message: msg,
}: ProcessMessageArgs): Promise<Record<string, any>[]> => {
  const { envelope } = msg;
  const { source, sourceUuid, dataMessage, syncMessage, receiptMessage, typingMessage } =
    envelope;

  // Log all envelope types to understand what events we're receiving
  logger.info(
    {
      source,
      sourceUuid,
      hasDataMessage: !!dataMessage,
      hasSyncMessage: !!syncMessage,
      hasReceiptMessage: !!receiptMessage,
      hasTypingMessage: !!typingMessage,
      envelopeKeys: Object.keys(envelope),
    },
    "Received Signal envelope",
  );

  // Any of the three group markers flags the message as a group message.
  const isGroup = !!(
    dataMessage?.groupV2 ||
    dataMessage?.groupContext ||
    dataMessage?.groupInfo
  );

  // Check if this is a group membership change event
  const groupInfo = dataMessage?.groupInfo;
  if (groupInfo) {
    logger.info(
      {
        type: groupInfo.type,
        groupId: groupInfo.groupId,
        source,
        groupInfoKeys: Object.keys(groupInfo),
        fullGroupInfo: groupInfo,
      },
      "Received group info event",
    );

    // If user joined the group, notify Zammad
    if (groupInfo.type === "JOIN" || groupInfo.type === "JOINED") {
      const worker = await getWorkerUtils();
      // Group recipients are addressed as "group." + base64 of the group id.
      const groupId = groupInfo.groupId
        ? `group.${Buffer.from(groupInfo.groupId).toString("base64")}`
        : null;

      if (groupId) {
        await worker.addJob("common/notify-webhooks", {
          backendId: id,
          payload: {
            event: "group_member_joined",
            group_id: groupId,
            member_phone: source,
            timestamp: new Date().toISOString(),
          },
        });

        logger.info(
          {
            groupId,
            memberPhone: source,
          },
          "User joined Signal group, notifying Zammad",
        );
      }
    }
  }

  // Envelopes without a data message produce no jobs.
  if (!dataMessage) return [];

  const { attachments } = dataMessage;
  const rawTimestamp = dataMessage?.timestamp;

  logger.debug(
    {
      sourceUuid,
      source,
      rawTimestamp,
      hasGroupV2: !!dataMessage?.groupV2,
      hasGroupContext: !!dataMessage?.groupContext,
      hasGroupInfo: !!dataMessage?.groupInfo,
      isGroup,
      groupV2Id: dataMessage?.groupV2?.id,
      groupContextType: dataMessage?.groupContext?.type,
      groupInfoType: dataMessage?.groupInfo?.type,
    },
    "Processing message",
  );

  // NOTE(review): if rawTimestamp is missing or not a valid time value, the
  // toISOString() call below throws a RangeError — confirm the API always
  // supplies a timestamp on data messages.
  const timestamp = new Date(rawTimestamp);

  const formattedAttachments = await fetchAttachments(attachments);
  const primaryAttachment = formattedAttachments[0] ?? {};
  const additionalAttachments = formattedAttachments.slice(1);

  const groupId =
    dataMessage?.groupV2?.id ||
    dataMessage?.groupContext?.id ||
    dataMessage?.groupInfo?.groupId;
  // Group messages are addressed to the encoded group id, direct messages to
  // the bot's own phone number.
  const toRecipient = groupId
    ? `group.${Buffer.from(groupId).toString("base64")}`
    : phoneNumber;

  const primaryMessage = {
    token: id,
    to: toRecipient,
    from: source,
    userId: sourceUuid, // Signal user UUID for user identification
    messageId: `${sourceUuid}-${rawTimestamp}`,
    message: dataMessage?.message,
    sentAt: timestamp.toISOString(),
    attachment: primaryAttachment.attachment,
    filename: primaryAttachment.filename,
    mimeType: primaryAttachment.mimeType,
    isGroup,
  };
  const formattedMessages = [primaryMessage];

  // Each extra attachment is emitted as its own message; the counter keeps the
  // message ids distinct from the primary message and from each other.
  let count = 1;
  for (const attachment of additionalAttachments) {
    const additionalMessage = {
      ...primaryMessage,
      ...attachment,
      message: attachment.filename,
      messageId: `${sourceUuid}-${count}-${rawTimestamp}`,
    };
    formattedMessages.push(additionalMessage);
    count++;
  }

  return formattedMessages;
};

/** Payload accepted by the fetch-signal-messages task. */
interface FetchSignalMessagesTaskOptions {
  /**
   * String flag: when exactly "true", the task also queues one follow-up run
   * of itself 30 seconds later. Defaults to "false".
   */
  scheduleTasks: string;
}

/**
 * Polls the Signal API for every bot in the `SignalBot` table and queues a
 * "signal/receive-signal-message" job for each resulting message.
 *
 * Bots and messages are handled sequentially. Messages whose `to` equals
 * their `from` are skipped (presumably the bot's own messages — confirm).
 *
 * @param options - See {@link FetchSignalMessagesTaskOptions}.
 */
const fetchSignalMessagesTask = async ({
  scheduleTasks = "false",
}: FetchSignalMessagesTaskOptions): Promise<void> => {
  const worker = await getWorkerUtils();

  if (scheduleTasks === "true") {
    // because cron only has minimum 1 minute resolution
    // schedule one additional job at 30s to achieve 30-second polling
    await worker.addJob(
      "fetch-signal-messages",
      { scheduleTasks: "false" },
      {
        maxAttempts: 1,
        runAt: new Date(Date.now() + 30000),
        jobKey: "fetchSignalMessages-30000",
      },
    );
  }

  const messagesClient = new MessagesApi(config);
  const rows = await db.selectFrom("SignalBot").selectAll().execute();

  for (const row of rows) {
    const { id, phoneNumber } = row;

    // Log before the request so a failing fetch can be attributed to a bot.
    logger.debug({ botId: id, phoneNumber }, "Fetching messages for bot");

    const messages = await messagesClient.v1ReceiveNumberGet({
      number: phoneNumber,
    });

    for (const message of messages) {
      const formattedMessages = await processMessage({
        id,
        phoneNumber,
        message,
      });

      for (const formattedMessage of formattedMessages) {
        if (formattedMessage.to !== formattedMessage.from) {
          logger.debug(
            {
              messageId: formattedMessage.messageId,
              from: formattedMessage.from,
              to: formattedMessage.to,
              isGroup: formattedMessage.isGroup,
              hasMessage: !!formattedMessage.message,
              hasAttachment: !!formattedMessage.attachment,
            },
            "Creating job for message",
          );

          await worker.addJob("signal/receive-signal-message", formattedMessage);
        }
      }
    }
  }
};

export default fetchSignalMessagesTask;