import { Server } from "@hapi/hapi";
import { Service } from "@hapipal/schmervice";
import { SavedWhatsappBot as Bot } from "@digiresilience/metamigo-db";
import makeWASocket, {
  DisconnectReason,
  proto,
  downloadContentFromMessage,
  MediaType,
  AnyMessageContent,
  WAProto,
  UserFacingSocketConfig,
  MiscMessageGenerationOptions,
} from "@adiwajshing/baileys";
import workerUtils from "../../worker-utils";
import { useDatabaseAuthState } from "../lib/whatsapp-key-store";

/** Callback handed to `register`; invoked once the socket restarts after a new login. */
export type AuthCompleteCallback = (error?: string) => void;

/** The subset of the WhatsApp socket that this service stores per bot. */
export type Connection = {
  end: (error: Error | undefined) => void;
  sendMessage: (
    jid: string,
    content: AnyMessageContent,
    options?: MiscMessageGenerationOptions
  ) => Promise<proto.WebMessageInfo | undefined>;
};

/**
 * Derives a file extension from a mimetype.
 *
 * Parameters after `;` (e.g. `audio/ogg; codecs=opus`) are dropped so they do
 * not end up in the generated filename. Falls back to `bin` when the mimetype
 * is missing or has no usable subtype.
 */
function extensionForMimetype(mimetype: string | null | undefined): string {
  const [essence = ""] = (mimetype ?? "").split(";");
  const subtype = essence.split("/").pop()?.trim();
  return subtype ? subtype : "bin";
}

/**
 * Service that keeps one WhatsApp socket per verified bot, forwards incoming
 * messages to the `whatsapp-message` worker job, and exposes bot CRUD helpers.
 */
export default class WhatsappService extends Service {
  /** Live sockets keyed by bot id. */
  connections: { [key: string]: Connection } = {};

  static browserDescription: [string, string, string] = [
    "Metamigo",
    "Chrome",
    "2.0",
  ];

  /** WhatsApp Web version passed to every socket this service creates. */
  private static readonly waVersion: [number, number, number] = [2, 2204, 13];

  constructor(server: Server, options: never) {
    super(server, options);
  }

  /** Opens connections for all verified bots. */
  async initialize(): Promise<void> {
    await this.updateConnections();
  }

  /** Ends every open connection. */
  async teardown(): Promise<void> {
    await this.resetConnections();
  }

  /**
   * Resolves after `ms` milliseconds.
   *
   * The promise must be awaited/returned; otherwise the caller's back-off
   * delay is skipped entirely.
   */
  private async sleep(ms: number): Promise<void> {
    console.log(`pausing ${ms}`);
    await new Promise<void>((resolve) => {
      setTimeout(resolve, ms);
    });
  }

  /** Ends all tracked connections (logging, not throwing, on failure) and clears the map. */
  private async resetConnections(): Promise<void> {
    for (const connection of Object.values(this.connections)) {
      try {
        connection.end(undefined);
      } catch (error) {
        console.log(error);
      }
    }

    this.connections = {};
  }

  /**
   * Creates a socket for `bot`, wires its event handlers and stores it in
   * `this.connections` under the bot's id.
   *
   * On a `close` update the socket is recreated unless the status code is
   * `DisconnectReason.loggedOut`. For `restartRequired` the bot is reloaded
   * from the database first and `authCompleteCallback` (if any) is invoked.
   *
   * @param bot - bot whose stored auth state backs the socket
   * @param server - server handed to the auth-state store and to reconnects
   * @param options - socket options; `auth` is supplied from the database
   * @param authCompleteCallback - optional hook called after a restart following a new login
   */
  private createConnection(
    bot: Bot,
    server: Server,
    options: Omit<UserFacingSocketConfig, "auth">,
    authCompleteCallback?: () => void
  ): void {
    const { state, saveState } = useDatabaseAuthState(bot, server);
    const connection = makeWASocket({ ...options, auth: state });
    // Back-off is local to this socket: a replacement socket starts at 5000 again.
    let pause = 5000;

    connection.ev.on("connection.update", async (update) => {
      console.log(`Connection updated ${JSON.stringify(update, undefined, 2)}`);
      const {
        connection: connectionState,
        lastDisconnect,
        qr,
        isNewLogin,
      } = update;

      if (qr) {
        console.log("got qr code");
        await this.server.db().whatsappBots.updateQR(bot, qr);
      } else if (isNewLogin) {
        console.log("got new login");
      } else if (connectionState === "open") {
        console.log("opened connection");
      } else if (connectionState === "close") {
        // lastDisconnect is optional on the update, so guard the access.
        console.log("connection closed due to", lastDisconnect?.error);
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        const disconnectStatusCode = (lastDisconnect?.error as any)?.output
          ?.statusCode;

        if (disconnectStatusCode === DisconnectReason.restartRequired) {
          console.log("reconnecting after got new login");
          const updatedBot = await this.findById(bot.id);
          this.createConnection(updatedBot, server, options);
          // Connections opened by updateConnections have no callback.
          authCompleteCallback?.();
        } else if (disconnectStatusCode !== DisconnectReason.loggedOut) {
          console.log("reconnecting");
          await this.sleep(pause);
          pause *= 2;
          this.createConnection(bot, server, options);
        }
      }
    });

    connection.ev.process(async (events) => {
      if (events["messaging-history.set"]) {
        const { chats, contacts, messages, isLatest } =
          events["messaging-history.set"];
        console.log(
          `recv ${chats.length} chats, ${contacts.length} contacts, ${messages.length} msgs (is latest: ${isLatest})`
        );
      }
    });

    connection.ev.on("messages.upsert", async (m) => {
      console.log("messages upsert");
      const { messages } = m;
      if (messages) {
        await this.queueUnreadMessages(bot, messages);
      }
    });

    connection.ev.on("messages.update", (m) => console.log(m));
    connection.ev.on("message-receipt.update", (m) => console.log(m));
    connection.ev.on("presence.update", (m) => console.log(m));
    connection.ev.on("chats.update", (m) => console.log(m));
    connection.ev.on("contacts.upsert", (m) => console.log(m));
    connection.ev.on("creds.update", saveState);

    this.connections[bot.id] = connection;
  }

  /** Drops all current connections and reconnects every bot flagged `isVerified`. */
  private async updateConnections(): Promise<void> {
    await this.resetConnections();

    const bots = await this.server.db().whatsappBots.findAll();
    for (const bot of bots) {
      if (bot.isVerified) {
        this.createConnection(bot, this.server, {
          browser: WhatsappService.browserDescription,
          printQRInTerminal: false,
          version: WhatsappService.waVersion,
        });
      }
    }
  }

  /**
   * Enqueues a `whatsapp-message` job for one incoming message.
   *
   * Messages sent by the bot itself, messages without content, and
   * `status@broadcast` messages are ignored. For audio, document, image and
   * video messages the media is downloaded and attached as base64.
   */
  private async queueMessage(
    bot: Bot,
    webMessageInfo: proto.IWebMessageInfo
  ): Promise<void> {
    const { key, message, messageTimestamp } = webMessageInfo;
    const { remoteJid, fromMe, id: keyId } = key;

    if (fromMe || !message || remoteJid === "status@broadcast") {
      return;
    }

    const { audioMessage, documentMessage, imageMessage, videoMessage } =
      message;
    const messageContent = Object.values(message)[0];

    // Describe the media once so the type passed to the downloader is always set.
    let media:
      | {
          type: MediaType;
          filename: string | null | undefined;
          mimetype: string | null | undefined;
        }
      | undefined;

    if (audioMessage) {
      media = {
        type: "audio",
        filename: `${keyId}.${extensionForMimetype(audioMessage.mimetype)}`,
        mimetype: audioMessage.mimetype,
      };
    } else if (documentMessage) {
      media = {
        type: "document",
        filename: documentMessage.fileName,
        mimetype: documentMessage.mimetype,
      };
    } else if (imageMessage) {
      media = {
        type: "image",
        filename: `${keyId}.${extensionForMimetype(imageMessage.mimetype)}`,
        mimetype: imageMessage.mimetype,
      };
    } else if (videoMessage) {
      media = {
        type: "video",
        filename: `${keyId}.${extensionForMimetype(videoMessage.mimetype)}`,
        mimetype: videoMessage.mimetype,
      };
    }

    let attachment: string | undefined;
    if (media) {
      const stream = await downloadContentFromMessage(
        messageContent,
        media.type
      );
      // Collect chunks and concatenate once instead of re-copying per chunk.
      const chunks: Buffer[] = [];
      for await (const chunk of stream) {
        chunks.push(chunk);
      }

      attachment = Buffer.concat(chunks).toString("base64");
    }

    if (messageContent || attachment) {
      const receivedMessage = {
        waMessageId: keyId,
        waMessage: JSON.stringify(webMessageInfo),
        waTimestamp: new Date((messageTimestamp as number) * 1000),
        attachment,
        filename: media?.filename,
        mimetype: media?.mimetype,
        whatsappBotId: bot.id,
        botPhoneNumber: bot.phoneNumber,
      };

      // The message id doubles as the job key.
      await workerUtils.addJob("whatsapp-message", receivedMessage, {
        jobKey: keyId,
      });
    }
  }

  /** Queues each message in order, one at a time. */
  private async queueUnreadMessages(
    bot: Bot,
    messages: proto.IWebMessageInfo[]
  ): Promise<void> {
    for (const message of messages) {
      await this.queueMessage(bot, message);
    }
  }

  /**
   * Inserts a new bot row owned by the user with the given email.
   *
   * @returns the inserted row
   */
  async create(
    phoneNumber: string,
    description: string,
    email: string
  ): Promise<Bot> {
    const db = this.server.db();
    const user = await db.users.findBy({ email });
    const row = await db.whatsappBots.insert({
      phoneNumber,
      description,
      userId: user.id,
    });
    return row;
  }

  /** Returns all bots from the database. */
  async findAll(): Promise<Bot[]> {
    return this.server.db().whatsappBots.findAll();
  }

  /** Looks a bot up by id. */
  async findById(id: string): Promise<Bot> {
    return this.server.db().whatsappBots.findById({ id });
  }

  /** Looks a bot up by token. */
  async findByToken(token: string): Promise<Bot> {
    return this.server.db().whatsappBots.findBy({ token });
  }

  /**
   * Opens a connection for `bot` and passes `callback` on as the
   * auth-complete hook.
   */
  async register(bot: Bot, callback: AuthCompleteCallback): Promise<void> {
    this.createConnection(
      bot,
      this.server,
      { version: WhatsappService.waVersion },
      callback
    );
  }

  /**
   * Sends a text message from `bot` to `phoneNumber`.
   *
   * Non-digit characters are stripped from the number before the
   * `@s.whatsapp.net` suffix is appended.
   *
   * @throws Error when the bot has no tracked connection
   */
  async send(bot: Bot, phoneNumber: string, message: string): Promise<void> {
    const connection = this.connections[bot.id];
    if (!connection) {
      throw new Error(`No WhatsApp connection for bot ${bot.id}`);
    }

    const recipient = `${phoneNumber.replace(/\D+/g, "")}@s.whatsapp.net`;
    await connection.sendMessage(recipient, { text: message });
  }

  /**
   * Queues every message the connection reports after `lastReceivedDate`.
   *
   * NOTE(review): `messagesReceivedAfter` is not declared on the `Connection`
   * type in this file — presumably left over from an older socket API; confirm
   * the socket still provides it.
   */
  async receiveSince(bot: Bot, lastReceivedDate: Date): Promise<void> {
    const connection = this.connections[bot.id];
    const messages = await connection.messagesReceivedAfter(
      lastReceivedDate,
      false
    );
    for (const message of messages) {
      await this.queueMessage(bot, message);
    }
  }

  /**
   * Returns the connection's unread messages; `_lastReceivedDate` is unused.
   *
   * NOTE(review): `loadAllUnreadMessages` is not declared on the `Connection`
   * type in this file — confirm the socket still provides it.
   */
  async receive(
    bot: Bot,
    _lastReceivedDate: Date
  ): Promise<proto.IWebMessageInfo[]> {
    const connection = this.connections[bot.id];
    // const messages = await connection.messagesReceivedAfter(
    //   lastReceivedDate,
    //   false
    // );
    const messages = await connection.loadAllUnreadMessages();
    return messages;
  }
}