link-stack/apps/metamigo-api/app/services/whatsapp.ts
2023-02-13 13:10:48 +00:00

247 lines
8.2 KiB
TypeScript

import { Server } from "@hapi/hapi";
import { Service } from "@hapipal/schmervice";
import { SavedWhatsappBot as Bot } from "db";
import makeWASocket, { DisconnectReason, proto, downloadContentFromMessage, MediaType } from "@adiwajshing/baileys";
import workerUtils from "../../worker-utils";
import { useDatabaseAuthState } from "../lib/whatsapp-key-store";
import { connect } from "pg-monitor";
export type AuthCompleteCallback = (error?: string) => void;
export default class WhatsappService extends Service {
connections: { [key: string]: any } = {};
loginConnections: { [key: string]: any } = {};
static browserDescription: [string, string, string] = [
"Metamigo",
"Chrome",
"2.0",
];
constructor(server: Server, options: never) {
super(server, options);
}
async initialize(): Promise<void> {
this.updateConnections();
}
async teardown(): Promise<void> {
this.resetConnections();
}
private async sleep(ms: number): Promise<void> {
console.log(`pausing ${ms}`)
return new Promise(resolve => setTimeout(resolve, ms));
}
private async resetConnections() {
for (const connection of Object.values(this.connections)) {
try {
connection.end(null)
} catch (error) {
console.log(error);
}
}
this.connections = {};
}
private createConnection(bot: Bot, server: Server, options: any, authCompleteCallback?: any) {
const { state, saveState } = useDatabaseAuthState(bot, server)
const connection = makeWASocket({ ...options, auth: state });
let pause = 5000;
connection.ev.on('connection.update', async (update) => {
console.log(`Connection updated ${JSON.stringify(update, null, 2)}`)
const { connection: connectionState, lastDisconnect, qr, isNewLogin } = update
if (qr) {
console.log('got qr code')
await this.server.db().whatsappBots.updateQR(bot, qr);
} else if (isNewLogin) {
console.log("got new login")
} else if (connectionState === 'open') {
console.log('opened connection')
} else if (connectionState === "close") {
console.log('connection closed due to ', lastDisconnect.error)
const disconnectStatusCode = (lastDisconnect?.error as any)?.output?.statusCode
if (disconnectStatusCode === DisconnectReason.restartRequired) {
console.log('reconnecting after got new login')
const updatedBot = await this.findById(bot.id);
this.createConnection(updatedBot, server, options)
authCompleteCallback()
} else if (disconnectStatusCode !== DisconnectReason.loggedOut) {
console.log('reconnecting')
await this.sleep(pause)
pause = pause * 2
this.createConnection(bot, server, options)
}
}
})
connection.ev.on('chats.set', item => console.log(`recv ${item.chats.length} chats (is latest: ${item.isLatest})`))
connection.ev.on('messages.set', item => console.log(`recv ${item.messages.length} messages (is latest: ${item.isLatest})`))
connection.ev.on('contacts.set', item => console.log(`recv ${item.contacts.length} contacts`))
connection.ev.on('messages.upsert', async m => {
console.log("messages upsert")
const { messages } = m;
if (messages) {
await this.queueUnreadMessages(bot, messages);
}
})
connection.ev.on('messages.update', m => console.log(m))
connection.ev.on('message-receipt.update', m => console.log(m))
connection.ev.on('presence.update', m => console.log(m))
connection.ev.on('chats.update', m => console.log(m))
connection.ev.on('contacts.upsert', m => console.log(m))
connection.ev.on('creds.update', saveState)
this.connections[bot.id] = connection;
}
private async updateConnections() {
this.resetConnections();
const bots = await this.server.db().whatsappBots.findAll();
for await (const bot of bots) {
if (bot.isVerified) {
this.createConnection(
bot,
this.server,
{
browser: WhatsappService.browserDescription,
printQRInTerminal: false,
version: [2, 2204, 13],
})
}
}
}
private async queueMessage(bot: Bot, webMessageInfo: proto.WebMessageInfo) {
const { key, message, messageTimestamp } = webMessageInfo;
const { remoteJid } = key;
if (!key.fromMe && message && remoteJid !== "status@broadcast") {
const isMediaMessage =
message.audioMessage ||
message.documentMessage ||
message.imageMessage ||
message.videoMessage;
let messageContent = Object.values(message)[0]
let messageType: MediaType;
let attachment: string;
let filename: string;
let mimetype: string;
if (isMediaMessage) {
if (message.audioMessage) {
messageType = "audio";
filename =
key.id + "." + message.audioMessage.mimetype.split("/").pop();
mimetype = message.audioMessage.mimetype;
} else if (message.documentMessage) {
messageType = "document";
filename = message.documentMessage.fileName;
mimetype = message.documentMessage.mimetype;
} else if (message.imageMessage) {
messageType = "image";
filename =
key.id + "." + message.imageMessage.mimetype.split("/").pop();
mimetype = message.imageMessage.mimetype;
} else if (message.videoMessage) {
messageType = "video"
filename =
key.id + "." + message.videoMessage.mimetype.split("/").pop();
mimetype = message.videoMessage.mimetype;
}
const stream = await downloadContentFromMessage(messageContent, messageType)
let buffer = Buffer.from([])
for await (const chunk of stream) {
buffer = Buffer.concat([buffer, chunk])
}
attachment = buffer.toString("base64");
}
if (messageContent || attachment) {
const receivedMessage = {
waMessageId: key.id,
waMessage: JSON.stringify(webMessageInfo),
waTimestamp: new Date((messageTimestamp as number) * 1000),
attachment,
filename,
mimetype,
whatsappBotId: bot.id,
botPhoneNumber: bot.phoneNumber,
};
workerUtils.addJob("whatsapp-message", receivedMessage, {
jobKey: key.id,
});
}
}
}
private async queueUnreadMessages(bot: Bot, messages: any[]) {
for await (const message of messages) {
await this.queueMessage(bot, message);
}
}
async create(
phoneNumber: string,
description: string,
email: string
): Promise<Bot> {
const db = this.server.db();
const user = await db.users.findBy({ email });
const row = await db.whatsappBots.insert({
phoneNumber,
description,
userId: user.id,
});
return row;
}
async findAll(): Promise<Bot[]> {
return this.server.db().whatsappBots.findAll();
}
async findById(id: string): Promise<Bot> {
return this.server.db().whatsappBots.findById({ id });
}
async findByToken(token: string): Promise<Bot> {
return this.server.db().whatsappBots.findBy({ token });
}
async register(bot: Bot, callback: AuthCompleteCallback): Promise<void> {
await this.createConnection(bot, this.server, { version: [2, 2204, 13] }, callback);
}
async send(bot: Bot, phoneNumber: string, message: string): Promise<void> {
const connection = this.connections[bot.id];
const recipient = `${phoneNumber.replace(/\D+/g, "")}@s.whatsapp.net`;
await connection.sendMessage(recipient, { text: message });
}
async receiveSince(bot: Bot, lastReceivedDate: Date): Promise<void> {
const connection = this.connections[bot.id];
const messages = await connection.messagesReceivedAfter(
lastReceivedDate,
false
);
for (const message of messages) {
this.queueMessage(bot, message);
}
}
async receive(bot: Bot, lastReceivedDate: Date): Promise<any> {
const connection = this.connections[bot.id];
// const messages = await connection.messagesReceivedAfter(
// lastReceivedDate,
// false
// );
const messages = await connection.loadAllUnreadMessages();
return messages;
}
}