2023-02-13 12:41:30 +00:00
|
|
|
import * as Worker from "graphile-worker";
|
2023-03-13 15:53:21 +00:00
|
|
|
import { parseCronItems } from "graphile-worker";
|
2023-02-13 12:41:30 +00:00
|
|
|
import { defState } from "@digiresilience/montar";
|
2023-03-13 22:14:52 +00:00
|
|
|
import config from "@digiresilience/metamigo-config";
|
2023-06-02 14:05:20 +00:00
|
|
|
import { initPgp } from "./db.js";
|
|
|
|
|
import logger from "./logger.js";
|
|
|
|
|
import workerUtils from "./utils.js";
|
|
|
|
|
import { assertFfmpegAvailable } from "./lib/media-convert.js";
|
2023-02-13 12:41:30 +00:00
|
|
|
|
|
|
|
|
/**
 * Log-function factory handed to `Worker.Logger`.
 *
 * Given a scope, returns a function that forwards each log call to a child of
 * the application `logger` carrying that scope.
 *
 * @param scope - Scope value attached to the child logger as `{ scope }`.
 * @returns A `(level, message, meta)` function; when `meta` is truthy it is
 *   passed ahead of the message, otherwise only the message is logged.
 */
const logFactory = (scope: any) => {
  return (level: any, message: any, meta: any) => {
    // The incoming level name "warning" maps to the logger method "warn";
    // every other level name is used unchanged.
    const method = level === "warning" ? "warn" : level;

    // A child logger is created per call, bound to the given scope.
    const scopedLogger = logger.child({ scope });

    if (!meta) {
      scopedLogger[method](message);
      return;
    }
    scopedLogger[method](meta, message);
  };
};
|
|
|
|
|
|
|
|
|
|
/**
 * Builds the runner options for the worker from `config.worker`.
 *
 * Logs the concurrency and poll interval, then returns options containing the
 * connection string, a `Worker.Logger` backed by `logFactory`, the `tasks`
 * directory next to this module, and the parsed cron schedule.
 *
 * @returns The options object to pass to `Worker.run`.
 */
export const configWorker = async (): Promise<Worker.RunnerOptions> => {
  const settings = config.worker;
  const { connection, concurrency, pollInterval } = settings;
  const leafcutterEnabled = settings.leafcutter.enabled;

  logger.info({ concurrency, pollInterval }, "Starting worker");

  const runnerLogger = new Worker.Logger(logFactory);

  // eslint-disable-next-line unicorn/prefer-module
  const taskDirectory = `${__dirname}/tasks`;

  // Both recurring import tasks are scheduled only when leafcutter is
  // enabled; otherwise the worker runs with no cron items.
  const parsedCronItems = parseCronItems(
    leafcutterEnabled
      ? [
          { task: "import-label-studio", pattern: "*/15 * * * *" },
          { task: "import-leafcutter", pattern: "*/17 * * * *" },
        ]
      : []
  );

  return {
    concurrency,
    pollInterval,
    logger: runnerLogger,
    connectionString: connection,
    taskDirectory,
    parsedCronItems,
  };
};
|
|
|
|
|
|
|
|
|
|
/**
 * Starts the worker.
 *
 * Steps run strictly in order: verify ffmpeg, run the worker migrations,
 * call `initPgp`, build the runner options, and launch the runner.
 *
 * @returns The runner returned by `Worker.run`.
 */
export const startWorker = async (): Promise<Worker.Runner> => {
  // Make sure ffmpeg is installed and working before doing anything else.
  await assertFfmpegAvailable();
  logger.info("ffmpeg found");

  await workerUtils.migrate();
  logger.info("worker database migrated");

  initPgp();

  const runnerOptions = await configWorker();
  // Named `runner` so it does not shadow the module-level `worker` state.
  const runner = await Worker.run(runnerOptions);
  return runner;
};
|
|
|
|
|
|
|
|
|
|
/**
 * Stops the worker by calling `stop()` on the module-level `worker` state and
 * waiting for it to finish.
 */
export const stopWorker = async (): Promise<void> => {
  // `worker` is declared further down in this module; it is only read when
  // this function is invoked, not when it is defined.
  const runner = worker;
  await runner.stop();
};
|
|
|
|
|
|
|
|
|
|
// Start/stop hooks for the "worker" state.
const workerLifecycle = {
  start: startWorker,
  stop: stopWorker,
};

/**
 * State registered with `defState` under the name "worker", using
 * `startWorker` and `stopWorker` as its lifecycle hooks.
 */
const worker = defState("worker", workerLifecycle);

export default worker;
|