import * as Worker from "graphile-worker";
import { parseCronItems } from "graphile-worker";
import { defState } from "@digiresilience/montar";
import config from "config";
import { initPgp } from "./db";
import logger from "./logger";
import workerUtils from "./utils";
import { assertFfmpegAvailable } from "./lib/media-convert";

/**
 * Log-function factory handed to `Worker.Logger`.
 *
 * For a given scope it returns a log function that forwards each message to a
 * child of the application logger bound to `{ scope }`. The level name
 * "warning" is translated to "warn"; every other level name is used as-is to
 * select the method on the child logger.
 */
// NOTE(review): parameters are left as `any` because the level is used to index
// the logger dynamically; tightening them needs the logger typings — confirm.
const logFactory = (scope: any) => (level: any, message: any, meta: any) => {
  const pinoLevel = level === "warning" ? "warn" : level;
  const childLogger = logger.child({ scope });
  // When metadata is present it is passed first, followed by the message.
  if (meta) childLogger[pinoLevel](meta, message);
  else childLogger[pinoLevel](message);
};

/** Recurring tasks that are scheduled only when leafcutter is enabled. */
const leafcutterCronItems = [
  { task: "import-label-studio", pattern: "*/15 * * * *" },
  { task: "import-leafcutter", pattern: "*/17 * * * *" },
];

/**
 * Builds the options object passed to `Worker.run` from `config.worker`.
 *
 * Logs the configured concurrency and poll interval, points the task directory
 * at the `tasks` folder next to this module, and includes the leafcutter cron
 * items only when `config.worker.leafcutter.enabled` is truthy.
 *
 * @returns The runner options for the worker.
 */
export const configWorker = async (): Promise<Worker.RunnerOptions> => {
  const {
    connection,
    concurrency,
    pollInterval,
    leafcutter: { enabled: leafcutterEnabled },
  } = config.worker;

  logger.info({ concurrency, pollInterval }, "Starting worker");

  return {
    concurrency,
    pollInterval,
    logger: new Worker.Logger(logFactory),
    connectionString: connection,
    // eslint-disable-next-line unicorn/prefer-module
    taskDirectory: `${__dirname}/tasks`,
    parsedCronItems: parseCronItems(
      leafcutterEnabled ? leafcutterCronItems : []
    ),
  };
};

/**
 * Starts the worker.
 *
 * Steps run strictly in order: check ffmpeg, run the worker migrations,
 * initialise pg-promise via `initPgp`, build the configuration, then start the
 * runner. Any step that rejects aborts the start-up.
 *
 * @returns The runner returned by `Worker.run`.
 */
export const startWorker = async (): Promise<Worker.Runner> => {
  // ensure ffmpeg is installed and working
  await assertFfmpegAvailable();
  logger.info("ffmpeg found");

  await workerUtils.migrate();
  logger.info("worker database migrated");

  initPgp();

  const workerConfig = await configWorker();
  const worker = await Worker.run(workerConfig);
  return worker;
};

/**
 * Stops the worker by calling `stop()` on the module-level `worker` state.
 */
// NOTE(review): relies on the `worker` state declared below exposing the
// started runner's `stop()` — confirm against montar's `defState` semantics.
export const stopWorker = async (): Promise<void> => {
  await worker.stop();
};

/** Worker state registered under the name "worker" with its start/stop hooks. */
const worker = defState("worker", {
  start: startWorker,
  stop: stopWorker,
});

export default worker;