import { Server } from "@hapi/hapi";
import { Service } from "@hapipal/schmervice";
import { SavedWhatsappBot as Bot } from "@digiresilience/metamigo-db";
import makeWASocket, {
  DisconnectReason,
  proto,
  downloadContentFromMessage,
  MediaType,
  AnyMessageContent,
  WAProto,
  MiscMessageGenerationOptions,
} from "@adiwajshing/baileys";
import workerUtils from "../../worker-utils";
import { useDatabaseAuthState } from "../lib/whatsapp-key-store";

/** Invoked once a registration attempt has finished; `error` is set on failure. */
export type AuthCompleteCallback = (error?: string) => void;

/** The subset of the WhatsApp socket API that this service relies on. */
export type Connection = {
  end: (error: Error | undefined) => void;
  sendMessage: (
    jid: string,
    content: AnyMessageContent,
    options?: MiscMessageGenerationOptions
  ) => Promise<unknown>;
};

/** Socket options accepted by `createConnection`; `auth` is always supplied by the service. */
type SocketOptions = Omit<Parameters<typeof makeWASocket>[0], "auth">;

/** First argument of `downloadContentFromMessage`, i.e. a downloadable media sub-message. */
type DownloadableMedia = Parameters<typeof downloadContentFromMessage>[0];

/**
 * Keeps one WhatsApp socket per bot, reconnects sockets that drop, and queues
 * incoming messages as `whatsapp-message` worker jobs.
 */
export default class WhatsappService extends Service {
  /** Open sockets keyed by bot id. */
  connections: { [key: string]: Connection } = {};

  static browserDescription: [string, string, string] = [
    "Metamigo",
    "Chrome",
    "2.0",
  ];

  /** Delay before the first reconnect attempt. */
  private static readonly initialReconnectDelayMs = 5000;

  /** Upper bound for the reconnect delay so doubling cannot grow without limit. */
  private static readonly maxReconnectDelayMs = 5 * 60 * 1000;

  constructor(server: Server, options: never) {
    super(server, options);
  }

  /** Opens a connection for every verified bot. */
  async initialize(): Promise<void> {
    // Awaited so that a failure surfaces here instead of as an unhandled rejection.
    await this.updateConnections();
  }

  /** Closes every open connection. */
  async teardown(): Promise<void> {
    await this.resetConnections();
  }

  /** Resolves after `ms` milliseconds. */
  private async sleep(ms: number): Promise<void> {
    console.log(`pausing ${ms}`);
    return new Promise<void>((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Builds the stored filename for a media message from its id and MIME type.
   * MIME parameters (e.g. `audio/ogg; codecs=opus`) are dropped so they do not
   * end up in the extension; without a MIME type the bare id is used.
   */
  private static mediaFilename(
    id: string | null | undefined,
    mimetype: string | null | undefined
  ): string {
    const subtype = mimetype?.split(";")[0].trim().split("/").pop();
    return subtype ? `${id}.${subtype}` : `${id}`;
  }

  /** Ends every tracked connection (best effort) and clears the registry. */
  private async resetConnections(): Promise<void> {
    for (const connection of Object.values(this.connections)) {
      try {
        connection.end(undefined);
      } catch (error) {
        // One socket failing to close must not prevent closing the others.
        console.log(error);
      }
    }

    this.connections = {};
  }

  /**
   * Creates a socket for `bot`, wires up its event handlers and registers it
   * in `connections` (replacing any previous socket for the same bot).
   *
   * @param bot - Bot whose stored auth state is used for the socket.
   * @param server - Server handed to the database-backed auth state.
   * @param options - Socket options; `auth` is filled in here.
   * @param authCompleteCallback - Called once, when the socket closes with
   *   `restartRequired`.
   * @param reconnectDelayMs - Delay before the next reconnect attempt; doubled
   *   on every consecutive failed attempt and reset once a socket opens.
   */
  private createConnection(
    bot: Bot,
    server: Server,
    options: SocketOptions,
    authCompleteCallback?: AuthCompleteCallback,
    reconnectDelayMs = WhatsappService.initialReconnectDelayMs
  ): void {
    const { state, saveState } = useDatabaseAuthState(bot, server);
    const connection = makeWASocket({ ...options, auth: state });
    let pause = reconnectDelayMs;

    connection.ev.on("connection.update", async (update) => {
      console.log(`Connection updated ${JSON.stringify(update, null, 2)}`);
      const {
        connection: connectionState,
        lastDisconnect,
        qr,
        isNewLogin,
      } = update;

      if (qr) {
        console.log("got qr code");
        await this.server.db().whatsappBots.updateQR(bot, qr);
      } else if (isNewLogin) {
        console.log("got new login");
      } else if (connectionState === "open") {
        console.log("opened connection");
        // A successful connection ends the current backoff sequence.
        pause = WhatsappService.initialReconnectDelayMs;
      } else if (connectionState === "close") {
        console.log("connection closed due to", lastDisconnect?.error);
        const disconnectStatusCode = (
          lastDisconnect?.error as
            | { output?: { statusCode?: number } }
            | undefined
        )?.output?.statusCode;

        if (disconnectStatusCode === DisconnectReason.restartRequired) {
          console.log("reconnecting after got new login");
          const updatedBot = await this.findById(bot.id);
          this.createConnection(updatedBot, server, options);
          // The callback is optional (startup connections have none).
          authCompleteCallback?.();
        } else if (disconnectStatusCode !== DisconnectReason.loggedOut) {
          console.log("reconnecting");
          await this.sleep(pause);
          // Hand both the pending callback and the grown delay to the
          // replacement socket; otherwise the registration callback is lost
          // and the backoff restarts from the initial delay every time.
          this.createConnection(
            bot,
            server,
            options,
            authCompleteCallback,
            Math.min(pause * 2, WhatsappService.maxReconnectDelayMs)
          );
        }
      }
    });

    connection.ev.process(async (events) => {
      if (events["messaging-history.set"]) {
        const { chats, contacts, messages, isLatest } =
          events["messaging-history.set"];
        console.log(
          `recv ${chats.length} chats, ${contacts.length} contacts, ${messages.length} msgs (is latest: ${isLatest})`
        );
      }
    });

    connection.ev.on("messages.upsert", async (m) => {
      console.log("messages upsert");
      const { messages } = m;
      if (messages) {
        await this.queueUnreadMessages(bot, messages);
      }
    });

    connection.ev.on("messages.update", (m) => console.log(m));
    connection.ev.on("message-receipt.update", (m) => console.log(m));
    connection.ev.on("presence.update", (m) => console.log(m));
    connection.ev.on("chats.update", (m) => console.log(m));
    connection.ev.on("contacts.upsert", (m) => console.log(m));
    connection.ev.on("creds.update", saveState);

    this.connections[bot.id] = connection;
  }

  /** Drops all current connections and reconnects every verified bot. */
  private async updateConnections(): Promise<void> {
    await this.resetConnections();

    const bots = await this.server.db().whatsappBots.findAll();
    for (const bot of bots) {
      if (bot.isVerified) {
        this.createConnection(bot, this.server, {
          browser: WhatsappService.browserDescription,
          printQRInTerminal: false,
          version: [2, 2204, 13],
        });
      }
    }
  }

  /**
   * Queues a single incoming message as a `whatsapp-message` job.
   *
   * Messages sent by the bot itself, messages without content and
   * `status@broadcast` messages are ignored. Audio, document, image and video
   * payloads are downloaded and attached as base64.
   */
  private async queueMessage(
    bot: Bot,
    webMessageInfo: proto.IWebMessageInfo
  ): Promise<void> {
    const { key, message, messageTimestamp } = webMessageInfo;
    const { remoteJid } = key;

    if (key.fromMe || !message || remoteJid === "status@broadcast") {
      return;
    }

    const messageContent = Object.values(message)[0];
    let messageType: MediaType | undefined;
    let mediaContent: DownloadableMedia | undefined;
    let attachment: string | undefined;
    let filename: string | null | undefined;
    let mimetype: string | null | undefined;

    // Select the media sub-message explicitly: the first value of `message`
    // is not guaranteed to be the media payload.
    if (message.audioMessage) {
      messageType = "audio";
      mediaContent = message.audioMessage;
      mimetype = message.audioMessage.mimetype;
      filename = WhatsappService.mediaFilename(key.id, mimetype);
    } else if (message.documentMessage) {
      messageType = "document";
      mediaContent = message.documentMessage;
      mimetype = message.documentMessage.mimetype;
      filename = message.documentMessage.fileName;
    } else if (message.imageMessage) {
      messageType = "image";
      mediaContent = message.imageMessage;
      mimetype = message.imageMessage.mimetype;
      filename = WhatsappService.mediaFilename(key.id, mimetype);
    } else if (message.videoMessage) {
      messageType = "video";
      mediaContent = message.videoMessage;
      mimetype = message.videoMessage.mimetype;
      filename = WhatsappService.mediaFilename(key.id, mimetype);
    }

    if (mediaContent && messageType) {
      const stream = await downloadContentFromMessage(
        mediaContent,
        messageType
      );
      // Collect the chunks and concatenate once instead of re-copying the
      // whole buffer for every chunk.
      const chunks: Buffer[] = [];
      for await (const chunk of stream) {
        chunks.push(chunk);
      }
      attachment = Buffer.concat(chunks).toString("base64");
    }

    if (messageContent || attachment) {
      const receivedMessage = {
        waMessageId: key.id,
        waMessage: JSON.stringify(webMessageInfo),
        waTimestamp: new Date((messageTimestamp as number) * 1000),
        attachment,
        filename,
        mimetype,
        whatsappBotId: bot.id,
        botPhoneNumber: bot.phoneNumber,
      };

      await workerUtils.addJob("whatsapp-message", receivedMessage, {
        jobKey: key.id,
      });
    }
  }

  /** Queues every message in `messages`, one after another. */
  private async queueUnreadMessages(
    bot: Bot,
    messages: proto.IWebMessageInfo[]
  ): Promise<void> {
    for (const message of messages) {
      await this.queueMessage(bot, message);
    }
  }

  /**
   * Inserts a new bot owned by the user with the given email.
   *
   * @returns The inserted bot row.
   */
  async create(
    phoneNumber: string,
    description: string,
    email: string
  ): Promise<Bot> {
    const db = this.server.db();
    const user = await db.users.findBy({ email });
    const row = await db.whatsappBots.insert({
      phoneNumber,
      description,
      userId: user.id,
    });
    return row;
  }

  /** Returns all stored bots. */
  async findAll(): Promise<Bot[]> {
    return this.server.db().whatsappBots.findAll();
  }

  /** Looks up a bot by id. */
  async findById(id: string): Promise<Bot> {
    return this.server.db().whatsappBots.findById({ id });
  }

  /** Looks up a bot by token. */
  async findByToken(token: string): Promise<Bot> {
    return this.server.db().whatsappBots.findBy({ token });
  }

  /**
   * Opens a connection for `bot` to register it; `callback` is invoked when
   * the socket closes with `restartRequired`.
   */
  async register(bot: Bot, callback: AuthCompleteCallback): Promise<void> {
    this.createConnection(
      bot,
      this.server,
      { version: [2, 2204, 13] },
      callback
    );
  }

  /** Sends a text message from `bot` to `phoneNumber` (non-digits are stripped). */
  async send(bot: Bot, phoneNumber: string, message: string): Promise<void> {
    const connection = this.connections[bot.id];
    const recipient = `${phoneNumber.replace(/\D+/g, "")}@s.whatsapp.net`;
    await connection.sendMessage(recipient, { text: message });
  }

  /** Queues every message the connection reports as received after `lastReceivedDate`. */
  async receiveSince(bot: Bot, lastReceivedDate: Date): Promise<void> {
    const connection = this.connections[bot.id];
    // NOTE(review): `messagesReceivedAfter` is not declared on the `Connection`
    // type in this file - confirm the socket still exposes it.
    const messages = await connection.messagesReceivedAfter(
      lastReceivedDate,
      false
    );
    for (const message of messages) {
      // Awaited so that a failure to queue rejects this call instead of
      // becoming an unhandled rejection.
      await this.queueMessage(bot, message);
    }
  }

  /** Returns all unread messages for `bot`; `lastReceivedDate` is currently unused. */
  async receive(
    bot: Bot,
    lastReceivedDate: Date
  ): Promise<proto.IWebMessageInfo[]> {
    const connection = this.connections[bot.id];

    // const messages = await connection.messagesReceivedAfter(
    //   lastReceivedDate,
    //   false
    // );

    // NOTE(review): `loadAllUnreadMessages` is not declared on the `Connection`
    // type in this file - confirm the socket still exposes it.
    const messages = await connection.loadAllUnreadMessages();
    return messages;
  }
}