Merge pull request #6 from k-puig/feature/websockets
feat: send message to channel event
This commit is contained in:
@@ -89,7 +89,6 @@ model Channel {
|
|||||||
model ChannelPin {
|
model ChannelPin {
|
||||||
messageId String @unique
|
messageId String @unique
|
||||||
channelId String @unique
|
channelId String @unique
|
||||||
|
|
||||||
Message Message @relation(fields: [messageId], references: [id])
|
Message Message @relation(fields: [messageId], references: [id])
|
||||||
Channel Channel @relation(fields: [channelId], references: [id])
|
Channel Channel @relation(fields: [channelId], references: [id])
|
||||||
createdAt DateTime @default(now())
|
createdAt DateTime @default(now())
|
||||||
|
|||||||
@@ -1,15 +1,23 @@
|
|||||||
import { readonly } from "zod";
|
import { PrismaClient } from "@prisma/client";
|
||||||
|
import type { Server, Socket } from "socket.io";
|
||||||
|
import { sendMessageToChannel } from "./messageService";
|
||||||
|
|
||||||
|
const prisma = new PrismaClient();
|
||||||
|
|
||||||
const EVENTS = {
|
const EVENTS = {
|
||||||
NEW_CHANNEL_MESSAGE: "new_channel_message",
|
NEW_CHANNEL_MESSAGE: "new_channel_message",
|
||||||
DELETE_CHANNEL_MESSAGE: "delete_channel_message",
|
DELETE_CHANNEL_MESSAGE: "delete_channel_message",
|
||||||
|
MESSAGE_PING: "message_ping",
|
||||||
};
|
};
|
||||||
|
|
||||||
export async function sendMessageToChannel(
|
export async function sendMessageToChannelEvent(
|
||||||
instanceId: string,
|
instanceId: string,
|
||||||
categoryId: string,
|
categoryId: string,
|
||||||
channelId: string,
|
channelId: string,
|
||||||
message: any,
|
userId: string,
|
||||||
|
content: string,
|
||||||
|
token: string,
|
||||||
|
repliedMessageId: string | null,
|
||||||
event: string,
|
event: string,
|
||||||
io: any,
|
io: any,
|
||||||
): Promise<string | boolean> {
|
): Promise<string | boolean> {
|
||||||
@@ -19,10 +27,15 @@ export async function sendMessageToChannel(
|
|||||||
throw new Error("Event not implemented");
|
throw new Error("Event not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO: add prisma to save channel message to DB
|
const newMessage = await sendMessageToChannel(channelId, userId, content, token, repliedMessageId);
|
||||||
|
|
||||||
|
if (!newMessage) {
|
||||||
|
console.log("services::realtime::sendMessageToChannel - could not create new message");
|
||||||
|
return "failed to create message"
|
||||||
|
}
|
||||||
|
|
||||||
return new Promise((resolve) => {
|
return new Promise((resolve) => {
|
||||||
io.to(instanceId).emit(event, message, (ack: any) => {
|
io.to(instanceId).emit(event, newMessage, (ack: any) => {
|
||||||
if (ack && ack.status === "received") {
|
if (ack && ack.status === "received") {
|
||||||
console.log(`Message ${ack.messageId} acknowledged by client.`);
|
console.log(`Message ${ack.messageId} acknowledged by client.`);
|
||||||
resolve(true);
|
resolve(true);
|
||||||
@@ -88,3 +101,122 @@ export async function removeMessageFromChannel(
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
export async function messagePing(
|
||||||
|
instanceId: string,
|
||||||
|
categoryId: string,
|
||||||
|
channelId: string,
|
||||||
|
messageId: string,
|
||||||
|
userId: string,
|
||||||
|
pingType: string,
|
||||||
|
text: string,
|
||||||
|
io: any,
|
||||||
|
mentionedUserIds?: string[],
|
||||||
|
): Promise<string | boolean> {
|
||||||
|
try {
|
||||||
|
const curInstance = await prisma.instance.findUnique({ where: { id: instanceId } });
|
||||||
|
if (!curInstance) throw new Error("instance not found");
|
||||||
|
|
||||||
|
const curCategory = await prisma.category.findUnique({ where: { id: categoryId } });
|
||||||
|
if (!curCategory) throw new Error("category not found");
|
||||||
|
|
||||||
|
const curChannel = await prisma.channel.findUnique({ where: { id: channelId } });
|
||||||
|
if (!curChannel) throw new Error("channel not found");
|
||||||
|
|
||||||
|
if (pingType === "mention") {
|
||||||
|
if (!mentionedUserIds || mentionedUserIds.length === 0) {
|
||||||
|
throw new Error("no mentioned users provided for mention ping");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Persist pings (best-effort)
|
||||||
|
try {
|
||||||
|
const rows = mentionedUserIds.map((m) => ({ messageId, pingsUserId: m }));
|
||||||
|
await prisma.messagePing.createMany({ data: rows, skipDuplicates: true });
|
||||||
|
} catch (e) {
|
||||||
|
console.warn("services::realtime::messagePing - could not persist pings", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
const timeoutMs = 5000;
|
||||||
|
const results: Array<{ userId: string; delivered: boolean }> = [];
|
||||||
|
|
||||||
|
// For each mentioned user, find sockets on this server with matching socket.data.userId
|
||||||
|
await Promise.all(
|
||||||
|
mentionedUserIds.map(async (mentionedUserId) => {
|
||||||
|
const socketsForUser: Socket[] = [];
|
||||||
|
for (const sock of Array.from((io as Server).sockets.sockets.values())) {
|
||||||
|
try {
|
||||||
|
if ((sock as Socket).data?.userId === mentionedUserId) socketsForUser.push(sock as Socket);
|
||||||
|
} catch {}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (socketsForUser.length === 0) {
|
||||||
|
results.push({ userId: mentsocketsionedUserId, delivered: false });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const perSocket = socketsForUser.map(
|
||||||
|
(sock) =>
|
||||||
|
new Promise<boolean>((resolve) => {
|
||||||
|
let done = false;
|
||||||
|
try {
|
||||||
|
sock.emit(
|
||||||
|
EVENTS.MESSAGE_PING,
|
||||||
|
{ categoryId, channelId, messageId, pingType, text, mentionedUserId },
|
||||||
|
(ack: any) => {
|
||||||
|
if (done) return;
|
||||||
|
done = true;
|
||||||
|
resolve(!!ack && ack.status === "received");
|
||||||
|
},
|
||||||
|
);
|
||||||
|
} catch (e) {
|
||||||
|
if (!done) {
|
||||||
|
done = true;
|
||||||
|
resolve(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
if (done) return;
|
||||||
|
done = true;
|
||||||
|
resolve(false);
|
||||||
|
}, timeoutMs);
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
const settled = await Promise.all(perSocket);
|
||||||
|
const delivered = settled.some(Boolean);
|
||||||
|
results.push({ userId: mentionedUserId, delivered });
|
||||||
|
} catch {
|
||||||
|
results.push({ userId: mentionedUserId, delivered: false });
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const anyDelivered = results.some((r) => r.delivered);
|
||||||
|
if (anyDelivered) {
|
||||||
|
console.log("services::realtime::messagePing delivered to some users", results);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log("services::realtime::messagePing no acknowledgments", results);
|
||||||
|
return "no acknowledgment";
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fallback: emit a generic ping to the instance (fire-and-forget ack optional)
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
(io as Server).emit(EVENTS.MESSAGE_PING, { categoryId, channelId, messageId, pingType, text }, (ack: any) => {
|
||||||
|
if (ack && ack.status === "received") {
|
||||||
|
resolve(true);
|
||||||
|
} else {
|
||||||
|
resolve("no acknowledgment");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
const errMessage = err as Error;
|
||||||
|
console.log("services::realtime::messagePing - ", errMessage);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
71
concord-server/src/validators/index.ts
Normal file
71
concord-server/src/validators/index.ts
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
import { Hono } from "hono";
|
||||||
|
import { cors } from "hono/cors";
|
||||||
|
import { Server as Engine } from "@socket.io/bun-engine";
|
||||||
|
import { Server } from "socket.io";
|
||||||
|
import routes from "./routes/index";
|
||||||
|
import { Scalar } from "@scalar/hono-api-reference";
|
||||||
|
import { openAPIRouteHandler } from "hono-openapi";
|
||||||
|
|
||||||
|
//initialize socket.io server
|
||||||
|
const io = new Server();
|
||||||
|
|
||||||
|
//initialize bun engine
|
||||||
|
//then bind to socket.io server
|
||||||
|
const engine = new Engine();
|
||||||
|
io.bind(engine);
|
||||||
|
|
||||||
|
io.on("connection", (socket) => {
|
||||||
|
//get userId and clientId from query params
|
||||||
|
const userId = socket.handshake.query.userId;
|
||||||
|
const clientId = socket.handshake.query.clientId;
|
||||||
|
if (!userId || Array.isArray(userId)) {
|
||||||
|
socket.disconnect();
|
||||||
|
throw new Error("Invalid user ID");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!clientId || Array.isArray(clientId)) {
|
||||||
|
socket.disconnect();
|
||||||
|
throw new Error("Invalid client ID");
|
||||||
|
}
|
||||||
|
|
||||||
|
socket.join(userId);
|
||||||
|
console.log(
|
||||||
|
`User ${userId} connected. Client ID ${clientId} on socket ${socket.id}`,
|
||||||
|
);
|
||||||
|
|
||||||
|
socket.on("disconnect", () => {
|
||||||
|
console.log(`User ${userId} disconnected from socket ${socket.id}`);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
const app = new Hono();
|
||||||
|
|
||||||
|
app.use(
|
||||||
|
"*",
|
||||||
|
cors({
|
||||||
|
origin: "http://localhost:5173",
|
||||||
|
allowHeaders: ["Content-Type", "Authorization"],
|
||||||
|
allowMethods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
|
||||||
|
credentials: true,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
app.route("/api", routes);
|
||||||
|
|
||||||
|
app.get(
|
||||||
|
"/openapi",
|
||||||
|
openAPIRouteHandler(app, {
|
||||||
|
documentation: {
|
||||||
|
info: {
|
||||||
|
title: "Hono API",
|
||||||
|
version: "1.0.0",
|
||||||
|
description: "Greeting API",
|
||||||
|
},
|
||||||
|
servers: [{ url: "http://localhost:3000", description: "Local Server" }],
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
app.get("/scalar", Scalar({ url: "/openapi" }));
|
||||||
|
|
||||||
|
export default app;
|
||||||
0
concord-server/src/validators/realtimeValidator.ts
Normal file
0
concord-server/src/validators/realtimeValidator.ts
Normal file
Reference in New Issue
Block a user