From 832226c229998aafb94aad657b2a510f6d4dac04 Mon Sep 17 00:00:00 2001 From: jrmyr Date: Thu, 1 May 2025 16:57:03 +0000 Subject: [PATCH] Added pub/sub support. --- _opt/pbUtils.js | 113 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 103 insertions(+), 10 deletions(-) diff --git a/_opt/pbUtils.js b/_opt/pbUtils.js index 8968ae0..6f31106 100644 --- a/_opt/pbUtils.js +++ b/_opt/pbUtils.js @@ -1,4 +1,11 @@ // _opt/pbutils.js +// Polyfill global EventSource for PocketBase realtime in Node.js (using CommonJS require) +import { createRequire } from 'module'; +const require = createRequire(import.meta.url); +const { EventSource } = require('eventsource'); +if (typeof global.EventSource === 'undefined') { + global.EventSource = EventSource; +} /** * PocketBase utilities module - extends PocketBase client with useful shortcuts @@ -10,25 +17,60 @@ * @param {Object} config - Client configuration */ export const init = async (client, config) => { - const { pb, logger } = client; + const { pb, logger } = client; - logger.info('Initializing PocketBase utilities module'); + logger.info('Initializing PocketBase utilities module'); - // Attach utility methods to the pb object - extendPocketBase(pb, logger); + // Attach utility methods to the pb object + extendPocketBase(client, pb, logger); - // Add connection state handling - setupConnectionHandling(pb, logger); + // Add connection state handling + setupConnectionHandling(pb, logger); + + // Subscribe to real-time message queue events and re-emit via client + try { + pb.collection('message_queue').subscribe('*', (e) => { + client.emit('message_queue_event', e.action, e.record); + logger.debug(`PubSub event: ${e.action} on message_queue: ${JSON.stringify(e.record)}`); + }); + logger.info('Subscribed to PocketBase message_queue realtime events'); + } catch (error) { + logger.error(`Failed to subscribe to message_queue realtime: ${error.message}`); + } - logger.info('PocketBase utilities module initialized'); +// end of init() + + logger.info('PocketBase utilities module initialized'); }; +/** + * Register a handler for incoming message_queue pub/sub events. + * Other modules can import and use this to react to external messages. + * @param {import('discord.js').Client} client - The Discord client + * @param {(action: string, record: object) => void} handler - Callback for each event + */ +export function onMessageQueueEvent(client, handler) { + client.on('message_queue_event', (action, record) => { + try { + handler(action, record); + } catch (err) { + client.logger.error(`Error in message_queue handler: ${err.message}`); + } + }); +} + /** * Extends the PocketBase instance with utility methods * @param {Object} pb - PocketBase instance * @param {Object} logger - Winston logger */ -const extendPocketBase = (pb, logger) => { +/** + * Adds utility methods to the PocketBase client. + * @param {import('discord.js').Client} client - The Discord client + * @param {object} pb - The PocketBase instance + * @param {object} logger - Logger instance + */ +const extendPocketBase = (client, pb, logger) => { // ===== COLLECTION OPERATIONS ===== /** @@ -87,7 +129,7 @@ const extendPocketBase = (pb, logger) => { * @param {string} id - Record ID * @returns {Promise} Success status */ - pb.deleteOne = async (collection, id) => { + pb.deleteOne = async (collection, id) => { try { await pb.collection(collection).delete(id); return true; @@ -100,6 +142,25 @@ const extendPocketBase = (pb, logger) => { throw error; } }; + + /** + * Convenience: publish a message into the "message_queue" collection, + * with source/destination validation. + * @param {string} source - origin (client id or 'external') + * @param {string} destination - target client id + * @param {string} dataType - message type + * @param {object} data - JSON-serializable payload + * @returns {Promise} created record + */ + pb.publishMessage = async (source, destination, dataType, data) => { + // Valid sources: all configured clients + 'external' + const validSources = client.config.clients.map(c => c.id).concat('external'); + if (!validSources.includes(source)) throw new Error(`Invalid message source: ${source}`); + // Valid destinations: all configured clients + const validDest = client.config.clients.map(c => c.id); + if (!validDest.includes(destination)) throw new Error(`Invalid message destination: ${destination}`); + return await pb.collection('message_queue').create({ source, destination, dataType, data: JSON.stringify(data) }); + }; /** * Upsert - creates or updates a record based on whether it exists @@ -194,7 +255,7 @@ const extendPocketBase = (pb, logger) => { } }; - // ===== BATCH OPERATIONS ===== + // ===== BATCH OPERATIONS ===== /** * Perform batch create @@ -291,6 +352,38 @@ const extendPocketBase = (pb, logger) => { throw error; } }; + /** + * Delete a message in the "message_queue" collection by its record ID. + * @param {string} id - Record ID to delete. + * @returns {Promise} True if deleted or not found, false on error. + */ + pb.deleteMessageQueue = async (id) => { + return await pb.deleteOne('message_queue', id); + }; + + // ===== PUB/SUB OPERATIONS ===== + + /** + * Publish a message into the "message_queue" collection. + * @param {string} source - Origin identifier for the message. + * @param {string} destination - Target identifier (e.g. channel or client ID). + * @param {string} dataType - A short string describing the type of data. + * @param {object} data - The payload object to deliver. + * @returns {Promise} The created message_queue record. + */ + pb.publishMessage = async (source, destination, dataType, data) => { + try { + return await pb.collection('message_queue').create({ + source, + destination, + dataType, + data: JSON.stringify(data) + }); + } catch (error) { + logger.error(`Failed to publish message to message_queue: ${error.message}`); + throw error; + } + }; // ===== CACHE MANAGEMENT =====