Merge branch 'clients' of https://github.com/fosrl/pangolin_dg into clients

This commit is contained in:
miloschwartz 2025-02-21 14:39:31 -05:00
commit 255e29d9c8
No known key found for this signature in database
10 changed files with 139 additions and 26 deletions

View file

@ -32,6 +32,11 @@ gerbil:
site_block_size: 30
subnet_group: 100.89.137.0/20
wg_site:
start_port: 51820
block_size: 24
subnet_group: 100.89.137.0/20
rate_limits:
global:
window_minutes: 1

View file

@ -12,6 +12,7 @@ import {
} from "@server/lib/consts";
import { passwordSchema } from "@server/auth/passwordSchema";
import stoi from "./stoi";
import { start } from "repl";
const portSchema = z.number().positive().gt(0).lte(65535);
const hostnameSchema = z
@ -112,6 +113,7 @@ const configSchema = z.object({
wg_site: z.object({
block_size: z.number().positive().gt(0),
subnet_group: z.string(),
start_port: portSchema
}),
rate_limits: z.object({
global: z.object({

View file

@ -30,12 +30,13 @@ export const receiveBandwidth = async (
const { publicKey, bytesIn, bytesOut } = peer;
// Find the site by public key
const site = await trx.query.sites.findFirst({
where: eq(sites.pubKey, publicKey)
});
const [site] = await trx
.select()
.from(sites)
.where(eq(sites.pubKey, publicKey))
.limit(1);
if (!site) {
logger.warn(`Site not found for public key: ${publicKey}`);
continue;
}
let online = site.online;

View file

@ -1,4 +1,4 @@
import { handleNewtRegisterMessage } from "./newt";
import { handleNewtRegisterMessage, handleReceiveBandwidthMessage } from "./newt";
import { handleOlmRegisterMessage } from "./olm";
import { handleGetConfigMessage } from "./newt/handleGetConfigMessage";
import { MessageHandler } from "./ws";
@ -7,4 +7,5 @@ export const messageHandlers: Record<string, MessageHandler> = {
"newt/wg/register": handleNewtRegisterMessage,
"olm/wg/register": handleOlmRegisterMessage,
"newt/wg/get-config": handleGetConfigMessage,
"newt/receive-bandwidth": handleReceiveBandwidthMessage
};

View file

@ -3,7 +3,7 @@ import { MessageHandler } from "../ws";
import logger from "@server/logger";
import { fromError } from "zod-validation-error";
import db from "@server/db";
import { clients, Site, sites } from "@server/db/schema";
import { clients, Newt, Site, sites } from "@server/db/schema";
import { eq, isNotNull } from "drizzle-orm";
import { findNextAvailableCidr } from "@server/lib/ip";
import config from "@server/lib/config";
@ -17,7 +17,8 @@ const inputSchema = z.object({
type Input = z.infer<typeof inputSchema>;
export const handleGetConfigMessage: MessageHandler = async (context) => {
const { message, newt, sendToClient } = context;
const { message, client, sendToClient } = context;
const newt = client as Newt;
logger.debug("Handling Newt get config message!");
@ -40,7 +41,7 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
return;
}
const { publicKey, endpoint, listenPort } = message.data as Input;
const { publicKey, endpoint } = message.data as Input;
const siteId = newt.siteId;
@ -57,6 +58,7 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
let site: Site | undefined;
if (!site) {
const address = await getNextAvailableSubnet();
const listenPort = await getNextAvailablePort();
// create a new exit node
const [updateRes] = await db
@ -91,7 +93,7 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
clientsRes.map(async (client) => {
return {
publicKey: client.pubKey,
allowedIps: "0.0.0.0/0"
allowedIps: "0.0.0.0/0" // TODO: We should lock this down more
};
})
);
@ -106,7 +108,7 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
return {
message: {
type: "newt/wg/connect", // what to make the response type?
type: "newt/wg/receive-config", // what to make the response type?
data: {
config: configResponse
}
@ -145,3 +147,24 @@ async function getNextAvailableSubnet(): Promise<string> {
subnet.split("/")[1];
return subnet;
}
async function getNextAvailablePort(): Promise<number> {
// Get all existing ports from exitNodes table
const existingPorts = await db.select({
listenPort: sites.listenPort,
}).from(sites);
// Find the first available port between 1024 and 65535
let nextPort = config.getRawConfig().wg_site.start_port;
for (const port of existingPorts) {
if (port.listenPort && port.listenPort > nextPort) {
break;
}
nextPort++;
if (nextPort > 65535) {
throw new Error('No available ports remaining in space');
}
}
return nextPort;
}

View file

@ -2,6 +2,7 @@ import db from "@server/db";
import { MessageHandler } from "../ws";
import {
exitNodes,
Newt,
resources,
sites,
Target,
@ -13,8 +14,7 @@ import logger from "@server/logger";
export const handleNewtRegisterMessage: MessageHandler = async (context) => {
const { message, client, sendToClient } = context;
const newt = client;
const newt = client as Newt;
logger.info("Handling register newt message!");

View file

@ -0,0 +1,68 @@
import db from "@server/db";
import { MessageHandler } from "../ws";
import { clients, Newt } from "@server/db/schema";
import { eq } from "drizzle-orm";
import logger from "@server/logger";
interface PeerBandwidth {
publicKey: string;
bytesIn: number;
bytesOut: number;
}
export const handleReceiveBandwidthMessage: MessageHandler = async (context) => {
const { message, client, sendToClient } = context;
const newt = client as Newt;
const bandwidthData: PeerBandwidth[] = message.data;
if (!Array.isArray(bandwidthData)) {
throw new Error("Invalid bandwidth data");
}
await db.transaction(async (trx) => {
for (const peer of bandwidthData) {
const { publicKey, bytesIn, bytesOut } = peer;
// Find the site by public key
const [client] = await trx
.select()
.from(clients)
.where(eq(clients.pubKey, publicKey))
.limit(1);
if (!client) {
continue;
}
let online = client.online;
// if the bandwidth for the site is > 0 then set it to online. if it has been less than 0 (no update) for 5 minutes then set it to offline
if (bytesIn > 0 || bytesOut > 0) {
online = true;
} else if (client.lastBandwidthUpdate) {
const lastBandwidthUpdate = new Date(
client.lastBandwidthUpdate
);
const currentTime = new Date();
const diff =
currentTime.getTime() - lastBandwidthUpdate.getTime();
if (diff < 300000) {
online = false;
}
}
// Update the site's bandwidth usage
await trx
.update(clients)
.set({
megabytesOut: (client.megabytesIn || 0) + bytesIn,
megabytesIn: (client.megabytesOut || 0) + bytesOut,
lastBandwidthUpdate: new Date().toISOString(),
online
})
.where(eq(clients.clientId, client.clientId));
}
});
logger.info("Handling register olm message!");
};

View file

@ -1,3 +1,4 @@
export * from "./createNewt";
export * from "./getToken";
export * from "./handleRegisterMessage";
export * from "./handleNewtRegisterMessage";
export* from "./handleReceiveBandwidthMessage";

View file

@ -20,7 +20,7 @@ export async function addPeer(siteId: number, peer: {
}
sendToClient(newt.newtId, {
type: 'add_peer',
type: 'newt/wg/peer/add',
data: peer
});
}
@ -38,7 +38,7 @@ export async function deletePeer(siteId: number, publicKey: string) {
}
sendToClient(newt.newtId, {
type: 'delete_peer',
type: 'newt/wg/peer/remove',
data: {
publicKey
}

View file

@ -1,6 +1,8 @@
import db from "@server/db";
import { MessageHandler } from "../ws";
import {
clients,
Olm,
olms,
sites,
} from "@server/db/schema";
@ -9,9 +11,8 @@ import { addPeer, deletePeer } from "../newt/peers";
import logger from "@server/logger";
export const handleOlmRegisterMessage: MessageHandler = async (context) => {
const { message, client, sendToClient } = context;
const olm = client;
const { message, client: c, sendToClient } = context;
const olm = c as Olm;
logger.info("Handling register olm message!");
@ -20,12 +21,12 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
return;
}
if (!olm.siteId) {
if (!olm.clientId) {
logger.warn("Olm has no site!"); // TODO: Maybe we create the site here?
return;
}
const siteId = olm.siteId;
const clientId = olm.clientId;
const { publicKey } = message.data;
if (!publicKey) {
@ -33,28 +34,39 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
return;
}
const [client] = await db
.select()
.from(clients)
.where(eq(clients.clientId, clientId))
.limit(1);
if (!client || !client.siteId) {
logger.warn("Site not found or does not have exit node");
return;
}
const [site] = await db
.select()
.from(sites)
.where(eq(sites.siteId, siteId))
.where(eq(sites.siteId, client.siteId))
.limit(1);
if (!site) {
if (!client) {
logger.warn("Site not found or does not have exit node");
return;
}
await db
.update(olms)
.update(clients)
.set({
pubKey: publicKey
})
.where(eq(olms.olmId, olm.olmId))
.where(eq(clients.clientId, olm.clientId))
.returning();
if (olm.pubKey && olm.pubKey !== publicKey) {
if (client.pubKey && client.pubKey !== publicKey) {
logger.info("Public key mismatch. Deleting old peer...");
await deletePeer(site.siteId, site.pubKey);
await deletePeer(site.siteId, client.pubKey);
}
if (!site.subnet) {