From 1d619b9d2a36fdb38062c1c320db701c44a1fd20 Mon Sep 17 00:00:00 2001 From: Nixyan Date: Thu, 26 Mar 2026 11:09:31 -0300 Subject: [PATCH] feat: enhance federation functionality and improve documentation - Added a new proxy route to relay encrypted federation traffic between servers, allowing for better communication in restricted environments. - Implemented health check mechanisms for server registration, including tracking health status and scheduling health checks. - Updated the database schema to include health check attempts and unhealthy reasons for servers. - Enhanced the federation fetch logic to handle errors more gracefully and support proxying requests. - Improved README documentation with a new section explaining public/private data handling and added links to mirrors. - Refactored existing code for better organization and clarity, including updates to various federation-related modules. #3 This should all be tested throughly, the workers are messy and poluted, a rework is needed and should be prioritized. They work, but the code is poorly documented and there is no proper testing of the workers, some of them run twice and there are major issues on them. --- README.md | 14 + src/app/PostTestForm.tsx | 23 + src/app/discover/route.ts | 23 +- src/app/page.tsx | 7 +- src/app/proxy/route.ts | 418 ++++++++++++ src/lib/auth.ts | 7 + src/lib/bull/index.ts | 271 ++++++-- src/lib/db/schema/index.ts | 16 +- src/lib/federation/fetch.ts | 301 +++++++++ src/lib/federation/peer-registry-url.ts | 19 + .../proxy-helpers/federated-post.ts | 94 +++ src/lib/federation/registry.ts | 56 +- src/lib/federation/threat-model.ts | 57 ++ src/lib/federation/url-guard.ts | 26 +- src/lib/plugins/server/federation.ts | 11 + .../helpers/social/endpoints/follows.ts | 44 +- .../server/helpers/social/endpoints/posts.ts | 201 ++++-- .../plugins/server/helpers/social/social.ts | 14 +- src/lib/zod/EncryptedEnvelope.ts | 40 ++ src/lib/zod/methods/FollowSchema.ts | 18 + src/lib/zod/methods/PostFederationSchema.ts | 18 + tests/helpers/queue.ts | 50 ++ tests/proxies/follow.ts | 596 ++++++++++++++++++ tests/proxies/post.ts | 409 ++++++++++++ tsconfig.json | 89 +-- 25 files changed, 2637 insertions(+), 185 deletions(-) create mode 100644 src/app/proxy/route.ts create mode 100644 src/lib/federation/fetch.ts create mode 100644 src/lib/federation/peer-registry-url.ts create mode 100644 src/lib/federation/proxy-helpers/federated-post.ts create mode 100644 src/lib/federation/threat-model.ts create mode 100644 src/lib/zod/EncryptedEnvelope.ts create mode 100644 src/lib/zod/methods/FollowSchema.ts create mode 100644 src/lib/zod/methods/PostFederationSchema.ts create mode 100644 tests/helpers/queue.ts create mode 100644 tests/proxies/follow.ts create mode 100644 tests/proxies/post.ts diff --git a/README.md b/README.md index ba5ab46..e0ec30f 100644 --- a/README.md +++ b/README.md @@ -79,8 +79,22 @@ bun run rotateKeys.ts --resume '' ## Mirrors [Gitea](https://git.tockanest.com/Cete/sipher) + [GitHub](https://github.com/tockawaffle/sipher) +## What is public/private? + +### Public + +There are things that won't be e2ee because there's simply no reason for that to be done, this is a small list of the current (not finished) public data that other federations or even users might fetch and get all of the data: + +- Posts: +- - The whole post object is public, that includes: +- - - The content, including images, videos or audios if any +- - - Who posted it +- - - The federation that has that data + + ## Security SiPher implements custom federation and cryptographic protocols. I am not a professional cryptographer or security researcher — this system has not been audited and almost certainly contains multiple vulnerabilities I am not aware of. diff --git a/src/app/PostTestForm.tsx b/src/app/PostTestForm.tsx index 6fcbc8f..9965c2d 100644 --- a/src/app/PostTestForm.tsx +++ b/src/app/PostTestForm.tsx @@ -1,5 +1,6 @@ "use client"; +import { Button } from "@/components/ui/button"; import { authClient } from "@/lib/auth-client"; import { useState } from "react"; @@ -33,6 +34,25 @@ export function PostTestForm() { } }; + const body = JSON.stringify({ + method: "REGISTER", + url: process.env.BETTER_AUTH_URL!, + publicKey: process.env.FEDERATION_PUBLIC_KEY!, + encryptionPublicKey: process.env.FEDERATION_ENCRYPTION_PUBLIC_KEY!, + }); + + async function forceDiscover(url: string) { + console.log("body", body); + const response = await fetch(`${url}/discover`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: body, + }); + return response.json(); + } + return (

Test Post

@@ -85,6 +105,9 @@ export function PostTestForm() { {status} )} + +
+ ); } diff --git a/src/app/discover/route.ts b/src/app/discover/route.ts index d513cbe..eb49949 100644 --- a/src/app/discover/route.ts +++ b/src/app/discover/route.ts @@ -1,5 +1,6 @@ import db from "@/lib/db"; import { serverRegistry } from "@/lib/db/schema"; +import { FederationError, federationFetch } from "@/lib/federation/fetch"; import { decryptPayload, fingerprintKey } from "@/lib/federation/keytools"; import { upsertServer } from "@/lib/federation/registry"; import { assertSafeUrl, UrlGuardError } from "@/lib/federation/url-guard"; @@ -113,10 +114,16 @@ async function discoverServer(validated: z.infer) { if (server[0].publicKey === validated.publicKey) confirmations.sameKeyOnServer = true; debug("DISCOVER – fetching public key from federation server %s", server[0].url); try { - const federationResponse = await (await fetch(server[0].url + "/discover")).json(); + const { response } = await federationFetch(server[0].url + "/discover", { + serverUrl: server[0].url, + }); + const federationResponse = await response.json(); if (federationResponse.publicKey === validated.publicKey) confirmations.sameKeyOnFetch = true; } catch (err) { debug("DISCOVER – fetch to %s failed: %o", server[0].url, err); + if (err instanceof FederationError) { + return NextResponse.json({ error: "Failed to reach the federation server", code: err.code }, { status: 502 }); + } return NextResponse.json({ error: "Failed to reach the federation server" }, { status: 502 }); } @@ -136,18 +143,24 @@ async function registerServer(validated: z.infer) { } debug("REGISTER – fetching /discover from %s to validate server", validated.url); - let response: { publicKey?: string; encryptionPublicKey?: string }; + let remoteKeys: { publicKey?: string; encryptionPublicKey?: string }; try { - response = await (await fetch(validated.url + "/discover")).json(); + const { response } = await federationFetch(validated.url + "/discover", { + serverUrl: validated.url, + }); + remoteKeys = await response.json(); } catch (err) { debug("REGISTER – fetch to %s failed: %o", validated.url, err); + if (err instanceof FederationError) { + return NextResponse.json({ error: "Failed to reach the server", code: err.code }, { status: 502 }); + } return NextResponse.json({ error: "Failed to reach the server" }, { status: 502 }); } - if (!response.publicKey || !response.encryptionPublicKey) { + if (!remoteKeys.publicKey || !remoteKeys.encryptionPublicKey) { debug("REGISTER – remote server returned incomplete keys"); return NextResponse.json({ error: "Invalid server" }, { status: 400 }); - } else if (response.publicKey !== validated.publicKey || response.encryptionPublicKey !== validated.encryptionPublicKey) { + } else if (remoteKeys.publicKey !== validated.publicKey || remoteKeys.encryptionPublicKey !== validated.encryptionPublicKey) { debug("REGISTER – key mismatch: provided vs fetched"); return NextResponse.json({ error: "Public keys do not match the ones reported by the server" }, { status: 400 }); } diff --git a/src/app/page.tsx b/src/app/page.tsx index fa01183..5a05f3e 100644 --- a/src/app/page.tsx +++ b/src/app/page.tsx @@ -12,7 +12,12 @@ export default async function Home() { }); if (!session) redirect(`/auth`); + + return ( - + <> + + + ); } diff --git a/src/app/proxy/route.ts b/src/app/proxy/route.ts new file mode 100644 index 0000000..b87cc78 --- /dev/null +++ b/src/app/proxy/route.ts @@ -0,0 +1,418 @@ +import db from "@/lib/db"; +import { blacklistedServers, follows, serverRegistry, user } from "@/lib/db/schema"; +import { FederationError, federationFetch } from "@/lib/federation/fetch"; +import { decryptPayload, encryptPayload, getOwnEncryptionSecretKey, getOwnSigningSecretKey, signMessage, verifySignature } from "@/lib/federation/keytools"; +import { peerRegistryUrlOrNull } from "@/lib/federation/peer-registry-url"; +import { applyFederatedPostInTransaction } from "@/lib/federation/proxy-helpers/federated-post"; +import { discoverAndRegister } from "@/lib/federation/registry"; +import { EncryptedEnvelopeBaseSchema } from "@/lib/zod/EncryptedEnvelope"; +import { FollowEnvelopeSchema } from "@/lib/zod/methods/FollowSchema"; +import { PostEnvelopeSchema } from "@/lib/zod/methods/PostFederationSchema"; +import createDebug from "debug"; +import { and, eq } from "drizzle-orm"; +import { NextRequest, NextResponse } from "next/server"; +import { z } from "zod"; + +const debug = createDebug("app:api:federation:proxy"); + +// Proxy route: relays encrypted federation traffic when two servers can't reach each other directly +// (e.g. network restrictions, ISP blocking). +// +// Any federation node can act as a proxy as long as both the requesting and target federations +// have discovered and registered it (same mutual-trust model as the follow endpoint). +// +// What the proxy knows: +// - The target federation's base URL (passed in plaintext so the proxy can forward). +// - That Federation A is trying to communicate with Federation B. +// +// What the proxy does NOT know: +// - The contents of the request (method, path, payload, etc.) — all of that lives inside an +// encrypted envelope that only Federation B can decrypt. +// +// Flow: +// 1. Federation A encrypts the full request (method, path, payload) into an envelope using +// Federation B's encryption public key, and signs it with its own signing key. +// 2. Federation A sends the encrypted envelope + target URL to the proxy. +// 3. The proxy forwards the envelope to Federation B's proxy endpoint. +// 4. Federation B decrypts, validates the signature, and processes the request. +// 5. Federation B encrypts its response using Federation A's encryption public key. +// 6. The encrypted response travels back: Federation B -> Proxy -> Federation A. +// +// If the request is of PROXY type and the target fails, the proxy will return a 502 error in which the first server +// should then either retry the request later or proxy it to a different server. +// If the target does not know the sender, it'll error with a 403 error and a "UNKNOWN_FEDERATION_SERVER_INTERACTION" code. + +const ProxiedDataSchema = z.discriminatedUnion("method", [ + z.object({ + method: z.literal("PROXY"), + targetUrl: z.url().refine((url) => { + // Check if the URL has the proxy path + const parsedUrl = new URL(url); + return parsedUrl.pathname.startsWith("/proxy"); + }, { message: "The target URL must have the proxy path" }), // Federation B's base URL, + publicSigningKey: z.string().optional().nullable(), // Federation A's signing public key + publicEncryptionKey: z.string().optional().nullable(), // Federation A's encryption public key + payload: EncryptedEnvelopeBaseSchema // Opaque — proxy cannot decrypt + }), + z.object({ + method: z.literal("TARGETED"), + payload: EncryptedEnvelopeBaseSchema // TODO: swap for createEncryptedEnvelopeSchema(TargetedPayloadSchema) once the inner schema is defined + }) +]) + +type ERROR_CODE = "MISSING_FED_ORIGIN_HEADER" | "UNKNOWN_FEDERATION_SERVER_INTERACTION" | "INCORRECT_KEYS" | "INVALID_PROXY_DATA"; + +// TARGETED: This federation is the target of a proxy request +// PROXY: This federation is the proxy for another federation +type PROXY_METHOD = "TARGETED" | "PROXY" | "PROXY_RESPONSE"; + +type PostsActions = "GET_USER_POSTS" | "FEDERATE_POST" | "GET_POST_BY_ID" | "GET_POST_COMMENTS" | "FEDERATE_POST_COMMENT" +type UserActions = "FEDERATE_FOLLOW" | "FEDERATE_UNFOLLOW" | "GET_USER_PROFILE" | "BLOCK_USER" | "UNBLOCK_USER" | "GET_USER_FOLLOWERS" | "GET_USER_FOLLOWING" + +type Actions = PostsActions | UserActions; + +export async function POST(request: NextRequest) { + const getFedUrl = request.headers.get("x-federation-origin"); + if (!getFedUrl) { + debug("Missing x-federation-origin header from %s", request.url); + return NextResponse.json({ error: "Missing x-federation-origin header", code: "MISSING_FED_ORIGIN_HEADER" }, { status: 400 }); + } + + const data = await request.clone().json(); + const parsed = ProxiedDataSchema.safeParse(data); + if (!parsed.success) { + debug("POST /proxy – error parsing proxied data from %s: %s", request.url, parsed.error.message); + return NextResponse.json({ error: "Invalid proxied data", code: "INVALID_PROXY_DATA" }, { status: 400 }); + } + + switch (parsed.data.method) { + case "PROXY": { + try { + + if (!parsed.data.publicSigningKey || !parsed.data.publicEncryptionKey) { + debug("POST /proxy – error parsing proxied data from %s: %s", request.url, "Missing public signing or encryption key"); + return NextResponse.json({ error: "Invalid proxied data", code: "INVALID_PROXY_DATA" }, { status: 400 }); + } + + const proxiedData = parsed.data; + + // Verify Federation A (sender) is known and keys match + const [sender] = await db.select().from(serverRegistry).where(eq(serverRegistry.url, getFedUrl)); + + if (!sender) { + debug("POST /proxy – sender not found in registry: %s", getFedUrl); + return NextResponse.json({ + error: "Unknown federation server. Please redo the discovery process and try again.", + code: "UNKNOWN_FEDERATION_SERVER_INTERACTION", + }, { status: 403 }); + } else if (sender.publicKey !== proxiedData.publicSigningKey) { + debug("POST /proxy – sender signing key mismatch: %s", getFedUrl); + return NextResponse.json({ + error: "The provided keys are a mismatch. If you rotated your keys, we are not aware of it.", + code: "INCORRECT_KEYS", + }, { status: 403 }); + } else if (sender.encryptionPublicKey !== proxiedData.publicEncryptionKey) { + debug("POST /proxy – sender encryption key mismatch: %s", getFedUrl); + return NextResponse.json({ + error: "The provided keys are a mismatch. If you rotated your keys, we are not aware of it.", + code: "INCORRECT_KEYS", + }, { status: 403 }); + } + + // Verify Federation B (target) is known to us (prevents open-relay abuse) + const targetBaseUrl = new URL(proxiedData.targetUrl.toString()).origin; + const [target] = await db.select().from(serverRegistry).where(eq(serverRegistry.url, targetBaseUrl)); + + if (!target) { + debug("POST /proxy – target not found in registry: %s", targetBaseUrl); + debug("POST /proxy - Starting discovery process") + await discoverAndRegister(targetBaseUrl); + } + + // Proxy the request to Federation B as a TARGETED request (no proxy fallback — we ARE the proxy) + let forwardResponse: Response; + try { + const result = await federationFetch(proxiedData.targetUrl.toString(), { + method: "POST", + body: JSON.stringify({ + method: "TARGETED" as PROXY_METHOD, + payload: proxiedData.payload, + }), + headers: { + "Content-Type": "application/json", + "X-Federation-Origin": process.env.BETTER_AUTH_URL!, + "Origin": process.env.BETTER_AUTH_URL!, + "X-Federation-Sender": getFedUrl, + }, + serverUrl: targetBaseUrl, + proxyFallback: false, + skipHealthUpdate: true, + }); + forwardResponse = result.response; + } catch (err) { + if (err instanceof FederationError) { + debug("POST /proxy – federation error proxying to %s: %s", proxiedData.targetUrl.toString(), err.code); + return NextResponse.json({ error: "Failed to proxy request", code: "FAILED_TO_PROXY_REQUEST", federationError: err.code, method: "PROXY_RESPONSE" as PROXY_METHOD }, { status: 502 }); + } + throw err; + } + + if (!forwardResponse.ok) { + debug("POST /proxy – error proxying request to %s: %s", proxiedData.targetUrl.toString(), forwardResponse.statusText); + return NextResponse.json({ error: "Failed to proxy request", code: "FAILED_TO_PROXY_REQUEST", details: await forwardResponse.json(), method: "PROXY_RESPONSE" as PROXY_METHOD }, { status: 502 }); + } + + const responseBody = await forwardResponse.json(); + + // Return the response from Federation B as a PROXY_RESPONSE + return NextResponse.json({ + method: "PROXY_RESPONSE" as PROXY_METHOD, + payload: responseBody, + }); + + } catch (error) { + debug("POST /proxy – error parsing proxied data from %s: %s", request.url, error); + return NextResponse.json({ error: "Invalid proxied data", code: "INVALID_PROXY_DATA" }, { status: 400 }); + } + } + case "TARGETED": { + try { + // 🚨 we've been targeted, the 🧃 are coming, everyone to the bunkers! 🚨 + + // We need to use the EncryptedEnvelopeBaseSchema here because we do not know what we are being targeted for + // This is the information we'll have at the end of the day: + // - The requester's url + // - The requester's public signing key + // - The requester's public encryption key + // - The request data, being the method, path, and payload + + if (!parsed.data.payload) { + debug("POST /proxy – error parsing targeted data from %s: %s", request.url, "Missing payload"); + return NextResponse.json({ error: "Invalid targeted data", code: "INVALID_TARGETED_DATA" }, { status: 400 }); + } + + const decryptedPayload = decryptPayload(parsed.data.payload, getOwnEncryptionSecretKey()); + const parsedPayload = JSON.parse(decryptedPayload); + + debug("POST /proxy – parsed targeted data from %s: %o", request.url, parsedPayload); + + const payloadSchema = z.object({ + targetUrl: z.url(), + method: z.string(), + headers: z.record(z.string(), z.string()), + body: z.string().transform((body) => { + const parsedBody = JSON.parse(body); + return { + method: parsedBody.method, + payload: parsedBody.payload, + signature: parsedBody.signature, + }; + }) + }).superRefine((data, ctx) => { + try { + const originPayloadHeaders = data.headers; + debug("POST /proxy – origin payload headers: %o", originPayloadHeaders); + if (!originPayloadHeaders["X-Federation-Target"] || !originPayloadHeaders["X-Federation-Origin"] || !originPayloadHeaders["Origin"]) { + ctx.addIssue({ code: "custom", message: "Missing headers" }); + return z.NEVER; + } + + // Should be the base URL of the target URL + const targetUrl = new URL(data.targetUrl).origin; + const federationTargetOriginHeader = new URL(originPayloadHeaders["X-Federation-Target"]).origin; + debug("POST /proxy – target URL: %s", targetUrl); + debug("POST /proxy – x-federation-target header: %s", federationTargetOriginHeader); + if (federationTargetOriginHeader !== targetUrl) { + ctx.addIssue({ code: "custom", message: "x-federation-target header mismatch" }); + return z.NEVER; + } + } catch (error) { + ctx.addIssue({ code: "custom", message: "Decryption failed" }); + return z.NEVER; + } + }); + + const validated = payloadSchema.safeParse(parsedPayload); + if (!validated.success) { + debug("POST /proxy – error validating targeted data from %s: %s", request.url, validated.error.message); + return NextResponse.json({ error: "Invalid targeted data", code: "INVALID_TARGETED_DATA" }, { status: 400 }); + } + + const { targetUrl, method, headers, body } = validated.data; + + // Check if the sender is known, keys match and is not blackisted + const result = await db.transaction(async (tx) => { + + const senderUrl = headers["X-Federation-Origin"]; + // Check if the sender is blacklisted + const [blacklisted] = await tx.select().from(blacklistedServers).where(eq(blacklistedServers.serverUrl, senderUrl)); + if (blacklisted) { + debug("POST /proxy – sender is blacklisted: %s", senderUrl); + return { error: "The federation server was blacklisted from interacting with this federation server. Please contact support to unblacklist your server.", code: "BLACKLISTED_FEDERATION_SERVER", action: undefined, status: 403 }; + } + + // Check if the sender is known + const [sender] = await tx.select().from(serverRegistry).where(eq(serverRegistry.url, senderUrl)); + if (!sender) { + debug("POST /proxy – sender not found in registry: %s", senderUrl); + return { error: "Unknown federation server. Please redo the discovery process and try again.", code: "UNKNOWN_FEDERATION_SERVER_INTERACTION", action: undefined, status: 403 }; + } + + let consolidatedFollowPayload: z.infer | null = null; + let consolidatedPostPayload: z.infer | null = null; + let action: Actions; + switch (true) { + case targetUrl.includes("/api/auth/social/follows") && body.method === "FEDERATE": { + debug("POST /proxy – parsing follow payload: %s", body.payload); + const payload = FollowEnvelopeSchema.safeParse(body.payload); + if (!payload.success) { + debug("POST /proxy – error parsing follow payload: %s", body.payload); + return { error: "Invalid follow payload", code: "INVALID_FOLLOW_PAYLOAD", action: undefined, status: 400 }; + } + consolidatedFollowPayload = payload.data; + action = "FEDERATE_FOLLOW"; + break; + } + case targetUrl.includes("/api/auth/social/posts") && body.method === "FEDERATE_POST": { + debug("POST /proxy – parsing federated post payload"); + const payload = PostEnvelopeSchema.safeParse(body.payload); + if (!payload.success) { + debug("POST /proxy – error parsing federated post payload: %s", payload.error.message); + return { error: "Invalid federated post payload", code: "INVALID_FEDERATED_POST_PAYLOAD", action: undefined, status: 400 }; + } + consolidatedPostPayload = payload.data; + action = "FEDERATE_POST"; + break; + } + default: { + debug("POST /proxy – no endpoint specific parsing, rejecting request"); + return { error: "Invalid payload", code: "INVALID_PAYLOAD", action: undefined, status: 400 }; + } + } + + const signedEnvelope = consolidatedFollowPayload ?? consolidatedPostPayload; + if (!signedEnvelope) { + return { error: "Invalid payload", code: "INVALID_PAYLOAD", action: undefined, status: 400 }; + } + + // Check if the signature is valid + const senderPublicKey = new Uint8Array(Buffer.from(sender.publicKey, "base64")); + const senderEncryptionPublicKey = new Uint8Array(Buffer.from(sender.encryptionPublicKey, "base64")); + if (!verifySignature(signedEnvelope._raw, body.signature, senderPublicKey)) { + debug("POST /proxy – sender signature is invalid: %s", targetUrl); + return { error: "The provided signature is invalid. Please redo the discovery process and try again.", code: "INVALID_SIGNATURE", action: undefined, status: 403 }; + } + + debug("POST /proxy – sender is known, keys match and is not blackisted: %s", targetUrl); + + // Now we can assume that: + // - The sender is known to us + // - The sender is not blacklisted + // - The signature is valid with what we have in the payload + // - The payload is a valid action and has a valid payload + // - There is a known endpoint for the action + // Now the only thing left is to handle the action. This cannot be done in a worker since we need to return a response to the proxy server. This could eventually overload this endpoint and cause issues, but it's not something I can fix right now. + + switch (action) { + case "FEDERATE_FOLLOW": { + const followEnv = consolidatedFollowPayload!; + debug("POST /proxy – federating follow: %s", followEnv); + + // We can do the follow procedure + // First check if the user exists + const [targetUser] = await tx.select().from(user).where(eq(user.id, followEnv.following.followingId)); + if (!targetUser) { + debug("POST /proxy – target user not found: %s", followEnv.following.followingId); + return { error: "The user you are trying to follow does not exist.", code: "USER_NOT_FOUND", status: 404 }; + } + + // Second check if the follow already exists + const [existingFollow] = await tx.select().from(follows).where(and( + eq(follows.followerId, followEnv.following.followerId), + eq(follows.followingId, followEnv.following.followingId), + )); + + if (existingFollow) { + debug("POST /proxy – follow already exists: %s", existingFollow.id); + return { error: "You are already following this user.", code: "FOLLOW_ALREADY_EXISTS", status: 409 }; + } + + // Third check if the user is private + const isPrivate = !targetUser.isPrivate; + + const following = await tx.insert(follows).values({ + id: crypto.randomUUID(), + followerId: followEnv.following.followerId, + followingId: followEnv.following.followingId, + accepted: isPrivate, + createdAt: new Date(), + followerServerUrl: peerRegistryUrlOrNull(senderUrl), + followingServerUrl: peerRegistryUrlOrNull(targetUrl), + }).returning(); + + const row = following[0]; + // Same plaintext shape as the delivery job payload / FollowInnerPayloadSchema (see federation worker). + const innerPayload = JSON.stringify({ + following: { + id: row.id, + createdAt: row.createdAt, + followerId: row.followerId, + followingId: row.followingId, + accepted: row.accepted, + followerServerUrl: row.followerServerUrl, + }, + federationUrl: senderUrl, + method: "FEDERATE" as const, + }); + const signature = signMessage(innerPayload, getOwnSigningSecretKey()); + + return { innerPayload, signature, senderEncryptionPublicKey }; + } + case "FEDERATE_POST": { + const postEnv = consolidatedPostPayload!; + const postResult = await applyFederatedPostInTransaction(tx, postEnv, body.signature, { + publicKey: sender.publicKey, + encryptionPublicKey: sender.encryptionPublicKey, + url: sender.url, + }); + if (!postResult.ok) { + return { + error: postResult.error, + code: postResult.code, + action: undefined, + status: postResult.status, + }; + } + const encKey = new Uint8Array(Buffer.from(postResult.senderEncryptionPublicKeyB64, "base64")); + return { + innerPayload: postResult.innerPayload, + signature: postResult.signature, + senderEncryptionPublicKey: encKey, + }; + } + default: { + debug("POST /proxy – no action specific handling, rejecting request"); + return { error: "Invalid action", code: "INVALID_ACTION", action: undefined, status: 400 }; + } + } + + }); + + if (result.error) { + return NextResponse.json({ error: result.error, code: result.code, action: result.action, status: result.status }, { status: result.status }); + } + + return NextResponse.json({ + method: "PROXY_RESPONSE" as PROXY_METHOD, + status: "acknowledged", + data: encryptPayload(result.innerPayload!, result.senderEncryptionPublicKey!), + signature: result.signature, + }, { status: 200 }); + + } catch (error) { + debug("POST /proxy – error parsing targeted data from %s: %s", request.url, error); + return NextResponse.json({ error: "Invalid targeted data", code: "INVALID_PROXY_DATA" }, { status: 400 }); + } + } + } +} \ No newline at end of file diff --git a/src/lib/auth.ts b/src/lib/auth.ts index 2fbe8ed..162ff42 100644 --- a/src/lib/auth.ts +++ b/src/lib/auth.ts @@ -102,6 +102,13 @@ const bAuth = betterAuth({ defaultValue: false, required: false, index: false, + }, + postPropagationPolicy: { + type: "string", + defaultValue: "all", + required: false, + index: false, + enum: ["all", "followers", "none"] as const, } } } diff --git a/src/lib/bull/index.ts b/src/lib/bull/index.ts index 8fe6caa..93489cd 100644 --- a/src/lib/bull/index.ts +++ b/src/lib/bull/index.ts @@ -1,14 +1,30 @@ import db from '@/lib/db'; import { blacklistedServers, deliveryJobs, follows, serverRegistry } from '@/lib/db/schema'; -import { encryptPayload, getOwnSigningSecretKey, signMessage } from '@/lib/federation/keytools'; -import { discoverAndRegister, DiscoveryError } from '@/lib/federation/registry'; +import { FederationError, federationFetch, type FederationErrorCode } from '@/lib/federation/fetch'; +import { encryptPayload, getOwnSigningSecretKey, signMessage, verifySignature } from '@/lib/federation/keytools'; +import { discoverAndRegister, DiscoveryError, markServerHealthy } from '@/lib/federation/registry'; +import { getThreatPolicy } from '@/lib/federation/threat-model'; import { Queue, UnrecoverableError, Worker, type Job } from 'bullmq'; import createDebug from 'debug'; -import { eq } from 'drizzle-orm'; +import { and, eq } from 'drizzle-orm'; import Redis from 'ioredis'; +import z from 'zod'; +import { FollowEnvelopeSchema } from '../zod/methods/FollowSchema'; const debug = createDebug('app:federation:worker'); +// --------------------------------------------------------------------------- +// Shared Redis +// --------------------------------------------------------------------------- + +function createRedisConnection() { + return new Redis(process.env.REDIS_URL!, { maxRetriesPerRequest: null }); +} + +// --------------------------------------------------------------------------- +// Federation delivery queue (existing) +// --------------------------------------------------------------------------- + export interface FederationDeliveryJob { deliveryJobId: string; targetUrl: string; @@ -16,17 +32,13 @@ export interface FederationDeliveryJob { payload: string; } -const QUEUE_NAME = 'federation-delivery'; +const DELIVERY_QUEUE_NAME = 'federation-delivery'; -function createRedisConnection() { - return new Redis(process.env.REDIS_URL!, { maxRetriesPerRequest: null }); -} - -let _queue: Queue | null = null; +let _deliveryQueue: Queue | null = null; export function getFederationQueue(): Queue { - if (!_queue) { - _queue = new Queue(QUEUE_NAME, { + if (!_deliveryQueue) { + _deliveryQueue = new Queue(DELIVERY_QUEUE_NAME, { connection: createRedisConnection() as never, defaultJobOptions: { attempts: 5, @@ -39,9 +51,48 @@ export function getFederationQueue(): Queue { }, }); } - return _queue; + return _deliveryQueue; } +// --------------------------------------------------------------------------- +// Health-check queue +// --------------------------------------------------------------------------- + +export interface HealthCheckJob { + serverUrl: string; +} + +const HEALTH_CHECK_QUEUE_NAME = 'federation-health-check'; + +let _healthCheckQueue: Queue | null = null; + +export function getHealthCheckQueue(): Queue { + if (!_healthCheckQueue) { + _healthCheckQueue = new Queue(HEALTH_CHECK_QUEUE_NAME, { + connection: createRedisConnection() as never, + }); + } + return _healthCheckQueue; +} + +export async function scheduleHealthCheck(serverUrl: string, attempt: number): Promise { + const delayMinutes = 5 + (attempt * 10); + const delayMs = delayMinutes * 60 * 1000; + debug('scheduling health check for %s in %d minutes (attempt %d)', serverUrl, delayMinutes, attempt); + + const safeId = serverUrl.replace(/[^a-zA-Z0-9._-]/g, '_'); + await getHealthCheckQueue().add('health-check', { serverUrl }, { + delay: delayMs, + jobId: `health-check_${safeId}_${attempt}`, + removeOnComplete: true, + removeOnFail: true, + }); +} + +// --------------------------------------------------------------------------- +// Delivery worker processor +// --------------------------------------------------------------------------- + async function processFederationDelivery(job: Job) { const { deliveryJobId, targetUrl, serverUrl, payload } = job.data; debug('processing job %s (%s) → %s (attempt %d)', job.id, job.name, targetUrl, job.attemptsMade + 1); @@ -61,7 +112,7 @@ async function processFederationDelivery(job: Job) { let encryptionPublicKey: string; const [server] = await db - .select({ encryptionPublicKey: serverRegistry.encryptionPublicKey }) + .select({ encryptionPublicKey: serverRegistry.encryptionPublicKey, publicKey: serverRegistry.publicKey }) .from(serverRegistry) .where(eq(serverRegistry.url, serverUrl)) .limit(1); @@ -93,7 +144,7 @@ async function processFederationDelivery(job: Job) { debug('sending encrypted payload to %s', targetUrl); const method = JSON.parse(payload).method; - if (!method || !["FEDERATE", "INSERT", "UNFOLLOW"].includes(method)) { + if (!method || !["FEDERATE", "FEDERATE_POST", "INSERT", "UNFOLLOW"].includes(method)) { debug('invalid method: %s, dropping job %s', method, job.id); await db.delete(deliveryJobs).where(eq(deliveryJobs.id, deliveryJobId)); debug('job %s dropped because of invalid method', job.id); @@ -102,11 +153,18 @@ async function processFederationDelivery(job: Job) { const signature = signMessage(payload, getOwnSigningSecretKey()); - const response = await fetch(targetUrl, { + const { response } = await federationFetch(targetUrl, { method: 'POST', - headers: { 'Content-Type': 'application/json', 'Origin': process.env.BETTER_AUTH_URL! }, + headers: { + 'Content-Type': 'application/json', + 'Origin': process.env.BETTER_AUTH_URL!, + 'X-Federation-Origin': process.env.BETTER_AUTH_URL!, + 'X-Federation-Target': targetUrl, + }, body: JSON.stringify({ method, payload: encrypted, signature }), - signal: AbortSignal.timeout(15_000), + timeout: 15_000, + proxyFallback: true, + serverUrl, }); if (!response.ok) { @@ -115,30 +173,150 @@ async function processFederationDelivery(job: Job) { } const responseBody = await response.json(); + debug('delivery to %s response body: %o', targetUrl, responseBody); + debug('responseBody.payload: %s', responseBody.payload); - if (responseBody.status !== "acknowledged") { + const ackPayload = + responseBody.payload?.method === "PROXY_RESPONSE" + ? responseBody.payload + : responseBody.method === "PROXY_RESPONSE" + ? responseBody + : null; + + if (!ackPayload || ackPayload.method !== "PROXY_RESPONSE") { debug('delivery to %s not acknowledged', targetUrl); throw new UnrecoverableError(`Federation delivery to ${targetUrl} failed: ${response.status} - ${JSON.stringify(responseBody)}`); } if (job.name === 'deliver-follow') { - const followId = JSON.parse(payload).following?.id; - if (followId && typeof responseBody.accepted === "boolean") { - await db.update(follows).set({ accepted: responseBody.accepted }) - .where(eq(follows.id, followId)); - debug('updated follow %s accepted=%s', followId, responseBody.accepted); + let followPayload: z.infer; + debug('delivery to %s is a follow, updating follow', targetUrl); + debug('ackPayload: %o', ackPayload); + + if (ackPayload.method === "PROXY_RESPONSE") { + // Decrypt the payload + const decrypted = FollowEnvelopeSchema.safeParse(ackPayload.data) + if (!decrypted.success) { + debug('failed to parse follow payload: %s', ackPayload.data); + await db.delete(deliveryJobs).where(eq(deliveryJobs.id, deliveryJobId)); + throw new UnrecoverableError(`Failed to parse follow payload, dropping job ${job.id}`); + } + + debug("payload data: %o", decrypted.data); + // Decrypt the signature + const signature = verifySignature(decrypted.data._raw, ackPayload.signature, new Uint8Array(Buffer.from(server.publicKey!, 'base64'))); + + if (!signature) { + debug('signature verification failed, dropping job %s', job.id); + await db.delete(deliveryJobs).where(eq(deliveryJobs.id, deliveryJobId)); + throw new UnrecoverableError(`Signature verification failed, dropping job ${job.id}`); + } + + followPayload = decrypted.data as z.infer; + } else { + const validated = FollowEnvelopeSchema.safeParse(ackPayload); + if (!validated.success) { + debug('failed to parse follow payload: %s', ackPayload); + await db.delete(deliveryJobs).where(eq(deliveryJobs.id, deliveryJobId)); + throw new UnrecoverableError(`Failed to parse follow payload, dropping job ${job.id}`); + } + + followPayload = validated.data as z.infer; + } + + const followData = followPayload.following; + if (followData && followData.accepted) { + await db.update(follows).set({ accepted: followData.accepted }) + .where( + and( + eq(follows.followerId, followData.followerId), + eq(follows.followingId, followData.followingId), + eq(follows.followerServerUrl, serverUrl), + ) + ); + debug('updated follow %s accepted=%s', followData.id, followData.accepted); } } debug('job %s delivered successfully to %s', job.id, targetUrl); } +// --------------------------------------------------------------------------- +// Health-check worker processor +// --------------------------------------------------------------------------- + +const MAX_HEALTH_CHECK_ATTEMPTS = 5; + +async function processHealthCheck(job: Job) { + const { serverUrl } = job.data; + + const [server] = await db.select() + .from(serverRegistry) + .where(eq(serverRegistry.url, serverUrl)) + .limit(1); + + if (!server) { + debug('health-check: server %s not found in registry, skipping', serverUrl); + return; + } + + if (server.isHealthy) { + debug('health-check: server %s is already healthy, skipping', serverUrl); + return; + } + + if (server.unhealthyReason) { + const policy = getThreatPolicy(server.unhealthyReason as FederationErrorCode); + if (!policy.directHealthCheckable) { + debug('health-check: server %s has reason %s (not direct-checkable), skipping', serverUrl, server.unhealthyReason); + return; + } + } + + debug('health-check: pinging %s (attempt %d/%d)', serverUrl, server.healthCheckAttempts + 1, MAX_HEALTH_CHECK_ATTEMPTS); + + try { + const { response } = await federationFetch(serverUrl + '/discover', { + serverUrl, + timeout: 8_000, + skipHealthUpdate: true, + }); + + if (response.ok) { + debug('health-check: %s is reachable, marking healthy', serverUrl); + await markServerHealthy(serverUrl); + return; + } + + debug('health-check: %s returned HTTP %d', serverUrl, response.status); + } catch (err) { + debug('health-check: %s failed: %s', serverUrl, err instanceof FederationError ? err.code : err); + } + + const nextAttempt = server.healthCheckAttempts + 1; + await db.update(serverRegistry).set({ + healthCheckAttempts: nextAttempt, + updatedAt: new Date(), + }).where(eq(serverRegistry.url, serverUrl)); + + if (nextAttempt < MAX_HEALTH_CHECK_ATTEMPTS) { + await scheduleHealthCheck(serverUrl, nextAttempt); + } else { + debug('health-check: %s exhausted all %d attempts, stopping', serverUrl, MAX_HEALTH_CHECK_ATTEMPTS); + console.warn(`[federation] health-check exhausted for ${serverUrl} after ${MAX_HEALTH_CHECK_ATTEMPTS} attempts`); + } +} + +// --------------------------------------------------------------------------- +// Worker startup +// --------------------------------------------------------------------------- + export function startFederationWorker() { createDebug.enable(process.env.DEBUG || ''); - console.log('[federation] Starting worker...'); + console.log('[federation] Starting workers...'); - const worker = new Worker( - QUEUE_NAME, + const deliveryWorker = new Worker( + DELIVERY_QUEUE_NAME, processFederationDelivery, { connection: createRedisConnection() as never, @@ -146,18 +324,18 @@ export function startFederationWorker() { }, ); - worker.on('ready', () => { - console.log('[federation] Worker connected to Redis and ready'); + deliveryWorker.on('ready', () => { + console.log('[federation] Delivery worker connected to Redis and ready'); }); - worker.on('failed', (job, err) => { + deliveryWorker.on('failed', (job, err) => { const retriesLeft = (job?.opts.attempts ?? 0) - (job?.attemptsMade ?? 0); - debug('job %s (%s) to %s failed (attempt %d, %d retries left): %s', job?.id, job?.name, job?.data.targetUrl, job?.attemptsMade, retriesLeft, err.message); + debug('delivery job %s (%s) to %s failed (attempt %d, %d retries left): %s', job?.id, job?.name, job?.data.targetUrl, job?.attemptsMade, retriesLeft, err.message); if (err.cause) debug('cause: %O', err.cause); }); - worker.on('completed', async (job) => { - debug('job %s (%s) completed, cleaning up delivery record %s', job.id, job.name, job.data.deliveryJobId); + deliveryWorker.on('completed', async (job) => { + debug('delivery job %s (%s) completed, cleaning up delivery record %s', job.id, job.name, job.data.deliveryJobId); try { await db.delete(deliveryJobs).where(eq(deliveryJobs.id, job.data.deliveryJobId)); } catch (err) { @@ -165,10 +343,31 @@ export function startFederationWorker() { } }); - worker.on('error', (err) => { - console.error('[federation] Worker error:', err); + deliveryWorker.on('error', (err) => { + console.error('[federation] Delivery worker error:', err); }); - debug('worker started'); - return worker; + const healthCheckWorker = new Worker( + HEALTH_CHECK_QUEUE_NAME, + processHealthCheck, + { + connection: createRedisConnection() as never, + concurrency: 3, + }, + ); + + healthCheckWorker.on('ready', () => { + console.log('[federation] Health-check worker connected to Redis and ready'); + }); + + healthCheckWorker.on('failed', (job, err) => { + debug('health-check job %s failed: %s', job?.id, err.message); + }); + + healthCheckWorker.on('error', (err) => { + console.error('[federation] Health-check worker error:', err); + }); + + debug('all workers started'); + return { deliveryWorker, healthCheckWorker }; } diff --git a/src/lib/db/schema/index.ts b/src/lib/db/schema/index.ts index 14e4084..4029d7b 100644 --- a/src/lib/db/schema/index.ts +++ b/src/lib/db/schema/index.ts @@ -25,6 +25,7 @@ export const user = pgTable("user", { displayUsername: text("display_username"), twoFactorEnabled: boolean("two_factor_enabled").default(false), isPrivate: boolean("is_private").default(false), + postPropagationPolicy: text("post_propagation_policy").default("all"), }); export const account = pgTable( @@ -72,16 +73,21 @@ export const posts = pgTable( { id: text("id").primaryKey(), content: jsonb("content").notNull(), - authorId: text("author_id") - .notNull() - .references(() => user.id, { onDelete: "cascade" }), + authorId: text("author_id").references(() => user.id, { + onDelete: "cascade", + }), + federatedAuthorId: text("federated_author_id"), published: timestamp("published").notNull(), isLocal: boolean("is_local").default(false).notNull(), isPrivate: boolean("is_private").default(false), createdAt: timestamp("created_at").notNull(), federationUrl: text("federation_url"), + federationPostId: text("federation_post_id"), }, - (table) => [index("posts_federationUrl_idx").on(table.federationUrl)], + (table) => [ + index("posts_federationUrl_idx").on(table.federationUrl), + index("posts_federationPostId_idx").on(table.federationPostId), + ], ); export const follows = pgTable( @@ -150,6 +156,8 @@ export const serverRegistry = pgTable( createdAt: timestamp("created_at").notNull(), updatedAt: timestamp("updated_at").notNull(), isHealthy: boolean("is_healthy").notNull(), + healthCheckAttempts: integer("health_check_attempts").default(0).notNull(), + unhealthyReason: text("unhealthy_reason"), }, (table) => [ uniqueIndex("serverRegistry_publicKey_uidx").on(table.publicKey), diff --git a/src/lib/federation/fetch.ts b/src/lib/federation/fetch.ts new file mode 100644 index 0000000..77fceeb --- /dev/null +++ b/src/lib/federation/fetch.ts @@ -0,0 +1,301 @@ +import db from '@/lib/db'; +import { serverRegistry } from '@/lib/db/schema'; +import { encryptPayload, getOwnSigningSecretKey, signMessage } from '@/lib/federation/keytools'; +import { markServerHealthy, markServerUnhealthy } from '@/lib/federation/registry'; +import { EMERGENCY_SWEEP_TIMEOUT, getThreatPolicy } from '@/lib/federation/threat-model'; +import createDebug from 'debug'; +import { and, desc, eq, ne } from 'drizzle-orm'; + +const debug = createDebug('app:federation:fetch'); + +// --------------------------------------------------------------------------- +// Public types +// --------------------------------------------------------------------------- + +export type FederationErrorCode = + | "DNS_BLOCKED" + | "CONN_REFUSED" + | "CONN_RESET" + | "TIMEOUT" + | "TLS_ERROR" + | "UNKNOWN"; + +export class FederationError extends Error { + constructor( + public readonly code: FederationErrorCode, + public readonly url: string, + ) { + super(`Federation unreachable: ${code} — ${url}`); + this.name = 'FederationError'; + } + + get isProxyEligible(): boolean { + return getThreatPolicy(this.code).proxyEligible; + } +} + +export interface FederationFetchOptions { + method?: string; + headers?: Record; + body?: string; + timeout?: number; + proxyFallback?: boolean; + serverUrl?: string; + skipHealthUpdate?: boolean; +} + +export interface FederationFetchResult { + response: Response; + proxied: boolean; + proxyPeer?: string; +} + +// --------------------------------------------------------------------------- +// Internal helpers +// --------------------------------------------------------------------------- + +function extractServerUrl(fullUrl: string, explicit?: string): string { + if (explicit) return explicit; + const parsed = new URL(fullUrl); + return `${parsed.protocol}//${parsed.host}`; +} + +function classifyError(err: unknown, url: string): FederationError { + const anyErr = err as Record | undefined; + const code = anyErr?.cause?.code ?? anyErr?.code ?? ''; + + if (anyErr?.name === 'AbortError' || anyErr?.name === 'TimeoutError') { + return new FederationError('TIMEOUT', url); + } + if (code === 'ENOTFOUND' || code === 'EAI_AGAIN') { + return new FederationError('DNS_BLOCKED', url); + } + if (code === 'ECONNREFUSED') { + return new FederationError('CONN_REFUSED', url); + } + if (code === 'ECONNRESET' || code === 'ETIMEDOUT') { + return new FederationError('CONN_RESET', url); + } + if (typeof code === 'string' && ( + code.startsWith('ERR_TLS') || + code.startsWith('ERR_SSL') || + code.startsWith('CERT_') || + code === 'DEPTH_ZERO_SELF_SIGNED_CERT' || + code === 'UNABLE_TO_VERIFY_LEAF_SIGNATURE' || + code === 'SELF_SIGNED_CERT_IN_CHAIN' + )) { + return new FederationError('TLS_ERROR', url); + } + return new FederationError('UNKNOWN', url); +} + +async function directFetch(url: string, opts: FederationFetchOptions): Promise { + const controller = new AbortController(); + const timeout = opts.timeout ?? 10_000; + const timer = setTimeout(() => controller.abort(), timeout); + + try { + const response = await fetch(url, { + method: opts.method ?? 'GET', + headers: opts.headers, + body: opts.body, + signal: controller.signal, + }); + clearTimeout(timer); + return response; + } catch (err) { + clearTimeout(timer); + throw classifyError(err, url); + } +} + +// --------------------------------------------------------------------------- +// Proxy peer selection & emergency sweep +// --------------------------------------------------------------------------- + +async function pickHealthyProxy(excludeUrl: string): Promise { + const ownUrl = process.env.BETTER_AUTH_URL!; + const [peer] = await db.select() + .from(serverRegistry) + .where( + and( + eq(serverRegistry.isHealthy, true), + ne(serverRegistry.url, excludeUrl), + ne(serverRegistry.url, ownUrl), + ), + ) + .orderBy(desc(serverRegistry.lastSeen)) + .limit(1); + + return peer ?? null; +} + +async function emergencySweep(excludeUrl: string): Promise { + debug('emergency sweep: pinging all unhealthy servers'); + const ownUrl = process.env.BETTER_AUTH_URL!; + + const unhealthyServers = await db.select() + .from(serverRegistry) + .where( + and( + eq(serverRegistry.isHealthy, false), + ne(serverRegistry.url, excludeUrl), + ne(serverRegistry.url, ownUrl), + ), + ) + .orderBy(desc(serverRegistry.lastSeen)); + + const checkable = unhealthyServers.filter(s => { + if (!s.unhealthyReason) return true; + const policy = getThreatPolicy(s.unhealthyReason as FederationErrorCode); + return policy.directHealthCheckable; + }); + + if (checkable.length === 0) { + debug('emergency sweep: no direct-checkable servers'); + return null; + } + + debug('emergency sweep: pinging %d servers in parallel (timeout %dms)', checkable.length, EMERGENCY_SWEEP_TIMEOUT); + + const results = await Promise.allSettled( + checkable.map(async (server) => { + const res = await fetch(server.url + '/discover', { + signal: AbortSignal.timeout(EMERGENCY_SWEEP_TIMEOUT), + }); + if (!res.ok) throw new Error(`HTTP ${res.status}`); + return server; + }), + ); + + const recovered: (typeof serverRegistry.$inferSelect)[] = []; + for (const result of results) { + if (result.status === 'fulfilled') { + recovered.push(result.value); + } + } + + if (recovered.length === 0) { + debug('emergency sweep: no servers recovered — federation is STRANDED'); + console.error('[federation] STRANDED: all known peers are unreachable. Inbound registration is the only recovery path.'); + return null; + } + + debug('emergency sweep: %d server(s) recovered', recovered.length); + for (const server of recovered) { + await markServerHealthy(server.url); + } + + return recovered[0]; +} + +// --------------------------------------------------------------------------- +// Proxy routing +// --------------------------------------------------------------------------- + +async function attemptProxyRoute( + url: string, + opts: FederationFetchOptions, + targetServerUrl: string, + proxyPeer: typeof serverRegistry.$inferSelect, +): Promise { + debug('proxy route: sending through %s → %s', proxyPeer.url, targetServerUrl); + + const [targetServer] = await db.select() + .from(serverRegistry) + .where(eq(serverRegistry.url, targetServerUrl)) + .limit(1); + + if (!targetServer) { + throw new Error(`Target server ${targetServerUrl} not found in registry for proxy routing`); + } + + const recipientKey = new Uint8Array(Buffer.from(targetServer.encryptionPublicKey, 'base64')); + const innerPayload = JSON.stringify({ + targetUrl: url, + method: opts.method ?? 'GET', + headers: opts.headers ?? {}, + body: opts.body ?? null, + }); + + const encrypted = encryptPayload(innerPayload, recipientKey); + const signature = signMessage(innerPayload, getOwnSigningSecretKey()); + + const proxyUrl = proxyPeer.url + '/proxy'; + const proxyBody = JSON.stringify({ + method: 'PROXY', + targetUrl: targetServerUrl + '/proxy', + publicSigningKey: process.env.FEDERATION_PUBLIC_KEY!, + publicEncryptionKey: process.env.FEDERATION_ENCRYPTION_PUBLIC_KEY!, + payload: encrypted, + signature, + }); + + const proxyResponse = await fetch(proxyUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Federation-Origin': process.env.BETTER_AUTH_URL!, + 'Origin': process.env.BETTER_AUTH_URL!, + }, + body: proxyBody, + signal: AbortSignal.timeout(opts.timeout ?? 15_000), + }); + + if (!proxyResponse.ok) { + throw new Error(`Proxy ${proxyPeer.url} returned ${proxyResponse.status}`); + } + + return { response: proxyResponse, proxied: true, proxyPeer: proxyPeer.url }; +} + +// --------------------------------------------------------------------------- +// Main entry point +// --------------------------------------------------------------------------- + +export async function federationFetch( + url: string, + opts: FederationFetchOptions = {}, +): Promise { + const serverUrl = extractServerUrl(url, opts.serverUrl); + + // Gate 0: direct fetch + try { + const response = await directFetch(url, opts); + return { response, proxied: false }; + } catch (err) { + if (!(err instanceof FederationError)) throw err; + + debug('direct fetch to %s failed: %s', url, err.code); + + if (!opts.skipHealthUpdate) { + await markServerUnhealthy(serverUrl, err.code).catch(e => + debug('failed to mark %s unhealthy: %O', serverUrl, e), + ); + } + + const policy = getThreatPolicy(err.code); + + // Gate 1: proxy fallback + if (opts.proxyFallback && policy.proxyEligible) { + let proxyPeer = await pickHealthyProxy(serverUrl); + + // If no healthy proxy is found, we'll do an emergency sweep to find a new proxy. + if (!proxyPeer) { + proxyPeer = await emergencySweep(serverUrl); + } + + if (proxyPeer) { + try { + return await attemptProxyRoute(url, opts, serverUrl, proxyPeer); + } catch (proxyErr) { + debug('proxy route through %s failed: %O', proxyPeer.url, proxyErr); + } + } else { + throw new Error("No healthy proxy found. Emergency sweep failed."); + } + } + + throw err; + } +} diff --git a/src/lib/federation/peer-registry-url.ts b/src/lib/federation/peer-registry-url.ts new file mode 100644 index 0000000..6204ecc --- /dev/null +++ b/src/lib/federation/peer-registry-url.ts @@ -0,0 +1,19 @@ +/** + * Values for `follows.follower_server_url` / `follows.following_server_url`, which FK to + * `server_registry.url`. This instance is intentionally absent from that table, so when the + * peer is local we persist null instead of violating the FK. + */ +export function peerRegistryUrlOrNull(peerUrl: string | null | undefined): string | null { + if (peerUrl == null || peerUrl === "") return null; + const own = process.env.BETTER_AUTH_URL; + let peerOrigin: string; + try { + peerOrigin = new URL(peerUrl).origin; + } catch { + return null; + } + if (!own) return peerOrigin; + const ownOrigin = new URL(own).origin; + if (peerOrigin === ownOrigin) return null; + return peerOrigin; +} diff --git a/src/lib/federation/proxy-helpers/federated-post.ts b/src/lib/federation/proxy-helpers/federated-post.ts new file mode 100644 index 0000000..f01b235 --- /dev/null +++ b/src/lib/federation/proxy-helpers/federated-post.ts @@ -0,0 +1,94 @@ +import db from "@/lib/db"; +import { posts } from "@/lib/db/schema"; +import { getOwnSigningSecretKey, signMessage, verifySignature } from "@/lib/federation/keytools"; +import { PostEnvelopeSchema } from "@/lib/zod/methods/PostFederationSchema"; +import { and, eq } from "drizzle-orm"; +import type { z } from "zod"; + +type Tx = Parameters[0]>[0]; + +export type FederatedPostSender = { + publicKey: string; + encryptionPublicKey: string; + url: string; +}; + +export type FederatedPostResult = + | { + ok: true; + innerPayload: string; + signature: string; + senderEncryptionPublicKeyB64: string; + } + | { ok: false; error: string; code: string; status: number }; + +export async function applyFederatedPostInTransaction( + tx: Tx, + envelope: z.infer, + bodySignature: string, + sender: FederatedPostSender, +): Promise { + const senderPublicKey = new Uint8Array(Buffer.from(sender.publicKey, "base64")); + if (!verifySignature(envelope._raw, bodySignature, senderPublicKey)) { + return { + ok: false, + error: "The provided signature is invalid. Please redo the discovery process and try again.", + code: "INVALID_SIGNATURE", + status: 403, + }; + } + + const [existing] = await tx + .select({ id: posts.id }) + .from(posts) + .where( + and( + eq(posts.federationUrl, envelope.federationUrl), + eq(posts.federationPostId, envelope.post.id), + ), + ) + .limit(1); + + if (existing) { + return { + ok: false, + error: "This post has already been federated to this server.", + code: "FEDERATED_POST_ALREADY_EXISTS", + status: 409, + }; + } + + const localId = crypto.randomUUID(); + const published = new Date(envelope.post.published); + + await tx.insert(posts).values({ + id: localId, + content: envelope.post.content, + authorId: null, + federatedAuthorId: envelope.post.authorId, + published, + isLocal: false, + isPrivate: envelope.post.isPrivate, + federationUrl: envelope.federationUrl, + federationPostId: envelope.post.id, + createdAt: new Date(), + }); + + const innerPayload = JSON.stringify({ + post: { + id: localId, + federationPostId: envelope.post.id, + }, + federationUrl: process.env.BETTER_AUTH_URL!, + method: "FEDERATE_POST" as const, + }); + + const signature = signMessage(innerPayload, getOwnSigningSecretKey()); + + return { + ok: true, + innerPayload, + signature, + senderEncryptionPublicKeyB64: sender.encryptionPublicKey, + }; +} diff --git a/src/lib/federation/registry.ts b/src/lib/federation/registry.ts index d8b7d97..308c823 100644 --- a/src/lib/federation/registry.ts +++ b/src/lib/federation/registry.ts @@ -1,5 +1,6 @@ import db from '@/lib/db'; import { serverRegistry } from '@/lib/db/schema'; +import { federationFetch, FederationError, type FederationErrorCode } from '@/lib/federation/fetch'; import { assertSafeUrl } from '@/lib/federation/url-guard'; import createDebug from 'debug'; import { eq } from 'drizzle-orm'; @@ -16,7 +17,43 @@ export async function upsertServer(url: string, publicKey: string, encryptionPub createdAt: new Date(), updatedAt: new Date(), isHealthy: true, - }).onConflictDoNothing(); + healthCheckAttempts: 0, + unhealthyReason: null, + }).onConflictDoUpdate({ + target: serverRegistry.url, + set: { + lastSeen: new Date(), + updatedAt: new Date(), + }, + }); +} + +export async function markServerUnhealthy(serverUrl: string, reason: FederationErrorCode): Promise { + debug('marking server %s as unhealthy (reason: %s)', serverUrl, reason); + await db.update(serverRegistry).set({ + isHealthy: false, + unhealthyReason: reason, + healthCheckAttempts: 0, + updatedAt: new Date(), + }).where(eq(serverRegistry.url, serverUrl)); + + try { + const { scheduleHealthCheck } = await import('@/lib/bull'); + await scheduleHealthCheck(serverUrl, 0); + } catch (err) { + debug('failed to schedule health check for %s: %O', serverUrl, err); + } +} + +export async function markServerHealthy(serverUrl: string): Promise { + debug('marking server %s as healthy', serverUrl); + await db.update(serverRegistry).set({ + isHealthy: true, + unhealthyReason: null, + healthCheckAttempts: 0, + lastSeen: new Date(), + updatedAt: new Date(), + }).where(eq(serverRegistry.url, serverUrl)); } export class DiscoveryError extends Error { @@ -38,15 +75,18 @@ export async function discoverAndRegister(serverUrl: string): Promise { let remote: { url?: string; publicKey?: string; encryptionPublicKey?: string }; try { - const res = await fetch(serverUrl + '/discover', { - signal: AbortSignal.timeout(10_000), + const { response } = await federationFetch(serverUrl + '/discover', { + serverUrl, }); - if (!res.ok) { - throw new DiscoveryError(`GET /discover returned ${res.status}`); + if (!response.ok) { + throw new DiscoveryError(`GET /discover returned ${response.status}`); } - remote = await res.json(); + remote = await response.json(); } catch (err) { if (err instanceof DiscoveryError) throw err; + if (err instanceof FederationError) { + throw new DiscoveryError(`Failed to reach ${serverUrl}/discover: ${err.code}`); + } throw new DiscoveryError(`Failed to reach ${serverUrl}/discover: ${err instanceof Error ? err.message : err}`); } @@ -72,7 +112,7 @@ export async function discoverAndRegister(serverUrl: string): Promise { debug('sending mutual REGISTER to %s', serverUrl); try { - await fetch(serverUrl + '/discover', { + await federationFetch(serverUrl + '/discover', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ @@ -81,7 +121,7 @@ export async function discoverAndRegister(serverUrl: string): Promise { publicKey: process.env.FEDERATION_PUBLIC_KEY!, encryptionPublicKey: process.env.FEDERATION_ENCRYPTION_PUBLIC_KEY!, }), - signal: AbortSignal.timeout(10_000), + serverUrl, }); } catch (err) { debug('mutual REGISTER to %s failed (non-fatal): %s', serverUrl, err instanceof Error ? err.message : err); diff --git a/src/lib/federation/threat-model.ts b/src/lib/federation/threat-model.ts new file mode 100644 index 0000000..909f8b7 --- /dev/null +++ b/src/lib/federation/threat-model.ts @@ -0,0 +1,57 @@ +import type { FederationErrorCode } from "./fetch"; + +export interface ThreatPolicy { + proxyEligible: boolean; + directHealthCheckable: boolean; + description: string; +} + +export const DEFAULT_THREAT_MODEL: Record = { + DNS_BLOCKED: { + proxyEligible: true, + directHealthCheckable: false, + description: "DNS unreachable -- likely censored, relay-eligible", + }, + TIMEOUT: { + proxyEligible: false, + directHealthCheckable: true, + description: "Timed out -- ambiguous, rely on existing BullMQ retries", + }, + CONN_REFUSED: { + proxyEligible: false, + directHealthCheckable: true, + description: "Connection refused -- server is down, proxy won't help", + }, + CONN_RESET: { + proxyEligible: false, + directHealthCheckable: true, + description: "Connection reset -- ambiguous, likely server-side", + }, + TLS_ERROR: { + proxyEligible: false, + directHealthCheckable: true, + description: "TLS failure -- possible MITM, do not proxy", + }, + UNKNOWN: { + proxyEligible: true, + directHealthCheckable: true, + description: "Unknown error -- safety default", + }, +}; + +export const EMERGENCY_SWEEP_TIMEOUT = 2_000; + +/** + * Admin overrides -- edit this to match your federation's threat model. + * Any key you set here overrides the corresponding default above. + * + * Example for a heavily censored region: + * TIMEOUT: { proxyEligible: true }, + */ +export const THREAT_MODEL_OVERRIDES: Partial>> = {}; + +export function getThreatPolicy(code: FederationErrorCode): ThreatPolicy { + const base = DEFAULT_THREAT_MODEL[code]; + const override = THREAT_MODEL_OVERRIDES[code]; + return override ? { ...base, ...override } : base; +} diff --git a/src/lib/federation/url-guard.ts b/src/lib/federation/url-guard.ts index 7065669..fc0fdfb 100644 --- a/src/lib/federation/url-guard.ts +++ b/src/lib/federation/url-guard.ts @@ -1,6 +1,6 @@ import createDebug from "debug"; -const debug = createDebug("federation:url-guard"); +const debug = createDebug("app:federation:url-guard"); const BLOCKED_HOSTNAMES = new Set([ "localhost", @@ -12,8 +12,21 @@ const BLOCKED_HOSTNAMES = new Set([ "169.254.169.254", ]); -const DEV_ALLOWED_HOSTNAMES = new Set(["localhost", "127.0.0.1", process.env.DEV_ALLOWED_HOSTNAMES!]); -debug("DEV_ALLOWED_HOSTNAMES: %o", DEV_ALLOWED_HOSTNAMES); +const SSRF_BYPASS = process.env.FEDERATION_ALLOW_PRIVATE_URLS === "true"; + +const DEV_ALLOWED_HOSTNAMES = new Set([ + "localhost", + "127.0.0.1", +]); + +if (typeof process.env.DEV_ALLOWED_HOSTNAMES === "string" && process.env.DEV_ALLOWED_HOSTNAMES.trim() !== "") { + for (const h of process.env.DEV_ALLOWED_HOSTNAMES.split(",")) { + const hostname = h.trim(); + if (hostname) DEV_ALLOWED_HOSTNAMES.add(hostname); + } +} + +debug("SSRF bypass: %s, DEV_ALLOWED_HOSTNAMES: %s", SSRF_BYPASS, [...DEV_ALLOWED_HOSTNAMES].join(", ")); function isPrivateIPv4(hostname: string): boolean { const parts = hostname.split(".").map(Number); if (parts.length !== 4 || parts.some((p) => isNaN(p))) return false; @@ -39,9 +52,8 @@ function isPrivateIPv6(hostname: string): boolean { /** * Throws if the URL points to a private/internal address or uses a - * non-HTTP(S) protocol. In development, localhost/127.0.0.1 are explicitly - * allowed for local federation testing while all other safety checks - * remain enforced. + * non-HTTP(S) protocol. Set FEDERATION_ALLOW_PRIVATE_URLS=true to + * allow localhost/127.0.0.1 for local federation testing. */ export function assertSafeUrl(url: string): void { let parsed: URL; @@ -57,7 +69,7 @@ export function assertSafeUrl(url: string): void { const hostname = parsed.hostname; - if (process.env.NODE_ENV === "development" && DEV_ALLOWED_HOSTNAMES.has(hostname)) { + if (SSRF_BYPASS && DEV_ALLOWED_HOSTNAMES.has(hostname)) { return; } diff --git a/src/lib/plugins/server/federation.ts b/src/lib/plugins/server/federation.ts index 3024555..46a2988 100644 --- a/src/lib/plugins/server/federation.ts +++ b/src/lib/plugins/server/federation.ts @@ -43,6 +43,17 @@ export const federation = () => { type: "boolean", required: true, index: false + }, + healthCheckAttempts: { + type: "number", + required: true, + index: false, + defaultValue: 0 + }, + unhealthyReason: { + type: "string", + required: false, + index: false } } }, diff --git a/src/lib/plugins/server/helpers/social/endpoints/follows.ts b/src/lib/plugins/server/helpers/social/endpoints/follows.ts index 2881433..6ec2653 100644 --- a/src/lib/plugins/server/helpers/social/endpoints/follows.ts +++ b/src/lib/plugins/server/helpers/social/endpoints/follows.ts @@ -1,8 +1,10 @@ import { getFederationQueue } from "@/lib/bull"; import db from "@/lib/db"; import { blacklistedServers, deliveryJobs, follows, serverRegistry, user } from "@/lib/db/schema"; -import { decryptPayload, getOwnEncryptionSecretKey, verifySignature } from "@/lib/federation/keytools"; +import { verifySignature } from "@/lib/federation/keytools"; +import { peerRegistryUrlOrNull } from "@/lib/federation/peer-registry-url"; import { discoverAndRegister, DiscoveryError } from "@/lib/federation/registry"; +import { FollowEnvelopeSchema } from "@/lib/zod/methods/FollowSchema"; import { createAuthEndpoint, getSessionFromCtx } from "better-auth/api"; import createDebug from "debug"; import { and, eq } from "drizzle-orm"; @@ -20,38 +22,7 @@ const followSchema = z.discriminatedUnion( z.object({ method: z.literal("FEDERATE"), signature: z.string(), - payload: z.object({ - ephemeralPublicKey: z.string(), - iv: z.string(), - ciphertext: z.string(), - authTag: z.string(), - }).transform((payload, ctx) => { - try { - const decrypted = decryptPayload(payload, getOwnEncryptionSecretKey()); - const parsedPayload = JSON.parse(decrypted); - - const parsedPayloadSchema = z.object({ - following: z.object({ - id: z.string(), - createdAt: z.coerce.date(), - followerId: z.string(), - followingId: z.string(), - accepted: z.boolean(), - followerServerUrl: z.string().nullable(), - }), - federationUrl: z.string(), - method: z.literal("FEDERATE"), - }).safeParse(parsedPayload); - if (!parsedPayloadSchema.success) { - ctx.addIssue({ code: "custom", message: "Invalid payload" }); - return z.never(); - } - return { ...parsedPayloadSchema.data, _raw: decrypted }; - } catch { - ctx.addIssue({ code: "custom", message: "Invalid payload" }); - return z.never(); - } - }), + payload: FollowEnvelopeSchema }), z.object({ method: z.literal("UNFOLLOW"), @@ -64,12 +35,11 @@ export const followUser = createAuthEndpoint("/social/follows", { method: "POST", body: followSchema, }, async (context) => { - debug("FOLLOW – %s", context.body.method); + const { method } = context.body; switch (method) { case "INSERT": { const session = await getSessionFromCtx(context); - debug("FOLLOW – user: %o", session); if (!session) { return context.json({ error: "Unauthorized" }, { status: 401 }); }; @@ -150,7 +120,7 @@ export const followUser = createAuthEndpoint("/social/follows", { followingId: userId, accepted: false, createdAt: new Date(), - followerServerUrl: serverUrl, + followerServerUrl: peerRegistryUrlOrNull(serverUrl), }).returning(); const job = await db.insert(deliveryJobs).values({ @@ -221,7 +191,7 @@ export const followUser = createAuthEndpoint("/social/follows", { followingId: following.followingId, accepted, createdAt: new Date(), - followingServerUrl: server.url, + followingServerUrl: peerRegistryUrlOrNull(server.url), }); return context.json({ status: "acknowledged", accepted }, { status: 200 }); diff --git a/src/lib/plugins/server/helpers/social/endpoints/posts.ts b/src/lib/plugins/server/helpers/social/endpoints/posts.ts index dc860ae..2935047 100644 --- a/src/lib/plugins/server/helpers/social/endpoints/posts.ts +++ b/src/lib/plugins/server/helpers/social/endpoints/posts.ts @@ -1,65 +1,182 @@ -import db from "@/lib/db"; -import { deliveryJobs, follows, posts } from "@/lib/db/schema"; import { getFederationQueue, type FederationDeliveryJob } from "@/lib/bull"; +import db from "@/lib/db"; +import { deliveryJobs, follows, posts, serverRegistry } from "@/lib/db/schema"; +import { encryptPayload } from "@/lib/federation/keytools"; +import { applyFederatedPostInTransaction } from "@/lib/federation/proxy-helpers/federated-post"; +import { EncryptedEnvelopeBaseSchema } from "@/lib/zod/EncryptedEnvelope"; +import { PostEnvelopeSchema } from "@/lib/zod/methods/PostFederationSchema"; import minioClient from "@/plugins/server/storage/minio.client"; import { createAuthEndpoint, getSessionFromCtx } from "better-auth/api"; +import createDebug from "debug"; import { and, eq } from "drizzle-orm"; import { z } from "zod"; import { postContentSchema } from "../social"; +const debug = createDebug("app:plugins:server:helpers:social:posts"); + +const federatedPostRequestSchema = z.object({ + method: z.literal("FEDERATE_POST"), + payload: EncryptedEnvelopeBaseSchema, + signature: z.string(), +}); + +const createPostBodySchema = z.union([federatedPostRequestSchema, postContentSchema]); + export const createPost = createAuthEndpoint("/social/posts", { method: "POST", - body: postContentSchema, + body: createPostBodySchema, }, async (context) => { - const content = context.body; - const user = await getSessionFromCtx(context) + const body = context.body; + + if (typeof body === "object" && body !== null && "method" in body && body.method === "FEDERATE_POST") { + const { payload: encryptedPayload, signature } = body; + + const parsedEnvelope = PostEnvelopeSchema.safeParse(encryptedPayload); + if (!parsedEnvelope.success) { + return context.json( + { error: "Invalid federated post payload", code: "INVALID_FEDERATED_POST_PAYLOAD" }, + { status: 400 }, + ); + } + + const envelope = parsedEnvelope.data; + const [server] = await db + .select({ + url: serverRegistry.url, + publicKey: serverRegistry.publicKey, + encryptionPublicKey: serverRegistry.encryptionPublicKey, + }) + .from(serverRegistry) + .where(eq(serverRegistry.url, envelope.federationUrl)) + .limit(1); + + if (!server) { + return context.json( + { + error: "Unknown federation server. Please redo the discovery process and try again.", + code: "UNKNOWN_FEDERATION_SERVER_INTERACTION", + }, + { status: 403 }, + ); + } + + const result = await db.transaction(async (tx) => + applyFederatedPostInTransaction(tx, envelope, signature, server), + ); + + if (!result.ok) { + return context.json({ error: result.error, code: result.code }, { status: result.status }); + } + + const recipientKey = new Uint8Array(Buffer.from(result.senderEncryptionPublicKeyB64, "base64")); + return context.json( + { + method: "PROXY_RESPONSE" as const, + status: "acknowledged", + data: encryptPayload(result.innerPayload, recipientKey), + signature: result.signature, + }, + { status: 200 }, + ); + } + + const content = body; + const user = await getSessionFromCtx(context); if (!user) { return context.json({ error: "Unauthorized" }, { status: 401 }); } - // Create post - const post = await db.insert(posts).values({ - id: crypto.randomUUID(), - content: content, - authorId: user.user.id, - published: new Date(), - isLocal: true, - createdAt: new Date(), - }).returning({ id: posts.id }); + const isPrivate = user.user.isPrivate; + const shouldPropagate = { + all: true, + followers: isPrivate, + none: false, + }[user.user.postPropagationPolicy as "all" | "followers" | "none"] ?? true; - // Enqueue federation delivery jobs for each follower's server - const followers = await db.select().from(follows).where(and(eq(follows.followingId, user.user.id), eq(follows.accepted, true))); - const uniqueUrls = [...new Set(followers.map(f => f.followerServerUrl).filter(Boolean))] as string[]; - const payload = JSON.stringify({ content }); + const postId = crypto.randomUUID(); + const published = new Date(); + const inserted = await db + .insert(posts) + .values({ + id: postId, + content, + authorId: user.user.id, + published, + isLocal: shouldPropagate, + isPrivate, + federationUrl: process.env.BETTER_AUTH_URL!, + federationPostId: postId, + createdAt: new Date(), + }) + .returning({ id: posts.id }); - const jobRows = uniqueUrls.map(url => ({ - id: crypto.randomUUID(), - targetUrl: url + "/social/posts", - serverUrl: url, - payload, - attempts: 0, - createdAt: new Date(), - })); + let federationDeliveriesQueued = 0; - if (jobRows.length > 0) { - await db.insert(deliveryJobs).values(jobRows); + if (shouldPropagate) { + const followers = await db + .select() + .from(follows) + .where(and(eq(follows.followingId, user.user.id), eq(follows.accepted, true))); + const following = await db + .select() + .from(follows) + .where(and(eq(follows.followerId, user.user.id), eq(follows.accepted, true))); - await getFederationQueue().addBulk( - jobRows.map(row => ({ - name: 'deliver-post' as const, - data: { - deliveryJobId: row.id, - targetUrl: row.targetUrl, - serverUrl: row.serverUrl, - payload: row.payload, - } satisfies FederationDeliveryJob, - })), - ); + debug("followers: %o", followers); + debug("following: %o", following); + + const uniqueUrls = [ + ...new Set([ + ...followers.map((f) => f.followingServerUrl).filter(Boolean), + ...following.map((f) => f.followerServerUrl).filter(Boolean), + ]), + ] as string[]; + + federationDeliveriesQueued = uniqueUrls.length; + + if (uniqueUrls.length > 0) { + const jobPayload = JSON.stringify({ + method: "FEDERATE_POST" as const, + federationUrl: process.env.BETTER_AUTH_URL!, + post: { + id: postId, + content, + authorId: user.user.id, + published: published.toISOString(), + isPrivate, + }, + }); + + const jobRows = uniqueUrls.map((url) => ({ + id: crypto.randomUUID(), + targetUrl: url + "/api/auth/social/posts", + serverUrl: url, + payload: jobPayload, + attempts: 0, + createdAt: new Date(), + })); + + await db.insert(deliveryJobs).values(jobRows); + + await getFederationQueue().addBulk( + jobRows.map((row) => ({ + name: "deliver-post" as const, + data: { + deliveryJobId: row.id, + targetUrl: row.targetUrl, + serverUrl: row.serverUrl, + payload: row.payload, + } satisfies FederationDeliveryJob, + })), + ); + } } - return context.json({ id: post[0].id }, { status: 200 }); - + return context.json( + { id: inserted[0].id, federationDeliveriesQueued }, + { status: 200 }, + ); }); export const getPost = createAuthEndpoint("/social/posts/:id", { @@ -107,4 +224,4 @@ export const uploadFile = createAuthEndpoint("/social/posts/files", { const objectUrl = `${protocol}://${process.env.MINIO_ENDPOINT}:${process.env.MINIO_PORT}/${process.env.MINIO_BUCKET}/${objectKey}`; return context.json({ presignedUrl, objectUrl, objectKey }, { status: 200 }); -}) \ No newline at end of file +}); diff --git a/src/lib/plugins/server/helpers/social/social.ts b/src/lib/plugins/server/helpers/social/social.ts index efae50a..61b8d0a 100644 --- a/src/lib/plugins/server/helpers/social/social.ts +++ b/src/lib/plugins/server/helpers/social/social.ts @@ -63,13 +63,18 @@ export default { }, authorId: { type: "string", - required: true, + required: false, index: false, references: { model: "user", field: "id" } }, + federatedAuthorId: { + type: "string", + required: false, + index: false, + }, published: { type: "date", required: true, @@ -84,6 +89,7 @@ export default { defaultValue: false, }, // "isPrivate" will be used to determine if the post should be visible only for the user's followers + // If "isLocal" is set to true and this to false, only users on the same server will be able to see the psot isPrivate: { type: "boolean", required: false, @@ -99,6 +105,12 @@ export default { type: "string", required: false, index: true, + }, + // This serves as a reference to the post on the original server this post came from + federationPostId: { + type: "string", + required: false, + index: true, } } }, diff --git a/src/lib/zod/EncryptedEnvelope.ts b/src/lib/zod/EncryptedEnvelope.ts new file mode 100644 index 0000000..f2793dd --- /dev/null +++ b/src/lib/zod/EncryptedEnvelope.ts @@ -0,0 +1,40 @@ +import { z, type ZodType } from "zod"; +import { decryptPayload, getOwnEncryptionSecretKey } from "../federation/keytools"; + +/** + * Raw envelope shape — validates the four ECIES fields without decrypting. + * Use this when you only need to confirm the envelope structure (e.g. a proxy + * that forwards opaque ciphertext without access to the recipient's key). + */ +export const EncryptedEnvelopeBaseSchema = z.object({ + ephemeralPublicKey: z.string(), + iv: z.string(), + ciphertext: z.string(), + authTag: z.string(), +}); + +/** + * Factory that returns an envelope schema which decrypts the ciphertext and + * validates the resulting plaintext against {@link innerSchema}. + * + * The output type is `z.infer & { _raw: string }` — the parsed inner + * payload plus the raw decrypted JSON string (useful for signature verification). + */ +export function createEncryptedEnvelopeSchema(innerSchema: T) { + return EncryptedEnvelopeBaseSchema.transform((payload, ctx) => { + try { + const decrypted = decryptPayload(payload, getOwnEncryptionSecretKey()); + const parsed = innerSchema.safeParse(JSON.parse(decrypted)); + + if (!parsed.success) { + ctx.addIssue({ code: "custom", message: parsed.error.issues.map(i => i.message).join("; ") }); + return z.NEVER; + } + + return Object.assign({}, parsed.data as z.infer, { _raw: decrypted }); + } catch { + ctx.addIssue({ code: "custom", message: "Decryption failed" }); + return z.NEVER; + } + }); +} diff --git a/src/lib/zod/methods/FollowSchema.ts b/src/lib/zod/methods/FollowSchema.ts new file mode 100644 index 0000000..d7a7b84 --- /dev/null +++ b/src/lib/zod/methods/FollowSchema.ts @@ -0,0 +1,18 @@ +import { z } from "zod"; +import { createEncryptedEnvelopeSchema } from "../EncryptedEnvelope"; + +const FollowInnerPayloadSchema = z.object({ + following: z.object({ + id: z.string(), + createdAt: z.coerce.date(), + followerId: z.string(), + followingId: z.string(), + accepted: z.boolean(), + followerServerUrl: z.string().nullable(), + }), + federationUrl: z.string(), + method: z.literal("FEDERATE"), +}); + +export const FollowEnvelopeSchema = createEncryptedEnvelopeSchema(FollowInnerPayloadSchema); +export default FollowInnerPayloadSchema; diff --git a/src/lib/zod/methods/PostFederationSchema.ts b/src/lib/zod/methods/PostFederationSchema.ts new file mode 100644 index 0000000..808be44 --- /dev/null +++ b/src/lib/zod/methods/PostFederationSchema.ts @@ -0,0 +1,18 @@ +import { postContentSchema } from "@/lib/plugins/server/helpers/social/social"; +import { z } from "zod"; +import { createEncryptedEnvelopeSchema } from "../EncryptedEnvelope"; + +export const PostInnerPayloadSchema = z.object({ + post: z.object({ + id: z.string(), + content: postContentSchema, + /** User id on the sending federation node (not required to exist on the recipient). */ + authorId: z.string(), + published: z.string(), + isPrivate: z.boolean(), + }), + federationUrl: z.string(), + method: z.literal("FEDERATE_POST"), +}); + +export const PostEnvelopeSchema = createEncryptedEnvelopeSchema(PostInnerPayloadSchema); diff --git a/tests/helpers/queue.ts b/tests/helpers/queue.ts new file mode 100644 index 0000000..7346439 --- /dev/null +++ b/tests/helpers/queue.ts @@ -0,0 +1,50 @@ +import { Queue, type Job } from "bullmq" +import Redis from "ioredis" + +function createRedis() { + return new Redis(process.env.REDIS_URL!, { maxRetriesPerRequest: null }) +} + +const HEALTH_CHECK_QUEUE = "federation-health-check" +const RETRY_QUEUE = "federation-retry" + +let _healthQueue: Queue | null = null +let _retryQueue: Queue | null = null + +export function getTestHealthCheckQueue(): Queue { + if (!_healthQueue) { + _healthQueue = new Queue(HEALTH_CHECK_QUEUE, { connection: createRedis() as never }) + } + return _healthQueue +} + +export function getTestRetryQueue(): Queue { + if (!_retryQueue) { + _retryQueue = new Queue(RETRY_QUEUE, { connection: createRedis() as never }) + } + return _retryQueue +} + +export async function getHealthCheckJobsFor(serverUrl: string): Promise { + const queue = getTestHealthCheckQueue() + const jobs = await queue.getJobs(["waiting", "delayed", "active", "completed", "failed"]) + return jobs.filter((j) => j.data?.serverUrl === serverUrl) +} + +export async function getRetryJobsFor(serverUrl: string): Promise { + const queue = getTestRetryQueue() + const jobs = await queue.getJobs(["waiting", "delayed", "active", "completed", "failed"]) + return jobs.filter((j) => j.data?.serverUrl === serverUrl) +} + +export async function drainAllQueues(): Promise { + const hq = getTestHealthCheckQueue() + const rq = getTestRetryQueue() + await hq.obliterate({ force: true }).catch(() => {}) + await rq.obliterate({ force: true }).catch(() => {}) +} + +export async function closeQueues(): Promise { + if (_healthQueue) { await _healthQueue.close(); _healthQueue = null } + if (_retryQueue) { await _retryQueue.close(); _retryQueue = null } +} diff --git a/tests/proxies/follow.ts b/tests/proxies/follow.ts new file mode 100644 index 0000000..2545508 --- /dev/null +++ b/tests/proxies/follow.ts @@ -0,0 +1,596 @@ +/** + * Manual proxy chain test script. + * + * You need 3 different instances up and running to use this test script. That includes yours. + * + * Exercises the full A → B → C → B → A proxy relay against real federation + * instances. Run this from Server A while Server B (proxy) and Server C + * (target) are already up. + * + * Usage: + * bun run testProxy.ts --proxy --target + * + * Examples: + * bun run testProxy.ts --proxy https://proxy.example.com --target https://target.example.com + * bun run testProxy.ts --proxy http://localhost:3001 --target http://localhost:3002 + */ + +import db from "@/lib/db"; +import { deliveryJobs, follows, serverRegistry } from "@/lib/db/schema"; +import { encryptPayload, fingerprintKey, signMessage } from "@/lib/federation/keytools"; +import { config } from "dotenv"; +import { desc, eq } from "drizzle-orm"; +import nacl from "tweetnacl"; + +config({ path: ".env.local" }); + +const FETCH_TIMEOUT_MS = 15_000; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +interface FedKeys { + signingPublicKey: string; + signingSecretKey: string; + encryptionPublicKey: string; + encryptionSecretKey: string; +} + +function generateKeypair(): FedKeys { + const signing = nacl.sign.keyPair(); + const encryption = nacl.box.keyPair(); + return { + signingPublicKey: Buffer.from(signing.publicKey).toString("base64"), + signingSecretKey: Buffer.from(signing.secretKey).toString("base64"), + encryptionPublicKey: Buffer.from(encryption.publicKey).toString("base64"), + encryptionSecretKey: Buffer.from(encryption.secretKey).toString("base64"), + }; +} + +async function readErrorBody(response: Response): Promise { + try { + const body = await response.json(); + return body?.error ?? body?.message ?? JSON.stringify(body); + } catch { + try { + return await response.text(); + } catch { + return response.statusText; + } + } +} + +interface TestResult { + name: string; + passed: boolean; + message: string; +} + +const results: TestResult[] = []; + +function pass(name: string, message = "OK") { + console.log(` ✔ ${name}`); + if (message !== "OK") console.log(` ${message}`); + results.push({ name, passed: true, message }); +} + +function fail(name: string, message: string) { + console.error(` ✘ ${name}`); + console.error(` ${message}`); + results.push({ name, passed: false, message }); +} + +// --------------------------------------------------------------------------- +// Validate environment +// --------------------------------------------------------------------------- + +const REQUIRED_ENV = [ + "FEDERATION_PUBLIC_KEY", + "FEDERATION_PRIVATE_KEY", + "FEDERATION_ENCRYPTION_PUBLIC_KEY", + "FEDERATION_ENCRYPTION_PRIVATE_KEY", + "BETTER_AUTH_URL", +] as const; + +const missing = REQUIRED_ENV.filter((k) => !process.env[k]); +if (missing.length > 0) { + console.error("Missing required environment variables:"); + missing.forEach((k) => console.error(` - ${k}`)); + console.error("Ensure .env.local is present and populated."); + process.exit(1); +} + +const ORIGIN = process.env.BETTER_AUTH_URL!; +const OWN_SIGNING_PUB = process.env.FEDERATION_PUBLIC_KEY!; +const OWN_ENCRYPTION_PUB = process.env.FEDERATION_ENCRYPTION_PUBLIC_KEY!; + +// --------------------------------------------------------------------------- +// Parse arguments +// --------------------------------------------------------------------------- + +function argAfter(flag: string): string | undefined { + const idx = process.argv.indexOf(flag); + return idx !== -1 ? process.argv[idx + 1] : undefined; +} + +const proxyUrl = argAfter("--proxy"); +const targetUrl = argAfter("--target"); +const bearerToken = argAfter("--bearer"); +const targetUserId = argAfter("--user"); + +if (!proxyUrl || !targetUrl) { + console.error("Usage: bun run testProxy.ts --proxy --target [options]"); + console.error(""); + console.error(" --proxy URL of Server B (the proxy)"); + console.error(" --target URL of Server C (the target)"); + console.error(" --test-fallback Enable proxy fallback test (requires C blocked from A)"); + console.error(" --bearer Bearer token for A's API (required for --test-fallback)"); + console.error(" --user User ID on Server C to follow (required for --test-fallback)"); + process.exit(1); +} + +console.log("Proxy chain test"); +console.log(` Server A (us): ${ORIGIN}`); +console.log(` Server B (proxy): ${proxyUrl}`); +console.log(` Server C (target): ${targetUrl}`); +console.log(` A signing key: ${fingerprintKey(OWN_SIGNING_PUB).slice(0, 16)}…`); +console.log(` A encryption key: ${fingerprintKey(OWN_ENCRYPTION_PUB).slice(0, 16)}…`); + +// --------------------------------------------------------------------------- +// 1. Discovery check +// --------------------------------------------------------------------------- + +interface DiscoverResponse { + url: string; + publicKey: string; + encryptionPublicKey: string; + peers: { url: string; isHealthy: boolean }[]; +} + +console.log("\n── Discovery ────────────────────────────────────────────"); + +let proxyInfo: DiscoverResponse; +let targetInfo: DiscoverResponse; + +try { + const res = await fetch(`${proxyUrl}/discover`, { + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + if (!res.ok) { + console.error(`Server B (${proxyUrl}) returned ${res.status}: ${await readErrorBody(res)}`); + process.exit(1); + } + proxyInfo = await res.json(); + console.log(` B: ${proxyInfo.url}`); + console.log(` signing: ${fingerprintKey(proxyInfo.publicKey).slice(0, 16)}…`); + console.log(` encryption: ${fingerprintKey(proxyInfo.encryptionPublicKey).slice(0, 16)}…`); + console.log(` peers: ${proxyInfo.peers.length}`); +} catch (err) { + console.error(`Cannot reach Server B at ${proxyUrl}/discover: ${err instanceof Error ? err.message : err}`); + process.exit(1); +} + +const isFallbackMode = process.argv.includes("--test-fallback"); + +if (isFallbackMode) { + // C is blocked from A — load C's info from A's local registry instead + const [cRecord] = await db.select().from(serverRegistry).where(eq(serverRegistry.url, targetUrl)).limit(1); + if (!cRecord) { + console.error(` Server C (${targetUrl}) not found in local registry. Run mutual discovery before blocking.`); + process.exit(1); + } + targetInfo = { + url: cRecord.url, + publicKey: cRecord.publicKey, + encryptionPublicKey: cRecord.encryptionPublicKey, + peers: [], + }; + console.log(` C: ${targetInfo.url} (from local registry — blocked)`); + console.log(` signing: ${fingerprintKey(targetInfo.publicKey).slice(0, 16)}…`); + console.log(` encryption: ${fingerprintKey(targetInfo.encryptionPublicKey).slice(0, 16)}…`); +} else { + try { + const res = await fetch(`${targetUrl}/discover`, { + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + if (!res.ok) { + console.error(`Server C (${targetUrl}) returned ${res.status}: ${await readErrorBody(res)}`); + process.exit(1); + } + targetInfo = await res.json(); + console.log(` C: ${targetInfo.url}`); + console.log(` signing: ${fingerprintKey(targetInfo.publicKey).slice(0, 16)}…`); + console.log(` encryption: ${fingerprintKey(targetInfo.encryptionPublicKey).slice(0, 16)}…`); + console.log(` peers: ${targetInfo.peers.length}`); + } catch (err) { + console.error(`Cannot reach Server C at ${targetUrl}/discover: ${err instanceof Error ? err.message : err}`); + process.exit(1); + } +} + +const aOnB = proxyInfo.peers.some((p) => p.url === ORIGIN); +console.log(` A registered on B: ${aOnB}`); + +if (!aOnB) { + console.error("\n A is not registered on B. Run mutual discovery first."); + process.exit(1); +} + +if (!isFallbackMode) { + const aOnC = targetInfo.peers.some((p) => p.url === ORIGIN); + console.log(` A registered on C: ${aOnC}`); + if (!aOnC) { + console.error("\n A is not registered on C. Run mutual discovery first."); + process.exit(1); + } +} + +// --------------------------------------------------------------------------- +// 2–5: Direct tests (skipped in --test-fallback mode since C is blocked) +// --------------------------------------------------------------------------- + +if (isFallbackMode) { + console.log("\n Skipping direct tests (2–5) — C is blocked in fallback mode."); +} + +if (!isFallbackMode) { + + // --------------------------------------------------------------------------- + // 2. Full proxy relay: A → B → C → B → A + // --------------------------------------------------------------------------- + + console.log("\n── Test: Full proxy relay (A → B → C → B → A) ─────────"); + + { + const testName = "full proxy relay"; + try { + const nonce = crypto.randomUUID(); + const innerPayload = JSON.stringify({ + action: "proxy-test", + nonce, + timestamp: Date.now(), + sender: ORIGIN, + }); + + const targetEncKey = new Uint8Array(Buffer.from(targetInfo.encryptionPublicKey, "base64")); + const encrypted = encryptPayload(innerPayload, targetEncKey); + const signature = signMessage(innerPayload, new Uint8Array(Buffer.from(process.env.FEDERATION_PRIVATE_KEY!, "base64"))); + + const proxyBody = { + method: "PROXY", + targetUrl: targetUrl + "/proxy", + publicSigningKey: OWN_SIGNING_PUB, + publicEncryptionKey: OWN_ENCRYPTION_PUB, + payload: encrypted, + signature, + }; + + const res = await fetch(`${proxyUrl}/proxy`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "X-Federation-Origin": ORIGIN, + "Origin": ORIGIN, + }, + body: JSON.stringify(proxyBody), + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + + const body = await res.json(); + + if (res.status !== 200) { + fail(testName, `expected 200, got ${res.status}: ${JSON.stringify(body)}`); + } else if (body.method !== "PROXY_RESPONSE") { + fail(testName, `expected method=PROXY_RESPONSE, got ${body.method}`); + } else if (!body.payload) { + fail(testName, "response missing payload (B did not relay C's response)"); + } else if (body.payload.method !== "PROXY_RESPONSE") { + fail(testName, `inner payload method=${body.payload.method}, expected PROXY_RESPONSE`); + } else { + pass(testName, `nonce=${nonce}, C responded: "${body.payload.message ?? JSON.stringify(body.payload)}"`); + } + } catch (err) { + fail(testName, `${err instanceof Error ? err.message : err}`); + } + } + + // --------------------------------------------------------------------------- + // 3. Direct TARGETED: A → C + // --------------------------------------------------------------------------- + + console.log("\n── Test: Direct TARGETED (A → C) ────────────────────────"); + + { + const testName = "direct TARGETED to C"; + try { + const innerPayload = JSON.stringify({ + action: "targeted-test", + nonce: crypto.randomUUID(), + sender: ORIGIN, + }); + + const targetEncKey = new Uint8Array(Buffer.from(targetInfo.encryptionPublicKey, "base64")); + const encrypted = encryptPayload(innerPayload, targetEncKey); + + const res = await fetch(`${targetUrl}/proxy`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "X-Federation-Origin": ORIGIN, + "Origin": ORIGIN, + }, + body: JSON.stringify({ + method: "TARGETED", + payload: encrypted, + }), + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + + const body = await res.json(); + + if (res.status !== 200) { + fail(testName, `expected 200, got ${res.status}: ${JSON.stringify(body)}`); + } else if (body.method !== "PROXY_RESPONSE") { + fail(testName, `expected method=PROXY_RESPONSE, got ${body.method}`); + } else { + pass(testName, `C says: "${body.message}"`); + } + } catch (err) { + fail(testName, `${err instanceof Error ? err.message : err}`); + } + } + + // --------------------------------------------------------------------------- + // 4. Sender validation — bad signing key + // --------------------------------------------------------------------------- + + console.log("\n── Test: Sender validation (bad keys → B) ──────────────"); + + { + const testName = "reject mismatched signing key"; + try { + const fakeKeys = generateKeypair(); + + const innerPayload = JSON.stringify({ action: "bad-key-test" }); + const targetEncKey = new Uint8Array(Buffer.from(targetInfo.encryptionPublicKey, "base64")); + const encrypted = encryptPayload(innerPayload, targetEncKey); + + const proxyBody = { + method: "PROXY", + targetUrl: targetUrl + "/proxy", + publicSigningKey: fakeKeys.signingPublicKey, + publicEncryptionKey: OWN_ENCRYPTION_PUB, + payload: encrypted, + }; + + const res = await fetch(`${proxyUrl}/proxy`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "X-Federation-Origin": ORIGIN, + "Origin": ORIGIN, + }, + body: JSON.stringify(proxyBody), + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + + const body = await res.json(); + + if (res.status === 403 && body.code === "INCORRECT_KEYS") { + pass(testName, `B correctly rejected: "${body.error}"`); + } else { + fail(testName, `expected 403/INCORRECT_KEYS, got ${res.status}/${body.code}: ${JSON.stringify(body)}`); + } + } catch (err) { + fail(testName, `${err instanceof Error ? err.message : err}`); + } + } + + // --------------------------------------------------------------------------- + // 5. Unknown sender + // --------------------------------------------------------------------------- + + console.log("\n── Test: Unknown sender (→ B) ────────────────────────────"); + + { + const testName = "reject unknown sender"; + try { + const unknownKeys = generateKeypair(); + const unknownOrigin = "https://totally-unknown-federation-" + crypto.randomUUID().slice(0, 8) + ".test"; + + const innerPayload = JSON.stringify({ action: "unknown-sender-test" }); + const targetEncKey = new Uint8Array(Buffer.from(targetInfo.encryptionPublicKey, "base64")); + const encrypted = encryptPayload(innerPayload, targetEncKey); + + const proxyBody = { + method: "PROXY", + targetUrl: targetUrl + "/proxy", + publicSigningKey: unknownKeys.signingPublicKey, + publicEncryptionKey: unknownKeys.encryptionPublicKey, + payload: encrypted, + }; + + const res = await fetch(`${proxyUrl}/proxy`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "X-Federation-Origin": unknownOrigin, + "Origin": unknownOrigin, + }, + body: JSON.stringify(proxyBody), + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + + const body = await res.json(); + + if (res.status === 403 && body.code === "UNKNOWN_FEDERATION_SERVER_INTERACTION") { + pass(testName, `B correctly rejected: "${body.error}"`); + } else { + fail(testName, `expected 403/UNKNOWN_FEDERATION_SERVER_INTERACTION, got ${res.status}/${body.code}: ${JSON.stringify(body)}`); + } + } catch (err) { + fail(testName, `${err instanceof Error ? err.message : err}`); + } + } + +} // end if (!isFallbackMode) + +// --------------------------------------------------------------------------- +// 6. Auto proxy fallback via real follow delivery pipeline +// Sends a follow request through A's API → BullMQ worker picks it up → +// federationFetch with proxyFallback:true → direct to C fails → proxied +// through B → C processes → worker updates follow.accepted +// +// Requires: +// - Server C blocked from A (firewall) +// - --bearer and --user flags +// +// Block C: netsh advfirewall firewall add rule name="Block Federation C" dir=out action=block remoteip= remoteport= protocol=tcp +// Unblock: netsh advfirewall firewall delete rule name="Block Federation C" +// --------------------------------------------------------------------------- + +if (isFallbackMode) { + console.log("\n── Test: Auto proxy fallback (follow delivery pipeline) ─"); + + if (!bearerToken || !targetUserId) { + console.error(" --test-fallback requires --bearer and --user "); + process.exit(1); + } + + // Step 1: verify C is unreachable directly + { + const testName = "direct fetch to C fails"; + try { + const res = await fetch(`${targetUrl}/discover`, { + signal: AbortSignal.timeout(5_000), + }); + fail(testName, `direct fetch succeeded (${res.status}) — C is not blocked from A. Block it first.`); + } catch { + pass(testName, "C is unreachable from A (blocked)"); + } + } + + // Step 2: send follow request through A's API + { + const testName = "follow delivery via proxy fallback"; + try { + console.log(` Sending follow request for user ${targetUserId} on ${targetUrl}...`); + + const followRes = await fetch(`${ORIGIN}/api/auth/social/follows`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "Authorization": `Bearer ${bearerToken}`, + }, + body: JSON.stringify({ + method: "INSERT", + userId: targetUserId, + federationUrl: targetUrl, + }), + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + + const followBody = await followRes.json(); + + if (!followRes.ok) { + fail(testName, `follow request failed (${followRes.status}): ${JSON.stringify(followBody)}`); + } else { + const followId = followBody.following?.[0]?.id; + if (!followId) { + fail(testName, `follow created but no ID returned: ${JSON.stringify(followBody)}`); + } else { + console.log(` Follow record created: ${followId}`); + console.log(" Waiting for BullMQ worker to process delivery job..."); + + // Step 3: poll until the delivery job completes (worker processes it) + const maxWait = 60_000; + const pollInterval = 2_000; + let elapsed = 0; + let delivered = false; + + while (elapsed < maxWait) { + await new Promise((r) => setTimeout(r, pollInterval)); + elapsed += pollInterval; + + // Check if delivery job for this target still exists (removed on success) + const pendingJobs = await db.select() + .from(deliveryJobs) + .where(eq(deliveryJobs.targetUrl, targetUrl + "/api/auth/social/follows")) + .orderBy(desc(deliveryJobs.createdAt)) + .limit(5); + + // Check if follow.accepted was updated (worker sets this on success) + const [followRecord] = await db.select() + .from(follows) + .where(eq(follows.id, followId)) + .limit(1); + + const jobCount = pendingJobs.length; + const accepted = followRecord?.accepted; + + process.stdout.write(`\r Polling... ${Math.round(elapsed / 1000)}s — jobs pending: ${jobCount}, accepted: ${accepted} `); + + if (accepted === true) { + delivered = true; + break; + } + } + + console.log(""); + + if (delivered) { + pass(testName, "follow delivered through proxy and accepted by C"); + } else { + // Check final state for diagnostics + const [finalFollow] = await db.select() + .from(follows) + .where(eq(follows.id, followId)) + .limit(1); + + const remainingJobs = await db.select() + .from(deliveryJobs) + .where(eq(deliveryJobs.targetUrl, targetUrl + "/api/auth/social/follows")) + .limit(5); + + fail(testName, + `timed out after ${maxWait / 1000}s. ` + + `follow.accepted=${finalFollow?.accepted}, ` + + `pending delivery jobs=${remainingJobs.length}. ` + + `Check worker logs (DEBUG=app:federation:*) for details.`, + ); + } + + // Cleanup: remove the test follow record + console.log(" Cleaning up test follow record..."); + // await db.delete(follows).where(eq(follows.id, followId)); + } + } + } catch (err) { + fail(testName, `${err instanceof Error ? err.message : err}`); + } + } +} else { + console.log("\n Skipping auto-fallback test (pass --test-fallback to enable)."); + console.log(" Requires: --test-fallback --bearer --user "); + console.log(" And C must be blocked from A's machine (firewall rule)."); +} + +// --------------------------------------------------------------------------- +// Summary +// --------------------------------------------------------------------------- + +const passed = results.filter((r) => r.passed); +const failed = results.filter((r) => !r.passed); + +console.log("\n════════════════════════════════════════════════════════"); +console.log(`Results: ${passed.length} passed, ${failed.length} failed out of ${results.length}`); + +if (failed.length > 0) { + console.error("\nFailed tests:"); + failed.forEach((f) => console.error(` ✘ ${f.name}: ${f.message}`)); + process.exit(1); +} + +console.log("\nAll tests passed."); +process.exit(0); diff --git a/tests/proxies/post.ts b/tests/proxies/post.ts new file mode 100644 index 0000000..3f6b92f --- /dev/null +++ b/tests/proxies/post.ts @@ -0,0 +1,409 @@ +/** + * Manual post proxy / federation test (Server A). + * + * Exercises the real path only — same as production: + * POST ${A}/api/auth/social/posts → BullMQ worker → federationFetch (direct or via proxy B) → C. + * + * Does not POST ciphertext to B’s `/proxy` by hand; the worker does that after your createPost. + * + * Usage: + * bun run tests/proxies/post.ts --proxy --target --bearer + * + * Prerequisites on A: + * - Bearer user must have at least one accepted follower whose `followerServerUrl` points at C + * (same base URL as `--target` / registry). + * - Propagation must enqueue jobs (e.g. policy `all`, or private + `followers`). + * + * Optional: + * --test-fallback C blocked from A: load C from A’s server_registry only; verify direct C fetch fails first + * --test-no-remote-followers Expect 200 with federationDeliveriesQueued === 0 (propagation on, no remote follower URLs) + * + * Examples: + * bun run tests/proxies/post.ts --proxy http://localhost:3001 --target http://host.docker.internal:3002 --bearer --test-fallback + */ + +import db from "@/lib/db"; +import { deliveryJobs, serverRegistry } from "@/lib/db/schema"; +import { fingerprintKey } from "@/lib/federation/keytools"; +import { config } from "dotenv"; +import { and, desc, eq, like } from "drizzle-orm"; + +config({ path: ".env.local" }); + +const FETCH_TIMEOUT_MS = 15_000; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +async function readErrorBody(response: Response): Promise { + try { + const body = await response.json(); + return body?.error ?? body?.message ?? JSON.stringify(body); + } catch { + try { + return await response.text(); + } catch { + return response.statusText; + } + } +} + +interface TestResult { + name: string; + passed: boolean; + message: string; +} + +const results: TestResult[] = []; + +function pass(name: string, message = "OK") { + console.log(` ✔ ${name}`); + if (message !== "OK") console.log(` ${message}`); + results.push({ name, passed: true, message }); +} + +function fail(name: string, message: string) { + console.error(` ✘ ${name}`); + console.error(` ${message}`); + results.push({ name, passed: false, message }); +} + +// --------------------------------------------------------------------------- +// Validate environment +// --------------------------------------------------------------------------- + +const REQUIRED_ENV = [ + "FEDERATION_PUBLIC_KEY", + "FEDERATION_PRIVATE_KEY", + "FEDERATION_ENCRYPTION_PUBLIC_KEY", + "FEDERATION_ENCRYPTION_PRIVATE_KEY", + "BETTER_AUTH_URL", +] as const; + +const missing = REQUIRED_ENV.filter((k) => !process.env[k]); +if (missing.length > 0) { + console.error("Missing required environment variables:"); + missing.forEach((k) => console.error(` - ${k}`)); + console.error("Ensure .env.local is present and populated."); + process.exit(1); +} + +const ORIGIN = process.env.BETTER_AUTH_URL!; + +// --------------------------------------------------------------------------- +// Parse arguments +// --------------------------------------------------------------------------- + +function argAfter(flag: string): string | undefined { + const idx = process.argv.indexOf(flag); + return idx !== -1 ? process.argv[idx + 1] : undefined; +} + +const proxyUrl = argAfter("--proxy"); +const targetUrl = argAfter("--target"); +const bearerToken = argAfter("--bearer"); +const testNoRemoteFollowers = process.argv.includes("--test-no-remote-followers"); +const isFallbackMode = process.argv.includes("--test-fallback"); + +if (!proxyUrl || !targetUrl || !bearerToken) { + console.error( + "Usage: bun run tests/proxies/post.ts --proxy --target --bearer [options]", + ); + console.error(""); + console.error(" --proxy Server B (proxy); used for /discover sanity check"); + console.error(" --target Server C base URL (must match server_registry.url on A for --test-fallback)"); + console.error(" --bearer Session token on A (required — test hits POST /api/auth/social/posts)"); + console.error(" --test-fallback C unreachable from A; load C from registry; verify block then deliver via API"); + console.error(" --test-no-remote-followers Expect createPost 400 NO_REMOTE_FOLLOWERS (runs after main test if set)"); + process.exit(1); +} + +if (testNoRemoteFollowers && !bearerToken) { + console.error("--test-no-remote-followers requires --bearer "); + process.exit(1); +} + +console.log("Post delivery test (A API → worker → federation/proxy)"); +console.log(` Server A (us): ${ORIGIN}`); +console.log(` Server B (proxy): ${proxyUrl}`); +console.log(` Server C (target): ${targetUrl}`); + +const DEFAULT_POST_CONTENT = [{ type: "text" as const, value: "proxy post test" }]; + +// --------------------------------------------------------------------------- +// 1. Discovery (B reachable from A; C from registry or live) +// --------------------------------------------------------------------------- + +interface DiscoverResponse { + url: string; + publicKey: string; + encryptionPublicKey: string; + peers: { url: string; isHealthy: boolean }[]; +} + +console.log("\n── Discovery ────────────────────────────────────────────"); + +let proxyInfo: DiscoverResponse; +let targetInfo: DiscoverResponse; + +try { + const res = await fetch(`${proxyUrl}/discover`, { + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + if (!res.ok) { + console.error(`Server B (${proxyUrl}) returned ${res.status}: ${await readErrorBody(res)}`); + process.exit(1); + } + proxyInfo = await res.json(); + console.log(` B: ${proxyInfo.url}`); + console.log(` signing: ${fingerprintKey(proxyInfo.publicKey).slice(0, 16)}…`); + console.log(` encryption: ${fingerprintKey(proxyInfo.encryptionPublicKey).slice(0, 16)}…`); + console.log(` peers: ${proxyInfo.peers.length}`); +} catch (err) { + console.error(`Cannot reach Server B at ${proxyUrl}/discover: ${err instanceof Error ? err.message : err}`); + process.exit(1); +} + +if (isFallbackMode) { + const [cRecord] = await db.select().from(serverRegistry).where(eq(serverRegistry.url, targetUrl)).limit(1); + if (!cRecord) { + console.error(` Server C (${targetUrl}) not found in local registry. Run mutual discovery before blocking.`); + process.exit(1); + } + targetInfo = { + url: cRecord.url, + publicKey: cRecord.publicKey, + encryptionPublicKey: cRecord.encryptionPublicKey, + peers: [], + }; + console.log(` C: ${targetInfo.url} (from local registry — blocked from A)`); + console.log(` signing: ${fingerprintKey(targetInfo.publicKey).slice(0, 16)}…`); + console.log(` encryption: ${fingerprintKey(targetInfo.encryptionPublicKey).slice(0, 16)}…`); +} else { + try { + const res = await fetch(`${targetUrl}/discover`, { + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + if (!res.ok) { + console.error(`Server C (${targetUrl}) returned ${res.status}: ${await readErrorBody(res)}`); + process.exit(1); + } + targetInfo = await res.json(); + console.log(` C: ${targetInfo.url}`); + console.log(` signing: ${fingerprintKey(targetInfo.publicKey).slice(0, 16)}…`); + console.log(` encryption: ${fingerprintKey(targetInfo.encryptionPublicKey).slice(0, 16)}…`); + console.log(` peers: ${targetInfo.peers.length}`); + } catch (err) { + console.error(`Cannot reach Server C at ${targetUrl}/discover: ${err instanceof Error ? err.message : err}`); + console.error("\n If C is firewalled from A, pass --test-fallback (load C from A’s registry)."); + process.exit(1); + } +} + +const aOnB = proxyInfo.peers.some((p) => p.url === ORIGIN); +console.log(` A registered on B: ${aOnB}`); + +if (!aOnB) { + console.error("\n A is not registered on B. Run mutual discovery first."); + process.exit(1); +} + +if (!isFallbackMode) { + const aOnC = targetInfo.peers.some((p) => p.url === ORIGIN); + console.log(` A registered on C: ${aOnC}`); + if (!aOnC) { + console.error("\n A is not registered on C. Run mutual discovery first."); + process.exit(1); + } +} + +const targetPostsUrl = `${targetInfo.url.replace(/\/$/, "")}/api/auth/social/posts`; + +// --------------------------------------------------------------------------- +// 2. Optional: confirm C is unreachable when --test-fallback +// --------------------------------------------------------------------------- + +if (isFallbackMode) { + console.log("\n── Test: direct fetch to C fails (blocked) ─────────────"); + + const testName = "direct fetch to C fails"; + try { + const res = await fetch(`${targetUrl}/discover`, { + signal: AbortSignal.timeout(5_000), + }); + fail(testName, `direct fetch succeeded (${res.status}) — C is not blocked from A. Block it first.`); + } catch { + pass(testName, "C is unreachable from A (blocked)"); + } +} + +// --------------------------------------------------------------------------- +// 3. Real createPost on A → worker → C (via proxy when needed) +// --------------------------------------------------------------------------- + +console.log("\n── Test: post delivery via A API + worker ──────────────"); + +{ + const testName = "POST /api/auth/social/posts → deliver-post job completes"; + try { + console.log(` Creating post on A; expecting delivery to ${targetPostsUrl}…`); + + const postRes = await fetch(`${ORIGIN}/api/auth/social/posts`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${bearerToken}`, + }, + body: JSON.stringify(DEFAULT_POST_CONTENT), + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + + const postBody = await postRes.json(); + + if (!postRes.ok) { + fail( + testName, + `createPost failed (${postRes.status}): ${JSON.stringify(postBody)} — need accepted remote followers on C and propagating policy`, + ); + } else { + const postId = postBody.id as string | undefined; + if (!postId) { + fail(testName, `no post id in response: ${JSON.stringify(postBody)}`); + } else { + console.log(` Post created: ${postId}`); + console.log(" Waiting for BullMQ worker to deliver FEDERATE_POST…"); + + await new Promise((r) => setTimeout(r, 300)); + + const jobsForPost = await db + .select() + .from(deliveryJobs) + .where(like(deliveryJobs.payload, `%${postId}%`)); + + if (jobsForPost.length === 0) { + fail( + testName, + "No delivery_jobs row for this post after createPost — propagation off, federationDeliveriesQueued was 0, or bug.", + ); + } else { + const forTarget = jobsForPost.filter((j) => j.targetUrl === targetPostsUrl); + if (forTarget.length === 0) { + const urls = [...new Set(jobsForPost.map((j) => j.targetUrl))].join(", "); + fail( + testName, + `Delivery job(s) target other URL(s): ${urls} — expected ${targetPostsUrl} (followerServerUrl must match C).`, + ); + } else { + const maxWait = 60_000; + const pollInterval = 2_000; + let elapsed = 300; + let delivered = false; + + while (elapsed < maxWait) { + await new Promise((r) => setTimeout(r, pollInterval)); + elapsed += pollInterval; + + const pendingJobs = await db + .select() + .from(deliveryJobs) + .where( + and( + eq(deliveryJobs.targetUrl, targetPostsUrl), + like(deliveryJobs.payload, `%${postId}%`), + ), + ) + .orderBy(desc(deliveryJobs.createdAt)) + .limit(5); + + process.stdout.write( + `\r Polling… ${Math.round(elapsed / 1000)}s — pending jobs for this post: ${pendingJobs.length} `, + ); + + if (pendingJobs.length === 0) { + delivered = true; + break; + } + } + + console.log(""); + + if (delivered) { + pass(testName, "delivery job finished (worker reached C, direct or via proxy)"); + } else { + fail( + testName, + `timed out after ${maxWait / 1000}s with jobs still pending. ` + + `Check worker (DEBUG=app:federation:*), Redis, proxy, and firewall.`, + ); + } + } + } + } + } + } catch (err) { + fail(testName, `${err instanceof Error ? err.message : err}`); + } +} + +// --------------------------------------------------------------------------- +// 4. Optional: propagation on but no remote URLs (200, zero deliveries) +// --------------------------------------------------------------------------- + +if (testNoRemoteFollowers) { + console.log("\n── Test: createPost 200 + federationDeliveriesQueued === 0 ─"); + + const testName = "createPost saves post but queues no federation deliveries"; + try { + const postRes = await fetch(`${ORIGIN}/api/auth/social/posts`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${bearerToken}`, + }, + body: JSON.stringify(DEFAULT_POST_CONTENT), + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + + const postBody = await postRes.json(); + + if ( + postRes.status === 200 && + postBody.id && + postBody.federationDeliveriesQueued === 0 + ) { + pass(testName, `post ${postBody.id} — no remote follower server URLs under propagation`); + } else if (postRes.ok && postBody.federationDeliveriesQueued > 0) { + fail( + testName, + `expected federationDeliveriesQueued === 0, got ${postBody.federationDeliveriesQueued} (user has remote followers or wrong test account).`, + ); + } else { + fail( + testName, + `expected 200 with id and federationDeliveriesQueued 0, got ${postRes.status}: ${JSON.stringify(postBody)}`, + ); + } + } catch (err) { + fail(testName, `${err instanceof Error ? err.message : err}`); + } +} + +// --------------------------------------------------------------------------- +// Summary +// --------------------------------------------------------------------------- + +const passed = results.filter((r) => r.passed); +const failed = results.filter((r) => !r.passed); + +console.log("\n════════════════════════════════════════════════════════"); +console.log(`Results: ${passed.length} passed, ${failed.length} failed out of ${results.length}`); + +if (failed.length > 0) { + console.error("\nFailed tests:"); + failed.forEach((f) => console.error(` ✘ ${f.name}: ${f.message}`)); + process.exit(1); +} + +console.log("\nAll tests passed."); +process.exit(0); diff --git a/tsconfig.json b/tsconfig.json index 06d4df8..641f35a 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,45 +1,46 @@ { - "compilerOptions": { - "target": "ES2017", - "lib": [ - "dom", - "dom.iterable", - "esnext" - ], - "allowJs": true, - "skipLibCheck": true, - "strict": true, - "noEmit": true, - "esModuleInterop": true, - "module": "esnext", - "moduleResolution": "bundler", - "resolveJsonModule": true, - "isolatedModules": true, - "jsx": "react-jsx", - "incremental": true, - "plugins": [ - { - "name": "next" - } - ], - "paths": { - "@/*": [ - "./src/*" - ], - "@/plugins/*": [ - "./src/lib/plugins/*" - ], - } - }, - "include": [ - "next-env.d.ts", - "**/*.ts", - "**/*.tsx", - ".next/types/**/*.ts", - ".next/dev/types/**/*.ts", - "**/*.mts" - ], - "exclude": [ - "node_modules" - ] -} \ No newline at end of file + "compilerOptions": { + "target": "ES2017", + "lib": [ + "dom", + "dom.iterable", + "esnext" + ], + "allowJs": true, + "skipLibCheck": true, + "strict": true, + "noEmit": true, + "esModuleInterop": true, + "module": "esnext", + "moduleResolution": "bundler", + "resolveJsonModule": true, + "isolatedModules": true, + "jsx": "react-jsx", + "incremental": true, + "plugins": [ + { + "name": "next" + } + ], + "paths": { + "@/*": [ + "./src/*" + ], + "@/plugins/*": [ + "./src/lib/plugins/*" + ] + } + }, + "include": [ + "next-env.d.ts", + "**/*.ts", + "**/*.tsx", + ".next/types/**/*.ts", + ".next/dev/types/**/*.ts", + "**/*.mts", + ".next/dev/dev/types/**/*.ts" + ], + "exclude": [ + "node_modules" + ] +}