diff --git a/server/routers/newt/handleNewtMessage.ts b/server/routers/newt/handleNewtMessage.ts new file mode 100644 index 00000000..70d241ed --- /dev/null +++ b/server/routers/newt/handleNewtMessage.ts @@ -0,0 +1,22 @@ +// messageHandlers/chat.ts +import { MessageHandler } from "../ws"; + +export const handleNewtMessage: MessageHandler = async (context) => { + const { message, senderNewtId, sendToClient } = context; + + // Process chat message + // ... your chat logic here ... + + // Example response + return { + message: { + type: 'newt_response', + data: { + originalMessage: message.data, + timestamp: new Date().toISOString() + } + }, + broadcast: false, // Send to all clients + excludeSender: false // Include sender in broadcast + }; +}; \ No newline at end of file diff --git a/server/routers/newt/index.ts b/server/routers/newt/index.ts index 19a0d9f0..e572f7fc 100644 --- a/server/routers/newt/index.ts +++ b/server/routers/newt/index.ts @@ -1,2 +1,3 @@ export * from "./createNewt"; -export * from "./getToken"; \ No newline at end of file +export * from "./getToken"; +export * from "./handleNewtMessage"; \ No newline at end of file diff --git a/server/routers/ws.ts b/server/routers/ws.ts index ac73ccf4..965c3886 100644 --- a/server/routers/ws.ts +++ b/server/routers/ws.ts @@ -1,12 +1,13 @@ -import { Router, Request, Response } from 'express'; -import { Server as HttpServer } from 'http'; -import { WebSocket, WebSocketServer } from 'ws'; -import { IncomingMessage } from 'http'; -import { Socket } from 'net'; -import { Newt, newts, NewtSession } from '@server/db/schema'; -import { eq } from 'drizzle-orm'; -import db from '@server/db'; -import { validateNewtSessionToken } from '@server/auth/newt'; +import { Router, Request, Response } from "express"; +import { Server as HttpServer } from "http"; +import { WebSocket, WebSocketServer } from "ws"; +import { IncomingMessage } from "http"; +import { Socket } from "net"; +import { Newt, newts, NewtSession } from "@server/db/schema"; +import { eq } from "drizzle-orm"; +import db from "@server/db"; +import { validateNewtSessionToken } from "@server/auth/newt"; +import { handleNewtMessage } from "./newt"; // Custom interfaces interface WebSocketRequest extends IncomingMessage { @@ -23,16 +24,93 @@ interface TokenPayload { session: NewtSession; } +interface WSMessage { + type: string; + data: any; +} + +interface HandlerResponse { + message: WSMessage; + broadcast?: boolean; + excludeSender?: boolean; + targetNewtId?: string; +} + +interface HandlerContext { + message: WSMessage; + senderWs: WebSocket; + senderNewtId: string; + sendToClient: (newtId: string, message: WSMessage) => boolean; + broadcastToAllExcept: (message: WSMessage, excludeNewtId?: string) => void; + connectedClients: Map; +} + +export type MessageHandler = (context: HandlerContext) => Promise; + +const messageHandlers: Record = { + "newt": handleNewtMessage, +}; + const router: Router = Router(); const wss: WebSocketServer = new WebSocketServer({ noServer: true }); -// Token verification middleware +// Client tracking map +let connectedClients: Map = new Map(); + +// Helper functions for client management +const addClient = (newtId: string, ws: AuthenticatedWebSocket): void => { + const existingClients = connectedClients.get(newtId) || []; + existingClients.push(ws); + connectedClients.set(newtId, existingClients); + console.log(`Client added to tracking - Newt ID: ${newtId}, Total connections: ${existingClients.length}`); +}; + +const removeClient = (newtId: string, ws: AuthenticatedWebSocket): void => { + const existingClients = connectedClients.get(newtId) || []; + const updatedClients = existingClients.filter(client => client !== ws); + + if (updatedClients.length === 0) { + connectedClients.delete(newtId); + console.log(`All connections removed for Newt ID: ${newtId}`); + } else { + connectedClients.set(newtId, updatedClients); + console.log(`Connection removed - Newt ID: ${newtId}, Remaining connections: ${updatedClients.length}`); + } +}; + +// Helper functions for sending messages +const sendToClient = (newtId: string, message: WSMessage): boolean => { + const clients = connectedClients.get(newtId); + if (!clients || clients.length === 0) { + console.log(`No active connections found for Newt ID: ${newtId}`); + return false; + } + + const messageString = JSON.stringify(message); + clients.forEach(client => { + if (client.readyState === WebSocket.OPEN) { + client.send(messageString); + } + }); + return true; +}; + +const broadcastToAllExcept = (message: WSMessage, excludeNewtId?: string): void => { + connectedClients.forEach((clients, newtId) => { + if (newtId !== excludeNewtId) { + clients.forEach(client => { + if (client.readyState === WebSocket.OPEN) { + client.send(JSON.stringify(message)); + } + }); + } + }); +}; + +// Token verification middleware (unchanged) const verifyToken = async (token: string): Promise => { try { - - const { session, newt } = await validateNewtSessionToken( - token - ); + const { session, newt } = await validateNewtSessionToken(token); if (!session || !newt) { return null; @@ -49,115 +127,145 @@ const verifyToken = async (token: string): Promise => { return { newt: existingNewt[0], session }; } catch (error) { - console.error('Token verification failed:', error); + console.error("Token verification failed:", error); return null; } }; -// Handle WebSocket upgrade requests -router.get('/ws', (req: Request, res: Response) => { - // WebSocket upgrade will be handled by the server - res.status(200).send('WebSocket endpoint'); +// Router endpoint (unchanged) +router.get("/ws", (req: Request, res: Response) => { + res.status(200).send("WebSocket endpoint"); }); -// Set up WebSocket server handling +// WebSocket upgrade handler const handleWSUpgrade = (server: HttpServer): void => { - server.on('upgrade', async (request: WebSocketRequest, socket: Socket, head: Buffer) => { + server.on("upgrade", async (request: WebSocketRequest, socket: Socket, head: Buffer) => { try { - // Extract token from query parameters or headers - const token = request.url?.includes('?') - ? new URLSearchParams(request.url.split('?')[1]).get('token') || '' - : request.headers['sec-websocket-protocol']; + const token = request.url?.includes("?") + ? new URLSearchParams(request.url.split("?")[1]).get("token") || "" + : request.headers["sec-websocket-protocol"]; if (!token) { - socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); + socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n"); socket.destroy(); return; } - // Verify the token const tokenPayload = await verifyToken(token); if (!tokenPayload) { - socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n'); + socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n"); socket.destroy(); return; } - // Store token payload data in the request for later use request.token = token; wss.handleUpgrade(request, socket, head, (ws: AuthenticatedWebSocket) => { - // Attach newt data to the WebSocket instance ws.newt = tokenPayload.newt; ws.isAlive = true; - wss.emit('connection', ws, request); + wss.emit("connection", ws, request); }); } catch (error) { - console.error('Upgrade error:', error); - socket.write('HTTP/1.1 500 Internal Server Error\r\n\r\n'); + console.error("Upgrade error:", error); + socket.write("HTTP/1.1 500 Internal Server Error\r\n\r\n"); socket.destroy(); } }); }; -// WebSocket message interface -interface WSMessage { - type: string; - data: any; -} - // WebSocket connection handler -wss.on('connection', (ws: AuthenticatedWebSocket, request: WebSocketRequest) => { - console.log(`Client connected - Newt ID: ${ws.newt?.newtId}`); +wss.on("connection", (ws: AuthenticatedWebSocket, request: WebSocketRequest) => { + const newtId = ws.newt?.newtId; + if (!newtId) { + console.error("Connection attempt without newt ID"); + return ws.terminate(); + } + + // Add client to tracking + addClient(newtId, ws); // Set up ping-pong for connection health check const pingInterval = setInterval(() => { if (ws.isAlive === false) { clearInterval(pingInterval); + removeClient(newtId, ws); return ws.terminate(); } ws.isAlive = false; ws.ping(); }, 30000); - // Handle pong response - ws.on('pong', () => { + ws.on("pong", () => { ws.isAlive = true; }); - // Set up message handler - ws.on('message', (data) => { + ws.on("message", async (data) => { try { const message: WSMessage = JSON.parse(data.toString()); - console.log('Received:', message); - - // Echo the message back - ws.send(JSON.stringify({ - type: 'echo', - data: message - })); + console.log(`Message received from Newt ID ${newtId}:`, message); + + // Validate message format + if (!message.type || typeof message.type !== "string") { + throw new Error("Invalid message format: missing or invalid type"); + } + + // Get the appropriate handler for the message type + const handler = messageHandlers[message.type]; + if (!handler) { + throw new Error(`Unsupported message type: ${message.type}`); + } + + // Process the message and get response + const response = await handler({ + message, + senderWs: ws, + senderNewtId: newtId, + sendToClient, + broadcastToAllExcept, + connectedClients + }); + + // Send response if one was returned + if (response) { + if (response.broadcast) { + // Broadcast to all clients except sender if specified + broadcastToAllExcept(response.message, response.excludeSender ? newtId : undefined); + } else if (response.targetNewtId) { + // Send to specific client if targetNewtId is provided + sendToClient(response.targetNewtId, response.message); + } else { + // Send back to sender + ws.send(JSON.stringify(response.message)); + } + } + } catch (error) { - console.error('Message parsing error:', error); + console.error("Message handling error:", error); ws.send(JSON.stringify({ - type: 'error', - data: 'Invalid message format' + type: "error", + data: { + message: error instanceof Error ? error.message : "Unknown error occurred", + originalMessage: data.toString() + } })); } - }); + }); - // Handle client disconnect - ws.on('close', () => { + ws.on("close", () => { clearInterval(pingInterval); - console.log(`Client disconnected - Newt ID: ${ws.newt?.newtId}`); + removeClient(newtId, ws); + console.log(`Client disconnected - Newt ID: ${newtId}`); }); - // Handle errors - ws.on('error', (error: Error) => { - console.error('WebSocket error:', error); + ws.on("error", (error: Error) => { + console.error(`WebSocket error for Newt ID ${newtId}:`, error); }); }); export { router, - handleWSUpgrade + handleWSUpgrade, + sendToClient, + broadcastToAllExcept, + connectedClients }; \ No newline at end of file