diff --git a/concord-server/migrations/20250928083913_maxchar2000/migration.sql b/concord-server/migrations/20250928083913_maxchar2000/migration.sql new file mode 100644 index 0000000..096b66b --- /dev/null +++ b/concord-server/migrations/20250928083913_maxchar2000/migration.sql @@ -0,0 +1,8 @@ +/* + Warnings: + + - You are about to alter the column `text` on the `Message` table. The data in that column could be lost. The data in that column will be cast from `Text` to `VarChar(2000)`. + +*/ +-- AlterTable +ALTER TABLE "public"."Message" ALTER COLUMN "text" SET DATA TYPE VARCHAR(2000); diff --git a/concord-server/src/controller/realtime.ts b/concord-server/src/controller/realtime.ts deleted file mode 100644 index 5f9b961..0000000 --- a/concord-server/src/controller/realtime.ts +++ /dev/null @@ -1,126 +0,0 @@ -import { Context } from "hono"; -import { - sendMessageToChannel, - removeMessageFromChannel, -} from "../services/realtime.js"; -import { success } from "zod"; - -export async function postMessageToChannel(io: any, c: Context) { - try { - io = c.get("io"); - - const instanceId = c.req.param("instanceId"); - const categoryId = c.req.param("categoryId"); - const channelId = c.req.param("channelId"); - const message = await c.req.json(); - - const result = await sendMessageToChannel( - instanceId, - categoryId, - channelId, - message, - "new_channel_message", - io, - ); - - if (result === "Event not implemented") { - console.log( - "controller::realtime::postMessageToChannel - Failed to send message", - ); - return c.json({ - success: false, - message: "Event not implemented or recognized", - status: 400, - }); - } - - if (result === "no acknowledgment") { - console.log( - "controller::realtime::postMessageToChannel - No acknowledgment received from client", - ); - return c.json({ - success: false, - message: "No acknowledgment received from client", - status: 500, - }); - } - - if (!result) { - throw new Error("failed to send message"); - } - - return c.json({ - success: true, - message: "Message sent successfully", - status: 200, - }); - } catch (err) { - const errMessage = err as Error; - console.log("controller::realtime::postMessageToChannel - ", errMessage); - return c.json({ - success: false, - message: errMessage.message, - status: 500, - }); - } -} - -export async function deleteMessageFromChannel(io: any, c: Context) { - try { - io = c.get("io"); - - const instanceId = c.req.param("instanceId"); - const categoryId = c.req.param("categoryId"); - const channelId = c.req.param("channelId"); - const messageId = c.req.param("messageId"); - - const result = await removeMessageFromChannel( - instanceId, - categoryId, - channelId, - messageId, - "delete_channel_message", - io, - ); - - if (result === "event not implemented") { - console.log( - "controller::realtime::deleteMessageFromChannel - Event not implemented", - ); - return c.json({ - success: false, - message: "Event not implemented or recognized", - status: 400, - }); - } - - if (result === "no acknowledgment") { - console.log( - "controller::realtime::deleteMessageFromChannel - No acknowledgment received from client", - ); - return c.json({ - success: false, - message: "No acknowledgment received from client", - status: 500, - }); - } - - if (!result) { - throw new Error("failed to delete message"); - } - - c.json({ - success: true, - message: "Message deleted successfully", - status: 200, - }); - } catch (err) { - const errMessage = err as Error; - console.log("services::realtime::deleteMessageFromChannel - ", errMessage); - return c.json({ - success: false, - message: errMessage.message, - status: 500, - }); - } -} diff --git a/concord-server/src/index.ts b/concord-server/src/index.ts index 1348a03..4d257c4 100644 --- a/concord-server/src/index.ts +++ b/concord-server/src/index.ts @@ -5,46 +5,20 @@ import { Server } from "socket.io"; import routes from "./routes/index"; import { Scalar } from "@scalar/hono-api-reference"; import { openAPIRouteHandler } from "hono-openapi"; +import { registerSocketHandlers } from "./sockets"; -//initialize socket.io server -const io = new Server(); - -//initialize bun engine -//then bind to socket.io server -const engine = new Engine(); -io.bind(engine); - -io.on("connection", (socket) => { - //get userId and clientId from query params - const userId = socket.handshake.query.userId; - const clientId = socket.handshake.query.clientId; - if (!userId || Array.isArray(userId)) { - socket.disconnect(); - throw new Error("Invalid user ID"); - } - - if (!clientId || Array.isArray(clientId)) { - socket.disconnect(); - throw new Error("Invalid client ID"); - } - - socket.join(userId); - console.log( - `User ${userId} connected. Client ID ${clientId} on socket ${socket.id}`, - ); - - socket.on("disconnect", () => { - console.log(`User ${userId} disconnected from socket ${socket.id}`); - }); -}); - +// Routes const app = new Hono(); app.use( "*", cors({ - origin: "http://localhost:5173", - allowHeaders: ["Content-Type", "Authorization"], + origin: ["http://localhost:5173", "https://concord.kpuig.net"], + allowHeaders: [ + "Content-Type", + "Authorization", + "Access-Control-Allow-Origin", + ], allowMethods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"], credentials: true, }), @@ -68,4 +42,44 @@ app.get( app.get("/scalar", Scalar({ url: "/openapi" })); -export default app; +// initialize socket.io server +const io = new Server({ + cors: { + origin: ["http://localhost:5173", "https://concord.kpuig.net"], + credentials: true, + }, +}); +const engine = new Engine(); +io.bind(engine); + +// Register socket.io events +registerSocketHandlers(io); + +const { websocket } = engine.handler(); + +export default { + port: 3000, + idleTimeout: 30, // must be greater than the "pingInterval" option of the engine, which defaults to 25 seconds + + async fetch(req: Request, server: Bun.Server) { + const url = new URL(req.url); + + if (url.pathname === "/socket.io/") { + const response = await engine.handleRequest(req, server); + // Add CORS headers explicitly + const origin = req.headers.get("Origin"); + if ( + origin && + ["http://localhost:5173", "https://concord.kpuig.net"].includes(origin) + ) { + response.headers.set("Access-Control-Allow-Origin", origin); + } + response.headers.set("Access-Control-Allow-Credentials", "true"); + return response; + } else { + return app.fetch(req, server); + } + }, + + websocket, +}; diff --git a/concord-server/src/routes/realtime.ts b/concord-server/src/routes/realtime.ts deleted file mode 100644 index 933b839..0000000 --- a/concord-server/src/routes/realtime.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { Hono } from "hono"; -import { zValidator } from "@hono/zod-validator"; -import { describeRoute, resolver } from "hono-openapi"; -import { - postMessageToChannel, - deleteMessageFromChannel, -} from "../controller/realtime"; - -const app = new Hono(); - -app.post( - "message/", - zValidator({ - body: z.object({ - content: z.string().min(1).max(500), - }), - }), - async (c) => { - const { instanceId, categoryId, channelId } = c.req.params; - const { content } = c.req.body; - - return postMessageToChannel(c.get("io"), { - instanceId, - categoryId, - channelId, - content, - }); - }, -); diff --git a/concord-server/src/services/instanceService.ts b/concord-server/src/services/instanceService.ts index bb4008a..7ac6381 100644 --- a/concord-server/src/services/instanceService.ts +++ b/concord-server/src/services/instanceService.ts @@ -57,3 +57,64 @@ export async function getAllInstances() { }; } } + +export async function getInstanceByChannelId(id: string) { + try { + const instance = await prisma.instance.findFirst({ + where: { + Category: { + some: { + Channel: { + some: { + id: id + } + } + } + } + }, + }); + + if (!instance) { + return null; + } + + return instance; + } catch (error) { + console.error("Error fetching instance by channel ID:", error); + return null; + } +} + +export async function getInstancesByUserId(id: string) { + try { + const user = await getUserInformation(id); + if (user && user.admin) { + const adminInstances = await getAllInstances(); + if (adminInstances && adminInstances.success) { + return adminInstances.data; + } + } + + const instance = await prisma.instance.findMany({ + where: { + Role: { + some: { + User: { + id: id + } + } + } + } + }); + + if (!instance) { + return null; + } + + return instance; + } catch (error) { + console.error("Error fetching instance by channel ID:", error); + return null; + } +} + diff --git a/concord-server/src/services/realtime.ts b/concord-server/src/services/realtime.ts deleted file mode 100644 index 434de72..0000000 --- a/concord-server/src/services/realtime.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { readonly } from "zod"; - -const EVENTS = { - NEW_CHANNEL_MESSAGE: "new_channel_message", - DELETE_CHANNEL_MESSAGE: "delete_channel_message", -}; - -export async function sendMessageToChannel( - instanceId: string, - categoryId: string, - channelId: string, - message: any, - event: string, - io: any, -): Promise { - try { - //TODO: implement middleware to replace this - if (EVENTS.NEW_CHANNEL_MESSAGE === event) { - throw new Error("Event not implemented"); - } - - //TODO: add prisma to save channel message to DB - - return new Promise((resolve) => { - io.to(instanceId).emit(event, message, (ack: any) => { - if (ack && ack.status === "received") { - console.log(`Message ${ack.messageId} acknowledged by client.`); - resolve(true); - } else { - console.log( - "services::realtime::sendMessageToChannel No acknowledgment received from client.", - ); - resolve("no acknowledgment"); - } - }); - }); - } catch (err) { - const errMessage = err as Error; - if (errMessage.message === "Event not implemented") { - console.log( - `services::realtime::sendMessageToChannel - Event not implemented. Attempted event: ${event}`, - ); - return "event not implemented"; - } - console.log("services::realtime::sendMessageToChannel - ", errMessage); - return false; - } -} - -export async function removeMessageFromChannel( - instanceId: string, - categoryId: string, - channelId: string, - messageId: string, - event: string, - io: any, -): Promise { - try { - //TODO: implement middleware to replace this - if (EVENTS.DELETE_CHANNEL_MESSAGE === event) { - throw new Error("event not implemented"); - } - - //TODO: add prisma to flag a channel message as deleted - - return new Promise((resolve) => { - io.to(instanceId).emit(event, { messageId }, (ack: any) => { - if (ack && ack.status === "received") { - console.log(`Message ${ack.messageId} acknowledged by client.`); - resolve(true); - } else { - console.log( - "services::realtime::deleteMessageFromChannel No acknowledgment received from client.", - ); - resolve("no acknowledgment"); - } - }); - }); - } catch (err) { - const errMessage = err as Error; - if (errMessage.message === "Event not implemented") { - console.log( - `services::realtime::deleteMessageFromChannel - Event not implemented. Attempted event: ${event}`, - ); - return false; - } - console.log("services::realtime::deleteMessageFromChannel - ", errMessage); - return false; - } -} diff --git a/concord-server/src/sockets/index.ts b/concord-server/src/sockets/index.ts new file mode 100644 index 0000000..816e14d --- /dev/null +++ b/concord-server/src/sockets/index.ts @@ -0,0 +1,16 @@ +import { Server } from "socket.io"; +import { registerVoiceHandlers } from "./voiceHandler"; + +export function registerSocketHandlers(io: Server) { + // bad practice + io.on("connection", (socket) => { + console.log("connected"); + socket.on("ping", (c) => { + console.log(c); + socket.emit("pong", c); + }); + }); + + // good practice + registerVoiceHandlers(io); +} diff --git a/concord-server/src/sockets/voiceHandler.ts b/concord-server/src/sockets/voiceHandler.ts new file mode 100644 index 0000000..50c285f --- /dev/null +++ b/concord-server/src/sockets/voiceHandler.ts @@ -0,0 +1,309 @@ +import { Server, Socket } from "socket.io"; +import { getUserCredentials, getUserInformation } from "../services/userService"; +import { getAllInstances, getInstanceByChannelId, getInstancesByUserId } from "../services/instanceService"; +import { getCategoriesByInstance, getCategory, getChannel } from "../services/channelService"; + +// Change to Map of voiceChannelId to Map of userId to socket +const voiceChannelMembers = new Map>(); + +// Types for WebRTC messages +interface WebRTCOffer { + targetUserId: string; + sdp: RTCSessionDescriptionInit; +} + +interface WebRTCAnswer { + targetUserId: string; + sdp: RTCSessionDescriptionInit; +} + +interface WebRTCIceCandidate { + targetUserId: string; + candidate: RTCIceCandidateInit; +} + +// Future ICE server configuration +// This can be expanded later to include TURN servers +interface IceServerConfig { + urls: string | string[]; + username?: string; + credential?: string; +} + +export function registerVoiceHandlers(io: Server) { + io.on("connection", (socket: Socket) => { + // Join voice channel + socket.on("join-voicechannel", async (data) => { + const payload = data as { + userId: string + userToken: string, + voiceChannelId: string, + }; + if (!payload) { + socket.emit("error-voicechannel", "no payload in voice conn") + return; + } + + // Initialize map for channel if not present + if (!voiceChannelMembers.has(payload.voiceChannelId)) { + voiceChannelMembers.set(payload.voiceChannelId, new Map()); + } + + const channelMembers = voiceChannelMembers.get(payload.voiceChannelId)!; + + // Remove user if already present in this channel + if (channelMembers.has(payload.userId)) { + channelMembers.delete(payload.userId); + } + + // authenticate user + const userCreds = await getUserCredentials(payload.userId); + if (!userCreds || !userCreds.token || userCreds.token != payload.userToken) { + socket.emit("error-voicechannel", "bad user creds in voice conn"); + return; + } + + // determine if channel is voice channel + const channel = await getChannel(payload.voiceChannelId); + if (!channel || channel.type !== "voice" || !channel.categoryId) { + socket.emit("error-voicechannel", "bad channel or channel type in voice conn"); + return; + } + + // authorize user using role + const user = await getUserInformation(payload.userId); + const instance = await getInstanceByChannelId(payload.voiceChannelId); + const instances = await getInstancesByUserId(payload.userId); + if (!user || !instance || !instances || !instances.find(e => e.id === instance.id)) { + socket.emit("error-voicechannel", "user not authorized for channel in voice conn"); + return; + } + + // add to map + channelMembers.set(payload.userId, socket); + + socket.join(payload.voiceChannelId); + socket.emit("joined-voicechannel", { + voiceChannelId: payload.voiceChannelId, + connectedUserIds: Array.from(channelMembers.keys()).filter(e => e !== payload.userId), + iceServers: getIceServers() // Send ICE server config to client + }); + socket.to(payload.voiceChannelId).emit("user-joined-voicechannel", { userId: payload.userId }); + + // Store userId in socket.data for easier access later + socket.data.userId = payload.userId; + socket.data.currentVoiceChannelId = payload.voiceChannelId; + }); + + // Leave voice channel + socket.on("leave-voicechannel", async (data) => { + const payload = data as { + userId: string, + userToken: string, + voiceChannelId: string, + }; + if (!payload) { + socket.emit("error-voicechannel", "no payload in leave voice request"); + return; + } + + const channelMembers = voiceChannelMembers.get(payload.voiceChannelId); + if (!channelMembers) { + socket.emit("error-voicechannel", "voice channel not found"); + return; + } + + // authenticate user + const userCreds = await getUserCredentials(payload.userId); + if (!userCreds || !userCreds.token || userCreds.token != payload.userToken) { + socket.emit("error-voicechannel", "bad user creds in leave voice request"); + return; + } + + // Remove user from channel + if (channelMembers.has(payload.userId)) { + channelMembers.delete(payload.userId); + + // Leave the socket.io room + socket.leave(payload.voiceChannelId); + + // Notify other users in the channel + io.to(payload.voiceChannelId).emit("user-left-voicechannel", { + userId: payload.userId, + voiceChannelId: payload.voiceChannelId + }); + + // Clean up empty channels + if (channelMembers.size === 0) { + voiceChannelMembers.delete(payload.voiceChannelId); + } + + // Confirm to the user that they've left + socket.emit("left-voicechannel", { + voiceChannelId: payload.voiceChannelId + }); + + // Clear socket data + socket.data.currentVoiceChannelId = undefined; + } else { + socket.emit("error-voicechannel", "user not in voice channel"); + } + }); + + // Handle disconnection + socket.on("disconnect", () => { + // Get the user ID and current voice channel from socket data + const userId = socket.data.userId; + const voiceChannelId = socket.data.currentVoiceChannelId; + + // If we have the channel ID stored, use it directly + if (userId && voiceChannelId) { + const channelMembers = voiceChannelMembers.get(voiceChannelId); + if (channelMembers && channelMembers.has(userId)) { + // Remove the user from the channel + channelMembers.delete(userId); + + // Notify other members + io.to(voiceChannelId).emit("user-left-voicechannel", { + userId, + voiceChannelId, + reason: "disconnected" + }); + + // Clean up empty channels + if (channelMembers.size === 0) { + voiceChannelMembers.delete(voiceChannelId); + } + } + } else { + // If we don't have the info stored, search through all channels + voiceChannelMembers.forEach((members, channelId) => { + // Use Array.from to convert Map entries to array for iteration + Array.from(members.entries()).forEach(([memberId, memberSocket]) => { + if (memberSocket.id === socket.id) { + // Found the user in this channel + members.delete(memberId); + + // Notify other members + io.to(channelId).emit("user-left-voicechannel", { + userId: memberId, + voiceChannelId: channelId, + reason: "disconnected" + }); + + // Clean up empty channels + if (members.size === 0) { + voiceChannelMembers.delete(channelId); + } + } + }); + }); + } + }); + + // Handle WebRTC Offer + socket.on("webrtc-offer", async (data) => { + const payload = data as { targetUserId: string; sdp: any }; + const senderUserId = socket.data.userId; + const voiceChannelId = socket.data.currentVoiceChannelId; + + if (!payload || !senderUserId || !voiceChannelId) { + socket.emit("error-voicechannel", "Invalid WebRTC offer payload or sender not in voice channel"); + return; + } + + const channelMembers = voiceChannelMembers.get(voiceChannelId); + const targetSocket = channelMembers?.get(payload.targetUserId); + + if (targetSocket) { + targetSocket.emit("webrtc-offer", { + senderUserId: senderUserId, + sdp: payload.sdp + }); + } else { + socket.emit("error-voicechannel", "Target user not found in voice channel"); + } + }); + + // Handle WebRTC Answer + socket.on("webrtc-answer", (data: WebRTCAnswer) => { + const senderUserId = socket.data.userId; + const voiceChannelId = socket.data.currentVoiceChannelId; + + if (!data || !senderUserId || !voiceChannelId) { + socket.emit("error-voicechannel", "Invalid WebRTC answer data"); + return; + } + + // Forward the answer to the target user + const channelMembers = voiceChannelMembers.get(voiceChannelId); + const targetSocket = channelMembers?.get(data.targetUserId); + + if (targetSocket) { + targetSocket.emit("webrtc-answer", { + senderUserId: senderUserId, + sdp: data.sdp + }); + } else { + socket.emit("error-voicechannel", "Target user not found in voice channel"); + } + }); + + // Handle ICE Candidates + socket.on("webrtc-ice-candidate", (data: WebRTCIceCandidate) => { + const senderUserId = socket.data.userId; + const voiceChannelId = socket.data.currentVoiceChannelId; + + if (!data || !senderUserId || !voiceChannelId) { + socket.emit("error-voicechannel", "Invalid ICE candidate data"); + return; + } + + // Forward the ICE candidate to the target user + const channelMembers = voiceChannelMembers.get(voiceChannelId); + const targetSocket = channelMembers?.get(data.targetUserId); + + if (targetSocket) { + targetSocket.emit("webrtc-ice-candidate", { + senderUserId: senderUserId, + candidate: data.candidate + }); + } else { + socket.emit("error-voicechannel", "Target user not found in voice channel"); + } + }); + }); +} + +/** + * Get the current ICE server configuration. + * This function returns STUN servers and includes TURN server credentials + * if they are available in the environment variables. + */ +function getIceServers(): IceServerConfig[] { + const iceServers: IceServerConfig[] = [ + { urls: 'stun:stun.l.google.com:19302' }, + { urls: 'stun:stun1.l.google.com:19302' }, + ]; + + // Add own STUN server if configured + const stunServerUrl = process.env.STUN_SERVER_URL; + if (stunServerUrl) { + iceServers.push({ urls: stunServerUrl }); + } + + // Add TURN server if configured in environment variables + const turnServerUrl = process.env.TURN_SERVER_URL; + const turnUsername = process.env.TURN_SERVER_USERNAME; + const turnCredential = process.env.TURN_SERVER_CREDENTIAL; + + if (turnServerUrl && turnUsername && turnCredential) { + iceServers.push({ + urls: turnServerUrl, + username: turnUsername, + credential: turnCredential, + }); + } + + return iceServers; +} diff --git a/concord-server/src/validators/realtimeValidator.ts b/concord-server/src/validators/realtimeValidator.ts new file mode 100644 index 0000000..859b4e8 --- /dev/null +++ b/concord-server/src/validators/realtimeValidator.ts @@ -0,0 +1,18 @@ +import { z } from "zod"; + +export const postMessageToChannelSchema = z.object({ + instanceId: z.uuidv7(), + categoryId: z.uuidv7(), + channelId: z.uuidv7(), + userId: z.uuidv7(), + content: z.string().min(1).max(2000), + repliedMessageId: z.uuidv7().optional(), + token: z.string(), +}); + +//TODO: add more realtime related validators as needed + +export type PostMessageToChannelInput = z.infer< + typeof postMessageToChannelSchema +>; +//TODO: create more input schemas for other realtime actions