From 4adcf810046d5fc194458caf4f1ff4c0a1e4391c Mon Sep 17 00:00:00 2001 From: PrimarchPaul Date: Sun, 28 Sep 2025 02:10:41 -0400 Subject: [PATCH 1/3] nuked all realtime code --- .../src/controller/realtimeController.ts | 90 ------- concord-server/src/index.ts | 18 +- concord-server/src/routes/realtimeRoutes.ts | 90 ------- concord-server/src/services/realtime.ts | 222 ------------------ 4 files changed, 13 insertions(+), 407 deletions(-) delete mode 100644 concord-server/src/controller/realtimeController.ts delete mode 100644 concord-server/src/routes/realtimeRoutes.ts delete mode 100644 concord-server/src/services/realtime.ts diff --git a/concord-server/src/controller/realtimeController.ts b/concord-server/src/controller/realtimeController.ts deleted file mode 100644 index 834e4b6..0000000 --- a/concord-server/src/controller/realtimeController.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { Context } from "hono"; -import { - sendMessageToChannelEvent, - removeMessageFromChannel, -} from "../services/realtime.js"; -import { success } from "zod"; -import { PostMessageToChannelInput } from "../validators/realtimeValidator.js"; -import { sendMessage } from "./messageController.js"; - -export async function postMessageToChannel(io: any, c: Context, data: PostMessageToChannelInput) { - const instanceId = data.instanceId; - const categoryId = data.categoryId; - const channelId = data.channelId; - const userId = data.userId; - const content = data.content; - const token = data.token; - const repliedMessageId = data.repliedMessageId ?? null; - const event = "new_channel_message"; - return sendMessageToChannelEvent( - instanceId, - categoryId, - channelId, - userId, - content, - token, - repliedMessageId, - event, - io - ); -} - -export async function deleteMessageFromChannel(io: any, c: Context) { - try { - io = c.get("io"); - - const instanceId = c.req.param("instanceId"); - const categoryId = c.req.param("categoryId"); - const channelId = c.req.param("channelId"); - const messageId = c.req.param("messageId"); - - const result = await removeMessageFromChannel( - instanceId, - categoryId, - channelId, - messageId, - "delete_channel_message", - io, - ); - - if (result === "event not implemented") { - console.log( - "controller::realtime::deleteMessageFromChannel - Event not implemented", - ); - return c.json({ - success: false, - message: "Event not implemented or recognized", - status: 400, - }); - } - - if (result === "no acknowledgment") { - console.log( - "controller::realtime::deleteMessageFromChannel - No acknowledgment received from client", - ); - return c.json({ - success: false, - message: "No acknowledgment received from client", - status: 500, - }); - } - - if (!result) { - throw new Error("failed to delete message"); - } - - c.json({ - success: true, - message: "Message deleted successfully", - status: 200, - }); - } catch (err) { - const errMessage = err as Error; - console.log("services::realtime::deleteMessageFromChannel - ", errMessage); - return c.json({ - success: false, - message: errMessage.message, - status: 500, - }); - } -} diff --git a/concord-server/src/index.ts b/concord-server/src/index.ts index 1348a03..5356a92 100644 --- a/concord-server/src/index.ts +++ b/concord-server/src/index.ts @@ -13,11 +13,11 @@ const io = new Server(); //then bind to socket.io server const engine = new Engine(); io.bind(engine); - +/* io.on("connection", (socket) => { //get userId and clientId from query params - const userId = socket.handshake.query.userId; - const clientId = socket.handshake.query.clientId; + const userId = socket.handshake.query.userId + const clientId = socket.handshake.query.clientId if (!userId || Array.isArray(userId)) { socket.disconnect(); throw new Error("Invalid user ID"); @@ -28,15 +28,23 @@ io.on("connection", (socket) => { throw new Error("Invalid client ID"); } + socket.join(userId); console.log( `User ${userId} connected. Client ID ${clientId} on socket ${socket.id}`, - ); + ) socket.on("disconnect", () => { console.log(`User ${userId} disconnected from socket ${socket.id}`); - }); + }) }); +*/ + +io.on("ping", (socket) => { + console.log(`New client connected: ${socket.id}`) + socket.emit("pong", socket. ) +}) + const app = new Hono(); diff --git a/concord-server/src/routes/realtimeRoutes.ts b/concord-server/src/routes/realtimeRoutes.ts deleted file mode 100644 index cb44dd1..0000000 --- a/concord-server/src/routes/realtimeRoutes.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { Hono } from "hono"; -import { zValidator } from "@hono/zod-validator"; -import { describeRoute, resolver } from "hono-openapi"; -import { - postMessageToChannelSchema -} from "../validators/realtimeValidator.js"; -import { - postMessageToChannel, - deleteMessageFromChannel, -} from "../controller/realtimeController"; - - -const realtimeRoutes = new Hono(); - -realtimeRoutes.post( - "/message/:instanceId/:categoryId/:channelId", - describeRoute({ - description: "Post a message to a channel", - responses: { - 200: { - description: "Message posted successfully", - content: { - "application/json": { schema: resolver(postMessageToChannelSchema) }, - }, - }, - 400: { - description: "Bad Request - Invalid input data", - content: { - "application/json": { schema: resolver(postMessageToChannelSchema) }, - }, - }, - 401: { - description: "Unauthorized - Invalid token", - content: { - "application/json": { schema: resolver(postMessageToChannelSchema) }, - }, - }, - 404: { - description: "Instance, Category, Channel, or User not found", - content: { - "application/json": { schema: resolver(postMessageToChannelSchema) }, - }, - }, - 500: { - description: "Server error", - content: { - "application/json": { schema: resolver(postMessageToChannelSchema) }, - }, - }, - }, - }), - zValidator("json", postMessageToChannelSchema), - async (c) => { - const instanceId = c.req.param("instanceId"); - const categoryId = c.req.param("categoryId"); - const channelId = c.req.param("channelId"); - const { userId, content, repliedMessageId, token } = await c.req.json(); - - const ioServer = (c as any).get("io"); - if (!ioServer) { - return c.json({ success: false, error: "Realtime server not available" }, 500); - } - - const result = await postMessageToChannel(ioServer, c, { - instanceId, - categoryId, - channelId, - userId, - content, - token, - repliedMessageId: repliedMessageId ?? null, - }) - - if (result === "event not implemented") { - return c.json({ success: false, message: "Event not implemented or recognized" }, 400); - } - - if (result === "no acknowledgment") { - return c.json({ success: false, message: "No acknowledgment received from client" }, 500); - } - - if (!result) { - return c.json({ success: false, message: "Failed to post message" }, 500); - } - - return c.json({ success: true, result }, 200); - } -); - -export default realtimeRoutes; \ No newline at end of file diff --git a/concord-server/src/services/realtime.ts b/concord-server/src/services/realtime.ts deleted file mode 100644 index 9083788..0000000 --- a/concord-server/src/services/realtime.ts +++ /dev/null @@ -1,222 +0,0 @@ -import { PrismaClient } from "@prisma/client"; -import type { Server, Socket } from "socket.io"; -import { sendMessageToChannel } from "./messageService"; - -const prisma = new PrismaClient(); - -const EVENTS = { - NEW_CHANNEL_MESSAGE: "new_channel_message", - DELETE_CHANNEL_MESSAGE: "delete_channel_message", - MESSAGE_PING: "message_ping", -}; - -export async function sendMessageToChannelEvent( - instanceId: string, - categoryId: string, - channelId: string, - userId: string, - content: string, - token: string, - repliedMessageId: string | null, - event: string, - io: any, -): Promise { - try { - //TODO: implement middleware to replace this - if (EVENTS.NEW_CHANNEL_MESSAGE === event) { - throw new Error("Event not implemented"); - } - - const newMessage = await sendMessageToChannel(channelId, userId, content, token, repliedMessageId); - - if (!newMessage) { - console.log("services::realtime::sendMessageToChannel - could not create new message"); - return "failed to create message" - } - - return new Promise((resolve) => { - io.to(instanceId).emit(event, newMessage, (ack: any) => { - if (ack && ack.status === "received") { - console.log(`Message ${ack.messageId} acknowledged by client.`); - resolve(true); - } else { - console.log( - "services::realtime::sendMessageToChannel No acknowledgment received from client.", - ); - resolve("no acknowledgment"); - } - }); - }); - } catch (err) { - const errMessage = err as Error; - if (errMessage.message === "Event not implemented") { - console.log( - `services::realtime::sendMessageToChannel - Event not implemented. Attempted event: ${event}`, - ); - return "event not implemented"; - } - console.log("services::realtime::sendMessageToChannel - ", errMessage); - return false; - } -} - -export async function removeMessageFromChannel( - instanceId: string, - categoryId: string, - channelId: string, - messageId: string, - event: string, - io: any, -): Promise { - try { - //TODO: implement middleware to replace this - if (EVENTS.DELETE_CHANNEL_MESSAGE === event) { - throw new Error("event not implemented"); - } - - //TODO: add prisma to flag a channel message as deleted - - return new Promise((resolve) => { - io.to(instanceId).emit(event, { messageId }, (ack: any) => { - if (ack && ack.status === "received") { - console.log(`Message ${ack.messageId} acknowledged by client.`); - resolve(true); - } else { - console.log( - "services::realtime::deleteMessageFromChannel No acknowledgment received from client.", - ); - resolve("no acknowledgment"); - } - }); - }); - } catch (err) { - const errMessage = err as Error; - if (errMessage.message === "Event not implemented") { - console.log( - `services::realtime::deleteMessageFromChannel - Event not implemented. Attempted event: ${event}`, - ); - return false; - } - console.log("services::realtime::deleteMessageFromChannel - ", errMessage); - return false; - } -} - - -export async function messagePing( - instanceId: string, - categoryId: string, - channelId: string, - messageId: string, - userId: string, - pingType: string, - text: string, - io: any, - mentionedUserIds?: string[], -): Promise { - try { - const curInstance = await prisma.instance.findUnique({ where: { id: instanceId } }); - if (!curInstance) throw new Error("instance not found"); - - const curCategory = await prisma.category.findUnique({ where: { id: categoryId } }); - if (!curCategory) throw new Error("category not found"); - - const curChannel = await prisma.channel.findUnique({ where: { id: channelId } }); - if (!curChannel) throw new Error("channel not found"); - - if (pingType === "mention") { - if (!mentionedUserIds || mentionedUserIds.length === 0) { - throw new Error("no mentioned users provided for mention ping"); - } - - // Persist pings (best-effort) - try { - const rows = mentionedUserIds.map((m) => ({ messageId, pingsUserId: m })); - await prisma.messagePing.createMany({ data: rows, skipDuplicates: true }); - } catch (e) { - console.warn("services::realtime::messagePing - could not persist pings", e); - } - - const timeoutMs = 5000; - const results: Array<{ userId: string; delivered: boolean }> = []; - - // For each mentioned user, find sockets on this server with matching socket.data.userId - await Promise.all( - mentionedUserIds.map(async (mentionedUserId) => { - const socketsForUser: Socket[] = []; - for (const sock of Array.from((io as Server).sockets.sockets.values())) { - try { - if ((sock as Socket).data?.userId === mentionedUserId) socketsForUser.push(sock as Socket); - } catch {} - } - - if (socketsForUser.length === 0) { - results.push({ userId: mentsocketsionedUserId, delivered: false }); - return; - } - - const perSocket = socketsForUser.map( - (sock) => - new Promise((resolve) => { - let done = false; - try { - sock.emit( - EVENTS.MESSAGE_PING, - { categoryId, channelId, messageId, pingType, text, mentionedUserId }, - (ack: any) => { - if (done) return; - done = true; - resolve(!!ack && ack.status === "received"); - }, - ); - } catch (e) { - if (!done) { - done = true; - resolve(false); - } - } - - setTimeout(() => { - if (done) return; - done = true; - resolve(false); - }, timeoutMs); - }), - ); - - try { - const settled = await Promise.all(perSocket); - const delivered = settled.some(Boolean); - results.push({ userId: mentionedUserId, delivered }); - } catch { - results.push({ userId: mentionedUserId, delivered: false }); - } - }), - ); - - const anyDelivered = results.some((r) => r.delivered); - if (anyDelivered) { - console.log("services::realtime::messagePing delivered to some users", results); - return true; - } - - console.log("services::realtime::messagePing no acknowledgments", results); - return "no acknowledgment"; - } - - // Fallback: emit a generic ping to the instance (fire-and-forget ack optional) - return new Promise((resolve) => { - (io as Server).emit(EVENTS.MESSAGE_PING, { categoryId, channelId, messageId, pingType, text }, (ack: any) => { - if (ack && ack.status === "received") { - resolve(true); - } else { - resolve("no acknowledgment"); - } - }); - }); - } catch (err) { - const errMessage = err as Error; - console.log("services::realtime::messagePing - ", errMessage); - return false; - } -} \ No newline at end of file From 460991f2aec4a19844c27e94395d9f7b64bc6422 Mon Sep 17 00:00:00 2001 From: Kevin Puig <119972216+k-puig@users.noreply.github.com> Date: Sun, 28 Sep 2025 03:11:34 -0400 Subject: [PATCH 2/3] socket.io work!!! --- concord-server/src/index.ts | 85 +++++++++++++++--------------- concord-server/src/routes/index.ts | 2 - 2 files changed, 43 insertions(+), 44 deletions(-) diff --git a/concord-server/src/index.ts b/concord-server/src/index.ts index 5356a92..1332236 100644 --- a/concord-server/src/index.ts +++ b/concord-server/src/index.ts @@ -6,53 +6,14 @@ import routes from "./routes/index"; import { Scalar } from "@scalar/hono-api-reference"; import { openAPIRouteHandler } from "hono-openapi"; -//initialize socket.io server -const io = new Server(); - -//initialize bun engine -//then bind to socket.io server -const engine = new Engine(); -io.bind(engine); -/* -io.on("connection", (socket) => { - //get userId and clientId from query params - const userId = socket.handshake.query.userId - const clientId = socket.handshake.query.clientId - if (!userId || Array.isArray(userId)) { - socket.disconnect(); - throw new Error("Invalid user ID"); - } - - if (!clientId || Array.isArray(clientId)) { - socket.disconnect(); - throw new Error("Invalid client ID"); - } - - - socket.join(userId); - console.log( - `User ${userId} connected. Client ID ${clientId} on socket ${socket.id}`, - ) - - socket.on("disconnect", () => { - console.log(`User ${userId} disconnected from socket ${socket.id}`); - }) -}); -*/ - -io.on("ping", (socket) => { - console.log(`New client connected: ${socket.id}`) - socket.emit("pong", socket. ) -}) - - +// Routes const app = new Hono(); app.use( "*", cors({ origin: "http://localhost:5173", - allowHeaders: ["Content-Type", "Authorization"], + allowHeaders: ["Content-Type", "Authorization", "Access-Control-Allow-Origin"], allowMethods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"], credentials: true, }), @@ -76,4 +37,44 @@ app.get( app.get("/scalar", Scalar({ url: "/openapi" })); -export default app; +// initialize socket.io server +const io = new Server({ + cors: { + origin: "http://localhost:5173", + credentials: true, + } +}); +const engine = new Engine(); +io.bind(engine); + +// Register socket.io events +io.on("connection", (socket) => { + console.log("connected1"); + socket.on("ping", (c) => { + console.log(c); + socket.emit("pong", c); + }); +}); + +const { websocket } = engine.handler(); + +export default { + port: 3000, + idleTimeout: 30, // must be greater than the "pingInterval" option of the engine, which defaults to 25 seconds + + async fetch(req: Request, server: Bun.Server) { + const url = new URL(req.url); + + if (url.pathname === "/socket.io/") { + const response = await engine.handleRequest(req, server); + // Add CORS headers explicitly + response.headers.set("Access-Control-Allow-Origin", "http://localhost:5173"); + response.headers.set("Access-Control-Allow-Credentials", "true"); + return response; + } else { + return app.fetch(req, server); + } + }, + + websocket +}; diff --git a/concord-server/src/routes/index.ts b/concord-server/src/routes/index.ts index bb09610..84ef367 100644 --- a/concord-server/src/routes/index.ts +++ b/concord-server/src/routes/index.ts @@ -5,7 +5,6 @@ import messageRoutes from "./messageRoutes"; import { channelRoutes } from "./channelRoutes"; import instanceRoutes from "./instanceRoutes"; import { categoryRoutes } from "./categoryRoutes"; -import realtimeRoutes from "./realtimeRoutes"; const routes = new Hono(); @@ -14,6 +13,5 @@ routes.route("/message", messageRoutes); routes.route("/channel", channelRoutes); routes.route("/instance", instanceRoutes); routes.route("/category", categoryRoutes); -routes.route("/realtime", realtimeRoutes); export default routes; From b3e24f449387e257685521d8d714a4bf5d9bc52f Mon Sep 17 00:00:00 2001 From: Kevin Puig <119972216+k-puig@users.noreply.github.com> Date: Sun, 28 Sep 2025 03:18:11 -0400 Subject: [PATCH 3/3] pretty + socket separation --- concord-server/src/index.ts | 24 +++++++------- concord-server/src/sockets/index.ts | 16 +++++++++ concord-server/src/sockets/voiceHandler.ts | 33 +++++++++++++++++++ .../src/validators/channelValidator.ts | 2 +- .../src/validators/realtimeValidator.ts | 20 ++++++----- 5 files changed, 74 insertions(+), 21 deletions(-) create mode 100644 concord-server/src/sockets/index.ts create mode 100644 concord-server/src/sockets/voiceHandler.ts diff --git a/concord-server/src/index.ts b/concord-server/src/index.ts index 1332236..816c3af 100644 --- a/concord-server/src/index.ts +++ b/concord-server/src/index.ts @@ -5,6 +5,7 @@ import { Server } from "socket.io"; import routes from "./routes/index"; import { Scalar } from "@scalar/hono-api-reference"; import { openAPIRouteHandler } from "hono-openapi"; +import { registerSocketHandlers } from "./sockets"; // Routes const app = new Hono(); @@ -13,7 +14,11 @@ app.use( "*", cors({ origin: "http://localhost:5173", - allowHeaders: ["Content-Type", "Authorization", "Access-Control-Allow-Origin"], + allowHeaders: [ + "Content-Type", + "Authorization", + "Access-Control-Allow-Origin", + ], allowMethods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"], credentials: true, }), @@ -42,19 +47,13 @@ const io = new Server({ cors: { origin: "http://localhost:5173", credentials: true, - } + }, }); const engine = new Engine(); io.bind(engine); // Register socket.io events -io.on("connection", (socket) => { - console.log("connected1"); - socket.on("ping", (c) => { - console.log(c); - socket.emit("pong", c); - }); -}); +registerSocketHandlers(io); const { websocket } = engine.handler(); @@ -68,7 +67,10 @@ export default { if (url.pathname === "/socket.io/") { const response = await engine.handleRequest(req, server); // Add CORS headers explicitly - response.headers.set("Access-Control-Allow-Origin", "http://localhost:5173"); + response.headers.set( + "Access-Control-Allow-Origin", + "http://localhost:5173", + ); response.headers.set("Access-Control-Allow-Credentials", "true"); return response; } else { @@ -76,5 +78,5 @@ export default { } }, - websocket + websocket, }; diff --git a/concord-server/src/sockets/index.ts b/concord-server/src/sockets/index.ts new file mode 100644 index 0000000..816e14d --- /dev/null +++ b/concord-server/src/sockets/index.ts @@ -0,0 +1,16 @@ +import { Server } from "socket.io"; +import { registerVoiceHandlers } from "./voiceHandler"; + +export function registerSocketHandlers(io: Server) { + // bad practice + io.on("connection", (socket) => { + console.log("connected"); + socket.on("ping", (c) => { + console.log(c); + socket.emit("pong", c); + }); + }); + + // good practice + registerVoiceHandlers(io); +} diff --git a/concord-server/src/sockets/voiceHandler.ts b/concord-server/src/sockets/voiceHandler.ts new file mode 100644 index 0000000..1dc0484 --- /dev/null +++ b/concord-server/src/sockets/voiceHandler.ts @@ -0,0 +1,33 @@ +import { Server, Socket } from "socket.io"; + +//TEST IGNORE +export function registerVoiceHandlers(io: Server) { + io.on("connection", (socket: Socket) => { + console.log(`Voice socket connected: ${socket.id}`); + + socket.on("join-voice-channel", (channelId: string) => { + socket.join(channelId); + console.log(`Socket ${socket.id} joined voice channel ${channelId}`); + // Optionally, notify others in the channel + socket.to(channelId).emit("user-joined-voice", socket.id); + }); + + socket.on("leave-voice-channel", (channelId: string) => { + socket.leave(channelId); + console.log(`Socket ${socket.id} left voice channel ${channelId}`); + // Optionally, notify others in the channel + socket.to(channelId).emit("user-left-voice", socket.id); + }); + + socket.on("voice-data", (channelId: string, data: any) => { + // Broadcast voice data to all other clients in the same channel + socket.to(channelId).emit("voice-data", socket.id, data); + }); + + socket.on("disconnect", () => { + console.log(`Voice socket disconnected: ${socket.id}`); + // Handle user leaving all voice channels they were in + // (e.g., iterate through socket.rooms if you need to emit to specific channels) + }); + }); +} diff --git a/concord-server/src/validators/channelValidator.ts b/concord-server/src/validators/channelValidator.ts index 626e1fb..23cb918 100644 --- a/concord-server/src/validators/channelValidator.ts +++ b/concord-server/src/validators/channelValidator.ts @@ -52,5 +52,5 @@ export type GetChannelsByCategoryIdInput = z.infer< export type UpdateChannelInput = z.infer; export type DeleteChannelInput = z.infer; export type DeleteChannelsByCategoryIdInput = z.infer< -typeof deleteChannelsByCategoryIdSchema + typeof deleteChannelsByCategoryIdSchema >; diff --git a/concord-server/src/validators/realtimeValidator.ts b/concord-server/src/validators/realtimeValidator.ts index f9c971d..859b4e8 100644 --- a/concord-server/src/validators/realtimeValidator.ts +++ b/concord-server/src/validators/realtimeValidator.ts @@ -1,16 +1,18 @@ import { z } from "zod"; export const postMessageToChannelSchema = z.object({ - instanceId: z.uuidv7(), - categoryId: z.uuidv7(), - channelId: z.uuidv7(), - userId: z.uuidv7(), - content: z.string().min(1).max(2000), - repliedMessageId: z.uuidv7().optional(), - token: z.string(), + instanceId: z.uuidv7(), + categoryId: z.uuidv7(), + channelId: z.uuidv7(), + userId: z.uuidv7(), + content: z.string().min(1).max(2000), + repliedMessageId: z.uuidv7().optional(), + token: z.string(), }); //TODO: add more realtime related validators as needed -export type PostMessageToChannelInput = z.infer; -//TODO: create more input schemas for other realtime actions \ No newline at end of file +export type PostMessageToChannelInput = z.infer< + typeof postMessageToChannelSchema +>; +//TODO: create more input schemas for other realtime actions