link-stack/packages/node-signald/src/util.ts
2023-03-13 10:47:38 +00:00

277 lines
7.5 KiB
TypeScript

import EventEmitter from "eventemitter3";
import { v4 as uuid } from "uuid";
import * as backoff from "backoff";
import * as net from "net";
import { throwOnError } from "./error";
export interface EventTypes {
/**
* Emitted when the signald connection closes.
* @event
*/
transport_disconnected(): void;
/**
* Emitted when a new connection is established.
* @event
*/
transport_connected(): void;
/**
* Emitted when a transport level error occurs. This is errors around the socket and connection, errors from signald itself will not be emitted here.
* @event
*/
transport_error(error: any): void;
/**
* Every full JSON payload sent to signald will be emitted on this event (after sending).
* @event
*/
transport_sent_payload(payload: object): void;
/**
* Every full JSON payload received from signald will be emitted on this event (before processing)
* @event
*/
transport_received_payload(payload: object): void;
}
/**
* JSONTransport handles the low level connection and over-the-wire handling.
*
* You probably do not need to instantiate this class directly.
*/
export class JSONTransport extends EventEmitter<EventTypes> {
#socketfile: string;
#client;
#connected: boolean = false;
#buffer: string;
#callbacks: Record<string, Function>;
#logger: Logger = consoleLogger;
#backoff: backoff.Backoff;
constructor() {
super();
this.#buffer = "";
this.#callbacks = {};
}
/**
* Connect to signald via the unix domain socket, with a fibonacci backoff
* @param socketfile the path to the signald socket
* @param maxDelay maximum delay (in ms) of the backoff
* @param initialDelay how long (in ms) the initial delay before the first retry is
* @return a promise that resolves on a successful connection
*/
public connectWithBackoff(
socketfile: string,
maxDelay: number = 90000,
initialDelay: number = 10
) {
this.#backoff = backoff.fibonacci({
initialDelay: 10,
maxDelay: 90000,
});
let lastLogNotify = 0;
let backoffInProgress = true;
this.#backoff.on("backoff", (number, delay) => {});
this.#backoff.on("ready", async (number, delay) => {
const now = +new Date();
if (backoffInProgress && now - lastLogNotify > 10000) {
lastLogNotify = +new Date();
this.log(`reconnecting. attempt=${number} delay=${delay}`);
}
if (!this.isConnected()) {
this.connect(socketfile);
}
});
this.on("transport_disconnected", async () => {
if (!backoffInProgress) this.log("disconnected. attemping to reconnect.");
this.#backoff.backoff();
backoffInProgress = true;
});
this.on("transport_connected", async () => {
this.log("connected");
this.#backoff.reset();
backoffInProgress = false;
lastLogNotify = 0;
});
this.connect(socketfile);
}
/**
* Connect to signald via the unix domain socket.
* @param socketfile the path to the signald socket
* @return a promise that resolves on a successful connection
*/
public connect(socketfile: string) {
let connectedOnce = false;
this.#socketfile = socketfile;
this.#client = net.createConnection(socketfile);
this.#client.setEncoding("utf8");
this.#client.on("connect", () => {
this.#connected = true;
connectedOnce = true;
this.#buffer = "";
this.emit("transport_connected");
});
this.#client.on("close", () => {
this.#connected = false;
this.emit("transport_disconnected");
});
this.#client.on("data", (frame) => {
this.#buffer += frame;
if (!this.#buffer.endsWith("\n")) return;
const data = this.#buffer;
this.#buffer = "";
data.split("\n").forEach((line) => {
if (!line) return;
const payload = JSON.parse(line);
this.emit("transport_received_payload", payload);
this.receivePayload(payload);
});
});
this.#client.on("error", (error) => {
this.emit("transport_error", error);
});
}
/**
* Connect to signald asynchronously. Does not include backoff and auto-reconnect.
*/
public async connectAsync(socketfile: string): Promise<void> {
return new Promise((resolve, reject) => {
const onconnected = () => {
this.removeListener("transport_connected", onconnected);
this.removeListener("transport_error", onerror);
resolve();
};
const onerror = (error) => {
this.removeListener("transport_error", onerror);
this.removeListener("transport_connected", onconnected);
reject(error);
};
this.on("transport_connected", onconnected);
this.on("transport_error", onerror);
this.connect(socketfile);
});
}
/**
* Disconnect from the signald socket.
*/
public disconnect() {
this.#client.end();
}
/**
* @returns true if the socket is connected
*/
public isConnected(): boolean {
return this.#connected;
}
private receivePayload(payload: any) {
const { id, type, version = "v0" } = payload;
if (!type) {
this.debug("no type in payload.");
this.debug(
"found following keys: ",
JSON.stringify(Object.keys(payload))
);
} else {
this.emit(`${type}${version}` as keyof EventTypes, payload.data);
}
if (id) {
const callback = this.#callbacks[id];
if (!callback) {
this.error(
`Payload received for an id but no callbacks were registered. id=${id}`
);
} else callback(payload);
}
}
protected async sendRequest(
payload: any,
id: string = uuid(),
callback?
): Promise<void> {
return new Promise((resolve, reject) => {
if (!this.#connected) {
reject(new Error("send failed. not connected"));
}
if (callback) {
this.#callbacks[id] = callback;
}
payload["id"] = id;
const serialized = JSON.stringify(payload) + "\n";
this.#client.write(serialized, "utf8", () => {
this.emit("transport_sent_payload", payload);
resolve();
});
});
}
protected async getResponse(payload: any): Promise<any> {
return new Promise((resolve, reject) => {
const callback = (responsePayload: any) => {
try {
throwOnError(responsePayload);
} catch (e) {
reject(e);
}
resolve(responsePayload.data);
};
this.sendRequest(payload, uuid(), callback);
});
}
/**
* Sets the logger used to log events, debug, and error messages
*
* @param logger the logger implementation to use
*/
public setLogger(logger: Logger) {
this.#logger = logger;
}
_log(level: LogLevel, message?: any, ...extra: any[]) {
if (!this.#logger) return;
this.#logger(level, `[signald] ${message}`);
}
protected log(message?: any, ...extra: any[]) {
this._log("info", message, ...extra);
}
protected debug(message?: any, ...extra: any[]) {
this._log("debug", message, ...extra);
}
protected warn(message?: any, ...extra: any[]) {
this._log("warn", message, ...extra);
}
protected error(message?: any, ...extra: any[]) {
this._log("error", message, ...extra);
}
}
export type LogLevel = "debug" | "info" | "warn" | "error";
export type Logger = (level: LogLevel, message?: any, ...extra: any[]) => void;
export const nullLogger = (
level: LogLevel,
message?: any,
...extra: any[]
) => {};
export const consoleLogger = (
level: LogLevel,
message?: any,
...extra: any[]
) => console[level](message, ...extra);