- Fixed race condition on @olm-context.tsx when a session is created
- Fixed an issue where the participants ids where being set to the current user only
- Added safeguard for the dm event to try to either parse the message or use it as-is
This commit is contained in:
Nixyan 2026-01-08 10:01:13 -03:00
parent be3b0ba591
commit f39f8ee830
5 changed files with 199 additions and 150 deletions

View file

@ -58,6 +58,7 @@ export const consumeOTK = mutation({
const oneTimeKeys = olmAccount.oneTimeKeys; const oneTimeKeys = olmAccount.oneTimeKeys;
const keyIndex = oneTimeKeys.findIndex((key) => key.keyId === args.keyId); const keyIndex = oneTimeKeys.findIndex((key) => key.keyId === args.keyId);
if (keyIndex === -1) throw new Error("The key to be consumed was not found"); if (keyIndex === -1) throw new Error("The key to be consumed was not found");
oneTimeKeys.splice(keyIndex, 1); oneTimeKeys.splice(keyIndex, 1);

View file

@ -22,7 +22,10 @@ export default function DMChannelContent(
participantDetails, participantDetails,
}: DMChannelContentProps }: DMChannelContentProps
) { ) {
const otherUser = useMemo(() => participantDetails[0], [participantDetails]);
const otherUser = useMemo(() => {
return participantDetails.find((p) => p.id !== userId);
}, [participantDetails, userId]);
const [olmSession, setOlmSession] = useState<Olm.Session | null>(null); const [olmSession, setOlmSession] = useState<Olm.Session | null>(null);
const [sessionError, setSessionError] = useState<string | null>(null); const [sessionError, setSessionError] = useState<string | null>(null);
const [messageInput, setMessageInput] = useState(""); const [messageInput, setMessageInput] = useState("");
@ -339,6 +342,7 @@ export default function DMChannelContent(
recipientId: otherUser.id, recipientId: otherUser.id,
password, password,
}); });
if (messageId) { if (messageId) {
setMessageInput(""); setMessageInput("");
} }

View file

@ -59,6 +59,8 @@ export function OlmProvider({
// Cache sessions in memory: recipientId -> Session // Cache sessions in memory: recipientId -> Session
const sessionsRef = useRef<Map<string, Olm.Session>>(new Map()); const sessionsRef = useRef<Map<string, Olm.Session>>(new Map());
// Track pending session creation to prevent race conditions
const pendingSessionsRef = useRef<Map<string, Promise<Olm.Session | null>>>(new Map());
const [, forceUpdate] = useState({}); const [, forceUpdate] = useState({});
// Helper: Cache session in memory // Helper: Cache session in memory
@ -82,7 +84,7 @@ export function OlmProvider({
createdAt: Date.now(), createdAt: Date.now(),
updatedAt: Date.now(), updatedAt: Date.now(),
}); });
console.debug("[OlmContext] Session saved to DB"); console.debug("[OlmContext]: Session saved to DB");
}, [userId]); }, [userId]);
// Helper: Unpickle session from database // Helper: Unpickle session from database
@ -95,10 +97,10 @@ export function OlmProvider({
const Olm = await loadOlm(); const Olm = await loadOlm();
const session = new Olm.Session(); const session = new Olm.Session();
session.unpickle(sessionPassword, pickledSession); session.unpickle(sessionPassword, pickledSession);
console.debug("[OlmContext] Session unpickled from DB"); console.debug("[OlmContext]: Session unpickled from DB");
return session; return session;
} catch (err) { } catch (err) {
console.warn("[OlmContext] Failed to unpickle session:", err); console.warn("[OlmContext]: Failed to unpickle session:", err);
// Delete corrupted session // Delete corrupted session
if (userId) { if (userId) {
await db.olmSessions await db.olmSessions
@ -120,7 +122,7 @@ export function OlmProvider({
const missing = requirements.find(req => !req.value); const missing = requirements.find(req => !req.value);
if (missing) { if (missing) {
console.error(`[OlmContext] Cannot perform session operation: missing ${missing.name}`); console.error(`[OlmContext]: Cannot perform session operation: missing ${missing.name}`);
return false; return false;
} }
@ -170,17 +172,17 @@ export function OlmProvider({
const loadAccount = async () => { const loadAccount = async () => {
try { try {
console.debug("[OlmContext] Loading OLM account..."); console.debug("[OlmContext]: Loading OLM account...");
const account = await getOlmAccount(userId, password); const account = await getOlmAccount(userId, password);
if (!account) { if (!account) {
console.warn("[OlmContext] No OLM account found"); console.warn("[OlmContext]: No OLM account found");
return; return;
} }
setOlmAccount(account); setOlmAccount(account);
console.debug("[OlmContext] OLM account loaded successfully"); console.debug("[OlmContext]: OLM account loaded successfully");
} catch (err) { } catch (err) {
console.error("[OlmContext] Failed to load OLM account:", err); console.error("[OlmContext]: Failed to load OLM account:", err);
// Password might be wrong - clear it // Password might be wrong - clear it
clearPassword(); clearPassword();
} }
@ -232,73 +234,91 @@ export function OlmProvider({
// Check if we already have this session in memory // Check if we already have this session in memory
if (sessionsRef.current.has(recipientId)) { if (sessionsRef.current.has(recipientId)) {
console.debug(`[OlmContext] Using cached session for ${recipientId}`); console.debug(`[OlmContext]: Using cached session for ${recipientId}`);
return sessionsRef.current.get(recipientId)!; return sessionsRef.current.get(recipientId)!;
} }
try { // Check if session creation is already in progress for this recipient
console.debug(`[OlmContext] Loading/creating session for user ${recipientId}`); const pendingSession = pendingSessionsRef.current.get(recipientId);
if (pendingSession) {
// Check if session exists in DB console.debug(`[OlmContext]: Waiting for pending session creation for ${recipientId}`);
const existingSession = await db.olmSessions return pendingSession;
.where("[odId+recipientId]")
.equals([userId!, recipientId])
.first();
if (existingSession) {
console.debug("[OlmContext] Found existing session in DB, unpickling...");
const session = await unpickleSessionFromDb(recipientId, existingSession.pickledSession, password!);
if (session) {
cacheSession(recipientId, session);
console.debug("[OlmContext] ✓ Session loaded from DB");
return session;
}
// If unpickling failed, continue to create new session
}
// Create new outbound session
console.debug("[OlmContext] Creating new outbound session...");
if (recipientOlmAccount.oneTimeKeys.length === 0) {
throw new Error("No one-time keys available for recipient");
}
const otk = recipientOlmAccount.oneTimeKeys[0];
const Olm = await loadOlm();
const newSession = new Olm.Session();
newSession.create_outbound(
olmAccount!,
recipientOlmAccount.identityKey.curve25519,
otk.publicKey
);
console.debug(`[OlmContext] ✓ Created session: ${newSession.session_id()}`);
// Save to DB
await saveSessionToDb(recipientId, newSession, password!);
// Consume the OTK from server
try {
await consumeOTK({
userId: recipientId,
keyId: otk.keyId,
});
console.debug(`[OlmContext] ✓ Consumed OTK: ${otk.keyId}`);
} catch (err) {
console.error("[OlmContext] Failed to consume OTK:", err);
}
// Cache it
cacheSession(recipientId, newSession);
return newSession;
} catch (err) {
console.error("[OlmContext] Failed to get/create session:", err);
return null;
} }
}, [userId, olmAccount, password, consumeOTK, validateSessionRequirements, unpickleSessionFromDb, cacheSession, saveSessionToDb]);
// Create a new promise for this session creation
const sessionPromise = (async () => {
try {
console.debug(`[OlmContext]: Loading/creating session for user ${recipientId}`);
// Check if session exists in DB
const existingSession = await db.olmSessions
.where("[odId+recipientId]")
.equals([userId!, recipientId])
.first();
if (existingSession) {
console.debug("[OlmContext]: Found existing session in DB, unpickling...");
const session = await unpickleSessionFromDb(recipientId, existingSession.pickledSession, password!);
if (session) {
cacheSession(recipientId, session);
console.debug("[OlmContext]: Session loaded from DB");
return session;
}
// If unpickling failed, continue to create new session
}
// Create new outbound session
console.debug("[OlmContext]: Creating new outbound session...");
if (recipientOlmAccount.oneTimeKeys.length === 0) {
throw new Error("No one-time keys available for recipient");
}
const otk = recipientOlmAccount.oneTimeKeys[0];
const Olm = await loadOlm();
const newSession = new Olm.Session();
newSession.create_outbound(
olmAccount!,
recipientOlmAccount.identityKey.curve25519,
otk.publicKey
);
console.debug(`[OlmContext]: Created session: ${newSession.session_id()}`);
// Save to DB
await saveSessionToDb(recipientId, newSession, password!);
// Consume the OTK from server
try {
await consumeOTK({
userId: recipientId,
keyId: otk.keyId,
});
console.debug(`[OlmContext]: Consumed OTK: ${otk.keyId}`);
} catch (err) {
console.error("[OlmContext]: Failed to consume OTK:", err);
}
// Cache it
cacheSession(recipientId, newSession);
return newSession;
} catch (err) {
console.error("[OlmContext]: Failed to get/create session:", err);
return null;
} finally {
// Clean up pending promise
pendingSessionsRef.current.delete(recipientId);
}
})();
// Store the promise so concurrent calls can await it
pendingSessionsRef.current.set(recipientId, sessionPromise);
return sessionPromise;
}, [userId, olmAccount, password, validateSessionRequirements, unpickleSessionFromDb, cacheSession, saveSessionToDb, consumeOTK]);
// Create an INBOUND session from a received pre-key message // Create an INBOUND session from a received pre-key message
const createInboundSession = useCallback(async ( const createInboundSession = useCallback(async (
@ -311,12 +331,12 @@ export function OlmProvider({
// Check if we already have a session with this sender // Check if we already have a session with this sender
if (sessionsRef.current.has(senderId)) { if (sessionsRef.current.has(senderId)) {
console.debug(`[OlmContext] Session already exists for ${senderId}`); console.debug(`[OlmContext]: Session already exists for ${senderId}`);
return sessionsRef.current.get(senderId)!; return sessionsRef.current.get(senderId)!;
} }
try { try {
console.debug(`[OlmContext] Creating inbound session from sender ${senderId}`); console.debug(`[OlmContext]: Creating inbound session from sender ${senderId}`);
const Olm = await loadOlm(); const Olm = await loadOlm();
const newSession = new Olm.Session(); const newSession = new Olm.Session();
@ -327,7 +347,7 @@ export function OlmProvider({
// Remove the one-time key that was used // Remove the one-time key that was used
olmAccount!.remove_one_time_keys(newSession); olmAccount!.remove_one_time_keys(newSession);
console.debug(`[OlmContext] Created inbound session: ${newSession.session_id()}`); console.debug(`[OlmContext]: Created inbound session: ${newSession.session_id()}`);
// Save to DB // Save to DB
await saveSessionToDb(senderId, newSession, password!); await saveSessionToDb(senderId, newSession, password!);
@ -337,7 +357,7 @@ export function OlmProvider({
return newSession; return newSession;
} catch (err) { } catch (err) {
console.error("[OlmContext] Failed to create inbound session:", err); console.error("[OlmContext]: Failed to create inbound session:", err);
return null; return null;
} }
}, [validateSessionRequirements, olmAccount, password, saveSessionToDb, cacheSession]); }, [validateSessionRequirements, olmAccount, password, saveSessionToDb, cacheSession]);

View file

@ -87,7 +87,7 @@ export function SocketProvider({ children, user, refetchUser }: SocketProviderPr
pickledSession: session.pickle(password), pickledSession: session.pickle(password),
updatedAt: Date.now(), updatedAt: Date.now(),
}); });
console.debug("[Socket] Session state saved after decrypt"); console.debug("[Socket]: Session state saved after decrypt");
}, [password]); }, [password]);
// Helper: Decrypt, validate, and store message // Helper: Decrypt, validate, and store message
@ -101,7 +101,7 @@ export function SocketProvider({ children, user, refetchUser }: SocketProviderPr
// Decrypt the message // Decrypt the message
const decryptedBody = session.decrypt(messageType, encryptedBody); const decryptedBody = session.decrypt(messageType, encryptedBody);
const message = JSON.parse(decryptedBody); const message = JSON.parse(decryptedBody);
console.debug("[Socket] Decrypted message:", message); console.debug("[Socket]: Decrypted message:", message);
// Save session state after decryption // Save session state after decryption
await saveSessionState(session, currentUserId, fromUserId); await saveSessionState(session, currentUserId, fromUserId);
@ -109,14 +109,14 @@ export function SocketProvider({ children, user, refetchUser }: SocketProviderPr
// Validate with ZOD // Validate with ZOD
const validatedMessage = MESSAGE_SCHEMA.safeParse(message); const validatedMessage = MESSAGE_SCHEMA.safeParse(message);
if (!validatedMessage.success) { if (!validatedMessage.success) {
console.error("[Socket] Invalid message:", validatedMessage.error); console.error("[Socket]: Invalid message:", validatedMessage.error);
throw new Error("Invalid message format"); throw new Error("Invalid message format");
} }
// Store message and increment unread count // Store message and increment unread count
await storeMessage(validatedMessage.data as SiPher.Messages.ClientEncrypted.EncryptedMessage & { to: string }); await storeMessage(validatedMessage.data as SiPher.Messages.ClientEncrypted.EncryptedMessage & { to: string });
await incrementUnread(validatedMessage.data.channelId); await incrementUnread(validatedMessage.data.channelId);
console.debug("[Socket] Message stored successfully"); console.debug("[Socket]: Message stored successfully");
}, [saveSessionState]); }, [saveSessionState]);
// Manual disconnect function // Manual disconnect function
@ -141,7 +141,7 @@ export function SocketProvider({ children, user, refetchUser }: SocketProviderPr
const sendMessage = useCallback((message: { type: 0 | 1; body: string }, to: string) => { const sendMessage = useCallback((message: { type: 0 | 1; body: string }, to: string) => {
if (!socketRef.current) { if (!socketRef.current) {
console.warn("⚠️ Cannot send message: Socket not connected"); console.warn("[Socket]: Cannot send message due to socket not being connected");
return; return;
} }
@ -152,84 +152,91 @@ export function SocketProvider({ children, user, refetchUser }: SocketProviderPr
}, []); }, []);
// Define message processor that can be called from both socket handler and queue processor // Define message processor that can be called from both socket handler and queue processor
const processIncomingDM = useCallback(async (data: { content: { type: 0 | 1; body: unknown }, participants: string[] }) => { const processIncomingDM = useCallback(
// Get the current user id async (data: { content: { type: 0 | 1; body: unknown }, participants: string[] }) => {
const currentUserId = user.id; // Get the current user id
if (!currentUserId) { const currentUserId = user.id;
console.error("[Socket] No user ID available"); if (!currentUserId) {
return; console.error("[Socket]: No user ID available");
} return;
} else if (
// Extract sender from participants data.participants.length !== 2 ||
const fromUserId = data.participants.find((participant) => participant !== currentUserId); data.participants.some((participant) => participant === currentUserId)
if (!fromUserId) { ) {
console.error("[Socket] Could not determine sender from participants"); console.error("[Socket]: Invalid DM data, participants:", data.participants, "currentUserId:", currentUserId);
return;
}
// Fetch participant details
try {
const participantDetails = await convex.query(api.auth.getParticipantDetails, {
participantIds: [fromUserId]
});
const fromUser = participantDetails?.[0];
if (!fromUser) {
console.error("[Socket] Failed to get from user");
return; return;
} }
const { type, body } = data.content; // Extract sender from participants
const fromUserId = data.participants.find((participant) => participant !== currentUserId);
switch (type) { if (!fromUserId) {
case 0: { console.error("[Socket]: Could not determine sender from participants");
console.debug("[Socket] Received inbound message from pre-key message"); return;
const session = await createInboundSession(fromUserId, body as string);
if (!session) {
console.error("[Socket] Failed to create inbound session");
return;
}
// Now we can create or open the DM channel
await getOrCreateDmChannel(currentUserId, fromUser);
// Decrypt, validate, and store using helper
await decryptAndStoreMessage(session, type, body as string, currentUserId, fromUserId);
break;
}
case 1: {
console.debug("[Socket] Received regular message");
// Get existing session from cache
const session = sessions.get(fromUserId);
if (!session) {
console.error("[Socket] No session found for sender. This shouldn't happen!");
return;
}
// Decrypt, validate, and store using helper
await decryptAndStoreMessage(session, type, body as string, currentUserId, fromUserId);
break;
}
} }
} catch (error) {
console.error("[Socket] Error handling incoming DM:", error); // Fetch participant details
} try {
}, [user.id, createInboundSession, sessions, decryptAndStoreMessage]); const participantDetails = await convex.query(api.auth.getParticipantDetails, {
participantIds: [fromUserId]
});
const fromUser = participantDetails?.[0];
if (!fromUser) {
console.error("[Socket]: Failed to get from user");
return;
}
const { type, body } = data.content;
switch (type) {
case 0: {
console.debug("[Socket]: Received inbound message from pre-key message");
const session = await createInboundSession(fromUserId, body as string);
if (!session) {
console.error("[Socket]: Failed to create inbound session");
return;
}
// Now we can create or open the DM channel
await getOrCreateDmChannel(currentUserId, fromUser);
// Decrypt, validate, and store using helper
await decryptAndStoreMessage(session, type, body as string, currentUserId, fromUserId);
break;
}
case 1: {
console.debug("[Socket]: Received regular message");
// Get existing session from cache
const session = sessions.get(fromUserId);
if (!session) {
console.error("[Socket]: No session found for sender. This shouldn't happen!");
return;
}
// Decrypt, validate, and store using helper
await decryptAndStoreMessage(session, type, body as string, currentUserId, fromUserId);
break;
}
}
} catch (error) {
console.error("[Socket]: Error handling incoming DM:", error);
}
}, [user.id, createInboundSession, sessions, decryptAndStoreMessage]);
// Process queued messages when OLM becomes ready // Process queued messages when OLM becomes ready
useEffect(() => { useEffect(() => {
if (!olmAccount || !olmIsReady || messageQueueRef.current.length === 0) return; if (!olmAccount || !olmIsReady || messageQueueRef.current.length === 0) return;
console.log(`[Socket] OLM is now ready, processing ${messageQueueRef.current.length} queued messages`); console.log(`[Socket]: OLM is now ready, processing ${messageQueueRef.current.length} queued messages`);
const processQueue = async () => { const processQueue = async () => {
const queue = [...messageQueueRef.current]; const queue = [...messageQueueRef.current];
messageQueueRef.current = []; // Clear queue messageQueueRef.current = []; // Clear queue
for (const data of queue) { for (const data of queue) {
console.log("[Socket] Processing queued message:", data); console.log("[Socket]: Processing queued message:", data);
await processIncomingDM(data); await processIncomingDM(data);
} }
}; };
@ -255,7 +262,7 @@ export function SocketProvider({ children, user, refetchUser }: SocketProviderPr
// Use acknowledgment callback for reliable latency measurement // Use acknowledgment callback for reliable latency measurement
socket.timeout(5000).emit("ping", (err: Error, serverTimestamp: number) => { socket.timeout(5000).emit("ping", (err: Error, serverTimestamp: number) => {
if (err) { if (err) {
console.warn("[Socket] Ping timeout or error:", err); console.warn("[Socket]: Ping timeout or error:", err);
updateSocketInfo({ ping: null }); updateSocketInfo({ ping: null });
return; return;
} }
@ -372,12 +379,13 @@ export function SocketProvider({ children, user, refetchUser }: SocketProviderPr
// Check if OLM account is loaded // Check if OLM account is loaded
if (!olmAccount) { if (!olmAccount) {
console.warn("[Socket] OLM account not loaded yet, queueing message for later processing"); console.warn("[Socket]: OLM account not loaded yet, queueing message for later processing");
messageQueueRef.current.push(data); messageQueueRef.current.push(data);
return; return;
} }
// Process immediately if OLM is ready // Process immediately if OLM is ready
console.debug("[Socket]: Processing incoming DM immediately:", data);
await processIncomingDM(data); await processIncomingDM(data);
}); });

View file

@ -48,15 +48,31 @@ const dmEvent: SiPher.EventsType = {
// Join sender to the DM room // Join sender to the DM room
socket.join(roomId); socket.join(roomId);
// Message parser:
// 08/01/2026: I bwoke it :3 - Cete
const message: {
type: 0 | 1;
body: string;
} = typeof content === "string" ? JSON.parse(content) : content;
// Send to the DM room (for users already in the room) // Send to the DM room (for users already in the room)
io.to(roomId).emit("dm:message", JSON.parse(content)); io.to(roomId).emit("dm:message", message);
// Also send directly to recipient's socket (socket.id = user.id) // Also send directly to recipient's socket (socket.id = user.id)
// This ensures they receive the message even if not in the DM room yet // This ensures they receive the message even if not in the DM room yet
io.to(to).emit("dm:new", { const dmData = {
content: JSON.parse(content), content: message,
participants: [sender.id, to].sort(), participants: [sender.id, to].sort(),
}); };
// Before sending, check if the participant ids are not the same (This is happening)
if (sender.id === to) {
socket.emit("error", { message: "Cannot send DM to yourself" });
console.error("[DM] Cannot send DM to yourself: ", sender.id, "→", to);
return;
}
io.to(to).emit("dm:new", dmData);
console.log(`[DM] ${sender.id}${to} in room ${roomId}`); console.log(`[DM] ${sender.id}${to} in room ${roomId}`);
}, },