From db9a9a5d1363da6a9b092b4065d35a4eaf9ebe32 Mon Sep 17 00:00:00 2001 From: PrimarchPaul Date: Sun, 28 Sep 2025 00:41:42 -0400 Subject: [PATCH 01/10] feat: send message to channel event --- concord-server/schema.prisma | 1 - concord-server/src/services/realtime.ts | 142 +++++++++++++++++- concord-server/src/validators/index.ts | 71 +++++++++ .../src/validators/realtimeValidator.ts | 0 4 files changed, 208 insertions(+), 6 deletions(-) create mode 100644 concord-server/src/validators/index.ts create mode 100644 concord-server/src/validators/realtimeValidator.ts diff --git a/concord-server/schema.prisma b/concord-server/schema.prisma index a4fbfa8..b0cc65d 100644 --- a/concord-server/schema.prisma +++ b/concord-server/schema.prisma @@ -89,7 +89,6 @@ model Channel { model ChannelPin { messageId String @unique channelId String @unique - Message Message @relation(fields: [messageId], references: [id]) Channel Channel @relation(fields: [channelId], references: [id]) createdAt DateTime @default(now()) diff --git a/concord-server/src/services/realtime.ts b/concord-server/src/services/realtime.ts index 434de72..9083788 100644 --- a/concord-server/src/services/realtime.ts +++ b/concord-server/src/services/realtime.ts @@ -1,15 +1,23 @@ -import { readonly } from "zod"; +import { PrismaClient } from "@prisma/client"; +import type { Server, Socket } from "socket.io"; +import { sendMessageToChannel } from "./messageService"; + +const prisma = new PrismaClient(); const EVENTS = { NEW_CHANNEL_MESSAGE: "new_channel_message", DELETE_CHANNEL_MESSAGE: "delete_channel_message", + MESSAGE_PING: "message_ping", }; -export async function sendMessageToChannel( +export async function sendMessageToChannelEvent( instanceId: string, categoryId: string, channelId: string, - message: any, + userId: string, + content: string, + token: string, + repliedMessageId: string | null, event: string, io: any, ): Promise { @@ -19,10 +27,15 @@ export async function sendMessageToChannel( throw new Error("Event not implemented"); } - //TODO: add prisma to save channel message to DB + const newMessage = await sendMessageToChannel(channelId, userId, content, token, repliedMessageId); + + if (!newMessage) { + console.log("services::realtime::sendMessageToChannel - could not create new message"); + return "failed to create message" + } return new Promise((resolve) => { - io.to(instanceId).emit(event, message, (ack: any) => { + io.to(instanceId).emit(event, newMessage, (ack: any) => { if (ack && ack.status === "received") { console.log(`Message ${ack.messageId} acknowledged by client.`); resolve(true); @@ -88,3 +101,122 @@ export async function removeMessageFromChannel( return false; } } + + +export async function messagePing( + instanceId: string, + categoryId: string, + channelId: string, + messageId: string, + userId: string, + pingType: string, + text: string, + io: any, + mentionedUserIds?: string[], +): Promise { + try { + const curInstance = await prisma.instance.findUnique({ where: { id: instanceId } }); + if (!curInstance) throw new Error("instance not found"); + + const curCategory = await prisma.category.findUnique({ where: { id: categoryId } }); + if (!curCategory) throw new Error("category not found"); + + const curChannel = await prisma.channel.findUnique({ where: { id: channelId } }); + if (!curChannel) throw new Error("channel not found"); + + if (pingType === "mention") { + if (!mentionedUserIds || mentionedUserIds.length === 0) { + throw new Error("no mentioned users provided for mention ping"); + } + + // Persist pings (best-effort) + try { + const rows = mentionedUserIds.map((m) => ({ messageId, pingsUserId: m })); + await prisma.messagePing.createMany({ data: rows, skipDuplicates: true }); + } catch (e) { + console.warn("services::realtime::messagePing - could not persist pings", e); + } + + const timeoutMs = 5000; + const results: Array<{ userId: string; delivered: boolean }> = []; + + // For each mentioned user, find sockets on this server with matching socket.data.userId + await Promise.all( + mentionedUserIds.map(async (mentionedUserId) => { + const socketsForUser: Socket[] = []; + for (const sock of Array.from((io as Server).sockets.sockets.values())) { + try { + if ((sock as Socket).data?.userId === mentionedUserId) socketsForUser.push(sock as Socket); + } catch {} + } + + if (socketsForUser.length === 0) { + results.push({ userId: mentsocketsionedUserId, delivered: false }); + return; + } + + const perSocket = socketsForUser.map( + (sock) => + new Promise((resolve) => { + let done = false; + try { + sock.emit( + EVENTS.MESSAGE_PING, + { categoryId, channelId, messageId, pingType, text, mentionedUserId }, + (ack: any) => { + if (done) return; + done = true; + resolve(!!ack && ack.status === "received"); + }, + ); + } catch (e) { + if (!done) { + done = true; + resolve(false); + } + } + + setTimeout(() => { + if (done) return; + done = true; + resolve(false); + }, timeoutMs); + }), + ); + + try { + const settled = await Promise.all(perSocket); + const delivered = settled.some(Boolean); + results.push({ userId: mentionedUserId, delivered }); + } catch { + results.push({ userId: mentionedUserId, delivered: false }); + } + }), + ); + + const anyDelivered = results.some((r) => r.delivered); + if (anyDelivered) { + console.log("services::realtime::messagePing delivered to some users", results); + return true; + } + + console.log("services::realtime::messagePing no acknowledgments", results); + return "no acknowledgment"; + } + + // Fallback: emit a generic ping to the instance (fire-and-forget ack optional) + return new Promise((resolve) => { + (io as Server).emit(EVENTS.MESSAGE_PING, { categoryId, channelId, messageId, pingType, text }, (ack: any) => { + if (ack && ack.status === "received") { + resolve(true); + } else { + resolve("no acknowledgment"); + } + }); + }); + } catch (err) { + const errMessage = err as Error; + console.log("services::realtime::messagePing - ", errMessage); + return false; + } +} \ No newline at end of file diff --git a/concord-server/src/validators/index.ts b/concord-server/src/validators/index.ts new file mode 100644 index 0000000..1348a03 --- /dev/null +++ b/concord-server/src/validators/index.ts @@ -0,0 +1,71 @@ +import { Hono } from "hono"; +import { cors } from "hono/cors"; +import { Server as Engine } from "@socket.io/bun-engine"; +import { Server } from "socket.io"; +import routes from "./routes/index"; +import { Scalar } from "@scalar/hono-api-reference"; +import { openAPIRouteHandler } from "hono-openapi"; + +//initialize socket.io server +const io = new Server(); + +//initialize bun engine +//then bind to socket.io server +const engine = new Engine(); +io.bind(engine); + +io.on("connection", (socket) => { + //get userId and clientId from query params + const userId = socket.handshake.query.userId; + const clientId = socket.handshake.query.clientId; + if (!userId || Array.isArray(userId)) { + socket.disconnect(); + throw new Error("Invalid user ID"); + } + + if (!clientId || Array.isArray(clientId)) { + socket.disconnect(); + throw new Error("Invalid client ID"); + } + + socket.join(userId); + console.log( + `User ${userId} connected. Client ID ${clientId} on socket ${socket.id}`, + ); + + socket.on("disconnect", () => { + console.log(`User ${userId} disconnected from socket ${socket.id}`); + }); +}); + +const app = new Hono(); + +app.use( + "*", + cors({ + origin: "http://localhost:5173", + allowHeaders: ["Content-Type", "Authorization"], + allowMethods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"], + credentials: true, + }), +); + +app.route("/api", routes); + +app.get( + "/openapi", + openAPIRouteHandler(app, { + documentation: { + info: { + title: "Hono API", + version: "1.0.0", + description: "Greeting API", + }, + servers: [{ url: "http://localhost:3000", description: "Local Server" }], + }, + }), +); + +app.get("/scalar", Scalar({ url: "/openapi" })); + +export default app; diff --git a/concord-server/src/validators/realtimeValidator.ts b/concord-server/src/validators/realtimeValidator.ts new file mode 100644 index 0000000..e69de29 From 6ef53fd964577794c8cf17e0d46d491c39ce6024 Mon Sep 17 00:00:00 2001 From: PrimarchPaul Date: Sun, 28 Sep 2025 01:45:43 -0400 Subject: [PATCH 02/10] feat: completed messagetochannel endpoint --- concord-server/schema.prisma | 2 +- .../{realtime.ts => realtimeController.ts} | 82 +++++------------ concord-server/src/routes/realtime.ts | 29 ------ concord-server/src/routes/realtimeRoutes.ts | 88 +++++++++++++++++++ .../src/validators/channelValidator.ts | 2 +- concord-server/src/validators/index.ts | 71 --------------- .../src/validators/realtimeValidator.ts | 16 ++++ 7 files changed, 129 insertions(+), 161 deletions(-) rename concord-server/src/controller/{realtime.ts => realtimeController.ts} (52%) delete mode 100644 concord-server/src/routes/realtime.ts create mode 100644 concord-server/src/routes/realtimeRoutes.ts delete mode 100644 concord-server/src/validators/index.ts diff --git a/concord-server/schema.prisma b/concord-server/schema.prisma index b0cc65d..bdfcf43 100644 --- a/concord-server/schema.prisma +++ b/concord-server/schema.prisma @@ -103,7 +103,7 @@ model Message { User User @relation(fields: [userId], references: [id]) userId String deleted Boolean - text String + text String @db.VarChar(2000) createdAt DateTime @default(now()) updatedAt DateTime @updatedAt replies Reply? @relation("MessageToReply") diff --git a/concord-server/src/controller/realtime.ts b/concord-server/src/controller/realtimeController.ts similarity index 52% rename from concord-server/src/controller/realtime.ts rename to concord-server/src/controller/realtimeController.ts index 5f9b961..834e4b6 100644 --- a/concord-server/src/controller/realtime.ts +++ b/concord-server/src/controller/realtimeController.ts @@ -1,68 +1,32 @@ import { Context } from "hono"; import { - sendMessageToChannel, + sendMessageToChannelEvent, removeMessageFromChannel, } from "../services/realtime.js"; import { success } from "zod"; +import { PostMessageToChannelInput } from "../validators/realtimeValidator.js"; +import { sendMessage } from "./messageController.js"; -export async function postMessageToChannel(io: any, c: Context) { - try { - io = c.get("io"); - - const instanceId = c.req.param("instanceId"); - const categoryId = c.req.param("categoryId"); - const channelId = c.req.param("channelId"); - const message = await c.req.json(); - - const result = await sendMessageToChannel( - instanceId, - categoryId, - channelId, - message, - "new_channel_message", - io, - ); - - if (result === "Event not implemented") { - console.log( - "controller::realtime::postMessageToChannel - Failed to send message", - ); - return c.json({ - success: false, - message: "Event not implemented or recognized", - status: 400, - }); - } - - if (result === "no acknowledgment") { - console.log( - "controller::realtime::postMessageToChannel - No acknowledgment received from client", - ); - return c.json({ - success: false, - message: "No acknowledgment received from client", - status: 500, - }); - } - - if (!result) { - throw new Error("failed to send message"); - } - - return c.json({ - success: true, - message: "Message sent successfully", - status: 200, - }); - } catch (err) { - const errMessage = err as Error; - console.log("controller::realtime::postMessageToChannel - ", errMessage); - return c.json({ - success: false, - message: errMessage.message, - status: 500, - }); - } +export async function postMessageToChannel(io: any, c: Context, data: PostMessageToChannelInput) { + const instanceId = data.instanceId; + const categoryId = data.categoryId; + const channelId = data.channelId; + const userId = data.userId; + const content = data.content; + const token = data.token; + const repliedMessageId = data.repliedMessageId ?? null; + const event = "new_channel_message"; + return sendMessageToChannelEvent( + instanceId, + categoryId, + channelId, + userId, + content, + token, + repliedMessageId, + event, + io + ); } export async function deleteMessageFromChannel(io: any, c: Context) { diff --git a/concord-server/src/routes/realtime.ts b/concord-server/src/routes/realtime.ts deleted file mode 100644 index 933b839..0000000 --- a/concord-server/src/routes/realtime.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { Hono } from "hono"; -import { zValidator } from "@hono/zod-validator"; -import { describeRoute, resolver } from "hono-openapi"; -import { - postMessageToChannel, - deleteMessageFromChannel, -} from "../controller/realtime"; - -const app = new Hono(); - -app.post( - "message/", - zValidator({ - body: z.object({ - content: z.string().min(1).max(500), - }), - }), - async (c) => { - const { instanceId, categoryId, channelId } = c.req.params; - const { content } = c.req.body; - - return postMessageToChannel(c.get("io"), { - instanceId, - categoryId, - channelId, - content, - }); - }, -); diff --git a/concord-server/src/routes/realtimeRoutes.ts b/concord-server/src/routes/realtimeRoutes.ts new file mode 100644 index 0000000..aa496a6 --- /dev/null +++ b/concord-server/src/routes/realtimeRoutes.ts @@ -0,0 +1,88 @@ +import { Hono } from "hono"; +import { zValidator } from "@hono/zod-validator"; +import { describeRoute, resolver } from "hono-openapi"; +import { + postMessageToChannelSchema +} from "../validators/realtimeValidator.js"; +import { + postMessageToChannel, + deleteMessageFromChannel, +} from "../controller/realtimeController"; + + +const realtimeRoutes = new Hono(); + +realtimeRoutes.post( + "message/:instanceId/:categoryId/:channelId", + describeRoute({ + description: "Post a message to a channel", + responses: { + 200: { + description: "Message posted successfully", + content: { + "application/json": { schema: resolver(postMessageToChannelSchema) }, + }, + }, + 400: { + description: "Bad Request - Invalid input data", + content: { + "application/json": { schema: resolver(postMessageToChannelSchema) }, + }, + }, + 401: { + description: "Unauthorized - Invalid token", + content: { + "application/json": { schema: resolver(postMessageToChannelSchema) }, + }, + }, + 404: { + description: "Instance, Category, Channel, or User not found", + content: { + "application/json": { schema: resolver(postMessageToChannelSchema) }, + }, + }, + 500: { + description: "Server error", + content: { + "application/json": { schema: resolver(postMessageToChannelSchema) }, + }, + }, + }, + }), + zValidator("json", postMessageToChannelSchema), + async (c) => { + const instanceId = c.req.param("instanceId"); + const categoryId = c.req.param("categoryId"); + const channelId = c.req.param("channelId"); + const { userId, content, repliedMessageId, token } = await c.req.json(); + + const ioServer = (c as any).get("io"); + if (!ioServer) { + return c.json({ success: false, error: "Realtime server not available" }, 500); + } + + const result = await postMessageToChannel(ioServer, c, { + instanceId, + categoryId, + channelId, + userId, + content, + token, + repliedMessageId: repliedMessageId ?? null, + }) + + if (result === "event not implemented") { + return c.json({ success: false, message: "Event not implemented or recognized" }, 400); + } + + if (result === "no acknowledgment") { + return c.json({ success: false, message: "No acknowledgment received from client" }, 500); + } + + if (!result) { + return c.json({ success: false, message: "Failed to post message" }, 500); + } + + return c.json({ success: true, result }, 200); + } +); \ No newline at end of file diff --git a/concord-server/src/validators/channelValidator.ts b/concord-server/src/validators/channelValidator.ts index 23cb918..626e1fb 100644 --- a/concord-server/src/validators/channelValidator.ts +++ b/concord-server/src/validators/channelValidator.ts @@ -52,5 +52,5 @@ export type GetChannelsByCategoryIdInput = z.infer< export type UpdateChannelInput = z.infer; export type DeleteChannelInput = z.infer; export type DeleteChannelsByCategoryIdInput = z.infer< - typeof deleteChannelsByCategoryIdSchema +typeof deleteChannelsByCategoryIdSchema >; diff --git a/concord-server/src/validators/index.ts b/concord-server/src/validators/index.ts deleted file mode 100644 index 1348a03..0000000 --- a/concord-server/src/validators/index.ts +++ /dev/null @@ -1,71 +0,0 @@ -import { Hono } from "hono"; -import { cors } from "hono/cors"; -import { Server as Engine } from "@socket.io/bun-engine"; -import { Server } from "socket.io"; -import routes from "./routes/index"; -import { Scalar } from "@scalar/hono-api-reference"; -import { openAPIRouteHandler } from "hono-openapi"; - -//initialize socket.io server -const io = new Server(); - -//initialize bun engine -//then bind to socket.io server -const engine = new Engine(); -io.bind(engine); - -io.on("connection", (socket) => { - //get userId and clientId from query params - const userId = socket.handshake.query.userId; - const clientId = socket.handshake.query.clientId; - if (!userId || Array.isArray(userId)) { - socket.disconnect(); - throw new Error("Invalid user ID"); - } - - if (!clientId || Array.isArray(clientId)) { - socket.disconnect(); - throw new Error("Invalid client ID"); - } - - socket.join(userId); - console.log( - `User ${userId} connected. Client ID ${clientId} on socket ${socket.id}`, - ); - - socket.on("disconnect", () => { - console.log(`User ${userId} disconnected from socket ${socket.id}`); - }); -}); - -const app = new Hono(); - -app.use( - "*", - cors({ - origin: "http://localhost:5173", - allowHeaders: ["Content-Type", "Authorization"], - allowMethods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"], - credentials: true, - }), -); - -app.route("/api", routes); - -app.get( - "/openapi", - openAPIRouteHandler(app, { - documentation: { - info: { - title: "Hono API", - version: "1.0.0", - description: "Greeting API", - }, - servers: [{ url: "http://localhost:3000", description: "Local Server" }], - }, - }), -); - -app.get("/scalar", Scalar({ url: "/openapi" })); - -export default app; diff --git a/concord-server/src/validators/realtimeValidator.ts b/concord-server/src/validators/realtimeValidator.ts index e69de29..f9c971d 100644 --- a/concord-server/src/validators/realtimeValidator.ts +++ b/concord-server/src/validators/realtimeValidator.ts @@ -0,0 +1,16 @@ +import { z } from "zod"; + +export const postMessageToChannelSchema = z.object({ + instanceId: z.uuidv7(), + categoryId: z.uuidv7(), + channelId: z.uuidv7(), + userId: z.uuidv7(), + content: z.string().min(1).max(2000), + repliedMessageId: z.uuidv7().optional(), + token: z.string(), +}); + +//TODO: add more realtime related validators as needed + +export type PostMessageToChannelInput = z.infer; +//TODO: create more input schemas for other realtime actions \ No newline at end of file From de827ad441e8a844aa4e0167e671e4e61d2f44af Mon Sep 17 00:00:00 2001 From: PrimarchPaul Date: Sun, 28 Sep 2025 01:49:24 -0400 Subject: [PATCH 03/10] oops, forgot to mount the routes --- concord-server/src/routes/index.ts | 2 ++ concord-server/src/routes/realtimeRoutes.ts | 8 +++++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/concord-server/src/routes/index.ts b/concord-server/src/routes/index.ts index 84ef367..bb09610 100644 --- a/concord-server/src/routes/index.ts +++ b/concord-server/src/routes/index.ts @@ -5,6 +5,7 @@ import messageRoutes from "./messageRoutes"; import { channelRoutes } from "./channelRoutes"; import instanceRoutes from "./instanceRoutes"; import { categoryRoutes } from "./categoryRoutes"; +import realtimeRoutes from "./realtimeRoutes"; const routes = new Hono(); @@ -13,5 +14,6 @@ routes.route("/message", messageRoutes); routes.route("/channel", channelRoutes); routes.route("/instance", instanceRoutes); routes.route("/category", categoryRoutes); +routes.route("/realtime", realtimeRoutes); export default routes; diff --git a/concord-server/src/routes/realtimeRoutes.ts b/concord-server/src/routes/realtimeRoutes.ts index aa496a6..cb44dd1 100644 --- a/concord-server/src/routes/realtimeRoutes.ts +++ b/concord-server/src/routes/realtimeRoutes.ts @@ -13,7 +13,7 @@ import { const realtimeRoutes = new Hono(); realtimeRoutes.post( - "message/:instanceId/:categoryId/:channelId", + "/message/:instanceId/:categoryId/:channelId", describeRoute({ description: "Post a message to a channel", responses: { @@ -70,7 +70,7 @@ realtimeRoutes.post( token, repliedMessageId: repliedMessageId ?? null, }) - + if (result === "event not implemented") { return c.json({ success: false, message: "Event not implemented or recognized" }, 400); } @@ -85,4 +85,6 @@ realtimeRoutes.post( return c.json({ success: true, result }, 200); } -); \ No newline at end of file +); + +export default realtimeRoutes; \ No newline at end of file From 4adcf810046d5fc194458caf4f1ff4c0a1e4391c Mon Sep 17 00:00:00 2001 From: PrimarchPaul Date: Sun, 28 Sep 2025 02:10:41 -0400 Subject: [PATCH 04/10] nuked all realtime code --- .../src/controller/realtimeController.ts | 90 ------- concord-server/src/index.ts | 18 +- concord-server/src/routes/realtimeRoutes.ts | 90 ------- concord-server/src/services/realtime.ts | 222 ------------------ 4 files changed, 13 insertions(+), 407 deletions(-) delete mode 100644 concord-server/src/controller/realtimeController.ts delete mode 100644 concord-server/src/routes/realtimeRoutes.ts delete mode 100644 concord-server/src/services/realtime.ts diff --git a/concord-server/src/controller/realtimeController.ts b/concord-server/src/controller/realtimeController.ts deleted file mode 100644 index 834e4b6..0000000 --- a/concord-server/src/controller/realtimeController.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { Context } from "hono"; -import { - sendMessageToChannelEvent, - removeMessageFromChannel, -} from "../services/realtime.js"; -import { success } from "zod"; -import { PostMessageToChannelInput } from "../validators/realtimeValidator.js"; -import { sendMessage } from "./messageController.js"; - -export async function postMessageToChannel(io: any, c: Context, data: PostMessageToChannelInput) { - const instanceId = data.instanceId; - const categoryId = data.categoryId; - const channelId = data.channelId; - const userId = data.userId; - const content = data.content; - const token = data.token; - const repliedMessageId = data.repliedMessageId ?? null; - const event = "new_channel_message"; - return sendMessageToChannelEvent( - instanceId, - categoryId, - channelId, - userId, - content, - token, - repliedMessageId, - event, - io - ); -} - -export async function deleteMessageFromChannel(io: any, c: Context) { - try { - io = c.get("io"); - - const instanceId = c.req.param("instanceId"); - const categoryId = c.req.param("categoryId"); - const channelId = c.req.param("channelId"); - const messageId = c.req.param("messageId"); - - const result = await removeMessageFromChannel( - instanceId, - categoryId, - channelId, - messageId, - "delete_channel_message", - io, - ); - - if (result === "event not implemented") { - console.log( - "controller::realtime::deleteMessageFromChannel - Event not implemented", - ); - return c.json({ - success: false, - message: "Event not implemented or recognized", - status: 400, - }); - } - - if (result === "no acknowledgment") { - console.log( - "controller::realtime::deleteMessageFromChannel - No acknowledgment received from client", - ); - return c.json({ - success: false, - message: "No acknowledgment received from client", - status: 500, - }); - } - - if (!result) { - throw new Error("failed to delete message"); - } - - c.json({ - success: true, - message: "Message deleted successfully", - status: 200, - }); - } catch (err) { - const errMessage = err as Error; - console.log("services::realtime::deleteMessageFromChannel - ", errMessage); - return c.json({ - success: false, - message: errMessage.message, - status: 500, - }); - } -} diff --git a/concord-server/src/index.ts b/concord-server/src/index.ts index 1348a03..5356a92 100644 --- a/concord-server/src/index.ts +++ b/concord-server/src/index.ts @@ -13,11 +13,11 @@ const io = new Server(); //then bind to socket.io server const engine = new Engine(); io.bind(engine); - +/* io.on("connection", (socket) => { //get userId and clientId from query params - const userId = socket.handshake.query.userId; - const clientId = socket.handshake.query.clientId; + const userId = socket.handshake.query.userId + const clientId = socket.handshake.query.clientId if (!userId || Array.isArray(userId)) { socket.disconnect(); throw new Error("Invalid user ID"); @@ -28,15 +28,23 @@ io.on("connection", (socket) => { throw new Error("Invalid client ID"); } + socket.join(userId); console.log( `User ${userId} connected. Client ID ${clientId} on socket ${socket.id}`, - ); + ) socket.on("disconnect", () => { console.log(`User ${userId} disconnected from socket ${socket.id}`); - }); + }) }); +*/ + +io.on("ping", (socket) => { + console.log(`New client connected: ${socket.id}`) + socket.emit("pong", socket. ) +}) + const app = new Hono(); diff --git a/concord-server/src/routes/realtimeRoutes.ts b/concord-server/src/routes/realtimeRoutes.ts deleted file mode 100644 index cb44dd1..0000000 --- a/concord-server/src/routes/realtimeRoutes.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { Hono } from "hono"; -import { zValidator } from "@hono/zod-validator"; -import { describeRoute, resolver } from "hono-openapi"; -import { - postMessageToChannelSchema -} from "../validators/realtimeValidator.js"; -import { - postMessageToChannel, - deleteMessageFromChannel, -} from "../controller/realtimeController"; - - -const realtimeRoutes = new Hono(); - -realtimeRoutes.post( - "/message/:instanceId/:categoryId/:channelId", - describeRoute({ - description: "Post a message to a channel", - responses: { - 200: { - description: "Message posted successfully", - content: { - "application/json": { schema: resolver(postMessageToChannelSchema) }, - }, - }, - 400: { - description: "Bad Request - Invalid input data", - content: { - "application/json": { schema: resolver(postMessageToChannelSchema) }, - }, - }, - 401: { - description: "Unauthorized - Invalid token", - content: { - "application/json": { schema: resolver(postMessageToChannelSchema) }, - }, - }, - 404: { - description: "Instance, Category, Channel, or User not found", - content: { - "application/json": { schema: resolver(postMessageToChannelSchema) }, - }, - }, - 500: { - description: "Server error", - content: { - "application/json": { schema: resolver(postMessageToChannelSchema) }, - }, - }, - }, - }), - zValidator("json", postMessageToChannelSchema), - async (c) => { - const instanceId = c.req.param("instanceId"); - const categoryId = c.req.param("categoryId"); - const channelId = c.req.param("channelId"); - const { userId, content, repliedMessageId, token } = await c.req.json(); - - const ioServer = (c as any).get("io"); - if (!ioServer) { - return c.json({ success: false, error: "Realtime server not available" }, 500); - } - - const result = await postMessageToChannel(ioServer, c, { - instanceId, - categoryId, - channelId, - userId, - content, - token, - repliedMessageId: repliedMessageId ?? null, - }) - - if (result === "event not implemented") { - return c.json({ success: false, message: "Event not implemented or recognized" }, 400); - } - - if (result === "no acknowledgment") { - return c.json({ success: false, message: "No acknowledgment received from client" }, 500); - } - - if (!result) { - return c.json({ success: false, message: "Failed to post message" }, 500); - } - - return c.json({ success: true, result }, 200); - } -); - -export default realtimeRoutes; \ No newline at end of file diff --git a/concord-server/src/services/realtime.ts b/concord-server/src/services/realtime.ts deleted file mode 100644 index 9083788..0000000 --- a/concord-server/src/services/realtime.ts +++ /dev/null @@ -1,222 +0,0 @@ -import { PrismaClient } from "@prisma/client"; -import type { Server, Socket } from "socket.io"; -import { sendMessageToChannel } from "./messageService"; - -const prisma = new PrismaClient(); - -const EVENTS = { - NEW_CHANNEL_MESSAGE: "new_channel_message", - DELETE_CHANNEL_MESSAGE: "delete_channel_message", - MESSAGE_PING: "message_ping", -}; - -export async function sendMessageToChannelEvent( - instanceId: string, - categoryId: string, - channelId: string, - userId: string, - content: string, - token: string, - repliedMessageId: string | null, - event: string, - io: any, -): Promise { - try { - //TODO: implement middleware to replace this - if (EVENTS.NEW_CHANNEL_MESSAGE === event) { - throw new Error("Event not implemented"); - } - - const newMessage = await sendMessageToChannel(channelId, userId, content, token, repliedMessageId); - - if (!newMessage) { - console.log("services::realtime::sendMessageToChannel - could not create new message"); - return "failed to create message" - } - - return new Promise((resolve) => { - io.to(instanceId).emit(event, newMessage, (ack: any) => { - if (ack && ack.status === "received") { - console.log(`Message ${ack.messageId} acknowledged by client.`); - resolve(true); - } else { - console.log( - "services::realtime::sendMessageToChannel No acknowledgment received from client.", - ); - resolve("no acknowledgment"); - } - }); - }); - } catch (err) { - const errMessage = err as Error; - if (errMessage.message === "Event not implemented") { - console.log( - `services::realtime::sendMessageToChannel - Event not implemented. Attempted event: ${event}`, - ); - return "event not implemented"; - } - console.log("services::realtime::sendMessageToChannel - ", errMessage); - return false; - } -} - -export async function removeMessageFromChannel( - instanceId: string, - categoryId: string, - channelId: string, - messageId: string, - event: string, - io: any, -): Promise { - try { - //TODO: implement middleware to replace this - if (EVENTS.DELETE_CHANNEL_MESSAGE === event) { - throw new Error("event not implemented"); - } - - //TODO: add prisma to flag a channel message as deleted - - return new Promise((resolve) => { - io.to(instanceId).emit(event, { messageId }, (ack: any) => { - if (ack && ack.status === "received") { - console.log(`Message ${ack.messageId} acknowledged by client.`); - resolve(true); - } else { - console.log( - "services::realtime::deleteMessageFromChannel No acknowledgment received from client.", - ); - resolve("no acknowledgment"); - } - }); - }); - } catch (err) { - const errMessage = err as Error; - if (errMessage.message === "Event not implemented") { - console.log( - `services::realtime::deleteMessageFromChannel - Event not implemented. Attempted event: ${event}`, - ); - return false; - } - console.log("services::realtime::deleteMessageFromChannel - ", errMessage); - return false; - } -} - - -export async function messagePing( - instanceId: string, - categoryId: string, - channelId: string, - messageId: string, - userId: string, - pingType: string, - text: string, - io: any, - mentionedUserIds?: string[], -): Promise { - try { - const curInstance = await prisma.instance.findUnique({ where: { id: instanceId } }); - if (!curInstance) throw new Error("instance not found"); - - const curCategory = await prisma.category.findUnique({ where: { id: categoryId } }); - if (!curCategory) throw new Error("category not found"); - - const curChannel = await prisma.channel.findUnique({ where: { id: channelId } }); - if (!curChannel) throw new Error("channel not found"); - - if (pingType === "mention") { - if (!mentionedUserIds || mentionedUserIds.length === 0) { - throw new Error("no mentioned users provided for mention ping"); - } - - // Persist pings (best-effort) - try { - const rows = mentionedUserIds.map((m) => ({ messageId, pingsUserId: m })); - await prisma.messagePing.createMany({ data: rows, skipDuplicates: true }); - } catch (e) { - console.warn("services::realtime::messagePing - could not persist pings", e); - } - - const timeoutMs = 5000; - const results: Array<{ userId: string; delivered: boolean }> = []; - - // For each mentioned user, find sockets on this server with matching socket.data.userId - await Promise.all( - mentionedUserIds.map(async (mentionedUserId) => { - const socketsForUser: Socket[] = []; - for (const sock of Array.from((io as Server).sockets.sockets.values())) { - try { - if ((sock as Socket).data?.userId === mentionedUserId) socketsForUser.push(sock as Socket); - } catch {} - } - - if (socketsForUser.length === 0) { - results.push({ userId: mentsocketsionedUserId, delivered: false }); - return; - } - - const perSocket = socketsForUser.map( - (sock) => - new Promise((resolve) => { - let done = false; - try { - sock.emit( - EVENTS.MESSAGE_PING, - { categoryId, channelId, messageId, pingType, text, mentionedUserId }, - (ack: any) => { - if (done) return; - done = true; - resolve(!!ack && ack.status === "received"); - }, - ); - } catch (e) { - if (!done) { - done = true; - resolve(false); - } - } - - setTimeout(() => { - if (done) return; - done = true; - resolve(false); - }, timeoutMs); - }), - ); - - try { - const settled = await Promise.all(perSocket); - const delivered = settled.some(Boolean); - results.push({ userId: mentionedUserId, delivered }); - } catch { - results.push({ userId: mentionedUserId, delivered: false }); - } - }), - ); - - const anyDelivered = results.some((r) => r.delivered); - if (anyDelivered) { - console.log("services::realtime::messagePing delivered to some users", results); - return true; - } - - console.log("services::realtime::messagePing no acknowledgments", results); - return "no acknowledgment"; - } - - // Fallback: emit a generic ping to the instance (fire-and-forget ack optional) - return new Promise((resolve) => { - (io as Server).emit(EVENTS.MESSAGE_PING, { categoryId, channelId, messageId, pingType, text }, (ack: any) => { - if (ack && ack.status === "received") { - resolve(true); - } else { - resolve("no acknowledgment"); - } - }); - }); - } catch (err) { - const errMessage = err as Error; - console.log("services::realtime::messagePing - ", errMessage); - return false; - } -} \ No newline at end of file From 460991f2aec4a19844c27e94395d9f7b64bc6422 Mon Sep 17 00:00:00 2001 From: Kevin Puig <119972216+k-puig@users.noreply.github.com> Date: Sun, 28 Sep 2025 03:11:34 -0400 Subject: [PATCH 05/10] socket.io work!!! --- concord-server/src/index.ts | 85 +++++++++++++++--------------- concord-server/src/routes/index.ts | 2 - 2 files changed, 43 insertions(+), 44 deletions(-) diff --git a/concord-server/src/index.ts b/concord-server/src/index.ts index 5356a92..1332236 100644 --- a/concord-server/src/index.ts +++ b/concord-server/src/index.ts @@ -6,53 +6,14 @@ import routes from "./routes/index"; import { Scalar } from "@scalar/hono-api-reference"; import { openAPIRouteHandler } from "hono-openapi"; -//initialize socket.io server -const io = new Server(); - -//initialize bun engine -//then bind to socket.io server -const engine = new Engine(); -io.bind(engine); -/* -io.on("connection", (socket) => { - //get userId and clientId from query params - const userId = socket.handshake.query.userId - const clientId = socket.handshake.query.clientId - if (!userId || Array.isArray(userId)) { - socket.disconnect(); - throw new Error("Invalid user ID"); - } - - if (!clientId || Array.isArray(clientId)) { - socket.disconnect(); - throw new Error("Invalid client ID"); - } - - - socket.join(userId); - console.log( - `User ${userId} connected. Client ID ${clientId} on socket ${socket.id}`, - ) - - socket.on("disconnect", () => { - console.log(`User ${userId} disconnected from socket ${socket.id}`); - }) -}); -*/ - -io.on("ping", (socket) => { - console.log(`New client connected: ${socket.id}`) - socket.emit("pong", socket. ) -}) - - +// Routes const app = new Hono(); app.use( "*", cors({ origin: "http://localhost:5173", - allowHeaders: ["Content-Type", "Authorization"], + allowHeaders: ["Content-Type", "Authorization", "Access-Control-Allow-Origin"], allowMethods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"], credentials: true, }), @@ -76,4 +37,44 @@ app.get( app.get("/scalar", Scalar({ url: "/openapi" })); -export default app; +// initialize socket.io server +const io = new Server({ + cors: { + origin: "http://localhost:5173", + credentials: true, + } +}); +const engine = new Engine(); +io.bind(engine); + +// Register socket.io events +io.on("connection", (socket) => { + console.log("connected1"); + socket.on("ping", (c) => { + console.log(c); + socket.emit("pong", c); + }); +}); + +const { websocket } = engine.handler(); + +export default { + port: 3000, + idleTimeout: 30, // must be greater than the "pingInterval" option of the engine, which defaults to 25 seconds + + async fetch(req: Request, server: Bun.Server) { + const url = new URL(req.url); + + if (url.pathname === "/socket.io/") { + const response = await engine.handleRequest(req, server); + // Add CORS headers explicitly + response.headers.set("Access-Control-Allow-Origin", "http://localhost:5173"); + response.headers.set("Access-Control-Allow-Credentials", "true"); + return response; + } else { + return app.fetch(req, server); + } + }, + + websocket +}; diff --git a/concord-server/src/routes/index.ts b/concord-server/src/routes/index.ts index bb09610..84ef367 100644 --- a/concord-server/src/routes/index.ts +++ b/concord-server/src/routes/index.ts @@ -5,7 +5,6 @@ import messageRoutes from "./messageRoutes"; import { channelRoutes } from "./channelRoutes"; import instanceRoutes from "./instanceRoutes"; import { categoryRoutes } from "./categoryRoutes"; -import realtimeRoutes from "./realtimeRoutes"; const routes = new Hono(); @@ -14,6 +13,5 @@ routes.route("/message", messageRoutes); routes.route("/channel", channelRoutes); routes.route("/instance", instanceRoutes); routes.route("/category", categoryRoutes); -routes.route("/realtime", realtimeRoutes); export default routes; From b3e24f449387e257685521d8d714a4bf5d9bc52f Mon Sep 17 00:00:00 2001 From: Kevin Puig <119972216+k-puig@users.noreply.github.com> Date: Sun, 28 Sep 2025 03:18:11 -0400 Subject: [PATCH 06/10] pretty + socket separation --- concord-server/src/index.ts | 24 +++++++------- concord-server/src/sockets/index.ts | 16 +++++++++ concord-server/src/sockets/voiceHandler.ts | 33 +++++++++++++++++++ .../src/validators/channelValidator.ts | 2 +- .../src/validators/realtimeValidator.ts | 20 ++++++----- 5 files changed, 74 insertions(+), 21 deletions(-) create mode 100644 concord-server/src/sockets/index.ts create mode 100644 concord-server/src/sockets/voiceHandler.ts diff --git a/concord-server/src/index.ts b/concord-server/src/index.ts index 1332236..816c3af 100644 --- a/concord-server/src/index.ts +++ b/concord-server/src/index.ts @@ -5,6 +5,7 @@ import { Server } from "socket.io"; import routes from "./routes/index"; import { Scalar } from "@scalar/hono-api-reference"; import { openAPIRouteHandler } from "hono-openapi"; +import { registerSocketHandlers } from "./sockets"; // Routes const app = new Hono(); @@ -13,7 +14,11 @@ app.use( "*", cors({ origin: "http://localhost:5173", - allowHeaders: ["Content-Type", "Authorization", "Access-Control-Allow-Origin"], + allowHeaders: [ + "Content-Type", + "Authorization", + "Access-Control-Allow-Origin", + ], allowMethods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"], credentials: true, }), @@ -42,19 +47,13 @@ const io = new Server({ cors: { origin: "http://localhost:5173", credentials: true, - } + }, }); const engine = new Engine(); io.bind(engine); // Register socket.io events -io.on("connection", (socket) => { - console.log("connected1"); - socket.on("ping", (c) => { - console.log(c); - socket.emit("pong", c); - }); -}); +registerSocketHandlers(io); const { websocket } = engine.handler(); @@ -68,7 +67,10 @@ export default { if (url.pathname === "/socket.io/") { const response = await engine.handleRequest(req, server); // Add CORS headers explicitly - response.headers.set("Access-Control-Allow-Origin", "http://localhost:5173"); + response.headers.set( + "Access-Control-Allow-Origin", + "http://localhost:5173", + ); response.headers.set("Access-Control-Allow-Credentials", "true"); return response; } else { @@ -76,5 +78,5 @@ export default { } }, - websocket + websocket, }; diff --git a/concord-server/src/sockets/index.ts b/concord-server/src/sockets/index.ts new file mode 100644 index 0000000..816e14d --- /dev/null +++ b/concord-server/src/sockets/index.ts @@ -0,0 +1,16 @@ +import { Server } from "socket.io"; +import { registerVoiceHandlers } from "./voiceHandler"; + +export function registerSocketHandlers(io: Server) { + // bad practice + io.on("connection", (socket) => { + console.log("connected"); + socket.on("ping", (c) => { + console.log(c); + socket.emit("pong", c); + }); + }); + + // good practice + registerVoiceHandlers(io); +} diff --git a/concord-server/src/sockets/voiceHandler.ts b/concord-server/src/sockets/voiceHandler.ts new file mode 100644 index 0000000..1dc0484 --- /dev/null +++ b/concord-server/src/sockets/voiceHandler.ts @@ -0,0 +1,33 @@ +import { Server, Socket } from "socket.io"; + +//TEST IGNORE +export function registerVoiceHandlers(io: Server) { + io.on("connection", (socket: Socket) => { + console.log(`Voice socket connected: ${socket.id}`); + + socket.on("join-voice-channel", (channelId: string) => { + socket.join(channelId); + console.log(`Socket ${socket.id} joined voice channel ${channelId}`); + // Optionally, notify others in the channel + socket.to(channelId).emit("user-joined-voice", socket.id); + }); + + socket.on("leave-voice-channel", (channelId: string) => { + socket.leave(channelId); + console.log(`Socket ${socket.id} left voice channel ${channelId}`); + // Optionally, notify others in the channel + socket.to(channelId).emit("user-left-voice", socket.id); + }); + + socket.on("voice-data", (channelId: string, data: any) => { + // Broadcast voice data to all other clients in the same channel + socket.to(channelId).emit("voice-data", socket.id, data); + }); + + socket.on("disconnect", () => { + console.log(`Voice socket disconnected: ${socket.id}`); + // Handle user leaving all voice channels they were in + // (e.g., iterate through socket.rooms if you need to emit to specific channels) + }); + }); +} diff --git a/concord-server/src/validators/channelValidator.ts b/concord-server/src/validators/channelValidator.ts index 626e1fb..23cb918 100644 --- a/concord-server/src/validators/channelValidator.ts +++ b/concord-server/src/validators/channelValidator.ts @@ -52,5 +52,5 @@ export type GetChannelsByCategoryIdInput = z.infer< export type UpdateChannelInput = z.infer; export type DeleteChannelInput = z.infer; export type DeleteChannelsByCategoryIdInput = z.infer< -typeof deleteChannelsByCategoryIdSchema + typeof deleteChannelsByCategoryIdSchema >; diff --git a/concord-server/src/validators/realtimeValidator.ts b/concord-server/src/validators/realtimeValidator.ts index f9c971d..859b4e8 100644 --- a/concord-server/src/validators/realtimeValidator.ts +++ b/concord-server/src/validators/realtimeValidator.ts @@ -1,16 +1,18 @@ import { z } from "zod"; export const postMessageToChannelSchema = z.object({ - instanceId: z.uuidv7(), - categoryId: z.uuidv7(), - channelId: z.uuidv7(), - userId: z.uuidv7(), - content: z.string().min(1).max(2000), - repliedMessageId: z.uuidv7().optional(), - token: z.string(), + instanceId: z.uuidv7(), + categoryId: z.uuidv7(), + channelId: z.uuidv7(), + userId: z.uuidv7(), + content: z.string().min(1).max(2000), + repliedMessageId: z.uuidv7().optional(), + token: z.string(), }); //TODO: add more realtime related validators as needed -export type PostMessageToChannelInput = z.infer; -//TODO: create more input schemas for other realtime actions \ No newline at end of file +export type PostMessageToChannelInput = z.infer< + typeof postMessageToChannelSchema +>; +//TODO: create more input schemas for other realtime actions From 226ee3f998d872576bc5cbc3e0a97cf0f5afbd2a Mon Sep 17 00:00:00 2001 From: Kevin Puig <119972216+k-puig@users.noreply.github.com> Date: Sun, 28 Sep 2025 05:39:05 -0400 Subject: [PATCH 07/10] Join voice channel with sockets --- .../20250928083913_maxchar2000/migration.sql | 8 ++ .../src/services/instanceService.ts | 61 +++++++++++++ concord-server/src/sockets/voiceHandler.ts | 85 +++++++++++++------ 3 files changed, 130 insertions(+), 24 deletions(-) create mode 100644 concord-server/migrations/20250928083913_maxchar2000/migration.sql diff --git a/concord-server/migrations/20250928083913_maxchar2000/migration.sql b/concord-server/migrations/20250928083913_maxchar2000/migration.sql new file mode 100644 index 0000000..096b66b --- /dev/null +++ b/concord-server/migrations/20250928083913_maxchar2000/migration.sql @@ -0,0 +1,8 @@ +/* + Warnings: + + - You are about to alter the column `text` on the `Message` table. The data in that column could be lost. The data in that column will be cast from `Text` to `VarChar(2000)`. + +*/ +-- AlterTable +ALTER TABLE "public"."Message" ALTER COLUMN "text" SET DATA TYPE VARCHAR(2000); diff --git a/concord-server/src/services/instanceService.ts b/concord-server/src/services/instanceService.ts index bb4008a..7ac6381 100644 --- a/concord-server/src/services/instanceService.ts +++ b/concord-server/src/services/instanceService.ts @@ -57,3 +57,64 @@ export async function getAllInstances() { }; } } + +export async function getInstanceByChannelId(id: string) { + try { + const instance = await prisma.instance.findFirst({ + where: { + Category: { + some: { + Channel: { + some: { + id: id + } + } + } + } + }, + }); + + if (!instance) { + return null; + } + + return instance; + } catch (error) { + console.error("Error fetching instance by channel ID:", error); + return null; + } +} + +export async function getInstancesByUserId(id: string) { + try { + const user = await getUserInformation(id); + if (user && user.admin) { + const adminInstances = await getAllInstances(); + if (adminInstances && adminInstances.success) { + return adminInstances.data; + } + } + + const instance = await prisma.instance.findMany({ + where: { + Role: { + some: { + User: { + id: id + } + } + } + } + }); + + if (!instance) { + return null; + } + + return instance; + } catch (error) { + console.error("Error fetching instance by channel ID:", error); + return null; + } +} + diff --git a/concord-server/src/sockets/voiceHandler.ts b/concord-server/src/sockets/voiceHandler.ts index 1dc0484..7be95c8 100644 --- a/concord-server/src/sockets/voiceHandler.ts +++ b/concord-server/src/sockets/voiceHandler.ts @@ -1,33 +1,70 @@ import { Server, Socket } from "socket.io"; +import { getUserCredentials, getUserInformation } from "../services/userService"; +import { getAllInstances, getInstanceByChannelId, getInstancesByUserId } from "../services/instanceService"; +import { getCategoriesByInstance, getCategory, getChannel } from "../services/channelService"; + +// Change to Map of voiceChannelId to Map of userId to socket +const voiceChannelMembers = new Map>(); -//TEST IGNORE export function registerVoiceHandlers(io: Server) { io.on("connection", (socket: Socket) => { - console.log(`Voice socket connected: ${socket.id}`); + // Join voice channel + socket.on("join-voicechannel", async (data) => { + const payload = data as { + userId: string + userToken: string, + voiceChannelId: string, + }; + if (!payload) { + socket.emit("error-voicechannel", "no payload in voice conn") + return; + } - socket.on("join-voice-channel", (channelId: string) => { - socket.join(channelId); - console.log(`Socket ${socket.id} joined voice channel ${channelId}`); - // Optionally, notify others in the channel - socket.to(channelId).emit("user-joined-voice", socket.id); + // Initialize map for channel if not present + if (!voiceChannelMembers.has(payload.voiceChannelId)) { + voiceChannelMembers.set(payload.voiceChannelId, new Map()); + } + + const channelMembers = voiceChannelMembers.get(payload.voiceChannelId)!; + + // Remove user if already present in this channel + if (channelMembers.has(payload.userId)) { + channelMembers.delete(payload.userId); + } + + // authenticate user + const userCreds = await getUserCredentials(payload.userId); + if (!userCreds || !userCreds.token || userCreds.token != payload.userToken) { + socket.emit("error-voicechannel", "bad user creds in voice conn"); + return; + } + + // determine if channel is voice channel + const channel = await getChannel(payload.voiceChannelId); + if (!channel || channel.type !== "voice" || !channel.categoryId) { + socket.emit("error-voicechannel", "bad channel or channel type in voice conn"); + return; + } + + // authorize user using role + const user = await getUserInformation(payload.userId); + const instance = await getInstanceByChannelId(payload.voiceChannelId); + const instances = await getInstancesByUserId(payload.userId); + if (!user || !instance || !instances || !instances.find(e => e.id === instance.id)) { + socket.emit("error-voicechannel", "user not authorized for channel in voice conn"); + return; + } + + // add to map + channelMembers.set(payload.userId, socket); + + socket.join(payload.voiceChannelId); + socket.emit("joined-voicechannel", { + voiceChannelId: payload.voiceChannelId, + connectedUserIds: Array.from(channelMembers.keys()).filter(e => e !== payload.userId) + }); + socket.to(payload.voiceChannelId).emit("user-joined-voicechannel", { userId: payload.userId }); }); - socket.on("leave-voice-channel", (channelId: string) => { - socket.leave(channelId); - console.log(`Socket ${socket.id} left voice channel ${channelId}`); - // Optionally, notify others in the channel - socket.to(channelId).emit("user-left-voice", socket.id); - }); - - socket.on("voice-data", (channelId: string, data: any) => { - // Broadcast voice data to all other clients in the same channel - socket.to(channelId).emit("voice-data", socket.id, data); - }); - - socket.on("disconnect", () => { - console.log(`Voice socket disconnected: ${socket.id}`); - // Handle user leaving all voice channels they were in - // (e.g., iterate through socket.rooms if you need to emit to specific channels) - }); }); } From 7c5d0a4badbe5f132753f2558e906901a2be85be Mon Sep 17 00:00:00 2001 From: Kevin Puig <119972216+k-puig@users.noreply.github.com> Date: Sun, 28 Sep 2025 05:58:06 -0400 Subject: [PATCH 08/10] Handle disconnect (willful and otherwise) --- concord-server/src/sockets/voiceHandler.ts | 109 +++++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/concord-server/src/sockets/voiceHandler.ts b/concord-server/src/sockets/voiceHandler.ts index 7be95c8..f2adf8c 100644 --- a/concord-server/src/sockets/voiceHandler.ts +++ b/concord-server/src/sockets/voiceHandler.ts @@ -64,7 +64,116 @@ export function registerVoiceHandlers(io: Server) { connectedUserIds: Array.from(channelMembers.keys()).filter(e => e !== payload.userId) }); socket.to(payload.voiceChannelId).emit("user-joined-voicechannel", { userId: payload.userId }); + + // Store userId in socket.data for easier access later + socket.data.userId = payload.userId; + socket.data.currentVoiceChannelId = payload.voiceChannelId; }); + // Leave voice channel + socket.on("leave-voicechannel", async (data) => { + const payload = data as { + userId: string, + userToken: string, + voiceChannelId: string, + }; + if (!payload) { + socket.emit("error-voicechannel", "no payload in leave voice request"); + return; + } + + const channelMembers = voiceChannelMembers.get(payload.voiceChannelId); + if (!channelMembers) { + socket.emit("error-voicechannel", "voice channel not found"); + return; + } + + // authenticate user + const userCreds = await getUserCredentials(payload.userId); + if (!userCreds || !userCreds.token || userCreds.token != payload.userToken) { + socket.emit("error-voicechannel", "bad user creds in leave voice request"); + return; + } + + // Remove user from channel + if (channelMembers.has(payload.userId)) { + channelMembers.delete(payload.userId); + + // Leave the socket.io room + socket.leave(payload.voiceChannelId); + + // Notify other users in the channel + socket.to(payload.voiceChannelId).emit("user-left-voicechannel", { + userId: payload.userId, + voiceChannelId: payload.voiceChannelId + }); + + // Clean up empty channels + if (channelMembers.size === 0) { + voiceChannelMembers.delete(payload.voiceChannelId); + } + + // Confirm to the user that they've left + socket.emit("left-voicechannel", { + voiceChannelId: payload.voiceChannelId + }); + + // Clear socket data + socket.data.currentVoiceChannelId = undefined; + } else { + socket.emit("error-voicechannel", "user not in voice channel"); + } + }); + + // Handle disconnection + socket.on("disconnect", () => { + // Get the user ID and current voice channel from socket data + const userId = socket.data.userId; + const voiceChannelId = socket.data.currentVoiceChannelId; + + // If we have the channel ID stored, use it directly + if (userId && voiceChannelId) { + const channelMembers = voiceChannelMembers.get(voiceChannelId); + if (channelMembers && channelMembers.has(userId)) { + // Remove the user from the channel + channelMembers.delete(userId); + + // Notify other members + socket.to(voiceChannelId).emit("user-left-voicechannel", { + userId, + voiceChannelId, + reason: "disconnected" + }); + + // Clean up empty channels + if (channelMembers.size === 0) { + voiceChannelMembers.delete(voiceChannelId); + } + } + } else { + // If we don't have the info stored, search through all channels + voiceChannelMembers.forEach((members, channelId) => { + // Use Array.from to convert Map entries to array for iteration + Array.from(members.entries()).forEach(([memberId, memberSocket]) => { + if (memberSocket.id === socket.id) { + // Found the user in this channel + members.delete(memberId); + + // Notify other members + socket.to(channelId).emit("user-left-voicechannel", { + userId: memberId, + voiceChannelId: channelId, + reason: "disconnected" + }); + + // Clean up empty channels + if (members.size === 0) { + voiceChannelMembers.delete(channelId); + } + } + }); + }); + } + }); }); } From 5a5afcec32f1dad0c54f707cda725a00a52f88a4 Mon Sep 17 00:00:00 2001 From: Kevin Puig <119972216+k-puig@users.noreply.github.com> Date: Sun, 28 Sep 2025 06:14:22 -0400 Subject: [PATCH 09/10] skeletons of fun --- concord-server/src/sockets/voiceHandler.ts | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/concord-server/src/sockets/voiceHandler.ts b/concord-server/src/sockets/voiceHandler.ts index f2adf8c..e23f6ef 100644 --- a/concord-server/src/sockets/voiceHandler.ts +++ b/concord-server/src/sockets/voiceHandler.ts @@ -175,5 +175,20 @@ export function registerVoiceHandlers(io: Server) { }); } }); + + // Handle WebRTC Offer + socket.on("webrtc-offer", async (data) => { + // Implementation for handling WebRTC offer + }); + + // Handle WebRTC Answer + socket.on("webrtc-answer", async (data) => { + // Implementation for handling WebRTC answer + }); + + // Handle ICE Candidates + socket.on("webrtc-ice-candidate", async (data) => { + // Implementation for handling ICE candidates + }); }); } From afd245467459732ebd497ade2289d7946ba6fbfa Mon Sep 17 00:00:00 2001 From: Kevin Puig <119972216+k-puig@users.noreply.github.com> Date: Sun, 28 Sep 2025 08:24:26 -0400 Subject: [PATCH 10/10] rtc works --- concord-server/schema.prisma | 3 +- concord-server/src/index.ts | 15 ++- concord-server/src/sockets/voiceHandler.ts | 133 +++++++++++++++++++-- 3 files changed, 135 insertions(+), 16 deletions(-) diff --git a/concord-server/schema.prisma b/concord-server/schema.prisma index bdfcf43..bb689df 100644 --- a/concord-server/schema.prisma +++ b/concord-server/schema.prisma @@ -89,6 +89,7 @@ model Channel { model ChannelPin { messageId String @unique channelId String @unique + Message Message @relation(fields: [messageId], references: [id]) Channel Channel @relation(fields: [channelId], references: [id]) createdAt DateTime @default(now()) @@ -113,7 +114,7 @@ model Message { } model Reply { - message Message @relation("MessageToReply", fields: [messageId], references: [id]) //message text + message Message @relation("MessageToReply", fields: [messageId], references: [id]) //message text messageId String @unique //message id of the reply repliesTo Message @relation("ReplyToMessage", fields: [repliesToId], references: [id]) //message id that this message replies to repliesToId String @unique //replies to this message id diff --git a/concord-server/src/index.ts b/concord-server/src/index.ts index 816c3af..4d257c4 100644 --- a/concord-server/src/index.ts +++ b/concord-server/src/index.ts @@ -13,7 +13,7 @@ const app = new Hono(); app.use( "*", cors({ - origin: "http://localhost:5173", + origin: ["http://localhost:5173", "https://concord.kpuig.net"], allowHeaders: [ "Content-Type", "Authorization", @@ -45,7 +45,7 @@ app.get("/scalar", Scalar({ url: "/openapi" })); // initialize socket.io server const io = new Server({ cors: { - origin: "http://localhost:5173", + origin: ["http://localhost:5173", "https://concord.kpuig.net"], credentials: true, }, }); @@ -67,10 +67,13 @@ export default { if (url.pathname === "/socket.io/") { const response = await engine.handleRequest(req, server); // Add CORS headers explicitly - response.headers.set( - "Access-Control-Allow-Origin", - "http://localhost:5173", - ); + const origin = req.headers.get("Origin"); + if ( + origin && + ["http://localhost:5173", "https://concord.kpuig.net"].includes(origin) + ) { + response.headers.set("Access-Control-Allow-Origin", origin); + } response.headers.set("Access-Control-Allow-Credentials", "true"); return response; } else { diff --git a/concord-server/src/sockets/voiceHandler.ts b/concord-server/src/sockets/voiceHandler.ts index e23f6ef..50c285f 100644 --- a/concord-server/src/sockets/voiceHandler.ts +++ b/concord-server/src/sockets/voiceHandler.ts @@ -6,6 +6,30 @@ import { getCategoriesByInstance, getCategory, getChannel } from "../services/ch // Change to Map of voiceChannelId to Map of userId to socket const voiceChannelMembers = new Map>(); +// Types for WebRTC messages +interface WebRTCOffer { + targetUserId: string; + sdp: RTCSessionDescriptionInit; +} + +interface WebRTCAnswer { + targetUserId: string; + sdp: RTCSessionDescriptionInit; +} + +interface WebRTCIceCandidate { + targetUserId: string; + candidate: RTCIceCandidateInit; +} + +// Future ICE server configuration +// This can be expanded later to include TURN servers +interface IceServerConfig { + urls: string | string[]; + username?: string; + credential?: string; +} + export function registerVoiceHandlers(io: Server) { io.on("connection", (socket: Socket) => { // Join voice channel @@ -61,7 +85,8 @@ export function registerVoiceHandlers(io: Server) { socket.join(payload.voiceChannelId); socket.emit("joined-voicechannel", { voiceChannelId: payload.voiceChannelId, - connectedUserIds: Array.from(channelMembers.keys()).filter(e => e !== payload.userId) + connectedUserIds: Array.from(channelMembers.keys()).filter(e => e !== payload.userId), + iceServers: getIceServers() // Send ICE server config to client }); socket.to(payload.voiceChannelId).emit("user-joined-voicechannel", { userId: payload.userId }); @@ -103,7 +128,7 @@ export function registerVoiceHandlers(io: Server) { socket.leave(payload.voiceChannelId); // Notify other users in the channel - socket.to(payload.voiceChannelId).emit("user-left-voicechannel", { + io.to(payload.voiceChannelId).emit("user-left-voicechannel", { userId: payload.userId, voiceChannelId: payload.voiceChannelId }); @@ -139,7 +164,7 @@ export function registerVoiceHandlers(io: Server) { channelMembers.delete(userId); // Notify other members - socket.to(voiceChannelId).emit("user-left-voicechannel", { + io.to(voiceChannelId).emit("user-left-voicechannel", { userId, voiceChannelId, reason: "disconnected" @@ -160,7 +185,7 @@ export function registerVoiceHandlers(io: Server) { members.delete(memberId); // Notify other members - socket.to(channelId).emit("user-left-voicechannel", { + io.to(channelId).emit("user-left-voicechannel", { userId: memberId, voiceChannelId: channelId, reason: "disconnected" @@ -178,17 +203,107 @@ export function registerVoiceHandlers(io: Server) { // Handle WebRTC Offer socket.on("webrtc-offer", async (data) => { - // Implementation for handling WebRTC offer + const payload = data as { targetUserId: string; sdp: any }; + const senderUserId = socket.data.userId; + const voiceChannelId = socket.data.currentVoiceChannelId; + + if (!payload || !senderUserId || !voiceChannelId) { + socket.emit("error-voicechannel", "Invalid WebRTC offer payload or sender not in voice channel"); + return; + } + + const channelMembers = voiceChannelMembers.get(voiceChannelId); + const targetSocket = channelMembers?.get(payload.targetUserId); + + if (targetSocket) { + targetSocket.emit("webrtc-offer", { + senderUserId: senderUserId, + sdp: payload.sdp + }); + } else { + socket.emit("error-voicechannel", "Target user not found in voice channel"); + } }); // Handle WebRTC Answer - socket.on("webrtc-answer", async (data) => { - // Implementation for handling WebRTC answer + socket.on("webrtc-answer", (data: WebRTCAnswer) => { + const senderUserId = socket.data.userId; + const voiceChannelId = socket.data.currentVoiceChannelId; + + if (!data || !senderUserId || !voiceChannelId) { + socket.emit("error-voicechannel", "Invalid WebRTC answer data"); + return; + } + + // Forward the answer to the target user + const channelMembers = voiceChannelMembers.get(voiceChannelId); + const targetSocket = channelMembers?.get(data.targetUserId); + + if (targetSocket) { + targetSocket.emit("webrtc-answer", { + senderUserId: senderUserId, + sdp: data.sdp + }); + } else { + socket.emit("error-voicechannel", "Target user not found in voice channel"); + } }); // Handle ICE Candidates - socket.on("webrtc-ice-candidate", async (data) => { - // Implementation for handling ICE candidates + socket.on("webrtc-ice-candidate", (data: WebRTCIceCandidate) => { + const senderUserId = socket.data.userId; + const voiceChannelId = socket.data.currentVoiceChannelId; + + if (!data || !senderUserId || !voiceChannelId) { + socket.emit("error-voicechannel", "Invalid ICE candidate data"); + return; + } + + // Forward the ICE candidate to the target user + const channelMembers = voiceChannelMembers.get(voiceChannelId); + const targetSocket = channelMembers?.get(data.targetUserId); + + if (targetSocket) { + targetSocket.emit("webrtc-ice-candidate", { + senderUserId: senderUserId, + candidate: data.candidate + }); + } else { + socket.emit("error-voicechannel", "Target user not found in voice channel"); + } }); }); } + +/** + * Get the current ICE server configuration. + * This function returns STUN servers and includes TURN server credentials + * if they are available in the environment variables. + */ +function getIceServers(): IceServerConfig[] { + const iceServers: IceServerConfig[] = [ + { urls: 'stun:stun.l.google.com:19302' }, + { urls: 'stun:stun1.l.google.com:19302' }, + ]; + + // Add own STUN server if configured + const stunServerUrl = process.env.STUN_SERVER_URL; + if (stunServerUrl) { + iceServers.push({ urls: stunServerUrl }); + } + + // Add TURN server if configured in environment variables + const turnServerUrl = process.env.TURN_SERVER_URL; + const turnUsername = process.env.TURN_SERVER_USERNAME; + const turnCredential = process.env.TURN_SERVER_CREDENTIAL; + + if (turnServerUrl && turnUsername && turnCredential) { + iceServers.push({ + urls: turnServerUrl, + username: turnUsername, + credential: turnCredential, + }); + } + + return iceServers; +}