From f39f8ee830ebea5408502fc1e390ae8d43433f61 Mon Sep 17 00:00:00 2001 From: Nixyan Date: Thu, 8 Jan 2026 10:01:13 -0300 Subject: [PATCH] feat: - Fixed race condition on @olm-context.tsx when a session is created - Fixed an issue where the participants ids where being set to the current user only - Added safeguard for the dm event to try to either parse the message or use it as-is --- convex/betterAuth/olm/index.ts | 1 + src/components/ui/dm/DmChannelContent.tsx | 6 +- src/contexts/olm-context.tsx | 170 ++++++++++++---------- src/contexts/socket-context.tsx | 148 ++++++++++--------- src/lib/sockets/events/dm.ts | 24 ++- 5 files changed, 199 insertions(+), 150 deletions(-) diff --git a/convex/betterAuth/olm/index.ts b/convex/betterAuth/olm/index.ts index 8193ec5..de555d7 100644 --- a/convex/betterAuth/olm/index.ts +++ b/convex/betterAuth/olm/index.ts @@ -58,6 +58,7 @@ export const consumeOTK = mutation({ const oneTimeKeys = olmAccount.oneTimeKeys; const keyIndex = oneTimeKeys.findIndex((key) => key.keyId === args.keyId); + if (keyIndex === -1) throw new Error("The key to be consumed was not found"); oneTimeKeys.splice(keyIndex, 1); diff --git a/src/components/ui/dm/DmChannelContent.tsx b/src/components/ui/dm/DmChannelContent.tsx index a61834c..9d45ed2 100644 --- a/src/components/ui/dm/DmChannelContent.tsx +++ b/src/components/ui/dm/DmChannelContent.tsx @@ -22,7 +22,10 @@ export default function DMChannelContent( participantDetails, }: DMChannelContentProps ) { - const otherUser = useMemo(() => participantDetails[0], [participantDetails]); + + const otherUser = useMemo(() => { + return participantDetails.find((p) => p.id !== userId); + }, [participantDetails, userId]); const [olmSession, setOlmSession] = useState(null); const [sessionError, setSessionError] = useState(null); const [messageInput, setMessageInput] = useState(""); @@ -339,6 +342,7 @@ export default function DMChannelContent( recipientId: otherUser.id, password, }); + if (messageId) { setMessageInput(""); } diff --git a/src/contexts/olm-context.tsx b/src/contexts/olm-context.tsx index fc2f720..9618e84 100644 --- a/src/contexts/olm-context.tsx +++ b/src/contexts/olm-context.tsx @@ -59,6 +59,8 @@ export function OlmProvider({ // Cache sessions in memory: recipientId -> Session const sessionsRef = useRef>(new Map()); + // Track pending session creation to prevent race conditions + const pendingSessionsRef = useRef>>(new Map()); const [, forceUpdate] = useState({}); // Helper: Cache session in memory @@ -82,7 +84,7 @@ export function OlmProvider({ createdAt: Date.now(), updatedAt: Date.now(), }); - console.debug("[OlmContext] ✓ Session saved to DB"); + console.debug("[OlmContext]: Session saved to DB"); }, [userId]); // Helper: Unpickle session from database @@ -95,10 +97,10 @@ export function OlmProvider({ const Olm = await loadOlm(); const session = new Olm.Session(); session.unpickle(sessionPassword, pickledSession); - console.debug("[OlmContext] ✓ Session unpickled from DB"); + console.debug("[OlmContext]: Session unpickled from DB"); return session; } catch (err) { - console.warn("[OlmContext] Failed to unpickle session:", err); + console.warn("[OlmContext]: Failed to unpickle session:", err); // Delete corrupted session if (userId) { await db.olmSessions @@ -120,7 +122,7 @@ export function OlmProvider({ const missing = requirements.find(req => !req.value); if (missing) { - console.error(`[OlmContext] Cannot perform session operation: missing ${missing.name}`); + console.error(`[OlmContext]: Cannot perform session operation: missing ${missing.name}`); return false; } @@ -170,17 +172,17 @@ export function OlmProvider({ const loadAccount = async () => { try { - console.debug("[OlmContext] Loading OLM account..."); + console.debug("[OlmContext]: Loading OLM account..."); const account = await getOlmAccount(userId, password); if (!account) { - console.warn("[OlmContext] No OLM account found"); + console.warn("[OlmContext]: No OLM account found"); return; } setOlmAccount(account); - console.debug("[OlmContext] ✓ OLM account loaded successfully"); + console.debug("[OlmContext]: OLM account loaded successfully"); } catch (err) { - console.error("[OlmContext] Failed to load OLM account:", err); + console.error("[OlmContext]: Failed to load OLM account:", err); // Password might be wrong - clear it clearPassword(); } @@ -232,73 +234,91 @@ export function OlmProvider({ // Check if we already have this session in memory if (sessionsRef.current.has(recipientId)) { - console.debug(`[OlmContext] Using cached session for ${recipientId}`); + console.debug(`[OlmContext]: Using cached session for ${recipientId}`); return sessionsRef.current.get(recipientId)!; } - try { - console.debug(`[OlmContext] Loading/creating session for user ${recipientId}`); - - // Check if session exists in DB - const existingSession = await db.olmSessions - .where("[odId+recipientId]") - .equals([userId!, recipientId]) - .first(); - - if (existingSession) { - console.debug("[OlmContext] Found existing session in DB, unpickling..."); - const session = await unpickleSessionFromDb(recipientId, existingSession.pickledSession, password!); - - if (session) { - cacheSession(recipientId, session); - console.debug("[OlmContext] ✓ Session loaded from DB"); - return session; - } - // If unpickling failed, continue to create new session - } - - // Create new outbound session - console.debug("[OlmContext] Creating new outbound session..."); - - if (recipientOlmAccount.oneTimeKeys.length === 0) { - throw new Error("No one-time keys available for recipient"); - } - - const otk = recipientOlmAccount.oneTimeKeys[0]; - const Olm = await loadOlm(); - const newSession = new Olm.Session(); - - newSession.create_outbound( - olmAccount!, - recipientOlmAccount.identityKey.curve25519, - otk.publicKey - ); - - console.debug(`[OlmContext] ✓ Created session: ${newSession.session_id()}`); - - // Save to DB - await saveSessionToDb(recipientId, newSession, password!); - - // Consume the OTK from server - try { - await consumeOTK({ - userId: recipientId, - keyId: otk.keyId, - }); - console.debug(`[OlmContext] ✓ Consumed OTK: ${otk.keyId}`); - } catch (err) { - console.error("[OlmContext] Failed to consume OTK:", err); - } - - // Cache it - cacheSession(recipientId, newSession); - - return newSession; - } catch (err) { - console.error("[OlmContext] Failed to get/create session:", err); - return null; + // Check if session creation is already in progress for this recipient + const pendingSession = pendingSessionsRef.current.get(recipientId); + if (pendingSession) { + console.debug(`[OlmContext]: Waiting for pending session creation for ${recipientId}`); + return pendingSession; } - }, [userId, olmAccount, password, consumeOTK, validateSessionRequirements, unpickleSessionFromDb, cacheSession, saveSessionToDb]); + + // Create a new promise for this session creation + const sessionPromise = (async () => { + try { + console.debug(`[OlmContext]: Loading/creating session for user ${recipientId}`); + + // Check if session exists in DB + const existingSession = await db.olmSessions + .where("[odId+recipientId]") + .equals([userId!, recipientId]) + .first(); + + if (existingSession) { + console.debug("[OlmContext]: Found existing session in DB, unpickling..."); + const session = await unpickleSessionFromDb(recipientId, existingSession.pickledSession, password!); + + if (session) { + cacheSession(recipientId, session); + console.debug("[OlmContext]: Session loaded from DB"); + return session; + } + // If unpickling failed, continue to create new session + } + + // Create new outbound session + console.debug("[OlmContext]: Creating new outbound session..."); + + if (recipientOlmAccount.oneTimeKeys.length === 0) { + throw new Error("No one-time keys available for recipient"); + } + + const otk = recipientOlmAccount.oneTimeKeys[0]; + const Olm = await loadOlm(); + const newSession = new Olm.Session(); + + newSession.create_outbound( + olmAccount!, + recipientOlmAccount.identityKey.curve25519, + otk.publicKey + ); + + console.debug(`[OlmContext]: Created session: ${newSession.session_id()}`); + + // Save to DB + await saveSessionToDb(recipientId, newSession, password!); + + // Consume the OTK from server + try { + await consumeOTK({ + userId: recipientId, + keyId: otk.keyId, + }); + console.debug(`[OlmContext]: Consumed OTK: ${otk.keyId}`); + } catch (err) { + console.error("[OlmContext]: Failed to consume OTK:", err); + } + + // Cache it + cacheSession(recipientId, newSession); + + return newSession; + } catch (err) { + console.error("[OlmContext]: Failed to get/create session:", err); + return null; + } finally { + // Clean up pending promise + pendingSessionsRef.current.delete(recipientId); + } + })(); + + // Store the promise so concurrent calls can await it + pendingSessionsRef.current.set(recipientId, sessionPromise); + + return sessionPromise; + }, [userId, olmAccount, password, validateSessionRequirements, unpickleSessionFromDb, cacheSession, saveSessionToDb, consumeOTK]); // Create an INBOUND session from a received pre-key message const createInboundSession = useCallback(async ( @@ -311,12 +331,12 @@ export function OlmProvider({ // Check if we already have a session with this sender if (sessionsRef.current.has(senderId)) { - console.debug(`[OlmContext] Session already exists for ${senderId}`); + console.debug(`[OlmContext]: Session already exists for ${senderId}`); return sessionsRef.current.get(senderId)!; } try { - console.debug(`[OlmContext] Creating inbound session from sender ${senderId}`); + console.debug(`[OlmContext]: Creating inbound session from sender ${senderId}`); const Olm = await loadOlm(); const newSession = new Olm.Session(); @@ -327,7 +347,7 @@ export function OlmProvider({ // Remove the one-time key that was used olmAccount!.remove_one_time_keys(newSession); - console.debug(`[OlmContext] ✓ Created inbound session: ${newSession.session_id()}`); + console.debug(`[OlmContext]: Created inbound session: ${newSession.session_id()}`); // Save to DB await saveSessionToDb(senderId, newSession, password!); @@ -337,7 +357,7 @@ export function OlmProvider({ return newSession; } catch (err) { - console.error("[OlmContext] Failed to create inbound session:", err); + console.error("[OlmContext]: Failed to create inbound session:", err); return null; } }, [validateSessionRequirements, olmAccount, password, saveSessionToDb, cacheSession]); diff --git a/src/contexts/socket-context.tsx b/src/contexts/socket-context.tsx index df03a6d..9347320 100644 --- a/src/contexts/socket-context.tsx +++ b/src/contexts/socket-context.tsx @@ -87,7 +87,7 @@ export function SocketProvider({ children, user, refetchUser }: SocketProviderPr pickledSession: session.pickle(password), updatedAt: Date.now(), }); - console.debug("[Socket] Session state saved after decrypt"); + console.debug("[Socket]: Session state saved after decrypt"); }, [password]); // Helper: Decrypt, validate, and store message @@ -101,7 +101,7 @@ export function SocketProvider({ children, user, refetchUser }: SocketProviderPr // Decrypt the message const decryptedBody = session.decrypt(messageType, encryptedBody); const message = JSON.parse(decryptedBody); - console.debug("[Socket] Decrypted message:", message); + console.debug("[Socket]: Decrypted message:", message); // Save session state after decryption await saveSessionState(session, currentUserId, fromUserId); @@ -109,14 +109,14 @@ export function SocketProvider({ children, user, refetchUser }: SocketProviderPr // Validate with ZOD const validatedMessage = MESSAGE_SCHEMA.safeParse(message); if (!validatedMessage.success) { - console.error("[Socket] Invalid message:", validatedMessage.error); + console.error("[Socket]: Invalid message:", validatedMessage.error); throw new Error("Invalid message format"); } // Store message and increment unread count await storeMessage(validatedMessage.data as SiPher.Messages.ClientEncrypted.EncryptedMessage & { to: string }); await incrementUnread(validatedMessage.data.channelId); - console.debug("[Socket] Message stored successfully"); + console.debug("[Socket]: Message stored successfully"); }, [saveSessionState]); // Manual disconnect function @@ -141,7 +141,7 @@ export function SocketProvider({ children, user, refetchUser }: SocketProviderPr const sendMessage = useCallback((message: { type: 0 | 1; body: string }, to: string) => { if (!socketRef.current) { - console.warn("⚠️ Cannot send message: Socket not connected"); + console.warn("[Socket]: Cannot send message due to socket not being connected"); return; } @@ -152,84 +152,91 @@ export function SocketProvider({ children, user, refetchUser }: SocketProviderPr }, []); // Define message processor that can be called from both socket handler and queue processor - const processIncomingDM = useCallback(async (data: { content: { type: 0 | 1; body: unknown }, participants: string[] }) => { - // Get the current user id - const currentUserId = user.id; - if (!currentUserId) { - console.error("[Socket] No user ID available"); - return; - } - - // Extract sender from participants - const fromUserId = data.participants.find((participant) => participant !== currentUserId); - if (!fromUserId) { - console.error("[Socket] Could not determine sender from participants"); - return; - } - - // Fetch participant details - try { - const participantDetails = await convex.query(api.auth.getParticipantDetails, { - participantIds: [fromUserId] - }); - - const fromUser = participantDetails?.[0]; - if (!fromUser) { - console.error("[Socket] Failed to get from user"); + const processIncomingDM = useCallback( + async (data: { content: { type: 0 | 1; body: unknown }, participants: string[] }) => { + // Get the current user id + const currentUserId = user.id; + if (!currentUserId) { + console.error("[Socket]: No user ID available"); + return; + } else if ( + data.participants.length !== 2 || + data.participants.some((participant) => participant === currentUserId) + ) { + console.error("[Socket]: Invalid DM data, participants:", data.participants, "currentUserId:", currentUserId); return; } - const { type, body } = data.content; - - switch (type) { - case 0: { - console.debug("[Socket] Received inbound message from pre-key message"); - - const session = await createInboundSession(fromUserId, body as string); - if (!session) { - console.error("[Socket] Failed to create inbound session"); - return; - } - - // Now we can create or open the DM channel - await getOrCreateDmChannel(currentUserId, fromUser); - - // Decrypt, validate, and store using helper - await decryptAndStoreMessage(session, type, body as string, currentUserId, fromUserId); - break; - } - case 1: { - console.debug("[Socket] Received regular message"); - - // Get existing session from cache - const session = sessions.get(fromUserId); - if (!session) { - console.error("[Socket] No session found for sender. This shouldn't happen!"); - return; - } - - // Decrypt, validate, and store using helper - await decryptAndStoreMessage(session, type, body as string, currentUserId, fromUserId); - break; - } + // Extract sender from participants + const fromUserId = data.participants.find((participant) => participant !== currentUserId); + if (!fromUserId) { + console.error("[Socket]: Could not determine sender from participants"); + return; } - } catch (error) { - console.error("[Socket] Error handling incoming DM:", error); - } - }, [user.id, createInboundSession, sessions, decryptAndStoreMessage]); + + // Fetch participant details + try { + const participantDetails = await convex.query(api.auth.getParticipantDetails, { + participantIds: [fromUserId] + }); + + const fromUser = participantDetails?.[0]; + if (!fromUser) { + console.error("[Socket]: Failed to get from user"); + return; + } + + const { type, body } = data.content; + + switch (type) { + case 0: { + console.debug("[Socket]: Received inbound message from pre-key message"); + + const session = await createInboundSession(fromUserId, body as string); + if (!session) { + console.error("[Socket]: Failed to create inbound session"); + return; + } + + // Now we can create or open the DM channel + await getOrCreateDmChannel(currentUserId, fromUser); + + // Decrypt, validate, and store using helper + await decryptAndStoreMessage(session, type, body as string, currentUserId, fromUserId); + break; + } + case 1: { + console.debug("[Socket]: Received regular message"); + + // Get existing session from cache + const session = sessions.get(fromUserId); + if (!session) { + console.error("[Socket]: No session found for sender. This shouldn't happen!"); + return; + } + + // Decrypt, validate, and store using helper + await decryptAndStoreMessage(session, type, body as string, currentUserId, fromUserId); + break; + } + } + } catch (error) { + console.error("[Socket]: Error handling incoming DM:", error); + } + }, [user.id, createInboundSession, sessions, decryptAndStoreMessage]); // Process queued messages when OLM becomes ready useEffect(() => { if (!olmAccount || !olmIsReady || messageQueueRef.current.length === 0) return; - console.log(`[Socket] OLM is now ready, processing ${messageQueueRef.current.length} queued messages`); + console.log(`[Socket]: OLM is now ready, processing ${messageQueueRef.current.length} queued messages`); const processQueue = async () => { const queue = [...messageQueueRef.current]; messageQueueRef.current = []; // Clear queue for (const data of queue) { - console.log("[Socket] Processing queued message:", data); + console.log("[Socket]: Processing queued message:", data); await processIncomingDM(data); } }; @@ -255,7 +262,7 @@ export function SocketProvider({ children, user, refetchUser }: SocketProviderPr // Use acknowledgment callback for reliable latency measurement socket.timeout(5000).emit("ping", (err: Error, serverTimestamp: number) => { if (err) { - console.warn("[Socket] Ping timeout or error:", err); + console.warn("[Socket]: Ping timeout or error:", err); updateSocketInfo({ ping: null }); return; } @@ -372,12 +379,13 @@ export function SocketProvider({ children, user, refetchUser }: SocketProviderPr // Check if OLM account is loaded if (!olmAccount) { - console.warn("[Socket] OLM account not loaded yet, queueing message for later processing"); + console.warn("[Socket]: OLM account not loaded yet, queueing message for later processing"); messageQueueRef.current.push(data); return; } // Process immediately if OLM is ready + console.debug("[Socket]: Processing incoming DM immediately:", data); await processIncomingDM(data); }); diff --git a/src/lib/sockets/events/dm.ts b/src/lib/sockets/events/dm.ts index 6f86552..4d2a4bb 100644 --- a/src/lib/sockets/events/dm.ts +++ b/src/lib/sockets/events/dm.ts @@ -48,15 +48,31 @@ const dmEvent: SiPher.EventsType = { // Join sender to the DM room socket.join(roomId); + // Message parser: + // 08/01/2026: I bwoke it :3 - Cete + const message: { + type: 0 | 1; + body: string; + } = typeof content === "string" ? JSON.parse(content) : content; + // Send to the DM room (for users already in the room) - io.to(roomId).emit("dm:message", JSON.parse(content)); + io.to(roomId).emit("dm:message", message); // Also send directly to recipient's socket (socket.id = user.id) // This ensures they receive the message even if not in the DM room yet - io.to(to).emit("dm:new", { - content: JSON.parse(content), + const dmData = { + content: message, participants: [sender.id, to].sort(), - }); + }; + + // Before sending, check if the participant ids are not the same (This is happening) + if (sender.id === to) { + socket.emit("error", { message: "Cannot send DM to yourself" }); + console.error("[DM] Cannot send DM to yourself: ", sender.id, "→", to); + return; + } + + io.to(to).emit("dm:new", dmData); console.log(`[DM] ${sender.id} → ${to} in room ${roomId}`); },