Migrate changes from other Metamigo repo
This commit is contained in:
parent
8669b09224
commit
27810142b3
11 changed files with 615 additions and 235 deletions
176
apps/metamigo-worker/tasks/import-label-studio.ts
Normal file
176
apps/metamigo-worker/tasks/import-label-studio.ts
Normal file
|
|
@ -0,0 +1,176 @@
|
|||
/* eslint-disable camelcase */
|
||||
import { convert } from "html-to-text";
|
||||
import fetch from "node-fetch";
|
||||
import { URLSearchParams } from "url";
|
||||
import { withDb, AppDatabase } from "../db";
|
||||
import { loadConfig } from "@digiresilience/metamigo-config";
|
||||
import { tagMap } from "../lib/tag-map"
|
||||
|
||||
/**
 * A Zammad ticket reshaped into the task object that this file posts to the
 * Label Studio import endpoint.
 */
type FormattedZammadTicket = {
  /** Task fields: concatenated article text, contributor id, source id and source timestamps. */
  data: Record<string, unknown>;
  /** Pre-annotations built from the ticket's tags; stays empty when no tag maps to a choice. */
  predictions: Array<Record<string, unknown>>;
};
|
||||
|
||||
/**
 * Fetches one page of closed Zammad tickets and turns every ticket that was
 * closed after `minUpdatedTimestamp` into a Label Studio task.
 *
 * For each qualifying ticket the article bodies are concatenated (HTML
 * articles are converted to plain text first) and the ticket's tags are
 * translated through `tagMap` into a single "choices" prediction.
 *
 * @param page - Page number forwarded to the Zammad search endpoint.
 * @param minUpdatedTimestamp - Tickets whose close time is at or before this
 *   instant are skipped.
 * @returns `[shouldContinue, docs]`. `shouldContinue` is `true` whenever the
 *   page contained at least one ticket (even if all of them were skipped),
 *   which tells the caller to request the next page.
 * @throws Error when a Zammad request does not succeed or the ticket search
 *   does not return a list.
 */
const getZammadTickets = async (page: number, minUpdatedTimestamp: Date): Promise<[boolean, FormattedZammadTicket[]]> => {
  const { leafcutter: { zammadApiUrl, zammadApiKey, contributorName, contributorId } } = await loadConfig();
  const headers = { Authorization: `Token ${zammadApiKey}` };
  const docs: FormattedZammadTicket[] = [];

  // Shared GET helper: surfaces HTTP failures as errors instead of letting an
  // error payload flow into the parsing code below.
  const getJson = async (url: string): Promise<any> => {
    const res = await fetch(url, { headers });
    if (!res.ok) {
      throw new Error(`Zammad request failed with status ${res.status}: ${url}`);
    }

    return res.json();
  };

  const ticketsQuery = new URLSearchParams({
    "expand": "true",
    "sort_by": "updated_at",
    "order_by": "asc",
    "query": "state.name: closed",
    "per_page": "25",
    "page": `${page}`,
  });
  const tickets = await getJson(`${zammadApiUrl}/tickets/search?${ticketsQuery}`);
  console.log({ tickets })

  // An empty page ends pagination.
  if (!tickets || (Array.isArray(tickets) && tickets.length === 0)) {
    return [false, docs];
  }

  if (!Array.isArray(tickets)) {
    throw new Error("Zammad ticket search did not return a list of tickets");
  }

  for (const ticket of tickets) {
    const { id: source_id, created_at, updated_at, close_at } = ticket;
    const source_created_at = new Date(created_at);
    const source_updated_at = new Date(updated_at);
    const source_closed_at = new Date(close_at);

    if (source_closed_at <= minUpdatedTimestamp) {
      console.log(`Skipping ticket`, { source_id, source_updated_at, source_closed_at, minUpdatedTimestamp });
      continue;
    }

    console.log(`Processing ticket`, { source_id, source_updated_at, source_closed_at, minUpdatedTimestamp });

    // Build one text blob from all articles of the ticket.
    const articles = await getJson(`${zammadApiUrl}/ticket_articles/by_ticket/${source_id}`);
    let articleText = "";

    for (const article of Array.isArray(articles) ? articles : []) {
      const { content_type: contentType, body } = article;
      const plainText = contentType === "text/html" ? convert(body) : body;
      articleText += plainText + "\n\n";
    }

    const tagsQuery = new URLSearchParams({
      object: "Ticket",
      o_id: `${source_id}`,
    });
    const tagsResponse = await getJson(`${zammadApiUrl}/tags?${tagsQuery}`);
    const tags = Array.isArray(tagsResponse?.tags) ? tagsResponse.tags : [];
    const transformedTags = [];

    for (const tag of tags) {
      const outputs = tagMap[tag];
      // Array check also rejects inherited members such as "constructor"
      // when a tag name happens to collide with one.
      if (Array.isArray(outputs)) {
        transformedTags.push(...outputs);
      }
    }

    const doc: FormattedZammadTicket = {
      data: {
        ticket: articleText,
        contributor_id: contributorId,
        source_id,
        source_closed_at,
        source_created_at,
        source_updated_at,
      },
      predictions: []
    };

    const result = transformedTags.map((tag) => ({
      type: "choices",
      value: {
        choices: [tag.value],
      },
      to_name: "ticket",
      from_name: tag.field,
    }));

    if (result.length > 0) {
      doc.predictions.push({
        model_version: `${contributorName}TranslatorV1`,
        result,
      })
    }

    docs.push(doc);
  }

  // Reaching this point means the page held at least one ticket.
  return [true, docs];
}
|
||||
|
||||
/**
 * Requests Zammad ticket pages one after another, starting at page 1, and
 * gathers the formatted tickets of every page into a single list.
 *
 * Paging stops at the first page that `getZammadTickets` reports as having no
 * tickets, or after 10000 pages.
 *
 * @param minUpdatedTimestamp - Forwarded to `getZammadTickets` as the skip threshold.
 * @returns All formatted tickets in the order the pages returned them.
 */
const fetchFromZammad = async (minUpdatedTimestamp: Date): Promise<FormattedZammadTicket[]> => {
  const maxPages = 10000;
  const collected: FormattedZammadTicket[] = [];
  let pageNumber = 1;

  while (pageNumber <= maxPages) {
    const [hasMore, pageTickets] = await getZammadTickets(pageNumber, minUpdatedTimestamp);

    if (!hasMore) {
      return collected;
    }

    for (const pageTicket of pageTickets) {
      collected.push(pageTicket);
    }

    pageNumber += 1;
  }

  return collected;
};
|
||||
|
||||
/**
 * Imports the given tickets into Label Studio, one request per ticket.
 *
 * The import target is hard-coded to project 1. Each response body is
 * pretty-printed to the console.
 *
 * @param tickets - Tasks to import, sent in the given order.
 * @throws Error when Label Studio answers with a non-success status, so the
 *   caller does not record progress for tickets that were never imported.
 */
const sendToLabelStudio = async (tickets: FormattedZammadTicket[]) => {
  const { leafcutter: { labelStudioApiUrl, labelStudioApiKey } } = await loadConfig();

  const headers = {
    Authorization: `Token ${labelStudioApiKey}`,
    "Content-Type": "application/json",
    Accept: "application/json",
  };

  for (const ticket of tickets) {
    const res = await fetch(`${labelStudioApiUrl}/projects/1/import`, {
      method: "POST",
      headers,
      // The request body is a list holding this single task.
      body: JSON.stringify([ticket]),
    });

    if (!res.ok) {
      throw new Error(`Label Studio import failed with status ${res.status}: ${await res.text()}`);
    }

    const importResult = await res.json();

    console.log(JSON.stringify(importResult, undefined, 2));
  }
}
|
||||
|
||||
/**
 * Worker task: copies newly closed Zammad tickets into Label Studio and
 * stores how far the import got.
 *
 * The checkpoint lives in the settings table under
 * `${contributorName}ImportLabelStudioTask`; without a stored value the import
 * starts from 2023-03-01.
 *
 * @returns A promise that settles only after the database work has finished,
 *   so failures reach the caller instead of becoming unhandled rejections.
 */
const importLabelStudioTask = async (): Promise<void> => {
  await withDb(async (db: AppDatabase) => {
    const { leafcutter: { contributorName } } = await loadConfig();
    const settingName = `${contributorName}ImportLabelStudioTask`;
    const res: any = await db.settings.findByName(settingName);
    const startTimestamp = res?.value?.minUpdatedTimestamp ? new Date(res.value.minUpdatedTimestamp as string) : new Date("2023-03-01");
    const tickets = await fetchFromZammad(startTimestamp);

    if (tickets.length === 0) {
      return;
    }

    await sendToLabelStudio(tickets);

    // Tickets arrive ordered by update time but are filtered by close time,
    // so the checkpoint must be the latest close time seen, not the close
    // time of whichever ticket happens to be last.
    const newLastTimestamp = tickets.reduce<Date | undefined>((latest, { data }) => {
      const closedAt = data.source_closed_at;
      if (!(closedAt instanceof Date) || Number.isNaN(closedAt.getTime())) {
        return latest;
      }

      return !latest || closedAt > latest ? closedAt : latest;
    }, undefined);
    console.log({ newLastTimestamp })

    if (newLastTimestamp) {
      await db.settings.upsert(settingName, { minUpdatedTimestamp: newLastTimestamp })
    }
  });
};
|
||||
|
||||
export default importLabelStudioTask;
|
||||
164
apps/metamigo-worker/tasks/import-leafcutter.ts
Normal file
164
apps/metamigo-worker/tasks/import-leafcutter.ts
Normal file
|
|
@ -0,0 +1,164 @@
|
|||
/* eslint-disable camelcase */
|
||||
import fetch from "node-fetch";
|
||||
import { URLSearchParams } from "url";
|
||||
import { withDb, AppDatabase } from "../db";
|
||||
import { loadConfig } from "@digiresilience/metamigo-config";
|
||||
|
||||
/**
 * The fields of a Label Studio task that this file reads.
 */
type LabelStudioTicket = {
  /** Task identifier; reused as the id of the document sent to Leafcutter. */
  id: string;
  /** Only tasks with this flag set are forwarded to Leafcutter. */
  is_labeled: boolean;
  /** Annotation objects; their `result` entries carry the chosen tags. */
  annotations: Array<Record<string, unknown>>;
  /** Task data; `source_id`, `source_created_at` and `source_updated_at` are read from it. */
  data: Record<string, unknown>;
  /** Timestamp string compared against the import checkpoint. */
  updated_at: string;
};
|
||||
|
||||
/**
 * Document shape that `sendToLeafcutter` builds from a labeled Label Studio
 * task and uploads.
 */
type LeafcutterTicket = {
  /** Identifier copied from the Label Studio task. */
  id: string;
  /** Choices taken from "incidentType tag" annotation results. */
  incident: string[];
  /** Choices taken from "platform tag" annotation results. */
  technology: string[];
  /** Choices taken from "targetedGroup tag" annotation results. */
  targeted_group: string[];
  /** Choices taken from "country tag" annotation results. */
  country: string[];
  /** Currently always sent as an empty list. */
  region: string[];
  /** Currently always sent as an empty list. */
  continent: string[];
  /** Creation time of the source ticket. */
  date: Date;
  /** Contributor id from the configuration. */
  origin: string;
  /** Identifier of the ticket in the source system. */
  origin_id: string;
  source_created_at: string;
  source_updated_at: string;
};
|
||||
|
||||
/**
 * Fetches one page (50 tasks) of project 1 from Label Studio.
 *
 * @param page - Page number forwarded to the tasks endpoint.
 * @returns The tasks on that page. Any response body that is not a list (for
 *   example an error object) is returned as an empty list, which the caller
 *   treats as the end of pagination.
 */
const getLabelStudioTickets = async (page: number): Promise<LabelStudioTicket[]> => {
  const {
    leafcutter: {
      labelStudioApiUrl,
      labelStudioApiKey,
    }
  } = await loadConfig();

  const headers = {
    Authorization: `Token ${labelStudioApiKey}`,
    Accept: "application/json",
  };
  const ticketsQuery = new URLSearchParams({
    page_size: "50",
    page: `${page}`,
  });
  const url = `${labelStudioApiUrl}/projects/1/tasks?${ticketsQuery}`;
  console.log({ url })
  const res = await fetch(url, { headers });
  console.log({ res })
  const tasksResult = await res.json();
  console.log({ tasksResult });

  // NOTE(review): no status check here on purpose. The caller's loop ends on
  // the first non-list response; presumably that is how an out-of-range page
  // is reported. Confirm before turning failures into exceptions.
  return Array.isArray(tasksResult) ? tasksResult : [];
}
|
||||
|
||||
/**
 * Pages through the Label Studio tasks and keeps those updated after
 * `minUpdatedTimestamp`.
 *
 * Paging stops at the first page that yields no tasks, or after 10000 pages.
 *
 * @param minUpdatedTimestamp - Only tasks whose `updated_at` is strictly later are kept.
 * @returns The kept tasks in the order the pages returned them.
 */
const fetchFromLabelStudio = async (minUpdatedTimestamp: Date): Promise<LabelStudioTicket[]> => {
  const pageLimit = 10000;
  const selected: LabelStudioTicket[] = [];

  for (let pageIndex = 0; pageIndex < pageLimit; pageIndex += 1) {
    // The endpoint is called with pageIndex + 1; the log keeps the zero-based index.
    const pageDocs = await getLabelStudioTickets(pageIndex + 1);
    console.log({ page: pageIndex, docs: pageDocs })

    const hasDocs = pageDocs && pageDocs.length > 0;
    if (!hasDocs) {
      break;
    }

    for (const doc of pageDocs) {
      const updatedAt = new Date(doc.updated_at);
      console.log({ updatedAt, minUpdatedTimestamp });

      if (!(updatedAt > minUpdatedTimestamp)) {
        continue;
      }

      console.log(`Adding doc`, { doc })
      selected.push(doc)
    }
  }

  console.log({ allDocs: selected })
  return selected;
}
|
||||
|
||||
/** Gathers the `choices` of every annotation result whose `from_name` equals `name`. */
const collectChoices = (results: Record<string, any>[], name: string): string[] =>
  results
    .filter((entry) => entry?.from_name === name)
    .flatMap((entry) => entry.value?.choices ?? []);

/**
 * Converts the labeled tasks into Leafcutter documents and uploads them in a
 * single POST to the configured OpenSearch URL, using HTTP basic auth.
 *
 * Tasks without `is_labeled` are dropped. `region` and `continent` are always
 * sent as empty lists.
 *
 * @param tickets - Label Studio tasks to convert and upload.
 * @throws Error when the upload is answered with a non-success status, so the
 *   caller does not record progress for documents that were not stored.
 */
const sendToLeafcutter = async (tickets: LabelStudioTicket[]) => {
  const {
    leafcutter: {
      contributorId,
      opensearchApiUrl,
      opensearchUsername,
      opensearchPassword
    }
  } = await loadConfig();

  console.log({ tickets })
  const filteredTickets = tickets.filter((ticket) => ticket.is_labeled);
  console.log({ filteredTickets })
  const finalTickets: LeafcutterTicket[] = filteredTickets.map((ticket) => {
    const {
      id,
      annotations,
      data: {
        source_id,
        source_created_at,
        source_updated_at
      }
    } = ticket;

    // Flatten the results of all annotations; annotations without a result
    // list contribute nothing.
    const allTags: Record<string, any>[] = (annotations ?? []).flatMap(({ result }) =>
      Array.isArray(result) ? result : []
    );

    return {
      id,
      incident: collectChoices(allTags, "incidentType tag"),
      technology: collectChoices(allTags, "platform tag"),
      targeted_group: collectChoices(allTags, "targetedGroup tag"),
      country: collectChoices(allTags, "country tag"),
      region: [],
      continent: [],
      date: new Date(source_created_at as string),
      origin: contributorId,
      origin_id: source_id as string,
      source_created_at: source_created_at as string,
      source_updated_at: source_updated_at as string
    };
  });

  console.log("Sending to Leafcutter");
  console.log({ finalTickets })

  const result = await fetch(opensearchApiUrl, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Basic ${Buffer.from(`${opensearchUsername}:${opensearchPassword}`).toString("base64")}`,
    },
    body: JSON.stringify({ tickets: finalTickets }),
  });
  console.log({ result });

  if (!result.ok) {
    throw new Error(`Leafcutter upload failed with status ${result.status}`);
  }
};
|
||||
|
||||
|
||||
/**
 * Worker task: forwards Label Studio tasks updated since the last run to
 * Leafcutter and then stores a new checkpoint.
 *
 * The checkpoint lives in the settings table under
 * `${contributorName}ImportLeafcutterTask`; without a stored value the import
 * starts from 2023-03-01.
 *
 * @returns A promise that settles only after the database work has finished,
 *   so failures reach the caller instead of becoming unhandled rejections.
 */
const importLeafcutterTask = async (): Promise<void> => {
  await withDb(async (db: AppDatabase) => {
    const { leafcutter: { contributorName } } = await loadConfig();
    const settingName = `${contributorName}ImportLeafcutterTask`;
    const res: any = await db.settings.findByName(settingName);
    const startTimestamp = res?.value?.minUpdatedTimestamp ? new Date(res.value.minUpdatedTimestamp as string) : new Date("2023-03-01");
    // Taken before fetching, presumably so that tasks updated while this run
    // is in progress are picked up by the next run.
    const newLastTimestamp = new Date();
    console.log({ contributorName, settingName, res, startTimestamp, newLastTimestamp });
    const tickets = await fetchFromLabelStudio(startTimestamp);
    console.log({ tickets })
    await sendToLeafcutter(tickets);
    await db.settings.upsert(settingName, { minUpdatedTimestamp: newLastTimestamp })
  });
};
|
||||
|
||||
export default importLeafcutterTask;
|
||||
|
|
@ -1,76 +0,0 @@
|
|||
/* eslint-disable camelcase */
|
||||
import { withDb, AppDatabase } from "../db";
|
||||
import workerUtils from "../utils";
|
||||
|
||||
/**
 * Body handed to the "notify-webhook" job for an incoming Signal message.
 */
interface WebhookPayload {
  /** Recipient number. */
  to: string;
  /** Sender of the message. */
  from: string;
  /** Identifier of the message. */
  message_id: string;
  /** Timestamp of the message, as a string. */
  sent_at: string;
  /** Text of the message. */
  message: string;
  attachment: string;
  filename: string;
  mime_type: string;
}
|
||||
|
||||
/**
 * Job options describing one incoming Signal message.
 */
interface SignalMessageTaskOptions {
  /** Identifier of the message; also used in the webhook job key. */
  id: string;
  /** Sender of the message. */
  source: string;
  /** Timestamp of the message, as a string. */
  timestamp: string;
  /** Text of the message. */
  message: string;
  attachments: unknown[];
  /** Bot whose webhooks are looked up and notified. */
  signalBotId: string;
}
|
||||
|
||||
/**
 * Builds the webhook body for a Signal message.
 *
 * Sender, id, timestamp and text come from `messageInfo`; the recipient
 * number and the attachment fields are fixed values.
 *
 * @param messageInfo - The incoming message to describe.
 * @returns The payload for the "notify-webhook" job.
 */
const formatPayload = (
  messageInfo: SignalMessageTaskOptions
): WebhookPayload => ({
  to: "16464229653",
  from: messageInfo.source,
  message_id: messageInfo.id,
  sent_at: messageInfo.timestamp,
  message: messageInfo.message,
  attachment: "",
  filename: "test.png",
  mime_type: "image/png",
});
|
||||
|
||||
/**
 * Queues one "notify-webhook" job per webhook registered for the Signal bot
 * that received the message.
 *
 * @param db - Database handle used to look up the bot's webhooks.
 * @param messageInfo - The incoming message; its id is part of each job key.
 * @returns Resolves once every job has been handed to the worker utilities,
 *   or immediately when the bot has no webhooks.
 */
const notifyWebhooks = async (
  db: AppDatabase,
  messageInfo: SignalMessageTaskOptions
) => {
  const { id: messageID, signalBotId } = messageInfo;
  const webhooks = await db.webhooks.findAllByBackendId("signal", signalBotId);
  // Covers both "no result" and "empty list".
  if (!webhooks || webhooks.length === 0) return;

  await Promise.all(
    webhooks.map(({ id }) => {
      const payload = formatPayload(messageInfo);
      console.log({ payload });
      return workerUtils.addJob(
        "notify-webhook",
        {
          payload,
          webhookId: id,
        },
        {
          // this deduplicates the job
          jobKey: `webhook-${id}-message-${messageID}`,
        }
      );
    })
  );
};
|
||||
|
||||
/**
 * Worker task for an incoming Signal message: logs the options and notifies
 * the webhooks registered for the receiving bot.
 *
 * @param options - The incoming message and the bot that received it.
 * @returns A promise that settles only after the database work has finished,
 *   so failures reach the caller instead of becoming unhandled rejections.
 */
const signalMessageTask = async (
  options: SignalMessageTaskOptions
): Promise<void> => {
  console.log(options);
  await withDb(async (db: AppDatabase) => {
    await notifyWebhooks(db, options);
  });
};
|
||||
|
||||
export default signalMessageTask;
|
||||
|
|
@ -19,10 +19,13 @@ interface SignaldMessageTaskOptions {
|
|||
message: IncomingMessagev1;
|
||||
botId: string;
|
||||
botPhoneNumber: string;
|
||||
attachment: string;
|
||||
filename: string;
|
||||
mimetype: string;
|
||||
}
|
||||
|
||||
const formatPayload = (opts: SignaldMessageTaskOptions): WebhookPayload => {
|
||||
const { botId, botPhoneNumber, message } = opts;
|
||||
const { botId, botPhoneNumber, message, attachment, filename, mimetype } = opts;
|
||||
const { source, timestamp, data_message: dataMessage } = message;
|
||||
|
||||
const { number }: any = source;
|
||||
|
|
@ -35,9 +38,9 @@ const formatPayload = (opts: SignaldMessageTaskOptions): WebhookPayload => {
|
|||
message_id: `${botId}-${timestamp}`,
|
||||
sent_at: `${timestamp}`,
|
||||
message: body,
|
||||
attachment: null,
|
||||
filename: null,
|
||||
mime_type: null,
|
||||
attachment,
|
||||
filename,
|
||||
mime_type: mimetype,
|
||||
};
|
||||
};
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue