Metamigo -> Bridge

This commit is contained in:
Darren Clarke 2024-04-21 09:44:30 +02:00
parent 242f3cf6b8
commit a445762a37
145 changed files with 396 additions and 16668 deletions

View file

@ -0,0 +1,13 @@
.git
.idea
**/node_modules
!/node_modules
**/build
**/dist
**/tmp
**/.env*
**/coverage
**/.next
**/amigo.*.json
**/cypress/videos
**/cypress/screenshots

View file

@ -0,0 +1 @@
require("../.eslintrc.js");

View file

@ -0,0 +1,3 @@
{
"presets": ["@digiresilience/babel-preset-bridge"]
}

View file

@ -0,0 +1,69 @@
/* eslint-disable camelcase */
import { SavedVoiceProvider } from "@digiresilience/bridge-db";
import Twilio from "twilio";
import { CallInstance } from "twilio/lib/rest/api/v2010/account/call";
import { Zammad, getOrCreateUser } from "./zammad";
/**
 * Build a Twilio REST client for a stored voice provider.
 *
 * @param provider the saved provider whose credentials to use
 * @returns a configured Twilio client
 * @throws Error when any of the three credential fields is missing
 */
export const twilioClientFor = (
  provider: SavedVoiceProvider,
): Twilio.Twilio => {
  const { accountSid, apiKeySid, apiKeySecret } = provider.credentials;
  const hasAllCredentials = accountSid && apiKeySid && apiKeySecret;
  if (!hasAllCredentials) {
    throw new Error(
      `twilio provider ${provider.name} does not have credentials`,
    );
  }
  return Twilio(apiKeySid, apiKeySecret, { accountSid });
};
/**
 * Create a Zammad ticket for a completed phone call, attaching the call
 * recording as an mp3.
 *
 * @param call the completed Twilio call resource
 * @param mp3 the recording audio bytes
 * @throws Error when the Zammad API rejects the ticket creation
 */
export const createZammadTicket = async (
  call: CallInstance,
  mp3: Buffer,
): Promise<void> => {
  const title = `Call from ${call.fromFormatted} at ${call.startTime}`;
  // HTML summary of the call; the audio itself travels as an attachment.
  const body = `<ul>
<li>Caller: ${call.fromFormatted}</li>
<li>Service Number: ${call.toFormatted}</li>
<li>Call Duration: ${call.duration} seconds</li>
<li>Start Time: ${call.startTime}</li>
<li>End Time: ${call.endTime}</li>
</ul>
<p>See the attached recording.</p>`;
  const filename = `${call.sid}-${call.startTime}.mp3`;
  // SECURITY(review): hard-coded API token and host committed to source —
  // rotate this token and move both values into configuration.
  const zammad = Zammad(
    {
      token: "EviH_WL0p6YUlCoIER7noAZEAPsYA_fVU4FZCKdpq525Vmzzvl8d7dNuP_8d-Amb",
    },
    "https://demo.digiresilience.org",
  );
  try {
    const customer = await getOrCreateUser(zammad, call.fromFormatted);
    await zammad.ticket.create({
      title,
      group: "Finances",
      // typo fixed: "automaticaly" -> "automatically"
      note: "This ticket was created automatically from a recorded phone call.",
      customer_id: customer.id,
      article: {
        body,
        subject: title,
        content_type: "text/html",
        type: "note",
        attachments: [
          {
            filename,
            data: mp3.toString("base64"),
            "mime-type": "audio/mpeg",
          },
        ],
      },
    });
  } catch (error: any) {
    if (error.isBoom) {
      console.log(error.output);
      // typo fixed: "zamamd" -> "zammad"
      throw new Error("Failed to create zammad ticket");
    }
    // Bug fix: non-Boom errors were previously logged and swallowed,
    // silently dropping the ticket. Rethrow so callers can retry.
    throw error;
  }
};

51
apps/bridge-worker/db.ts Normal file
View file

@ -0,0 +1,51 @@
import pgPromise from "pg-promise";
import * as pgMonitor from "pg-monitor";
import {
dbInitOptions,
IRepositories,
AppDatabase,
} from "@digiresilience/bridge-db";
import config from "@digiresilience/bridge-config";
import type { IInitOptions } from "pg-promise";
/**
 * Attach pg-monitor to the pg-promise event hooks. When logSql is true,
 * every event is monitored; otherwise only errors are.
 */
export const initDiagnostics = (
  logSql: boolean,
  initOpts: IInitOptions<IRepositories>,
): void => {
  if (!logSql) {
    pgMonitor.attach(initOpts, ["error"]);
  } else {
    pgMonitor.attach(initOpts);
  }
};
// Detach pg-monitor from the pg-promise event hooks.
export const stopDiagnostics = (): void => pgMonitor.detach();
// Module-level pg-promise singleton and its init options; both are
// populated by initPgp() before any database handle is created.
let pgp: any;
let pgpInitOptions: any;
// Initialize the pg-promise library instance from app config.
// NOTE(review): must run before initDb()/withDb(), otherwise `pgp` is
// still undefined — confirm callers guarantee this ordering.
export const initPgp = (): void => {
pgpInitOptions = dbInitOptions(config);
pgp = pgPromise(pgpInitOptions);
};
// Create a database handle from the configured connection string.
// Requires initPgp() to have been called first.
const initDb = (): AppDatabase => pgp(config.db.connection);
// Shut down the handle's underlying connection pool.
export const stopDb = async (db: AppDatabase): Promise<void> =>
  db.$pool.end();
export const withDb = <T>(f: (db: AppDatabase) => Promise<T>): Promise<T> => {
const db = initDb();
initDiagnostics(config.logging.sql, pgpInitOptions);
try {
return f(db);
} finally {
stopDiagnostics();
}
};
export type { AppDatabase } from "@digiresilience/bridge-db";

View file

@ -0,0 +1,63 @@
import * as Worker from "graphile-worker";
import { parseCronItems } from "graphile-worker";
import { initPgp } from "./db.js";
import logger from "./logger.js";
import workerUtils from "./utils.js";
import { assertFfmpegAvailable } from "./lib/media-convert.js";
// Adapt graphile-worker's logger interface onto our pino logger.
const logFactory = (scope: any) => (level: any, message: any, meta: any) => {
  // graphile-worker emits "warning"; pino's method is "warn".
  const method = level === "warning" ? "warn" : level;
  const scoped = logger.child({ scope });
  if (meta) {
    scoped[method](meta, message);
  } else {
    scoped[method](message);
  }
};
/**
 * Assemble the graphile-worker runner options from app config.
 * NOTE(review): `config` is referenced here but no import of
 * "@digiresilience/bridge-config" is visible in this file — confirm it is
 * imported above this view.
 */
export const configWorker = async (): Promise<Worker.RunnerOptions> => {
  const { connection, concurrency, pollInterval, leafcutter } = config.worker;
  logger.info({ concurrency, pollInterval }, "Starting worker");
  // The leafcutter sync jobs run on a cron schedule only when enabled.
  const cronItems = leafcutter.enabled
    ? [
        { task: "import-label-studio", pattern: "*/15 * * * *" },
        { task: "import-leafcutter", pattern: "*/17 * * * *" },
      ]
    : [];
  return {
    concurrency,
    pollInterval,
    logger: new Worker.Logger(logFactory),
    connectionString: connection,
    // eslint-disable-next-line unicorn/prefer-module
    taskDirectory: `${__dirname}/tasks`,
    parsedCronItems: parseCronItems(cronItems),
  };
};
/**
 * Start the graphile-worker runner after verifying prerequisites
 * (ffmpeg availability, worker schema migrations, pg-promise init).
 */
export const startWorker = async (): Promise<Worker.Runner> => {
  // Fail fast if ffmpeg (needed by audio conversion tasks) is unusable.
  await assertFfmpegAvailable();
  logger.info("ffmpeg found");
  await workerUtils.migrate();
  logger.info("worker database migrated");
  initPgp();
  const workerConfig = await configWorker();
  return Worker.run(workerConfig);
};
// Gracefully stop the running worker via its montar state wrapper.
export const stopWorker = async (): Promise<void> => {
await worker.stop();
};
// montar-managed lifecycle state tying the graphile-worker runner to
// application start/stop.
const worker = defState("worker", {
start: startWorker,
stop: stopWorker,
});
export default worker;

View file

@ -0,0 +1,84 @@
import { Readable } from "stream";
import ffmpeg from "fluent-ffmpeg";
import * as R from "remeda";
// Formats that must be both demux- and mux-capable for selfCheck() to pass.
const requiredCodecs = ["mp3", "webm", "wav"];
// Options controlling an audio conversion; every field is optional and
// falls back to defaultAudioConvertOpts.
export interface AudioConvertOpts {
bitrate?: string;
audioCodec?: string;
format?: string;
}
// Default target: 32k-bitrate mp3 encoded with libmp3lame.
const defaultAudioConvertOpts = {
bitrate: "32k",
audioCodec: "libmp3lame",
format: "mp3",
};
/**
 * Convert an audio buffer to another format via ffmpeg. Defaults to a
 * 32k-bitrate mp3 using the libmp3lame codec.
 *
 * @param input buffer containing the source audio bytes
 * @param opts optional overrides for bitrate/codec/format
 * @return resolves to a buffer with the converted audio bytes
 **/
export const convert = (
  input: Buffer,
  opts?: AudioConvertOpts
): Promise<Buffer> => {
  const { bitrate, audioCodec, format } = {
    ...defaultAudioConvertOpts,
    ...opts,
  };
  return new Promise((resolve, reject) => {
    const chunks: Buffer[] = [];
    const command = ffmpeg(Readable.from(input))
      .audioCodec(audioCodec)
      .audioBitrate(bitrate)
      .toFormat(format)
      .on("error", (err, stdout, stderr) => {
        // Surface ffmpeg's own output to aid debugging, then fail.
        console.error(err.message);
        console.log("FFMPEG OUTPUT");
        console.log(stdout);
        console.log("FFMPEG ERROR");
        console.log(stderr);
        reject(err);
      })
      .on("end", () => resolve(Buffer.concat(chunks)));
    // Collect the converted bytes as they stream out of ffmpeg.
    command.pipe().on("data", (chunk: Buffer) => chunks.push(chunk));
  });
};
/**
 * Check if ffmpeg is installed and usable: the installation must respond
 * and support all formats in requiredCodecs (demux and mux).
 *
 * @return resolves to true if ffmpeg is installed and usable
 * */
export const selfCheck = (): Promise<boolean> => {
  return new Promise((resolve) => {
    ffmpeg.getAvailableFormats((err, formats) => {
      if (err) {
        console.error("FFMPEG error:", err);
        resolve(false);
        // Bug fix: without this return we fell through and read `formats`
        // (undefined on error) below, throwing after already resolving.
        return;
      }
      const preds = R.map(requiredCodecs, (codec) => (available: any) =>
        available[codec] && available[codec].canDemux && available[codec].canMux
      );
      resolve(R.allPass(formats, preds));
    });
  });
};
// Throw unless a working ffmpeg with the required codecs is present.
export const assertFfmpegAvailable = async (): Promise<void> => {
  const usable = await selfCheck();
  if (usable) return;
  throw new Error(
    `ffmpeg is not installed, could not be located, or does not support the required codecs: ${requiredCodecs}`
  );
};

View file

@ -0,0 +1,69 @@
// Maps a Zammad tag name to the Label Studio annotation field/value pairs
// it should pre-populate. One source tag may expand into several entries
// (e.g. an incident type plus a device tag).
// NOTE(review): DeviceSuspiciousBehavior maps to "compromise-device" while
// HackedLaptop maps to "compromised-device" — confirm both spellings are
// intended in the Label Studio label config.
export const tagMap = {
AccountImpersonation: [
{ field: "incidentType tag", value: "account-impersonation" },
],
AppleID: [{ field: "incidentType tag", value: "malfunction-failure" }],
Blocked: [{ field: "incidentType tag", value: "account-deactivation" }],
CyberBullying: [{ field: "incidentType tag", value: "cyber-bullying" }],
DeviceSuspiciousBehavior: [
{ field: "incidentType tag", value: "compromise-device" },
],
Doxxing: [{ field: "incidentType tag", value: "doxxing" }],
DSTips: [{ field: "incidentType tag", value: "informational" }],
HackedLaptop: [
{ field: "incidentType tag", value: "compromised-device" },
{ field: "device tag", value: "laptop" },
],
"Hacked/StolenAccount": [
{ field: "incidentType tag", value: "compromised-account" },
],
HateSpeech: [{ field: "incidentType tag", value: "hate-speech" }],
InfectedPhone: [
{ field: "incidentType tag", value: "malware" },
{ field: "device tag", value: "smartphone" },
],
Kidnapping: [{ field: "incidentType tag", value: "kidnapping" }],
LaptopGiveaway: [{ field: "incidentType tag", value: "other" }],
ForensicAnalysis: [{ field: "incidentType tag", value: "malware" }],
ISF: [{ field: "incidentType tag", value: "other" }],
NumberBanned: [
{ field: "incidentType tag", value: "disruption" },
{ field: "device tag", value: "smartphone" },
],
OnlineHarassment: [{ field: "incidentType tag", value: "online-harassment" }],
PhoneHarassment: [{ field: "incidentType tag", value: "phone-harassment" }],
PoliticalAds: [{ field: "incidentType tag", value: "spam" }],
SeizedPhone: [
{ field: "incidentType tag", value: "confiscation" },
{ field: "device tag", value: "smartphone" },
],
SexED: [{ field: "incidentType tag", value: "informational" }],
Sextortion: [{ field: "incidentType tag", value: "sextortion" }],
Spam: [{ field: "incidentType tag", value: "spam" }],
SuspendedAccount: [
{ field: "incidentType tag", value: "account-suspension" },
],
SuspendedActivities: [
{ field: "incidentType tag", value: "content-moderation" },
],
SuspendedGroup: [{ field: "incidentType tag", value: "account-suspension" }],
SuspendedPage: [{ field: "incidentType tag", value: "account-suspension" }],
"Stolen/LostPhone": [
{ field: "incidentType tag", value: "loss" },
{ field: "device tag", value: "smartphone" },
],
// Platform/service tags map straight through to their Leafcutter names.
Facebook: [{ field: "platform tag", value: "facebook" }],
Google: [{ field: "platform tag", value: "google" }],
Instagram: [{ field: "platform tag", value: "instagram" }],
SMS: [{ field: "service tag", value: "sms" }],
Twitter: [{ field: "platform tag", value: "twitter" }],
Website: [{ field: "service tag", value: "website" }],
WhatsApp: [{ field: "platform tag", value: "whatsapp" }],
YouTube: [{ field: "platform tag", value: "youtube" }],
Linkedin: [{ field: "platform tag", value: "linkedin" }],
PoliticalActivist: [{ field: "targetedGroup tag", value: "policy-politics" }],
ElectoralCandidate: [
{ field: "targetedGroup tag", value: "policy-politics" },
],
PhishingLink: [{ field: "incidentType tag", value: "phishing" }],
};

View file

@ -0,0 +1,8 @@
import { defState } from "@digiresilience/montar";
import { configureLogger } from "@digiresilience/bridge-common";
import config from "@digiresilience/bridge-config";
// montar-managed state that configures the shared logger on app start.
export const logger = defState("workerLogger", {
  start: async () => {
    return configureLogger(config);
  },
});
export default logger;

View file

@ -0,0 +1,55 @@
{
"name": "bridge-worker",
"version": "0.2.0",
"main": "build/main/index.js",
"type": "module",
"author": "Abel Luck <abel@guardianproject.info>",
"license": "AGPL-3.0-or-later",
"dependencies": {
"graphile-worker": "^0.16.5",
"html-to-text": "^9.0.5",
"node-fetch": "^3",
"pg-promise": "^11.6.0",
"remeda": "^1.60.1",
"twilio": "^5.0.4"
},
"devDependencies": {
"ts-config": "*",
"@babel/core": "7.24.4",
"@babel/preset-env": "7.24.4",
"@babel/preset-typescript": "7.24.1",
"@types/fluent-ffmpeg": "^2.1.24",
"@types/jest": "^29.5.12",
"eslint": "^9.0.0",
"jest": "^29.7.0",
"jest-circus": "^29.7.0",
"jest-junit": "^16.0.0",
"nodemon": "^3.1.0",
"pino-pretty": "^11.0.0",
"prettier": "^3.2.5",
"ts-node": "^10.9.2",
"typedoc": "^0.25.13",
"typescript": "^5.4.5"
},
"nodemonConfig": {
"ignore": [
"docs/*"
],
"ext": "ts,json,js"
},
"scripts": {
"build-xxx": "tsc -p tsconfig.json",
"build-test": "tsc -p tsconfig.json",
"doc:html": "typedoc src/ --exclude '**/*.test.ts' --exclude '**/*.spec.ts' --name $npm_package_name --readme README.md --target es2019 --mode file --out build/docs",
"doc": "yarn run doc:html",
"fix:lint": "eslint src --ext .ts --fix",
"fix:prettier": "prettier \"src/**/*.ts\" --write",
"test:jest": "JEST_CIRCUS=1 jest --coverage --forceExit --detectOpenHandles --reporters=default --reporters=jest-junit",
"test:jest-verbose": "yarn test:jest --verbose --silent=false",
"test": "yarn test:jest",
"lint": "yarn lint:lint && yarn lint:prettier",
"lint:lint": "eslint src --ext .ts",
"lint:prettier": "prettier \"src/**/*.ts\" --list-different",
"watch:test": "yarn test:jest --watchAll"
}
}

View file

@ -0,0 +1,208 @@
/* eslint-disable camelcase */
import { convert } from "html-to-text";
import fetch from "node-fetch";
import { URLSearchParams } from "url";
import { withDb, AppDatabase } from "../db";
import { loadConfig } from "@digiresilience/bridge-config";
import { tagMap } from "../lib/tag-map";
// A Zammad ticket reshaped into a Label Studio import document: `data`
// carries the ticket text and provenance metadata, `predictions` carries
// pre-annotations derived from the ticket's Zammad tags.
type FormattedZammadTicket = {
data: Record<string, unknown>;
predictions: Record<string, unknown>[];
};
/**
 * Fetch one page (25) of closed Zammad tickets and convert each into a
 * Label Studio import document.
 *
 * @param page 1-based page number of the Zammad ticket search
 * @param minUpdatedTimestamp tickets closed at or before this are skipped
 * @return [shouldContinue, docs] — shouldContinue is false when the page
 *         was empty, signalling the caller to stop paginating
 */
const getZammadTickets = async (
page: number,
minUpdatedTimestamp: Date,
): Promise<[boolean, FormattedZammadTicket[]]> => {
const {
leafcutter: { zammadApiUrl, zammadApiKey, contributorName, contributorId },
} = await loadConfig();
const headers = { Authorization: `Token ${zammadApiKey}` };
let shouldContinue = false;
const docs = [];
// Closed tickets only, oldest-updated first, 25 per page.
const ticketsQuery = new URLSearchParams({
expand: "true",
sort_by: "updated_at",
order_by: "asc",
query: "state.name: closed",
per_page: "25",
page: `${page}`,
});
const rawTickets = await fetch(
`${zammadApiUrl}/tickets/search?${ticketsQuery}`,
{ headers },
);
const tickets: any = await rawTickets.json();
console.log({ tickets });
// An empty page means we have paged past the last ticket.
if (!tickets || tickets.length === 0) {
return [shouldContinue, docs];
}
for await (const ticket of tickets) {
const { id: source_id, created_at, updated_at, close_at } = ticket;
const source_created_at = new Date(created_at);
const source_updated_at = new Date(updated_at);
const source_closed_at = new Date(close_at);
shouldContinue = true;
// Skip tickets closed before the stored high-water mark — those were
// already imported by a previous run.
if (source_closed_at <= minUpdatedTimestamp) {
console.log(`Skipping ticket`, {
source_id,
source_updated_at,
source_closed_at,
minUpdatedTimestamp,
});
continue;
}
console.log(`Processing ticket`, {
source_id,
source_updated_at,
source_closed_at,
minUpdatedTimestamp,
});
// Concatenate all article bodies, converting HTML articles to text.
const rawArticles = await fetch(
`${zammadApiUrl}/ticket_articles/by_ticket/${source_id}`,
{ headers },
);
const articles: any = await rawArticles.json();
let articleText = "";
for (const article of articles) {
const { content_type: contentType, body } = article;
if (contentType === "text/html") {
const cleanArticleText = convert(body);
articleText += cleanArticleText + "\n\n";
} else {
articleText += body + "\n\n";
}
}
// Translate the ticket's Zammad tags into Label Studio predictions.
const tagsQuery = new URLSearchParams({
object: "Ticket",
o_id: source_id,
});
const rawTags = await fetch(`${zammadApiUrl}/tags?${tagsQuery}`, {
headers,
});
const { tags }: any = await rawTags.json();
const transformedTags = [];
for (const tag of tags) {
const outputs = tagMap[tag];
if (outputs) {
transformedTags.push(...outputs);
}
}
const doc: FormattedZammadTicket = {
data: {
ticket: articleText,
contributor_id: contributorId,
source_id,
source_closed_at,
source_created_at,
source_updated_at,
},
predictions: [],
};
// Each mapped tag becomes a "choices" result on the "ticket" field.
const result = transformedTags.map((tag) => {
return {
type: "choices",
value: {
choices: [tag.value],
},
to_name: "ticket",
from_name: tag.field,
};
});
if (result.length > 0) {
doc.predictions.push({
model_version: `${contributorName}TranslatorV1`,
result,
});
}
docs.push(doc);
}
return [shouldContinue, docs];
};
/**
 * Page through closed Zammad tickets newer than the given timestamp and
 * collect their formatted import documents.
 */
const fetchFromZammad = async (
  minUpdatedTimestamp: Date,
): Promise<FormattedZammadTicket[]> => {
  const collected: FormattedZammadTicket[] = [];
  // Hard ceiling of 10000 pages guards against endless pagination.
  for (let page = 1; page <= 10000; page += 1) {
    const [more, tickets] = await getZammadTickets(page, minUpdatedTimestamp);
    if (!more) break;
    if (tickets.length > 0) {
      collected.push(...tickets);
    }
  }
  return collected;
};
// Import each formatted ticket into Label Studio project 1, one per
// request, logging every import result.
const sendToLabelStudio = async (tickets: FormattedZammadTicket[]) => {
  const {
    leafcutter: { labelStudioApiUrl, labelStudioApiKey },
  } = await loadConfig();
  const headers = {
    Authorization: `Token ${labelStudioApiKey}`,
    "Content-Type": "application/json",
    Accept: "application/json",
  };
  for (const ticket of tickets) {
    const response = await fetch(`${labelStudioApiUrl}/projects/1/import`, {
      method: "POST",
      headers,
      body: JSON.stringify([ticket]),
    });
    const importResult = await response.json();
    console.log(JSON.stringify(importResult, undefined, 2));
  }
};
/**
 * Graphile task: sync recently-closed Zammad tickets into Label Studio,
 * then advance the stored high-water mark to the last closed timestamp.
 */
const importLabelStudioTask = async (): Promise<void> => {
  // Bug fix: return the withDb promise. Previously it was fire-and-forget,
  // so the worker marked the job done immediately and any failure became an
  // unhandled rejection instead of a retryable job error.
  return withDb(async (db: AppDatabase) => {
    const {
      leafcutter: { contributorName },
    } = await loadConfig();
    const settingName = `${contributorName}ImportLabelStudioTask`;
    const res: any = await db.settings.findByName(settingName);
    // High-water mark: only tickets closed after this moment are imported.
    const startTimestamp = res?.value?.minUpdatedTimestamp
      ? new Date(res.value.minUpdatedTimestamp as string)
      : new Date("2023-03-01");
    const tickets = await fetchFromZammad(startTimestamp);
    if (tickets.length > 0) {
      await sendToLabelStudio(tickets);
      // Tickets arrive oldest-first, so the last one carries the newest
      // closed timestamp.
      const lastTicket = tickets.pop();
      const newLastTimestamp = lastTicket.data.source_closed_at;
      console.log({ newLastTimestamp });
      await db.settings.upsert(settingName, {
        minUpdatedTimestamp: newLastTimestamp,
      });
    }
  });
};
export default importLabelStudioTask;

View file

@ -0,0 +1,174 @@
/* eslint-disable camelcase */
import fetch from "node-fetch";
import { URLSearchParams } from "url";
import { withDb, AppDatabase } from "../db";
import { loadConfig } from "@digiresilience/bridge-config";
// A task as returned by the Label Studio API; `annotations` holds the
// human labels, `data` holds the original ticket payload we imported.
type LabelStudioTicket = {
id: string;
is_labeled: boolean;
annotations: Record<string, unknown>[];
data: Record<string, unknown>;
updated_at: string;
};
// The document shape the Leafcutter (OpenSearch) endpoint expects.
type LeafcutterTicket = {
id: string;
incident: string[];
technology: string[];
targeted_group: string[];
country: string[];
region: string[];
continent: string[];
date: Date;
origin: string;
origin_id: string;
source_created_at: string;
source_updated_at: string;
};
/**
 * Fetch one page (50) of Label Studio tasks for project 1.
 * NOTE(review): the JSON body is returned as-is; confirm this endpoint
 * yields a plain task array rather than a paginated envelope.
 */
const getLabelStudioTickets = async (
  page: number,
): Promise<LabelStudioTicket[]> => {
  const {
    leafcutter: { labelStudioApiUrl, labelStudioApiKey },
  } = await loadConfig();
  const headers = {
    Authorization: `Token ${labelStudioApiKey}`,
    Accept: "application/json",
  };
  const pageQuery = new URLSearchParams({
    page_size: "50",
    page: `${page}`,
  });
  const url = `${labelStudioApiUrl}/projects/1/tasks?${pageQuery}`;
  console.log({ url });
  const res = await fetch(url, { headers });
  console.log({ res });
  const tasksResult: any = await res.json();
  console.log({ tasksResult });
  return tasksResult;
};
/**
 * Page through Label Studio tasks, keeping those updated strictly after
 * the given timestamp. Stops at the first empty page (cap: 10000 pages).
 */
const fetchFromLabelStudio = async (
  minUpdatedTimestamp: Date,
): Promise<LabelStudioTicket[]> => {
  const recentDocs: LabelStudioTicket[] = [];
  for (let page = 1; page <= 10000; page += 1) {
    const docs = await getLabelStudioTickets(page);
    console.log({ page, docs });
    if (!docs || docs.length === 0) break;
    for (const doc of docs) {
      const updatedAt = new Date(doc.updated_at);
      console.log({ updatedAt, minUpdatedTimestamp });
      if (updatedAt > minUpdatedTimestamp) {
        console.log(`Adding doc`, { doc });
        recentDocs.push(doc);
      }
    }
  }
  console.log({ allDocs: recentDocs });
  return recentDocs;
};
/**
 * Push labeled Label Studio tickets to the Leafcutter OpenSearch endpoint.
 * Unlabeled tasks are dropped; annotation choices are regrouped into
 * incident/technology/country/targeted-group lists.
 */
const sendToLeafcutter = async (tickets: LabelStudioTicket[]) => {
const {
leafcutter: {
contributorId,
opensearchApiUrl,
opensearchUsername,
opensearchPassword,
},
} = await loadConfig();
console.log({ tickets });
// Only fully labeled tasks are exported.
const filteredTickets = tickets.filter((ticket) => ticket.is_labeled);
console.log({ filteredTickets });
const finalTickets: LeafcutterTicket[] = filteredTickets.map((ticket) => {
const {
id,
annotations,
data: { source_id, source_created_at, source_updated_at },
} = ticket;
// Collect the chosen values of one annotation control (by from_name).
const getTags = (tags: Record<string, any>[], name: string) =>
tags
.filter((tag) => tag.from_name === name)
.map((tag) => tag.value.choices)
.flat();
const allTags = annotations.map(({ result }) => result).flat();
const incident = getTags(allTags, "incidentType tag");
const technology = getTags(allTags, "platform tag");
const country = getTags(allTags, "country tag");
const targetedGroup = getTags(allTags, "targetedGroup tag");
return {
id,
incident,
technology,
targeted_group: targetedGroup,
country,
// region/continent are not annotated yet; sent empty.
region: [],
continent: [],
date: new Date(source_created_at as string),
origin: contributorId,
origin_id: source_id as string,
source_created_at: source_created_at as string,
source_updated_at: source_updated_at as string,
};
});
console.log("Sending to Leafcutter");
console.log({ finalTickets });
// Basic-auth POST of the whole batch to OpenSearch.
const result = await fetch(opensearchApiUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Basic ${Buffer.from(`${opensearchUsername}:${opensearchPassword}`).toString("base64")}`,
},
body: JSON.stringify({ tickets: finalTickets }),
});
console.log({ result });
};
/**
 * Graphile task: export labeled Label Studio tickets to Leafcutter
 * (OpenSearch), then advance the stored high-water mark.
 */
const importLeafcutterTask = async (): Promise<void> => {
  // Bug fix: return the withDb promise. Previously it was fire-and-forget,
  // so the worker marked the job done immediately and any failure became an
  // unhandled rejection instead of a retryable job error.
  return withDb(async (db: AppDatabase) => {
    const {
      leafcutter: { contributorName },
    } = await loadConfig();
    const settingName = `${contributorName}ImportLeafcutterTask`;
    const res: any = await db.settings.findByName(settingName);
    // High-water mark: only tasks updated after this moment are exported.
    const startTimestamp = res?.value?.minUpdatedTimestamp
      ? new Date(res.value.minUpdatedTimestamp as string)
      : new Date("2023-03-01");
    // Captured before the fetch so updates that land mid-run are retried.
    const newLastTimestamp = new Date();
    console.log({
      contributorName,
      settingName,
      res,
      startTimestamp,
      newLastTimestamp,
    });
    const tickets = await fetchFromLabelStudio(startTimestamp);
    console.log({ tickets });
    await sendToLeafcutter(tickets);
    await db.settings.upsert(settingName, {
      minUpdatedTimestamp: newLastTimestamp,
    });
  });
};
export default importLeafcutterTask;

View file

@ -0,0 +1,57 @@
import Wreck from "@hapi/wreck";
import * as R from "remeda";
import { withDb, AppDatabase } from "../db";
import logger from "../logger";
// Job options for the notify-webhook task: which registered webhook to
// call and the JSON body to forward verbatim.
export interface WebhookOptions {
webhookId: string;
payload: any;
}
/**
 * Graphile task: deliver a payload to one registered webhook endpoint,
 * using the HTTP method and extra headers stored with the webhook.
 */
const notifyWebhooksTask = async (options: WebhookOptions): Promise<void> =>
  withDb(async (db: AppDatabase) => {
    const { webhookId, payload } = options;
    const webhook = await db.webhooks.findById({ id: webhookId });
    if (!webhook) {
      logger.debug(
        { webhookId },
        "notify-webhook: no webhook registered with id"
      );
      return;
    }
    const { endpointUrl, httpMethod, headers } = webhook;
    // Collapse the stored [{header, value}] pairs into a headers object.
    const headerEntries = (headers || []).map((h: any) => [h.header, h.value]);
    const wreck = Wreck.defaults({
      json: true,
      headers: Object.fromEntries(headerEntries),
    });
    // http errors will bubble up and cause the job to fail and be retried
    try {
      logger.debug(
        { webhookId, endpointUrl, httpMethod },
        "notify-webhook: notifying"
      );
      if (httpMethod === "post") {
        await wreck.post(endpointUrl, { payload });
      } else {
        await wreck.put(endpointUrl, { payload });
      }
    } catch (error: any) {
      logger.error(
        { webhookId, error: error.output },
        "notify-webhook failed with this error"
      );
      throw new Error(`webhook failed webhookId=${webhookId}`);
    }
  });
export default notifyWebhooksTask;

View file

@ -0,0 +1,90 @@
/* eslint-disable camelcase */
import logger from "../logger";
import { IncomingMessagev1 } from "@digiresilience/node-signald/build/main/generated";
import { withDb, AppDatabase } from "../db";
import workerUtils from "../utils";
// Body delivered to registered webhooks for an incoming signal message.
interface WebhookPayload {
to: string;
from: string;
message_id: string;
sent_at: string;
message: string;
attachment: string | null;
filename: string | null;
mime_type: string | null;
}
// Job options for the signald-message task; `message` is the raw signald
// envelope, the attachment fields are pre-extracted by the enqueuer.
interface SignaldMessageTaskOptions {
message: IncomingMessagev1;
botId: string;
botPhoneNumber: string;
attachment: string;
filename: string;
mimetype: string;
}
// Shape an incoming signald message into the standard webhook payload.
const formatPayload = (opts: SignaldMessageTaskOptions): WebhookPayload => {
  const { botId, botPhoneNumber, message, attachment, filename, mimetype } =
    opts;
  const { source, timestamp, data_message: dataMessage } = message;
  const senderNumber = (source as any).number;
  const messageBody = (dataMessage as any).body;
  return {
    to: botPhoneNumber,
    from: senderNumber,
    // Signal has no server-side message id; bot id + timestamp is unique.
    message_id: `${botId}-${timestamp}`,
    sent_at: `${timestamp}`,
    message: messageBody,
    attachment,
    filename,
    mime_type: mimetype,
  };
};
/**
 * Fan a formatted signal message out to every webhook registered for the
 * bot, enqueueing one notify-webhook job per webhook.
 */
const notifyWebhooks = async (
  db: AppDatabase,
  messageInfo: SignaldMessageTaskOptions
) => {
  const {
    botId,
    message: { timestamp },
  } = messageInfo;
  const webhooks = await db.webhooks.findAllByBackendId("signal", botId);
  // Bug fix: also bail out when the lookup returns null/undefined — the
  // original check (`webhooks && webhooks.length === 0`) fell through to
  // `webhooks.forEach` and crashed in that case.
  if (!webhooks || webhooks.length === 0) {
    logger.debug({ botId }, "no webhooks registered for signal bot");
    return;
  }
  for (const { id } of webhooks) {
    const payload = formatPayload(messageInfo);
    logger.debug(
      { payload },
      "formatted signal bot payload for notify-webhook"
    );
    // Bug fix: await job creation so enqueue failures fail this task (and
    // get retried) instead of becoming unhandled rejections in forEach.
    await workerUtils.addJob(
      "notify-webhook",
      {
        payload,
        webhookId: id,
      },
      {
        // this de-deduplicates the job
        jobKey: `webhook-${id}-message-${botId}-${timestamp}`,
      }
    );
  }
};
/**
 * Graphile task: handle an incoming signald message by notifying the
 * bot's registered webhooks.
 */
const signaldMessageTask = async (
  options: SignaldMessageTaskOptions
): Promise<void> => {
  // NOTE(review): this logs the raw message contents into worker logs —
  // confirm that is intended.
  console.log(options);
  // Bug fix: return the withDb promise so the job only completes (and
  // failures are retried) once the webhooks have been notified; previously
  // it was fire-and-forget.
  return withDb(async (db: AppDatabase) => {
    await notifyWebhooks(db, options);
  });
};
export default signaldMessageTask;

View file

@ -0,0 +1,101 @@
import Wreck from "@hapi/wreck";
import { withDb, AppDatabase } from "../db";
import { twilioClientFor } from "../common";
import { CallInstance } from "twilio/lib/rest/api/v2010/account/call";
import workerUtils from "../utils";
// Body delivered to registered webhooks for a finished, recorded call;
// the recording audio is base64-encoded inline.
interface WebhookPayload {
startTime: string;
endTime: string;
to: string;
from: string;
duration: string;
callSid: string;
recording: string;
mimeType: string;
}
// Download a recording from Twilio. On failure, returns the Boom-style
// error output instead of throwing so the caller can decide what to do.
const getTwilioRecording = async (url: string) => {
  let response;
  try {
    response = await Wreck.get(url);
  } catch (error: any) {
    console.error(error.output);
    return { error: error.output };
  }
  return { recording: response.payload as Buffer };
};
// Build the webhook payload for a finished call plus its mp3 recording.
const formatPayload = (
  call: CallInstance,
  recording: Buffer
): WebhookPayload => ({
  startTime: call.startTime.toISOString(),
  endTime: call.endTime.toISOString(),
  to: call.toFormatted,
  from: call.fromFormatted,
  duration: call.duration,
  callSid: call.sid,
  // Raw audio travels base64-encoded inside the JSON payload.
  recording: recording.toString("base64"),
  mimeType: "audio/mpeg",
});
/**
 * Fan a call recording out to every webhook registered for the voice line,
 * enqueueing one notify-webhook job per webhook.
 */
const notifyWebhooks = async (
  db: AppDatabase,
  voiceLineId: string,
  call: CallInstance,
  recording: Buffer
) => {
  const webhooks = await db.webhooks.findAllByBackendId("voice", voiceLineId);
  // Bug fix: also bail out when the lookup returns null/undefined — the
  // original check (`webhooks && webhooks.length === 0`) fell through to
  // `webhooks.forEach` and crashed in that case.
  if (!webhooks || webhooks.length === 0) return;
  for (const { id } of webhooks) {
    const payload = formatPayload(call, recording);
    // Bug fix: await job creation so enqueue failures fail this task (and
    // get retried) instead of becoming unhandled rejections in forEach.
    await workerUtils.addJob(
      "notify-webhook",
      {
        payload,
        webhookId: id,
      },
      {
        // this de-depuplicates the job
        jobKey: `webhook-${id}-call-${call.sid}`,
      }
    );
  }
};
// Job options for the twilio-recording task, identifying the provider
// account, the call, the recording, and the owning voice line.
interface TwilioRecordingTaskOptions {
accountSid: string;
callSid: string;
recordingSid: string;
voiceLineId: string;
}
/**
 * Graphile task: fetch a completed Twilio call recording as mp3 and notify
 * the voice line's registered webhooks with it.
 */
const twilioRecordingTask = async (
  options: TwilioRecordingTaskOptions
): Promise<void> =>
  withDb(async (db: AppDatabase) => {
    const { voiceLineId, accountSid, callSid, recordingSid } = options;
    const voiceLine = await db.voiceLines.findById({ id: voiceLineId });
    if (!voiceLine) return;
    const provider = await db.voiceProviders.findByTwilioAccountSid(accountSid);
    if (!provider) return;
    const client = twilioClientFor(provider);
    const meta = await client.recordings(recordingSid).fetch();
    // Swap the resource URI's trailing "json" for "mp3" to get the audio.
    const mp3Url = `https://api.twilio.com/${meta.uri.slice(0, -4)}mp3`;
    const { recording, error } = await getTwilioRecording(mp3Url);
    if (error) {
      throw new Error(`failed to get recording for call ${callSid}`);
    }
    const call = await client.calls(callSid).fetch();
    await notifyWebhooks(db, voiceLineId, call, recording!);
  });
export default twilioRecordingTask;

View file

@ -0,0 +1,48 @@
import { createHash } from "crypto";
import { withDb, AppDatabase } from "../db";
import { convert } from "../lib/media-convert";
// Job options for the voice-line-audio-update task.
interface VoiceLineAudioUpdateTaskOptions {
voiceLineId: string;
}
// Hex-encoded SHA-1 digest of the given value; used to detect whether the
// stored webm prompt audio has already been converted.
const sha1sum = (v: any) => createHash("sha1").update(v).digest("hex");
/**
 * Graphile task: ensure a voice line's prompt audio has an mp3 rendition.
 * Converts the stored webm unless its checksum shows it was already done.
 */
const voiceLineAudioUpdateTask = async (
  payload: VoiceLineAudioUpdateTaskOptions
): Promise<void> =>
  withDb(async (db: AppDatabase) => {
    const { voiceLineId } = payload;
    const voiceLine = await db.voiceLines.findById({ id: voiceLineId });
    if (!voiceLine) return;
    const webmBase64 = voiceLine?.promptAudio?.["audio/webm"];
    if (!webmBase64) return;
    const webm = Buffer.from(webmBase64, "base64");
    const webmSha1 = sha1sum(webm);
    // A matching checksum means this exact webm was converted before.
    if (voiceLine.promptAudio.checksum === webmSha1) return;
    const mp3 = await convert(webm);
    await db.voiceLines.updateById(
      { id: voiceLine.id },
      {
        promptAudio: {
          ...voiceLine.promptAudio,
          "audio/mpeg": mp3.toString("base64"),
          checksum: webmSha1,
        },
      }
    );
  });
export default voiceLineAudioUpdateTask;

View file

@ -0,0 +1,41 @@
import Twilio from "twilio";
import config from "@digiresilience/bridge-config";
import { withDb, AppDatabase } from "../db";
// Job options for the voice-line-delete task: the deleted line plus the
// Twilio provider and phone-number SID it was attached to.
interface VoiceLineDeleteTaskOptions {
voiceLineId: string;
providerId: string;
providerLineSid: string;
}
/**
 * Graphile task: detach the recording webhook URL from a Twilio number
 * after its voice line is deleted. The URL is cleared only if it still
 * points at this voice line, so a number re-used elsewhere is left alone.
 */
const voiceLineDeleteTask = async (
  payload: VoiceLineDeleteTaskOptions,
): Promise<void> =>
  withDb(async (db: AppDatabase) => {
    const { voiceLineId, providerId, providerLineSid } = payload;
    const provider = await db.voiceProviders.findById({ id: providerId });
    if (!provider) return;
    const { accountSid, apiKeySid, apiKeySecret } = provider.credentials;
    if (!accountSid || !apiKeySid || !apiKeySecret) {
      throw new Error(
        `twilio provider ${provider.name} does not have credentials`,
      );
    }
    const client = Twilio(apiKeySid, apiKeySecret, { accountSid });
    const number = await client.incomingPhoneNumbers(providerLineSid).fetch();
    const recordUrl = `${config.frontend.url}/api/v1/voice/twilio/record/${voiceLineId}`;
    if (!number || number.voiceUrl !== recordUrl) return;
    await client.incomingPhoneNumbers(providerLineSid).update({
      voiceUrl: "",
      voiceMethod: "POST",
    });
  });
export default voiceLineDeleteTask;

View file

@ -0,0 +1,38 @@
import Twilio from "twilio";
import config from "@digiresilience/bridge-config";
import { withDb, AppDatabase } from "../db";
// Job options for the voice-line-update task.
interface VoiceLineUpdateTaskOptions {
voiceLineId: string;
}
/**
 * Graphile task: point a voice line's Twilio phone number at this app's
 * call-recording endpoint.
 */
const voiceLineUpdateTask = async (
  payload: VoiceLineUpdateTaskOptions,
): Promise<void> =>
  withDb(async (db: AppDatabase) => {
    const { voiceLineId } = payload;
    const voiceLine = await db.voiceLines.findById({ id: voiceLineId });
    if (!voiceLine) return;
    const provider = await db.voiceProviders.findById({
      id: voiceLine.providerId,
    });
    if (!provider) return;
    const { accountSid, apiKeySid, apiKeySecret } = provider.credentials;
    if (!accountSid || !apiKeySid || !apiKeySecret) {
      throw new Error(
        `twilio provider ${provider.name} does not have credentials`,
      );
    }
    const client = Twilio(apiKeySid, apiKeySecret, { accountSid });
    await client.incomingPhoneNumbers(voiceLine.providerLineSid).update({
      voiceUrl: `${config.frontend.url}/api/v1/voice/twilio/record/${voiceLineId}`,
      voiceMethod: "POST",
    });
  });
export default voiceLineUpdateTask;

View file

@ -0,0 +1,94 @@
/* eslint-disable camelcase */
import { withDb, AppDatabase } from "../db";
import workerUtils from "../utils";
// Body delivered to registered webhooks for an incoming WhatsApp message.
interface WebhookPayload {
to: string;
from: string;
message_id: string;
sent_at: string;
message: string;
attachment: string;
filename: string;
mime_type: string;
}
// Job options for the whatsapp-message task; `waMessage` is the raw
// message envelope serialized as a JSON string.
interface WhatsappMessageTaskOptions {
waMessageId: string;
waMessage: string;
waTimestamp: string;
attachment: string;
filename: string;
mimetype: string;
botPhoneNumber: string;
whatsappBotId: string;
}
// Shape a raw WhatsApp message envelope into the standard webhook payload.
const formatPayload = (
  messageInfo: WhatsappMessageTaskOptions
): WebhookPayload => {
  const {
    waMessageId,
    waMessage,
    waTimestamp,
    attachment,
    filename,
    mimetype,
    botPhoneNumber,
  } = messageInfo;
  const parsed = JSON.parse(waMessage);
  // The text lives in a different field depending on the message flavour:
  // plain conversation, extended text, or an image/video caption.
  const text =
    parsed.message?.conversation ??
    parsed.message?.extendedTextMessage?.text ??
    parsed.message?.imageMessage?.caption ??
    parsed.message?.videoMessage?.caption;
  return {
    to: botPhoneNumber,
    from: parsed.key.remoteJid,
    message_id: waMessageId,
    sent_at: waTimestamp,
    message: text,
    attachment,
    filename,
    mime_type: mimetype,
  };
};
/**
 * Fan a formatted WhatsApp message out to every webhook registered for the
 * bot, enqueueing one notify-webhook job per webhook.
 */
const notifyWebhooks = async (
  db: AppDatabase,
  messageInfo: WhatsappMessageTaskOptions
) => {
  const { waMessageId, whatsappBotId } = messageInfo;
  const webhooks = await db.webhooks.findAllByBackendId(
    "whatsapp",
    whatsappBotId
  );
  // Bug fix: also bail out when the lookup returns null/undefined — the
  // original check (`webhooks && webhooks.length === 0`) fell through to
  // `webhooks.forEach` and crashed in that case.
  if (!webhooks || webhooks.length === 0) return;
  for (const { id } of webhooks) {
    const payload = formatPayload(messageInfo);
    console.log({ payload });
    // Bug fix: await job creation so enqueue failures fail this task (and
    // get retried) instead of becoming unhandled rejections in forEach.
    await workerUtils.addJob(
      "notify-webhook",
      {
        payload,
        webhookId: id,
      },
      {
        // this de-deduplicates the job
        jobKey: `webhook-${id}-message-${waMessageId}`,
      }
    );
  }
};
/**
 * Graphile task: handle an incoming WhatsApp message by notifying the
 * bot's registered webhooks.
 */
const whatsappMessageTask = async (
  options: WhatsappMessageTaskOptions
): Promise<void> => {
  // NOTE(review): this logs the raw message contents into worker logs —
  // confirm that is intended.
  console.log(options);
  // Bug fix: return the withDb promise so the job only completes (and
  // failures are retried) once the webhooks have been notified; previously
  // it was fire-and-forget.
  return withDb(async (db: AppDatabase) => {
    await notifyWebhooks(db, options);
  });
};
export default whatsappMessageTask;

View file

@ -0,0 +1,10 @@
{
"extends": "tsconfig",
"compilerOptions": {
"outDir": "build/main",
"esModuleInterop": true,
"skipLibCheck": true
},
"include": ["**/*.ts", "**/.*.ts"],
"exclude": ["node_modules", "build"]
}

View file

@ -0,0 +1,21 @@
import * as Worker from "graphile-worker";
import { defState } from "@digiresilience/montar";
import config from "@digiresilience/bridge-config";
// Create a graphile-worker utils handle bound to the worker database.
const startWorkerUtils = async (): Promise<Worker.WorkerUtils> =>
  Worker.makeWorkerUtils({
    connectionString: config.worker.connection,
  });
// Release the shared worker-utils database connection on shutdown.
const stopWorkerUtils = async (): Promise<void> => {
return workerUtils.release();
};
// montar-managed state exposing a process-wide graphile-worker utils
// instance (used by tasks to enqueue follow-up jobs and run migrations).
const workerUtils = defState("workerUtils", {
start: startWorkerUtils,
stop: stopWorkerUtils,
});
export default workerUtils;

View file

@ -0,0 +1,106 @@
/* eslint-disable camelcase,@typescript-eslint/explicit-module-boundary-types,@typescript-eslint/no-explicit-any */
import querystring from "querystring";
import Wreck from "@hapi/wreck";
// Zammad user record (only the fields this client reads/writes).
export interface User {
id: number;
firstname?: string;
lastname?: string;
email?: string;
phone?: string;
}
// Zammad ticket record (only the fields this client reads/writes).
export interface Ticket {
id: number;
title?: string;
group_id?: number;
customer_id?: number;
}
// Thin client surface returned by Zammad(): ticket creation plus user
// search/creation only.
export interface ZammadClient {
ticket: {
create: (data: any) => Promise<Ticket>;
};
user: {
search: (data: any) => Promise<User[]>;
create: (data: any) => Promise<User>;
};
}
// Either basic-auth (username/password) or API-token credentials.
export type ZammadCredentials =
| { username: string; password: string }
| { token: string };
// Extra client options, e.g. additional default HTTP headers.
export interface ZammadClientOpts {
headers?: Record<string, any>;
}
// Build the Authorization header value for either credential flavour:
// Basic for username/password, Token for an API token.
const formatAuth = (credentials: any) => {
  if (credentials.username) {
    const joined = `${credentials.username}:${credentials.password}`;
    return `Basic ${Buffer.from(joined).toString("base64")}`;
  }
  if (credentials.token) {
    return `Token ${credentials.token}`;
  }
  throw new Error("invalid zammad credentials type");
};
/**
 * Build a minimal Zammad REST client covering ticket creation and user
 * search/creation against `${host}/api/v1/`.
 */
export const Zammad = (
  credentials: ZammadCredentials,
  host: string,
  opts?: ZammadClientOpts
): ZammadClient => {
  const extraHeaders = (opts && opts.headers) || {};
  const wreck = Wreck.defaults({
    baseUrl: `${host}/api/v1/`,
    headers: {
      authorization: formatAuth(credentials),
      ...extraHeaders,
    },
    json: true,
  });
  // Shared POST helper returning the parsed response body.
  const post = async (path: string, payload: any) => {
    const { payload: result } = await wreck.post(path, { payload });
    return result;
  };
  return {
    ticket: {
      create: async (payload) => (await post("tickets", payload)) as Ticket,
    },
    user: {
      search: async (query) => {
        const qp = querystring.stringify({ query });
        const { payload: result } = await wreck.get(`users/search?${qp}`);
        return result as User[];
      },
      create: async (payload) => (await post("users", payload)) as User,
    },
  };
};
/**
 * Look up a Zammad user by phone number (leading "+" stripped for the
 * search index). Returns the first match, or undefined when none exists.
 */
export const getUser = async (zammad: ZammadClient, phoneNumber: string) => {
  const query = `phone:${phoneNumber.replace("+", "")}`;
  const matches = await zammad.user.search(query);
  return matches.length > 0 ? matches[0] : undefined;
};
/**
 * Return the Zammad user for a phone number, creating one (with an
 * explanatory note) when no existing user matches.
 */
export const getOrCreateUser = async (
  zammad: ZammadClient,
  phoneNumber: string
) => {
  const existing = await getUser(zammad, phoneNumber);
  if (existing) return existing;
  return zammad.user.create({
    phone: phoneNumber,
    note: "User created by Grabadora from incoming voice call",
  });
};