diff --git a/concord-server/src/controller/realtimeController.ts b/concord-server/src/controller/realtimeController.ts deleted file mode 100644 index 834e4b6..0000000 --- a/concord-server/src/controller/realtimeController.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { Context } from "hono"; -import { - sendMessageToChannelEvent, - removeMessageFromChannel, -} from "../services/realtime.js"; -import { success } from "zod"; -import { PostMessageToChannelInput } from "../validators/realtimeValidator.js"; -import { sendMessage } from "./messageController.js"; - -export async function postMessageToChannel(io: any, c: Context, data: PostMessageToChannelInput) { - const instanceId = data.instanceId; - const categoryId = data.categoryId; - const channelId = data.channelId; - const userId = data.userId; - const content = data.content; - const token = data.token; - const repliedMessageId = data.repliedMessageId ?? null; - const event = "new_channel_message"; - return sendMessageToChannelEvent( - instanceId, - categoryId, - channelId, - userId, - content, - token, - repliedMessageId, - event, - io - ); -} - -export async function deleteMessageFromChannel(io: any, c: Context) { - try { - io = c.get("io"); - - const instanceId = c.req.param("instanceId"); - const categoryId = c.req.param("categoryId"); - const channelId = c.req.param("channelId"); - const messageId = c.req.param("messageId"); - - const result = await removeMessageFromChannel( - instanceId, - categoryId, - channelId, - messageId, - "delete_channel_message", - io, - ); - - if (result === "event not implemented") { - console.log( - "controller::realtime::deleteMessageFromChannel - Event not implemented", - ); - return c.json({ - success: false, - message: "Event not implemented or recognized", - status: 400, - }); - } - - if (result === "no acknowledgment") { - console.log( - "controller::realtime::deleteMessageFromChannel - No acknowledgment received from client", - ); - return c.json({ - success: false, - message: "No acknowledgment received from client", - status: 500, - }); - } - - if (!result) { - throw new Error("failed to delete message"); - } - - c.json({ - success: true, - message: "Message deleted successfully", - status: 200, - }); - } catch (err) { - const errMessage = err as Error; - console.log("services::realtime::deleteMessageFromChannel - ", errMessage); - return c.json({ - success: false, - message: errMessage.message, - status: 500, - }); - } -} diff --git a/concord-server/src/index.ts b/concord-server/src/index.ts index 1348a03..5356a92 100644 --- a/concord-server/src/index.ts +++ b/concord-server/src/index.ts @@ -13,11 +13,11 @@ const io = new Server(); //then bind to socket.io server const engine = new Engine(); io.bind(engine); - +/* io.on("connection", (socket) => { //get userId and clientId from query params - const userId = socket.handshake.query.userId; - const clientId = socket.handshake.query.clientId; + const userId = socket.handshake.query.userId + const clientId = socket.handshake.query.clientId if (!userId || Array.isArray(userId)) { socket.disconnect(); throw new Error("Invalid user ID"); @@ -28,15 +28,23 @@ io.on("connection", (socket) => { throw new Error("Invalid client ID"); } + socket.join(userId); console.log( `User ${userId} connected. Client ID ${clientId} on socket ${socket.id}`, - ); + ) socket.on("disconnect", () => { console.log(`User ${userId} disconnected from socket ${socket.id}`); - }); + }) }); +*/ + +io.on("ping", (socket) => { + console.log(`New client connected: ${socket.id}`) + socket.emit("pong", socket. ) +}) + const app = new Hono(); diff --git a/concord-server/src/routes/realtimeRoutes.ts b/concord-server/src/routes/realtimeRoutes.ts deleted file mode 100644 index cb44dd1..0000000 --- a/concord-server/src/routes/realtimeRoutes.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { Hono } from "hono"; -import { zValidator } from "@hono/zod-validator"; -import { describeRoute, resolver } from "hono-openapi"; -import { - postMessageToChannelSchema -} from "../validators/realtimeValidator.js"; -import { - postMessageToChannel, - deleteMessageFromChannel, -} from "../controller/realtimeController"; - - -const realtimeRoutes = new Hono(); - -realtimeRoutes.post( - "/message/:instanceId/:categoryId/:channelId", - describeRoute({ - description: "Post a message to a channel", - responses: { - 200: { - description: "Message posted successfully", - content: { - "application/json": { schema: resolver(postMessageToChannelSchema) }, - }, - }, - 400: { - description: "Bad Request - Invalid input data", - content: { - "application/json": { schema: resolver(postMessageToChannelSchema) }, - }, - }, - 401: { - description: "Unauthorized - Invalid token", - content: { - "application/json": { schema: resolver(postMessageToChannelSchema) }, - }, - }, - 404: { - description: "Instance, Category, Channel, or User not found", - content: { - "application/json": { schema: resolver(postMessageToChannelSchema) }, - }, - }, - 500: { - description: "Server error", - content: { - "application/json": { schema: resolver(postMessageToChannelSchema) }, - }, - }, - }, - }), - zValidator("json", postMessageToChannelSchema), - async (c) => { - const instanceId = c.req.param("instanceId"); - const categoryId = c.req.param("categoryId"); - const channelId = c.req.param("channelId"); - const { userId, content, repliedMessageId, token } = await c.req.json(); - - const ioServer = (c as any).get("io"); - if (!ioServer) { - return c.json({ success: false, error: "Realtime server not available" }, 500); - } - - const result = await postMessageToChannel(ioServer, c, { - instanceId, - categoryId, - channelId, - userId, - content, - token, - repliedMessageId: repliedMessageId ?? null, - }) - - if (result === "event not implemented") { - return c.json({ success: false, message: "Event not implemented or recognized" }, 400); - } - - if (result === "no acknowledgment") { - return c.json({ success: false, message: "No acknowledgment received from client" }, 500); - } - - if (!result) { - return c.json({ success: false, message: "Failed to post message" }, 500); - } - - return c.json({ success: true, result }, 200); - } -); - -export default realtimeRoutes; \ No newline at end of file diff --git a/concord-server/src/services/realtime.ts b/concord-server/src/services/realtime.ts deleted file mode 100644 index 9083788..0000000 --- a/concord-server/src/services/realtime.ts +++ /dev/null @@ -1,222 +0,0 @@ -import { PrismaClient } from "@prisma/client"; -import type { Server, Socket } from "socket.io"; -import { sendMessageToChannel } from "./messageService"; - -const prisma = new PrismaClient(); - -const EVENTS = { - NEW_CHANNEL_MESSAGE: "new_channel_message", - DELETE_CHANNEL_MESSAGE: "delete_channel_message", - MESSAGE_PING: "message_ping", -}; - -export async function sendMessageToChannelEvent( - instanceId: string, - categoryId: string, - channelId: string, - userId: string, - content: string, - token: string, - repliedMessageId: string | null, - event: string, - io: any, -): Promise { - try { - //TODO: implement middleware to replace this - if (EVENTS.NEW_CHANNEL_MESSAGE === event) { - throw new Error("Event not implemented"); - } - - const newMessage = await sendMessageToChannel(channelId, userId, content, token, repliedMessageId); - - if (!newMessage) { - console.log("services::realtime::sendMessageToChannel - could not create new message"); - return "failed to create message" - } - - return new Promise((resolve) => { - io.to(instanceId).emit(event, newMessage, (ack: any) => { - if (ack && ack.status === "received") { - console.log(`Message ${ack.messageId} acknowledged by client.`); - resolve(true); - } else { - console.log( - "services::realtime::sendMessageToChannel No acknowledgment received from client.", - ); - resolve("no acknowledgment"); - } - }); - }); - } catch (err) { - const errMessage = err as Error; - if (errMessage.message === "Event not implemented") { - console.log( - `services::realtime::sendMessageToChannel - Event not implemented. Attempted event: ${event}`, - ); - return "event not implemented"; - } - console.log("services::realtime::sendMessageToChannel - ", errMessage); - return false; - } -} - -export async function removeMessageFromChannel( - instanceId: string, - categoryId: string, - channelId: string, - messageId: string, - event: string, - io: any, -): Promise { - try { - //TODO: implement middleware to replace this - if (EVENTS.DELETE_CHANNEL_MESSAGE === event) { - throw new Error("event not implemented"); - } - - //TODO: add prisma to flag a channel message as deleted - - return new Promise((resolve) => { - io.to(instanceId).emit(event, { messageId }, (ack: any) => { - if (ack && ack.status === "received") { - console.log(`Message ${ack.messageId} acknowledged by client.`); - resolve(true); - } else { - console.log( - "services::realtime::deleteMessageFromChannel No acknowledgment received from client.", - ); - resolve("no acknowledgment"); - } - }); - }); - } catch (err) { - const errMessage = err as Error; - if (errMessage.message === "Event not implemented") { - console.log( - `services::realtime::deleteMessageFromChannel - Event not implemented. Attempted event: ${event}`, - ); - return false; - } - console.log("services::realtime::deleteMessageFromChannel - ", errMessage); - return false; - } -} - - -export async function messagePing( - instanceId: string, - categoryId: string, - channelId: string, - messageId: string, - userId: string, - pingType: string, - text: string, - io: any, - mentionedUserIds?: string[], -): Promise { - try { - const curInstance = await prisma.instance.findUnique({ where: { id: instanceId } }); - if (!curInstance) throw new Error("instance not found"); - - const curCategory = await prisma.category.findUnique({ where: { id: categoryId } }); - if (!curCategory) throw new Error("category not found"); - - const curChannel = await prisma.channel.findUnique({ where: { id: channelId } }); - if (!curChannel) throw new Error("channel not found"); - - if (pingType === "mention") { - if (!mentionedUserIds || mentionedUserIds.length === 0) { - throw new Error("no mentioned users provided for mention ping"); - } - - // Persist pings (best-effort) - try { - const rows = mentionedUserIds.map((m) => ({ messageId, pingsUserId: m })); - await prisma.messagePing.createMany({ data: rows, skipDuplicates: true }); - } catch (e) { - console.warn("services::realtime::messagePing - could not persist pings", e); - } - - const timeoutMs = 5000; - const results: Array<{ userId: string; delivered: boolean }> = []; - - // For each mentioned user, find sockets on this server with matching socket.data.userId - await Promise.all( - mentionedUserIds.map(async (mentionedUserId) => { - const socketsForUser: Socket[] = []; - for (const sock of Array.from((io as Server).sockets.sockets.values())) { - try { - if ((sock as Socket).data?.userId === mentionedUserId) socketsForUser.push(sock as Socket); - } catch {} - } - - if (socketsForUser.length === 0) { - results.push({ userId: mentsocketsionedUserId, delivered: false }); - return; - } - - const perSocket = socketsForUser.map( - (sock) => - new Promise((resolve) => { - let done = false; - try { - sock.emit( - EVENTS.MESSAGE_PING, - { categoryId, channelId, messageId, pingType, text, mentionedUserId }, - (ack: any) => { - if (done) return; - done = true; - resolve(!!ack && ack.status === "received"); - }, - ); - } catch (e) { - if (!done) { - done = true; - resolve(false); - } - } - - setTimeout(() => { - if (done) return; - done = true; - resolve(false); - }, timeoutMs); - }), - ); - - try { - const settled = await Promise.all(perSocket); - const delivered = settled.some(Boolean); - results.push({ userId: mentionedUserId, delivered }); - } catch { - results.push({ userId: mentionedUserId, delivered: false }); - } - }), - ); - - const anyDelivered = results.some((r) => r.delivered); - if (anyDelivered) { - console.log("services::realtime::messagePing delivered to some users", results); - return true; - } - - console.log("services::realtime::messagePing no acknowledgments", results); - return "no acknowledgment"; - } - - // Fallback: emit a generic ping to the instance (fire-and-forget ack optional) - return new Promise((resolve) => { - (io as Server).emit(EVENTS.MESSAGE_PING, { categoryId, channelId, messageId, pingType, text }, (ack: any) => { - if (ack && ack.status === "received") { - resolve(true); - } else { - resolve("no acknowledgment"); - } - }); - }); - } catch (err) { - const errMessage = err as Error; - console.log("services::realtime::messagePing - ", errMessage); - return false; - } -} \ No newline at end of file