Add bridge-whatsapp

This commit is contained in:
Darren Clarke 2024-05-07 14:16:01 +02:00
parent 0d09ad1b7e
commit 0499287555
21 changed files with 3485 additions and 47 deletions

View file

@ -6,11 +6,7 @@
"dev": "next dev",
"build": "next build",
"start": "next start",
"lint": "next lint",
"migrate:up:all": "tsx database/migrate.ts up:all",
"migrate:up:one": "tsx database/migrate.ts up:one",
"migrate:down:all": "tsx database/migrate.ts down:all",
"migrate:down:one": "tsx database/migrate.ts down:one"
"lint": "next lint"
},
"dependencies": {
"@auth/kysely-adapter": "^1.0.0",

View file

@ -0,0 +1,4 @@
{
"preset": "jest-config",
"setupFiles": ["<rootDir>/src/setup.test.ts"]
}

View file

@ -0,0 +1,33 @@
{
"name": "bridge-whatsapp",
"version": "0.3.0",
"type": "module",
"main": "build/main/main.js",
"author": "Darren Clarke <darren@redaranj.com>",
"license": "AGPL-3.0-or-later",
"dependencies": {
"@adiwajshing/keyed-db": "0.2.4",
"@hapi/hapi": "^21.3.9",
"@hapi/boom": "^10.0.1",
"@hapipal/schmervice": "^3.0.0",
"@hapipal/toys": "^4.0.0",
"@types/hapi-auth-bearer-token": "^6.1.8",
"@whiskeysockets/baileys": "^6.7.2",
"bridge-common": "*",
"hapi-auth-bearer-token": "^8.0.0",
"hapi-pino": "^12.1.0",
"kysely": "^0.27.3"
},
"devDependencies": {
"@types/node": "*",
"eslint-config": "*",
"jest-config": "*",
"ts-config": "*",
"tsx": "^4.9.3",
"typescript": "^5.4.5"
},
"scripts": {
"build": "tsc -p tsconfig.json",
"dev": "tsx src/index.ts"
}
}

View file

@ -0,0 +1,14 @@
import Toys from "@hapipal/toys";
export const withDefaults = Toys.withRouteDefaults({
options: {
cors: true,
auth: "bearer",
},
});
export const noAuth = Toys.withRouteDefaults({
options: {
cors: true,
},
});

View file

@ -0,0 +1,60 @@
import * as Hapi from "@hapi/hapi";
import * as AuthBearer from "hapi-auth-bearer-token";
import hapiPino from "hapi-pino";
import Schmervice from "@hapipal/schmervice";
import WhatsappService from "./service";
import {
GetAllWhatsappBotsRoute,
GetBotsRoute,
SendBotRoute,
ReceiveBotRoute,
RegisterBotRoute,
UnverifyBotRoute,
RefreshBotRoute,
CreateBotRoute,
} from "./routes";
const server = Hapi.server({ host: "localhost", port: 5000 });
const startServer = async () => {
await server.register({
plugin: hapiPino,
options: {
redact: ["req.headers.authorization"],
},
});
await server.register(AuthBearer);
server.auth.strategy("bearer", "bearer-access-token", {
validate: async (_request, token, _h) => {
const isValid = token === "1234";
const credentials = { token };
return { isValid, credentials };
},
});
server.route(GetAllWhatsappBotsRoute);
server.route(GetBotsRoute);
server.route(SendBotRoute);
server.route(ReceiveBotRoute);
server.route(RegisterBotRoute);
server.route(UnverifyBotRoute);
server.route(RefreshBotRoute);
server.route(CreateBotRoute);
await server.register(Schmervice);
server.registerService(WhatsappService);
await server.start();
return server;
};
const main = async () => {
await startServer();
};
main().catch((err) => {
console.error(err);
process.exit(1);
});

View file

@ -0,0 +1,217 @@
import * as Hapi from "@hapi/hapi";
import * as Helpers from "./helpers";
import Boom from "@hapi/boom";
export const GetAllWhatsappBotsRoute = Helpers.withDefaults({
method: "get",
path: "/api/whatsapp/bots",
options: {
description: "Get all bots",
async handler(request: Hapi.Request, _h: Hapi.ResponseToolkit) {
const whatsappService = request.services("whatsapp");
const bots = await whatsappService.findAll();
if (bots) {
// @ts-ignore
request.logger.info({ bots }, "Retrieved bot(s) at %s", new Date());
return { bots };
}
return _h.response().code(204);
},
},
});
export const GetBotsRoute = Helpers.noAuth({
method: "get",
path: "/api/whatsapp/bots/{token}",
options: {
description: "Get one bot",
async handler(request: Hapi.Request, _h: Hapi.ResponseToolkit) {
const { token } = request.params;
const whatsappService = request.services("whatsapp");
const bot = await whatsappService.findByToken(token);
if (bot) {
// @ts-ignore
request.logger.info({ bot }, "Retrieved bot(s) at %s", new Date());
return bot;
}
throw Boom.notFound("Bot not found");
},
},
});
interface MessageRequest {
phoneNumber: string;
message: string;
}
export const SendBotRoute = Helpers.noAuth({
method: "post",
path: "/api/whatsapp/bots/{token}/send",
options: {
description: "Send a message",
async handler(request: Hapi.Request, _h: Hapi.ResponseToolkit) {
const { token } = request.params;
const { phoneNumber, message } = request.payload as MessageRequest;
const whatsappService = request.services("whatsapp");
const bot = await whatsappService.findByToken(token);
if (bot) {
// @ts-ignore
request.logger.info({ bot }, "Sent a message at %s", new Date());
await whatsappService.send(bot, phoneNumber, message as string);
return _h
.response({
result: {
recipient: phoneNumber,
timestamp: new Date().toISOString(),
source: bot.phoneNumber,
},
})
.code(200); // temp
}
throw Boom.notFound("Bot not found");
},
},
});
export const ReceiveBotRoute = Helpers.withDefaults({
method: "get",
path: "/api/whatsapp/bots/{token}/receive",
options: {
description: "Receive messages",
async handler(request: Hapi.Request, _h: Hapi.ResponseToolkit) {
const { token } = request.params;
const whatsappService = request.services("whatsapp");
const bot = await whatsappService.findByToken(token);
if (bot) {
// @ts-ignore
request.logger.info({ bot }, "Received messages at %s", new Date());
// temp
const date = new Date();
const twoDaysAgo = new Date(date.getTime());
twoDaysAgo.setDate(date.getDate() - 2);
return whatsappService.receive(bot, twoDaysAgo);
}
throw Boom.notFound("Bot not found");
},
},
});
export const RegisterBotRoute = Helpers.withDefaults({
method: "get",
path: "/api/whatsapp/bots/{id}/register",
options: {
description: "Register a bot",
async handler(request: Hapi.Request, _h: Hapi.ResponseToolkit) {
const { id } = request.params;
const whatsappService = request.services("whatsapp");
const bot = await whatsappService.findById(id);
if (bot) {
await whatsappService.register(bot, (error: string) => {
if (error) {
return _h.response(error).code(500);
}
// @ts-ignore
request.logger.info({ bot }, "Register bot at %s", new Date());
return _h.response().code(200);
});
}
throw Boom.notFound("Bot not found");
},
},
});
export const UnverifyBotRoute = Helpers.withDefaults({
method: "post",
path: "/api/whatsapp/bots/{id}/unverify",
options: {
description: "Unverify bot",
async handler(request: Hapi.Request, _h: Hapi.ResponseToolkit) {
const { id } = request.params;
const whatsappService = request.services("whatsapp");
const bot = await whatsappService.findById(id);
if (bot) {
return whatsappService.unverify(bot);
}
throw Boom.notFound("Bot not found");
},
},
});
export const RefreshBotRoute = Helpers.withDefaults({
method: "get",
path: "/api/whatsapp/bots/{id}/refresh",
options: {
description: "Refresh messages",
async handler(request: Hapi.Request, _h: Hapi.ResponseToolkit) {
const { id } = request.params;
const whatsappService = request.services("whatsapp");
const bot = await whatsappService.findById(id);
if (bot) {
// @ts-ignore
request.logger.info({ bot }, "Refreshed messages at %s", new Date());
// await whatsappService.refresh(bot);
return;
}
throw Boom.notFound("Bot not found");
},
},
});
interface BotRequest {
phoneNumber: string;
description: string;
}
export const CreateBotRoute = Helpers.withDefaults({
method: "post",
path: "/api/whatsapp/bots",
options: {
description: "Register a bot",
async handler(request: Hapi.Request, _h: Hapi.ResponseToolkit) {
const { phoneNumber, description } = request.payload as BotRequest;
const whatsappService = request.services("whatsapp");
console.log("request.auth.credentials:", request.auth.credentials);
const bot = await whatsappService.create(
phoneNumber,
description,
request.auth.credentials.email as string,
);
if (bot) {
// @ts-ignore
request.logger.info({ bot }, "Register bot at %s", new Date());
return bot;
}
throw Boom.notFound("Bot not found");
},
},
});

View file

@ -0,0 +1,347 @@
import { Server } from "@hapi/hapi";
import { Service } from "@hapipal/schmervice";
import { db, WhatsappBot } from "bridge-common";
import makeWASocket, {
DisconnectReason,
proto,
downloadContentFromMessage,
MediaType,
fetchLatestBaileysVersion,
isJidBroadcast,
isJidStatusBroadcast,
useMultiFileAuthState,
} from "@whiskeysockets/baileys";
import fs from "fs";
export type AuthCompleteCallback = (error?: string) => void;
export default class WhatsappService extends Service {
connections: { [key: string]: any } = {};
loginConnections: { [key: string]: any } = {};
static browserDescription: [string, string, string] = [
"Bridge",
"Chrome",
"2.0",
];
constructor(server: Server, options: never) {
super(server, options);
}
getAuthDirectory(bot: WhatsappBot): string {
return `/baileys/${bot.id}`;
}
async initialize(): Promise<void> {
this.updateConnections();
}
async teardown(): Promise<void> {
this.resetConnections();
}
private async sleep(ms: number): Promise<void> {
console.log(`pausing ${ms}`);
return new Promise((resolve) => setTimeout(resolve, ms));
}
private async resetConnections() {
for (const connection of Object.values(this.connections)) {
try {
connection.end(null);
} catch (error) {
console.log(error);
}
}
this.connections = {};
}
private async createConnection(
bot: WhatsappBot,
server: Server,
options: any,
authCompleteCallback?: any,
) {
const directory = this.getAuthDirectory(bot);
const { state, saveCreds } = await useMultiFileAuthState(directory);
const msgRetryCounterMap: any = {};
const socket = makeWASocket({
...options,
auth: state,
msgRetryCounterMap,
shouldIgnoreJid: (jid) =>
isJidBroadcast(jid) || isJidStatusBroadcast(jid),
});
let pause = 5000;
socket.ev.process(async (events) => {
if (events["connection.update"]) {
const update = events["connection.update"];
const {
connection: connectionState,
lastDisconnect,
qr,
isNewLogin,
} = update;
if (qr) {
console.log("got qr code");
await db
.updateTable("WhatsappBot")
.set({
qrCode: qr,
verified: false,
})
.where("id", "=", bot.id)
.executeTakeFirst();
} else if (isNewLogin) {
console.log("got new login");
await db
.updateTable("WhatsappBot")
.set({
verified: true,
})
.where("id", "=", bot.id)
.executeTakeFirst();
} else if (connectionState === "open") {
console.log("opened connection");
} else if (connectionState === "close") {
console.log("connection closed due to ", lastDisconnect.error);
const disconnectStatusCode = (lastDisconnect?.error as any)?.output
?.statusCode;
if (disconnectStatusCode === DisconnectReason.restartRequired) {
console.log("reconnecting after got new login");
const updatedBot = await db
.selectFrom("WhatsappBot")
.selectAll()
.where("id", "=", bot.id)
.executeTakeFirstOrThrow();
await this.createConnection(updatedBot, server, options);
authCompleteCallback?.();
} else if (disconnectStatusCode !== DisconnectReason.loggedOut) {
console.log("reconnecting");
await this.sleep(pause);
pause *= 2;
this.createConnection(bot, server, options);
}
}
}
if (events["creds.update"]) {
console.log("creds update");
await saveCreds();
}
if (events["messages.upsert"]) {
console.log("messages upsert");
const upsert = events["messages.upsert"];
const { messages } = upsert;
if (messages) {
await this.queueUnreadMessages(bot, messages);
}
}
});
this.connections[bot.id] = { socket, msgRetryCounterMap };
}
private async updateConnections() {
this.resetConnections();
const bots = await db.selectFrom("WhatsappBot").selectAll().execute();
for await (const bot of bots) {
if (bot.verified) {
const { version, isLatest } = await fetchLatestBaileysVersion();
console.log(`using WA v${version.join(".")}, isLatest: ${isLatest}`);
await this.createConnection(bot, this.server, {
browser: WhatsappService.browserDescription,
printQRInTerminal: false,
version,
});
}
}
}
private async queueMessage(
bot: WhatsappBot,
webMessageInfo: proto.IWebMessageInfo,
) {
const {
key: { id, fromMe, remoteJid },
message,
messageTimestamp,
} = webMessageInfo;
if (!fromMe && message && remoteJid !== "status@broadcast") {
const { audioMessage, documentMessage, imageMessage, videoMessage } =
message;
const isMediaMessage =
audioMessage || documentMessage || imageMessage || videoMessage;
const messageContent = Object.values(message)[0];
let messageType: MediaType;
let attachment: string;
let filename: string;
let mimetype: string;
if (isMediaMessage) {
if (audioMessage) {
messageType = "audio";
filename = id + "." + audioMessage.mimetype.split("/").pop();
mimetype = audioMessage.mimetype;
} else if (documentMessage) {
messageType = "document";
filename = documentMessage.fileName;
mimetype = documentMessage.mimetype;
} else if (imageMessage) {
messageType = "image";
filename = id + "." + imageMessage.mimetype.split("/").pop();
mimetype = imageMessage.mimetype;
} else if (videoMessage) {
messageType = "video";
filename = id + "." + videoMessage.mimetype.split("/").pop();
mimetype = videoMessage.mimetype;
}
const stream = await downloadContentFromMessage(
messageContent,
messageType,
);
let buffer = Buffer.from([]);
for await (const chunk of stream) {
buffer = Buffer.concat([buffer, chunk]);
}
attachment = buffer.toString("base64");
}
if (messageContent || attachment) {
const receivedMessage = {
waMessageId: id,
waMessage: JSON.stringify(webMessageInfo),
waTimestamp: new Date((messageTimestamp as number) * 1000),
attachment,
filename,
mimetype,
whatsappBotId: bot.id,
botPhoneNumber: bot.phoneNumber,
};
// switch to send to bridge-frontend
// workerUtils.addJob("whatsapp-message", receivedMessage, {
// jobKey: id,
// });
}
}
}
private async queueUnreadMessages(
bot: WhatsappBot,
messages: proto.IWebMessageInfo[],
) {
for await (const message of messages) {
await this.queueMessage(bot, message);
}
}
async create(
phoneNumber: string,
description: string,
email: string,
): Promise<WhatsappBot> {
const user = await db
.selectFrom("User")
.selectAll()
.where("email", "=", email)
.executeTakeFirstOrThrow();
const row = await db
.insertInto("WhatsappBot")
.values({
phoneNumber,
description,
userId: user.id,
})
.returningAll()
.executeTakeFirst();
return row;
}
async unverify(bot: WhatsappBot): Promise<WhatsappBot> {
const directory = this.getAuthDirectory(bot);
fs.rmSync(directory, { recursive: true, force: true });
return db
.updateTable("WhatsappBot")
.set({ verified: false })
.where("id", "=", bot.id)
.returningAll()
.executeTakeFirst();
}
async remove(bot: WhatsappBot): Promise<number> {
const directory = this.getAuthDirectory(bot);
fs.rmSync(directory, { recursive: true, force: true });
const result = await db
.deleteFrom("WhatsappBot")
.where("id", "=", bot.id)
.execute();
return result.length;
}
async findAll(): Promise<WhatsappBot[]> {
return db.selectFrom("WhatsappBot").selectAll().execute();
}
async findById(id: string): Promise<WhatsappBot> {
return db
.selectFrom("WhatsappBot")
.selectAll()
.where("id", "=", id)
.executeTakeFirstOrThrow();
}
async findByToken(token: string): Promise<WhatsappBot> {
return db
.selectFrom("WhatsappBot")
.selectAll()
.where("token", "=", token)
.executeTakeFirstOrThrow();
}
async register(
bot: WhatsappBot,
callback: AuthCompleteCallback,
): Promise<void> {
const { version } = await fetchLatestBaileysVersion();
await this.createConnection(bot, this.server, { version }, callback);
}
async send(
bot: WhatsappBot,
phoneNumber: string,
message: string,
): Promise<void> {
const connection = this.connections[bot.id]?.socket;
const recipient = `${phoneNumber.replace(/\D+/g, "")}@s.whatsapp.net`;
await connection.sendMessage(recipient, { text: message });
}
async receiveSince(bot: WhatsappBot, lastReceivedDate: Date): Promise<void> {
const connection = this.connections[bot.id]?.socket;
const messages = await connection.messagesReceivedAfter(
lastReceivedDate,
false,
);
for (const message of messages) {
this.queueMessage(bot, message);
}
}
async receive(
bot: WhatsappBot,
_lastReceivedDate: Date,
): Promise<proto.IWebMessageInfo[]> {
const connection = this.connections[bot.id]?.socket;
const messages = await connection.loadAllUnreadMessages();
return messages;
}
}

View file

@ -0,0 +1,8 @@
import type WhatsappService from "./service";
declare module "@hapipal/schmervice" {
interface SchmerviceDecorator {
(namespace: "whatsapp"): WhatsappService;
}
type ServiceFunctionalInterface = { name: string };
}

View file

@ -0,0 +1,13 @@
{
"extends": "ts-config",
"compilerOptions": {
"outDir": "build/main",
"rootDir": "src",
"skipLibCheck": true,
"types": ["jest", "node", "long"],
"lib": ["es2020", "DOM"],
"composite": true
},
"include": ["src/**/*.ts", "src/**/.*.ts"],
"exclude": ["node_modules/**"]
}

2755
package-lock.json generated

File diff suppressed because it is too large Load diff

View file

@ -42,7 +42,7 @@ export const migrate = async (arg: string) => {
}),
});
let error = null;
let error: any = null;
let results: MigrationResult[] = [];
if (arg === "up:all") {

View file

@ -11,10 +11,8 @@ export async function up(db: Kysely<any>): Promise<void> {
.addColumn("user_id", "uuid")
.addColumn("name", "text")
.addColumn("description", "text")
.addColumn("auth_info", "text")
.addColumn("is_verified", "boolean", (col) =>
col.notNull().defaultTo(false),
)
.addColumn("qr_code", "text")
.addColumn("verified", "boolean", (col) => col.notNull().defaultTo(false))
.addColumn("created_at", "timestamptz", (col) =>
col.notNull().defaultTo(sql`now()`),
)

View file

@ -15,9 +15,7 @@ export async function up(db: Kysely<any>): Promise<void> {
.addColumn("page_id", "text")
.addColumn("app_id", "text")
.addColumn("user_id", "uuid")
.addColumn("is_verified", "boolean", (col) =>
col.notNull().defaultTo(false),
)
.addColumn("verified", "boolean", (col) => col.notNull().defaultTo(false))
.addColumn("created_at", "timestamptz", (col) =>
col.notNull().defaultTo(sql`now()`),
)

View file

@ -1,2 +1,11 @@
export { db, type Database } from "./lib/database";
export { db } from "./lib/database";
export type {
Database,
FacebookBot,
SignalBot,
WhatsappBot,
VoiceLine,
Webhook,
User,
} from "./lib/database";
export { getWorkerUtils } from "./lib/utils";

View file

@ -4,8 +4,6 @@ import type {
Generated,
ColumnType,
Selectable,
Insertable,
Updateable,
} from "kysely";
import pg from "pg";
import { KyselyAuth } from "@auth/kysely-adapter";
@ -72,6 +70,10 @@ export interface Database {
name: string;
description: string;
phoneNumber: string;
token: string;
qrCode: string;
verified: boolean;
userId: string;
createdBy: string;
createdAt: Date;
updatedAt: Date;
@ -88,7 +90,7 @@ export interface Database {
pageId: string | null;
appId: string | null;
userId: string | null;
isVerified: Generated<boolean>;
verified: Generated<boolean>;
createdAt: GeneratedAlways<Timestamp>;
updatedAt: GeneratedAlways<Timestamp>;
};

View file

@ -1,12 +1,16 @@
{
"name": "bridge-common",
"version": "1.0.0",
"main": "index.js",
"main": "build/main/index.js",
"type": "module",
"author": "Darren Clarke <darren@redaranj.com>",
"license": "AGPL-3.0-or-later",
"scripts": {
"build": "tsc -p tsconfig.json"
"build": "tsc -p tsconfig.json",
"migrate:up:all": "tsx database/migrate.ts up:all",
"migrate:up:one": "tsx database/migrate.ts up:one",
"migrate:down:all": "tsx database/migrate.ts down:all",
"migrate:down:one": "tsx database/migrate.ts down:one"
},
"dependencies": {
"@auth/kysely-adapter": "^1.0.0",
@ -20,6 +24,7 @@
"eslint": "^9.0.0",
"prettier": "^3.2.5",
"ts-config": "*",
"tsx": "^4.9.3",
"typescript": "^5.4.5"
}
}

View file

@ -1,29 +1,12 @@
{
"extends": "ts-config",
"compilerOptions": {
"target": "esnext",
"lib": ["esnext"],
"allowJs": true,
"skipLibCheck": true,
"strict": true,
"forceConsistentCasingInFileNames": true,
"noEmit": true,
"esModuleInterop": true,
"outDir": "build/main",
"module": "esnext",
"moduleResolution": "node",
"resolveJsonModule": true,
"isolatedModules": true,
"jsx": "preserve",
"incremental": true,
"baseUrl": ".",
"paths": {
"@/*": ["./*", "../../node_modules/*"]
"esModuleInterop": true,
"skipLibCheck": true
},
"plugins": [
{
"name": "next"
}
]
},
"include": ["**.d.ts", "**/*.ts", "**/*.tsx", "**/*.png, **/*.svg"],
"exclude": ["node_modules", "babel__core"]
"include": ["**/*.ts", "**/.*.ts"],
"exclude": ["node_modules", "build", "database"]
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long