link-stack/apps/bridge-worker/tasks/fetch-signal-messages.ts

181 lines
4.9 KiB
TypeScript
Raw Normal View History

2024-06-05 15:12:48 +02:00
import { db, getWorkerUtils } from "@link-stack/bridge-common";
import * as signalApi from "@link-stack/signal-api";
2024-09-05 10:03:55 +02:00
// Client classes are read off the namespace import one by one.
const Configuration = signalApi.Configuration;
const MessagesApi = signalApi.MessagesApi;
const AttachmentsApi = signalApi.AttachmentsApi;

// Single client configuration shared by every API client in this module.
// The base path is taken from the BRIDGE_SIGNAL_URL environment variable.
const config = new Configuration({ basePath: process.env.BRIDGE_SIGNAL_URL });
/**
 * Downloads every attachment referenced by a message and returns the contents
 * base64-encoded.
 *
 * @param attachments Attachment descriptors from a data message; each is read
 *   for `id`, `contentType` and `filename`. May be `undefined`.
 * @returns One `{ filename, mimeType, attachment }` entry per descriptor, in
 *   input order. Empty when `attachments` is `undefined` or has no items.
 */
const fetchAttachments = async (attachments: any[] | undefined) => {
  const downloaded: { filename: any; mimeType: any; attachment: string }[] =
    [];

  if (!attachments) {
    return downloaded;
  }

  const client = new AttachmentsApi(config);

  // Attachments are fetched one after another, never concurrently.
  for (const { id, contentType, filename } of attachments) {
    const response = await client.v1AttachmentsAttachmentGet({
      attachment: id,
    });
    const bytes = Buffer.from(await response.arrayBuffer());

    downloaded.push({
      filename,
      mimeType: contentType,
      attachment: bytes.toString("base64"),
    });
  }

  return downloaded;
};
/** Arguments accepted by `processMessage`. */
interface ProcessMessageArgs {
  /** Identifier of the bot row; forwarded as the `token` of each payload. */
  id: string;
  /** Phone number of the bot that received the message. */
  phoneNumber: string;
  /** Raw message object as returned by the receive call; read for `envelope`. */
  message: any;
}
/**
 * Converts one raw received message into zero or more flat message payloads.
 *
 * The first payload carries the message text and the first attachment (if
 * any). Every further attachment becomes its own payload that reuses the
 * primary fields, uses the attachment's filename as text, and gets a distinct
 * `messageId`.
 *
 * @param args.id Bot identifier, emitted as `token`.
 * @param args.phoneNumber Bot phone number; used as recipient for non-group
 *   messages.
 * @param args.message Raw message; only `envelope` is read.
 * @returns The payloads, or an empty array when the message has no envelope
 *   or no `dataMessage`.
 */
const processMessage = async ({
  id,
  phoneNumber,
  message: msg,
}: ProcessMessageArgs): Promise<Record<string, any>[]> => {
  // A message without an envelope or data message has nothing to forward.
  // Optional chaining keeps a malformed item from throwing on destructuring.
  const envelope = msg?.envelope;
  const dataMessage = envelope?.dataMessage;
  if (!dataMessage) return [];

  const { source, sourceUuid } = envelope;
  const { attachments } = dataMessage;

  const isGroup = !!(
    dataMessage.groupV2 ||
    dataMessage.groupContext ||
    dataMessage.groupInfo
  );

  // `new Date(undefined).toISOString()` throws a RangeError, so pick the first
  // usable timestamp instead of trusting `dataMessage.timestamp` blindly.
  // NOTE(review): assumes the envelope carries its own `timestamp` — confirm
  // against the receive payload. Falls back to the current time otherwise.
  const rawTimestamp =
    [dataMessage.timestamp, envelope.timestamp].find(
      (candidate) =>
        candidate != null && !Number.isNaN(new Date(candidate).getTime()),
    ) ?? Date.now();

  // Debug logging for group detection
  console.log(`[fetch-signal-messages] Processing message:`, {
    sourceUuid,
    source,
    rawTimestamp,
    hasGroupV2: !!dataMessage.groupV2,
    hasGroupContext: !!dataMessage.groupContext,
    hasGroupInfo: !!dataMessage.groupInfo,
    isGroup,
    groupV2Id: dataMessage.groupV2?.id,
    groupContextType: dataMessage.groupContext?.type,
    groupInfoType: dataMessage.groupInfo?.type,
  });

  const timestamp = new Date(rawTimestamp);
  const formattedAttachments = await fetchAttachments(attachments);
  const [primaryAttachment, ...additionalAttachments] = formattedAttachments;

  const groupId =
    dataMessage.groupV2?.id ||
    dataMessage.groupContext?.id ||
    dataMessage.groupInfo?.groupId;
  // `||` rather than `??` so an empty-string group id also falls back to the
  // bot's own number instead of producing an empty recipient.
  const toRecipient = groupId || phoneNumber;

  const primaryMessage: Record<string, any> = {
    token: id,
    to: toRecipient,
    from: source,
    messageId: `${sourceUuid}-${rawTimestamp}`,
    message: dataMessage.message,
    sentAt: timestamp.toISOString(),
    // All three stay undefined when the message has no attachments.
    attachment: primaryAttachment?.attachment,
    filename: primaryAttachment?.filename,
    mimeType: primaryAttachment?.mimeType,
    isGroup,
  };

  // Extra attachments are numbered from 1 so their ids never collide with the
  // primary message id.
  const additionalMessages = additionalAttachments.map(
    (attachment, index) => ({
      ...primaryMessage,
      ...attachment,
      message: attachment.filename,
      messageId: `${sourceUuid}-${index + 1}-${rawTimestamp}`,
    }),
  );

  return [primaryMessage, ...additionalMessages];
};
/** Payload accepted by the fetch-signal-messages task. */
interface FetchSignalMessagesTaskOptions {
  /**
   * String flag: `"true"` makes the task enqueue extra delayed runs of itself
   * before polling. Optional because the task applies a default of `"false"`
   * when the field is absent.
   */
  scheduleTasks?: string;
}
/**
 * Polls every row of the `SignalBot` table for new messages and enqueues one
 * "signal/receive-signal-message" job per resulting payload.
 *
 * Failures are isolated: an error while fetching for one bot, or while
 * processing/enqueueing one message, is logged and the run continues with the
 * next item. If anything failed, the task rejects once at the end so the
 * worker still records the run as failed.
 *
 * @param options.scheduleTasks When `"true"`, three extra runs of this task
 *   are enqueued 15s, 30s and 45s from now (with `scheduleTasks: "false"`)
 *   before polling. Defaults to `"false"`.
 * @throws Error after all bots were visited, if at least one fetch or message
 *   failed.
 */
const fetchSignalMessagesTask = async ({
  scheduleTasks = "false",
}: FetchSignalMessagesTaskOptions): Promise<void> => {
  const worker = await getWorkerUtils();

  if (scheduleTasks === "true") {
    // because cron only has minimum 1 minute resolution
    for (const offset of [15000, 30000, 45000]) {
      await worker.addJob(
        "fetch-signal-messages",
        { scheduleTasks: "false" },
        {
          maxAttempts: 1,
          runAt: new Date(Date.now() + offset),
          jobKey: `fetchSignalMessages-${offset}`,
        },
      );
    }
  }

  const messagesClient = new MessagesApi(config);
  const rows = await db.selectFrom("SignalBot").selectAll().execute();
  let failureCount = 0;

  for (const { id, phoneNumber } of rows) {
    // Logged before the request so a failing fetch can be attributed to a bot.
    console.log(
      `[fetch-signal-messages] Fetching messages for bot ${id} (${phoneNumber})`,
    );

    try {
      const messages = await messagesClient.v1ReceiveNumberGet({
        number: phoneNumber,
      });

      for (const message of messages) {
        // NOTE(review): received messages are presumably not delivered again
        // by the API — confirm. If so, one bad message must not discard the
        // rest of the batch, hence the per-message isolation.
        try {
          const formattedMessages = await processMessage({
            id,
            phoneNumber,
            message,
          });

          for (const formattedMessage of formattedMessages) {
            // Skip payloads whose sender equals the recipient.
            if (formattedMessage.to === formattedMessage.from) continue;

            console.log(`[fetch-signal-messages] Creating job for message:`, {
              messageId: formattedMessage.messageId,
              from: formattedMessage.from,
              to: formattedMessage.to,
              isGroup: formattedMessage.isGroup,
              hasMessage: !!formattedMessage.message,
              hasAttachment: !!formattedMessage.attachment,
            });

            await worker.addJob(
              "signal/receive-signal-message",
              formattedMessage,
            );
          }
        } catch (error) {
          failureCount++;
          console.error(
            `[fetch-signal-messages] Failed to process message for bot ${id}:`,
            error,
          );
        }
      }
    } catch (error) {
      // Only fetch (or iteration) errors reach this handler; per-message
      // errors are handled above. Continue with the next bot.
      failureCount++;
      console.error(
        `[fetch-signal-messages] Failed to fetch messages for bot ${id}:`,
        error,
      );
    }
  }

  if (failureCount > 0) {
    throw new Error(
      `[fetch-signal-messages] ${failureCount} failure(s) while fetching or processing Signal messages`,
    );
  }
};
export default fetchSignalMessagesTask;