Standardize bridge send/receive params

This commit is contained in:
Darren Clarke 2024-07-18 11:08:01 +02:00
parent 69abe9bee1
commit c32c26088f
23 changed files with 7042 additions and 1276 deletions

View file

@ -20,20 +20,20 @@
"jest": "^29.7.0",
"kysely": "^0.27.3",
"pg": "^8.12.0",
"remeda": "^2.2.2",
"remeda": "^2.6.0",
"twilio": "^5.2.2"
},
"devDependencies": {
"@babel/core": "7.24.7",
"@babel/preset-env": "7.24.7",
"@babel/core": "7.24.9",
"@babel/preset-env": "7.24.8",
"@babel/preset-typescript": "7.24.7",
"@types/fluent-ffmpeg": "^2.1.24",
"dotenv-cli": "^7.4.2",
"@link-stack/eslint-config": "*",
"prettier": "^3.3.2",
"prettier": "^3.3.3",
"@link-stack/typescript-config": "*",
"ts-node": "^10.9.2",
"typedoc": "^0.26.3",
"typedoc": "^0.26.4",
"typescript": "^5.5.3"
}
}

View file

@ -19,8 +19,11 @@ const receiveFacebookMessageTask = async ({
.executeTakeFirstOrThrow();
const backendId = row.id;
const payload = {
text: messaging.message.text,
recipient: messaging.sender.id,
to: pageId,
from: messaging.sender.id,
sent_at: new Date(messaging.timestamp).toISOString(),
message: messaging.message.text,
message_id: messaging.message.mid,
};
await worker.addJob("common/notify-webhooks", { backendId, payload });

View file

@ -2,14 +2,14 @@ import { db } from "@link-stack/bridge-common";
interface SendFacebookMessageTaskOptions {
token: string;
recipient: string;
text: string;
to: string;
message: string;
}
const sendFacebookMessageTask = async (
options: SendFacebookMessageTaskOptions,
): Promise<void> => {
const { token, text, recipient } = options;
const { token, to, message } = options;
const { pageId, pageAccessToken } = await db
.selectFrom("FacebookBot")
.selectAll()
@ -19,17 +19,23 @@ const sendFacebookMessageTask = async (
const endpoint = `https://graph.facebook.com/v19.0/${pageId}/messages`;
const outgoingMessage = {
recipient: { id: recipient },
message: { text },
recipient: { id: to },
message: { text: message },
messaging_type: "RESPONSE",
access_token: pageAccessToken,
};
await fetch(endpoint, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(outgoingMessage),
});
try {
const response = await fetch(endpoint, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(outgoingMessage),
});
console.log({ response });
} catch (error) {
console.error({ error });
throw error;
}
};
export default sendFacebookMessageTask;

View file

@ -17,16 +17,23 @@ const fetchSignalMessagesTask = async (): Promise<void> => {
for (const msg of messages) {
const { envelope } = msg as any;
const { source, sourceUuid, dataMessage } = envelope;
const messageID = sourceUuid;
const messageId = sourceUuid;
const message = dataMessage?.message;
const timestamp = new Date(dataMessage?.timestamp);
const attachment = undefined;
const mimeType = undefined;
const filename = undefined;
if (source !== number && message) {
await worker.addJob("signal/receive-signal-message", {
token: id,
sender: source,
messageID,
to: number,
from: source,
messageId,
message,
timestamp: timestamp.toISOString(),
sentAt: timestamp.toISOString(),
attachment,
filename,
mimeType,
});
}
}

View file

@ -2,20 +2,28 @@ import { db, getWorkerUtils } from "@link-stack/bridge-common";
interface ReceiveSignalMessageTaskOptions {
token: string;
sender: string;
to: string;
from: string;
messageId: string;
sentAt: string;
message: string;
messageID: string;
timestamp: string;
attachment?: string;
filename?: string;
mimeType?: string;
}
const receiveSignalMessageTask = async ({
token,
sender,
to,
from,
messageId,
sentAt,
message,
messageID,
timestamp,
attachment,
filename,
mimeType,
}: ReceiveSignalMessageTaskOptions): Promise<void> => {
console.log({ token, sender, message, messageID, timestamp });
console.log({ token, to, from });
const worker = await getWorkerUtils();
const row = await db
.selectFrom("SignalBot")
@ -23,15 +31,16 @@ const receiveSignalMessageTask = async ({
.where("id", "=", token)
.executeTakeFirstOrThrow();
console.log(row);
console.log(message);
const backendId = row.id;
const payload = {
to: row.phoneNumber,
from: sender,
sent_at: timestamp,
to,
from,
message_id: messageId,
sent_at: sentAt,
message,
message_id: messageID,
attachment,
filename,
mime_type: mimeType,
};
await worker.addJob("common/notify-webhooks", { backendId, payload });

View file

@ -4,15 +4,16 @@ const { Configuration, MessagesApi } = signalApi;
interface SendSignalMessageTaskOptions {
token: string;
recipient: string;
to: string;
message: any;
}
const sendSignalMessageTask = async ({
message,
recipient,
token,
to,
message,
}: SendSignalMessageTaskOptions): Promise<void> => {
console.log({ token, to });
const bot = await db
.selectFrom("SignalBot")
.selectAll()
@ -24,15 +25,20 @@ const sendSignalMessageTask = async ({
basePath: process.env.BRIDGE_SIGNAL_URL,
});
const messagesClient = new MessagesApi(config);
const response = await messagesClient.v2SendPost({
data: {
number,
recipients: [recipient],
message,
},
});
console.log({ response });
try {
const response = await messagesClient.v2SendPost({
data: {
number,
recipients: [to],
message,
},
});
console.log({ response });
} catch (error) {
console.error({ error });
throw error;
}
};
export default sendSignalMessageTask;

View file

@ -1,93 +0,0 @@
/* eslint-disable camelcase */
// import logger from "../logger";
// import { IncomingMessagev1 } from "@digiresilience/node-signald/build/main/generated";
import { withDb, AppDatabase } from "../../lib/db.js";
import workerUtils from "../../lib/utils.js";
type IncomingMessagev1 = any;
interface WebhookPayload {
to: string;
from: string;
message_id: string;
sent_at: string;
message: string;
attachment: string | null;
filename: string | null;
mime_type: string | null;
}
interface SignaldMessageTaskOptions {
message: IncomingMessagev1;
botId: string;
botPhoneNumber: string;
attachment: string;
filename: string;
mimetype: string;
}
const formatPayload = (opts: SignaldMessageTaskOptions): WebhookPayload => {
const { botId, botPhoneNumber, message, attachment, filename, mimetype } =
opts;
const { source, timestamp, data_message: dataMessage } = message;
const { number }: any = source;
const { body, attachments }: any = dataMessage;
return {
to: botPhoneNumber,
from: number,
message_id: `${botId}-${timestamp}`,
sent_at: `${timestamp}`,
message: body,
attachment,
filename,
mime_type: mimetype,
};
};
const notifyWebhooks = async (
db: AppDatabase,
messageInfo: SignaldMessageTaskOptions,
) => {
const {
botId,
message: { timestamp },
} = messageInfo;
const webhooks = await db.webhooks.findAllByBackendId("signal", botId);
if (webhooks && webhooks.length === 0) {
// logger.debug({ botId }, "no webhooks registered for signal bot");
return;
}
webhooks.forEach(({ id }: any) => {
const payload = formatPayload(messageInfo);
// logger.debug(
// { payload },
// "formatted signal bot payload for notify-webhook",
// );
workerUtils.addJob(
"notify-webhook",
{
payload,
webhookId: id,
},
{
// this de-deduplicates the job
jobKey: `webhook-${id}-message-${botId}-${timestamp}`,
},
);
});
};
const signaldMessageTask = async (
options: SignaldMessageTaskOptions,
): Promise<void> => {
console.log(options);
withDb(async (db: AppDatabase) => {
await notifyWebhooks(db, options);
});
};
export default signaldMessageTask;

View file

@ -2,130 +2,48 @@ import { db, getWorkerUtils } from "@link-stack/bridge-common";
interface ReceiveWhatsappMessageTaskOptions {
token: string;
sender: string;
to: string;
from: string;
messageId: string;
sentAt: string;
message: string;
attachment?: string;
filename?: string;
mimeType?: string;
}
const receiveWhatsappMessageTask = async ({
token,
sender,
to,
from,
messageId,
sentAt,
message,
attachment,
filename,
mimeType,
}: ReceiveWhatsappMessageTaskOptions): Promise<void> => {
console.log({ token, sender, message });
console.log({ token, to, from });
const worker = await getWorkerUtils();
const row = await db
.selectFrom("WhatsappBot")
.selectAll()
.where("id", "=", token)
.executeTakeFirstOrThrow();
console.log(row);
const backendId = row.id;
const payload = {
to,
from,
message_id: messageId,
sent_at: sentAt,
message,
recipient: sender,
attachment,
filename,
mime_type: mimeType,
};
await worker.addJob("common/notify-webhooks", { backendId, payload });
};
export default receiveWhatsappMessageTask;
/* eslint-disable camelcase */
/*
import { withDb, AppDatabase } from "../../lib/db";
import workerUtils from "../../lib/utils";
interface WebhookPayload {
to: string;
from: string;
message_id: string;
sent_at: string;
message: string;
attachment: string;
filename: string;
mime_type: string;
}
interface WhatsappMessageTaskOptions {
waMessageId: string;
waMessage: string;
waTimestamp: string;
attachment: string;
filename: string;
mimetype: string;
botPhoneNumber: string;
whatsappBotId: string;
}
const formatPayload = (
messageInfo: WhatsappMessageTaskOptions,
): WebhookPayload => {
const {
waMessageId,
waMessage,
waTimestamp,
attachment,
filename,
mimetype,
botPhoneNumber,
} = messageInfo;
const parsedMessage = JSON.parse(waMessage);
const message =
parsedMessage.message?.conversation ??
parsedMessage.message?.extendedTextMessage?.text ??
parsedMessage.message?.imageMessage?.caption ??
parsedMessage.message?.videoMessage?.caption;
return {
to: botPhoneNumber,
from: parsedMessage.key.remoteJid,
message_id: waMessageId,
sent_at: waTimestamp,
message,
attachment,
filename,
mime_type: mimetype,
};
};
const notifyWebhooks = async (
db: AppDatabase,
messageInfo: WhatsappMessageTaskOptions,
) => {
const { waMessageId, whatsappBotId } = messageInfo;
const webhooks = await db.webhooks.findAllByBackendId(
"whatsapp",
whatsappBotId,
);
if (webhooks && webhooks.length === 0) return;
webhooks.forEach(({ id }) => {
const payload = formatPayload(messageInfo);
console.log({ payload });
workerUtils.addJob(
"notify-webhook",
{
payload,
webhookId: id,
},
{
// this de-deduplicates the job
jobKey: `webhook-${id}-message-${waMessageId}`,
},
);
});
};
const whatsappMessageTask = async (
options: WhatsappMessageTaskOptions,
): Promise<void> => {
console.log(options);
withDb(async (db: AppDatabase) => {
await notifyWebhooks(db, options);
});
};
export default whatsappMessageTask;
*/

View file

@ -2,13 +2,13 @@ import { db } from "@link-stack/bridge-common";
interface SendWhatsappMessageTaskOptions {
token: string;
recipient: string;
to: string;
message: any;
}
const sendWhatsappMessageTask = async ({
message,
recipient,
to,
token,
}: SendWhatsappMessageTaskOptions): Promise<void> => {
const bot = await db
@ -18,15 +18,18 @@ const sendWhatsappMessageTask = async ({
.executeTakeFirstOrThrow();
const url = `${process.env.BRIDGE_WHATSAPP_URL}/api/bots/${bot.id}/send`;
const params = { message, phoneNumber: recipient };
console.log({ params });
const result = await fetch(url, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(params),
});
console.log({ result });
const params = { message, phoneNumber: to };
try {
const result = await fetch(url, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(params),
});
console.log({ result });
} catch (error) {
console.error({ error });
throw new Error("Failed to send message");
}
};
export default sendWhatsappMessageTask;