Compare commits
2 Commits
532d08a3eb
...
bb3c97c5bb
| Author | SHA1 | Date | |
|---|---|---|---|
| bb3c97c5bb | |||
| 832226c229 |
113
_opt/pbUtils.js
113
_opt/pbUtils.js
@ -1,4 +1,11 @@
|
||||
// _opt/pbutils.js
|
||||
// Polyfill global EventSource for PocketBase realtime in Node.js (using CommonJS require)
|
||||
import { createRequire } from 'module';
|
||||
const require = createRequire(import.meta.url);
|
||||
const { EventSource } = require('eventsource');
|
||||
if (typeof global.EventSource === 'undefined') {
|
||||
global.EventSource = EventSource;
|
||||
}
|
||||
|
||||
/**
|
||||
* PocketBase utilities module - extends PocketBase client with useful shortcuts
|
||||
@ -10,25 +17,60 @@
|
||||
* @param {Object} config - Client configuration
|
||||
*/
|
||||
export const init = async (client, config) => {
|
||||
const { pb, logger } = client;
|
||||
const { pb, logger } = client;
|
||||
|
||||
logger.info('Initializing PocketBase utilities module');
|
||||
logger.info('Initializing PocketBase utilities module');
|
||||
|
||||
// Attach utility methods to the pb object
|
||||
extendPocketBase(pb, logger);
|
||||
// Attach utility methods to the pb object
|
||||
extendPocketBase(client, pb, logger);
|
||||
|
||||
// Add connection state handling
|
||||
setupConnectionHandling(pb, logger);
|
||||
// Add connection state handling
|
||||
setupConnectionHandling(pb, logger);
|
||||
|
||||
logger.info('PocketBase utilities module initialized');
|
||||
// Subscribe to real-time message queue events and re-emit via client
|
||||
try {
|
||||
pb.collection('message_queue').subscribe('*', (e) => {
|
||||
client.emit('message_queue_event', e.action, e.record);
|
||||
logger.debug(`PubSub event: ${e.action} on message_queue: ${JSON.stringify(e.record)}`);
|
||||
});
|
||||
logger.info('Subscribed to PocketBase message_queue realtime events');
|
||||
} catch (error) {
|
||||
logger.error(`Failed to subscribe to message_queue realtime: ${error.message}`);
|
||||
}
|
||||
|
||||
// end of init()
|
||||
|
||||
logger.info('PocketBase utilities module initialized');
|
||||
};
|
||||
|
||||
/**
|
||||
* Register a handler for incoming message_queue pub/sub events.
|
||||
* Other modules can import and use this to react to external messages.
|
||||
* @param {import('discord.js').Client} client - The Discord client
|
||||
* @param {(action: string, record: object) => void} handler - Callback for each event
|
||||
*/
|
||||
export function onMessageQueueEvent(client, handler) {
|
||||
client.on('message_queue_event', (action, record) => {
|
||||
try {
|
||||
handler(action, record);
|
||||
} catch (err) {
|
||||
client.logger.error(`Error in message_queue handler: ${err.message}`);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Extends the PocketBase instance with utility methods
|
||||
* @param {Object} pb - PocketBase instance
|
||||
* @param {Object} logger - Winston logger
|
||||
*/
|
||||
const extendPocketBase = (pb, logger) => {
|
||||
/**
|
||||
* Adds utility methods to the PocketBase client.
|
||||
* @param {import('discord.js').Client} client - The Discord client
|
||||
* @param {object} pb - The PocketBase instance
|
||||
* @param {object} logger - Logger instance
|
||||
*/
|
||||
const extendPocketBase = (client, pb, logger) => {
|
||||
// ===== COLLECTION OPERATIONS =====
|
||||
|
||||
/**
|
||||
@ -87,7 +129,7 @@ const extendPocketBase = (pb, logger) => {
|
||||
* @param {string} id - Record ID
|
||||
* @returns {Promise<boolean>} Success status
|
||||
*/
|
||||
pb.deleteOne = async (collection, id) => {
|
||||
pb.deleteOne = async (collection, id) => {
|
||||
try {
|
||||
await pb.collection(collection).delete(id);
|
||||
return true;
|
||||
@ -101,6 +143,25 @@ const extendPocketBase = (pb, logger) => {
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Convenience: publish a message into the "message_queue" collection,
|
||||
* with source/destination validation.
|
||||
* @param {string} source - origin (client id or 'external')
|
||||
* @param {string} destination - target client id
|
||||
* @param {string} dataType - message type
|
||||
* @param {object} data - JSON-serializable payload
|
||||
* @returns {Promise<Object>} created record
|
||||
*/
|
||||
pb.publishMessage = async (source, destination, dataType, data) => {
|
||||
// Valid sources: all configured clients + 'external'
|
||||
const validSources = client.config.clients.map(c => c.id).concat('external');
|
||||
if (!validSources.includes(source)) throw new Error(`Invalid message source: ${source}`);
|
||||
// Valid destinations: all configured clients
|
||||
const validDest = client.config.clients.map(c => c.id);
|
||||
if (!validDest.includes(destination)) throw new Error(`Invalid message destination: ${destination}`);
|
||||
return await pb.collection('message_queue').create({ source, destination, dataType, data: JSON.stringify(data) });
|
||||
};
|
||||
|
||||
/**
|
||||
* Upsert - creates or updates a record based on whether it exists
|
||||
* @param {string} collection - Collection name
|
||||
@ -194,7 +255,7 @@ const extendPocketBase = (pb, logger) => {
|
||||
}
|
||||
};
|
||||
|
||||
// ===== BATCH OPERATIONS =====
|
||||
// ===== BATCH OPERATIONS =====
|
||||
|
||||
/**
|
||||
* Perform batch create
|
||||
@ -291,6 +352,38 @@ const extendPocketBase = (pb, logger) => {
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
/**
|
||||
* Delete a message in the "message_queue" collection by its record ID.
|
||||
* @param {string} id - Record ID to delete.
|
||||
* @returns {Promise<boolean>} True if deleted or not found, false on error.
|
||||
*/
|
||||
pb.deleteMessageQueue = async (id) => {
|
||||
return await pb.deleteOne('message_queue', id);
|
||||
};
|
||||
|
||||
// ===== PUB/SUB OPERATIONS =====
|
||||
|
||||
/**
|
||||
* Publish a message into the "message_queue" collection.
|
||||
* @param {string} source - Origin identifier for the message.
|
||||
* @param {string} destination - Target identifier (e.g. channel or client ID).
|
||||
* @param {string} dataType - A short string describing the type of data.
|
||||
* @param {object} data - The payload object to deliver.
|
||||
* @returns {Promise<object>} The created message_queue record.
|
||||
*/
|
||||
pb.publishMessage = async (source, destination, dataType, data) => {
|
||||
try {
|
||||
return await pb.collection('message_queue').create({
|
||||
source,
|
||||
destination,
|
||||
dataType,
|
||||
data: JSON.stringify(data)
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error(`Failed to publish message to message_queue: ${error.message}`);
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
|
||||
// ===== CACHE MANAGEMENT =====
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user