import { Server } from "@hapi/hapi";
import { Service } from "@hapipal/schmervice";
import { SavedWhatsappBot as Bot } from "db";
import makeWASocket, {
  DisconnectReason,
  proto,
  downloadContentFromMessage,
  MediaType,
} from "@adiwajshing/baileys";
import workerUtils from "../../worker-utils";
import { useDatabaseAuthState } from "../lib/whatsapp-key-store";
import { connect } from "pg-monitor";

/** Invoked once a registration attempt has completed; `error` is set on failure. */
export type AuthCompleteCallback = (error?: string) => void;

/**
 * Service that owns one WhatsApp socket per bot, keeps those sockets
 * connected, and forwards incoming messages to the worker queue as
 * "whatsapp-message" jobs.
 */
export default class WhatsappService extends Service {
  /** Live sockets keyed by bot id. */
  connections: { [key: string]: any } = {};

  loginConnections: { [key: string]: any } = {};

  static browserDescription: [string, string, string] = [
    "Metamigo",
    "Chrome",
    "2.0",
  ];

  /** WhatsApp Web version tuple passed to every socket this service opens. */
  private static readonly whatsappVersion: [number, number, number] = [
    2, 2204, 13,
  ];

  /** Delay before the first reconnection attempt. */
  private static readonly initialReconnectDelayMs = 5000;

  /** Upper bound for the exponential reconnection backoff. */
  private static readonly maxReconnectDelayMs = 5 * 60 * 1000;

  /** Set while the service is shutting down so closed sockets are not reopened. */
  private isTearingDown = false;

  constructor(server: Server, options: never) {
    super(server, options);
  }

  /** Opens a connection for every verified bot stored in the database. */
  async initialize(): Promise<void> {
    this.isTearingDown = false;
    await this.updateConnections();
  }

  /** Ends every open connection and prevents further reconnection attempts. */
  async teardown(): Promise<void> {
    this.isTearingDown = true;
    await this.resetConnections();
  }

  /** Resolves after `ms` milliseconds. */
  private async sleep(ms: number): Promise<void> {
    console.log(`pausing ${ms}`);
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Builds the attachment filename `<id>.<subtype>` from a MIME type.
   * MIME parameters are dropped, so "audio/ogg; codecs=opus" yields "<id>.ogg".
   * When no subtype can be derived, the bare id is returned.
   */
  private static mediaFilename(
    id: string | null | undefined,
    mimetype: string | null | undefined
  ): string {
    const [essence = ""] = (mimetype ?? "").split(";");
    const subtype = essence.split("/").pop()?.trim();
    return subtype ? `${id}.${subtype}` : `${id}`;
  }

  /**
   * Returns the live connection for `bot`.
   *
   * @throws Error when no connection is registered for the bot.
   */
  private requireConnection(bot: Bot): any {
    const connection = this.connections[bot.id];
    if (!connection) {
      throw new Error(`No active WhatsApp connection for bot ${bot.id}`);
    }
    return connection;
  }

  /** Ends every tracked connection (logging, not propagating, failures) and clears the registry. */
  private async resetConnections(): Promise<void> {
    for (const connection of Object.values(this.connections)) {
      try {
        connection.end(null);
      } catch (error) {
        console.log(error);
      }
    }
    this.connections = {};
  }

  /**
   * Opens a socket for `bot`, wires up its event handlers and registers it in
   * `connections`.
   *
   * @param bot bot whose stored auth state is used
   * @param server server handed to the database-backed auth state
   * @param options extra socket options, spread into the socket config
   * @param authCompleteCallback called after the socket has been re-created
   *   following a `restartRequired` disconnect (the new-login flow)
   * @param reconnectDelayMs delay applied before the next reconnection attempt
   *   should this socket close unexpectedly
   */
  private createConnection(
    bot: Bot,
    server: Server,
    options: any,
    authCompleteCallback?: AuthCompleteCallback,
    reconnectDelayMs: number = WhatsappService.initialReconnectDelayMs
  ): void {
    const { state, saveState } = useDatabaseAuthState(bot, server);
    const connection = makeWASocket({ ...options, auth: state });
    let nextReconnectDelayMs = reconnectDelayMs;

    connection.ev.on("connection.update", async (update) => {
      // The emitter does not observe the returned promise, so failures must be
      // handled here or they surface as unhandled rejections.
      try {
        console.log(`Connection updated ${JSON.stringify(update, null, 2)}`);
        const {
          connection: connectionState,
          lastDisconnect,
          qr,
          isNewLogin,
        } = update;

        if (qr) {
          console.log("got qr code");
          await this.server.db().whatsappBots.updateQR(bot, qr);
        } else if (isNewLogin) {
          console.log("got new login");
        } else if (connectionState === "open") {
          console.log("opened connection");
          // A successful connection restarts the backoff sequence.
          nextReconnectDelayMs = WhatsappService.initialReconnectDelayMs;
        } else if (connectionState === "close") {
          console.log("connection closed due to ", lastDisconnect?.error);
          if (this.isTearingDown) {
            // Sockets ended during teardown must stay closed.
            return;
          }

          const disconnectStatusCode = (lastDisconnect?.error as any)?.output
            ?.statusCode;
          if (disconnectStatusCode === DisconnectReason.restartRequired) {
            console.log("reconnecting after got new login");
            const updatedBot = await this.findById(bot.id);
            this.createConnection(updatedBot, server, options);
            // Only registration flows supply a callback.
            authCompleteCallback?.();
          } else if (disconnectStatusCode !== DisconnectReason.loggedOut) {
            console.log("reconnecting");
            await this.sleep(nextReconnectDelayMs);
            if (this.isTearingDown) {
              return;
            }
            // Hand the doubled delay to the replacement socket; a local counter
            // would be discarded together with this closed socket.
            this.createConnection(
              bot,
              server,
              options,
              undefined,
              Math.min(
                nextReconnectDelayMs * 2,
                WhatsappService.maxReconnectDelayMs
              )
            );
          }
        }
      } catch (error) {
        console.log(error);
      }
    });

    connection.ev.on("chats.set", (item) =>
      console.log(
        `recv ${item.chats.length} chats (is latest: ${item.isLatest})`
      )
    );
    connection.ev.on("messages.set", (item) =>
      console.log(
        `recv ${item.messages.length} messages (is latest: ${item.isLatest})`
      )
    );
    connection.ev.on("contacts.set", (item) =>
      console.log(`recv ${item.contacts.length} contacts`)
    );
    connection.ev.on("messages.upsert", async (m) => {
      try {
        console.log("messages upsert");
        const { messages } = m;
        if (messages) {
          await this.queueUnreadMessages(bot, messages);
        }
      } catch (error) {
        console.log(error);
      }
    });
    connection.ev.on("messages.update", (m) => console.log(m));
    connection.ev.on("message-receipt.update", (m) => console.log(m));
    connection.ev.on("presence.update", (m) => console.log(m));
    connection.ev.on("chats.update", (m) => console.log(m));
    connection.ev.on("contacts.upsert", (m) => console.log(m));
    connection.ev.on("creds.update", saveState);

    this.connections[bot.id] = connection;
  }

  /** Drops all current connections and opens one for each verified bot. */
  private async updateConnections(): Promise<void> {
    await this.resetConnections();

    const bots = await this.server.db().whatsappBots.findAll();
    for (const bot of bots) {
      if (bot.isVerified) {
        this.createConnection(bot, this.server, {
          browser: WhatsappService.browserDescription,
          printQRInTerminal: false,
          version: WhatsappService.whatsappVersion,
        });
      }
    }
  }

  /**
   * Queues a single incoming message as a "whatsapp-message" job, downloading
   * and base64-encoding its media attachment when it has one. Messages sent by
   * the bot itself, messages without a body and status broadcasts are skipped.
   */
  private async queueMessage(
    bot: Bot,
    webMessageInfo: proto.WebMessageInfo
  ): Promise<void> {
    const { key, message, messageTimestamp } = webMessageInfo;
    const { remoteJid } = key;
    if (key.fromMe || !message || remoteJid === "status@broadcast") {
      return;
    }

    const messageContent = Object.values(message)[0];

    let messageType: MediaType | undefined;
    let mediaContent: any;
    let attachment: string | undefined;
    let filename: string | undefined;
    let mimetype: string | undefined;

    if (message.audioMessage) {
      messageType = "audio";
      mediaContent = message.audioMessage;
      mimetype = message.audioMessage.mimetype ?? undefined;
      filename = WhatsappService.mediaFilename(key.id, mimetype);
    } else if (message.documentMessage) {
      messageType = "document";
      mediaContent = message.documentMessage;
      mimetype = message.documentMessage.mimetype ?? undefined;
      filename = message.documentMessage.fileName ?? undefined;
    } else if (message.imageMessage) {
      messageType = "image";
      mediaContent = message.imageMessage;
      mimetype = message.imageMessage.mimetype ?? undefined;
      filename = WhatsappService.mediaFilename(key.id, mimetype);
    } else if (message.videoMessage) {
      messageType = "video";
      mediaContent = message.videoMessage;
      mimetype = message.videoMessage.mimetype ?? undefined;
      filename = WhatsappService.mediaFilename(key.id, mimetype);
    }

    if (mediaContent && messageType) {
      // Download from the media sub-message itself: the first value of
      // `message` is not guaranteed to be the media payload.
      const stream = await downloadContentFromMessage(mediaContent, messageType);
      const chunks: Uint8Array[] = [];
      for await (const chunk of stream) {
        chunks.push(chunk);
      }
      attachment = Buffer.concat(chunks).toString("base64");
    }

    if (messageContent || attachment) {
      const receivedMessage = {
        waMessageId: key.id,
        waMessage: JSON.stringify(webMessageInfo),
        waTimestamp: new Date((messageTimestamp as number) * 1000),
        attachment,
        filename,
        mimetype,
        whatsappBotId: bot.id,
        botPhoneNumber: bot.phoneNumber,
      };

      // The message id doubles as the job key.
      await workerUtils.addJob("whatsapp-message", receivedMessage, {
        jobKey: key.id,
      });
    }
  }

  /** Queues each message in order, waiting for one to be queued before starting the next. */
  private async queueUnreadMessages(bot: Bot, messages: any[]): Promise<void> {
    for (const message of messages) {
      await this.queueMessage(bot, message);
    }
  }

  /**
   * Inserts a new bot row owned by the user registered under `email`.
   *
   * @throws Error when no user exists for `email`.
   */
  async create(
    phoneNumber: string,
    description: string,
    email: string
  ): Promise<Bot> {
    const db = this.server.db();
    const user = await db.users.findBy({ email });
    if (!user) {
      throw new Error(`No user found for email ${email}`);
    }

    const row = await db.whatsappBots.insert({
      phoneNumber,
      description,
      userId: user.id,
    });
    return row;
  }

  /** Returns every stored bot. */
  async findAll(): Promise<Bot[]> {
    return this.server.db().whatsappBots.findAll();
  }

  /** Looks a bot up by id. */
  async findById(id: string): Promise<Bot> {
    return this.server.db().whatsappBots.findById({ id });
  }

  /** Looks a bot up by token. */
  async findByToken(token: string): Promise<Bot> {
    return this.server.db().whatsappBots.findBy({ token });
  }

  /**
   * Opens a connection for `bot` in order to register it. `callback` is
   * invoked after the socket has been re-created following the
   * `restartRequired` disconnect of a new login.
   */
  async register(bot: Bot, callback: AuthCompleteCallback): Promise<void> {
    this.createConnection(
      bot,
      this.server,
      { version: WhatsappService.whatsappVersion },
      callback
    );
  }

  /**
   * Sends a text message to `phoneNumber`; every non-digit character is
   * stripped from the number before the recipient address is built.
   *
   * @throws Error when the bot has no active connection.
   */
  async send(bot: Bot, phoneNumber: string, message: string): Promise<void> {
    const connection = this.requireConnection(bot);
    const recipient = `${phoneNumber.replace(/\D+/g, "")}@s.whatsapp.net`;
    await connection.sendMessage(recipient, { text: message });
  }

  /**
   * Queues the messages the connection reports as received after
   * `lastReceivedDate`.
   *
   * NOTE(review): `messagesReceivedAfter` is not defined in this file; confirm
   * the socket returned by `makeWASocket` provides it.
   *
   * @throws Error when the bot has no active connection.
   */
  async receiveSince(bot: Bot, lastReceivedDate: Date): Promise<void> {
    const connection = this.requireConnection(bot);
    const messages = await connection.messagesReceivedAfter(
      lastReceivedDate,
      false
    );
    for (const message of messages) {
      await this.queueMessage(bot, message);
    }
  }

  /**
   * Returns the connection's unread messages. `lastReceivedDate` is currently
   * unused: filtering through `messagesReceivedAfter(lastReceivedDate, false)`
   * was disabled in favour of loading every unread message.
   *
   * NOTE(review): `loadAllUnreadMessages` is not defined in this file; confirm
   * the socket returned by `makeWASocket` provides it.
   *
   * @throws Error when the bot has no active connection.
   */
  async receive(
    bot: Bot,
    lastReceivedDate: Date
  ): Promise<proto.WebMessageInfo[]> {
    const connection = this.requireConnection(bot);
    const messages = await connection.loadAllUnreadMessages();
    return messages;
  }
}