diff --git a/concord-server/schema.prisma b/concord-server/schema.prisma index a4fbfa8..b0cc65d 100644 --- a/concord-server/schema.prisma +++ b/concord-server/schema.prisma @@ -89,7 +89,6 @@ model Channel { model ChannelPin { messageId String @unique channelId String @unique - Message Message @relation(fields: [messageId], references: [id]) Channel Channel @relation(fields: [channelId], references: [id]) createdAt DateTime @default(now()) diff --git a/concord-server/src/services/realtime.ts b/concord-server/src/services/realtime.ts index 434de72..9083788 100644 --- a/concord-server/src/services/realtime.ts +++ b/concord-server/src/services/realtime.ts @@ -1,15 +1,23 @@ -import { readonly } from "zod"; +import { PrismaClient } from "@prisma/client"; +import type { Server, Socket } from "socket.io"; +import { sendMessageToChannel } from "./messageService"; + +const prisma = new PrismaClient(); const EVENTS = { NEW_CHANNEL_MESSAGE: "new_channel_message", DELETE_CHANNEL_MESSAGE: "delete_channel_message", + MESSAGE_PING: "message_ping", }; -export async function sendMessageToChannel( +export async function sendMessageToChannelEvent( instanceId: string, categoryId: string, channelId: string, - message: any, + userId: string, + content: string, + token: string, + repliedMessageId: string | null, event: string, io: any, ): Promise { @@ -19,10 +27,15 @@ export async function sendMessageToChannel( throw new Error("Event not implemented"); } - //TODO: add prisma to save channel message to DB + const newMessage = await sendMessageToChannel(channelId, userId, content, token, repliedMessageId); + + if (!newMessage) { + console.log("services::realtime::sendMessageToChannel - could not create new message"); + return "failed to create message" + } return new Promise((resolve) => { - io.to(instanceId).emit(event, message, (ack: any) => { + io.to(instanceId).emit(event, newMessage, (ack: any) => { if (ack && ack.status === "received") { console.log(`Message ${ack.messageId} acknowledged by client.`); resolve(true); @@ -88,3 +101,122 @@ export async function removeMessageFromChannel( return false; } } + + +export async function messagePing( + instanceId: string, + categoryId: string, + channelId: string, + messageId: string, + userId: string, + pingType: string, + text: string, + io: any, + mentionedUserIds?: string[], +): Promise { + try { + const curInstance = await prisma.instance.findUnique({ where: { id: instanceId } }); + if (!curInstance) throw new Error("instance not found"); + + const curCategory = await prisma.category.findUnique({ where: { id: categoryId } }); + if (!curCategory) throw new Error("category not found"); + + const curChannel = await prisma.channel.findUnique({ where: { id: channelId } }); + if (!curChannel) throw new Error("channel not found"); + + if (pingType === "mention") { + if (!mentionedUserIds || mentionedUserIds.length === 0) { + throw new Error("no mentioned users provided for mention ping"); + } + + // Persist pings (best-effort) + try { + const rows = mentionedUserIds.map((m) => ({ messageId, pingsUserId: m })); + await prisma.messagePing.createMany({ data: rows, skipDuplicates: true }); + } catch (e) { + console.warn("services::realtime::messagePing - could not persist pings", e); + } + + const timeoutMs = 5000; + const results: Array<{ userId: string; delivered: boolean }> = []; + + // For each mentioned user, find sockets on this server with matching socket.data.userId + await Promise.all( + mentionedUserIds.map(async (mentionedUserId) => { + const socketsForUser: Socket[] = []; + for (const sock of Array.from((io as Server).sockets.sockets.values())) { + try { + if ((sock as Socket).data?.userId === mentionedUserId) socketsForUser.push(sock as Socket); + } catch {} + } + + if (socketsForUser.length === 0) { + results.push({ userId: mentsocketsionedUserId, delivered: false }); + return; + } + + const perSocket = socketsForUser.map( + (sock) => + new Promise((resolve) => { + let done = false; + try { + sock.emit( + EVENTS.MESSAGE_PING, + { categoryId, channelId, messageId, pingType, text, mentionedUserId }, + (ack: any) => { + if (done) return; + done = true; + resolve(!!ack && ack.status === "received"); + }, + ); + } catch (e) { + if (!done) { + done = true; + resolve(false); + } + } + + setTimeout(() => { + if (done) return; + done = true; + resolve(false); + }, timeoutMs); + }), + ); + + try { + const settled = await Promise.all(perSocket); + const delivered = settled.some(Boolean); + results.push({ userId: mentionedUserId, delivered }); + } catch { + results.push({ userId: mentionedUserId, delivered: false }); + } + }), + ); + + const anyDelivered = results.some((r) => r.delivered); + if (anyDelivered) { + console.log("services::realtime::messagePing delivered to some users", results); + return true; + } + + console.log("services::realtime::messagePing no acknowledgments", results); + return "no acknowledgment"; + } + + // Fallback: emit a generic ping to the instance (fire-and-forget ack optional) + return new Promise((resolve) => { + (io as Server).emit(EVENTS.MESSAGE_PING, { categoryId, channelId, messageId, pingType, text }, (ack: any) => { + if (ack && ack.status === "received") { + resolve(true); + } else { + resolve("no acknowledgment"); + } + }); + }); + } catch (err) { + const errMessage = err as Error; + console.log("services::realtime::messagePing - ", errMessage); + return false; + } +} \ No newline at end of file diff --git a/concord-server/src/validators/index.ts b/concord-server/src/validators/index.ts new file mode 100644 index 0000000..1348a03 --- /dev/null +++ b/concord-server/src/validators/index.ts @@ -0,0 +1,71 @@ +import { Hono } from "hono"; +import { cors } from "hono/cors"; +import { Server as Engine } from "@socket.io/bun-engine"; +import { Server } from "socket.io"; +import routes from "./routes/index"; +import { Scalar } from "@scalar/hono-api-reference"; +import { openAPIRouteHandler } from "hono-openapi"; + +//initialize socket.io server +const io = new Server(); + +//initialize bun engine +//then bind to socket.io server +const engine = new Engine(); +io.bind(engine); + +io.on("connection", (socket) => { + //get userId and clientId from query params + const userId = socket.handshake.query.userId; + const clientId = socket.handshake.query.clientId; + if (!userId || Array.isArray(userId)) { + socket.disconnect(); + throw new Error("Invalid user ID"); + } + + if (!clientId || Array.isArray(clientId)) { + socket.disconnect(); + throw new Error("Invalid client ID"); + } + + socket.join(userId); + console.log( + `User ${userId} connected. Client ID ${clientId} on socket ${socket.id}`, + ); + + socket.on("disconnect", () => { + console.log(`User ${userId} disconnected from socket ${socket.id}`); + }); +}); + +const app = new Hono(); + +app.use( + "*", + cors({ + origin: "http://localhost:5173", + allowHeaders: ["Content-Type", "Authorization"], + allowMethods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"], + credentials: true, + }), +); + +app.route("/api", routes); + +app.get( + "/openapi", + openAPIRouteHandler(app, { + documentation: { + info: { + title: "Hono API", + version: "1.0.0", + description: "Greeting API", + }, + servers: [{ url: "http://localhost:3000", description: "Local Server" }], + }, + }), +); + +app.get("/scalar", Scalar({ url: "/openapi" })); + +export default app; diff --git a/concord-server/src/validators/realtimeValidator.ts b/concord-server/src/validators/realtimeValidator.ts new file mode 100644 index 0000000..e69de29