import { Server } from "@hapi/hapi"; import { Service } from "@hapipal/schmervice"; import { db, WhatsappBot } from "bridge-common"; import makeWASocket, { DisconnectReason, proto, downloadContentFromMessage, MediaType, fetchLatestBaileysVersion, isJidBroadcast, isJidStatusBroadcast, useMultiFileAuthState, } from "@whiskeysockets/baileys"; import fs from "fs"; export type AuthCompleteCallback = (error?: string) => void; export default class WhatsappService extends Service { connections: { [key: string]: any } = {}; loginConnections: { [key: string]: any } = {}; static browserDescription: [string, string, string] = [ "Bridge", "Chrome", "2.0", ]; constructor(server: Server, options: never) { super(server, options); } getAuthDirectory(bot: WhatsappBot): string { return `/baileys/${bot.id}`; } async initialize(): Promise { this.updateConnections(); } async teardown(): Promise { this.resetConnections(); } private async sleep(ms: number): Promise { console.log(`pausing ${ms}`); return new Promise((resolve) => setTimeout(resolve, ms)); } private async resetConnections() { for (const connection of Object.values(this.connections)) { try { connection.end(null); } catch (error) { console.log(error); } } this.connections = {}; } private async createConnection( bot: WhatsappBot, server: Server, options: any, authCompleteCallback?: any, ) { const directory = this.getAuthDirectory(bot); const { state, saveCreds } = await useMultiFileAuthState(directory); const msgRetryCounterMap: any = {}; const socket = makeWASocket({ ...options, auth: state, msgRetryCounterMap, shouldIgnoreJid: (jid) => isJidBroadcast(jid) || isJidStatusBroadcast(jid), }); let pause = 5000; socket.ev.process(async (events) => { if (events["connection.update"]) { const update = events["connection.update"]; const { connection: connectionState, lastDisconnect, qr, isNewLogin, } = update; if (qr) { console.log("got qr code"); await db .updateTable("WhatsappBot") .set({ qrCode: qr, verified: false, }) .where("id", "=", bot.id) .executeTakeFirst(); } else if (isNewLogin) { console.log("got new login"); await db .updateTable("WhatsappBot") .set({ verified: true, }) .where("id", "=", bot.id) .executeTakeFirst(); } else if (connectionState === "open") { console.log("opened connection"); } else if (connectionState === "close") { console.log("connection closed due to ", lastDisconnect.error); const disconnectStatusCode = (lastDisconnect?.error as any)?.output ?.statusCode; if (disconnectStatusCode === DisconnectReason.restartRequired) { console.log("reconnecting after got new login"); const updatedBot = await db .selectFrom("WhatsappBot") .selectAll() .where("id", "=", bot.id) .executeTakeFirstOrThrow(); await this.createConnection(updatedBot, server, options); authCompleteCallback?.(); } else if (disconnectStatusCode !== DisconnectReason.loggedOut) { console.log("reconnecting"); await this.sleep(pause); pause *= 2; this.createConnection(bot, server, options); } } } if (events["creds.update"]) { console.log("creds update"); await saveCreds(); } if (events["messages.upsert"]) { console.log("messages upsert"); const upsert = events["messages.upsert"]; const { messages } = upsert; if (messages) { await this.queueUnreadMessages(bot, messages); } } }); this.connections[bot.id] = { socket, msgRetryCounterMap }; } private async updateConnections() { this.resetConnections(); const bots = await db.selectFrom("WhatsappBot").selectAll().execute(); for await (const bot of bots) { if (bot.verified) { const { version, isLatest } = await fetchLatestBaileysVersion(); console.log(`using WA v${version.join(".")}, isLatest: ${isLatest}`); await this.createConnection(bot, this.server, { browser: WhatsappService.browserDescription, printQRInTerminal: false, version, }); } } } private async queueMessage( bot: WhatsappBot, webMessageInfo: proto.IWebMessageInfo, ) { const { key: { id, fromMe, remoteJid }, message, messageTimestamp, } = webMessageInfo; if (!fromMe && message && remoteJid !== "status@broadcast") { const { audioMessage, documentMessage, imageMessage, videoMessage } = message; const isMediaMessage = audioMessage || documentMessage || imageMessage || videoMessage; const messageContent = Object.values(message)[0]; let messageType: MediaType; let attachment: string; let filename: string; let mimetype: string; if (isMediaMessage) { if (audioMessage) { messageType = "audio"; filename = id + "." + audioMessage.mimetype.split("/").pop(); mimetype = audioMessage.mimetype; } else if (documentMessage) { messageType = "document"; filename = documentMessage.fileName; mimetype = documentMessage.mimetype; } else if (imageMessage) { messageType = "image"; filename = id + "." + imageMessage.mimetype.split("/").pop(); mimetype = imageMessage.mimetype; } else if (videoMessage) { messageType = "video"; filename = id + "." + videoMessage.mimetype.split("/").pop(); mimetype = videoMessage.mimetype; } const stream = await downloadContentFromMessage( messageContent, messageType, ); let buffer = Buffer.from([]); for await (const chunk of stream) { buffer = Buffer.concat([buffer, chunk]); } attachment = buffer.toString("base64"); } if (messageContent || attachment) { const receivedMessage = { waMessageId: id, waMessage: JSON.stringify(webMessageInfo), waTimestamp: new Date((messageTimestamp as number) * 1000), attachment, filename, mimetype, whatsappBotId: bot.id, botPhoneNumber: bot.phoneNumber, }; // switch to send to bridge-frontend // workerUtils.addJob("whatsapp-message", receivedMessage, { // jobKey: id, // }); } } } private async queueUnreadMessages( bot: WhatsappBot, messages: proto.IWebMessageInfo[], ) { for await (const message of messages) { await this.queueMessage(bot, message); } } async create( phoneNumber: string, description: string, email: string, ): Promise { const user = await db .selectFrom("User") .selectAll() .where("email", "=", email) .executeTakeFirstOrThrow(); const row = await db .insertInto("WhatsappBot") .values({ phoneNumber, description, userId: user.id, }) .returningAll() .executeTakeFirst(); return row; } async unverify(bot: WhatsappBot): Promise { const directory = this.getAuthDirectory(bot); fs.rmSync(directory, { recursive: true, force: true }); return db .updateTable("WhatsappBot") .set({ verified: false }) .where("id", "=", bot.id) .returningAll() .executeTakeFirst(); } async remove(bot: WhatsappBot): Promise { const directory = this.getAuthDirectory(bot); fs.rmSync(directory, { recursive: true, force: true }); const result = await db .deleteFrom("WhatsappBot") .where("id", "=", bot.id) .execute(); return result.length; } async findAll(): Promise { return db.selectFrom("WhatsappBot").selectAll().execute(); } async findById(id: string): Promise { return db .selectFrom("WhatsappBot") .selectAll() .where("id", "=", id) .executeTakeFirstOrThrow(); } async findByToken(token: string): Promise { return db .selectFrom("WhatsappBot") .selectAll() .where("token", "=", token) .executeTakeFirstOrThrow(); } async register( bot: WhatsappBot, callback: AuthCompleteCallback, ): Promise { const { version } = await fetchLatestBaileysVersion(); await this.createConnection(bot, this.server, { version }, callback); } async send( bot: WhatsappBot, phoneNumber: string, message: string, ): Promise { const connection = this.connections[bot.id]?.socket; const recipient = `${phoneNumber.replace(/\D+/g, "")}@s.whatsapp.net`; await connection.sendMessage(recipient, { text: message }); } async receiveSince(bot: WhatsappBot, lastReceivedDate: Date): Promise { const connection = this.connections[bot.id]?.socket; const messages = await connection.messagesReceivedAfter( lastReceivedDate, false, ); for (const message of messages) { this.queueMessage(bot, message); } } async receive( bot: WhatsappBot, _lastReceivedDate: Date, ): Promise { const connection = this.connections[bot.id]?.socket; const messages = await connection.loadAllUnreadMessages(); return messages; } }