diff --git a/package-lock.json b/package-lock.json index f97c5ad4..07674a00 100644 --- a/package-lock.json +++ b/package-lock.json @@ -58,6 +58,7 @@ "http-errors": "2.0.0", "i": "^0.3.7", "input-otp": "1.4.2", + "ioredis": "^5.6.1", "jmespath": "^0.16.0", "js-yaml": "4.1.0", "jsonwebtoken": "^9.0.2", @@ -1971,6 +1972,12 @@ "url": "https://opencollective.com/libvips" } }, + "node_modules/@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==", + "license": "MIT" + }, "node_modules/@isaacs/balanced-match": { "version": "4.0.1", "resolved": "https://registry.npmjs.org/@isaacs/balanced-match/-/balanced-match-4.0.1.tgz", @@ -6254,6 +6261,15 @@ "node": ">=6" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/cmdk": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/cmdk/-/cmdk-1.1.1.tgz", @@ -6691,6 +6707,15 @@ "node": ">=0.4.0" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10" + } + }, "node_modules/depd": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz", @@ -8853,6 +8878,30 @@ "tslib": "^2.8.0" } }, + "node_modules/ioredis": { + "version": "5.6.1", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.6.1.tgz", + "integrity": "sha512-UxC0Yv1Y4WRJiGQxQkP0hfdL0/5/6YvdfOOClRgJ0qppSarkhneSa6UvkMkms0AkdGimSH3Ikqm+6mkMmX7vGA==", + "license": "MIT", + "dependencies": { + "@ioredis/commands": "^1.1.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, "node_modules/ipaddr.js": { "version": "1.9.1", "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz", @@ -9810,12 +9859,24 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==", + "license": "MIT" + }, "node_modules/lodash.includes": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/lodash.includes/-/lodash.includes-4.3.0.tgz", "integrity": "sha512-W3Bx6mdkRTGtlJISOvVD/lbqjTlPPUDTMnlXZFnVwi9NKJ6tiAk6LVdlhZMm17VZisqhKcgzpO5Wz91PCt5b0w==", "license": "MIT" }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==", + "license": "MIT" + }, "node_modules/lodash.isboolean": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/lodash.isboolean/-/lodash.isboolean-3.0.3.tgz", @@ -14402,6 +14463,27 @@ "node": ">=0.8.8" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "license": "MIT", + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "license": "MIT", + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/reflect.getprototypeof": { "version": "1.0.10", "resolved": "https://registry.npmjs.org/reflect.getprototypeof/-/reflect.getprototypeof-1.0.10.tgz", @@ -15173,6 +15255,12 @@ "node": "*" } }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", + "license": "MIT" + }, "node_modules/statuses": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz", diff --git a/package.json b/package.json index 040a5453..02d26197 100644 --- a/package.json +++ b/package.json @@ -75,6 +75,7 @@ "http-errors": "2.0.0", "i": "^0.3.7", "input-otp": "1.4.2", + "ioredis": "^5.6.1", "jmespath": "^0.16.0", "js-yaml": "4.1.0", "jsonwebtoken": "^9.0.2", diff --git a/server/db/redis.ts b/server/db/redis.ts index bae80099..80f1e690 100644 --- a/server/db/redis.ts +++ b/server/db/redis.ts @@ -1,316 +1,333 @@ -import Redis from 'ioredis'; -import logger from '@server/logger'; - -interface RedisConfig { - host: string; - port: number; - password?: string; - db?: number; - retryDelayOnFailover?: number; - maxRetriesPerRequest?: number; -} +import Redis, { RedisOptions } from "ioredis"; +import logger from "@server/logger"; +import config from "@server/lib/config"; class RedisManager { - private static instance: RedisManager; - private client: Redis | null = null; - private subscriber: Redis | null = null; - private publisher: Redis | null = null; - private isEnabled: boolean = false; - private subscribers: Map void>> = new Map(); + private static instance: RedisManager; + private client: Redis | null = null; + private subscriber: Redis | null = null; + private publisher: Redis | null = null; + private isEnabled: boolean = false; + private subscribers: Map< + string, + Set<(channel: string, message: string) => void> + > = new Map(); - private constructor() { - this.isEnabled = !!process.env.REDIS; - if (this.isEnabled) { - this.initializeClients(); + private constructor() { + this.isEnabled = config.getRawConfig().redis?.enabled || false; + if (this.isEnabled) { + this.initializeClients(); + } } - } - public static getInstance(): RedisManager { - if (!RedisManager.instance) { - RedisManager.instance = new RedisManager(); + public static getInstance(): RedisManager { + if (!RedisManager.instance) { + RedisManager.instance = new RedisManager(); + } + return RedisManager.instance; } - return RedisManager.instance; - } - private getRedisConfig(): RedisConfig { - return { - host: process.env.REDIS_HOST || 'localhost', - port: parseInt(process.env.REDIS_PORT || '6379'), - password: process.env.REDIS_PASSWORD, - db: parseInt(process.env.REDIS_DB || '0'), - retryDelayOnFailover: 100, - maxRetriesPerRequest: 3, - }; - } + private getRedisConfig(): RedisOptions { + const redisConfig = config.getRawConfig().redis!; + const opts: RedisOptions = { + host: redisConfig.host!, + port: redisConfig.port!, + password: redisConfig.password, + db: redisConfig.db, + tls: { + rejectUnauthorized: false + }, + }; + return opts; + } - private initializeClients(): void { - const config = this.getRedisConfig(); + private initializeClients(): void { + const config = this.getRedisConfig(); - try { - // Main client for general operations - this.client = new Redis(config); - - // Dedicated publisher client - this.publisher = new Redis(config); - - // Dedicated subscriber client - this.subscriber = new Redis(config); + try { + // Main client for general operations + this.client = new Redis(config); - // Set up error handlers - this.client.on('error', (err) => { - logger.error('Redis client error:', err); - }); + // Dedicated publisher client + this.publisher = new Redis(config); - this.publisher.on('error', (err) => { - logger.error('Redis publisher error:', err); - }); + // Dedicated subscriber client + this.subscriber = new Redis(config); - this.subscriber.on('error', (err) => { - logger.error('Redis subscriber error:', err); - }); + // Set up error handlers + this.client.on("error", (err) => { + logger.error("Redis client error:", err); + }); - // Set up connection handlers - this.client.on('connect', () => { - logger.info('Redis client connected'); - }); + this.publisher.on("error", (err) => { + logger.error("Redis publisher error:", err); + }); - this.publisher.on('connect', () => { - logger.info('Redis publisher connected'); - }); + this.subscriber.on("error", (err) => { + logger.error("Redis subscriber error:", err); + }); - this.subscriber.on('connect', () => { - logger.info('Redis subscriber connected'); - }); + // Set up connection handlers + this.client.on("connect", () => { + logger.info("Redis client connected"); + }); - // Set up message handler for subscriber - this.subscriber.on('message', (channel: string, message: string) => { - const channelSubscribers = this.subscribers.get(channel); - if (channelSubscribers) { - channelSubscribers.forEach(callback => { - try { - callback(channel, message); - } catch (error) { - logger.error(`Error in subscriber callback for channel ${channel}:`, error); + this.publisher.on("connect", () => { + logger.info("Redis publisher connected"); + }); + + this.subscriber.on("connect", () => { + logger.info("Redis subscriber connected"); + }); + + // Set up message handler for subscriber + this.subscriber.on( + "message", + (channel: string, message: string) => { + const channelSubscribers = this.subscribers.get(channel); + if (channelSubscribers) { + channelSubscribers.forEach((callback) => { + try { + callback(channel, message); + } catch (error) { + logger.error( + `Error in subscriber callback for channel ${channel}:`, + error + ); + } + }); + } + } + ); + + logger.info("Redis clients initialized successfully"); + } catch (error) { + logger.error("Failed to initialize Redis clients:", error); + this.isEnabled = false; + } + } + + public isRedisEnabled(): boolean { + return this.isEnabled && this.client !== null; + } + + public getClient(): Redis | null { + return this.client; + } + + public async set( + key: string, + value: string, + ttl?: number + ): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + if (ttl) { + await this.client.setex(key, ttl, value); + } else { + await this.client.set(key, value); } - }); + return true; + } catch (error) { + logger.error("Redis SET error:", error); + return false; } - }); - - logger.info('Redis clients initialized successfully'); - } catch (error) { - logger.error('Failed to initialize Redis clients:', error); - this.isEnabled = false; } - } - public isRedisEnabled(): boolean { - return this.isEnabled && this.client !== null; - } + public async get(key: string): Promise { + if (!this.isRedisEnabled() || !this.client) return null; - public getClient(): Redis | null { - return this.client; - } - - public async set(key: string, value: string, ttl?: number): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - if (ttl) { - await this.client.setex(key, ttl, value); - } else { - await this.client.set(key, value); - } - return true; - } catch (error) { - logger.error('Redis SET error:', error); - return false; - } - } - - public async get(key: string): Promise { - if (!this.isRedisEnabled() || !this.client) return null; - - try { - return await this.client.get(key); - } catch (error) { - logger.error('Redis GET error:', error); - return null; - } - } - - public async del(key: string): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - await this.client.del(key); - return true; - } catch (error) { - logger.error('Redis DEL error:', error); - return false; - } - } - - public async sadd(key: string, member: string): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - await this.client.sadd(key, member); - return true; - } catch (error) { - logger.error('Redis SADD error:', error); - return false; - } - } - - public async srem(key: string, member: string): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - await this.client.srem(key, member); - return true; - } catch (error) { - logger.error('Redis SREM error:', error); - return false; - } - } - - public async smembers(key: string): Promise { - if (!this.isRedisEnabled() || !this.client) return []; - - try { - return await this.client.smembers(key); - } catch (error) { - logger.error('Redis SMEMBERS error:', error); - return []; - } - } - - public async hset(key: string, field: string, value: string): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - await this.client.hset(key, field, value); - return true; - } catch (error) { - logger.error('Redis HSET error:', error); - return false; - } - } - - public async hget(key: string, field: string): Promise { - if (!this.isRedisEnabled() || !this.client) return null; - - try { - return await this.client.hget(key, field); - } catch (error) { - logger.error('Redis HGET error:', error); - return null; - } - } - - public async hdel(key: string, field: string): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - await this.client.hdel(key, field); - return true; - } catch (error) { - logger.error('Redis HDEL error:', error); - return false; - } - } - - public async hgetall(key: string): Promise> { - if (!this.isRedisEnabled() || !this.client) return {}; - - try { - return await this.client.hgetall(key); - } catch (error) { - logger.error('Redis HGETALL error:', error); - return {}; - } - } - - public async publish(channel: string, message: string): Promise { - if (!this.isRedisEnabled() || !this.publisher) return false; - - try { - await this.publisher.publish(channel, message); - return true; - } catch (error) { - logger.error('Redis PUBLISH error:', error); - return false; - } - } - - public async subscribe(channel: string, callback: (channel: string, message: string) => void): Promise { - if (!this.isRedisEnabled() || !this.subscriber) return false; - - try { - // Add callback to subscribers map - if (!this.subscribers.has(channel)) { - this.subscribers.set(channel, new Set()); - // Only subscribe to the channel if it's the first subscriber - await this.subscriber.subscribe(channel); - } - - this.subscribers.get(channel)!.add(callback); - return true; - } catch (error) { - logger.error('Redis SUBSCRIBE error:', error); - return false; - } - } - - public async unsubscribe(channel: string, callback?: (channel: string, message: string) => void): Promise { - if (!this.isRedisEnabled() || !this.subscriber) return false; - - try { - const channelSubscribers = this.subscribers.get(channel); - if (!channelSubscribers) return true; - - if (callback) { - // Remove specific callback - channelSubscribers.delete(callback); - if (channelSubscribers.size === 0) { - this.subscribers.delete(channel); - await this.subscriber.unsubscribe(channel); + try { + return await this.client.get(key); + } catch (error) { + logger.error("Redis GET error:", error); + return null; } - } else { - // Remove all callbacks for this channel - this.subscribers.delete(channel); - await this.subscriber.unsubscribe(channel); - } - - return true; - } catch (error) { - logger.error('Redis UNSUBSCRIBE error:', error); - return false; } - } - public async disconnect(): Promise { - try { - if (this.client) { - await this.client.quit(); - this.client = null; - } - if (this.publisher) { - await this.publisher.quit(); - this.publisher = null; - } - if (this.subscriber) { - await this.subscriber.quit(); - this.subscriber = null; - } - this.subscribers.clear(); - logger.info('Redis clients disconnected'); - } catch (error) { - logger.error('Error disconnecting Redis clients:', error); + public async del(key: string): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + await this.client.del(key); + return true; + } catch (error) { + logger.error("Redis DEL error:", error); + return false; + } + } + + public async sadd(key: string, member: string): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + await this.client.sadd(key, member); + return true; + } catch (error) { + logger.error("Redis SADD error:", error); + return false; + } + } + + public async srem(key: string, member: string): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + await this.client.srem(key, member); + return true; + } catch (error) { + logger.error("Redis SREM error:", error); + return false; + } + } + + public async smembers(key: string): Promise { + if (!this.isRedisEnabled() || !this.client) return []; + + try { + return await this.client.smembers(key); + } catch (error) { + logger.error("Redis SMEMBERS error:", error); + return []; + } + } + + public async hset( + key: string, + field: string, + value: string + ): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + await this.client.hset(key, field, value); + return true; + } catch (error) { + logger.error("Redis HSET error:", error); + return false; + } + } + + public async hget(key: string, field: string): Promise { + if (!this.isRedisEnabled() || !this.client) return null; + + try { + return await this.client.hget(key, field); + } catch (error) { + logger.error("Redis HGET error:", error); + return null; + } + } + + public async hdel(key: string, field: string): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + await this.client.hdel(key, field); + return true; + } catch (error) { + logger.error("Redis HDEL error:", error); + return false; + } + } + + public async hgetall(key: string): Promise> { + if (!this.isRedisEnabled() || !this.client) return {}; + + try { + return await this.client.hgetall(key); + } catch (error) { + logger.error("Redis HGETALL error:", error); + return {}; + } + } + + public async publish(channel: string, message: string): Promise { + if (!this.isRedisEnabled() || !this.publisher) return false; + + try { + await this.publisher.publish(channel, message); + return true; + } catch (error) { + logger.error("Redis PUBLISH error:", error); + return false; + } + } + + public async subscribe( + channel: string, + callback: (channel: string, message: string) => void + ): Promise { + if (!this.isRedisEnabled() || !this.subscriber) return false; + + try { + // Add callback to subscribers map + if (!this.subscribers.has(channel)) { + this.subscribers.set(channel, new Set()); + // Only subscribe to the channel if it's the first subscriber + await this.subscriber.subscribe(channel); + } + + this.subscribers.get(channel)!.add(callback); + return true; + } catch (error) { + logger.error("Redis SUBSCRIBE error:", error); + return false; + } + } + + public async unsubscribe( + channel: string, + callback?: (channel: string, message: string) => void + ): Promise { + if (!this.isRedisEnabled() || !this.subscriber) return false; + + try { + const channelSubscribers = this.subscribers.get(channel); + if (!channelSubscribers) return true; + + if (callback) { + // Remove specific callback + channelSubscribers.delete(callback); + if (channelSubscribers.size === 0) { + this.subscribers.delete(channel); + await this.subscriber.unsubscribe(channel); + } + } else { + // Remove all callbacks for this channel + this.subscribers.delete(channel); + await this.subscriber.unsubscribe(channel); + } + + return true; + } catch (error) { + logger.error("Redis UNSUBSCRIBE error:", error); + return false; + } + } + + public async disconnect(): Promise { + try { + if (this.client) { + await this.client.quit(); + this.client = null; + } + if (this.publisher) { + await this.publisher.quit(); + this.publisher = null; + } + if (this.subscriber) { + await this.subscriber.quit(); + this.subscriber = null; + } + this.subscribers.clear(); + logger.info("Redis clients disconnected"); + } catch (error) { + logger.error("Error disconnecting Redis clients:", error); + } } - } } -// Export singleton instance export const redisManager = RedisManager.getInstance(); -export default redisManager; \ No newline at end of file +export default redisManager; diff --git a/server/lib/readConfigFile.ts b/server/lib/readConfigFile.ts index 13efce5d..aca2e262 100644 --- a/server/lib/readConfigFile.ts +++ b/server/lib/readConfigFile.ts @@ -131,6 +131,32 @@ export const configSchema = z.object({ .optional() }) .optional(), + redis: z + .object({ + enabled: z.boolean(), + host: z.string().optional(), + port: portSchema.optional(), + password: z.string().optional(), + db: z.number().int().nonnegative().optional().default(0), + tls: z + .object({ + rejectUnauthorized: z.boolean().optional().default(true) + }) + .optional() + }) + .refine( + (redis) => { + if (!redis.enabled) { + return true; + } + return redis.host !== undefined && redis.port !== undefined; + }, + { + message: + "If Redis is enabled, connection details must be provided" + } + ) + .optional(), traefik: z .object({ http_entrypoint: z.string().optional().default("web"),