fosrl.pangolin/server/routers/ws.ts

267 lines
8.4 KiB
TypeScript
Raw Normal View History

2024-11-10 17:34:07 -05:00
import { Router, Request, Response } from "express";
import { Server as HttpServer } from "http";
import { WebSocket, WebSocketServer } from "ws";
import { IncomingMessage } from "http";
import { Socket } from "net";
import { Newt, newts, NewtSession } from "@server/db/schema";
import { eq } from "drizzle-orm";
import db from "@server/db";
import { validateNewtSessionToken } from "@server/auth/newt";
2024-11-10 21:06:36 -05:00
import { messageHandlers } from "./messageHandlers";
2024-11-04 00:29:25 -05:00
// Custom interfaces
interface WebSocketRequest extends IncomingMessage {
2024-11-10 17:08:11 -05:00
token?: string;
2024-11-04 00:29:25 -05:00
}
interface AuthenticatedWebSocket extends WebSocket {
2024-11-10 17:08:11 -05:00
newt?: Newt;
isAlive?: boolean;
2024-11-04 00:29:25 -05:00
}
interface TokenPayload {
2024-11-10 17:08:11 -05:00
newt: Newt;
session: NewtSession;
2024-11-04 00:29:25 -05:00
}
2024-11-10 17:34:07 -05:00
interface WSMessage {
type: string;
data: any;
}
interface HandlerResponse {
message: WSMessage;
broadcast?: boolean;
excludeSender?: boolean;
targetNewtId?: string;
}
interface HandlerContext {
message: WSMessage;
senderWs: WebSocket;
senderNewtId: string;
sendToClient: (newtId: string, message: WSMessage) => boolean;
broadcastToAllExcept: (message: WSMessage, excludeNewtId?: string) => void;
connectedClients: Map<string, WebSocket[]>;
}
export type MessageHandler = (context: HandlerContext) => Promise<HandlerResponse | void>;
2024-11-04 00:29:25 -05:00
const router: Router = Router();
const wss: WebSocketServer = new WebSocketServer({ noServer: true });
2024-11-10 17:34:07 -05:00
// Client tracking map
let connectedClients: Map<string, AuthenticatedWebSocket[]> = new Map();
// Helper functions for client management
const addClient = (newtId: string, ws: AuthenticatedWebSocket): void => {
const existingClients = connectedClients.get(newtId) || [];
existingClients.push(ws);
connectedClients.set(newtId, existingClients);
console.log(`Client added to tracking - Newt ID: ${newtId}, Total connections: ${existingClients.length}`);
};
const removeClient = (newtId: string, ws: AuthenticatedWebSocket): void => {
const existingClients = connectedClients.get(newtId) || [];
const updatedClients = existingClients.filter(client => client !== ws);
if (updatedClients.length === 0) {
connectedClients.delete(newtId);
console.log(`All connections removed for Newt ID: ${newtId}`);
} else {
connectedClients.set(newtId, updatedClients);
console.log(`Connection removed - Newt ID: ${newtId}, Remaining connections: ${updatedClients.length}`);
}
};
// Helper functions for sending messages
const sendToClient = (newtId: string, message: WSMessage): boolean => {
const clients = connectedClients.get(newtId);
if (!clients || clients.length === 0) {
console.log(`No active connections found for Newt ID: ${newtId}`);
return false;
}
const messageString = JSON.stringify(message);
clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(messageString);
}
});
return true;
};
const broadcastToAllExcept = (message: WSMessage, excludeNewtId?: string): void => {
connectedClients.forEach((clients, newtId) => {
if (newtId !== excludeNewtId) {
clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(message));
}
});
}
});
};
// Token verification middleware (unchanged)
2024-11-04 00:29:25 -05:00
const verifyToken = async (token: string): Promise<TokenPayload | null> => {
2024-11-10 17:08:11 -05:00
try {
2024-11-10 17:34:07 -05:00
const { session, newt } = await validateNewtSessionToken(token);
2024-11-10 17:08:11 -05:00
if (!session || !newt) {
return null;
}
const existingNewt = await db
.select()
.from(newts)
.where(eq(newts.newtId, newt.newtId));
if (!existingNewt || !existingNewt[0]) {
return null;
}
return { newt: existingNewt[0], session };
} catch (error) {
2024-11-10 17:34:07 -05:00
console.error("Token verification failed:", error);
2024-11-10 17:08:11 -05:00
return null;
}
2024-11-04 00:29:25 -05:00
};
2024-11-10 17:34:07 -05:00
// Router endpoint (unchanged)
router.get("/ws", (req: Request, res: Response) => {
res.status(200).send("WebSocket endpoint");
2024-11-04 00:29:25 -05:00
});
2024-11-10 17:34:07 -05:00
// WebSocket upgrade handler
2024-11-04 00:29:25 -05:00
const handleWSUpgrade = (server: HttpServer): void => {
2024-11-10 17:34:07 -05:00
server.on("upgrade", async (request: WebSocketRequest, socket: Socket, head: Buffer) => {
2024-11-10 17:08:11 -05:00
try {
2024-11-10 17:34:07 -05:00
const token = request.url?.includes("?")
? new URLSearchParams(request.url.split("?")[1]).get("token") || ""
: request.headers["sec-websocket-protocol"];
2024-11-10 17:08:11 -05:00
if (!token) {
2024-11-10 17:34:07 -05:00
socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
2024-11-10 17:08:11 -05:00
socket.destroy();
return;
}
const tokenPayload = await verifyToken(token);
if (!tokenPayload) {
2024-11-10 17:34:07 -05:00
socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
2024-11-10 17:08:11 -05:00
socket.destroy();
return;
}
request.token = token;
wss.handleUpgrade(request, socket, head, (ws: AuthenticatedWebSocket) => {
ws.newt = tokenPayload.newt;
ws.isAlive = true;
2024-11-10 17:34:07 -05:00
wss.emit("connection", ws, request);
2024-11-10 17:08:11 -05:00
});
} catch (error) {
2024-11-10 17:34:07 -05:00
console.error("Upgrade error:", error);
socket.write("HTTP/1.1 500 Internal Server Error\r\n\r\n");
2024-11-10 17:08:11 -05:00
socket.destroy();
}
});
2024-11-04 00:29:25 -05:00
};
// WebSocket connection handler
2024-11-10 17:34:07 -05:00
wss.on("connection", (ws: AuthenticatedWebSocket, request: WebSocketRequest) => {
const newtId = ws.newt?.newtId;
if (!newtId) {
console.error("Connection attempt without newt ID");
return ws.terminate();
}
// Add client to tracking
addClient(newtId, ws);
2024-11-10 17:08:11 -05:00
// Set up ping-pong for connection health check
const pingInterval = setInterval(() => {
if (ws.isAlive === false) {
clearInterval(pingInterval);
2024-11-10 17:34:07 -05:00
removeClient(newtId, ws);
2024-11-10 17:08:11 -05:00
return ws.terminate();
}
ws.isAlive = false;
ws.ping();
}, 30000);
2024-11-10 17:34:07 -05:00
ws.on("pong", () => {
2024-11-10 17:08:11 -05:00
ws.isAlive = true;
});
2024-11-10 17:34:07 -05:00
ws.on("message", async (data) => {
2024-11-10 17:08:11 -05:00
try {
const message: WSMessage = JSON.parse(data.toString());
2024-11-10 21:06:36 -05:00
// console.log(`Message received from Newt ID ${newtId}:`, message);
2024-11-10 17:34:07 -05:00
// Validate message format
if (!message.type || typeof message.type !== "string") {
throw new Error("Invalid message format: missing or invalid type");
}
// Get the appropriate handler for the message type
const handler = messageHandlers[message.type];
if (!handler) {
throw new Error(`Unsupported message type: ${message.type}`);
}
// Process the message and get response
const response = await handler({
message,
senderWs: ws,
senderNewtId: newtId,
sendToClient,
broadcastToAllExcept,
connectedClients
});
// Send response if one was returned
if (response) {
if (response.broadcast) {
// Broadcast to all clients except sender if specified
broadcastToAllExcept(response.message, response.excludeSender ? newtId : undefined);
} else if (response.targetNewtId) {
// Send to specific client if targetNewtId is provided
sendToClient(response.targetNewtId, response.message);
} else {
// Send back to sender
ws.send(JSON.stringify(response.message));
}
}
2024-11-10 17:08:11 -05:00
} catch (error) {
2024-11-10 17:34:07 -05:00
console.error("Message handling error:", error);
2024-11-10 17:08:11 -05:00
ws.send(JSON.stringify({
2024-11-10 17:34:07 -05:00
type: "error",
data: {
message: error instanceof Error ? error.message : "Unknown error occurred",
originalMessage: data.toString()
}
2024-11-10 17:08:11 -05:00
}));
}
2024-11-10 17:34:07 -05:00
});
2024-11-10 17:08:11 -05:00
2024-11-10 17:34:07 -05:00
ws.on("close", () => {
2024-11-10 17:08:11 -05:00
clearInterval(pingInterval);
2024-11-10 17:34:07 -05:00
removeClient(newtId, ws);
console.log(`Client disconnected - Newt ID: ${newtId}`);
2024-11-10 17:08:11 -05:00
});
2024-11-10 17:34:07 -05:00
ws.on("error", (error: Error) => {
console.error(`WebSocket error for Newt ID ${newtId}:`, error);
2024-11-10 17:08:11 -05:00
});
2024-11-04 00:29:25 -05:00
});
export {
2024-11-10 17:08:11 -05:00
router,
2024-11-10 17:34:07 -05:00
handleWSUpgrade,
sendToClient,
broadcastToAllExcept,
connectedClients
2024-11-04 00:29:25 -05:00
};