nuked all realtime code
This commit is contained in:
@@ -1,90 +0,0 @@
|
||||
import { Context } from "hono";
|
||||
import {
|
||||
sendMessageToChannelEvent,
|
||||
removeMessageFromChannel,
|
||||
} from "../services/realtime.js";
|
||||
import { success } from "zod";
|
||||
import { PostMessageToChannelInput } from "../validators/realtimeValidator.js";
|
||||
import { sendMessage } from "./messageController.js";
|
||||
|
||||
export async function postMessageToChannel(io: any, c: Context, data: PostMessageToChannelInput) {
|
||||
const instanceId = data.instanceId;
|
||||
const categoryId = data.categoryId;
|
||||
const channelId = data.channelId;
|
||||
const userId = data.userId;
|
||||
const content = data.content;
|
||||
const token = data.token;
|
||||
const repliedMessageId = data.repliedMessageId ?? null;
|
||||
const event = "new_channel_message";
|
||||
return sendMessageToChannelEvent(
|
||||
instanceId,
|
||||
categoryId,
|
||||
channelId,
|
||||
userId,
|
||||
content,
|
||||
token,
|
||||
repliedMessageId,
|
||||
event,
|
||||
io
|
||||
);
|
||||
}
|
||||
|
||||
export async function deleteMessageFromChannel(io: any, c: Context) {
|
||||
try {
|
||||
io = c.get("io");
|
||||
|
||||
const instanceId = c.req.param("instanceId");
|
||||
const categoryId = c.req.param("categoryId");
|
||||
const channelId = c.req.param("channelId");
|
||||
const messageId = c.req.param("messageId");
|
||||
|
||||
const result = await removeMessageFromChannel(
|
||||
instanceId,
|
||||
categoryId,
|
||||
channelId,
|
||||
messageId,
|
||||
"delete_channel_message",
|
||||
io,
|
||||
);
|
||||
|
||||
if (result === "event not implemented") {
|
||||
console.log(
|
||||
"controller::realtime::deleteMessageFromChannel - Event not implemented",
|
||||
);
|
||||
return c.json({
|
||||
success: false,
|
||||
message: "Event not implemented or recognized",
|
||||
status: 400,
|
||||
});
|
||||
}
|
||||
|
||||
if (result === "no acknowledgment") {
|
||||
console.log(
|
||||
"controller::realtime::deleteMessageFromChannel - No acknowledgment received from client",
|
||||
);
|
||||
return c.json({
|
||||
success: false,
|
||||
message: "No acknowledgment received from client",
|
||||
status: 500,
|
||||
});
|
||||
}
|
||||
|
||||
if (!result) {
|
||||
throw new Error("failed to delete message");
|
||||
}
|
||||
|
||||
c.json({
|
||||
success: true,
|
||||
message: "Message deleted successfully",
|
||||
status: 200,
|
||||
});
|
||||
} catch (err) {
|
||||
const errMessage = err as Error;
|
||||
console.log("services::realtime::deleteMessageFromChannel - ", errMessage);
|
||||
return c.json({
|
||||
success: false,
|
||||
message: errMessage.message,
|
||||
status: 500,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -13,11 +13,11 @@ const io = new Server();
|
||||
//then bind to socket.io server
|
||||
const engine = new Engine();
|
||||
io.bind(engine);
|
||||
|
||||
/*
|
||||
io.on("connection", (socket) => {
|
||||
//get userId and clientId from query params
|
||||
const userId = socket.handshake.query.userId;
|
||||
const clientId = socket.handshake.query.clientId;
|
||||
const userId = socket.handshake.query.userId
|
||||
const clientId = socket.handshake.query.clientId
|
||||
if (!userId || Array.isArray(userId)) {
|
||||
socket.disconnect();
|
||||
throw new Error("Invalid user ID");
|
||||
@@ -28,15 +28,23 @@ io.on("connection", (socket) => {
|
||||
throw new Error("Invalid client ID");
|
||||
}
|
||||
|
||||
|
||||
socket.join(userId);
|
||||
console.log(
|
||||
`User ${userId} connected. Client ID ${clientId} on socket ${socket.id}`,
|
||||
);
|
||||
)
|
||||
|
||||
socket.on("disconnect", () => {
|
||||
console.log(`User ${userId} disconnected from socket ${socket.id}`);
|
||||
});
|
||||
})
|
||||
});
|
||||
*/
|
||||
|
||||
io.on("ping", (socket) => {
|
||||
console.log(`New client connected: ${socket.id}`)
|
||||
socket.emit("pong", socket. )
|
||||
})
|
||||
|
||||
|
||||
const app = new Hono();
|
||||
|
||||
|
||||
@@ -1,90 +0,0 @@
|
||||
import { Hono } from "hono";
|
||||
import { zValidator } from "@hono/zod-validator";
|
||||
import { describeRoute, resolver } from "hono-openapi";
|
||||
import {
|
||||
postMessageToChannelSchema
|
||||
} from "../validators/realtimeValidator.js";
|
||||
import {
|
||||
postMessageToChannel,
|
||||
deleteMessageFromChannel,
|
||||
} from "../controller/realtimeController";
|
||||
|
||||
|
||||
const realtimeRoutes = new Hono();
|
||||
|
||||
realtimeRoutes.post(
|
||||
"/message/:instanceId/:categoryId/:channelId",
|
||||
describeRoute({
|
||||
description: "Post a message to a channel",
|
||||
responses: {
|
||||
200: {
|
||||
description: "Message posted successfully",
|
||||
content: {
|
||||
"application/json": { schema: resolver(postMessageToChannelSchema) },
|
||||
},
|
||||
},
|
||||
400: {
|
||||
description: "Bad Request - Invalid input data",
|
||||
content: {
|
||||
"application/json": { schema: resolver(postMessageToChannelSchema) },
|
||||
},
|
||||
},
|
||||
401: {
|
||||
description: "Unauthorized - Invalid token",
|
||||
content: {
|
||||
"application/json": { schema: resolver(postMessageToChannelSchema) },
|
||||
},
|
||||
},
|
||||
404: {
|
||||
description: "Instance, Category, Channel, or User not found",
|
||||
content: {
|
||||
"application/json": { schema: resolver(postMessageToChannelSchema) },
|
||||
},
|
||||
},
|
||||
500: {
|
||||
description: "Server error",
|
||||
content: {
|
||||
"application/json": { schema: resolver(postMessageToChannelSchema) },
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
zValidator("json", postMessageToChannelSchema),
|
||||
async (c) => {
|
||||
const instanceId = c.req.param("instanceId");
|
||||
const categoryId = c.req.param("categoryId");
|
||||
const channelId = c.req.param("channelId");
|
||||
const { userId, content, repliedMessageId, token } = await c.req.json();
|
||||
|
||||
const ioServer = (c as any).get("io");
|
||||
if (!ioServer) {
|
||||
return c.json({ success: false, error: "Realtime server not available" }, 500);
|
||||
}
|
||||
|
||||
const result = await postMessageToChannel(ioServer, c, {
|
||||
instanceId,
|
||||
categoryId,
|
||||
channelId,
|
||||
userId,
|
||||
content,
|
||||
token,
|
||||
repliedMessageId: repliedMessageId ?? null,
|
||||
})
|
||||
|
||||
if (result === "event not implemented") {
|
||||
return c.json({ success: false, message: "Event not implemented or recognized" }, 400);
|
||||
}
|
||||
|
||||
if (result === "no acknowledgment") {
|
||||
return c.json({ success: false, message: "No acknowledgment received from client" }, 500);
|
||||
}
|
||||
|
||||
if (!result) {
|
||||
return c.json({ success: false, message: "Failed to post message" }, 500);
|
||||
}
|
||||
|
||||
return c.json({ success: true, result }, 200);
|
||||
}
|
||||
);
|
||||
|
||||
export default realtimeRoutes;
|
||||
@@ -1,222 +0,0 @@
|
||||
import { PrismaClient } from "@prisma/client";
|
||||
import type { Server, Socket } from "socket.io";
|
||||
import { sendMessageToChannel } from "./messageService";
|
||||
|
||||
const prisma = new PrismaClient();
|
||||
|
||||
const EVENTS = {
|
||||
NEW_CHANNEL_MESSAGE: "new_channel_message",
|
||||
DELETE_CHANNEL_MESSAGE: "delete_channel_message",
|
||||
MESSAGE_PING: "message_ping",
|
||||
};
|
||||
|
||||
export async function sendMessageToChannelEvent(
|
||||
instanceId: string,
|
||||
categoryId: string,
|
||||
channelId: string,
|
||||
userId: string,
|
||||
content: string,
|
||||
token: string,
|
||||
repliedMessageId: string | null,
|
||||
event: string,
|
||||
io: any,
|
||||
): Promise<string | boolean> {
|
||||
try {
|
||||
//TODO: implement middleware to replace this
|
||||
if (EVENTS.NEW_CHANNEL_MESSAGE === event) {
|
||||
throw new Error("Event not implemented");
|
||||
}
|
||||
|
||||
const newMessage = await sendMessageToChannel(channelId, userId, content, token, repliedMessageId);
|
||||
|
||||
if (!newMessage) {
|
||||
console.log("services::realtime::sendMessageToChannel - could not create new message");
|
||||
return "failed to create message"
|
||||
}
|
||||
|
||||
return new Promise((resolve) => {
|
||||
io.to(instanceId).emit(event, newMessage, (ack: any) => {
|
||||
if (ack && ack.status === "received") {
|
||||
console.log(`Message ${ack.messageId} acknowledged by client.`);
|
||||
resolve(true);
|
||||
} else {
|
||||
console.log(
|
||||
"services::realtime::sendMessageToChannel No acknowledgment received from client.",
|
||||
);
|
||||
resolve("no acknowledgment");
|
||||
}
|
||||
});
|
||||
});
|
||||
} catch (err) {
|
||||
const errMessage = err as Error;
|
||||
if (errMessage.message === "Event not implemented") {
|
||||
console.log(
|
||||
`services::realtime::sendMessageToChannel - Event not implemented. Attempted event: ${event}`,
|
||||
);
|
||||
return "event not implemented";
|
||||
}
|
||||
console.log("services::realtime::sendMessageToChannel - ", errMessage);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
export async function removeMessageFromChannel(
|
||||
instanceId: string,
|
||||
categoryId: string,
|
||||
channelId: string,
|
||||
messageId: string,
|
||||
event: string,
|
||||
io: any,
|
||||
): Promise<string | boolean> {
|
||||
try {
|
||||
//TODO: implement middleware to replace this
|
||||
if (EVENTS.DELETE_CHANNEL_MESSAGE === event) {
|
||||
throw new Error("event not implemented");
|
||||
}
|
||||
|
||||
//TODO: add prisma to flag a channel message as deleted
|
||||
|
||||
return new Promise((resolve) => {
|
||||
io.to(instanceId).emit(event, { messageId }, (ack: any) => {
|
||||
if (ack && ack.status === "received") {
|
||||
console.log(`Message ${ack.messageId} acknowledged by client.`);
|
||||
resolve(true);
|
||||
} else {
|
||||
console.log(
|
||||
"services::realtime::deleteMessageFromChannel No acknowledgment received from client.",
|
||||
);
|
||||
resolve("no acknowledgment");
|
||||
}
|
||||
});
|
||||
});
|
||||
} catch (err) {
|
||||
const errMessage = err as Error;
|
||||
if (errMessage.message === "Event not implemented") {
|
||||
console.log(
|
||||
`services::realtime::deleteMessageFromChannel - Event not implemented. Attempted event: ${event}`,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
console.log("services::realtime::deleteMessageFromChannel - ", errMessage);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
export async function messagePing(
|
||||
instanceId: string,
|
||||
categoryId: string,
|
||||
channelId: string,
|
||||
messageId: string,
|
||||
userId: string,
|
||||
pingType: string,
|
||||
text: string,
|
||||
io: any,
|
||||
mentionedUserIds?: string[],
|
||||
): Promise<string | boolean> {
|
||||
try {
|
||||
const curInstance = await prisma.instance.findUnique({ where: { id: instanceId } });
|
||||
if (!curInstance) throw new Error("instance not found");
|
||||
|
||||
const curCategory = await prisma.category.findUnique({ where: { id: categoryId } });
|
||||
if (!curCategory) throw new Error("category not found");
|
||||
|
||||
const curChannel = await prisma.channel.findUnique({ where: { id: channelId } });
|
||||
if (!curChannel) throw new Error("channel not found");
|
||||
|
||||
if (pingType === "mention") {
|
||||
if (!mentionedUserIds || mentionedUserIds.length === 0) {
|
||||
throw new Error("no mentioned users provided for mention ping");
|
||||
}
|
||||
|
||||
// Persist pings (best-effort)
|
||||
try {
|
||||
const rows = mentionedUserIds.map((m) => ({ messageId, pingsUserId: m }));
|
||||
await prisma.messagePing.createMany({ data: rows, skipDuplicates: true });
|
||||
} catch (e) {
|
||||
console.warn("services::realtime::messagePing - could not persist pings", e);
|
||||
}
|
||||
|
||||
const timeoutMs = 5000;
|
||||
const results: Array<{ userId: string; delivered: boolean }> = [];
|
||||
|
||||
// For each mentioned user, find sockets on this server with matching socket.data.userId
|
||||
await Promise.all(
|
||||
mentionedUserIds.map(async (mentionedUserId) => {
|
||||
const socketsForUser: Socket[] = [];
|
||||
for (const sock of Array.from((io as Server).sockets.sockets.values())) {
|
||||
try {
|
||||
if ((sock as Socket).data?.userId === mentionedUserId) socketsForUser.push(sock as Socket);
|
||||
} catch {}
|
||||
}
|
||||
|
||||
if (socketsForUser.length === 0) {
|
||||
results.push({ userId: mentsocketsionedUserId, delivered: false });
|
||||
return;
|
||||
}
|
||||
|
||||
const perSocket = socketsForUser.map(
|
||||
(sock) =>
|
||||
new Promise<boolean>((resolve) => {
|
||||
let done = false;
|
||||
try {
|
||||
sock.emit(
|
||||
EVENTS.MESSAGE_PING,
|
||||
{ categoryId, channelId, messageId, pingType, text, mentionedUserId },
|
||||
(ack: any) => {
|
||||
if (done) return;
|
||||
done = true;
|
||||
resolve(!!ack && ack.status === "received");
|
||||
},
|
||||
);
|
||||
} catch (e) {
|
||||
if (!done) {
|
||||
done = true;
|
||||
resolve(false);
|
||||
}
|
||||
}
|
||||
|
||||
setTimeout(() => {
|
||||
if (done) return;
|
||||
done = true;
|
||||
resolve(false);
|
||||
}, timeoutMs);
|
||||
}),
|
||||
);
|
||||
|
||||
try {
|
||||
const settled = await Promise.all(perSocket);
|
||||
const delivered = settled.some(Boolean);
|
||||
results.push({ userId: mentionedUserId, delivered });
|
||||
} catch {
|
||||
results.push({ userId: mentionedUserId, delivered: false });
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
const anyDelivered = results.some((r) => r.delivered);
|
||||
if (anyDelivered) {
|
||||
console.log("services::realtime::messagePing delivered to some users", results);
|
||||
return true;
|
||||
}
|
||||
|
||||
console.log("services::realtime::messagePing no acknowledgments", results);
|
||||
return "no acknowledgment";
|
||||
}
|
||||
|
||||
// Fallback: emit a generic ping to the instance (fire-and-forget ack optional)
|
||||
return new Promise((resolve) => {
|
||||
(io as Server).emit(EVENTS.MESSAGE_PING, { categoryId, channelId, messageId, pingType, text }, (ack: any) => {
|
||||
if (ack && ack.status === "received") {
|
||||
resolve(true);
|
||||
} else {
|
||||
resolve("no acknowledgment");
|
||||
}
|
||||
});
|
||||
});
|
||||
} catch (err) {
|
||||
const errMessage = err as Error;
|
||||
console.log("services::realtime::messagePing - ", errMessage);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user