From 27810142b3fb492b2bab02cfc7f364e8054fa0c6 Mon Sep 17 00:00:00 2001 From: Darren Clarke Date: Mon, 13 Mar 2023 15:53:21 +0000 Subject: [PATCH] Migrate changes from other Metamigo repo --- .../src/app/routes/whatsapp/index.ts | 21 ++ apps/metamigo-api/src/app/services/signald.ts | 22 ++ .../metamigo-api/src/app/services/whatsapp.ts | 273 +++++++++--------- .../whatsapp/bots/WhatsappBotShow.tsx | 20 +- apps/metamigo-worker/index.ts | 14 +- apps/metamigo-worker/package.json | 4 +- apps/metamigo-worker/tag-map.ts | 69 +++++ .../tasks/import-label-studio.ts | 176 +++++++++++ .../tasks/import-leafcutter.ts | 164 +++++++++++ apps/metamigo-worker/tasks/signal-message.ts | 76 ----- apps/metamigo-worker/tasks/signald-message.ts | 11 +- 11 files changed, 615 insertions(+), 235 deletions(-) create mode 100644 apps/metamigo-worker/tag-map.ts create mode 100644 apps/metamigo-worker/tasks/import-label-studio.ts create mode 100644 apps/metamigo-worker/tasks/import-leafcutter.ts delete mode 100644 apps/metamigo-worker/tasks/signal-message.ts diff --git a/apps/metamigo-api/src/app/routes/whatsapp/index.ts b/apps/metamigo-api/src/app/routes/whatsapp/index.ts index 08cfa74..9edb747 100644 --- a/apps/metamigo-api/src/app/routes/whatsapp/index.ts +++ b/apps/metamigo-api/src/app/routes/whatsapp/index.ts @@ -141,6 +141,27 @@ export const RegisterBotRoute = Helpers.withDefaults({ }, }); +export const UnverifyBotRoute = Helpers.withDefaults({ + method: "post", + path: "/api/whatsapp/bots/{id}/unverify", + options: { + description: "Unverify bot", + async handler: (request: Hapi.Request, _h: Hapi.ResponseToolkit) { + const { id } = request.params; + const { whatsappService } = request.services(); + + const bot = await whatsappService.findById(id); + + if (bot) { + return whatsappService.unverify(bot); + } + + throw Boom.notFound("Bot not found"); + }, + }, +}); + + export const RefreshBotRoute = Helpers.withDefaults({ method: "get", path: "/api/whatsapp/bots/{id}/refresh", diff --git a/apps/metamigo-api/src/app/services/signald.ts b/apps/metamigo-api/src/app/services/signald.ts index 16cfd27..e16f5b2 100644 --- a/apps/metamigo-api/src/app/services/signald.ts +++ b/apps/metamigo-api/src/app/services/signald.ts @@ -1,5 +1,6 @@ import { Server } from "@hapi/hapi"; import { Service } from "@hapipal/schmervice"; +import { promises as fs } from "fs"; import { SignaldAPI, IncomingMessagev1, @@ -179,6 +180,22 @@ export default class SignaldService extends Service { await this.queueMessage(bot, message); } + private async getAttachmentInfo(dataMessage: any) { + if (dataMessage.attachments?.length > 0) { + const attachmentInfo = dataMessage.attachments[0]; + const buffer = await fs.readFile(attachmentInfo.storedFilename); + const attachment = buffer.toString("base64"); + const mimetype = attachmentInfo.contentType ?? "application/octet-stream"; + const filename = attachmentInfo.customFilename ?? "unknown-filename"; + + + return { attachment, mimetype, filename } + } + + return { attachment: null, mimetype: null, filename: null }; + } + + private async queueMessage(bot: Bot, message: IncomingMessagev1) { const { timestamp, account, data_message: dataMessage } = message; if (!dataMessage?.body && !dataMessage?.attachments) { @@ -190,10 +207,15 @@ export default class SignaldService extends Service { this.server.logger.debug({ message }, "invalid message received"); } + const { attachment, mimetype, filename } = await this.getAttachmentInfo(dataMessage); + const receivedMessage = { message, botId: bot.id, botPhoneNumber: bot.phoneNumber, + attachment, + mimetype, + filename }; workerUtils.addJob("signald-message", receivedMessage, { diff --git a/apps/metamigo-api/src/app/services/whatsapp.ts b/apps/metamigo-api/src/app/services/whatsapp.ts index a7d32c6..7efdbd0 100644 --- a/apps/metamigo-api/src/app/services/whatsapp.ts +++ b/apps/metamigo-api/src/app/services/whatsapp.ts @@ -1,32 +1,17 @@ +/* eslint-disable unicorn/no-abusive-eslint-disable */ +/* eslint-disable */ import { Server } from "@hapi/hapi"; import { Service } from "@hapipal/schmervice"; import { SavedWhatsappBot as Bot } from "@digiresilience/metamigo-db"; -import makeWASocket, { - DisconnectReason, - proto, - downloadContentFromMessage, - MediaType, - AnyMessageContent, - WAProto, - UserFacingSocketConfig, - MiscMessageGenerationOptions, -} from "@adiwajshing/baileys"; +import makeWASocket, { DisconnectReason, proto, downloadContentFromMessage, MediaType, fetchLatestBaileysVersion, isJidBroadcast, isJidStatusBroadcast, MessageRetryMap, useMultiFileAuthState } from "@adiwajshing/baileys"; +import fs from "fs"; import workerUtils from "../../worker-utils"; -import { useDatabaseAuthState } from "../lib/whatsapp-key-store"; export type AuthCompleteCallback = (error?: string) => void; -export type Connection = { - end: (error: Error | undefined) => void; - sendMessage: ( - jid: string, - content: AnyMessageContent, - options?: MiscMessageGenerationOptions - ) => Promise; -}; - export default class WhatsappService extends Service { - connections: { [key: string]: Connection } = {}; + connections: { [key: string]: any } = {}; + loginConnections: { [key: string]: any } = {}; static browserDescription: [string, string, string] = [ "Metamigo", @@ -38,6 +23,10 @@ export default class WhatsappService extends Service { super(server, options); } + getAuthDirectory(bot: Bot): string { + return `/baileys/${bot.id}`; + } + async initialize(): Promise { this.updateConnections(); } @@ -47,93 +36,82 @@ export default class WhatsappService extends Service { } private async sleep(ms: number): Promise { - console.log(`pausing ${ms}`); - // eslint-disable-next-line no-new - new Promise((resolve) => { - setTimeout(resolve, ms); - }); + console.log(`pausing ${ms}`) + return new Promise(resolve => setTimeout(resolve, ms)); } private async resetConnections() { for (const connection of Object.values(this.connections)) { try { - connection.end(undefined); + connection.end(null) } catch (error) { console.log(error); } } - this.connections = {}; } - private createConnection( - bot: Bot, - server: Server, - options: Omit, - authCompleteCallback?: () => void - ) { - const { state, saveState } = useDatabaseAuthState(bot, server); - const connection = makeWASocket({ ...options, auth: state }); + + private async createConnection(bot: Bot, server: Server, options: any, authCompleteCallback?: any) { + const directory = this.getAuthDirectory(bot); + const { state, saveCreds } = await useMultiFileAuthState(directory); + const msgRetryCounterMap: MessageRetryMap = {} + const socket = makeWASocket({ + ...options, + auth: state, + msgRetryCounterMap, + shouldIgnoreJid: jid => isJidBroadcast(jid) || isJidStatusBroadcast(jid), + }); let pause = 5000; - connection.ev.on("connection.update", async (update) => { - console.log(`Connection updated ${JSON.stringify(update, undefined, 2)}`); - const { - connection: connectionState, - lastDisconnect, - qr, - isNewLogin, - } = update; - if (qr) { - console.log("got qr code"); - await this.server.db().whatsappBots.updateQR(bot, qr); - } else if (isNewLogin) { - console.log("got new login"); - } else if (connectionState === "open") { - console.log("opened connection"); - } else if (connectionState === "close") { - console.log("connection closed due to", lastDisconnect.error); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const disconnectStatusCode = (lastDisconnect?.error as any)?.output - ?.statusCode; - if (disconnectStatusCode === DisconnectReason.restartRequired) { - console.log("reconnecting after got new login"); - const updatedBot = await this.findById(bot.id); - this.createConnection(updatedBot, server, options); - authCompleteCallback(); - } else if (disconnectStatusCode !== DisconnectReason.loggedOut) { - console.log("reconnecting"); - await this.sleep(pause); - pause *= 2; - this.createConnection(bot, server, options); + + socket.ev.process( + async (events) => { + if (events['connection.update']) { + const update = events['connection.update'] + const { connection: connectionState, lastDisconnect, qr, isNewLogin } = update + if (qr) { + console.log('got qr code') + await this.server.db().whatsappBots.updateQR(bot, qr); + } else if (isNewLogin) { + console.log("got new login") + await this.server.db().whatsappBots.updateVerified(bot, true); + } else if (connectionState === 'open') { + console.log('opened connection') + } else if (connectionState === "close") { + console.log('connection closed due to ', lastDisconnect.error) + const disconnectStatusCode = (lastDisconnect?.error as any)?.output?.statusCode + + if (disconnectStatusCode === DisconnectReason.restartRequired) { + console.log('reconnecting after got new login') + const updatedBot = await this.findById(bot.id); + await this.createConnection(updatedBot, server, options) + authCompleteCallback?.() + } else if (disconnectStatusCode !== DisconnectReason.loggedOut) { + console.log('reconnecting') + await this.sleep(pause) + pause *= 2 + this.createConnection(bot, server, options) + } + } + } + + if (events['creds.update']) { + console.log("creds update") + await saveCreds() + } + + if (events['messages.upsert']) { + console.log("messages upsert") + const upsert = events['messages.upsert'] + const { messages } = upsert + if (messages) { + await this.queueUnreadMessages(bot, messages); + } } } - }); + ) - connection.ev.process(async (events) => { - if (events["messaging-history.set"]) { - const { chats, contacts, messages, isLatest } = - events["messaging-history.set"]; - console.log( - `recv ${chats.length} chats, ${contacts.length} contacts, ${messages.length} msgs (is latest: ${isLatest})` - ); - } - }); - - connection.ev.on("messages.upsert", async (m) => { - console.log("messages upsert"); - const { messages } = m; - if (messages) { - await this.queueUnreadMessages(bot, messages); - } - }); - connection.ev.on("messages.update", (m) => console.log(m)); - connection.ev.on("message-receipt.update", (m) => console.log(m)); - connection.ev.on("presence.update", (m) => console.log(m)); - connection.ev.on("chats.update", (m) => console.log(m)); - connection.ev.on("contacts.upsert", (m) => console.log(m)); - connection.ev.on("creds.update", saveState); - - this.connections[bot.id] = connection; + this.connections[bot.id] = { socket, msgRetryCounterMap }; } private async updateConnections() { @@ -142,66 +120,69 @@ export default class WhatsappService extends Service { const bots = await this.server.db().whatsappBots.findAll(); for await (const bot of bots) { if (bot.isVerified) { - this.createConnection(bot, this.server, { - browser: WhatsappService.browserDescription, - printQRInTerminal: false, - version: [2, 2204, 13], - }); + const { version, isLatest } = await fetchLatestBaileysVersion() + console.log(`using WA v${version.join('.')}, isLatest: ${isLatest}`) + + await this.createConnection( + bot, + this.server, + { + browser: WhatsappService.browserDescription, + printQRInTerminal: false, + version + }) } } } - private async queueMessage(bot: Bot, webMessageInfo: proto.IWebMessageInfo) { - const { key, message, messageTimestamp } = webMessageInfo; - const { remoteJid } = key; + private async queueMessage(bot: Bot, webMessageInfo: proto.WebMessageInfo) { + const { key: { id, fromMe }, message, messageTimestamp } = webMessageInfo; - const {fromMe, id: keyId} = key; - - if (!fromMe && message && remoteJid !== "status@broadcast") { - const { audioMessage, documentMessage, imageMessage, videoMessage } = - message; + if (!fromMe && message) { const isMediaMessage = - audioMessage || documentMessage || imageMessage || videoMessage; + message.audioMessage || + message.documentMessage || + message.imageMessage || + message.videoMessage; - const messageContent = Object.values(message)[0]; + const messageContent = Object.values(message)[0] let messageType: MediaType; let attachment: string; let filename: string; let mimetype: string; if (isMediaMessage) { - if (audioMessage) { + if (message.audioMessage) { messageType = "audio"; - filename = keyId + "." + audioMessage.mimetype.split("/").pop(); - mimetype = audioMessage.mimetype; - } else if (documentMessage) { + filename = + id + "." + message.audioMessage.mimetype.split("/").pop(); + mimetype = message.audioMessage.mimetype; + } else if (message.documentMessage) { messageType = "document"; - filename = documentMessage.fileName; - mimetype = documentMessage.mimetype; - } else if (imageMessage) { + filename = message.documentMessage.fileName; + mimetype = message.documentMessage.mimetype; + } else if (message.imageMessage) { messageType = "image"; - filename = keyId + "." + imageMessage.mimetype.split("/").pop(); - mimetype = imageMessage.mimetype; - } else if (videoMessage) { - messageType = "video"; - filename = keyId + "." + videoMessage.mimetype.split("/").pop(); - mimetype = videoMessage.mimetype; + filename = + id + "." + message.imageMessage.mimetype.split("/").pop(); + mimetype = message.imageMessage.mimetype; + } else if (message.videoMessage) { + messageType = "video" + filename = + id + "." + message.videoMessage.mimetype.split("/").pop(); + mimetype = message.videoMessage.mimetype; } - const stream = await downloadContentFromMessage( - messageContent, - messageType - ); - let buffer = Buffer.from([]); + const stream = await downloadContentFromMessage(messageContent, messageType) + let buffer = Buffer.from([]) for await (const chunk of stream) { - buffer = Buffer.concat([buffer, chunk]); + buffer = Buffer.concat([buffer, chunk]) } - attachment = buffer.toString("base64"); } if (messageContent || attachment) { const receivedMessage = { - waMessageId: keyId, + waMessageId: id, waMessage: JSON.stringify(webMessageInfo), waTimestamp: new Date((messageTimestamp as number) * 1000), attachment, @@ -212,13 +193,13 @@ export default class WhatsappService extends Service { }; workerUtils.addJob("whatsapp-message", receivedMessage, { - jobKey: keyId, + jobKey: id, }); } } } - private async queueUnreadMessages(bot: Bot, messages: proto.IWebMessageInfo[]) { + private async queueUnreadMessages(bot: Bot, messages: any[]) { for await (const message of messages) { await this.queueMessage(bot, message); } @@ -239,6 +220,18 @@ export default class WhatsappService extends Service { return row; } + async unverify(bot: Bot): Promise { + const directory = this.getAuthDirectory(bot); + fs.rmSync(directory, { recursive: true, force: true }); + return this.server.db().whatsappBots.updateVerified(bot, false); + } + + async remove(bot: Bot): Promise { + const directory = this.getAuthDirectory(bot); + fs.rmSync(directory, { recursive: true, force: true }); + return this.server.db().whatsappBots.remove(bot); + } + async findAll(): Promise { return this.server.db().whatsappBots.findAll(); } @@ -252,22 +245,18 @@ export default class WhatsappService extends Service { } async register(bot: Bot, callback: AuthCompleteCallback): Promise { - await this.createConnection( - bot, - this.server, - { version: [2, 2204, 13] }, - callback - ); + const { version } = await fetchLatestBaileysVersion() + await this.createConnection(bot, this.server, { version }, callback); } async send(bot: Bot, phoneNumber: string, message: string): Promise { - const connection = this.connections[bot.id]; + const connection = this.connections[bot.id]?.socket; const recipient = `${phoneNumber.replace(/\D+/g, "")}@s.whatsapp.net`; await connection.sendMessage(recipient, { text: message }); } async receiveSince(bot: Bot, lastReceivedDate: Date): Promise { - const connection = this.connections[bot.id]; + const connection = this.connections[bot.id]?.socket; const messages = await connection.messagesReceivedAfter( lastReceivedDate, false @@ -277,12 +266,8 @@ export default class WhatsappService extends Service { } } - async receive(bot: Bot, _lastReceivedDate: Date): Promise { - const connection = this.connections[bot.id]; - // const messages = await connection.messagesReceivedAfter( - // lastReceivedDate, - // false - // ); + async receive(bot: Bot, lastReceivedDate: Date): Promise { + const connection = this.connections[bot.id]?.socket; const messages = await connection.loadAllUnreadMessages(); return messages; diff --git a/apps/metamigo-frontend/components/whatsapp/bots/WhatsappBotShow.tsx b/apps/metamigo-frontend/components/whatsapp/bots/WhatsappBotShow.tsx index 7dba8a8..455041c 100644 --- a/apps/metamigo-frontend/components/whatsapp/bots/WhatsappBotShow.tsx +++ b/apps/metamigo-frontend/components/whatsapp/bots/WhatsappBotShow.tsx @@ -123,14 +123,6 @@ const WhatsappBotShow = (props: ShowProps) => { resource: "whatsappBots", payload: { id: props.id }, }); - const [unverify] = useMutation({ - type: "update", - resource: "whatsappBots", - payload: { - id: props.id, - data: { isVerified: false, qrCode: null, authInfo: null }, - }, - }); const { data: registerData, error: registerError } = useSWR( data && !data?.isVerified @@ -139,6 +131,16 @@ const WhatsappBotShow = (props: ShowProps) => { { refreshInterval: 59000 } ); + const unverifyBot = async () => { + await fetch(`/api/v1/whatsapp/bots/${props.id}/unverify`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ verified: false }), + }); + }; + console.log({ registerData, registerError }); useEffect(() => { @@ -160,7 +162,7 @@ const WhatsappBotShow = (props: ShowProps) => { color="primary" size="small" style={{ color: "black", backgroundColor: "#ddd" }} - onClick={unverify} + onClick={async () => unverifyBot()} > Unverify diff --git a/apps/metamigo-worker/index.ts b/apps/metamigo-worker/index.ts index 0d1fb5c..7cbfec4 100644 --- a/apps/metamigo-worker/index.ts +++ b/apps/metamigo-worker/index.ts @@ -1,4 +1,5 @@ import * as Worker from "graphile-worker"; +import { parseCronItems } from "graphile-worker"; import { defState } from "@digiresilience/montar"; import config from "config"; import { initPgp } from "./db"; @@ -14,7 +15,14 @@ const logFactory = (scope: any) => (level: any, message: any, meta: any) => { }; export const configWorker = async (): Promise => { - const { connection, concurrency, pollInterval } = config.worker; + const { + connection, + concurrency, + pollInterval, + leafcutter: { + enabled: leafcutterEnabled + } + } = config.worker; logger.info({ concurrency, pollInterval }, "Starting worker"); return { concurrency, @@ -23,6 +31,10 @@ export const configWorker = async (): Promise => { connectionString: connection, // eslint-disable-next-line unicorn/prefer-module taskDirectory: `${__dirname}/tasks`, + parsedCronItems: parseCronItems(leafcutterEnabled ? + [{ task: "import-label-studio", pattern: "*/15 * * * *" }, + { task: "import-leafcutter", pattern: "*/17 * * * *" }] : + []) }; }; diff --git a/apps/metamigo-worker/package.json b/apps/metamigo-worker/package.json index 98c10ee..e23cefe 100644 --- a/apps/metamigo-worker/package.json +++ b/apps/metamigo-worker/package.json @@ -7,6 +7,8 @@ "dependencies": { "graphile-worker": "^0.13.0", "remeda": "^1.6.0", + "html-to-text": "^8.2.0", + "node-fetch": "^2", "@digiresilience/montar": "*", "@digiresilience/metamigo-common": "*", "@digiresilience/metamigo-config": "*", @@ -52,4 +54,4 @@ "watch:build": "tsc -p tsconfig.json -w", "watch:test": "yarn test:jest --watchAll" } -} \ No newline at end of file +} diff --git a/apps/metamigo-worker/tag-map.ts b/apps/metamigo-worker/tag-map.ts new file mode 100644 index 0000000..c215121 --- /dev/null +++ b/apps/metamigo-worker/tag-map.ts @@ -0,0 +1,69 @@ +export const tagMap = { + AccountImpersonation: [ + { field: "incidentType tag", value: "account-impersonation" }, + ], + AppleID: [{ field: "incidentType tag", value: "malfunction-failure" }], + Blocked: [{ field: "incidentType tag", value: "account-deactivation" }], + CyberBullying: [{ field: "incidentType tag", value: "cyber-bullying" }], + DeviceSuspiciousBehavior: [ + { field: "incidentType tag", value: "compromise-device" }, + ], + Doxxing: [{ field: "incidentType tag", value: "doxxing" }], + DSTips: [{ field: "incidentType tag", value: "informational" }], + HackedLaptop: [ + { field: "incidentType tag", value: "compromised-device" }, + { field: "device tag", value: "laptop" }, + ], + "Hacked/StolenAccount": [ + { field: "incidentType tag", value: "compromised-account" }, + ], + HateSpeech: [{ field: "incidentType tag", value: "hate-speech" }], + InfectedPhone: [ + { field: "incidentType tag", value: "malware" }, + { field: "device tag", value: "smartphone" }, + ], + Kidnapping: [{ field: "incidentType tag", value: "kidnapping" }], + LaptopGiveaway: [{ field: "incidentType tag", value: "other" }], + ForensicAnalysis: [{ field: "incidentType tag", value: "malware" }], + ISF: [{ field: "incidentType tag", value: "other" }], + NumberBanned: [ + { field: "incidentType tag", value: "disruption" }, + { field: "device tag", value: "smartphone" }, + ], + OnlineHarassment: [{ field: "incidentType tag", value: "online-harassment" }], + PhoneHarassment: [{ field: "incidentType tag", value: "phone-harassment" }], + PoliticalAds: [{ field: "incidentType tag", value: "spam" }], + SeizedPhone: [ + { field: "incidentType tag", value: "confiscation" }, + { field: "device tag", value: "smartphone" }, + ], + SexED: [{ field: "incidentType tag", value: "informational" }], + Sextortion: [{ field: "incidentType tag", value: "sextortion" }], + Spam: [{ field: "incidentType tag", value: "spam" }], + SuspendedAccount: [ + { field: "incidentType tag", value: "account-suspension" }, + ], + SuspendedActivities: [ + { field: "incidentType tag", value: "content-moderation" }, + ], + SuspendedGroup: [{ field: "incidentType tag", value: "account-suspension" }], + SuspendedPage: [{ field: "incidentType tag", value: "account-suspension" }], + "Stolen/LostPhone": [ + { field: "incidentType tag", value: "loss" }, + { field: "device tag", value: "smartphone" }, + ], + Facebook: [{ field: "platform tag", value: "facebook" }], + Google: [{ field: "platform tag", value: "google" }], + Instagram: [{ field: "platform tag", value: "instagram" }], + SMS: [{ field: "service tag", value: "sms" }], + Twitter: [{ field: "platform tag", value: "twitter" }], + Website: [{ field: "service tag", value: "website" }], + WhatsApp: [{ field: "platform tag", value: "whatsapp" }], + YouTube: [{ field: "platform tag", value: "youtube" }], + Linkedin: [{ field: "platform tag", value: "linkedin" }], + PoliticalActivist: [{ field: "targetedGroup tag", value: "policy-politics" }], + ElectoralCandidate: [ + { field: "targetedGroup tag", value: "policy-politics" }, + ], + PhishingLink: [{ field: "incidentType tag", value: "phishing" }], +}; diff --git a/apps/metamigo-worker/tasks/import-label-studio.ts b/apps/metamigo-worker/tasks/import-label-studio.ts new file mode 100644 index 0000000..084ccf7 --- /dev/null +++ b/apps/metamigo-worker/tasks/import-label-studio.ts @@ -0,0 +1,176 @@ +/* eslint-disable camelcase */ +import { convert } from "html-to-text"; +import fetch from "node-fetch"; +import { URLSearchParams } from "url"; +import { withDb, AppDatabase } from "../db"; +import { loadConfig } from "@digiresilience/metamigo-config"; +import { tagMap } from "../lib/tag-map" + +type FormattedZammadTicket = { + data: Record, + predictions: Record[] +} + +const getZammadTickets = async (page: number, minUpdatedTimestamp: Date): Promise<[boolean, FormattedZammadTicket[]]> => { + const { leafcutter: { zammadApiUrl, zammadApiKey, contributorName, contributorId } } = await loadConfig(); + const headers = { Authorization: `Token ${zammadApiKey}` }; + let shouldContinue = false; + const docs = []; + const ticketsQuery = new URLSearchParams({ + "expand": "true", + "sort_by": "updated_at", + "order_by": "asc", + "query": "state.name: closed", + "per_page": "25", + "page": `${page}`, + }); + const rawTickets = await fetch(`${zammadApiUrl}/tickets/search?${ticketsQuery}`, + { headers } + ); + const tickets = await rawTickets.json(); + console.log({ tickets }) + if (!tickets || tickets.length === 0) { + return [shouldContinue, docs]; + } + + for await (const ticket of tickets) { + const { id: source_id, created_at, updated_at, close_at } = ticket; + const source_created_at = new Date(created_at); + const source_updated_at = new Date(updated_at); + const source_closed_at = new Date(close_at); + shouldContinue = true; + + if (source_closed_at <= minUpdatedTimestamp) { + console.log(`Skipping ticket`, { source_id, source_updated_at, source_closed_at, minUpdatedTimestamp }); + continue; + } + + console.log(`Processing ticket`, { source_id, source_updated_at, source_closed_at, minUpdatedTimestamp }); + + const rawArticles = await fetch(`${zammadApiUrl}/ticket_articles/by_ticket/${source_id}`, + { headers } + ); + const articles = await rawArticles.json(); + let articleText = ""; + + for (const article of articles) { + const { content_type: contentType, body } = article; + + if (contentType === "text/html") { + const cleanArticleText = convert(body); + articleText += cleanArticleText + "\n\n"; + } else { + articleText += body + "\n\n"; + } + } + + const tagsQuery = new URLSearchParams({ + object: "Ticket", + o_id: source_id, + }); + + const rawTags = await fetch(`${zammadApiUrl}/tags?${tagsQuery}`, { headers }); + const { tags } = await rawTags.json(); + const transformedTags = []; + for (const tag of tags) { + const outputs = tagMap[tag]; + if (outputs) { + transformedTags.push(...outputs); + } + } + + const doc: FormattedZammadTicket = { + data: { + ticket: articleText, + contributor_id: contributorId, + source_id, + source_closed_at, + source_created_at, + source_updated_at, + }, + predictions: [] + }; + + const result = transformedTags.map((tag) => { + return { + type: "choices", + value: { + choices: [tag.value], + }, + to_name: "ticket", + from_name: tag.field, + }; + }); + + if (result.length > 0) { + doc.predictions.push({ + model_version: `${contributorName}TranslatorV1`, + result, + }) + } + + docs.push(doc); + } + + return [shouldContinue, docs]; +} + +const fetchFromZammad = async (minUpdatedTimestamp: Date): Promise => { + const pages = [...Array.from({ length: 10000 }).keys()]; + const allTickets: FormattedZammadTicket[] = []; + + for await (const page of pages) { + const [shouldContinue, tickets] = await getZammadTickets(page + 1, minUpdatedTimestamp); + + if (!shouldContinue) { + break; + } + + if (tickets.length > 0) { + allTickets.push(...tickets); + } + } + + return allTickets; +}; + +const sendToLabelStudio = async (tickets: FormattedZammadTicket[]) => { + const { leafcutter: { labelStudioApiUrl, labelStudioApiKey } } = await loadConfig(); + + const headers = { + Authorization: `Token ${labelStudioApiKey}`, + "Content-Type": "application/json", + Accept: "application/json", + }; + + for await (const ticket of tickets) { + const res = await fetch(`${labelStudioApiUrl}/projects/1/import`, { + method: "POST", + headers, + body: JSON.stringify([ticket]), + }); + const importResult = await res.json(); + + console.log(JSON.stringify(importResult, undefined, 2)); + } +} + +const importLabelStudioTask = async (): Promise => { + withDb(async (db: AppDatabase) => { + const { leafcutter: { contributorName } } = await loadConfig(); + const settingName = `${contributorName}ImportLabelStudioTask`; + const res: any = await db.settings.findByName(settingName); + const startTimestamp = res?.value?.minUpdatedTimestamp ? new Date(res.value.minUpdatedTimestamp as string) : new Date("2023-03-01"); + const tickets = await fetchFromZammad(startTimestamp); + + if (tickets.length > 0) { + await sendToLabelStudio(tickets); + const lastTicket = tickets.pop(); + const newLastTimestamp = lastTicket.data.source_closed_at; + console.log({ newLastTimestamp }) + await db.settings.upsert(settingName, { minUpdatedTimestamp: newLastTimestamp }) + } + }); +}; + +export default importLabelStudioTask; diff --git a/apps/metamigo-worker/tasks/import-leafcutter.ts b/apps/metamigo-worker/tasks/import-leafcutter.ts new file mode 100644 index 0000000..9d8f389 --- /dev/null +++ b/apps/metamigo-worker/tasks/import-leafcutter.ts @@ -0,0 +1,164 @@ +/* eslint-disable camelcase */ +import fetch from "node-fetch"; +import { URLSearchParams } from "url"; +import { withDb, AppDatabase } from "../db"; +import { loadConfig } from "@digiresilience/metamigo-config"; + +type LabelStudioTicket = { + id: string + is_labeled: boolean + annotations: Record[] + data: Record + updated_at: string; +} + +type LeafcutterTicket = { + id: string + incident: string[] + technology: string[] + targeted_group: string[] + country: string[] + region: string[] + continent: string[] + date: Date + origin: string + origin_id: string + source_created_at: string + source_updated_at: string +} + +const getLabelStudioTickets = async (page: number): Promise => { + const { + leafcutter: { + labelStudioApiUrl, + labelStudioApiKey, + } + } = await loadConfig(); + + const headers = { + Authorization: `Token ${labelStudioApiKey}`, + Accept: "application/json", + }; + const ticketsQuery = new URLSearchParams({ + page_size: "50", + page: `${page}`, + }); + console.log({ url: `${labelStudioApiUrl}/projects/1/tasks?${ticketsQuery}` }) + const res = await fetch(`${labelStudioApiUrl}/projects/1/tasks?${ticketsQuery}`, + { headers }); + console.log({ res }) + const tasksResult = await res.json(); + console.log({ tasksResult }); + + return tasksResult; +} + +const fetchFromLabelStudio = async (minUpdatedTimestamp: Date): Promise => { + const pages = [...Array.from({ length: 10000 }).keys()]; + const allDocs: LabelStudioTicket[] = []; + + for await (const page of pages) { + const docs = await getLabelStudioTickets(page + 1); + console.log({ page, docs }) + + if (docs && docs.length > 0) { + for (const doc of docs) { + const updatedAt = new Date(doc.updated_at); + console.log({ updatedAt, minUpdatedTimestamp }); + if (updatedAt > minUpdatedTimestamp) { + console.log(`Adding doc`, { doc }) + allDocs.push(doc) + } + } + } else { + break; + } + } + + console.log({ allDocs }) + return allDocs; +} + +const sendToLeafcutter = async (tickets: LabelStudioTicket[]) => { + const { + leafcutter: { + contributorId, + opensearchApiUrl, + opensearchUsername, + opensearchPassword + } + } = await loadConfig(); + + console.log({ tickets }) + const filteredTickets = tickets.filter((ticket) => ticket.is_labeled); + console.log({ filteredTickets }) + const finalTickets: LeafcutterTicket[] = filteredTickets.map((ticket) => { + const { + id, + annotations, + data: { + source_id, + source_created_at, + source_updated_at + } + } = ticket; + + const getTags = (tags: Record[], name: string) => + tags + .filter((tag) => tag.from_name === name) + .map((tag) => tag.value.choices) + .flat(); + + const allTags = annotations.map(({ result }) => result).flat(); + const incident = getTags(allTags, "incidentType tag"); + const technology = getTags(allTags, "platform tag"); + const country = getTags(allTags, "country tag"); + const targetedGroup = getTags(allTags, "targetedGroup tag"); + + return { + id, + incident, + technology, + targeted_group: targetedGroup, + country, + region: [], + continent: [], + date: new Date(source_created_at as string), + origin: contributorId, + origin_id: source_id as string, + source_created_at: source_created_at as string, + source_updated_at: source_updated_at as string + }; + }); + + console.log("Sending to Leafcutter"); + console.log({ finalTickets }) + + const result = await fetch(opensearchApiUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Basic ${Buffer.from(`${opensearchUsername}:${opensearchPassword}`).toString("base64")}`, + }, + body: JSON.stringify({ tickets: finalTickets }), + }); + console.log({ result }); +}; + + +const importLeafcutterTask = async (): Promise => { + withDb(async (db: AppDatabase) => { + const { leafcutter: { contributorName } } = await loadConfig(); + const settingName = `${contributorName}ImportLeafcutterTask`; + const res: any = await db.settings.findByName(settingName); + const startTimestamp = res?.value?.minUpdatedTimestamp ? new Date(res.value.minUpdatedTimestamp as string) : new Date("2023-03-01"); + const newLastTimestamp = new Date(); + console.log({ contributorName, settingName, res, startTimestamp, newLastTimestamp }); + const tickets = await fetchFromLabelStudio(startTimestamp); + console.log({ tickets }) + await sendToLeafcutter(tickets); + await db.settings.upsert(settingName, { minUpdatedTimestamp: newLastTimestamp }) + }); +}; + +export default importLeafcutterTask; diff --git a/apps/metamigo-worker/tasks/signal-message.ts b/apps/metamigo-worker/tasks/signal-message.ts deleted file mode 100644 index d76d807..0000000 --- a/apps/metamigo-worker/tasks/signal-message.ts +++ /dev/null @@ -1,76 +0,0 @@ -/* eslint-disable camelcase */ -import { withDb, AppDatabase } from "../db"; -import workerUtils from "../utils"; - -interface WebhookPayload { - to: string; - from: string; - message_id: string; - sent_at: string; - message: string; - attachment: string; - filename: string; - mime_type: string; -} - -interface SignalMessageTaskOptions { - id: string; - source: string; - timestamp: string; - message: string; - attachments: unknown[]; - signalBotId: string; -} - -const formatPayload = ( - messageInfo: SignalMessageTaskOptions -): WebhookPayload => { - const { id, source, message, timestamp } = messageInfo; - - return { - to: "16464229653", - from: source, - message_id: id, - sent_at: timestamp, - message, - attachment: "", - filename: "test.png", - mime_type: "image/png", - }; -}; - -const notifyWebhooks = async ( - db: AppDatabase, - messageInfo: SignalMessageTaskOptions -) => { - const { id: messageID, signalBotId } = messageInfo; - const webhooks = await db.webhooks.findAllByBackendId("signal", signalBotId); - if (webhooks && webhooks.length === 0) return; - - webhooks.forEach(({ id }) => { - const payload = formatPayload(messageInfo); - console.log({ payload }); - workerUtils.addJob( - "notify-webhook", - { - payload, - webhookId: id, - }, - { - // this de-deduplicates the job - jobKey: `webhook-${id}-message-${messageID}`, - } - ); - }); -}; - -const signalMessageTask = async ( - options: SignalMessageTaskOptions -): Promise => { - console.log(options); - withDb(async (db: AppDatabase) => { - await notifyWebhooks(db, options); - }); -}; - -export default signalMessageTask; diff --git a/apps/metamigo-worker/tasks/signald-message.ts b/apps/metamigo-worker/tasks/signald-message.ts index 359bbdd..6cea8a5 100644 --- a/apps/metamigo-worker/tasks/signald-message.ts +++ b/apps/metamigo-worker/tasks/signald-message.ts @@ -19,10 +19,13 @@ interface SignaldMessageTaskOptions { message: IncomingMessagev1; botId: string; botPhoneNumber: string; + attachment: string; + filename: string; + mimetype: string; } const formatPayload = (opts: SignaldMessageTaskOptions): WebhookPayload => { - const { botId, botPhoneNumber, message } = opts; + const { botId, botPhoneNumber, message, attachment, filename, mimetype } = opts; const { source, timestamp, data_message: dataMessage } = message; const { number }: any = source; @@ -35,9 +38,9 @@ const formatPayload = (opts: SignaldMessageTaskOptions): WebhookPayload => { message_id: `${botId}-${timestamp}`, sent_at: `${timestamp}`, message: body, - attachment: null, - filename: null, - mime_type: null, + attachment, + filename, + mime_type: mimetype, }; };