diff --git a/docker-compose.pgr.yml b/docker-compose.pgr.yml index faf52faa..aeffc2cf 100644 --- a/docker-compose.pgr.yml +++ b/docker-compose.pgr.yml @@ -9,11 +9,4 @@ services: POSTGRES_PASSWORD: password # Default password (change for production!) ports: - "5432:5432" # Map host port 5432 to container port 5432 - restart: no - - redis: - image: redis:latest # Use the latest Redis image - container_name: dev_redis # Name your Redis container - ports: - - "6379:6379" # Map host port 6379 to container port 6379 - restart: no \ No newline at end of file + restart: no \ No newline at end of file diff --git a/package.json b/package.json index c8c4111f..a34c4d30 100644 --- a/package.json +++ b/package.json @@ -79,7 +79,6 @@ "http-errors": "2.0.0", "i": "^0.3.7", "input-otp": "1.4.2", - "ioredis": "^5.6.1", "jmespath": "^0.16.0", "js-yaml": "4.1.0", "jsonwebtoken": "^9.0.2", @@ -95,7 +94,6 @@ "oslo": "1.2.1", "pg": "^8.16.2", "qrcode.react": "4.2.0", - "rate-limit-redis": "^4.2.1", "react": "19.1.0", "react-dom": "19.1.0", "react-easy-sort": "^1.6.0", diff --git a/server/db/redis.ts b/server/db/redis.ts deleted file mode 100644 index c57b447c..00000000 --- a/server/db/redis.ts +++ /dev/null @@ -1,442 +0,0 @@ -import Redis, { RedisOptions } from "ioredis"; -import logger from "@server/logger"; -import config from "@server/lib/config"; -import { build } from "@server/build"; - -class RedisManager { - public client: Redis | null = null; - private subscriber: Redis | null = null; - private publisher: Redis | null = null; - private isEnabled: boolean = false; - private isHealthy: boolean = true; - private lastHealthCheck: number = 0; - private healthCheckInterval: number = 30000; // 30 seconds - private subscribers: Map< - string, - Set<(channel: string, message: string) => void> - > = new Map(); - - constructor() { - if (build == "oss") { - this.isEnabled = false; - } else { - this.isEnabled = config.getRawConfig().flags?.enable_redis || false; - } - if (this.isEnabled) { - this.initializeClients(); - } - } - - private getRedisConfig(): RedisOptions { - const redisConfig = config.getRawConfig().redis!; - const opts: RedisOptions = { - host: redisConfig.host!, - port: redisConfig.port!, - password: redisConfig.password, - db: redisConfig.db, - // tls: { - // rejectUnauthorized: - // redisConfig.tls?.reject_unauthorized || false - // } - }; - return opts; - } - - // Add reconnection logic in initializeClients - private initializeClients(): void { - const config = this.getRedisConfig(); - - try { - this.client = new Redis({ - ...config, - enableReadyCheck: false, - maxRetriesPerRequest: 3, - keepAlive: 30000, - connectTimeout: 10000, // 10 seconds - commandTimeout: 5000, // 5 seconds - }); - - this.publisher = new Redis({ - ...config, - enableReadyCheck: false, - maxRetriesPerRequest: 3, - keepAlive: 30000, - connectTimeout: 10000, // 10 seconds - commandTimeout: 5000, // 5 seconds - }); - - this.subscriber = new Redis({ - ...config, - enableReadyCheck: false, - maxRetriesPerRequest: 3, - keepAlive: 30000, - connectTimeout: 10000, // 10 seconds - commandTimeout: 5000, // 5 seconds - }); - - // Add reconnection handlers - this.client.on("error", (err) => { - logger.error("Redis client error:", err); - this.isHealthy = false; - }); - - this.client.on("reconnecting", () => { - logger.info("Redis client reconnecting..."); - this.isHealthy = false; - }); - - this.client.on("ready", () => { - logger.info("Redis client ready"); - this.isHealthy = true; - }); - - this.publisher.on("error", (err) => { - logger.error("Redis publisher error:", err); - this.isHealthy = false; - }); - - this.publisher.on("ready", () => { - logger.info("Redis publisher ready"); - }); - - this.subscriber.on("error", (err) => { - logger.error("Redis subscriber error:", err); - this.isHealthy = false; - }); - - this.subscriber.on("ready", () => { - logger.info("Redis subscriber ready"); - }); - - // Set up connection handlers - this.client.on("connect", () => { - logger.info("Redis client connected"); - }); - - this.publisher.on("connect", () => { - logger.info("Redis publisher connected"); - }); - - this.subscriber.on("connect", () => { - logger.info("Redis subscriber connected"); - }); - - // Set up message handler for subscriber - this.subscriber.on( - "message", - (channel: string, message: string) => { - const channelSubscribers = this.subscribers.get(channel); - if (channelSubscribers) { - channelSubscribers.forEach((callback) => { - try { - callback(channel, message); - } catch (error) { - logger.error( - `Error in subscriber callback for channel ${channel}:`, - error - ); - } - }); - } - } - ); - - logger.info("Redis clients initialized successfully"); - - // Start periodic health monitoring - this.startHealthMonitoring(); - } catch (error) { - logger.error("Failed to initialize Redis clients:", error); - this.isEnabled = false; - } - } - - private startHealthMonitoring(): void { - if (!this.isEnabled) return; - - // Check health every 30 seconds - setInterval(async () => { - try { - await this.checkRedisHealth(); - } catch (error) { - logger.error("Error during Redis health monitoring:", error); - } - }, this.healthCheckInterval); - } - - public isRedisEnabled(): boolean { - return this.isEnabled && this.client !== null && this.isHealthy; - } - - private async checkRedisHealth(): Promise { - const now = Date.now(); - - // Only check health every 30 seconds - if (now - this.lastHealthCheck < this.healthCheckInterval) { - return this.isHealthy; - } - - this.lastHealthCheck = now; - - if (!this.client) { - this.isHealthy = false; - return false; - } - - try { - await Promise.race([ - this.client.ping(), - new Promise((_, reject) => - setTimeout(() => reject(new Error('Health check timeout')), 2000) - ) - ]); - this.isHealthy = true; - return true; - } catch (error) { - logger.error("Redis health check failed:", error); - this.isHealthy = false; - return false; - } - } - - public getClient(): Redis { - return this.client!; - } - - public async set( - key: string, - value: string, - ttl?: number - ): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - if (ttl) { - await this.client.setex(key, ttl, value); - } else { - await this.client.set(key, value); - } - return true; - } catch (error) { - logger.error("Redis SET error:", error); - return false; - } - } - - public async get(key: string): Promise { - if (!this.isRedisEnabled() || !this.client) return null; - - try { - return await this.client.get(key); - } catch (error) { - logger.error("Redis GET error:", error); - return null; - } - } - - public async del(key: string): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - await this.client.del(key); - return true; - } catch (error) { - logger.error("Redis DEL error:", error); - return false; - } - } - - public async sadd(key: string, member: string): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - await this.client.sadd(key, member); - return true; - } catch (error) { - logger.error("Redis SADD error:", error); - return false; - } - } - - public async srem(key: string, member: string): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - await this.client.srem(key, member); - return true; - } catch (error) { - logger.error("Redis SREM error:", error); - return false; - } - } - - public async smembers(key: string): Promise { - if (!this.isRedisEnabled() || !this.client) return []; - - try { - return await this.client.smembers(key); - } catch (error) { - logger.error("Redis SMEMBERS error:", error); - return []; - } - } - - public async hset( - key: string, - field: string, - value: string - ): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - await this.client.hset(key, field, value); - return true; - } catch (error) { - logger.error("Redis HSET error:", error); - return false; - } - } - - public async hget(key: string, field: string): Promise { - if (!this.isRedisEnabled() || !this.client) return null; - - try { - return await this.client.hget(key, field); - } catch (error) { - logger.error("Redis HGET error:", error); - return null; - } - } - - public async hdel(key: string, field: string): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - await this.client.hdel(key, field); - return true; - } catch (error) { - logger.error("Redis HDEL error:", error); - return false; - } - } - - public async hgetall(key: string): Promise> { - if (!this.isRedisEnabled() || !this.client) return {}; - - try { - return await this.client.hgetall(key); - } catch (error) { - logger.error("Redis HGETALL error:", error); - return {}; - } - } - - public async publish(channel: string, message: string): Promise { - if (!this.isRedisEnabled() || !this.publisher) return false; - - // Quick health check before attempting to publish - const isHealthy = await this.checkRedisHealth(); - if (!isHealthy) { - logger.warn("Skipping Redis publish due to unhealthy connection"); - return false; - } - - try { - // Add timeout to prevent hanging - await Promise.race([ - this.publisher.publish(channel, message), - new Promise((_, reject) => - setTimeout(() => reject(new Error('Redis publish timeout')), 3000) - ) - ]); - return true; - } catch (error) { - logger.error("Redis PUBLISH error:", error); - this.isHealthy = false; // Mark as unhealthy on error - return false; - } - } - - public async subscribe( - channel: string, - callback: (channel: string, message: string) => void - ): Promise { - if (!this.isRedisEnabled() || !this.subscriber) return false; - - try { - // Add callback to subscribers map - if (!this.subscribers.has(channel)) { - this.subscribers.set(channel, new Set()); - // Only subscribe to the channel if it's the first subscriber - await Promise.race([ - this.subscriber.subscribe(channel), - new Promise((_, reject) => - setTimeout(() => reject(new Error('Redis subscribe timeout')), 5000) - ) - ]); - } - - this.subscribers.get(channel)!.add(callback); - return true; - } catch (error) { - logger.error("Redis SUBSCRIBE error:", error); - this.isHealthy = false; - return false; - } - } - - public async unsubscribe( - channel: string, - callback?: (channel: string, message: string) => void - ): Promise { - if (!this.isRedisEnabled() || !this.subscriber) return false; - - try { - const channelSubscribers = this.subscribers.get(channel); - if (!channelSubscribers) return true; - - if (callback) { - // Remove specific callback - channelSubscribers.delete(callback); - if (channelSubscribers.size === 0) { - this.subscribers.delete(channel); - await this.subscriber.unsubscribe(channel); - } - } else { - // Remove all callbacks for this channel - this.subscribers.delete(channel); - await this.subscriber.unsubscribe(channel); - } - - return true; - } catch (error) { - logger.error("Redis UNSUBSCRIBE error:", error); - return false; - } - } - - public async disconnect(): Promise { - try { - if (this.client) { - await this.client.quit(); - this.client = null; - } - if (this.publisher) { - await this.publisher.quit(); - this.publisher = null; - } - if (this.subscriber) { - await this.subscriber.quit(); - this.subscriber = null; - } - this.subscribers.clear(); - logger.info("Redis clients disconnected"); - } catch (error) { - logger.error("Error disconnecting Redis clients:", error); - } - } -} - -export const redisManager = new RedisManager(); -export const redis = redisManager.getClient(); -export default redisManager; diff --git a/server/lib/rateLimitStore.ts b/server/lib/rateLimitStore.ts index 78e1a9fc..2f6dc675 100644 --- a/server/lib/rateLimitStore.ts +++ b/server/lib/rateLimitStore.ts @@ -1,16 +1,6 @@ import { MemoryStore, Store } from "express-rate-limit"; -import config from "./config"; -import redisManager from "@server/db/redis"; -import { RedisStore } from "rate-limit-redis"; export function createStore(): Store { let rateLimitStore: Store = new MemoryStore(); - if (config.getRawConfig().flags?.enable_redis) { - const client = redisManager.client!; - rateLimitStore = new RedisStore({ - sendCommand: async (command: string, ...args: string[]) => - (await client.call(command, args)) as any - }); - } return rateLimitStore; } diff --git a/server/lib/readConfigFile.ts b/server/lib/readConfigFile.ts index 07203213..864a6a40 100644 --- a/server/lib/readConfigFile.ts +++ b/server/lib/readConfigFile.ts @@ -124,22 +124,6 @@ export const configSchema = z .optional() }) .optional(), - redis: z - .object({ - host: z.string(), - port: portSchema, - password: z.string().optional(), - db: z.number().int().nonnegative().optional().default(0), - tls: z - .object({ - reject_unauthorized: z - .boolean() - .optional() - .default(true) - }) - .optional() - }) - .optional(), traefik: z .object({ http_entrypoint: z.string().optional().default("web"), @@ -237,7 +221,6 @@ export const configSchema = z disable_user_create_org: z.boolean().optional(), allow_raw_resources: z.boolean().optional(), enable_integration_api: z.boolean().optional(), - enable_redis: z.boolean().optional(), disable_local_sites: z.boolean().optional(), disable_basic_wireguard_sites: z.boolean().optional(), disable_config_managed_domains: z.boolean().optional(), @@ -245,18 +228,6 @@ export const configSchema = z }) .optional() }) - .refine( - (data) => { - if (data.flags?.enable_redis) { - return data?.redis !== undefined; - } - return true; - }, - { - message: - "If Redis is enabled, configuration details must be provided" - } - ) .refine( (data) => { const keys = Object.keys(data.domains || {}); @@ -272,20 +243,6 @@ export const configSchema = z message: "At least one domain must be defined" } ) - .refine( - (data) => { - if (build == "oss" && data.redis) { - return false; - } - if (build == "oss" && data.flags?.enable_redis) { - return false; - } - return true; - }, - { - message: "Redis" - } - ); export function readConfigFile() { const loadConfig = (configPath: string) => { diff --git a/server/routers/ws.ts b/server/routers/ws.ts index c925ac5c..0d9f84d3 100644 --- a/server/routers/ws.ts +++ b/server/routers/ws.ts @@ -10,7 +10,6 @@ import { validateNewtSessionToken } from "@server/auth/sessions/newt"; import { validateOlmSessionToken } from "@server/auth/sessions/olm"; import { messageHandlers } from "./messageHandlers"; import logger from "@server/logger"; -import redisManager from "@server/db/redis"; import { v4 as uuidv4 } from "uuid"; // Custom interfaces @@ -54,14 +53,6 @@ interface HandlerContext { connectedClients: Map; } -interface RedisMessage { - type: 'direct' | 'broadcast'; - targetClientId?: string; - excludeClientId?: string; - message: WSMessage; - fromNodeId: string; -} - export type MessageHandler = (context: HandlerContext) => Promise; const router: Router = Router(); @@ -69,41 +60,12 @@ const wss: WebSocketServer = new WebSocketServer({ noServer: true }); // Generate unique node ID for this instance const NODE_ID = uuidv4(); -const REDIS_CHANNEL = 'websocket_messages'; // Client tracking map (local to this node) let connectedClients: Map = new Map(); // Helper to get map key const getClientMapKey = (clientId: string) => clientId; -// Redis keys (generalized) -const getConnectionsKey = (clientId: string) => `ws:connections:${clientId}`; -const getNodeConnectionsKey = (nodeId: string, clientId: string) => `ws:node:${nodeId}:${clientId}`; - -// Initialize Redis subscription for cross-node messaging -const initializeRedisSubscription = async (): Promise => { - if (!redisManager.isRedisEnabled()) return; - - await redisManager.subscribe(REDIS_CHANNEL, async (channel: string, message: string) => { - try { - const redisMessage: RedisMessage = JSON.parse(message); - - // Ignore messages from this node - if (redisMessage.fromNodeId === NODE_ID) return; - - if (redisMessage.type === 'direct' && redisMessage.targetClientId) { - // Send to specific client on this node - await sendToClientLocal(redisMessage.targetClientId, redisMessage.message); - } else if (redisMessage.type === 'broadcast') { - // Broadcast to all clients on this node except excluded - await broadcastToAllExceptLocal(redisMessage.message, redisMessage.excludeClientId); - } - } catch (error) { - logger.error('Error processing Redis message:', error); - } - }); -}; - // Helper functions for client management const addClient = async (clientType: ClientType, clientId: string, ws: AuthenticatedWebSocket): Promise => { // Generate unique connection ID @@ -116,12 +78,6 @@ const addClient = async (clientType: ClientType, clientId: string, ws: Authentic existingClients.push(ws); connectedClients.set(mapKey, existingClients); - // Add to Redis tracking if enabled - if (redisManager.isRedisEnabled()) { - await redisManager.sadd(getConnectionsKey(clientId), NODE_ID); - await redisManager.hset(getNodeConnectionsKey(NODE_ID, clientId), connectionId, Date.now().toString()); - } - logger.info(`Client added to tracking - ${clientType.toUpperCase()} ID: ${clientId}, Connection ID: ${connectionId}, Total connections: ${existingClients.length}`); }; @@ -132,19 +88,10 @@ const removeClient = async (clientType: ClientType, clientId: string, ws: Authen if (updatedClients.length === 0) { connectedClients.delete(mapKey); - if (redisManager.isRedisEnabled()) { - await redisManager.srem(getConnectionsKey(clientId), NODE_ID); - await redisManager.del(getNodeConnectionsKey(NODE_ID, clientId)); - } - logger.info(`All connections removed for ${clientType.toUpperCase()} ID: ${clientId}`); } else { connectedClients.set(mapKey, updatedClients); - if (redisManager.isRedisEnabled() && ws.connectionId) { - await redisManager.hdel(getNodeConnectionsKey(NODE_ID, clientId), ws.connectionId); - } - logger.info(`Connection removed - ${clientType.toUpperCase()} ID: ${clientId}, Remaining connections: ${updatedClients.length}`); } }; @@ -178,64 +125,31 @@ const broadcastToAllExceptLocal = async (message: WSMessage, excludeClientId?: s }); }; -// Cross-node message sending (via Redis) +// Cross-node message sending const sendToClient = async (clientId: string, message: WSMessage): Promise => { // Try to send locally first const localSent = await sendToClientLocal(clientId, message); - // If Redis is enabled, also send via Redis pub/sub to other nodes - if (redisManager.isRedisEnabled()) { - const redisMessage: RedisMessage = { - type: 'direct', - targetClientId: clientId, - message, - fromNodeId: NODE_ID - }; - - await redisManager.publish(REDIS_CHANNEL, JSON.stringify(redisMessage)); - } - return localSent; }; const broadcastToAllExcept = async (message: WSMessage, excludeClientId?: string): Promise => { // Broadcast locally await broadcastToAllExceptLocal(message, excludeClientId); - - // If Redis is enabled, also broadcast via Redis pub/sub to other nodes - if (redisManager.isRedisEnabled()) { - const redisMessage: RedisMessage = { - type: 'broadcast', - excludeClientId, - message, - fromNodeId: NODE_ID - }; - - await redisManager.publish(REDIS_CHANNEL, JSON.stringify(redisMessage)); - } }; // Check if a client has active connections across all nodes const hasActiveConnections = async (clientId: string): Promise => { - if (!redisManager.isRedisEnabled()) { const mapKey = getClientMapKey(clientId); const clients = connectedClients.get(mapKey); return !!(clients && clients.length > 0); - } - - const activeNodes = await redisManager.smembers(getConnectionsKey(clientId)); - return activeNodes.length > 0; }; // Get all active nodes for a client const getActiveNodes = async (clientType: ClientType, clientId: string): Promise => { - if (!redisManager.isRedisEnabled()) { const mapKey = getClientMapKey(clientId); const clients = connectedClients.get(mapKey); return (clients && clients.length > 0) ? [NODE_ID] : []; - } - - return await redisManager.smembers(getConnectionsKey(clientId)); }; // Token verification middleware @@ -391,16 +305,6 @@ const handleWSUpgrade = (server: HttpServer): void => { }); }; -// Initialize Redis subscription when the module is loaded -if (redisManager.isRedisEnabled()) { - initializeRedisSubscription().catch(error => { - logger.error('Failed to initialize Redis subscription:', error); - }); - logger.info(`WebSocket handler initialized with Redis support - Node ID: ${NODE_ID}`); -} else { - logger.debug('WebSocket handler initialized in local mode (Redis disabled)'); -} - // Cleanup function for graceful shutdown const cleanup = async (): Promise => { try { @@ -413,14 +317,6 @@ const cleanup = async (): Promise => { }); }); - // Clean up Redis tracking for this node - if (redisManager.isRedisEnabled()) { - const keys = await redisManager.getClient()?.keys(`ws:node:${NODE_ID}:*`) || []; - if (keys.length > 0) { - await Promise.all(keys.map(key => redisManager.del(key))); - } - } - logger.info('WebSocket cleanup completed'); } catch (error) { logger.error('Error during WebSocket cleanup:', error);