// _opt/pbutils.js // Polyfill global EventSource for PocketBase realtime in Node.js (using CommonJS require) import { createRequire } from 'module'; const require = createRequire(import.meta.url); const { EventSource } = require('eventsource'); if (typeof global.EventSource === 'undefined') { global.EventSource = EventSource; } /** * PocketBase utilities module - extends PocketBase client with useful shortcuts */ /** * Initializes the PocketBase utilities module * @param {Object} client - Discord client with attached PocketBase instance * @param {Object} config - Client configuration */ export const init = async (client, config) => { const { pb, logger } = client; logger.info('Initializing PocketBase utilities module'); // Attach utility methods to the pb object extendPocketBase(client, pb, logger); // Add connection state handling setupConnectionHandling(pb, logger); // Subscribe to real-time message queue events and re-emit via client try { pb.collection('message_queue').subscribe('*', (e) => { client.emit('message_queue_event', e.action, e.record); logger.debug(`PubSub event: ${e.action} on message_queue: ${JSON.stringify(e.record)}`); }); logger.info('Subscribed to PocketBase message_queue realtime events'); } catch (error) { logger.error(`Failed to subscribe to message_queue realtime: ${error.message}`); } // end of init() logger.info('PocketBase utilities module initialized'); }; /** * Register a handler for incoming message_queue pub/sub events. * Other modules can import and use this to react to external messages. * @param {import('discord.js').Client} client - The Discord client * @param {(action: string, record: object) => void} handler - Callback for each event */ export function onMessageQueueEvent(client, handler) { client.on('message_queue_event', (action, record) => { try { handler(action, record); } catch (err) { client.logger.error(`Error in message_queue handler: ${err.message}`); } }); } /** * Extends the PocketBase instance with utility methods * @param {Object} pb - PocketBase instance * @param {Object} logger - Winston logger */ /** * Adds utility methods to the PocketBase client. * @param {import('discord.js').Client} client - The Discord client * @param {object} pb - The PocketBase instance * @param {object} logger - Logger instance */ const extendPocketBase = (client, pb, logger) => { // ===== COLLECTION OPERATIONS ===== /** * Get a single record with better error handling * @param {string} collection - Collection name * @param {string} id - Record ID * @param {Object} options - Additional options * @returns {Promise} The record or null */ pb.getOne = async (collection, id, options = {}) => { try { return await pb.collection(collection).getOne(id, options); } catch (error) { if (error.status === 404) { return null; } logger.error(`Failed to get record ${id} from ${collection}: ${error.message}`); throw error; } }; /** * Creates a record with validation and error handling * @param {string} collection - Collection name * @param {Object} data - Record data * @returns {Promise} Created record */ pb.createOne = async (collection, data) => { try { return await pb.collection(collection).create(data); } catch (error) { logger.error(`Failed to create record in ${collection}: ${error.message}`); throw error; } }; /** * Updates a record with better error handling * @param {string} collection - Collection name * @param {string} id - Record ID * @param {Object} data - Record data * @returns {Promise} Updated record */ pb.updateOne = async (collection, id, data) => { try { return await pb.collection(collection).update(id, data); } catch (error) { logger.error(`Failed to update record ${id} in ${collection}: ${error.message}`); throw error; } }; /** * Deletes a record with better error handling * @param {string} collection - Collection name * @param {string} id - Record ID * @returns {Promise} Success status */ pb.deleteOne = async (collection, id) => { try { await pb.collection(collection).delete(id); return true; } catch (error) { if (error.status === 404) { logger.warn(`Record ${id} not found in ${collection} for deletion`); return false; } logger.error(`Failed to delete record ${id} from ${collection}: ${error.message}`); throw error; } }; /** * Convenience: publish a message into the "message_queue" collection, * with source/destination validation. * @param {string} source - origin (client id or 'external') * @param {string} destination - target client id * @param {string} dataType - message type * @param {object} data - JSON-serializable payload * @returns {Promise} created record */ pb.publishMessage = async (source, destination, dataType, data) => { // Valid sources: all configured clients + 'external' const validSources = client.config.clients.map(c => c.id).concat('external'); if (!validSources.includes(source)) throw new Error(`Invalid message source: ${source}`); // Valid destinations: all configured clients const validDest = client.config.clients.map(c => c.id); if (!validDest.includes(destination)) throw new Error(`Invalid message destination: ${destination}`); return await pb.collection('message_queue').create({ source, destination, dataType, data: JSON.stringify(data) }); }; /** * Upsert - creates or updates a record based on whether it exists * @param {string} collection - Collection name * @param {string} id - Record ID or null for new record * @param {Object} data - Record data * @returns {Promise} Created/updated record */ pb.upsert = async (collection, id, data) => { if (id) { const exists = await pb.getOne(collection, id); if (exists) { return await pb.updateOne(collection, id, data); } } return await pb.createOne(collection, data); }; // ===== QUERY SHORTCUTS ===== /** * Get first record matching a filter * @param {string} collection - Collection name * @param {string} filter - Filter query * @param {Object} options - Additional options * @returns {Promise} First matching record or null */ pb.getFirst = async (collection, filter, options = {}) => { try { const result = await pb.collection(collection).getList(1, 1, { filter, ...options }); return result.items.length > 0 ? result.items[0] : null; } catch (error) { if (error.status === 404) { return null; } logger.error(`Failed to get first record from ${collection}: ${error.message}`); throw error; } }; /** * Get all records from a collection (handles pagination) * @param {string} collection - Collection name * @param {Object} options - Query options * @returns {Promise} Array of records */ pb.getAll = async (collection, options = {}) => { const records = []; const pageSize = options.pageSize || 200; let page = 1; try { while (true) { const result = await pb.collection(collection).getList(page, pageSize, options); records.push(...result.items); if (records.length >= result.totalItems) { break; } page++; } return records; } catch (error) { logger.error(`Failed to get all records from ${collection}: ${error.message}`); throw error; } }; /** * Count records matching a filter * @param {string} collection - Collection name * @param {string} filter - Filter query * @returns {Promise} Count of matching records */ pb.count = async (collection, filter = '') => { try { const result = await pb.collection(collection).getList(1, 1, { filter, fields: 'id' }); return result.totalItems; } catch (error) { logger.error(`Failed to count records in ${collection}: ${error.message}`); throw error; } }; // ===== BATCH OPERATIONS ===== /** * Perform batch create * @param {string} collection - Collection name * @param {Array} items - Array of items to create * @returns {Promise} Created records */ pb.batchCreate = async (collection, items) => { if (!items || items.length === 0) { return []; } const results = []; try { // Process in chunks to avoid rate limits const chunkSize = 50; for (let i = 0; i < items.length; i += chunkSize) { const chunk = items.slice(i, i + chunkSize); const promises = chunk.map(item => pb.createOne(collection, item)); const chunkResults = await Promise.all(promises); results.push(...chunkResults); } return results; } catch (error) { logger.error(`Failed batch create in ${collection}: ${error.message}`); throw error; } }; /** * Perform batch update * @param {string} collection - Collection name * @param {Array} items - Array of items with id field * @returns {Promise} Updated records */ pb.batchUpdate = async (collection, items) => { if (!items || items.length === 0) { return []; } const results = []; try { // Process in chunks to avoid rate limits const chunkSize = 50; for (let i = 0; i < items.length; i += chunkSize) { const chunk = items.slice(i, i + chunkSize); const promises = chunk.map(item => { const { id, ...data } = item; return pb.updateOne(collection, id, data); }); const chunkResults = await Promise.all(promises); results.push(...chunkResults); } return results; } catch (error) { logger.error(`Failed batch update in ${collection}: ${error.message}`); throw error; } }; /** * Perform batch delete * @param {string} collection - Collection name * @param {Array} ids - Array of record IDs to delete * @returns {Promise} Results of deletion operations */ pb.batchDelete = async (collection, ids) => { if (!ids || ids.length === 0) { return []; } const results = []; try { // Process in chunks to avoid rate limits const chunkSize = 50; for (let i = 0; i < ids.length; i += chunkSize) { const chunk = ids.slice(i, i + chunkSize); const promises = chunk.map(id => pb.deleteOne(collection, id)); const chunkResults = await Promise.all(promises); results.push(...chunkResults); } return results; } catch (error) { logger.error(`Failed batch delete in ${collection}: ${error.message}`); throw error; } }; /** * Delete a message in the "message_queue" collection by its record ID. * @param {string} id - Record ID to delete. * @returns {Promise} True if deleted or not found, false on error. */ pb.deleteMessageQueue = async (id) => { return await pb.deleteOne('message_queue', id); }; // ===== PUB/SUB OPERATIONS ===== /** * Publish a message into the "message_queue" collection. * @param {string} source - Origin identifier for the message. * @param {string} destination - Target identifier (e.g. channel or client ID). * @param {string} dataType - A short string describing the type of data. * @param {object} data - The payload object to deliver. * @returns {Promise} The created message_queue record. */ pb.publishMessage = async (source, destination, dataType, data) => { try { return await pb.collection('message_queue').create({ source, destination, dataType, data: JSON.stringify(data) }); } catch (error) { logger.error(`Failed to publish message to message_queue: ${error.message}`); throw error; } }; // ===== CACHE MANAGEMENT ===== // Simple in-memory cache pb.cache = { _store: new Map(), _ttls: new Map(), /** * Get a value from cache * @param {string} key - Cache key * @returns {*} Cached value or undefined */ get(key) { if (this._ttls.has(key) && this._ttls.get(key) < Date.now()) { this.delete(key); return undefined; } return this._store.get(key); }, /** * Set a value in cache * @param {string} key - Cache key * @param {*} value - Value to store * @param {number} ttlSeconds - Time to live in seconds */ set(key, value, ttlSeconds = 300) { this._store.set(key, value); if (ttlSeconds > 0) { this._ttls.set(key, Date.now() + (ttlSeconds * 1000)); } }, /** * Delete a value from cache * @param {string} key - Cache key */ delete(key) { this._store.delete(key); this._ttls.delete(key); }, /** * Clear all cache */ clear() { this._store.clear(); this._ttls.clear(); } }; /** * Get a record with caching * @param {string} collection - Collection name * @param {string} id - Record ID * @param {number} ttlSeconds - Cache TTL in seconds * @returns {Promise} Record or null */ pb.getCached = async (collection, id, ttlSeconds = 60) => { const cacheKey = `${collection}:${id}`; const cached = pb.cache.get(cacheKey); if (cached !== undefined) { return cached; } const record = await pb.getOne(collection, id); pb.cache.set(cacheKey, record, ttlSeconds); return record; }; /** * Get list with caching * @param {string} collection - Collection name * @param {Object} options - Query options * @param {number} ttlSeconds - Cache TTL in seconds * @returns {Promise} List result */ pb.getListCached = async (collection, options = {}, ttlSeconds = 30) => { const cacheKey = `${collection}:list:${JSON.stringify(options)}`; const cached = pb.cache.get(cacheKey); if (cached !== undefined) { return cached; } const { page = 1, perPage = 50, ...restOptions } = options; const result = await pb.collection(collection).getList(page, perPage, restOptions); pb.cache.set(cacheKey, result, ttlSeconds); return result; }; }; /** * Setup connection state handling * @param {Object} pb - PocketBase instance * @param {Object} logger - Winston logger */ const setupConnectionHandling = (pb, logger) => { // Add connection state tracking pb.isConnected = true; pb.lastSuccessfulAuth = null; // Add auto-reconnect and token refresh pb.authStore.onChange(() => { pb.isConnected = pb.authStore.isValid; if (pb.isConnected) { pb.lastSuccessfulAuth = new Date(); logger.info('PocketBase authentication successful'); } else { logger.warn('PocketBase auth token expired or invalid'); } }); // Helper to check health and reconnect if needed pb.ensureConnection = async () => { if (!pb.isConnected || !pb.authStore.isValid) { try { logger.info('Reconnecting to PocketBase...'); // Attempt to refresh the auth if we have a refresh token if (pb.authStore.token && pb.authStore.model?.id) { await pb.admins.authRefresh(); } else if (pb._config.username && pb._config.password) { // Fall back to full re-authentication if credentials available await pb.admins.authWithPassword( pb._config.username, pb._config.password ); } else { logger.error('No credentials available to reconnect PocketBase'); pb.isConnected = false; return false; } pb.isConnected = true; pb.lastSuccessfulAuth = new Date(); logger.info('Successfully reconnected to PocketBase'); return true; } catch (error) { logger.error(`Failed to reconnect to PocketBase: ${error.message}`); pb.isConnected = false; return false; } } return true; }; // Store credentials for reconnection pb._config = pb._config || {}; // Ensure only if env provided if (process.env.SHARED_POCKETBASE_USERNAME && process.env.SHARED_POCKETBASE_PASSWORD) { pb._config.username = process.env.SHARED_POCKETBASE_USERNAME; pb._config.password = process.env.SHARED_POCKETBASE_PASSWORD; } // Heartbeat function to check connection periodically const heartbeatInterval = setInterval(async () => { try { // Simple health check await pb.health.check(); pb.isConnected = true; } catch (error) { logger.warn(`PocketBase connection issue: ${error.message}`); pb.isConnected = false; await pb.ensureConnection(); } }, 5 * 60 * 1000); // Check every 5 minutes // Clean up on client disconnect pb.cleanup = () => { clearInterval(heartbeatInterval); }; };