WhatsApp/Signal/Formstack/admin updates

This commit is contained in:
Darren Clarke 2025-11-21 14:55:28 +01:00
parent bcecf61a46
commit d0cc5a21de
451 changed files with 16139 additions and 39623 deletions

View file

@ -2,26 +2,35 @@ FROM node:22-bookworm-slim AS base
FROM base AS builder
ARG APP_DIR=/opt/bridge-worker
ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH"
RUN mkdir -p ${APP_DIR}/
RUN npm i -g turbo
RUN corepack enable && corepack prepare pnpm@9.15.4 --activate
RUN pnpm add -g turbo
WORKDIR ${APP_DIR}
COPY . .
RUN turbo prune --scope=@link-stack/bridge-worker --docker
FROM base AS installer
ARG APP_DIR=/opt/bridge-worker
ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH"
WORKDIR ${APP_DIR}
RUN corepack enable && corepack prepare pnpm@9.15.4 --activate
COPY --from=builder ${APP_DIR}/out/json/ .
COPY --from=builder ${APP_DIR}/out/full/ .
COPY --from=builder ${APP_DIR}/out/package-lock.json ./package-lock.json
RUN npm ci
RUN npm i -g turbo
COPY --from=builder ${APP_DIR}/out/pnpm-lock.yaml ./pnpm-lock.yaml
RUN pnpm install --frozen-lockfile
RUN pnpm add -g turbo
RUN turbo run build --filter=@link-stack/bridge-worker
FROM base as runner
ARG BUILD_DATE
ARG VERSION
ARG APP_DIR=/opt/bridge-worker
ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH"
RUN corepack enable && corepack prepare pnpm@9.15.4 --activate
RUN mkdir -p ${APP_DIR}/
RUN DEBIAN_FRONTEND=noninteractive apt-get update && \
apt-get install -y --no-install-recommends \

View file

@ -0,0 +1,144 @@
# Bridge Worker
Background job processor for handling asynchronous tasks in the CDR Link communication bridge system.
## Overview
Bridge Worker uses Graphile Worker to process queued jobs for message handling, media conversion, webhook notifications, and scheduled tasks. It manages the flow of messages between various communication channels (Signal, WhatsApp, Facebook, Voice) and the Zammad ticketing system.
## Features
- **Message Processing**: Handle incoming/outgoing messages for all supported channels
- **Media Conversion**: Convert audio/video files between formats
- **Webhook Notifications**: Notify external systems of events
- **Scheduled Tasks**: Cron-based job scheduling
- **Job Queue Management**: Reliable job processing with retries
- **Multi-Channel Support**: Signal, WhatsApp, Facebook, Voice (Twilio)
## Development
### Prerequisites
- Node.js >= 20
- npm >= 10
- PostgreSQL database
- Redis (for caching)
- FFmpeg (for media conversion)
### Setup
```bash
# Install dependencies
npm install
# Build TypeScript
npm run build
# Run development server with auto-reload
npm run dev
# Start production worker
npm run start
```
### Environment Variables
Required environment variables:
- `DATABASE_URL` - PostgreSQL connection string
- `GRAPHILE_WORKER_CONCURRENCY` - Number of concurrent jobs (default: 10)
- `GRAPHILE_WORKER_POLL_INTERVAL` - Job poll interval in ms (default: 1000)
- `ZAMMAD_URL` - Zammad instance URL
- `ZAMMAD_API_TOKEN` - Zammad API token
- `TWILIO_ACCOUNT_SID` - Twilio account SID
- `TWILIO_AUTH_TOKEN` - Twilio auth token
- `SIGNAL_CLI_URL` - Signal CLI REST API URL
- `WHATSAPP_SERVICE_URL` - WhatsApp bridge service URL
- `FACEBOOK_APP_SECRET` - Facebook app secret
- `FACEBOOK_PAGE_ACCESS_TOKEN` - Facebook page token
### Available Scripts
- `npm run build` - Compile TypeScript
- `npm run dev` - Development mode with watch
- `npm run start` - Start production worker
## Task Types
### Signal Tasks
- `receive-signal-message` - Process incoming Signal messages
- `send-signal-message` - Send outgoing Signal messages
- `fetch-signal-messages` - Fetch messages from Signal CLI
### WhatsApp Tasks
- `receive-whatsapp-message` - Process incoming WhatsApp messages
- `send-whatsapp-message` - Send outgoing WhatsApp messages
### Facebook Tasks
- `receive-facebook-message` - Process incoming Facebook messages
- `send-facebook-message` - Send outgoing Facebook messages
### Voice Tasks
- `receive-voice-message` - Process incoming voice calls/messages
- `send-voice-message` - Send voice messages via Twilio
- `twilio-recording` - Handle Twilio call recordings
- `voice-line-audio-update` - Update voice line audio
- `voice-line-delete` - Delete voice line
- `voice-line-provider-update` - Update voice provider settings
### Common Tasks
- `notify-webhooks` - Send webhook notifications
- `import-label-studio` - Import Label Studio annotations
## Architecture
### Job Processing
Jobs are queued in PostgreSQL using Graphile Worker:
```typescript
await addJob('send-signal-message', {
to: '+1234567890',
message: 'Hello world'
})
```
### Cron Schedule
Scheduled tasks are configured in `crontab`:
- Periodic message fetching
- Cleanup tasks
- Health checks
### Error Handling
- Automatic retries with exponential backoff
- Dead letter queue for failed jobs
- Comprehensive logging with winston
## Media Handling
Supports conversion between formats:
- Audio: MP3, OGG, WAV, M4A
- Uses fluent-ffmpeg for processing
- Automatic format detection
## Integration Points
- **Zammad**: Creates/updates tickets via API
- **Signal CLI**: REST API for Signal messaging
- **WhatsApp Bridge**: HTTP API for WhatsApp
- **Twilio**: Voice and SMS capabilities
- **Facebook**: Graph API for Messenger
## Docker Support
```bash
# Build image
docker build -t link-stack/bridge-worker .
# Run with docker-compose
docker-compose -f docker/compose/bridge.yml up
```
The worker includes cron support via built-in crontab.

View file

@ -1 +1,2 @@
*/1 * * * * fetch-signal-messages ?max=1&id=fetchSignalMessagesCron {"scheduleTasks": "true"}
*/2 * * * * check-group-membership ?max=1&id=checkGroupMembershipCron {}

View file

@ -2,4 +2,4 @@
set -e
echo "starting bridge-worker"
exec dumb-init npm run start
exec dumb-init pnpm run start

View file

@ -1,11 +1,14 @@
import type {} from "graphile-config";
import type {} from "graphile-worker";
const preset: GraphileConfig.Preset = {
const preset: any = {
worker: {
connectionString: process.env.DATABASE_URL,
maxPoolSize: 10,
pollInterval: 2000,
maxPoolSize: process.env.BRIDGE_WORKER_POOL_SIZE
? parseInt(process.env.BRIDGE_WORKER_POOL_SIZE, 10)
: 10,
pollInterval: process.env.BRIDGE_WORKER_POLL_INTERVAL
? parseInt(process.env.BRIDGE_WORKER_POLL_INTERVAL, 10)
: 2000,
fileExtensions: [".ts"],
},
};

View file

@ -1,18 +1,27 @@
import { run } from "graphile-worker";
import { createLogger } from "@link-stack/logger";
import * as path from "path";
import { fileURLToPath } from "url";
const logger = createLogger("bridge-worker");
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const startWorker = async () => {
console.log("Starting worker...");
console.log(process.env);
logger.info("Starting worker...");
await run({
connectionString: process.env.DATABASE_URL,
concurrency: 10,
noHandleSignals: false,
pollInterval: 1000,
concurrency: process.env.BRIDGE_WORKER_CONCURRENCY
? parseInt(process.env.BRIDGE_WORKER_CONCURRENCY, 10)
: 10,
maxPoolSize: process.env.BRIDGE_WORKER_POOL_SIZE
? parseInt(process.env.BRIDGE_WORKER_POOL_SIZE, 10)
: 10,
pollInterval: process.env.BRIDGE_WORKER_POLL_INTERVAL
? parseInt(process.env.BRIDGE_WORKER_POLL_INTERVAL, 10)
: 1000,
taskDirectory: `${__dirname}/tasks`,
crontabFile: `${__dirname}/crontab`,
});
@ -23,6 +32,15 @@ const main = async () => {
};
main().catch((err) => {
console.error(err);
logger.error(
{
error: err,
message: err.message,
stack: err.stack,
name: err.name,
},
"Worker failed to start",
);
console.error("Full error:", err);
process.exit(1);
});

View file

@ -1,8 +1,6 @@
/* eslint-disable camelcase */
// import { SavedVoiceProvider } from "@digiresilience/bridge-db";
import Twilio from "twilio";
import { CallInstance } from "twilio/lib/rest/api/v2010/account/call";
import { Zammad, getOrCreateUser } from "./zammad.js";
type SavedVoiceProvider = any;
@ -20,52 +18,3 @@ export const twilioClientFor = (
});
};
export const createZammadTicket = async (
call: CallInstance,
mp3: Buffer,
): Promise<void> => {
const title = `Call from ${call.fromFormatted} at ${call.startTime}`;
const body = `<ul>
<li>Caller: ${call.fromFormatted}</li>
<li>Service Number: ${call.toFormatted}</li>
<li>Call Duration: ${call.duration} seconds</li>
<li>Start Time: ${call.startTime}</li>
<li>End Time: ${call.endTime}</li>
</ul>
<p>See the attached recording.</p>`;
const filename = `${call.sid}-${call.startTime}.mp3`;
const zammad = Zammad(
{
token: "EviH_WL0p6YUlCoIER7noAZEAPsYA_fVU4FZCKdpq525Vmzzvl8d7dNuP_8d-Amb",
},
"https://demo.digiresilience.org",
);
try {
const customer = await getOrCreateUser(zammad, call.fromFormatted);
await zammad.ticket.create({
title,
group: "Finances",
note: "This ticket was created automaticaly from a recorded phone call.",
customer_id: customer.id,
article: {
body,
subject: title,
content_type: "text/html",
type: "note",
attachments: [
{
filename,
data: mp3.toString("base64"),
"mime-type": "audio/mpeg",
},
],
},
});
} catch (error: any) {
console.log(Object.keys(error));
if (error.isBoom) {
console.log(error.output);
throw new Error("Failed to create zamamd ticket");
}
}
};

View file

@ -0,0 +1,272 @@
import { createLogger } from "@link-stack/logger";
const logger = createLogger('formstack-field-mapping');
/**
* Field mapping configuration for Formstack to Zammad integration
*
* This configuration is completely flexible - you define your own internal field names
* and map them to both Formstack source fields and Zammad custom fields.
*/
export interface FieldMappingConfig {
/**
* Map internal field keys to Formstack field names
*
* Required keys (system):
* - formId: The Formstack Form ID field
* - uniqueId: The Formstack submission unique ID field
*
* Optional keys with special behavior:
* - email: Used for user lookup/creation (if provided)
* - phone: Used for user lookup/creation (if provided)
* - signalAccount: Used for Signal-based user lookup (tried first before phone)
* - name: User's full name (can be nested object with first/last, used in user creation)
* - organization: Used in ticket title template placeholder {organization}
* - typeOfSupport: Used in ticket title template placeholder {typeOfSupport}
* - descriptionOfIssue: Used as article subject (defaults to "Support Request" if not provided)
*
* All other keys are completely arbitrary and defined by your form.
*/
sourceFields: Record<string, string>;
/**
* Map Zammad custom field names to internal field keys (from sourceFields)
*
* Example:
* {
* "us_state": "state", // Zammad field "us_state" gets value from sourceFields["state"]
* "zip_code": "zipCode", // Zammad field "zip_code" gets value from sourceFields["zipCode"]
* "custom_field": "myField" // Any custom field mapping
* }
*
* The values in this object must correspond to keys in sourceFields.
*/
zammadFields: Record<string, string>;
/**
* Configuration for ticket creation
*/
ticket: {
/** Zammad group name to assign tickets to */
group: string;
/** Article type name (e.g., "note", "cdr_signal", "email") */
defaultArticleType: string;
/**
* Template for ticket title
* Supports placeholders: {name}, {organization}, {typeOfSupport}
* Placeholders reference internal field keys from sourceFields
*/
titleTemplate?: string;
};
/**
* Configuration for extracting nested field values
*/
nestedFields?: {
/**
* How to extract first/last name from a nested Name field
* Example: { firstNamePath: "first", lastNamePath: "last" }
* for a field like { "Name": { "first": "John", "last": "Doe" } }
*/
name?: {
firstNamePath?: string;
lastNamePath?: string;
};
};
}
let cachedMapping: FieldMappingConfig | null = null;
/**
* Load field mapping configuration from environment variable (REQUIRED)
*/
export function loadFieldMapping(): FieldMappingConfig {
if (cachedMapping) {
return cachedMapping;
}
const configJson = process.env.FORMSTACK_FIELD_MAPPING;
if (!configJson) {
throw new Error(
'FORMSTACK_FIELD_MAPPING environment variable is required. ' +
'Please set it to a JSON string containing your field mapping configuration.'
);
}
logger.info('Loading Formstack field mapping from environment variable');
try {
const config = JSON.parse(configJson) as FieldMappingConfig;
// Validate required sections exist
if (!config.sourceFields || typeof config.sourceFields !== 'object') {
throw new Error('Invalid field mapping configuration: sourceFields must be an object');
}
if (!config.zammadFields || typeof config.zammadFields !== 'object') {
throw new Error('Invalid field mapping configuration: zammadFields must be an object');
}
if (!config.ticket || typeof config.ticket !== 'object') {
throw new Error('Invalid field mapping configuration: ticket must be an object');
}
// Validate required ticket fields
if (!config.ticket.group) {
throw new Error('Invalid field mapping configuration: ticket.group is required');
}
if (!config.ticket.defaultArticleType) {
throw new Error('Invalid field mapping configuration: ticket.defaultArticleType is required');
}
// Validate required source fields
const systemRequiredFields = ['formId', 'uniqueId'];
for (const field of systemRequiredFields) {
if (!config.sourceFields[field]) {
throw new Error(`Invalid field mapping configuration: sourceFields.${field} is required (system field)`);
}
}
// Validate zammadFields reference valid sourceFields
for (const [zammadField, sourceKey] of Object.entries(config.zammadFields)) {
if (!config.sourceFields[sourceKey]) {
logger.warn(
{ zammadField, sourceKey },
'Zammad field maps to non-existent source field key'
);
}
}
logger.info('Successfully loaded Formstack field mapping configuration');
cachedMapping = config;
return cachedMapping;
} catch (error) {
logger.error({
error: error instanceof Error ? error.message : error,
jsonLength: configJson.length
}, 'Failed to parse field mapping configuration');
throw new Error(
`Failed to parse Formstack field mapping JSON: ${error instanceof Error ? error.message : error}`
);
}
}
/**
* Get a field value from formData using the source field name mapping
*/
export function getFieldValue(
formData: any,
internalFieldKey: string,
mapping?: FieldMappingConfig
): any {
const config = mapping || loadFieldMapping();
const sourceFieldName = config.sourceFields[internalFieldKey];
if (!sourceFieldName) {
return undefined;
}
return formData[sourceFieldName];
}
/**
* Get a nested field value (e.g., Name.first)
*/
export function getNestedFieldValue(
fieldValue: any,
path: string | undefined
): any {
if (!path || !fieldValue) {
return undefined;
}
const parts = path.split('.');
let current = fieldValue;
for (const part of parts) {
if (current && typeof current === 'object') {
current = current[part];
} else {
return undefined;
}
}
return current;
}
/**
* Format field value (handle arrays, objects, etc.)
*/
export function formatFieldValue(value: any): string | undefined {
if (value === null || value === undefined || value === '') {
return undefined;
}
if (Array.isArray(value)) {
return value.join(', ');
}
if (typeof value === 'object') {
return JSON.stringify(value);
}
return String(value);
}
/**
* Build ticket title from template and data
* Replaces placeholders like {name}, {organization}, {typeOfSupport} with provided values
*/
export function buildTicketTitle(
mapping: FieldMappingConfig,
data: Record<string, string | undefined>
): string {
const template = mapping.ticket.titleTemplate || '{name}';
let title = template;
// Replace all placeholders in the template
for (const [key, value] of Object.entries(data)) {
const placeholder = `{${key}}`;
if (title.includes(placeholder)) {
if (value) {
title = title.replace(placeholder, value);
} else {
// Remove empty placeholder and surrounding separators
title = title.replace(` - ${placeholder}`, '').replace(`${placeholder} - `, '').replace(placeholder, '');
}
}
}
return title.trim();
}
/**
* Get all Zammad field values from form data using the mapping
* Returns an object with Zammad field names as keys and formatted values
*/
export function getZammadFieldValues(
formData: any,
mapping?: FieldMappingConfig
): Record<string, string> {
const config = mapping || loadFieldMapping();
const result: Record<string, string> = {};
for (const [zammadFieldName, sourceKey] of Object.entries(config.zammadFields)) {
const value = getFieldValue(formData, sourceKey, config);
const formatted = formatFieldValue(value);
if (formatted !== undefined) {
result[zammadFieldName] = formatted;
}
}
return result;
}
/**
* Reset cached mapping (useful for testing)
*/
export function resetMappingCache(): void {
cachedMapping = null;
}

View file

@ -1,11 +0,0 @@
//import { defState } from "@digiresilience/montar";
//import { configureLogger } from "@digiresilience/bridge-common";
// import config from "@digiresilience/bridge-config";
//export const logger = defState("workerLogger", {
// start: async () => configureLogger(config),
//});
//export default logger;
export const logger = {};
export default logger;

View file

@ -1,6 +1,9 @@
import { Readable } from "stream";
import ffmpeg from "fluent-ffmpeg";
import * as R from "remeda";
import { createLogger } from "@link-stack/logger";
const logger = createLogger('bridge-worker-media-convert');
const requiredCodecs = ["mp3", "webm", "wav"];
@ -25,7 +28,7 @@ const defaultAudioConvertOpts = {
**/
export const convert = (
input: Buffer,
opts?: AudioConvertOpts
opts?: AudioConvertOpts,
): Promise<Buffer> => {
const settings = { ...defaultAudioConvertOpts, ...opts };
return new Promise((resolve, reject) => {
@ -35,12 +38,8 @@ export const convert = (
.audioCodec(settings.audioCodec)
.audioBitrate(settings.bitrate)
.toFormat(settings.format)
.on("error", (err, stdout, stderr) => {
console.error(err.message);
console.log("FFMPEG OUTPUT");
console.log(stdout);
console.log("FFMPEG ERROR");
console.log(stderr);
.on("error", (err, _stdout, _stderr) => {
logger.error({ error: err }, 'FFmpeg conversion error');
reject(err);
})
.on("end", () => {
@ -62,12 +61,16 @@ export const selfCheck = (): Promise<boolean> => {
return new Promise((resolve) => {
ffmpeg.getAvailableFormats((err, codecs) => {
if (err) {
console.error("FFMPEG error:", err);
logger.error({ error: err }, 'FFMPEG error');
resolve(false);
}
const preds = R.map(requiredCodecs, (codec) => (available: any) =>
available[codec] && available[codec].canDemux && available[codec].canMux
const preds = R.map(
requiredCodecs,
(codec) => (available: any) =>
available[codec] &&
available[codec].canDemux &&
available[codec].canMux,
);
resolve(R.allPass(codecs, preds));
@ -79,6 +82,6 @@ export const assertFfmpegAvailable = async (): Promise<void> => {
const r = await selfCheck();
if (!r)
throw new Error(
`ffmpeg is not installed, could not be located, or does not support the required codecs: ${requiredCodecs}`
`ffmpeg is not installed, could not be located, or does not support the required codecs: ${requiredCodecs}`,
);
};

View file

@ -19,11 +19,13 @@ export interface Ticket {
export interface ZammadClient {
ticket: {
create: (data: any) => Promise<Ticket>;
update: (id: number, data: any) => Promise<Ticket>;
};
user: {
search: (data: any) => Promise<User[]>;
create: (data: any) => Promise<User>;
};
get: (path: string) => Promise<any>;
}
export type ZammadCredentials =
@ -39,7 +41,7 @@ const formatAuth = (credentials: any) => {
return (
"Basic " +
Buffer.from(`${credentials.username}:${credentials.password}`).toString(
"base64"
"base64",
)
);
}
@ -54,7 +56,7 @@ const formatAuth = (credentials: any) => {
export const Zammad = (
credentials: ZammadCredentials,
host: string,
opts?: ZammadClientOpts
opts?: ZammadClientOpts,
): ZammadClient => {
const extraHeaders = (opts && opts.headers) || {};
@ -73,6 +75,12 @@ export const Zammad = (
const { payload: result } = await wreck.post("tickets", { payload });
return result as Ticket;
},
update: async (id, payload) => {
const { payload: result } = await wreck.put(`tickets/${id}`, {
payload,
});
return result as Ticket;
},
},
user: {
search: async (query) => {
@ -85,22 +93,79 @@ export const Zammad = (
return result as User;
},
},
get: async (path) => {
const { payload: result } = await wreck.get(path);
return result;
},
};
};
/**
* Sanitizes phone number to E.164 format: +15554446666
* Strips all non-digit characters except +, ensures + prefix
* @param phoneNumber - Raw phone number (e.g., "(555) 444-6666", "5554446666", "+1 555 444 6666")
* @returns E.164 formatted phone number (e.g., "+15554446666")
* @throws Error if phone number is invalid
*/
export const sanitizePhoneNumber = (phoneNumber: string): string => {
// Remove all characters except digits and +
let cleaned = phoneNumber.replace(/[^\d+]/g, "");
// Ensure it starts with +
if (!cleaned.startsWith("+")) {
// Assume US/Canada if no country code (11 digits starting with 1, or 10 digits)
if (cleaned.length === 10) {
cleaned = "+1" + cleaned;
} else if (cleaned.length === 11 && cleaned.startsWith("1")) {
cleaned = "+" + cleaned;
} else if (cleaned.length >= 10) {
// International number without +, add it
cleaned = "+" + cleaned;
}
}
// Validate E.164 format: + followed by 10-15 digits
if (!/^\+\d{10,15}$/.test(cleaned)) {
throw new Error(`Invalid phone number format: ${phoneNumber}`);
}
return cleaned;
};
export const getUser = async (zammad: ZammadClient, phoneNumber: string) => {
const mungedNumber = phoneNumber.replace("+", "");
const results = await zammad.user.search(`phone:${mungedNumber}`);
// Sanitize to E.164 format
const sanitized = sanitizePhoneNumber(phoneNumber);
// Remove + for Zammad search query
const searchNumber = sanitized.replace("+", "");
// Try sanitized format first (e.g., "6464229653" for "+16464229653")
let results = await zammad.user.search(`phone:${searchNumber}`);
if (results.length > 0) return results[0];
// Fall back to searching for original input (handles legacy formatted numbers)
// This ensures we can find users with "(646) 422-9653" format in database
const originalCleaned = phoneNumber.replace(/[^\d+]/g, "").replace("+", "");
if (originalCleaned !== searchNumber) {
results = await zammad.user.search(`phone:${originalCleaned}`);
if (results.length > 0) return results[0];
}
return undefined;
};
export const getOrCreateUser = async (zammad: ZammadClient, phoneNumber: string) => {
export const getOrCreateUser = async (
zammad: ZammadClient,
phoneNumber: string,
) => {
const customer = await getUser(zammad, phoneNumber);
if (customer) return customer;
// Sanitize phone number to E.164 format before storing
const sanitized = sanitizePhoneNumber(phoneNumber);
return zammad.user.create({
phone: phoneNumber,
note: "User created by Grabadora from incoming voice call",
phone: sanitized,
note: "User created from incoming voice call",
});
};

View file

@ -1,6 +1,6 @@
{
"name": "@link-stack/bridge-worker",
"version": "2.2.0",
"version": "3.3.0",
"type": "module",
"main": "build/main/index.js",
"author": "Darren Clarke <darren@redaranj.com>",
@ -12,18 +12,19 @@
},
"dependencies": {
"@hapi/wreck": "^18.1.0",
"@link-stack/bridge-common": "*",
"@link-stack/signal-api": "*",
"@link-stack/bridge-common": "workspace:*",
"@link-stack/logger": "workspace:*",
"@link-stack/signal-api": "workspace:*",
"fluent-ffmpeg": "^2.1.3",
"graphile-worker": "^0.16.6",
"remeda": "^2.14.0",
"twilio": "^5.3.2"
"remeda": "^2.32.0",
"twilio": "^5.10.2"
},
"devDependencies": {
"@types/fluent-ffmpeg": "^2.1.26",
"dotenv-cli": "^7.4.2",
"@link-stack/eslint-config": "*",
"@link-stack/typescript-config": "*",
"typescript": "^5.6.2"
"@types/fluent-ffmpeg": "^2.1.27",
"dotenv-cli": "^10.0.0",
"@link-stack/eslint-config": "workspace:*",
"@link-stack/typescript-config": "workspace:*",
"typescript": "^5.9.3"
}
}

View file

@ -0,0 +1,121 @@
#!/usr/bin/env node
/**
* Check Signal group membership status and update Zammad tickets
*
* This task queries the Signal CLI API to check if users have joined
* their assigned groups. When a user joins (moves from pendingInvites to members),
* it updates the ticket's group_joined flag in Zammad.
*
* Note: This task sends webhooks for all group members every time it runs.
* The Zammad webhook handler is idempotent and will ignore duplicate notifications
* if group_joined is already true.
*/
import { db, getWorkerUtils } from "@link-stack/bridge-common";
import { createLogger } from "@link-stack/logger";
import * as signalApi from "@link-stack/signal-api";
const logger = createLogger("check-group-membership");
const { Configuration, GroupsApi } = signalApi;
interface CheckGroupMembershipTaskOptions {
// Optional: Check specific group. If not provided, checks all groups with group_joined=false
groupId?: string;
botToken?: string;
}
const checkGroupMembershipTask = async (
options: CheckGroupMembershipTaskOptions = {},
): Promise<void> => {
const config = new Configuration({
basePath: process.env.BRIDGE_SIGNAL_URL,
});
const groupsClient = new GroupsApi(config);
const worker = await getWorkerUtils();
// Get all Signal bots
const bots = await db.selectFrom("SignalBot").selectAll().execute();
for (const bot of bots) {
try {
logger.debug(
{ botId: bot.id, phoneNumber: bot.phoneNumber },
"Checking groups for bot",
);
// Get all groups for this bot
const groups = await groupsClient.v1GroupsNumberGet({
number: bot.phoneNumber,
});
logger.debug(
{ botId: bot.id, groupCount: groups.length },
"Retrieved groups from Signal CLI",
);
// For each group, check if we have tickets waiting for members to join
for (const group of groups) {
if (!group.id || !group.internalId) {
logger.debug({ groupName: group.name }, "Skipping group without ID");
continue;
}
// Log info about each group temporarily for debugging
logger.info(
{
groupId: group.id,
groupName: group.name,
membersCount: group.members?.length || 0,
members: group.members,
pendingInvitesCount: group.pendingInvites?.length || 0,
pendingInvites: group.pendingInvites,
pendingRequestsCount: group.pendingRequests?.length || 0,
},
"Checking group membership",
);
// Notify Zammad about each member who has joined
// This handles both cases:
// 1. New contacts who must accept invite (they move from pendingInvites to members)
// 2. Existing contacts who are auto-added (they appear directly in members)
if (group.members && group.members.length > 0) {
for (const memberPhone of group.members) {
// Check if this member was previously pending
// We'll send the webhook and let Zammad decide if it needs to update
await worker.addJob("common/notify-webhooks", {
backendId: bot.id,
payload: {
event: "group_member_joined",
group_id: group.id,
member_phone: memberPhone,
timestamp: new Date().toISOString(),
},
});
logger.info(
{
groupId: group.id,
memberPhone,
},
"Notified Zammad about group member",
);
}
}
}
} catch (error: any) {
logger.error(
{
botId: bot.id,
error: error.message,
stack: error.stack,
},
"Error checking group membership for bot",
);
}
}
logger.info("Completed group membership check");
};
export default checkGroupMembershipTask;

View file

@ -1,4 +1,7 @@
import { db } from "@link-stack/bridge-common";
import { createLogger } from "@link-stack/logger";
const logger = createLogger('notify-webhooks');
export interface NotifyWebhooksOptions {
backendId: string;
@ -9,6 +12,11 @@ const notifyWebhooksTask = async (
options: NotifyWebhooksOptions,
): Promise<void> => {
const { backendId, payload } = options;
logger.debug({
backendId,
payloadKeys: Object.keys(payload),
}, 'Processing webhook notification');
const webhooks = await db
.selectFrom("Webhook")
@ -16,16 +24,48 @@ const notifyWebhooksTask = async (
.where("backendId", "=", backendId)
.execute();
logger.debug({ count: webhooks.length, backendId }, 'Found webhooks');
for (const webhook of webhooks) {
const { endpointUrl, httpMethod, headers } = webhook;
const finalHeaders = { "Content-Type": "application/json", ...headers };
console.log({ endpointUrl, httpMethod, headers, finalHeaders });
const result = await fetch(endpointUrl, {
const body = JSON.stringify(payload);
logger.debug({
url: endpointUrl,
method: httpMethod,
headers: finalHeaders,
body: JSON.stringify(payload),
});
console.log(result);
bodyLength: body.length,
headerKeys: Object.keys(finalHeaders),
}, 'Sending webhook');
try {
const result = await fetch(endpointUrl, {
method: httpMethod,
headers: finalHeaders,
body,
});
logger.debug({
url: endpointUrl,
status: result.status,
statusText: result.statusText,
ok: result.ok,
}, 'Webhook response');
if (!result.ok) {
const responseText = await result.text();
logger.error({
url: endpointUrl,
status: result.status,
responseSample: responseText.substring(0, 500),
}, 'Webhook error response');
}
} catch (error) {
logger.error({
url: endpointUrl,
error: error instanceof Error ? error.message : error,
}, 'Webhook request failed');
}
}
};

View file

@ -1,4 +1,7 @@
import { db } from "@link-stack/bridge-common";
import { createLogger } from "@link-stack/logger";
const logger = createLogger('bridge-worker-send-facebook-message');
interface SendFacebookMessageTaskOptions {
token: string;
@ -31,9 +34,8 @@ const sendFacebookMessageTask = async (
headers: { "Content-Type": "application/json" },
body: JSON.stringify(outgoingMessage),
});
console.log({ response });
} catch (error) {
console.error({ error });
logger.error({ error });
throw error;
}
};

View file

@ -1,6 +1,9 @@
import { db, getWorkerUtils } from "@link-stack/bridge-common";
import { createLogger } from "@link-stack/logger";
import * as signalApi from "@link-stack/signal-api";
const logger = createLogger("fetch-signal-messages");
const { Configuration, MessagesApi, AttachmentsApi } = signalApi;
const config = new Configuration({
basePath: process.env.BRIDGE_SIGNAL_URL,
@ -21,8 +24,23 @@ const fetchAttachments = async (attachments: any[] | undefined) => {
const arrayBuffer = await blob.arrayBuffer();
const base64Attachment = Buffer.from(arrayBuffer).toString("base64");
// Generate default filename if not provided by Signal API
let defaultFilename = name;
if (!defaultFilename) {
// Check if id already has an extension
const hasExtension = id.includes(".");
if (hasExtension) {
// ID already includes extension
defaultFilename = id;
} else {
// Add extension based on content type
const extension = contentType?.split("/")[1] || "bin";
defaultFilename = `${id}.${extension}`;
}
}
const formattedAttachment = {
filename: name,
filename: defaultFilename,
mimeType: contentType,
attachment: base64Attachment,
};
@ -46,21 +64,109 @@ const processMessage = async ({
message: msg,
}: ProcessMessageArgs): Promise<Record<string, any>[]> => {
const { envelope } = msg;
console.log(envelope);
const { source, sourceUuid, dataMessage } = envelope;
const { source, sourceUuid, dataMessage, syncMessage, receiptMessage, typingMessage } =
envelope;
// Log all envelope types to understand what events we're receiving
logger.info(
{
source,
sourceUuid,
hasDataMessage: !!dataMessage,
hasSyncMessage: !!syncMessage,
hasReceiptMessage: !!receiptMessage,
hasTypingMessage: !!typingMessage,
envelopeKeys: Object.keys(envelope),
},
"Received Signal envelope",
);
const isGroup = !!(
dataMessage?.groupV2 ||
dataMessage?.groupContext ||
dataMessage?.groupInfo
);
// Check if this is a group membership change event
const groupInfo = dataMessage?.groupInfo;
if (groupInfo) {
logger.info(
{
type: groupInfo.type,
groupId: groupInfo.groupId,
source,
groupInfoKeys: Object.keys(groupInfo),
fullGroupInfo: groupInfo,
},
"Received group info event",
);
// If user joined the group, notify Zammad
if (groupInfo.type === "JOIN" || groupInfo.type === "JOINED") {
const worker = await getWorkerUtils();
const groupId = groupInfo.groupId
? `group.${Buffer.from(groupInfo.groupId).toString("base64")}`
: null;
if (groupId) {
await worker.addJob("common/notify-webhooks", {
backendId: id,
payload: {
event: "group_member_joined",
group_id: groupId,
member_phone: source,
timestamp: new Date().toISOString(),
},
});
logger.info(
{
groupId,
memberPhone: source,
},
"User joined Signal group, notifying Zammad",
);
}
}
}
if (!dataMessage) return [];
const { attachments } = dataMessage;
const rawTimestamp = dataMessage?.timestamp;
logger.debug(
{
sourceUuid,
source,
rawTimestamp,
hasGroupV2: !!dataMessage?.groupV2,
hasGroupContext: !!dataMessage?.groupContext,
hasGroupInfo: !!dataMessage?.groupInfo,
isGroup,
groupV2Id: dataMessage?.groupV2?.id,
groupContextType: dataMessage?.groupContext?.type,
groupInfoType: dataMessage?.groupInfo?.type,
},
"Processing message",
);
const timestamp = new Date(rawTimestamp);
const formattedAttachments = await fetchAttachments(attachments);
const primaryAttachment = formattedAttachments[0] ?? {};
const additionalAttachments = formattedAttachments.slice(1);
const groupId =
dataMessage?.groupV2?.id ||
dataMessage?.groupContext?.id ||
dataMessage?.groupInfo?.groupId;
const toRecipient = groupId
? `group.${Buffer.from(groupId).toString("base64")}`
: phoneNumber;
const primaryMessage = {
token: id,
to: phoneNumber,
to: toRecipient,
from: source,
messageId: `${sourceUuid}-${rawTimestamp}`,
message: dataMessage?.message,
@ -68,6 +174,7 @@ const processMessage = async ({
attachment: primaryAttachment.attachment,
filename: primaryAttachment.filename,
mimeType: primaryAttachment.mimeType,
isGroup,
};
const formattedMessages = [primaryMessage];
@ -119,19 +226,29 @@ const fetchSignalMessagesTask = async ({
number: phoneNumber,
});
logger.debug({ botId: id, phoneNumber }, "Fetching messages for bot");
for (const message of messages) {
const formattedMessages = await processMessage({
id,
phoneNumber,
message,
});
console.log({ formattedMessages });
for (const formattedMessage of formattedMessages) {
if (formattedMessage.to !== formattedMessage.from) {
await worker.addJob(
"signal/receive-signal-message",
formattedMessage,
logger.debug(
{
messageId: formattedMessage.messageId,
from: formattedMessage.from,
to: formattedMessage.to,
isGroup: formattedMessage.isGroup,
hasMessage: !!formattedMessage.message,
hasAttachment: !!formattedMessage.attachment,
},
"Creating job for message",
);
await worker.addJob("signal/receive-signal-message", formattedMessage);
}
}
}

View file

@ -0,0 +1,436 @@
import { createLogger } from "@link-stack/logger";
import { db } from "@link-stack/bridge-common";
import { Zammad, getUser, sanitizePhoneNumber } from "../../lib/zammad.js";
import {
loadFieldMapping,
getFieldValue,
getNestedFieldValue,
formatFieldValue,
buildTicketTitle,
getZammadFieldValues,
type FieldMappingConfig,
} from "../../lib/formstack-field-mapping.js";
const logger = createLogger("create-ticket-from-form");
export interface CreateTicketFromFormOptions {
formData: any;
receivedAt: string;
}
const createTicketFromFormTask = async (
options: CreateTicketFromFormOptions,
): Promise<void> => {
const { formData, receivedAt } = options;
// Load field mapping configuration
const mapping = loadFieldMapping();
// Log only non-PII metadata using configured field names
const formId = getFieldValue(formData, "formId", mapping);
const uniqueId = getFieldValue(formData, "uniqueId", mapping);
logger.info(
{
formId,
uniqueId,
receivedAt,
fieldCount: Object.keys(formData).length,
},
"Processing Formstack form submission",
);
// Extract fields using dynamic mapping
const nameField = getFieldValue(formData, "name", mapping);
const firstName = mapping.nestedFields?.name?.firstNamePath
? getNestedFieldValue(nameField, mapping.nestedFields.name.firstNamePath) || ""
: "";
const lastName = mapping.nestedFields?.name?.lastNamePath
? getNestedFieldValue(nameField, mapping.nestedFields.name.lastNamePath) || ""
: "";
const fullName =
firstName && lastName
? `${firstName} ${lastName}`.trim()
: firstName || lastName || "Unknown";
// Extract well-known fields used for special logic (all optional)
const email = getFieldValue(formData, "email", mapping);
const rawPhone = getFieldValue(formData, "phone", mapping);
const rawSignalAccount = getFieldValue(formData, "signalAccount", mapping);
const organization = getFieldValue(formData, "organization", mapping);
const typeOfSupport = getFieldValue(formData, "typeOfSupport", mapping);
const descriptionOfIssue = getFieldValue(formData, "descriptionOfIssue", mapping);
// Sanitize phone numbers to E.164 format (+15554446666)
let phone: string | undefined;
if (rawPhone) {
try {
phone = sanitizePhoneNumber(rawPhone);
logger.info({ rawPhone, sanitized: phone }, "Sanitized phone number");
} catch (error: any) {
logger.warn({ rawPhone, error: error.message }, "Invalid phone number format, ignoring");
phone = undefined;
}
}
let signalAccount: string | undefined;
if (rawSignalAccount) {
try {
signalAccount = sanitizePhoneNumber(rawSignalAccount);
logger.info({ rawSignalAccount, sanitized: signalAccount }, "Sanitized signal account");
} catch (error: any) {
logger.warn({ rawSignalAccount, error: error.message }, "Invalid signal account format, ignoring");
signalAccount = undefined;
}
}
// Validate that at least one contact method is provided
if (!email && !phone && !signalAccount) {
logger.error(
{ formId, uniqueId },
"No contact information provided - at least one of email, phone, or signalAccount is required",
);
throw new Error(
"At least one contact method (email, phone, or signalAccount) is required for ticket creation",
);
}
// Build ticket title using configured template
// Pass all potentially used fields - the template determines which are actually used
const title = buildTicketTitle(mapping, {
name: fullName,
organization: formatFieldValue(organization),
typeOfSupport: formatFieldValue(typeOfSupport),
});
// Build article body - format all fields as HTML
const formatAllFields = (data: any): string => {
let html = "";
// Add formatted name field first if we have it
if (fullName && fullName !== "Unknown") {
html += `<strong>Name:</strong><br>${fullName}<br>`;
}
for (const [key, value] of Object.entries(data)) {
// Skip metadata fields and name field (we already formatted it above)
const skipFields = [
mapping.sourceFields.formId,
mapping.sourceFields.uniqueId,
mapping.sourceFields.name, // Skip raw name field
"HandshakeKey",
].filter(Boolean);
if (skipFields.includes(key)) continue;
if (value === null || value === undefined || value === "") continue;
const displayValue = Array.isArray(value)
? value.join(", ")
: typeof value === "object"
? JSON.stringify(value)
: value;
html += `<strong>${key}:</strong><br>${displayValue}<br>`;
}
return html;
};
const body = formatAllFields(formData);
// Get Zammad configuration from environment
const zammadUrl = process.env.ZAMMAD_URL || "http://zammad-nginx:8080";
const zammadToken = process.env.ZAMMAD_API_TOKEN;
if (!zammadToken) {
logger.error("ZAMMAD_API_TOKEN environment variable is not configured");
throw new Error("ZAMMAD_API_TOKEN is required");
}
const zammad = Zammad({ token: zammadToken }, zammadUrl);
try {
// Look up the configured article type
let articleTypeId: number | undefined;
try {
const articleTypes = await zammad.get("ticket_article_types");
const configuredType = articleTypes.find(
(t: any) => t.name === mapping.ticket.defaultArticleType,
);
articleTypeId = configuredType?.id;
if (articleTypeId) {
logger.info(
{ articleTypeId, typeName: mapping.ticket.defaultArticleType },
"Found configured article type",
);
} else {
logger.warn(
{ typeName: mapping.ticket.defaultArticleType },
"Configured article type not found, ticket will use default type",
);
}
} catch (error: any) {
logger.warn({ error: error.message }, "Failed to look up article type");
}
// Get or create user
// Try to find existing user by: phone -> email
// Note: We can't search by Signal account since Signal group IDs aren't phone numbers
let customer;
// Try phone if provided
if (phone) {
customer = await getUser(zammad, phone);
if (customer) {
logger.info(
{ customerId: customer.id, method: "phone" },
"Found existing user by phone",
);
}
}
// Fall back to email if no customer found yet
if (!customer && email) {
// Validate email format before using in search
const emailRegex = /^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$/;
if (emailRegex.test(email)) {
const emailResults = await zammad.user.search(`email:${email}`);
if (emailResults.length > 0) {
customer = emailResults[0];
logger.info(
{ customerId: customer.id, method: "email" },
"Found existing user by email",
);
}
} else {
logger.warn({ email }, "Invalid email format provided, skipping email search");
}
}
if (!customer) {
// Create new user
logger.info("Creating new user from form submission");
// Build user data with whatever contact info we have
const userData: any = {
firstname: firstName,
lastname: lastName,
roles: ["Customer"],
};
// Add contact info only if provided
if (email) {
userData.email = email;
}
// Use phone number if provided (don't use Signal group ID as phone)
if (phone) {
userData.phone = phone;
}
customer = await zammad.user.create(userData);
}
logger.info(
{
customerId: customer.id,
email: customer.email,
},
"Using customer for ticket",
);
// Look up the configured group
const groups = await zammad.get("groups");
const targetGroup = groups.find((g: any) => g.name === mapping.ticket.group);
if (!targetGroup) {
logger.error({ groupName: mapping.ticket.group }, "Configured group not found");
throw new Error(`Zammad group "${mapping.ticket.group}" not found`);
}
logger.info(
{ groupId: targetGroup.id, groupName: targetGroup.name },
"Using configured group",
);
// Build custom fields using Zammad field mapping
// This dynamically maps all configured fields without hardcoding
const customFields = getZammadFieldValues(formData, mapping);
// Check if this is a Signal ticket
let signalArticleType = null;
let signalChannelId = null;
let signalBotToken = null;
if (signalAccount) {
try {
logger.info({ signalAccount }, "Looking up Signal channel and article type");
// Look up Signal channels from Zammad (admin-only endpoint)
// Note: bot_token is NOT included in this response for security reasons
const channels = await zammad.get("cdr_signal_channels");
if (channels.length > 0) {
const zammadChannel = channels[0]; // Use first active Signal channel
signalChannelId = zammadChannel.id;
logger.info(
{
channelId: zammadChannel.id,
phoneNumber: zammadChannel.phone_number,
},
"Found active Signal channel from Zammad",
);
// Look up the bot_token from our own cdr database using the phone number
const signalBot = await db
.selectFrom("SignalBot")
.selectAll()
.where("phoneNumber", "=", zammadChannel.phone_number)
.executeTakeFirst();
if (signalBot) {
signalBotToken = signalBot.token;
logger.info(
{ botId: signalBot.id, phoneNumber: signalBot.phoneNumber },
"Found Signal bot token from cdr database",
);
} else {
logger.warn(
{ phoneNumber: zammadChannel.phone_number },
"Signal bot not found in cdr database",
);
}
} else {
logger.warn("No active Signal channels found");
}
// Look up cdr_signal article type
const articleTypes = await zammad.get("ticket_article_types");
signalArticleType = articleTypes.find((t: any) => t.name === "cdr_signal");
if (!signalArticleType) {
logger.warn("Signal article type (cdr_signal) not found, using default type");
} else {
logger.info(
{ articleTypeId: signalArticleType.id },
"Found Signal article type",
);
}
} catch (error: any) {
logger.warn(
{ error: error.message },
"Failed to look up Signal article type, creating regular ticket",
);
}
}
// Create the ticket
const articleData: any = {
subject: descriptionOfIssue || "Support Request",
body,
content_type: "text/html",
internal: false,
};
// Use Signal article type if available, otherwise use configured default
if (signalArticleType) {
articleData.type_id = signalArticleType.id;
logger.info({ typeId: signalArticleType.id }, "Using Signal article type");
// IMPORTANT: Set sender to "Customer" for Signal tickets created from Formstack
// This prevents the article from being echoed back to the user via Signal
// (enqueue_communicate_cdr_signal_job only sends if sender != 'Customer')
articleData.sender = "Customer";
} else if (articleTypeId) {
articleData.type_id = articleTypeId;
}
const ticketData: any = {
title,
group_id: targetGroup.id,
customer_id: customer.id,
article: articleData,
...customFields,
};
// Add Signal preferences if we have Signal channel and article type
// Note: signalAccount from Formstack is the phone number the user typed in
// Groups are added later via update_group webhook from bridge-worker
if (signalChannelId && signalBotToken && signalArticleType && signalAccount) {
ticketData.preferences = {
channel_id: signalChannelId,
cdr_signal: {
bot_token: signalBotToken,
chat_id: signalAccount, // Use Signal phone number as chat_id
},
};
logger.info(
{
channelId: signalChannelId,
chatId: signalAccount,
},
"Adding Signal preferences to ticket",
);
}
logger.info(
{
title,
groupId: targetGroup.id,
customerId: customer.id,
hasArticleType: !!articleTypeId || !!signalArticleType,
isSignalTicket: !!signalArticleType && !!signalAccount,
customFieldCount: Object.keys(customFields).length,
},
"Creating ticket",
);
const ticket = await zammad.ticket.create(ticketData);
// Set create_article_type_id for Signal tickets to enable proper replies
if (signalArticleType && signalChannelId) {
try {
await zammad.ticket.update(ticket.id, {
create_article_type_id: signalArticleType.id,
});
logger.info(
{
ticketId: ticket.id,
articleTypeId: signalArticleType.id,
},
"Set create_article_type_id for Signal ticket",
);
} catch (error: any) {
logger.warn(
{
error: error.message,
ticketId: ticket.id,
},
"Failed to set create_article_type_id, ticket may not support Signal replies",
);
}
}
logger.info(
{
ticketId: ticket.id,
ticketNumber: ticket.id,
title,
isSignalTicket: !!signalChannelId,
},
"Successfully created ticket from Formstack submission",
);
} catch (error: any) {
logger.error(
{
error: error.message,
stack: error.stack,
formId,
uniqueId,
},
"Failed to create ticket from Formstack submission",
);
throw error;
}
};
export default createTicketFromFormTask;

View file

@ -1,213 +0,0 @@
/* eslint-disable camelcase */
/*
import { convert } from "html-to-text";
import { URLSearchParams } from "url";
import { withDb, AppDatabase } from "../../lib/db.js";
// import { loadConfig } from "@digiresilience/bridge-config";
import { tagMap } from "../../lib/tag-map.js";
const config: any = {};
type FormattedZammadTicket = {
data: Record<string, unknown>;
predictions: Record<string, unknown>[];
};
const getZammadTickets = async (
page: number,
minUpdatedTimestamp: Date,
): Promise<[boolean, FormattedZammadTicket[]]> => {
const {
leafcutter: { zammadApiUrl, zammadApiKey, contributorName, contributorId },
} = config;
const headers = { Authorization: `Token ${zammadApiKey}` };
let shouldContinue = false;
const docs = [];
const ticketsQuery = new URLSearchParams({
expand: "true",
sort_by: "updated_at",
order_by: "asc",
query: "state.name: closed",
per_page: "25",
page: `${page}`,
});
const rawTickets = await fetch(
`${zammadApiUrl}/tickets/search?${ticketsQuery}`,
{ headers },
);
const tickets: any = await rawTickets.json();
console.log({ tickets });
if (!tickets || tickets.length === 0) {
return [shouldContinue, docs];
}
for await (const ticket of tickets) {
const { id: source_id, created_at, updated_at, close_at } = ticket;
const source_created_at = new Date(created_at);
const source_updated_at = new Date(updated_at);
const source_closed_at = new Date(close_at);
shouldContinue = true;
if (source_closed_at <= minUpdatedTimestamp) {
console.log(`Skipping ticket`, {
source_id,
source_updated_at,
source_closed_at,
minUpdatedTimestamp,
});
continue;
}
console.log(`Processing ticket`, {
source_id,
source_updated_at,
source_closed_at,
minUpdatedTimestamp,
});
const rawArticles = await fetch(
`${zammadApiUrl}/ticket_articles/by_ticket/${source_id}`,
{ headers },
);
const articles: any = await rawArticles.json();
let articleText = "";
for (const article of articles) {
const { content_type: contentType, body } = article;
if (contentType === "text/html") {
const cleanArticleText = convert(body);
articleText += cleanArticleText + "\n\n";
} else {
articleText += body + "\n\n";
}
}
const tagsQuery = new URLSearchParams({
object: "Ticket",
o_id: source_id,
});
const rawTags = await fetch(`${zammadApiUrl}/tags?${tagsQuery}`, {
headers,
});
const { tags }: any = await rawTags.json();
const transformedTags = [];
for (const tag of tags) {
const outputs = tagMap[tag];
if (outputs) {
transformedTags.push(...outputs);
}
}
const doc: FormattedZammadTicket = {
data: {
ticket: articleText,
contributor_id: contributorId,
source_id,
source_closed_at,
source_created_at,
source_updated_at,
},
predictions: [],
};
const result = transformedTags.map((tag) => {
return {
type: "choices",
value: {
choices: [tag.value],
},
to_name: "ticket",
from_name: tag.field,
};
});
if (result.length > 0) {
doc.predictions.push({
model_version: `${contributorName}TranslatorV1`,
result,
});
}
docs.push(doc);
}
return [shouldContinue, docs];
};
const fetchFromZammad = async (
minUpdatedTimestamp: Date,
): Promise<FormattedZammadTicket[]> => {
const pages = [...Array.from({ length: 10000 }).keys()];
const allTickets: FormattedZammadTicket[] = [];
for await (const page of pages) {
const [shouldContinue, tickets] = await getZammadTickets(
page + 1,
minUpdatedTimestamp,
);
if (!shouldContinue) {
break;
}
if (tickets.length > 0) {
allTickets.push(...tickets);
}
}
return allTickets;
};
const sendToLabelStudio = async (tickets: FormattedZammadTicket[]) => {
const {
leafcutter: { labelStudioApiUrl, labelStudioApiKey },
} = config;
const headers = {
Authorization: `Token ${labelStudioApiKey}`,
"Content-Type": "application/json",
Accept: "application/json",
};
for await (const ticket of tickets) {
const res = await fetch(`${labelStudioApiUrl}/projects/1/import`, {
method: "POST",
headers,
body: JSON.stringify([ticket]),
});
const importResult = await res.json();
console.log(JSON.stringify(importResult, undefined, 2));
}
};
*/
const importLabelStudioTask = async (): Promise<void> => {
/*
withDb(async (db: AppDatabase) => {
const {
leafcutter: { contributorName },
} = config;
const settingName = `${contributorName}ImportLabelStudioTask`;
const res: any = await db.settings.findByName(settingName);
const startTimestamp = res?.value?.minUpdatedTimestamp
? new Date(res.value.minUpdatedTimestamp as string)
: new Date("2023-03-01");
const tickets = await fetchFromZammad(startTimestamp);
if (tickets.length > 0) {
await sendToLabelStudio(tickets);
const lastTicket = tickets.pop();
const newLastTimestamp = lastTicket.data.source_closed_at;
console.log({ newLastTimestamp });
await db.settings.upsert(settingName, {
minUpdatedTimestamp: newLastTimestamp,
});
}
});
*/
};
export default importLabelStudioTask;

View file

@ -1,177 +0,0 @@
/* eslint-disable camelcase */
/*
import { URLSearchParams } from "url";
import { withDb, AppDatabase } from "../../lib/db.js";
// import { loadConfig } from "@digiresilience/bridge-config";
const config: any = {};
type LabelStudioTicket = {
id: string;
is_labeled: boolean;
annotations: Record<string, unknown>[];
data: Record<string, unknown>;
updated_at: string;
};
type LeafcutterTicket = {
id: string;
incident: string[];
technology: string[];
targeted_group: string[];
country: string[];
region: string[];
continent: string[];
date: Date;
origin: string;
origin_id: string;
source_created_at: string;
source_updated_at: string;
};
const getLabelStudioTickets = async (
page: number,
): Promise<LabelStudioTicket[]> => {
const {
leafcutter: { labelStudioApiUrl, labelStudioApiKey },
} = config;
const headers = {
Authorization: `Token ${labelStudioApiKey}`,
Accept: "application/json",
};
const ticketsQuery = new URLSearchParams({
page_size: "50",
page: `${page}`,
});
console.log({ url: `${labelStudioApiUrl}/projects/1/tasks?${ticketsQuery}` });
const res = await fetch(
`${labelStudioApiUrl}/projects/1/tasks?${ticketsQuery}`,
{ headers },
);
console.log({ res });
const tasksResult: any = await res.json();
console.log({ tasksResult });
return tasksResult;
};
const fetchFromLabelStudio = async (
minUpdatedTimestamp: Date,
): Promise<LabelStudioTicket[]> => {
const pages = [...Array.from({ length: 10000 }).keys()];
const allDocs: LabelStudioTicket[] = [];
for await (const page of pages) {
const docs = await getLabelStudioTickets(page + 1);
console.log({ page, docs });
if (docs && docs.length > 0) {
for (const doc of docs) {
const updatedAt = new Date(doc.updated_at);
console.log({ updatedAt, minUpdatedTimestamp });
if (updatedAt > minUpdatedTimestamp) {
console.log(`Adding doc`, { doc });
allDocs.push(doc);
}
}
} else {
break;
}
}
console.log({ allDocs });
return allDocs;
};
const sendToLeafcutter = async (tickets: LabelStudioTicket[]) => {
const {
leafcutter: {
contributorId,
opensearchApiUrl,
opensearchUsername,
opensearchPassword,
},
} = config;
console.log({ tickets });
const filteredTickets = tickets.filter((ticket) => ticket.is_labeled);
console.log({ filteredTickets });
const finalTickets: LeafcutterTicket[] = filteredTickets.map((ticket) => {
const {
id,
annotations,
data: { source_id, source_created_at, source_updated_at },
} = ticket;
const getTags = (tags: Record<string, any>[], name: string) =>
tags
.filter((tag) => tag.from_name === name)
.map((tag) => tag.value.choices)
.flat();
const allTags = annotations.map(({ result }) => result).flat();
const incident = getTags(allTags, "incidentType tag");
const technology = getTags(allTags, "platform tag");
const country = getTags(allTags, "country tag");
const targetedGroup = getTags(allTags, "targetedGroup tag");
return {
id,
incident,
technology,
targeted_group: targetedGroup,
country,
region: [],
continent: [],
date: new Date(source_created_at as string),
origin: contributorId,
origin_id: source_id as string,
source_created_at: source_created_at as string,
source_updated_at: source_updated_at as string,
};
});
console.log("Sending to Leafcutter");
console.log({ finalTickets });
const result = await fetch(opensearchApiUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Basic ${Buffer.from(`${opensearchUsername}:${opensearchPassword}`).toString("base64")}`,
},
body: JSON.stringify({ tickets: finalTickets }),
});
};
*/
const importLeafcutterTask = async (): Promise<void> => {
/*
withDb(async (db: AppDatabase) => {
const {
leafcutter: { contributorName },
} = config;
const settingName = `${contributorName}ImportLeafcutterTask`;
const res: any = await db.settings.findByName(settingName);
const startTimestamp = res?.value?.minUpdatedTimestamp
? new Date(res.value.minUpdatedTimestamp as string)
: new Date("2023-03-01");
const newLastTimestamp = new Date();
console.log({
contributorName,
settingName,
res,
startTimestamp,
newLastTimestamp,
});
const tickets = await fetchFromLabelStudio(startTimestamp);
console.log({ tickets });
await sendToLeafcutter(tickets);
await db.settings.upsert(settingName, {
minUpdatedTimestamp: newLastTimestamp,
});
});
*/
};
export default importLeafcutterTask;

View file

@ -1,4 +1,9 @@
import { db, getWorkerUtils } from "@link-stack/bridge-common";
import { createLogger } from "@link-stack/logger";
import * as signalApi from "@link-stack/signal-api";
const { Configuration, GroupsApi } = signalApi;
const logger = createLogger('bridge-worker-receive-signal-message');
interface ReceiveSignalMessageTaskOptions {
token: string;
@ -10,6 +15,7 @@ interface ReceiveSignalMessageTaskOptions {
attachment?: string;
filename?: string;
mimeType?: string;
isGroup?: boolean;
}
const receiveSignalMessageTask = async ({
@ -22,8 +28,17 @@ const receiveSignalMessageTask = async ({
attachment,
filename,
mimeType,
isGroup,
}: ReceiveSignalMessageTaskOptions): Promise<void> => {
console.log({ token, to, from });
logger.debug({
messageId,
from,
to,
isGroup,
hasMessage: !!message,
hasAttachment: !!attachment,
token,
}, 'Processing incoming message');
const worker = await getWorkerUtils();
const row = await db
.selectFrom("SignalBot")
@ -32,8 +47,170 @@ const receiveSignalMessageTask = async ({
.executeTakeFirstOrThrow();
const backendId = row.id;
let finalTo = to;
let createdInternalId: string | undefined;
// Check if auto-group creation is enabled and this is NOT already a group message
const enableAutoGroups = process.env.BRIDGE_SIGNAL_AUTO_GROUPS === "true";
logger.debug({
enableAutoGroups,
isGroup,
shouldCreateGroup: enableAutoGroups && !isGroup && from && to,
}, 'Auto-groups config');
// If this is already a group message and auto-groups is enabled,
// use group provided in 'to'
if (enableAutoGroups && isGroup && to) {
// Signal sends the internal ID (base64) in group messages
// We should NOT add "group." prefix - that's for sending messages, not receiving
logger.debug('Message is from existing group with internal ID');
finalTo = to;
} else if (enableAutoGroups && !isGroup && from && to) {
try {
const config = new Configuration({
basePath: process.env.BRIDGE_SIGNAL_URL,
});
const groupsClient = new GroupsApi(config);
// Always create a new group for direct messages to the helpdesk
// This ensures each conversation gets its own group/ticket
logger.info({ from }, 'Creating new group for user');
// Include timestamp to make each group unique
const timestamp = new Date()
.toISOString()
.replace(/[:.]/g, "-")
.substring(0, 19);
const groupName = `Support: ${from} (${timestamp})`;
// Create new group for this conversation
const createGroupResponse = await groupsClient.v1GroupsNumberPost({
number: row.phoneNumber,
data: {
name: groupName,
members: [from],
description: "Private support conversation",
},
});
logger.debug({ createGroupResponse }, 'Group creation response from Signal API');
if (createGroupResponse.id) {
// The createGroupResponse.id already contains the full group identifier (group.BASE64)
finalTo = createGroupResponse.id;
// Fetch the group details to get the actual internalId
// The base64 part of the ID is NOT the same as the internalId!
try {
logger.debug('Fetching group details to get internalId');
const groups = await groupsClient.v1GroupsNumberGet({
number: row.phoneNumber,
});
logger.debug({ groupsSample: groups.slice(0, 3) }, 'Groups for bot');
const createdGroup = groups.find((g) => g.id === finalTo);
if (createdGroup) {
logger.debug({ createdGroup }, 'Found created group details');
}
if (createdGroup && createdGroup.internalId) {
createdInternalId = createdGroup.internalId;
logger.debug({ createdInternalId }, 'Got actual internalId');
} else {
// Fallback: extract base64 part from ID
if (finalTo.startsWith("group.")) {
createdInternalId = finalTo.substring(6);
}
}
} catch (fetchError) {
logger.debug('Could not fetch group details, using ID base64 part');
// Fallback: extract base64 part from ID
if (finalTo.startsWith("group.")) {
createdInternalId = finalTo.substring(6);
}
}
logger.debug({
fullGroupId: finalTo,
internalId: createdInternalId,
}, 'Group created successfully');
logger.debug({
groupId: finalTo,
internalId: createdInternalId,
groupName,
forPhoneNumber: from,
botNumber: row.phoneNumber,
response: createGroupResponse,
}, 'Created new Signal group');
}
// Now handle notifications and message forwarding for both new and existing groups
if (finalTo && finalTo.startsWith("group.")) {
// Forward the user's initial message to the group using quote feature
try {
logger.debug('Forwarding initial message to group using quote feature');
const attributionMessage = `Message from ${from}:\n"${message}"\n\n---\nSupport team: Your request has been received. An agent will respond shortly.`;
await worker.addJob("signal/send-signal-message", {
token: row.token,
to: finalTo,
message: attributionMessage,
conversationId: null,
quoteMessage: message,
quoteAuthor: from,
quoteTimestamp: Date.parse(sentAt),
});
logger.debug({ finalTo }, 'Successfully forwarded initial message to group');
} catch (forwardError) {
logger.error({ error: forwardError }, 'Error forwarding message to group');
}
// Send a response to the original DM informing about the group
try {
logger.debug('Sending group notification to original DM');
const dmNotification = `Hello! A private support group has been created for your conversation.\n\nGroup name: ${groupName}\n\nPlease look for the new group in your Signal app to continue the conversation. Our support team will respond there shortly.\n\nThank you for contacting support!`;
await worker.addJob("signal/send-signal-message", {
token: row.token,
to: from,
message: dmNotification,
conversationId: null,
});
logger.debug('Successfully sent group notification to user DM');
} catch (dmError) {
logger.error({ error: dmError }, 'Error sending DM notification');
}
}
} catch (error: any) {
// Check if error is because group already exists
const errorMessage =
error?.response?.data?.error || error?.message || error;
const isAlreadyExists =
errorMessage?.toString().toLowerCase().includes("already") ||
errorMessage?.toString().toLowerCase().includes("exists");
if (isAlreadyExists) {
logger.debug({ from }, 'Group might already exist, continuing with original recipient');
} else {
logger.error({
error: errorMessage,
from,
to,
botNumber: row.phoneNumber,
}, 'Error creating Signal group');
}
}
}
const payload = {
to,
to: finalTo,
from,
message_id: messageId,
sent_at: sentAt,
@ -41,6 +218,7 @@ const receiveSignalMessageTask = async ({
attachment,
filename,
mime_type: mimeType,
is_group: finalTo.startsWith("group"),
};
await worker.addJob("common/notify-webhooks", { backendId, payload });

View file

@ -1,19 +1,51 @@
import { db } from "@link-stack/bridge-common";
import {
db,
getWorkerUtils,
getMaxAttachmentSize,
getMaxTotalAttachmentSize,
MAX_ATTACHMENTS,
buildSignalGroupName,
} from "@link-stack/bridge-common";
import { createLogger } from "@link-stack/logger";
import * as signalApi from "@link-stack/signal-api";
const { Configuration, MessagesApi } = signalApi;
const { Configuration, MessagesApi, GroupsApi } = signalApi;
const logger = createLogger("bridge-worker-send-signal-message");
interface SendSignalMessageTaskOptions {
token: string;
to: string;
message: any;
conversationId?: string; // Zammad ticket/conversation ID for callback
quoteMessage?: string; // Optional: message text to quote
quoteAuthor?: string; // Optional: author of quoted message (phone number)
quoteTimestamp?: number; // Optional: timestamp of quoted message in milliseconds
attachments?: Array<{
data: string; // base64
filename: string;
mime_type: string;
}>;
}
const sendSignalMessageTask = async ({
token,
to,
message,
conversationId,
quoteMessage,
quoteAuthor,
quoteTimestamp,
attachments,
}: SendSignalMessageTaskOptions): Promise<void> => {
console.log({ token, to });
logger.debug(
{
token,
to,
conversationId,
messageLength: message?.length,
},
"Processing outgoing message",
);
const bot = await db
.selectFrom("SignalBot")
.selectAll()
@ -25,18 +57,255 @@ const sendSignalMessageTask = async ({
basePath: process.env.BRIDGE_SIGNAL_URL,
});
const messagesClient = new MessagesApi(config);
const groupsClient = new GroupsApi(config);
const worker = await getWorkerUtils();
let finalTo = to;
let groupCreated = false;
try {
const response = await messagesClient.v2SendPost({
data: {
number,
recipients: [to],
message,
// Check if 'to' is a group ID (UUID format, group.base64 format, or base64) vs phone number
const isUUID = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i.test(
to,
);
const isGroupPrefix = to.startsWith("group.");
const isBase64 = /^[A-Za-z0-9+/]+=*$/.test(to) && to.length > 20; // Base64 internal_id
const isGroupId = isUUID || isGroupPrefix || isBase64;
const enableAutoGroups = process.env.BRIDGE_SIGNAL_AUTO_GROUPS === "true";
logger.debug(
{
to,
isGroupId,
enableAutoGroups,
shouldCreateGroup: enableAutoGroups && !isGroupId && to && conversationId,
},
"Recipient analysis",
);
// If sending to a phone number and auto-groups is enabled, create a group first
if (enableAutoGroups && !isGroupId && to && conversationId) {
try {
const groupName = buildSignalGroupName(conversationId);
const createGroupResponse = await groupsClient.v1GroupsNumberPost({
number: bot.phoneNumber,
data: {
name: groupName,
members: [to],
description: "Private support conversation",
},
});
if (createGroupResponse.id) {
// The createGroupResponse.id already contains the full group identifier (group.BASE64)
finalTo = createGroupResponse.id;
groupCreated = true;
// Fetch the group details to get the actual internalId
let internalId: string | undefined;
try {
const groups = await groupsClient.v1GroupsNumberGet({
number: bot.phoneNumber,
});
const createdGroup = groups.find((g) => g.id === finalTo);
if (createdGroup && createdGroup.internalId) {
internalId = createdGroup.internalId;
logger.debug({ internalId }, "Got actual internalId");
} else {
// Fallback: extract base64 part from ID
if (finalTo.startsWith("group.")) {
internalId = finalTo.substring(6);
}
}
} catch (fetchError) {
logger.debug("Could not fetch group details, using ID base64 part");
// Fallback: extract base64 part from ID
if (finalTo.startsWith("group.")) {
internalId = finalTo.substring(6);
}
}
logger.debug(
{
groupId: finalTo,
internalId,
groupName,
conversationId,
originalRecipient: to,
botNumber: bot.phoneNumber,
},
"Created new Signal group",
);
// Notify Zammad about the new group ID via webhook
// Set group_joined: false initially - will be updated when user accepts invitation
await worker.addJob("common/notify-webhooks", {
backendId: bot.id,
payload: {
event: "group_created",
conversation_id: conversationId,
original_recipient: to,
group_id: finalTo,
internal_group_id: internalId,
group_joined: false,
timestamp: new Date().toISOString(),
},
});
}
} catch (groupError) {
logger.error(
{
error: groupError instanceof Error ? groupError.message : groupError,
to,
conversationId,
},
"Error creating Signal group",
);
// Continue with original recipient if group creation fails
}
}
logger.debug(
{
fromNumber: number,
toRecipient: finalTo,
originalTo: to,
recipientChanged: to !== finalTo,
groupCreated,
isGroupRecipient: finalTo.startsWith("group."),
},
"Sending message via API",
);
// Build the message data with optional quote parameters
const messageData: signalApi.ApiSendMessageV2 = {
number,
recipients: [finalTo],
message,
};
logger.debug(
{
number,
recipients: [finalTo],
messageLength: message?.length,
hasQuoteParams: !!(quoteMessage && quoteAuthor && quoteTimestamp),
},
"Message data being sent",
);
// Add quote parameters if all are provided
if (quoteMessage && quoteAuthor && quoteTimestamp) {
messageData.quoteTimestamp = quoteTimestamp;
messageData.quoteAuthor = quoteAuthor;
messageData.quoteMessage = quoteMessage;
logger.debug(
{
quoteAuthor,
quoteMessageLength: quoteMessage?.length,
quoteTimestamp,
},
"Including quote in message",
);
}
// Add attachments if provided with size validation
if (attachments && attachments.length > 0) {
const MAX_ATTACHMENT_SIZE = getMaxAttachmentSize();
const MAX_TOTAL_SIZE = getMaxTotalAttachmentSize();
if (attachments.length > MAX_ATTACHMENTS) {
throw new Error(
`Too many attachments: ${attachments.length} (max ${MAX_ATTACHMENTS})`,
);
}
let totalSize = 0;
const validatedAttachments = [];
for (const attachment of attachments) {
// Calculate size from base64 string (rough estimate: length * 3/4)
const estimatedSize = (attachment.data.length * 3) / 4;
if (estimatedSize > MAX_ATTACHMENT_SIZE) {
logger.warn(
{
filename: attachment.filename,
size: estimatedSize,
maxSize: MAX_ATTACHMENT_SIZE,
},
"Attachment exceeds size limit, skipping",
);
continue;
}
totalSize += estimatedSize;
if (totalSize > MAX_TOTAL_SIZE) {
logger.warn(
{
totalSize,
maxTotalSize: MAX_TOTAL_SIZE,
},
"Total attachment size exceeds limit, skipping remaining",
);
break;
}
validatedAttachments.push(attachment.data);
}
if (validatedAttachments.length > 0) {
messageData.base64Attachments = validatedAttachments;
logger.debug(
{
attachmentCount: validatedAttachments.length,
attachmentNames: attachments
.slice(0, validatedAttachments.length)
.map((att) => att.filename),
totalSizeBytes: totalSize,
},
"Including attachments in message",
);
}
}
const response = await messagesClient.v2SendPost({
data: messageData,
});
console.log({ response });
} catch (error) {
console.error({ error });
logger.debug(
{
to: finalTo,
groupCreated,
response: response?.timestamp || "no timestamp",
},
"Message sent successfully",
);
} catch (error: any) {
// Try to get the actual error message from the response
if (error.response) {
try {
const errorBody = await error.response.text();
logger.error(
{
status: error.response.status,
statusText: error.response.statusText,
body: errorBody,
sentTo: finalTo,
messageDetails: {
fromNumber: number,
toRecipients: [finalTo],
hasQuote: !!quoteMessage,
},
},
"Signal API error",
);
} catch (e) {
logger.error("Could not parse error response");
}
}
logger.error({ error }, "Full error details");
throw error;
}
};

View file

@ -3,6 +3,9 @@ import { withDb, AppDatabase } from "../../lib/db.js";
import { twilioClientFor } from "../../lib/common.js";
import { CallInstance } from "twilio/lib/rest/api/v2010/account/call";
import workerUtils from "../../lib/utils.js";
import { createLogger } from "@link-stack/logger";
const logger = createLogger('bridge-worker-twilio-recording');
interface WebhookPayload {
startTime: string;
@ -20,7 +23,7 @@ const getTwilioRecording = async (url: string) => {
const { payload } = await Wreck.get(url);
return { recording: payload as Buffer };
} catch (error: any) {
console.error(error.output);
logger.error(error.output);
return { error: error.output };
}
};

View file

@ -23,8 +23,6 @@ const receiveWhatsappMessageTask = async ({
filename,
mimeType,
}: ReceiveWhatsappMessageTaskOptions): Promise<void> => {
console.log({ token, to, from });
const worker = await getWorkerUtils();
const row = await db
.selectFrom("WhatsappBot")

View file

@ -1,15 +1,24 @@
import { db } from "@link-stack/bridge-common";
import { createLogger } from "@link-stack/logger";
const logger = createLogger("bridge-worker-send-whatsapp-message");
interface SendWhatsappMessageTaskOptions {
token: string;
to: string;
message: any;
attachments?: Array<{
data: string;
filename: string;
mime_type: string;
}>;
}
const sendWhatsappMessageTask = async ({
message,
to,
token,
attachments,
}: SendWhatsappMessageTaskOptions): Promise<void> => {
const bot = await db
.selectFrom("WhatsappBot")
@ -18,16 +27,40 @@ const sendWhatsappMessageTask = async ({
.executeTakeFirstOrThrow();
const url = `${process.env.BRIDGE_WHATSAPP_URL}/api/bots/${bot.id}/send`;
const params = { message, phoneNumber: to };
const params: any = { message, phoneNumber: to };
if (attachments && attachments.length > 0) {
params.attachments = attachments;
logger.debug(
{
attachmentCount: attachments.length,
attachmentNames: attachments.map((att) => att.filename),
},
"Sending WhatsApp message with attachments",
);
}
try {
const result = await fetch(url, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(params),
});
console.log({ result });
if (!result.ok) {
const errorText = await result.text();
logger.error(
{
status: result.status,
errorText,
url,
},
"WhatsApp send failed",
);
throw new Error(`Failed to send message: ${result.status}`);
}
} catch (error) {
console.error({ error });
logger.error({ error });
throw new Error("Failed to send message");
}
};