Signal attachment updates

This commit is contained in:
Darren Clarke 2024-09-05 10:03:55 +02:00
parent 8c6e954fdf
commit 31faf22fd5
3 changed files with 128 additions and 38 deletions

View file

@ -1 +1 @@
*/1 * * * * fetch-signal-messages ?max=1 */1 * * * * fetch-signal-messages ?max=1&id=fetchSignalMessagesCron {"scheduleTasks": "true"}

View file

@ -1,41 +1,135 @@
import { db, getWorkerUtils } from "@link-stack/bridge-common"; import { db, getWorkerUtils } from "@link-stack/bridge-common";
import * as signalApi from "@link-stack/signal-api"; import * as signalApi from "@link-stack/signal-api";
const { Configuration, MessagesApi } = signalApi;
const fetchSignalMessagesTask = async (): Promise<void> => { const { Configuration, MessagesApi, AttachmentsApi } = signalApi;
const worker = await getWorkerUtils(); const config = new Configuration({
const rows = await db.selectFrom("SignalBot").selectAll().execute();
const config = new Configuration({
basePath: process.env.BRIDGE_SIGNAL_URL, basePath: process.env.BRIDGE_SIGNAL_URL,
});
const fetchAttachments = async (attachments: any[] | undefined) => {
const formattedAttachments = [];
if (attachments) {
const attachmentsClient = new AttachmentsApi(config);
for (const att of attachments) {
const { id, contentType, filename: name } = att;
const blob = await attachmentsClient.v1AttachmentsAttachmentGet({
attachment: id,
}); });
const messagesClient = new MessagesApi(config); const arrayBuffer = await blob.arrayBuffer();
const base64Attachment = Buffer.from(arrayBuffer).toString("base64");
for (const row of rows) { const formattedAttachment = {
const { id, phoneNumber: number } = row; filename: name,
const messages = await messagesClient.v1ReceiveNumberGet({ number }); mimeType: contentType,
attachment: base64Attachment,
};
for (const msg of messages) { formattedAttachments.push(formattedAttachment);
const { envelope } = msg as any; }
}
return formattedAttachments;
};
type ProcessMessageArgs = {
id: string;
phoneNumber: string;
message: any;
};
const processMessage = async ({
id,
phoneNumber,
message: msg,
}: ProcessMessageArgs): Promise<Record<string, any>[]> => {
const { envelope } = msg;
console.log(envelope);
const { source, sourceUuid, dataMessage } = envelope; const { source, sourceUuid, dataMessage } = envelope;
const message = dataMessage?.message; const { attachments } = dataMessage;
const rawTimestamp = dataMessage?.timestamp; const rawTimestamp = dataMessage?.timestamp;
const timestamp = new Date(rawTimestamp); const timestamp = new Date(rawTimestamp);
const messageId = `${sourceUuid}-${rawTimestamp}`;
const attachment = undefined; const formattedAttachments = await fetchAttachments(attachments);
const mimeType = undefined; const primaryAttachment = formattedAttachments[0] ?? {};
const filename = undefined; const additionalAttachments = formattedAttachments.slice(1);
if (source !== number && message) { const primaryMessage = {
await worker.addJob("signal/receive-signal-message", {
token: id, token: id,
to: number, to: phoneNumber,
from: source, from: source,
messageId, messageId: `${sourceUuid}-${rawTimestamp}`,
message, message: dataMessage?.message,
sentAt: timestamp.toISOString(), sentAt: timestamp.toISOString(),
attachment, attachment: primaryAttachment.attachment,
filename, filename: primaryAttachment.filename,
mimeType, mimeType: primaryAttachment.mimeType,
};
const formattedMessages = [primaryMessage];
let count = 1;
for (const attachment of additionalAttachments) {
const additionalMessage = {
...primaryMessage,
...attachment,
message: attachment.filename,
messageId: `${sourceUuid}-${count}-${rawTimestamp}`,
};
formattedMessages.push(additionalMessage);
count++;
}
return formattedMessages;
};
interface FetchSignalMessagesTaskOptions {
scheduleTasks: string;
}
const fetchSignalMessagesTask = async ({
scheduleTasks = "false",
}: FetchSignalMessagesTaskOptions): Promise<void> => {
const worker = await getWorkerUtils();
if (scheduleTasks === "true") {
// because cron only has minimum 1 minute resolution
for (const offset of [15000, 30000, 45000]) {
await worker.addJob(
"fetch-signal-messages",
{ scheduleTasks: "false" },
{
maxAttempts: 1,
runAt: new Date(Date.now() + offset),
jobKey: `fetchSignalMessages-${offset}`,
},
);
}
}
const messagesClient = new MessagesApi(config);
const rows = await db.selectFrom("SignalBot").selectAll().execute();
for (const row of rows) {
const { id, phoneNumber } = row;
const messages = await messagesClient.v1ReceiveNumberGet({
number: phoneNumber,
}); });
for (const message of messages) {
const formattedMessages = await processMessage({
id,
phoneNumber,
message,
});
console.log({ formattedMessages });
for (const formattedMessage of formattedMessages) {
if (formattedMessage.to !== formattedMessage.from) {
await worker.addJob(
"signal/receive-signal-message",
formattedMessage,
);
}
} }
} }
} }

View file

@ -89,7 +89,7 @@ export class AttachmentsApi extends runtime.BaseAPI {
async v1AttachmentsAttachmentGetRaw( async v1AttachmentsAttachmentGetRaw(
requestParameters: V1AttachmentsAttachmentGetRequest, requestParameters: V1AttachmentsAttachmentGetRequest,
initOverrides?: RequestInit | runtime.InitOverrideFunction, initOverrides?: RequestInit | runtime.InitOverrideFunction,
): Promise<runtime.ApiResponse<string>> { ): Promise<runtime.ApiResponse<Blob>> {
if (requestParameters["attachment"] == null) { if (requestParameters["attachment"] == null) {
throw new runtime.RequiredError( throw new runtime.RequiredError(
"attachment", "attachment",
@ -114,11 +114,7 @@ export class AttachmentsApi extends runtime.BaseAPI {
initOverrides, initOverrides,
); );
if (this.isJsonMime(response.headers.get("content-type"))) { return new runtime.BlobApiResponse(response) as any;
return new runtime.JSONApiResponse<string>(response);
} else {
return new runtime.TextApiResponse(response) as any;
}
} }
/** /**
@ -128,7 +124,7 @@ export class AttachmentsApi extends runtime.BaseAPI {
async v1AttachmentsAttachmentGet( async v1AttachmentsAttachmentGet(
requestParameters: V1AttachmentsAttachmentGetRequest, requestParameters: V1AttachmentsAttachmentGetRequest,
initOverrides?: RequestInit | runtime.InitOverrideFunction, initOverrides?: RequestInit | runtime.InitOverrideFunction,
): Promise<string> { ): Promise<Blob> {
const response = await this.v1AttachmentsAttachmentGetRaw( const response = await this.v1AttachmentsAttachmentGetRaw(
requestParameters, requestParameters,
initOverrides, initOverrides,