ClientX/_opt/pbUtils.js
2025-05-01 16:57:03 +00:00

566 lines
16 KiB
JavaScript

// _opt/pbutils.js
// Polyfill global EventSource for PocketBase realtime in Node.js (using CommonJS require)
import { createRequire } from 'module';
const require = createRequire(import.meta.url);
const { EventSource } = require('eventsource');
if (typeof global.EventSource === 'undefined') {
global.EventSource = EventSource;
}
/**
* PocketBase utilities module - extends PocketBase client with useful shortcuts
*/
/**
* Initializes the PocketBase utilities module
* @param {Object} client - Discord client with attached PocketBase instance
* @param {Object} config - Client configuration
*/
export const init = async (client, config) => {
const { pb, logger } = client;
logger.info('Initializing PocketBase utilities module');
// Attach utility methods to the pb object
extendPocketBase(client, pb, logger);
// Add connection state handling
setupConnectionHandling(pb, logger);
// Subscribe to real-time message queue events and re-emit via client
try {
pb.collection('message_queue').subscribe('*', (e) => {
client.emit('message_queue_event', e.action, e.record);
logger.debug(`PubSub event: ${e.action} on message_queue: ${JSON.stringify(e.record)}`);
});
logger.info('Subscribed to PocketBase message_queue realtime events');
} catch (error) {
logger.error(`Failed to subscribe to message_queue realtime: ${error.message}`);
}
// end of init()
logger.info('PocketBase utilities module initialized');
};
/**
* Register a handler for incoming message_queue pub/sub events.
* Other modules can import and use this to react to external messages.
* @param {import('discord.js').Client} client - The Discord client
* @param {(action: string, record: object) => void} handler - Callback for each event
*/
export function onMessageQueueEvent(client, handler) {
client.on('message_queue_event', (action, record) => {
try {
handler(action, record);
} catch (err) {
client.logger.error(`Error in message_queue handler: ${err.message}`);
}
});
}
/**
* Extends the PocketBase instance with utility methods
* @param {Object} pb - PocketBase instance
* @param {Object} logger - Winston logger
*/
/**
* Adds utility methods to the PocketBase client.
* @param {import('discord.js').Client} client - The Discord client
* @param {object} pb - The PocketBase instance
* @param {object} logger - Logger instance
*/
const extendPocketBase = (client, pb, logger) => {
// ===== COLLECTION OPERATIONS =====
/**
* Get a single record with better error handling
* @param {string} collection - Collection name
* @param {string} id - Record ID
* @param {Object} options - Additional options
* @returns {Promise<Object>} The record or null
*/
pb.getOne = async (collection, id, options = {}) => {
try {
return await pb.collection(collection).getOne(id, options);
} catch (error) {
if (error.status === 404) {
return null;
}
logger.error(`Failed to get record ${id} from ${collection}: ${error.message}`);
throw error;
}
};
/**
* Creates a record with validation and error handling
* @param {string} collection - Collection name
* @param {Object} data - Record data
* @returns {Promise<Object>} Created record
*/
pb.createOne = async (collection, data) => {
try {
return await pb.collection(collection).create(data);
} catch (error) {
logger.error(`Failed to create record in ${collection}: ${error.message}`);
throw error;
}
};
/**
* Updates a record with better error handling
* @param {string} collection - Collection name
* @param {string} id - Record ID
* @param {Object} data - Record data
* @returns {Promise<Object>} Updated record
*/
pb.updateOne = async (collection, id, data) => {
try {
return await pb.collection(collection).update(id, data);
} catch (error) {
logger.error(`Failed to update record ${id} in ${collection}: ${error.message}`);
throw error;
}
};
/**
* Deletes a record with better error handling
* @param {string} collection - Collection name
* @param {string} id - Record ID
* @returns {Promise<boolean>} Success status
*/
pb.deleteOne = async (collection, id) => {
try {
await pb.collection(collection).delete(id);
return true;
} catch (error) {
if (error.status === 404) {
logger.warn(`Record ${id} not found in ${collection} for deletion`);
return false;
}
logger.error(`Failed to delete record ${id} from ${collection}: ${error.message}`);
throw error;
}
};
/**
* Convenience: publish a message into the "message_queue" collection,
* with source/destination validation.
* @param {string} source - origin (client id or 'external')
* @param {string} destination - target client id
* @param {string} dataType - message type
* @param {object} data - JSON-serializable payload
* @returns {Promise<Object>} created record
*/
pb.publishMessage = async (source, destination, dataType, data) => {
// Valid sources: all configured clients + 'external'
const validSources = client.config.clients.map(c => c.id).concat('external');
if (!validSources.includes(source)) throw new Error(`Invalid message source: ${source}`);
// Valid destinations: all configured clients
const validDest = client.config.clients.map(c => c.id);
if (!validDest.includes(destination)) throw new Error(`Invalid message destination: ${destination}`);
return await pb.collection('message_queue').create({ source, destination, dataType, data: JSON.stringify(data) });
};
/**
* Upsert - creates or updates a record based on whether it exists
* @param {string} collection - Collection name
* @param {string} id - Record ID or null for new record
* @param {Object} data - Record data
* @returns {Promise<Object>} Created/updated record
*/
pb.upsert = async (collection, id, data) => {
if (id) {
const exists = await pb.getOne(collection, id);
if (exists) {
return await pb.updateOne(collection, id, data);
}
}
return await pb.createOne(collection, data);
};
// ===== QUERY SHORTCUTS =====
/**
* Get first record matching a filter
* @param {string} collection - Collection name
* @param {string} filter - Filter query
* @param {Object} options - Additional options
* @returns {Promise<Object>} First matching record or null
*/
pb.getFirst = async (collection, filter, options = {}) => {
try {
const result = await pb.collection(collection).getList(1, 1, {
filter,
...options
});
return result.items.length > 0 ? result.items[0] : null;
} catch (error) {
if (error.status === 404) {
return null;
}
logger.error(`Failed to get first record from ${collection}: ${error.message}`);
throw error;
}
};
/**
* Get all records from a collection (handles pagination)
* @param {string} collection - Collection name
* @param {Object} options - Query options
* @returns {Promise<Array>} Array of records
*/
pb.getAll = async (collection, options = {}) => {
const records = [];
const pageSize = options.pageSize || 200;
let page = 1;
try {
while (true) {
const result = await pb.collection(collection).getList(page, pageSize, options);
records.push(...result.items);
if (records.length >= result.totalItems) {
break;
}
page++;
}
return records;
} catch (error) {
logger.error(`Failed to get all records from ${collection}: ${error.message}`);
throw error;
}
};
/**
* Count records matching a filter
* @param {string} collection - Collection name
* @param {string} filter - Filter query
* @returns {Promise<number>} Count of matching records
*/
pb.count = async (collection, filter = '') => {
try {
const result = await pb.collection(collection).getList(1, 1, {
filter,
fields: 'id'
});
return result.totalItems;
} catch (error) {
logger.error(`Failed to count records in ${collection}: ${error.message}`);
throw error;
}
};
// ===== BATCH OPERATIONS =====
/**
* Perform batch create
* @param {string} collection - Collection name
* @param {Array<Object>} items - Array of items to create
* @returns {Promise<Array>} Created records
*/
pb.batchCreate = async (collection, items) => {
if (!items || items.length === 0) {
return [];
}
const results = [];
try {
// Process in chunks to avoid rate limits
const chunkSize = 50;
for (let i = 0; i < items.length; i += chunkSize) {
const chunk = items.slice(i, i + chunkSize);
const promises = chunk.map(item => pb.createOne(collection, item));
const chunkResults = await Promise.all(promises);
results.push(...chunkResults);
}
return results;
} catch (error) {
logger.error(`Failed batch create in ${collection}: ${error.message}`);
throw error;
}
};
/**
* Perform batch update
* @param {string} collection - Collection name
* @param {Array<Object>} items - Array of items with id field
* @returns {Promise<Array>} Updated records
*/
pb.batchUpdate = async (collection, items) => {
if (!items || items.length === 0) {
return [];
}
const results = [];
try {
// Process in chunks to avoid rate limits
const chunkSize = 50;
for (let i = 0; i < items.length; i += chunkSize) {
const chunk = items.slice(i, i + chunkSize);
const promises = chunk.map(item => {
const { id, ...data } = item;
return pb.updateOne(collection, id, data);
});
const chunkResults = await Promise.all(promises);
results.push(...chunkResults);
}
return results;
} catch (error) {
logger.error(`Failed batch update in ${collection}: ${error.message}`);
throw error;
}
};
/**
* Perform batch delete
* @param {string} collection - Collection name
* @param {Array<string>} ids - Array of record IDs to delete
* @returns {Promise<Array>} Results of deletion operations
*/
pb.batchDelete = async (collection, ids) => {
if (!ids || ids.length === 0) {
return [];
}
const results = [];
try {
// Process in chunks to avoid rate limits
const chunkSize = 50;
for (let i = 0; i < ids.length; i += chunkSize) {
const chunk = ids.slice(i, i + chunkSize);
const promises = chunk.map(id => pb.deleteOne(collection, id));
const chunkResults = await Promise.all(promises);
results.push(...chunkResults);
}
return results;
} catch (error) {
logger.error(`Failed batch delete in ${collection}: ${error.message}`);
throw error;
}
};
/**
* Delete a message in the "message_queue" collection by its record ID.
* @param {string} id - Record ID to delete.
* @returns {Promise<boolean>} True if deleted or not found, false on error.
*/
pb.deleteMessageQueue = async (id) => {
return await pb.deleteOne('message_queue', id);
};
// ===== PUB/SUB OPERATIONS =====
/**
* Publish a message into the "message_queue" collection.
* @param {string} source - Origin identifier for the message.
* @param {string} destination - Target identifier (e.g. channel or client ID).
* @param {string} dataType - A short string describing the type of data.
* @param {object} data - The payload object to deliver.
* @returns {Promise<object>} The created message_queue record.
*/
pb.publishMessage = async (source, destination, dataType, data) => {
try {
return await pb.collection('message_queue').create({
source,
destination,
dataType,
data: JSON.stringify(data)
});
} catch (error) {
logger.error(`Failed to publish message to message_queue: ${error.message}`);
throw error;
}
};
// ===== CACHE MANAGEMENT =====
// Simple in-memory cache
pb.cache = {
_store: new Map(),
_ttls: new Map(),
/**
* Get a value from cache
* @param {string} key - Cache key
* @returns {*} Cached value or undefined
*/
get(key) {
if (this._ttls.has(key) && this._ttls.get(key) < Date.now()) {
this.delete(key);
return undefined;
}
return this._store.get(key);
},
/**
* Set a value in cache
* @param {string} key - Cache key
* @param {*} value - Value to store
* @param {number} ttlSeconds - Time to live in seconds
*/
set(key, value, ttlSeconds = 300) {
this._store.set(key, value);
if (ttlSeconds > 0) {
this._ttls.set(key, Date.now() + (ttlSeconds * 1000));
}
},
/**
* Delete a value from cache
* @param {string} key - Cache key
*/
delete(key) {
this._store.delete(key);
this._ttls.delete(key);
},
/**
* Clear all cache
*/
clear() {
this._store.clear();
this._ttls.clear();
}
};
/**
* Get a record with caching
* @param {string} collection - Collection name
* @param {string} id - Record ID
* @param {number} ttlSeconds - Cache TTL in seconds
* @returns {Promise<Object>} Record or null
*/
pb.getCached = async (collection, id, ttlSeconds = 60) => {
const cacheKey = `${collection}:${id}`;
const cached = pb.cache.get(cacheKey);
if (cached !== undefined) {
return cached;
}
const record = await pb.getOne(collection, id);
pb.cache.set(cacheKey, record, ttlSeconds);
return record;
};
/**
* Get list with caching
* @param {string} collection - Collection name
* @param {Object} options - Query options
* @param {number} ttlSeconds - Cache TTL in seconds
* @returns {Promise<Object>} List result
*/
pb.getListCached = async (collection, options = {}, ttlSeconds = 30) => {
const cacheKey = `${collection}:list:${JSON.stringify(options)}`;
const cached = pb.cache.get(cacheKey);
if (cached !== undefined) {
return cached;
}
const { page = 1, perPage = 50, ...restOptions } = options;
const result = await pb.collection(collection).getList(page, perPage, restOptions);
pb.cache.set(cacheKey, result, ttlSeconds);
return result;
};
};
/**
* Setup connection state handling
* @param {Object} pb - PocketBase instance
* @param {Object} logger - Winston logger
*/
const setupConnectionHandling = (pb, logger) => {
// Add connection state tracking
pb.isConnected = true;
pb.lastSuccessfulAuth = null;
// Add auto-reconnect and token refresh
pb.authStore.onChange(() => {
pb.isConnected = pb.authStore.isValid;
if (pb.isConnected) {
pb.lastSuccessfulAuth = new Date();
logger.info('PocketBase authentication successful');
} else {
logger.warn('PocketBase auth token expired or invalid');
}
});
// Helper to check health and reconnect if needed
pb.ensureConnection = async () => {
if (!pb.isConnected || !pb.authStore.isValid) {
try {
logger.info('Reconnecting to PocketBase...');
// Attempt to refresh the auth if we have a refresh token
if (pb.authStore.token && pb.authStore.model?.id) {
await pb.admins.authRefresh();
} else if (pb._config.username && pb._config.password) {
// Fall back to full re-authentication if credentials available
await pb.admins.authWithPassword(
pb._config.username,
pb._config.password
);
} else {
logger.error('No credentials available to reconnect PocketBase');
pb.isConnected = false;
return false;
}
pb.isConnected = true;
pb.lastSuccessfulAuth = new Date();
logger.info('Successfully reconnected to PocketBase');
return true;
} catch (error) {
logger.error(`Failed to reconnect to PocketBase: ${error.message}`);
pb.isConnected = false;
return false;
}
}
return true;
};
// Store credentials for reconnection
pb._config = pb._config || {};
// Ensure only if env provided
if (process.env.SHARED_POCKETBASE_USERNAME && process.env.SHARED_POCKETBASE_PASSWORD) {
pb._config.username = process.env.SHARED_POCKETBASE_USERNAME;
pb._config.password = process.env.SHARED_POCKETBASE_PASSWORD;
}
// Heartbeat function to check connection periodically
const heartbeatInterval = setInterval(async () => {
try {
// Simple health check
await pb.health.check();
pb.isConnected = true;
} catch (error) {
logger.warn(`PocketBase connection issue: ${error.message}`);
pb.isConnected = false;
await pb.ensureConnection();
}
}, 5 * 60 * 1000); // Check every 5 minutes
// Clean up on client disconnect
pb.cleanup = () => {
clearInterval(heartbeatInterval);
};
};