Compare commits

..

2 Commits

Author SHA1 Message Date
bb3c97c5bb Merge pull request 'Added pub/sub support.' (#2) from pb-pubsub into main
Reviewed-on: #2
2025-05-01 16:59:06 +00:00
832226c229 Added pub/sub support. 2025-05-01 16:57:03 +00:00

View File

@ -1,4 +1,11 @@
// _opt/pbutils.js
// Polyfill global EventSource for PocketBase realtime in Node.js (using CommonJS require)
import { createRequire } from 'module';
const require = createRequire(import.meta.url);
const { EventSource } = require('eventsource');
if (typeof global.EventSource === 'undefined') {
global.EventSource = EventSource;
}
/**
* PocketBase utilities module - extends PocketBase client with useful shortcuts
@ -10,25 +17,60 @@
* @param {Object} config - Client configuration
*/
export const init = async (client, config) => {
const { pb, logger } = client;
const { pb, logger } = client;
logger.info('Initializing PocketBase utilities module');
logger.info('Initializing PocketBase utilities module');
// Attach utility methods to the pb object
extendPocketBase(pb, logger);
// Attach utility methods to the pb object
extendPocketBase(client, pb, logger);
// Add connection state handling
setupConnectionHandling(pb, logger);
// Add connection state handling
setupConnectionHandling(pb, logger);
// Subscribe to real-time message queue events and re-emit via client
try {
pb.collection('message_queue').subscribe('*', (e) => {
client.emit('message_queue_event', e.action, e.record);
logger.debug(`PubSub event: ${e.action} on message_queue: ${JSON.stringify(e.record)}`);
});
logger.info('Subscribed to PocketBase message_queue realtime events');
} catch (error) {
logger.error(`Failed to subscribe to message_queue realtime: ${error.message}`);
}
logger.info('PocketBase utilities module initialized');
// end of init()
logger.info('PocketBase utilities module initialized');
};
/**
* Register a handler for incoming message_queue pub/sub events.
* Other modules can import and use this to react to external messages.
* @param {import('discord.js').Client} client - The Discord client
* @param {(action: string, record: object) => void} handler - Callback for each event
*/
export function onMessageQueueEvent(client, handler) {
client.on('message_queue_event', (action, record) => {
try {
handler(action, record);
} catch (err) {
client.logger.error(`Error in message_queue handler: ${err.message}`);
}
});
}
/**
* Extends the PocketBase instance with utility methods
* @param {Object} pb - PocketBase instance
* @param {Object} logger - Winston logger
*/
const extendPocketBase = (pb, logger) => {
/**
* Adds utility methods to the PocketBase client.
* @param {import('discord.js').Client} client - The Discord client
* @param {object} pb - The PocketBase instance
* @param {object} logger - Logger instance
*/
const extendPocketBase = (client, pb, logger) => {
// ===== COLLECTION OPERATIONS =====
/**
@ -87,7 +129,7 @@ const extendPocketBase = (pb, logger) => {
* @param {string} id - Record ID
* @returns {Promise<boolean>} Success status
*/
pb.deleteOne = async (collection, id) => {
pb.deleteOne = async (collection, id) => {
try {
await pb.collection(collection).delete(id);
return true;
@ -100,6 +142,25 @@ const extendPocketBase = (pb, logger) => {
throw error;
}
};
/**
* Convenience: publish a message into the "message_queue" collection,
* with source/destination validation.
* @param {string} source - origin (client id or 'external')
* @param {string} destination - target client id
* @param {string} dataType - message type
* @param {object} data - JSON-serializable payload
* @returns {Promise<Object>} created record
*/
pb.publishMessage = async (source, destination, dataType, data) => {
// Valid sources: all configured clients + 'external'
const validSources = client.config.clients.map(c => c.id).concat('external');
if (!validSources.includes(source)) throw new Error(`Invalid message source: ${source}`);
// Valid destinations: all configured clients
const validDest = client.config.clients.map(c => c.id);
if (!validDest.includes(destination)) throw new Error(`Invalid message destination: ${destination}`);
return await pb.collection('message_queue').create({ source, destination, dataType, data: JSON.stringify(data) });
};
/**
* Upsert - creates or updates a record based on whether it exists
@ -194,7 +255,7 @@ const extendPocketBase = (pb, logger) => {
}
};
// ===== BATCH OPERATIONS =====
// ===== BATCH OPERATIONS =====
/**
* Perform batch create
@ -291,6 +352,38 @@ const extendPocketBase = (pb, logger) => {
throw error;
}
};
/**
* Delete a message in the "message_queue" collection by its record ID.
* @param {string} id - Record ID to delete.
* @returns {Promise<boolean>} True if deleted or not found, false on error.
*/
pb.deleteMessageQueue = async (id) => {
return await pb.deleteOne('message_queue', id);
};
// ===== PUB/SUB OPERATIONS =====
/**
* Publish a message into the "message_queue" collection.
* @param {string} source - Origin identifier for the message.
* @param {string} destination - Target identifier (e.g. channel or client ID).
* @param {string} dataType - A short string describing the type of data.
* @param {object} data - The payload object to deliver.
* @returns {Promise<object>} The created message_queue record.
*/
pb.publishMessage = async (source, destination, dataType, data) => {
try {
return await pb.collection('message_queue').create({
source,
destination,
dataType,
data: JSON.stringify(data)
});
} catch (error) {
logger.error(`Failed to publish message to message_queue: ${error.message}`);
throw error;
}
};
// ===== CACHE MANAGEMENT =====