link-stack/apps/bridge-worker/tasks/fetch-signal-messages.ts
Darren Clarke 3d8f794cab Add user ID support for Baileys 7 LIDs and Signal UUIDs
Baileys 7 uses LIDs (Linked IDs) instead of phone numbers in remoteJid for
some messages. This caused messages to be matched to wrong tickets because
the LID was used as the sender identifier. This commit adds proper support
for both phone numbers and user IDs across WhatsApp and Signal channels.

Changes:

Database:
- Add migration for whatsapp_user_id and signal_user_id fields on users table

Zammad controllers:
- Update user lookup with 3-step fallback: phone → dedicated user_id field →
  user_id in phone field (legacy)
- Store user IDs in dedicated fields when available
- Update phone field when we receive actual phone number for legacy records
- Fix redundant condition in Signal controller

Bridge services:
- Extract both phone (from senderPn/participantPn) and LID (from remoteJid)
- Send both identifiers to Zammad via webhooks
- Use camelCase (userId) in bridge-whatsapp, convert to snake_case (user_id)
  in bridge-worker for Zammad compatibility

Baileys 7 compliance:
- Remove broken loadAllUnreadMessages() call (removed in Baileys 7)
- Return descriptive error directing users to use webhooks instead

Misc:
- Add docs/ to .gitignore
2026-01-15 13:08:56 +01:00

259 lines
7.1 KiB
TypeScript

import { db, getWorkerUtils } from "@link-stack/bridge-common";
import { createLogger } from "@link-stack/logger";
import * as signalApi from "@link-stack/signal-api";
// Module-wide logger, created under the name "fetch-signal-messages".
const logger = createLogger("fetch-signal-messages");
// Client classes taken from the signal-api namespace import.
const { Configuration, MessagesApi, AttachmentsApi } = signalApi;
// Single client configuration shared by every API client built in this file.
// basePath is read straight from BRIDGE_SIGNAL_URL with no validation or default,
// so it is undefined whenever that environment variable is not set.
const config = new Configuration({
basePath: process.env.BRIDGE_SIGNAL_URL,
});
/**
 * Builds a fallback filename for an attachment that arrived without one.
 *
 * - An id that already contains a dot is returned unchanged (treated as already
 *   carrying an extension).
 * - Otherwise the extension is the MIME subtype with any parameters removed, so
 *   "audio/ogg; codecs=opus" yields "ogg" rather than "ogg; codecs=opus".
 * - "bin" is used when no subtype can be extracted.
 */
const buildDefaultAttachmentFilename = (id: string, contentType?: string): string => {
  if (id.includes(".")) {
    return id;
  }
  // Cut off MIME parameters first, then take whatever follows the slash.
  const subtype = contentType?.split(";")[0]?.split("/")[1]?.trim();
  return `${id}.${subtype || "bin"}`;
};

/**
 * Downloads every attachment referenced by a Signal message and returns them
 * base64-encoded, in the same order as the input.
 *
 * @param attachments Attachment descriptors from the data message; each is read
 *   for `id`, `contentType` and `filename`. May be undefined.
 * @returns One `{ filename, mimeType, attachment }` object per descriptor, where
 *   `attachment` is the base64 payload and `mimeType` is the content type exactly
 *   as received. An empty array when `attachments` is undefined.
 */
const fetchAttachments = async (attachments: any[] | undefined) => {
  const formattedAttachments = [];
  if (attachments) {
    const attachmentsClient = new AttachmentsApi(config);
    // Downloads stay sequential, one request at a time, as before.
    for (const att of attachments) {
      const { id, contentType, filename: name } = att;
      const blob = await attachmentsClient.v1AttachmentsAttachmentGet({
        attachment: id,
      });
      const arrayBuffer = await blob.arrayBuffer();
      const base64Attachment = Buffer.from(arrayBuffer).toString("base64");
      formattedAttachments.push({
        // Prefer the filename supplied with the attachment; derive one only when absent.
        filename: name || buildDefaultAttachmentFilename(id, contentType),
        mimeType: contentType,
        attachment: base64Attachment,
      });
    }
  }
  return formattedAttachments;
};
/** Arguments accepted by processMessage. */
interface ProcessMessageArgs {
  /** Bot id; becomes the `token` of each formatted message and the `backendId` of webhook jobs. */
  id: string;
  /** The bot's own number; used as the recipient (`to`) for non-group messages. */
  phoneNumber: string;
  /** One raw item from the receive call; processMessage reads its `envelope` property. */
  message: any;
}
/** Builds the `group.<base64>` recipient string from a raw Signal group id. */
const toGroupRecipient = (groupId: string): string =>
  `group.${Buffer.from(groupId).toString("base64")}`;

/**
 * Picks the timestamp used for a message's `sentAt` and message id.
 *
 * Tries the data-message timestamp first, then the envelope timestamp, and
 * returns the first one that parses to a valid Date. When neither is usable the
 * current time is used, so that `toISOString()` can never throw on an invalid
 * Date.
 */
const resolveMessageTime = (
  dataTimestamp: unknown,
  envelopeTimestamp: unknown,
): { timestampForId: number | string; sentAt: Date } => {
  for (const candidate of [dataTimestamp, envelopeTimestamp]) {
    if (typeof candidate !== "number" && typeof candidate !== "string") continue;
    const parsed = new Date(candidate);
    if (!Number.isNaN(parsed.getTime())) {
      return { timestampForId: candidate, sentAt: parsed };
    }
  }
  const now = new Date();
  return { timestampForId: now.getTime(), sentAt: now };
};

/**
 * Converts one raw item from the Signal receive call into zero or more
 * message payloads ready to be queued.
 *
 * Side effects: logs every envelope, and when the data message carries a
 * `groupInfo` of type "JOIN"/"JOINED" with a group id, queues a
 * "common/notify-webhooks" job announcing the new member.
 *
 * @param id Bot id, used as `token` on messages and `backendId` on webhook jobs.
 * @param phoneNumber The bot's number; recipient for non-group messages.
 * @param message Raw received item; items without an `envelope` are skipped.
 * @returns An empty array when there is no envelope or no data message.
 *   Otherwise one primary message (carrying the text and the first attachment)
 *   followed by one extra message per additional attachment.
 */
const processMessage = async ({
  id,
  phoneNumber,
  message: msg,
}: ProcessMessageArgs): Promise<Record<string, any>[]> => {
  const envelope = msg?.envelope;
  if (!envelope) {
    // Nothing to route without an envelope; skip instead of failing the whole poll.
    logger.info({ botId: id }, "Skipping Signal message without envelope");
    return [];
  }

  const { source, sourceUuid, dataMessage, syncMessage, receiptMessage, typingMessage } =
    envelope;

  // Log all envelope types to understand what events we're receiving
  logger.info(
    {
      source,
      sourceUuid,
      hasDataMessage: !!dataMessage,
      hasSyncMessage: !!syncMessage,
      hasReceiptMessage: !!receiptMessage,
      hasTypingMessage: !!typingMessage,
      envelopeKeys: Object.keys(envelope),
    },
    "Received Signal envelope",
  );

  const groupInfo = dataMessage?.groupInfo;
  const isGroup = !!(dataMessage?.groupV2 || dataMessage?.groupContext || groupInfo);

  // Check if this is a group membership change event
  if (groupInfo) {
    logger.info(
      {
        type: groupInfo.type,
        groupId: groupInfo.groupId,
        source,
        groupInfoKeys: Object.keys(groupInfo),
        fullGroupInfo: groupInfo,
      },
      "Received group info event",
    );

    // If user joined the group, notify Zammad
    const joined = groupInfo.type === "JOIN" || groupInfo.type === "JOINED";
    if (joined && groupInfo.groupId) {
      const groupId = toGroupRecipient(groupInfo.groupId);
      const worker = await getWorkerUtils();
      await worker.addJob("common/notify-webhooks", {
        backendId: id,
        payload: {
          event: "group_member_joined",
          group_id: groupId,
          member_phone: source,
          timestamp: new Date().toISOString(),
        },
      });
      logger.info(
        {
          groupId,
          memberPhone: source,
        },
        "User joined Signal group, notifying Zammad",
      );
    }
  }

  // Receipts, typing indicators and sync messages carry no data message.
  if (!dataMessage) return [];

  const { attachments } = dataMessage;
  const rawTimestamp = dataMessage.timestamp;

  logger.debug(
    {
      sourceUuid,
      source,
      rawTimestamp,
      hasGroupV2: !!dataMessage.groupV2,
      hasGroupContext: !!dataMessage.groupContext,
      hasGroupInfo: !!dataMessage.groupInfo,
      isGroup,
      groupV2Id: dataMessage.groupV2?.id,
      groupContextType: dataMessage.groupContext?.type,
      groupInfoType: dataMessage.groupInfo?.type,
    },
    "Processing message",
  );

  // When rawTimestamp is a usable value, timestampForId === rawTimestamp and the
  // resulting ids are unchanged; the fallback only kicks in for inputs that
  // previously produced an invalid Date.
  const { timestampForId, sentAt } = resolveMessageTime(rawTimestamp, envelope.timestamp);
  if (timestampForId !== rawTimestamp) {
    logger.info(
      { sourceUuid, rawTimestamp, envelopeTimestamp: envelope.timestamp },
      "Signal data message has no usable timestamp, using fallback",
    );
  }

  const formattedAttachments = await fetchAttachments(attachments);
  const primaryAttachment = formattedAttachments[0] ?? {};
  const additionalAttachments = formattedAttachments.slice(1);

  const groupId =
    dataMessage.groupV2?.id || dataMessage.groupContext?.id || dataMessage.groupInfo?.groupId;
  // Group messages are addressed to the group; direct messages to the bot's number.
  const toRecipient = groupId ? toGroupRecipient(groupId) : phoneNumber;

  const primaryMessage = {
    token: id,
    to: toRecipient,
    from: source,
    userId: sourceUuid, // Signal user UUID for user identification
    messageId: `${sourceUuid}-${timestampForId}`,
    message: dataMessage.message,
    sentAt: sentAt.toISOString(),
    attachment: primaryAttachment.attachment,
    filename: primaryAttachment.filename,
    mimeType: primaryAttachment.mimeType,
    isGroup,
  };

  const formattedMessages = [primaryMessage];

  // Each extra attachment becomes its own message: the filename stands in for
  // the text and a 1-based counter keeps the message id unique.
  let count = 1;
  for (const attachment of additionalAttachments) {
    formattedMessages.push({
      ...primaryMessage,
      ...attachment,
      message: attachment.filename,
      messageId: `${sourceUuid}-${count}-${timestampForId}`,
    });
    count++;
  }

  return formattedMessages;
};
/** Payload accepted by the fetch-signal-messages task. */
interface FetchSignalMessagesTaskOptions {
// A string flag rather than a boolean: the task compares it against "true",
// defaults it to "false", and passes "false" to the follow-up jobs it schedules.
scheduleTasks: string;
}
/**
 * Worker task that polls the Signal bridge for every row in the "SignalBot"
 * table and queues one "signal/receive-signal-message" job per formatted
 * message.
 *
 * @param scheduleTasks When "true", the task first queues three follow-up runs
 *   of itself (15s, 30s and 45s from now, each with `scheduleTasks: "false"`)
 *   before polling. Defaults to "false".
 *
 * Errors from the database, the receive call or message processing are not
 * caught here and reject the returned promise.
 */
const fetchSignalMessagesTask = async ({
  scheduleTasks = "false",
}: FetchSignalMessagesTaskOptions): Promise<void> => {
  const worker = await getWorkerUtils();

  if (scheduleTasks === "true") {
    // because cron only has minimum 1 minute resolution
    for (const offset of [15000, 30000, 45000]) {
      await worker.addJob(
        "fetch-signal-messages",
        { scheduleTasks: "false" },
        {
          maxAttempts: 1,
          runAt: new Date(Date.now() + offset),
          jobKey: `fetchSignalMessages-${offset}`,
        },
      );
    }
  }

  const messagesClient = new MessagesApi(config);
  const rows = await db.selectFrom("SignalBot").selectAll().execute();

  for (const { id, phoneNumber } of rows) {
    // Logged before the request so a failing fetch still shows which bot was being polled.
    logger.debug({ botId: id, phoneNumber }, "Fetching messages for bot");
    const messages = await messagesClient.v1ReceiveNumberGet({
      number: phoneNumber,
    });

    for (const message of messages) {
      const formattedMessages = await processMessage({ id, phoneNumber, message });

      for (const formattedMessage of formattedMessages) {
        // Skip messages whose sender equals the recipient.
        if (formattedMessage.to === formattedMessage.from) continue;

        logger.debug(
          {
            messageId: formattedMessage.messageId,
            from: formattedMessage.from,
            to: formattedMessage.to,
            isGroup: formattedMessage.isGroup,
            hasMessage: !!formattedMessage.message,
            hasAttachment: !!formattedMessage.attachment,
          },
          "Creating job for message",
        );
        await worker.addJob("signal/receive-signal-message", formattedMessage);
      }
    }
  }
};
export default fetchSignalMessagesTask;