Bridge worker updates

This commit is contained in:
Darren Clarke 2024-04-21 16:59:50 +02:00
parent a445762a37
commit f93c4ad317
33 changed files with 17584 additions and 161 deletions

View file

@ -1,8 +1,20 @@
import { PostgresDialect } from "kysely"; import { PostgresDialect, CamelCasePlugin } from "kysely";
import type { GeneratedAlways } from "kysely";
import { Pool } from "pg"; import { Pool } from "pg";
import { KyselyAuth } from "@auth/kysely-adapter"; import { KyselyAuth } from "@auth/kysely-adapter";
import { CamelCasePlugin } from "kysely";
import type { GeneratedAlways } from "kysely"; type GraphileJob = {
taskIdentifier: string;
payload: Record<string, any>;
priority: number;
maxAttempts: number;
key: string;
queueName: string;
};
export const addGraphileJob = async (jobInfo: GraphileJob) => {
await db().insertInto("graphile_worker.jobs").values(jobInfo).execute();
};
interface Database { interface Database {
User: { User: {

View file

@ -8,6 +8,5 @@
**/.env* **/.env*
**/coverage **/coverage
**/.next **/.next
**/amigo.*.json
**/cypress/videos **/cypress/videos
**/cypress/screenshots **/cypress/screenshots

View file

@ -1 +0,0 @@
require("../.eslintrc.js");

View file

@ -0,0 +1,32 @@
FROM node:18-alpine as base
FROM base AS builder
ARG APP_DIR=/opt/bridge
RUN mkdir -p ${APP_DIR}/
RUN npm i -g turbo
WORKDIR ${APP_DIR}
COPY . .
RUN turbo prune --scope=bridge-worker --docker
FROM base AS installer
ARG APP_DIR=/opt/bridge
WORKDIR ${APP_DIR}
COPY .gitignore .gitignore
COPY --from=builder ${APP_DIR}/out/json/ .
COPY --from=builder ${APP_DIR}/out/package-lock.json ./package-lock.json
COPY --from=builder ${APP_DIR}/out/full/ .
RUN npm ci
RUN npm i -g turbo
RUN turbo run build --filter=bridge-worker
FROM graphile/worker:0.16.5 as runner
ARG APP_DIR=/opt/bridge
RUN mkdir -p ${APP_DIR}/
ARG BUILD_DIR=${APP_DIR}/apps/bridge-worker/build/main
RUN mkdir -p ${APP_DIR}/
WORKDIR /worker
COPY --from=installer ${BUILD_DIR}/lib ${APP_DIR}/lib
COPY --from=installer ${BUILD_DIR}/tasks ${APP_DIR}/tasks
COPY --from=installer ${APP_DIR}/apps/bridge-worker/graphile.config.prod.js ./graphile.config.js
COPY --from=installer ${APP_DIR}/node_modules ${APP_DIR}/node_modules
COPY --from=installer ${APP_DIR}/package.json ${APP_DIR}/package.json

View file

@ -1,3 +0,0 @@
{
"presets": ["@digiresilience/babel-preset-bridge"]
}

View file

@ -0,0 +1,10 @@
module.exports = {
worker: {
connectionString: process.env.DATABASE_URL,
maxPoolSize: 10,
pollInterval: 2000,
concurrentJobs: 3,
taskDirectory: "/opt/bridge/tasks",
fileExtensions: [".js", ".cjs", ".mjs"],
},
};

View file

@ -0,0 +1,13 @@
import type {} from "graphile-config";
import type {} from "graphile-worker";
const preset: GraphileConfig.Preset = {
worker: {
connectionString: process.env.DATABASE_URL,
maxPoolSize: 10,
pollInterval: 2000,
fileExtensions: [".ts"],
},
};
export default preset;

View file

@ -1,10 +1,11 @@
import * as Worker from "graphile-worker"; import { run, Runner, RunnerOptions } from "graphile-worker";
import { parseCronItems } from "graphile-worker"; // import { parseCronItems } from "graphile-worker";
import { initPgp } from "./db.js"; // import { initPgp } from "./db.js";
import logger from "./logger.js"; // import logger from "./logger.js";
import workerUtils from "./utils.js"; // import workerUtils from "./utils.js";
import { assertFfmpegAvailable } from "./lib/media-convert.js"; // import { assertFfmpegAvailable } from "./lib/media-convert.js";
/*
const logFactory = (scope: any) => (level: any, message: any, meta: any) => { const logFactory = (scope: any) => (level: any, message: any, meta: any) => {
const pinoLevel = level === "warning" ? "warn" : level; const pinoLevel = level === "warning" ? "warn" : level;
const childLogger = logger.child({ scope }); const childLogger = logger.child({ scope });
@ -12,40 +13,28 @@ const logFactory = (scope: any) => (level: any, message: any, meta: any) => {
else childLogger[pinoLevel](message); else childLogger[pinoLevel](message);
}; };
export const configWorker = async (): Promise<Worker.RunnerOptions> => { */
const { const getConfig = (): RunnerOptions => {
connection, // logger.info({ concurrency, pollInterval }, "Starting worker");
concurrency,
pollInterval,
leafcutter: {
enabled: leafcutterEnabled
}
} = config.worker;
logger.info({ concurrency, pollInterval }, "Starting worker");
return { return {
concurrency, concurrency: 10,
pollInterval, pollInterval: 1000,
logger: new Worker.Logger(logFactory), // logger: new Worker.Logger(logFactory),
connectionString: connection, connectionString: process.env.DATABASE_URL,
// eslint-disable-next-line unicorn/prefer-module // eslint-disable-next-line unicorn/prefer-module
taskDirectory: `${__dirname}/tasks`, taskDirectory: `${__dirname}/tasks`,
parsedCronItems: parseCronItems(leafcutterEnabled ? // parsedCronItems: parseCronItems(
[{ task: "import-label-studio", pattern: "*/15 * * * *" }, // leafcutterEnabled
{ task: "import-leafcutter", pattern: "*/17 * * * *" }] : // ? [
[]) // { task: "import-label-studio", pattern: "*/15 * * * *" },
// { task: "import-leafcutter", pattern: "*/17 * * * *" },
// ]
// : [],
// )
}; };
}; };
export const startWorker = async (): Promise<Worker.Runner> => { export const startWorker = async (): Promise<Runner> => {
// ensure ffmpeg is installed and working
await assertFfmpegAvailable();
logger.info("ffmpeg found");
await workerUtils.migrate();
logger.info("worker database migrated");
initPgp();
const workerConfig = await configWorker(); const workerConfig = await configWorker();
const worker = await Worker.run(workerConfig); const worker = await Worker.run(workerConfig);
return worker; return worker;
@ -55,9 +44,15 @@ export const stopWorker = async (): Promise<void> => {
await worker.stop(); await worker.stop();
}; };
const worker = defState("worker", { const main = async () => {
start: startWorker, await worker.start();
stop: stopWorker, process.on("SIGTERM", async () => {
}); await worker.stop();
process.exit(0);
});
};
export default worker; main().catch((err) => {
console.error(err);
process.exit(1);
});

View file

@ -1,9 +1,11 @@
/* eslint-disable camelcase */ /* eslint-disable camelcase */
import { SavedVoiceProvider } from "@digiresilience/bridge-db"; // import { SavedVoiceProvider } from "@digiresilience/bridge-db";
import Twilio from "twilio"; import Twilio from "twilio";
import { CallInstance } from "twilio/lib/rest/api/v2010/account/call"; import { CallInstance } from "twilio/lib/rest/api/v2010/account/call";
import { Zammad, getOrCreateUser } from "./zammad"; import { Zammad, getOrCreateUser } from "./zammad";
type SavedVoiceProvider = any;
export const twilioClientFor = ( export const twilioClientFor = (
provider: SavedVoiceProvider, provider: SavedVoiceProvider,
): Twilio.Twilio => { ): Twilio.Twilio => {

View file

@ -1,3 +1,4 @@
/*
import pgPromise from "pg-promise"; import pgPromise from "pg-promise";
import * as pgMonitor from "pg-monitor"; import * as pgMonitor from "pg-monitor";
import { import {
@ -37,8 +38,12 @@ const initDb = (): AppDatabase => {
export const stopDb = async (db: AppDatabase): Promise<void> => { export const stopDb = async (db: AppDatabase): Promise<void> => {
return db.$pool.end(); return db.$pool.end();
}; };
*/
export type AppDatabase = any;
export const withDb = <T>(f: (db: AppDatabase) => Promise<T>): Promise<T> => { export const withDb = <T>(f: (db: AppDatabase) => Promise<T>): Promise<T> => {
/*
const db = initDb(); const db = initDb();
initDiagnostics(config.logging.sql, pgpInitOptions); initDiagnostics(config.logging.sql, pgpInitOptions);
try { try {
@ -46,6 +51,6 @@ export const withDb = <T>(f: (db: AppDatabase) => Promise<T>): Promise<T> => {
} finally { } finally {
stopDiagnostics(); stopDiagnostics();
} }
*/
return f(null);
}; };
export type { AppDatabase } from "@digiresilience/bridge-db";

View file

View file

@ -0,0 +1,11 @@
//import { defState } from "@digiresilience/montar";
//import { configureLogger } from "@digiresilience/bridge-common";
// import config from "@digiresilience/bridge-config";
//export const logger = defState("workerLogger", {
// start: async () => configureLogger(config),
//});
//export default logger;
export const logger = {};
export default logger;

View file

@ -1,7 +1,8 @@
import * as Worker from "graphile-worker"; import * as Worker from "graphile-worker";
import { defState } from "@digiresilience/montar"; // import { defState } from "@digiresilience/montar";
import config from "@digiresilience/bridge-config"; //import config from "@digiresilience/bridge-config";
/*
const startWorkerUtils = async (): Promise<Worker.WorkerUtils> => { const startWorkerUtils = async (): Promise<Worker.WorkerUtils> => {
const workerUtils = await Worker.makeWorkerUtils({ const workerUtils = await Worker.makeWorkerUtils({
connectionString: config.worker.connection, connectionString: config.worker.connection,
@ -18,4 +19,8 @@ const workerUtils = defState("workerUtils", {
stop: stopWorkerUtils, stop: stopWorkerUtils,
}); });
*/
export const workerUtils: any = {};
export default workerUtils; export default workerUtils;

View file

View file

View file

@ -1,8 +0,0 @@
import { defState } from "@digiresilience/montar";
import { configureLogger } from "@digiresilience/bridge-common";
import config from "@digiresilience/bridge-config";
export const logger = defState("workerLogger", {
start: async () => configureLogger(config),
});
export default logger;

View file

@ -1,55 +1,34 @@
{ {
"name": "bridge-worker", "name": "bridge-worker",
"version": "0.2.0", "version": "0.2.0",
"main": "build/main/index.js",
"type": "module", "type": "module",
"author": "Abel Luck <abel@guardianproject.info>", "author": "Darren Clarke <darren@redaranj.com>",
"license": "AGPL-3.0-or-later", "license": "AGPL-3.0-or-later",
"scripts": {
"build": "tsc -p tsconfig.json",
"dev": "NODE_OPTIONS=\"--loader ts-node/esm\" graphile-worker"
},
"dependencies": { "dependencies": {
"@hapi/wreck": "^18.1.0",
"fluent-ffmpeg": "^2.1.2",
"graphile-worker": "^0.16.5", "graphile-worker": "^0.16.5",
"html-to-text": "^9.0.5", "html-to-text": "^9.0.5",
"node-fetch": "^3", "jest": "^29.7.0",
"pg-promise": "^11.6.0", "kysely": "^0.27.3",
"pg": "^8.11.5",
"remeda": "^1.60.1", "remeda": "^1.60.1",
"twilio": "^5.0.4" "twilio": "^5.0.4"
}, },
"devDependencies": { "devDependencies": {
"ts-config": "*",
"@babel/core": "7.24.4", "@babel/core": "7.24.4",
"@babel/preset-env": "7.24.4", "@babel/preset-env": "7.24.4",
"@babel/preset-typescript": "7.24.1", "@babel/preset-typescript": "7.24.1",
"@types/fluent-ffmpeg": "^2.1.24", "@types/fluent-ffmpeg": "^2.1.24",
"@types/jest": "^29.5.12",
"eslint": "^9.0.0", "eslint": "^9.0.0",
"jest": "^29.7.0",
"jest-circus": "^29.7.0",
"jest-junit": "^16.0.0",
"nodemon": "^3.1.0",
"pino-pretty": "^11.0.0",
"prettier": "^3.2.5", "prettier": "^3.2.5",
"ts-config": "*",
"ts-node": "^10.9.2", "ts-node": "^10.9.2",
"typedoc": "^0.25.13", "typedoc": "^0.25.13",
"typescript": "^5.4.5" "typescript": "^5.4.5"
},
"nodemonConfig": {
"ignore": [
"docs/*"
],
"ext": "ts,json,js"
},
"scripts": {
"build-xxx": "tsc -p tsconfig.json",
"build-test": "tsc -p tsconfig.json",
"doc:html": "typedoc src/ --exclude '**/*.test.ts' --exclude '**/*.spec.ts' --name $npm_package_name --readme README.md --target es2019 --mode file --out build/docs",
"doc": "yarn run doc:html",
"fix:lint": "eslint src --ext .ts --fix",
"fix:prettier": "prettier \"src/**/*.ts\" --write",
"test:jest": "JEST_CIRCUS=1 jest --coverage --forceExit --detectOpenHandles --reporters=default --reporters=jest-junit",
"test:jest-verbose": "yarn test:jest --verbose --silent=false",
"test": "yarn test:jest",
"lint": "yarn lint:lint && yarn lint:prettier",
"lint:lint": "eslint src --ext .ts",
"lint:prettier": "prettier \"src/**/*.ts\" --list-different",
"watch:test": "yarn test:jest --watchAll"
} }
} }

View file

@ -1,7 +1,7 @@
import Wreck from "@hapi/wreck"; import Wreck from "@hapi/wreck";
import * as R from "remeda"; import * as R from "remeda";
import { withDb, AppDatabase } from "../db"; import { withDb, AppDatabase } from "../../lib/db";
import logger from "../logger"; // import logger from "../logger";
export interface WebhookOptions { export interface WebhookOptions {
webhookId: string; webhookId: string;
@ -14,10 +14,10 @@ const notifyWebhooksTask = async (options: WebhookOptions): Promise<void> =>
const webhook = await db.webhooks.findById({ id: webhookId }); const webhook = await db.webhooks.findById({ id: webhookId });
if (!webhook) { if (!webhook) {
logger.debug( // logger.debug(
{ webhookId }, // { webhookId },
"notify-webhook: no webhook registered with id" // "notify-webhook: no webhook registered with id",
); // );
return; return;
} }
@ -28,7 +28,7 @@ const notifyWebhooksTask = async (options: WebhookOptions): Promise<void> =>
acc[h.header] = h.value; acc[h.header] = h.value;
return acc; return acc;
}, },
{} {},
); );
const wreck = Wreck.defaults({ const wreck = Wreck.defaults({
@ -38,18 +38,18 @@ const notifyWebhooksTask = async (options: WebhookOptions): Promise<void> =>
// http errors will bubble up and cause the job to fail and be retried // http errors will bubble up and cause the job to fail and be retried
try { try {
logger.debug( // logger.debug(
{ webhookId, endpointUrl, httpMethod }, // { webhookId, endpointUrl, httpMethod },
"notify-webhook: notifying" // "notify-webhook: notifying",
); // );
await (httpMethod === "post" await (httpMethod === "post"
? wreck.post(endpointUrl, { payload }) ? wreck.post(endpointUrl, { payload })
: wreck.put(endpointUrl, { payload })); : wreck.put(endpointUrl, { payload }));
} catch (error: any) { } catch (error: any) {
logger.error( // logger.error(
{ webhookId, error: error.output }, // { webhookId, error: error.output },
"notify-webhook failed with this error" // "notify-webhook failed with this error",
); // );
throw new Error(`webhook failed webhookId=${webhookId}`); throw new Error(`webhook failed webhookId=${webhookId}`);
} }
}); });

View file

@ -0,0 +1,12 @@
interface ReceiveFacebookMessageTaskOptions {}
const receiveFacebookMessageTask = async (
options: ReceiveFacebookMessageTaskOptions,
): Promise<void> => {
console.log(options);
// withDb(async (db: AppDatabase) => {
// await notifyWebhooks(db, options);
// });
};
export default receiveFacebookMessageTask;

View file

@ -0,0 +1,12 @@
interface SendFacebookMessageTaskOptions {}
const sendFacebookMessageTask = async (
options: SendFacebookMessageTaskOptions,
): Promise<void> => {
console.log(options);
// withDb(async (db: AppDatabase) => {
// await notifyWebhooks(db, options);
// });
};
export default sendFacebookMessageTask;

View file

@ -1,10 +1,11 @@
/* eslint-disable camelcase */ /* eslint-disable camelcase */
import { convert } from "html-to-text"; import { convert } from "html-to-text";
import fetch from "node-fetch";
import { URLSearchParams } from "url"; import { URLSearchParams } from "url";
import { withDb, AppDatabase } from "../db"; import { withDb, AppDatabase } from "../../lib/db";
import { loadConfig } from "@digiresilience/bridge-config"; // import { loadConfig } from "@digiresilience/bridge-config";
import { tagMap } from "../lib/tag-map"; import { tagMap } from "../../lib/tag-map";
const config: any = {};
type FormattedZammadTicket = { type FormattedZammadTicket = {
data: Record<string, unknown>; data: Record<string, unknown>;
@ -17,7 +18,7 @@ const getZammadTickets = async (
): Promise<[boolean, FormattedZammadTicket[]]> => { ): Promise<[boolean, FormattedZammadTicket[]]> => {
const { const {
leafcutter: { zammadApiUrl, zammadApiKey, contributorName, contributorId }, leafcutter: { zammadApiUrl, zammadApiKey, contributorName, contributorId },
} = await loadConfig(); } = config;
const headers = { Authorization: `Token ${zammadApiKey}` }; const headers = { Authorization: `Token ${zammadApiKey}` };
let shouldContinue = false; let shouldContinue = false;
const docs = []; const docs = [];
@ -161,7 +162,7 @@ const fetchFromZammad = async (
const sendToLabelStudio = async (tickets: FormattedZammadTicket[]) => { const sendToLabelStudio = async (tickets: FormattedZammadTicket[]) => {
const { const {
leafcutter: { labelStudioApiUrl, labelStudioApiKey }, leafcutter: { labelStudioApiUrl, labelStudioApiKey },
} = await loadConfig(); } = config;
const headers = { const headers = {
Authorization: `Token ${labelStudioApiKey}`, Authorization: `Token ${labelStudioApiKey}`,
@ -185,7 +186,7 @@ const importLabelStudioTask = async (): Promise<void> => {
withDb(async (db: AppDatabase) => { withDb(async (db: AppDatabase) => {
const { const {
leafcutter: { contributorName }, leafcutter: { contributorName },
} = await loadConfig(); } = config;
const settingName = `${contributorName}ImportLabelStudioTask`; const settingName = `${contributorName}ImportLabelStudioTask`;
const res: any = await db.settings.findByName(settingName); const res: any = await db.settings.findByName(settingName);
const startTimestamp = res?.value?.minUpdatedTimestamp const startTimestamp = res?.value?.minUpdatedTimestamp

View file

@ -1,8 +1,9 @@
/* eslint-disable camelcase */ /* eslint-disable camelcase */
import fetch from "node-fetch";
import { URLSearchParams } from "url"; import { URLSearchParams } from "url";
import { withDb, AppDatabase } from "../db"; import { withDb, AppDatabase } from "../../lib/db";
import { loadConfig } from "@digiresilience/bridge-config"; // import { loadConfig } from "@digiresilience/bridge-config";
const config: any = {};
type LabelStudioTicket = { type LabelStudioTicket = {
id: string; id: string;
@ -32,8 +33,7 @@ const getLabelStudioTickets = async (
): Promise<LabelStudioTicket[]> => { ): Promise<LabelStudioTicket[]> => {
const { const {
leafcutter: { labelStudioApiUrl, labelStudioApiKey }, leafcutter: { labelStudioApiUrl, labelStudioApiKey },
} = await loadConfig(); } = config;
const headers = { const headers = {
Authorization: `Token ${labelStudioApiKey}`, Authorization: `Token ${labelStudioApiKey}`,
Accept: "application/json", Accept: "application/json",
@ -90,7 +90,7 @@ const sendToLeafcutter = async (tickets: LabelStudioTicket[]) => {
opensearchUsername, opensearchUsername,
opensearchPassword, opensearchPassword,
}, },
} = await loadConfig(); } = config;
console.log({ tickets }); console.log({ tickets });
const filteredTickets = tickets.filter((ticket) => ticket.is_labeled); const filteredTickets = tickets.filter((ticket) => ticket.is_labeled);
@ -148,7 +148,7 @@ const importLeafcutterTask = async (): Promise<void> => {
withDb(async (db: AppDatabase) => { withDb(async (db: AppDatabase) => {
const { const {
leafcutter: { contributorName }, leafcutter: { contributorName },
} = await loadConfig(); } = config;
const settingName = `${contributorName}ImportLeafcutterTask`; const settingName = `${contributorName}ImportLeafcutterTask`;
const res: any = await db.settings.findByName(settingName); const res: any = await db.settings.findByName(settingName);
const startTimestamp = res?.value?.minUpdatedTimestamp const startTimestamp = res?.value?.minUpdatedTimestamp

View file

@ -1,8 +1,10 @@
/* eslint-disable camelcase */ /* eslint-disable camelcase */
import logger from "../logger"; // import logger from "../logger";
import { IncomingMessagev1 } from "@digiresilience/node-signald/build/main/generated"; // import { IncomingMessagev1 } from "@digiresilience/node-signald/build/main/generated";
import { withDb, AppDatabase } from "../db"; import { withDb, AppDatabase } from "../../lib/db";
import workerUtils from "../utils"; import workerUtils from "../../lib/utils";
type IncomingMessagev1 = any;
interface WebhookPayload { interface WebhookPayload {
to: string; to: string;
@ -25,7 +27,8 @@ interface SignaldMessageTaskOptions {
} }
const formatPayload = (opts: SignaldMessageTaskOptions): WebhookPayload => { const formatPayload = (opts: SignaldMessageTaskOptions): WebhookPayload => {
const { botId, botPhoneNumber, message, attachment, filename, mimetype } = opts; const { botId, botPhoneNumber, message, attachment, filename, mimetype } =
opts;
const { source, timestamp, data_message: dataMessage } = message; const { source, timestamp, data_message: dataMessage } = message;
const { number }: any = source; const { number }: any = source;
@ -46,7 +49,7 @@ const formatPayload = (opts: SignaldMessageTaskOptions): WebhookPayload => {
const notifyWebhooks = async ( const notifyWebhooks = async (
db: AppDatabase, db: AppDatabase,
messageInfo: SignaldMessageTaskOptions messageInfo: SignaldMessageTaskOptions,
) => { ) => {
const { const {
botId, botId,
@ -54,16 +57,16 @@ const notifyWebhooks = async (
} = messageInfo; } = messageInfo;
const webhooks = await db.webhooks.findAllByBackendId("signal", botId); const webhooks = await db.webhooks.findAllByBackendId("signal", botId);
if (webhooks && webhooks.length === 0) { if (webhooks && webhooks.length === 0) {
logger.debug({ botId }, "no webhooks registered for signal bot"); // logger.debug({ botId }, "no webhooks registered for signal bot");
return; return;
} }
webhooks.forEach(({ id }) => { webhooks.forEach(({ id }) => {
const payload = formatPayload(messageInfo); const payload = formatPayload(messageInfo);
logger.debug( // logger.debug(
{ payload }, // { payload },
"formatted signal bot payload for notify-webhook" // "formatted signal bot payload for notify-webhook",
); // );
workerUtils.addJob( workerUtils.addJob(
"notify-webhook", "notify-webhook",
{ {
@ -73,13 +76,13 @@ const notifyWebhooks = async (
{ {
// this de-deduplicates the job // this de-deduplicates the job
jobKey: `webhook-${id}-message-${botId}-${timestamp}`, jobKey: `webhook-${id}-message-${botId}-${timestamp}`,
} },
); );
}); });
}; };
const signaldMessageTask = async ( const signaldMessageTask = async (
options: SignaldMessageTaskOptions options: SignaldMessageTaskOptions,
): Promise<void> => { ): Promise<void> => {
console.log(options); console.log(options);
withDb(async (db: AppDatabase) => { withDb(async (db: AppDatabase) => {

View file

@ -1,8 +1,8 @@
import Wreck from "@hapi/wreck"; import Wreck from "@hapi/wreck";
import { withDb, AppDatabase } from "../db"; import { withDb, AppDatabase } from "../../lib/db";
import { twilioClientFor } from "../common"; import { twilioClientFor } from "../../lib/common";
import { CallInstance } from "twilio/lib/rest/api/v2010/account/call"; import { CallInstance } from "twilio/lib/rest/api/v2010/account/call";
import workerUtils from "../utils"; import workerUtils from "../../lib/utils";
interface WebhookPayload { interface WebhookPayload {
startTime: string; startTime: string;
@ -27,7 +27,7 @@ const getTwilioRecording = async (url: string) => {
const formatPayload = ( const formatPayload = (
call: CallInstance, call: CallInstance,
recording: Buffer recording: Buffer,
): WebhookPayload => { ): WebhookPayload => {
return { return {
startTime: call.startTime.toISOString(), startTime: call.startTime.toISOString(),
@ -45,7 +45,7 @@ const notifyWebhooks = async (
db: AppDatabase, db: AppDatabase,
voiceLineId: string, voiceLineId: string,
call: CallInstance, call: CallInstance,
recording: Buffer recording: Buffer,
) => { ) => {
const webhooks = await db.webhooks.findAllByBackendId("voice", voiceLineId); const webhooks = await db.webhooks.findAllByBackendId("voice", voiceLineId);
if (webhooks && webhooks.length === 0) return; if (webhooks && webhooks.length === 0) return;
@ -61,7 +61,7 @@ const notifyWebhooks = async (
{ {
// this de-depuplicates the job // this de-depuplicates the job
jobKey: `webhook-${id}-call-${call.sid}`, jobKey: `webhook-${id}-call-${call.sid}`,
} },
); );
}); });
}; };
@ -74,7 +74,7 @@ interface TwilioRecordingTaskOptions {
} }
const twilioRecordingTask = async ( const twilioRecordingTask = async (
options: TwilioRecordingTaskOptions options: TwilioRecordingTaskOptions,
): Promise<void> => ): Promise<void> =>
withDb(async (db: AppDatabase) => { withDb(async (db: AppDatabase) => {
const { voiceLineId, accountSid, callSid, recordingSid } = options; const { voiceLineId, accountSid, callSid, recordingSid } = options;

View file

@ -1,6 +1,6 @@
import { createHash } from "crypto"; import { createHash } from "crypto";
import { withDb, AppDatabase } from "../db"; import { withDb, AppDatabase } from "../../lib/db";
import { convert } from "../lib/media-convert"; import { convert } from "../../lib/media-convert";
interface VoiceLineAudioUpdateTaskOptions { interface VoiceLineAudioUpdateTaskOptions {
voiceLineId: string; voiceLineId: string;
@ -13,7 +13,7 @@ const sha1sum = (v: any) => {
}; };
const voiceLineAudioUpdateTask = async ( const voiceLineAudioUpdateTask = async (
payload: VoiceLineAudioUpdateTaskOptions payload: VoiceLineAudioUpdateTaskOptions,
): Promise<void> => ): Promise<void> =>
withDb(async (db: AppDatabase) => { withDb(async (db: AppDatabase) => {
const { voiceLineId } = payload; const { voiceLineId } = payload;
@ -41,7 +41,7 @@ const voiceLineAudioUpdateTask = async (
"audio/mpeg": mp3.toString("base64"), "audio/mpeg": mp3.toString("base64"),
checksum: webmSha1, checksum: webmSha1,
}, },
} },
); );
}); });

View file

@ -1,6 +1,8 @@
import Twilio from "twilio"; import Twilio from "twilio";
import config from "@digiresilience/bridge-config"; // import config from "@digiresilience/bridge-config";
import { withDb, AppDatabase } from "../db"; import { withDb, AppDatabase } from "../../lib/db";
const config: any = {};
interface VoiceLineDeleteTaskOptions { interface VoiceLineDeleteTaskOptions {
voiceLineId: string; voiceLineId: string;

View file

@ -1,6 +1,8 @@
import Twilio from "twilio"; import Twilio from "twilio";
import config from "@digiresilience/bridge-config"; // import config from "@digiresilience/bridge-config";
import { withDb, AppDatabase } from "../db"; import { withDb, AppDatabase } from "../../lib/db";
const config: any = {};
interface VoiceLineUpdateTaskOptions { interface VoiceLineUpdateTaskOptions {
voiceLineId: string; voiceLineId: string;

View file

@ -1,6 +1,6 @@
/* eslint-disable camelcase */ /* eslint-disable camelcase */
import { withDb, AppDatabase } from "../db"; import { withDb, AppDatabase } from "../../lib/db";
import workerUtils from "../utils"; import workerUtils from "../../lib/utils";
interface WebhookPayload { interface WebhookPayload {
to: string; to: string;
@ -25,7 +25,7 @@ interface WhatsappMessageTaskOptions {
} }
const formatPayload = ( const formatPayload = (
messageInfo: WhatsappMessageTaskOptions messageInfo: WhatsappMessageTaskOptions,
): WebhookPayload => { ): WebhookPayload => {
const { const {
waMessageId, waMessageId,
@ -37,7 +37,8 @@ const formatPayload = (
botPhoneNumber, botPhoneNumber,
} = messageInfo; } = messageInfo;
const parsedMessage = JSON.parse(waMessage); const parsedMessage = JSON.parse(waMessage);
const message = parsedMessage.message?.conversation ?? const message =
parsedMessage.message?.conversation ??
parsedMessage.message?.extendedTextMessage?.text ?? parsedMessage.message?.extendedTextMessage?.text ??
parsedMessage.message?.imageMessage?.caption ?? parsedMessage.message?.imageMessage?.caption ??
parsedMessage.message?.videoMessage?.caption; parsedMessage.message?.videoMessage?.caption;
@ -56,12 +57,12 @@ const formatPayload = (
const notifyWebhooks = async ( const notifyWebhooks = async (
db: AppDatabase, db: AppDatabase,
messageInfo: WhatsappMessageTaskOptions messageInfo: WhatsappMessageTaskOptions,
) => { ) => {
const { waMessageId, whatsappBotId } = messageInfo; const { waMessageId, whatsappBotId } = messageInfo;
const webhooks = await db.webhooks.findAllByBackendId( const webhooks = await db.webhooks.findAllByBackendId(
"whatsapp", "whatsapp",
whatsappBotId whatsappBotId,
); );
if (webhooks && webhooks.length === 0) return; if (webhooks && webhooks.length === 0) return;
@ -77,13 +78,13 @@ const notifyWebhooks = async (
{ {
// this de-deduplicates the job // this de-deduplicates the job
jobKey: `webhook-${id}-message-${waMessageId}`, jobKey: `webhook-${id}-message-${waMessageId}`,
} },
); );
}); });
}; };
const whatsappMessageTask = async ( const whatsappMessageTask = async (
options: WhatsappMessageTaskOptions options: WhatsappMessageTaskOptions,
): Promise<void> => { ): Promise<void> => {
console.log(options); console.log(options);
withDb(async (db: AppDatabase) => { withDb(async (db: AppDatabase) => {

View file

@ -1,10 +1,16 @@
{ {
"extends": "tsconfig", "extends": "ts-config",
"compilerOptions": { "compilerOptions": {
"outDir": "build/main", "outDir": "build/main",
"module": "CommonJS",
"esModuleInterop": true, "esModuleInterop": true,
"skipLibCheck": true "skipLibCheck": true
}, },
"ts-node": {
"module": "ESNext",
"target": "ESNext",
"moduleResolution": "node"
},
"include": ["**/*.ts", "**/.*.ts"], "include": ["**/*.ts", "**/.*.ts"],
"exclude": ["node_modules", "build"] "exclude": ["node_modules", "build"]
} }

67
apps/index.ts Normal file
View file

@ -0,0 +1,67 @@
import { run, Runner, RunnerOptions } from "graphile-worker";
import { parseCronItems } from "graphile-worker";
// import { initPgp } from "./db.js";
// import logger from "./logger.js";
// import workerUtils from "./utils.js";
import { assertFfmpegAvailable } from "./lib/media-convert.js";
/*
const logFactory = (scope: any) => (level: any, message: any, meta: any) => {
const pinoLevel = level === "warning" ? "warn" : level;
const childLogger = logger.child({ scope });
if (meta) childLogger[pinoLevel](meta, message);
else childLogger[pinoLevel](message);
};
*/
const getConfig = (): RunnerOptions => {
// logger.info({ concurrency, pollInterval }, "Starting worker");
return {
concurrency: 10,
pollInterval: 1000,
// logger: new Worker.Logger(logFactory),
connectionString: process.env.DATABASE_URL,
// eslint-disable-next-line unicorn/prefer-module
taskDirectory: `${__dirname}/tasks`,
// parsedCronItems: parseCronItems(
// leafcutterEnabled
// ? [
// { task: "import-label-studio", pattern: "*/15 * * * *" },
// { task: "import-leafcutter", pattern: "*/17 * * * *" },
// ]
// : [],
// )
};
};
export const startWorker = async (): Promise<Runner> => {
// ensure ffmpeg is installed and working
await assertFfmpegAvailable();
// logger.info("ffmpeg found");
// await workerUtils.migrate();
// logger.info("worker database migrated");
// initPgp();
const workerConfig = await configWorker();
const worker = await Worker.run(workerConfig);
return worker;
};
export const stopWorker = async (): Promise<void> => {
await worker.stop();
};
const main = async () => {
await worker.start();
process.on("SIGTERM", async () => {
await worker.stop();
process.exit(0);
});
};
main().catch((err) => {
console.error(err);
process.exit(1);
});

17266
package-lock.json generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -8,7 +8,7 @@ In `tsconfig.json`
```json ```json
{ {
"extends": "tsconfig", "extends": "ts-config",
"compilerOptions": { "compilerOptions": {
"incremental": true, "incremental": true,
"outDir": "build/main", "outDir": "build/main",