Merge pull request 'Added pub/sub support.' (#2) from pb-pubsub into main
Reviewed-on: #2
This commit is contained in:
commit
bb3c97c5bb
@ -1,4 +1,11 @@
|
|||||||
// _opt/pbutils.js
|
// _opt/pbutils.js
|
||||||
|
// Polyfill global EventSource for PocketBase realtime in Node.js (using CommonJS require)
|
||||||
|
import { createRequire } from 'module';
|
||||||
|
const require = createRequire(import.meta.url);
|
||||||
|
const { EventSource } = require('eventsource');
|
||||||
|
if (typeof global.EventSource === 'undefined') {
|
||||||
|
global.EventSource = EventSource;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* PocketBase utilities module - extends PocketBase client with useful shortcuts
|
* PocketBase utilities module - extends PocketBase client with useful shortcuts
|
||||||
@ -15,20 +22,55 @@ export const init = async (client, config) => {
|
|||||||
logger.info('Initializing PocketBase utilities module');
|
logger.info('Initializing PocketBase utilities module');
|
||||||
|
|
||||||
// Attach utility methods to the pb object
|
// Attach utility methods to the pb object
|
||||||
extendPocketBase(pb, logger);
|
extendPocketBase(client, pb, logger);
|
||||||
|
|
||||||
// Add connection state handling
|
// Add connection state handling
|
||||||
setupConnectionHandling(pb, logger);
|
setupConnectionHandling(pb, logger);
|
||||||
|
|
||||||
|
// Subscribe to real-time message queue events and re-emit via client
|
||||||
|
try {
|
||||||
|
pb.collection('message_queue').subscribe('*', (e) => {
|
||||||
|
client.emit('message_queue_event', e.action, e.record);
|
||||||
|
logger.debug(`PubSub event: ${e.action} on message_queue: ${JSON.stringify(e.record)}`);
|
||||||
|
});
|
||||||
|
logger.info('Subscribed to PocketBase message_queue realtime events');
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`Failed to subscribe to message_queue realtime: ${error.message}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// end of init()
|
||||||
|
|
||||||
logger.info('PocketBase utilities module initialized');
|
logger.info('PocketBase utilities module initialized');
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a handler for incoming message_queue pub/sub events.
|
||||||
|
* Other modules can import and use this to react to external messages.
|
||||||
|
* @param {import('discord.js').Client} client - The Discord client
|
||||||
|
* @param {(action: string, record: object) => void} handler - Callback for each event
|
||||||
|
*/
|
||||||
|
export function onMessageQueueEvent(client, handler) {
|
||||||
|
client.on('message_queue_event', (action, record) => {
|
||||||
|
try {
|
||||||
|
handler(action, record);
|
||||||
|
} catch (err) {
|
||||||
|
client.logger.error(`Error in message_queue handler: ${err.message}`);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extends the PocketBase instance with utility methods
|
* Extends the PocketBase instance with utility methods
|
||||||
* @param {Object} pb - PocketBase instance
|
* @param {Object} pb - PocketBase instance
|
||||||
* @param {Object} logger - Winston logger
|
* @param {Object} logger - Winston logger
|
||||||
*/
|
*/
|
||||||
const extendPocketBase = (pb, logger) => {
|
/**
|
||||||
|
* Adds utility methods to the PocketBase client.
|
||||||
|
* @param {import('discord.js').Client} client - The Discord client
|
||||||
|
* @param {object} pb - The PocketBase instance
|
||||||
|
* @param {object} logger - Logger instance
|
||||||
|
*/
|
||||||
|
const extendPocketBase = (client, pb, logger) => {
|
||||||
// ===== COLLECTION OPERATIONS =====
|
// ===== COLLECTION OPERATIONS =====
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -101,6 +143,25 @@ const extendPocketBase = (pb, logger) => {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convenience: publish a message into the "message_queue" collection,
|
||||||
|
* with source/destination validation.
|
||||||
|
* @param {string} source - origin (client id or 'external')
|
||||||
|
* @param {string} destination - target client id
|
||||||
|
* @param {string} dataType - message type
|
||||||
|
* @param {object} data - JSON-serializable payload
|
||||||
|
* @returns {Promise<Object>} created record
|
||||||
|
*/
|
||||||
|
pb.publishMessage = async (source, destination, dataType, data) => {
|
||||||
|
// Valid sources: all configured clients + 'external'
|
||||||
|
const validSources = client.config.clients.map(c => c.id).concat('external');
|
||||||
|
if (!validSources.includes(source)) throw new Error(`Invalid message source: ${source}`);
|
||||||
|
// Valid destinations: all configured clients
|
||||||
|
const validDest = client.config.clients.map(c => c.id);
|
||||||
|
if (!validDest.includes(destination)) throw new Error(`Invalid message destination: ${destination}`);
|
||||||
|
return await pb.collection('message_queue').create({ source, destination, dataType, data: JSON.stringify(data) });
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Upsert - creates or updates a record based on whether it exists
|
* Upsert - creates or updates a record based on whether it exists
|
||||||
* @param {string} collection - Collection name
|
* @param {string} collection - Collection name
|
||||||
@ -291,6 +352,38 @@ const extendPocketBase = (pb, logger) => {
|
|||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
/**
|
||||||
|
* Delete a message in the "message_queue" collection by its record ID.
|
||||||
|
* @param {string} id - Record ID to delete.
|
||||||
|
* @returns {Promise<boolean>} True if deleted or not found, false on error.
|
||||||
|
*/
|
||||||
|
pb.deleteMessageQueue = async (id) => {
|
||||||
|
return await pb.deleteOne('message_queue', id);
|
||||||
|
};
|
||||||
|
|
||||||
|
// ===== PUB/SUB OPERATIONS =====
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publish a message into the "message_queue" collection.
|
||||||
|
* @param {string} source - Origin identifier for the message.
|
||||||
|
* @param {string} destination - Target identifier (e.g. channel or client ID).
|
||||||
|
* @param {string} dataType - A short string describing the type of data.
|
||||||
|
* @param {object} data - The payload object to deliver.
|
||||||
|
* @returns {Promise<object>} The created message_queue record.
|
||||||
|
*/
|
||||||
|
pb.publishMessage = async (source, destination, dataType, data) => {
|
||||||
|
try {
|
||||||
|
return await pb.collection('message_queue').create({
|
||||||
|
source,
|
||||||
|
destination,
|
||||||
|
dataType,
|
||||||
|
data: JSON.stringify(data)
|
||||||
|
});
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`Failed to publish message to message_queue: ${error.message}`);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// ===== CACHE MANAGEMENT =====
|
// ===== CACHE MANAGEMENT =====
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user