diff --git a/README.md b/README.md index ba5ab46..e0ec30f 100644 --- a/README.md +++ b/README.md @@ -79,8 +79,22 @@ bun run rotateKeys.ts --resume '' ## Mirrors [Gitea](https://git.tockanest.com/Cete/sipher) + [GitHub](https://github.com/tockawaffle/sipher) +## What is public/private? + +### Public + +There are things that won't be e2ee because there's simply no reason for that to be done, this is a small list of the current (not finished) public data that other federations or even users might fetch and get all of the data: + +- Posts: +- - The whole post object is public, that includes: +- - - The content, including images, videos or audios if any +- - - Who posted it +- - - The federation that has that data + + ## Security SiPher implements custom federation and cryptographic protocols. I am not a professional cryptographer or security researcher — this system has not been audited and almost certainly contains multiple vulnerabilities I am not aware of. diff --git a/src/app/PostTestForm.tsx b/src/app/PostTestForm.tsx index 6fcbc8f..9965c2d 100644 --- a/src/app/PostTestForm.tsx +++ b/src/app/PostTestForm.tsx @@ -1,5 +1,6 @@ "use client"; +import { Button } from "@/components/ui/button"; import { authClient } from "@/lib/auth-client"; import { useState } from "react"; @@ -33,6 +34,25 @@ export function PostTestForm() { } }; + const body = JSON.stringify({ + method: "REGISTER", + url: process.env.BETTER_AUTH_URL!, + publicKey: process.env.FEDERATION_PUBLIC_KEY!, + encryptionPublicKey: process.env.FEDERATION_ENCRYPTION_PUBLIC_KEY!, + }); + + async function forceDiscover(url: string) { + console.log("body", body); + const response = await fetch(`${url}/discover`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: body, + }); + return response.json(); + } + return (

Test Post

@@ -85,6 +105,9 @@ export function PostTestForm() { {status} )} + +
+ ); } diff --git a/src/app/discover/route.ts b/src/app/discover/route.ts index d513cbe..eb49949 100644 --- a/src/app/discover/route.ts +++ b/src/app/discover/route.ts @@ -1,5 +1,6 @@ import db from "@/lib/db"; import { serverRegistry } from "@/lib/db/schema"; +import { FederationError, federationFetch } from "@/lib/federation/fetch"; import { decryptPayload, fingerprintKey } from "@/lib/federation/keytools"; import { upsertServer } from "@/lib/federation/registry"; import { assertSafeUrl, UrlGuardError } from "@/lib/federation/url-guard"; @@ -113,10 +114,16 @@ async function discoverServer(validated: z.infer) { if (server[0].publicKey === validated.publicKey) confirmations.sameKeyOnServer = true; debug("DISCOVER – fetching public key from federation server %s", server[0].url); try { - const federationResponse = await (await fetch(server[0].url + "/discover")).json(); + const { response } = await federationFetch(server[0].url + "/discover", { + serverUrl: server[0].url, + }); + const federationResponse = await response.json(); if (federationResponse.publicKey === validated.publicKey) confirmations.sameKeyOnFetch = true; } catch (err) { debug("DISCOVER – fetch to %s failed: %o", server[0].url, err); + if (err instanceof FederationError) { + return NextResponse.json({ error: "Failed to reach the federation server", code: err.code }, { status: 502 }); + } return NextResponse.json({ error: "Failed to reach the federation server" }, { status: 502 }); } @@ -136,18 +143,24 @@ async function registerServer(validated: z.infer) { } debug("REGISTER – fetching /discover from %s to validate server", validated.url); - let response: { publicKey?: string; encryptionPublicKey?: string }; + let remoteKeys: { publicKey?: string; encryptionPublicKey?: string }; try { - response = await (await fetch(validated.url + "/discover")).json(); + const { response } = await federationFetch(validated.url + "/discover", { + serverUrl: validated.url, + }); + remoteKeys = await response.json(); } catch (err) { debug("REGISTER – fetch to %s failed: %o", validated.url, err); + if (err instanceof FederationError) { + return NextResponse.json({ error: "Failed to reach the server", code: err.code }, { status: 502 }); + } return NextResponse.json({ error: "Failed to reach the server" }, { status: 502 }); } - if (!response.publicKey || !response.encryptionPublicKey) { + if (!remoteKeys.publicKey || !remoteKeys.encryptionPublicKey) { debug("REGISTER – remote server returned incomplete keys"); return NextResponse.json({ error: "Invalid server" }, { status: 400 }); - } else if (response.publicKey !== validated.publicKey || response.encryptionPublicKey !== validated.encryptionPublicKey) { + } else if (remoteKeys.publicKey !== validated.publicKey || remoteKeys.encryptionPublicKey !== validated.encryptionPublicKey) { debug("REGISTER – key mismatch: provided vs fetched"); return NextResponse.json({ error: "Public keys do not match the ones reported by the server" }, { status: 400 }); } diff --git a/src/app/page.tsx b/src/app/page.tsx index fa01183..5a05f3e 100644 --- a/src/app/page.tsx +++ b/src/app/page.tsx @@ -12,7 +12,12 @@ export default async function Home() { }); if (!session) redirect(`/auth`); + + return ( - + <> + + + ); } diff --git a/src/app/proxy/route.ts b/src/app/proxy/route.ts new file mode 100644 index 0000000..b87cc78 --- /dev/null +++ b/src/app/proxy/route.ts @@ -0,0 +1,418 @@ +import db from "@/lib/db"; +import { blacklistedServers, follows, serverRegistry, user } from "@/lib/db/schema"; +import { FederationError, federationFetch } from "@/lib/federation/fetch"; +import { decryptPayload, encryptPayload, getOwnEncryptionSecretKey, getOwnSigningSecretKey, signMessage, verifySignature } from "@/lib/federation/keytools"; +import { peerRegistryUrlOrNull } from "@/lib/federation/peer-registry-url"; +import { applyFederatedPostInTransaction } from "@/lib/federation/proxy-helpers/federated-post"; +import { discoverAndRegister } from "@/lib/federation/registry"; +import { EncryptedEnvelopeBaseSchema } from "@/lib/zod/EncryptedEnvelope"; +import { FollowEnvelopeSchema } from "@/lib/zod/methods/FollowSchema"; +import { PostEnvelopeSchema } from "@/lib/zod/methods/PostFederationSchema"; +import createDebug from "debug"; +import { and, eq } from "drizzle-orm"; +import { NextRequest, NextResponse } from "next/server"; +import { z } from "zod"; + +const debug = createDebug("app:api:federation:proxy"); + +// Proxy route: relays encrypted federation traffic when two servers can't reach each other directly +// (e.g. network restrictions, ISP blocking). +// +// Any federation node can act as a proxy as long as both the requesting and target federations +// have discovered and registered it (same mutual-trust model as the follow endpoint). +// +// What the proxy knows: +// - The target federation's base URL (passed in plaintext so the proxy can forward). +// - That Federation A is trying to communicate with Federation B. +// +// What the proxy does NOT know: +// - The contents of the request (method, path, payload, etc.) — all of that lives inside an +// encrypted envelope that only Federation B can decrypt. +// +// Flow: +// 1. Federation A encrypts the full request (method, path, payload) into an envelope using +// Federation B's encryption public key, and signs it with its own signing key. +// 2. Federation A sends the encrypted envelope + target URL to the proxy. +// 3. The proxy forwards the envelope to Federation B's proxy endpoint. +// 4. Federation B decrypts, validates the signature, and processes the request. +// 5. Federation B encrypts its response using Federation A's encryption public key. +// 6. The encrypted response travels back: Federation B -> Proxy -> Federation A. +// +// If the request is of PROXY type and the target fails, the proxy will return a 502 error in which the first server +// should then either retry the request later or proxy it to a different server. +// If the target does not know the sender, it'll error with a 403 error and a "UNKNOWN_FEDERATION_SERVER_INTERACTION" code. + +const ProxiedDataSchema = z.discriminatedUnion("method", [ + z.object({ + method: z.literal("PROXY"), + targetUrl: z.url().refine((url) => { + // Check if the URL has the proxy path + const parsedUrl = new URL(url); + return parsedUrl.pathname.startsWith("/proxy"); + }, { message: "The target URL must have the proxy path" }), // Federation B's base URL, + publicSigningKey: z.string().optional().nullable(), // Federation A's signing public key + publicEncryptionKey: z.string().optional().nullable(), // Federation A's encryption public key + payload: EncryptedEnvelopeBaseSchema // Opaque — proxy cannot decrypt + }), + z.object({ + method: z.literal("TARGETED"), + payload: EncryptedEnvelopeBaseSchema // TODO: swap for createEncryptedEnvelopeSchema(TargetedPayloadSchema) once the inner schema is defined + }) +]) + +type ERROR_CODE = "MISSING_FED_ORIGIN_HEADER" | "UNKNOWN_FEDERATION_SERVER_INTERACTION" | "INCORRECT_KEYS" | "INVALID_PROXY_DATA"; + +// TARGETED: This federation is the target of a proxy request +// PROXY: This federation is the proxy for another federation +type PROXY_METHOD = "TARGETED" | "PROXY" | "PROXY_RESPONSE"; + +type PostsActions = "GET_USER_POSTS" | "FEDERATE_POST" | "GET_POST_BY_ID" | "GET_POST_COMMENTS" | "FEDERATE_POST_COMMENT" +type UserActions = "FEDERATE_FOLLOW" | "FEDERATE_UNFOLLOW" | "GET_USER_PROFILE" | "BLOCK_USER" | "UNBLOCK_USER" | "GET_USER_FOLLOWERS" | "GET_USER_FOLLOWING" + +type Actions = PostsActions | UserActions; + +export async function POST(request: NextRequest) { + const getFedUrl = request.headers.get("x-federation-origin"); + if (!getFedUrl) { + debug("Missing x-federation-origin header from %s", request.url); + return NextResponse.json({ error: "Missing x-federation-origin header", code: "MISSING_FED_ORIGIN_HEADER" }, { status: 400 }); + } + + const data = await request.clone().json(); + const parsed = ProxiedDataSchema.safeParse(data); + if (!parsed.success) { + debug("POST /proxy – error parsing proxied data from %s: %s", request.url, parsed.error.message); + return NextResponse.json({ error: "Invalid proxied data", code: "INVALID_PROXY_DATA" }, { status: 400 }); + } + + switch (parsed.data.method) { + case "PROXY": { + try { + + if (!parsed.data.publicSigningKey || !parsed.data.publicEncryptionKey) { + debug("POST /proxy – error parsing proxied data from %s: %s", request.url, "Missing public signing or encryption key"); + return NextResponse.json({ error: "Invalid proxied data", code: "INVALID_PROXY_DATA" }, { status: 400 }); + } + + const proxiedData = parsed.data; + + // Verify Federation A (sender) is known and keys match + const [sender] = await db.select().from(serverRegistry).where(eq(serverRegistry.url, getFedUrl)); + + if (!sender) { + debug("POST /proxy – sender not found in registry: %s", getFedUrl); + return NextResponse.json({ + error: "Unknown federation server. Please redo the discovery process and try again.", + code: "UNKNOWN_FEDERATION_SERVER_INTERACTION", + }, { status: 403 }); + } else if (sender.publicKey !== proxiedData.publicSigningKey) { + debug("POST /proxy – sender signing key mismatch: %s", getFedUrl); + return NextResponse.json({ + error: "The provided keys are a mismatch. If you rotated your keys, we are not aware of it.", + code: "INCORRECT_KEYS", + }, { status: 403 }); + } else if (sender.encryptionPublicKey !== proxiedData.publicEncryptionKey) { + debug("POST /proxy – sender encryption key mismatch: %s", getFedUrl); + return NextResponse.json({ + error: "The provided keys are a mismatch. If you rotated your keys, we are not aware of it.", + code: "INCORRECT_KEYS", + }, { status: 403 }); + } + + // Verify Federation B (target) is known to us (prevents open-relay abuse) + const targetBaseUrl = new URL(proxiedData.targetUrl.toString()).origin; + const [target] = await db.select().from(serverRegistry).where(eq(serverRegistry.url, targetBaseUrl)); + + if (!target) { + debug("POST /proxy – target not found in registry: %s", targetBaseUrl); + debug("POST /proxy - Starting discovery process") + await discoverAndRegister(targetBaseUrl); + } + + // Proxy the request to Federation B as a TARGETED request (no proxy fallback — we ARE the proxy) + let forwardResponse: Response; + try { + const result = await federationFetch(proxiedData.targetUrl.toString(), { + method: "POST", + body: JSON.stringify({ + method: "TARGETED" as PROXY_METHOD, + payload: proxiedData.payload, + }), + headers: { + "Content-Type": "application/json", + "X-Federation-Origin": process.env.BETTER_AUTH_URL!, + "Origin": process.env.BETTER_AUTH_URL!, + "X-Federation-Sender": getFedUrl, + }, + serverUrl: targetBaseUrl, + proxyFallback: false, + skipHealthUpdate: true, + }); + forwardResponse = result.response; + } catch (err) { + if (err instanceof FederationError) { + debug("POST /proxy – federation error proxying to %s: %s", proxiedData.targetUrl.toString(), err.code); + return NextResponse.json({ error: "Failed to proxy request", code: "FAILED_TO_PROXY_REQUEST", federationError: err.code, method: "PROXY_RESPONSE" as PROXY_METHOD }, { status: 502 }); + } + throw err; + } + + if (!forwardResponse.ok) { + debug("POST /proxy – error proxying request to %s: %s", proxiedData.targetUrl.toString(), forwardResponse.statusText); + return NextResponse.json({ error: "Failed to proxy request", code: "FAILED_TO_PROXY_REQUEST", details: await forwardResponse.json(), method: "PROXY_RESPONSE" as PROXY_METHOD }, { status: 502 }); + } + + const responseBody = await forwardResponse.json(); + + // Return the response from Federation B as a PROXY_RESPONSE + return NextResponse.json({ + method: "PROXY_RESPONSE" as PROXY_METHOD, + payload: responseBody, + }); + + } catch (error) { + debug("POST /proxy – error parsing proxied data from %s: %s", request.url, error); + return NextResponse.json({ error: "Invalid proxied data", code: "INVALID_PROXY_DATA" }, { status: 400 }); + } + } + case "TARGETED": { + try { + // 🚨 we've been targeted, the 🧃 are coming, everyone to the bunkers! 🚨 + + // We need to use the EncryptedEnvelopeBaseSchema here because we do not know what we are being targeted for + // This is the information we'll have at the end of the day: + // - The requester's url + // - The requester's public signing key + // - The requester's public encryption key + // - The request data, being the method, path, and payload + + if (!parsed.data.payload) { + debug("POST /proxy – error parsing targeted data from %s: %s", request.url, "Missing payload"); + return NextResponse.json({ error: "Invalid targeted data", code: "INVALID_TARGETED_DATA" }, { status: 400 }); + } + + const decryptedPayload = decryptPayload(parsed.data.payload, getOwnEncryptionSecretKey()); + const parsedPayload = JSON.parse(decryptedPayload); + + debug("POST /proxy – parsed targeted data from %s: %o", request.url, parsedPayload); + + const payloadSchema = z.object({ + targetUrl: z.url(), + method: z.string(), + headers: z.record(z.string(), z.string()), + body: z.string().transform((body) => { + const parsedBody = JSON.parse(body); + return { + method: parsedBody.method, + payload: parsedBody.payload, + signature: parsedBody.signature, + }; + }) + }).superRefine((data, ctx) => { + try { + const originPayloadHeaders = data.headers; + debug("POST /proxy – origin payload headers: %o", originPayloadHeaders); + if (!originPayloadHeaders["X-Federation-Target"] || !originPayloadHeaders["X-Federation-Origin"] || !originPayloadHeaders["Origin"]) { + ctx.addIssue({ code: "custom", message: "Missing headers" }); + return z.NEVER; + } + + // Should be the base URL of the target URL + const targetUrl = new URL(data.targetUrl).origin; + const federationTargetOriginHeader = new URL(originPayloadHeaders["X-Federation-Target"]).origin; + debug("POST /proxy – target URL: %s", targetUrl); + debug("POST /proxy – x-federation-target header: %s", federationTargetOriginHeader); + if (federationTargetOriginHeader !== targetUrl) { + ctx.addIssue({ code: "custom", message: "x-federation-target header mismatch" }); + return z.NEVER; + } + } catch (error) { + ctx.addIssue({ code: "custom", message: "Decryption failed" }); + return z.NEVER; + } + }); + + const validated = payloadSchema.safeParse(parsedPayload); + if (!validated.success) { + debug("POST /proxy – error validating targeted data from %s: %s", request.url, validated.error.message); + return NextResponse.json({ error: "Invalid targeted data", code: "INVALID_TARGETED_DATA" }, { status: 400 }); + } + + const { targetUrl, method, headers, body } = validated.data; + + // Check if the sender is known, keys match and is not blackisted + const result = await db.transaction(async (tx) => { + + const senderUrl = headers["X-Federation-Origin"]; + // Check if the sender is blacklisted + const [blacklisted] = await tx.select().from(blacklistedServers).where(eq(blacklistedServers.serverUrl, senderUrl)); + if (blacklisted) { + debug("POST /proxy – sender is blacklisted: %s", senderUrl); + return { error: "The federation server was blacklisted from interacting with this federation server. Please contact support to unblacklist your server.", code: "BLACKLISTED_FEDERATION_SERVER", action: undefined, status: 403 }; + } + + // Check if the sender is known + const [sender] = await tx.select().from(serverRegistry).where(eq(serverRegistry.url, senderUrl)); + if (!sender) { + debug("POST /proxy – sender not found in registry: %s", senderUrl); + return { error: "Unknown federation server. Please redo the discovery process and try again.", code: "UNKNOWN_FEDERATION_SERVER_INTERACTION", action: undefined, status: 403 }; + } + + let consolidatedFollowPayload: z.infer | null = null; + let consolidatedPostPayload: z.infer | null = null; + let action: Actions; + switch (true) { + case targetUrl.includes("/api/auth/social/follows") && body.method === "FEDERATE": { + debug("POST /proxy – parsing follow payload: %s", body.payload); + const payload = FollowEnvelopeSchema.safeParse(body.payload); + if (!payload.success) { + debug("POST /proxy – error parsing follow payload: %s", body.payload); + return { error: "Invalid follow payload", code: "INVALID_FOLLOW_PAYLOAD", action: undefined, status: 400 }; + } + consolidatedFollowPayload = payload.data; + action = "FEDERATE_FOLLOW"; + break; + } + case targetUrl.includes("/api/auth/social/posts") && body.method === "FEDERATE_POST": { + debug("POST /proxy – parsing federated post payload"); + const payload = PostEnvelopeSchema.safeParse(body.payload); + if (!payload.success) { + debug("POST /proxy – error parsing federated post payload: %s", payload.error.message); + return { error: "Invalid federated post payload", code: "INVALID_FEDERATED_POST_PAYLOAD", action: undefined, status: 400 }; + } + consolidatedPostPayload = payload.data; + action = "FEDERATE_POST"; + break; + } + default: { + debug("POST /proxy – no endpoint specific parsing, rejecting request"); + return { error: "Invalid payload", code: "INVALID_PAYLOAD", action: undefined, status: 400 }; + } + } + + const signedEnvelope = consolidatedFollowPayload ?? consolidatedPostPayload; + if (!signedEnvelope) { + return { error: "Invalid payload", code: "INVALID_PAYLOAD", action: undefined, status: 400 }; + } + + // Check if the signature is valid + const senderPublicKey = new Uint8Array(Buffer.from(sender.publicKey, "base64")); + const senderEncryptionPublicKey = new Uint8Array(Buffer.from(sender.encryptionPublicKey, "base64")); + if (!verifySignature(signedEnvelope._raw, body.signature, senderPublicKey)) { + debug("POST /proxy – sender signature is invalid: %s", targetUrl); + return { error: "The provided signature is invalid. Please redo the discovery process and try again.", code: "INVALID_SIGNATURE", action: undefined, status: 403 }; + } + + debug("POST /proxy – sender is known, keys match and is not blackisted: %s", targetUrl); + + // Now we can assume that: + // - The sender is known to us + // - The sender is not blacklisted + // - The signature is valid with what we have in the payload + // - The payload is a valid action and has a valid payload + // - There is a known endpoint for the action + // Now the only thing left is to handle the action. This cannot be done in a worker since we need to return a response to the proxy server. This could eventually overload this endpoint and cause issues, but it's not something I can fix right now. + + switch (action) { + case "FEDERATE_FOLLOW": { + const followEnv = consolidatedFollowPayload!; + debug("POST /proxy – federating follow: %s", followEnv); + + // We can do the follow procedure + // First check if the user exists + const [targetUser] = await tx.select().from(user).where(eq(user.id, followEnv.following.followingId)); + if (!targetUser) { + debug("POST /proxy – target user not found: %s", followEnv.following.followingId); + return { error: "The user you are trying to follow does not exist.", code: "USER_NOT_FOUND", status: 404 }; + } + + // Second check if the follow already exists + const [existingFollow] = await tx.select().from(follows).where(and( + eq(follows.followerId, followEnv.following.followerId), + eq(follows.followingId, followEnv.following.followingId), + )); + + if (existingFollow) { + debug("POST /proxy – follow already exists: %s", existingFollow.id); + return { error: "You are already following this user.", code: "FOLLOW_ALREADY_EXISTS", status: 409 }; + } + + // Third check if the user is private + const isPrivate = !targetUser.isPrivate; + + const following = await tx.insert(follows).values({ + id: crypto.randomUUID(), + followerId: followEnv.following.followerId, + followingId: followEnv.following.followingId, + accepted: isPrivate, + createdAt: new Date(), + followerServerUrl: peerRegistryUrlOrNull(senderUrl), + followingServerUrl: peerRegistryUrlOrNull(targetUrl), + }).returning(); + + const row = following[0]; + // Same plaintext shape as the delivery job payload / FollowInnerPayloadSchema (see federation worker). + const innerPayload = JSON.stringify({ + following: { + id: row.id, + createdAt: row.createdAt, + followerId: row.followerId, + followingId: row.followingId, + accepted: row.accepted, + followerServerUrl: row.followerServerUrl, + }, + federationUrl: senderUrl, + method: "FEDERATE" as const, + }); + const signature = signMessage(innerPayload, getOwnSigningSecretKey()); + + return { innerPayload, signature, senderEncryptionPublicKey }; + } + case "FEDERATE_POST": { + const postEnv = consolidatedPostPayload!; + const postResult = await applyFederatedPostInTransaction(tx, postEnv, body.signature, { + publicKey: sender.publicKey, + encryptionPublicKey: sender.encryptionPublicKey, + url: sender.url, + }); + if (!postResult.ok) { + return { + error: postResult.error, + code: postResult.code, + action: undefined, + status: postResult.status, + }; + } + const encKey = new Uint8Array(Buffer.from(postResult.senderEncryptionPublicKeyB64, "base64")); + return { + innerPayload: postResult.innerPayload, + signature: postResult.signature, + senderEncryptionPublicKey: encKey, + }; + } + default: { + debug("POST /proxy – no action specific handling, rejecting request"); + return { error: "Invalid action", code: "INVALID_ACTION", action: undefined, status: 400 }; + } + } + + }); + + if (result.error) { + return NextResponse.json({ error: result.error, code: result.code, action: result.action, status: result.status }, { status: result.status }); + } + + return NextResponse.json({ + method: "PROXY_RESPONSE" as PROXY_METHOD, + status: "acknowledged", + data: encryptPayload(result.innerPayload!, result.senderEncryptionPublicKey!), + signature: result.signature, + }, { status: 200 }); + + } catch (error) { + debug("POST /proxy – error parsing targeted data from %s: %s", request.url, error); + return NextResponse.json({ error: "Invalid targeted data", code: "INVALID_PROXY_DATA" }, { status: 400 }); + } + } + } +} \ No newline at end of file diff --git a/src/lib/auth.ts b/src/lib/auth.ts index 2fbe8ed..162ff42 100644 --- a/src/lib/auth.ts +++ b/src/lib/auth.ts @@ -102,6 +102,13 @@ const bAuth = betterAuth({ defaultValue: false, required: false, index: false, + }, + postPropagationPolicy: { + type: "string", + defaultValue: "all", + required: false, + index: false, + enum: ["all", "followers", "none"] as const, } } } diff --git a/src/lib/bull/index.ts b/src/lib/bull/index.ts index 8fe6caa..93489cd 100644 --- a/src/lib/bull/index.ts +++ b/src/lib/bull/index.ts @@ -1,14 +1,30 @@ import db from '@/lib/db'; import { blacklistedServers, deliveryJobs, follows, serverRegistry } from '@/lib/db/schema'; -import { encryptPayload, getOwnSigningSecretKey, signMessage } from '@/lib/federation/keytools'; -import { discoverAndRegister, DiscoveryError } from '@/lib/federation/registry'; +import { FederationError, federationFetch, type FederationErrorCode } from '@/lib/federation/fetch'; +import { encryptPayload, getOwnSigningSecretKey, signMessage, verifySignature } from '@/lib/federation/keytools'; +import { discoverAndRegister, DiscoveryError, markServerHealthy } from '@/lib/federation/registry'; +import { getThreatPolicy } from '@/lib/federation/threat-model'; import { Queue, UnrecoverableError, Worker, type Job } from 'bullmq'; import createDebug from 'debug'; -import { eq } from 'drizzle-orm'; +import { and, eq } from 'drizzle-orm'; import Redis from 'ioredis'; +import z from 'zod'; +import { FollowEnvelopeSchema } from '../zod/methods/FollowSchema'; const debug = createDebug('app:federation:worker'); +// --------------------------------------------------------------------------- +// Shared Redis +// --------------------------------------------------------------------------- + +function createRedisConnection() { + return new Redis(process.env.REDIS_URL!, { maxRetriesPerRequest: null }); +} + +// --------------------------------------------------------------------------- +// Federation delivery queue (existing) +// --------------------------------------------------------------------------- + export interface FederationDeliveryJob { deliveryJobId: string; targetUrl: string; @@ -16,17 +32,13 @@ export interface FederationDeliveryJob { payload: string; } -const QUEUE_NAME = 'federation-delivery'; +const DELIVERY_QUEUE_NAME = 'federation-delivery'; -function createRedisConnection() { - return new Redis(process.env.REDIS_URL!, { maxRetriesPerRequest: null }); -} - -let _queue: Queue | null = null; +let _deliveryQueue: Queue | null = null; export function getFederationQueue(): Queue { - if (!_queue) { - _queue = new Queue(QUEUE_NAME, { + if (!_deliveryQueue) { + _deliveryQueue = new Queue(DELIVERY_QUEUE_NAME, { connection: createRedisConnection() as never, defaultJobOptions: { attempts: 5, @@ -39,9 +51,48 @@ export function getFederationQueue(): Queue { }, }); } - return _queue; + return _deliveryQueue; } +// --------------------------------------------------------------------------- +// Health-check queue +// --------------------------------------------------------------------------- + +export interface HealthCheckJob { + serverUrl: string; +} + +const HEALTH_CHECK_QUEUE_NAME = 'federation-health-check'; + +let _healthCheckQueue: Queue | null = null; + +export function getHealthCheckQueue(): Queue { + if (!_healthCheckQueue) { + _healthCheckQueue = new Queue(HEALTH_CHECK_QUEUE_NAME, { + connection: createRedisConnection() as never, + }); + } + return _healthCheckQueue; +} + +export async function scheduleHealthCheck(serverUrl: string, attempt: number): Promise { + const delayMinutes = 5 + (attempt * 10); + const delayMs = delayMinutes * 60 * 1000; + debug('scheduling health check for %s in %d minutes (attempt %d)', serverUrl, delayMinutes, attempt); + + const safeId = serverUrl.replace(/[^a-zA-Z0-9._-]/g, '_'); + await getHealthCheckQueue().add('health-check', { serverUrl }, { + delay: delayMs, + jobId: `health-check_${safeId}_${attempt}`, + removeOnComplete: true, + removeOnFail: true, + }); +} + +// --------------------------------------------------------------------------- +// Delivery worker processor +// --------------------------------------------------------------------------- + async function processFederationDelivery(job: Job) { const { deliveryJobId, targetUrl, serverUrl, payload } = job.data; debug('processing job %s (%s) → %s (attempt %d)', job.id, job.name, targetUrl, job.attemptsMade + 1); @@ -61,7 +112,7 @@ async function processFederationDelivery(job: Job) { let encryptionPublicKey: string; const [server] = await db - .select({ encryptionPublicKey: serverRegistry.encryptionPublicKey }) + .select({ encryptionPublicKey: serverRegistry.encryptionPublicKey, publicKey: serverRegistry.publicKey }) .from(serverRegistry) .where(eq(serverRegistry.url, serverUrl)) .limit(1); @@ -93,7 +144,7 @@ async function processFederationDelivery(job: Job) { debug('sending encrypted payload to %s', targetUrl); const method = JSON.parse(payload).method; - if (!method || !["FEDERATE", "INSERT", "UNFOLLOW"].includes(method)) { + if (!method || !["FEDERATE", "FEDERATE_POST", "INSERT", "UNFOLLOW"].includes(method)) { debug('invalid method: %s, dropping job %s', method, job.id); await db.delete(deliveryJobs).where(eq(deliveryJobs.id, deliveryJobId)); debug('job %s dropped because of invalid method', job.id); @@ -102,11 +153,18 @@ async function processFederationDelivery(job: Job) { const signature = signMessage(payload, getOwnSigningSecretKey()); - const response = await fetch(targetUrl, { + const { response } = await federationFetch(targetUrl, { method: 'POST', - headers: { 'Content-Type': 'application/json', 'Origin': process.env.BETTER_AUTH_URL! }, + headers: { + 'Content-Type': 'application/json', + 'Origin': process.env.BETTER_AUTH_URL!, + 'X-Federation-Origin': process.env.BETTER_AUTH_URL!, + 'X-Federation-Target': targetUrl, + }, body: JSON.stringify({ method, payload: encrypted, signature }), - signal: AbortSignal.timeout(15_000), + timeout: 15_000, + proxyFallback: true, + serverUrl, }); if (!response.ok) { @@ -115,30 +173,150 @@ async function processFederationDelivery(job: Job) { } const responseBody = await response.json(); + debug('delivery to %s response body: %o', targetUrl, responseBody); + debug('responseBody.payload: %s', responseBody.payload); - if (responseBody.status !== "acknowledged") { + const ackPayload = + responseBody.payload?.method === "PROXY_RESPONSE" + ? responseBody.payload + : responseBody.method === "PROXY_RESPONSE" + ? responseBody + : null; + + if (!ackPayload || ackPayload.method !== "PROXY_RESPONSE") { debug('delivery to %s not acknowledged', targetUrl); throw new UnrecoverableError(`Federation delivery to ${targetUrl} failed: ${response.status} - ${JSON.stringify(responseBody)}`); } if (job.name === 'deliver-follow') { - const followId = JSON.parse(payload).following?.id; - if (followId && typeof responseBody.accepted === "boolean") { - await db.update(follows).set({ accepted: responseBody.accepted }) - .where(eq(follows.id, followId)); - debug('updated follow %s accepted=%s', followId, responseBody.accepted); + let followPayload: z.infer; + debug('delivery to %s is a follow, updating follow', targetUrl); + debug('ackPayload: %o', ackPayload); + + if (ackPayload.method === "PROXY_RESPONSE") { + // Decrypt the payload + const decrypted = FollowEnvelopeSchema.safeParse(ackPayload.data) + if (!decrypted.success) { + debug('failed to parse follow payload: %s', ackPayload.data); + await db.delete(deliveryJobs).where(eq(deliveryJobs.id, deliveryJobId)); + throw new UnrecoverableError(`Failed to parse follow payload, dropping job ${job.id}`); + } + + debug("payload data: %o", decrypted.data); + // Decrypt the signature + const signature = verifySignature(decrypted.data._raw, ackPayload.signature, new Uint8Array(Buffer.from(server.publicKey!, 'base64'))); + + if (!signature) { + debug('signature verification failed, dropping job %s', job.id); + await db.delete(deliveryJobs).where(eq(deliveryJobs.id, deliveryJobId)); + throw new UnrecoverableError(`Signature verification failed, dropping job ${job.id}`); + } + + followPayload = decrypted.data as z.infer; + } else { + const validated = FollowEnvelopeSchema.safeParse(ackPayload); + if (!validated.success) { + debug('failed to parse follow payload: %s', ackPayload); + await db.delete(deliveryJobs).where(eq(deliveryJobs.id, deliveryJobId)); + throw new UnrecoverableError(`Failed to parse follow payload, dropping job ${job.id}`); + } + + followPayload = validated.data as z.infer; + } + + const followData = followPayload.following; + if (followData && followData.accepted) { + await db.update(follows).set({ accepted: followData.accepted }) + .where( + and( + eq(follows.followerId, followData.followerId), + eq(follows.followingId, followData.followingId), + eq(follows.followerServerUrl, serverUrl), + ) + ); + debug('updated follow %s accepted=%s', followData.id, followData.accepted); } } debug('job %s delivered successfully to %s', job.id, targetUrl); } +// --------------------------------------------------------------------------- +// Health-check worker processor +// --------------------------------------------------------------------------- + +const MAX_HEALTH_CHECK_ATTEMPTS = 5; + +async function processHealthCheck(job: Job) { + const { serverUrl } = job.data; + + const [server] = await db.select() + .from(serverRegistry) + .where(eq(serverRegistry.url, serverUrl)) + .limit(1); + + if (!server) { + debug('health-check: server %s not found in registry, skipping', serverUrl); + return; + } + + if (server.isHealthy) { + debug('health-check: server %s is already healthy, skipping', serverUrl); + return; + } + + if (server.unhealthyReason) { + const policy = getThreatPolicy(server.unhealthyReason as FederationErrorCode); + if (!policy.directHealthCheckable) { + debug('health-check: server %s has reason %s (not direct-checkable), skipping', serverUrl, server.unhealthyReason); + return; + } + } + + debug('health-check: pinging %s (attempt %d/%d)', serverUrl, server.healthCheckAttempts + 1, MAX_HEALTH_CHECK_ATTEMPTS); + + try { + const { response } = await federationFetch(serverUrl + '/discover', { + serverUrl, + timeout: 8_000, + skipHealthUpdate: true, + }); + + if (response.ok) { + debug('health-check: %s is reachable, marking healthy', serverUrl); + await markServerHealthy(serverUrl); + return; + } + + debug('health-check: %s returned HTTP %d', serverUrl, response.status); + } catch (err) { + debug('health-check: %s failed: %s', serverUrl, err instanceof FederationError ? err.code : err); + } + + const nextAttempt = server.healthCheckAttempts + 1; + await db.update(serverRegistry).set({ + healthCheckAttempts: nextAttempt, + updatedAt: new Date(), + }).where(eq(serverRegistry.url, serverUrl)); + + if (nextAttempt < MAX_HEALTH_CHECK_ATTEMPTS) { + await scheduleHealthCheck(serverUrl, nextAttempt); + } else { + debug('health-check: %s exhausted all %d attempts, stopping', serverUrl, MAX_HEALTH_CHECK_ATTEMPTS); + console.warn(`[federation] health-check exhausted for ${serverUrl} after ${MAX_HEALTH_CHECK_ATTEMPTS} attempts`); + } +} + +// --------------------------------------------------------------------------- +// Worker startup +// --------------------------------------------------------------------------- + export function startFederationWorker() { createDebug.enable(process.env.DEBUG || ''); - console.log('[federation] Starting worker...'); + console.log('[federation] Starting workers...'); - const worker = new Worker( - QUEUE_NAME, + const deliveryWorker = new Worker( + DELIVERY_QUEUE_NAME, processFederationDelivery, { connection: createRedisConnection() as never, @@ -146,18 +324,18 @@ export function startFederationWorker() { }, ); - worker.on('ready', () => { - console.log('[federation] Worker connected to Redis and ready'); + deliveryWorker.on('ready', () => { + console.log('[federation] Delivery worker connected to Redis and ready'); }); - worker.on('failed', (job, err) => { + deliveryWorker.on('failed', (job, err) => { const retriesLeft = (job?.opts.attempts ?? 0) - (job?.attemptsMade ?? 0); - debug('job %s (%s) to %s failed (attempt %d, %d retries left): %s', job?.id, job?.name, job?.data.targetUrl, job?.attemptsMade, retriesLeft, err.message); + debug('delivery job %s (%s) to %s failed (attempt %d, %d retries left): %s', job?.id, job?.name, job?.data.targetUrl, job?.attemptsMade, retriesLeft, err.message); if (err.cause) debug('cause: %O', err.cause); }); - worker.on('completed', async (job) => { - debug('job %s (%s) completed, cleaning up delivery record %s', job.id, job.name, job.data.deliveryJobId); + deliveryWorker.on('completed', async (job) => { + debug('delivery job %s (%s) completed, cleaning up delivery record %s', job.id, job.name, job.data.deliveryJobId); try { await db.delete(deliveryJobs).where(eq(deliveryJobs.id, job.data.deliveryJobId)); } catch (err) { @@ -165,10 +343,31 @@ export function startFederationWorker() { } }); - worker.on('error', (err) => { - console.error('[federation] Worker error:', err); + deliveryWorker.on('error', (err) => { + console.error('[federation] Delivery worker error:', err); }); - debug('worker started'); - return worker; + const healthCheckWorker = new Worker( + HEALTH_CHECK_QUEUE_NAME, + processHealthCheck, + { + connection: createRedisConnection() as never, + concurrency: 3, + }, + ); + + healthCheckWorker.on('ready', () => { + console.log('[federation] Health-check worker connected to Redis and ready'); + }); + + healthCheckWorker.on('failed', (job, err) => { + debug('health-check job %s failed: %s', job?.id, err.message); + }); + + healthCheckWorker.on('error', (err) => { + console.error('[federation] Health-check worker error:', err); + }); + + debug('all workers started'); + return { deliveryWorker, healthCheckWorker }; } diff --git a/src/lib/db/schema/index.ts b/src/lib/db/schema/index.ts index 14e4084..4029d7b 100644 --- a/src/lib/db/schema/index.ts +++ b/src/lib/db/schema/index.ts @@ -25,6 +25,7 @@ export const user = pgTable("user", { displayUsername: text("display_username"), twoFactorEnabled: boolean("two_factor_enabled").default(false), isPrivate: boolean("is_private").default(false), + postPropagationPolicy: text("post_propagation_policy").default("all"), }); export const account = pgTable( @@ -72,16 +73,21 @@ export const posts = pgTable( { id: text("id").primaryKey(), content: jsonb("content").notNull(), - authorId: text("author_id") - .notNull() - .references(() => user.id, { onDelete: "cascade" }), + authorId: text("author_id").references(() => user.id, { + onDelete: "cascade", + }), + federatedAuthorId: text("federated_author_id"), published: timestamp("published").notNull(), isLocal: boolean("is_local").default(false).notNull(), isPrivate: boolean("is_private").default(false), createdAt: timestamp("created_at").notNull(), federationUrl: text("federation_url"), + federationPostId: text("federation_post_id"), }, - (table) => [index("posts_federationUrl_idx").on(table.federationUrl)], + (table) => [ + index("posts_federationUrl_idx").on(table.federationUrl), + index("posts_federationPostId_idx").on(table.federationPostId), + ], ); export const follows = pgTable( @@ -150,6 +156,8 @@ export const serverRegistry = pgTable( createdAt: timestamp("created_at").notNull(), updatedAt: timestamp("updated_at").notNull(), isHealthy: boolean("is_healthy").notNull(), + healthCheckAttempts: integer("health_check_attempts").default(0).notNull(), + unhealthyReason: text("unhealthy_reason"), }, (table) => [ uniqueIndex("serverRegistry_publicKey_uidx").on(table.publicKey), diff --git a/src/lib/federation/fetch.ts b/src/lib/federation/fetch.ts new file mode 100644 index 0000000..77fceeb --- /dev/null +++ b/src/lib/federation/fetch.ts @@ -0,0 +1,301 @@ +import db from '@/lib/db'; +import { serverRegistry } from '@/lib/db/schema'; +import { encryptPayload, getOwnSigningSecretKey, signMessage } from '@/lib/federation/keytools'; +import { markServerHealthy, markServerUnhealthy } from '@/lib/federation/registry'; +import { EMERGENCY_SWEEP_TIMEOUT, getThreatPolicy } from '@/lib/federation/threat-model'; +import createDebug from 'debug'; +import { and, desc, eq, ne } from 'drizzle-orm'; + +const debug = createDebug('app:federation:fetch'); + +// --------------------------------------------------------------------------- +// Public types +// --------------------------------------------------------------------------- + +export type FederationErrorCode = + | "DNS_BLOCKED" + | "CONN_REFUSED" + | "CONN_RESET" + | "TIMEOUT" + | "TLS_ERROR" + | "UNKNOWN"; + +export class FederationError extends Error { + constructor( + public readonly code: FederationErrorCode, + public readonly url: string, + ) { + super(`Federation unreachable: ${code} — ${url}`); + this.name = 'FederationError'; + } + + get isProxyEligible(): boolean { + return getThreatPolicy(this.code).proxyEligible; + } +} + +export interface FederationFetchOptions { + method?: string; + headers?: Record; + body?: string; + timeout?: number; + proxyFallback?: boolean; + serverUrl?: string; + skipHealthUpdate?: boolean; +} + +export interface FederationFetchResult { + response: Response; + proxied: boolean; + proxyPeer?: string; +} + +// --------------------------------------------------------------------------- +// Internal helpers +// --------------------------------------------------------------------------- + +function extractServerUrl(fullUrl: string, explicit?: string): string { + if (explicit) return explicit; + const parsed = new URL(fullUrl); + return `${parsed.protocol}//${parsed.host}`; +} + +function classifyError(err: unknown, url: string): FederationError { + const anyErr = err as Record | undefined; + const code = anyErr?.cause?.code ?? anyErr?.code ?? ''; + + if (anyErr?.name === 'AbortError' || anyErr?.name === 'TimeoutError') { + return new FederationError('TIMEOUT', url); + } + if (code === 'ENOTFOUND' || code === 'EAI_AGAIN') { + return new FederationError('DNS_BLOCKED', url); + } + if (code === 'ECONNREFUSED') { + return new FederationError('CONN_REFUSED', url); + } + if (code === 'ECONNRESET' || code === 'ETIMEDOUT') { + return new FederationError('CONN_RESET', url); + } + if (typeof code === 'string' && ( + code.startsWith('ERR_TLS') || + code.startsWith('ERR_SSL') || + code.startsWith('CERT_') || + code === 'DEPTH_ZERO_SELF_SIGNED_CERT' || + code === 'UNABLE_TO_VERIFY_LEAF_SIGNATURE' || + code === 'SELF_SIGNED_CERT_IN_CHAIN' + )) { + return new FederationError('TLS_ERROR', url); + } + return new FederationError('UNKNOWN', url); +} + +async function directFetch(url: string, opts: FederationFetchOptions): Promise { + const controller = new AbortController(); + const timeout = opts.timeout ?? 10_000; + const timer = setTimeout(() => controller.abort(), timeout); + + try { + const response = await fetch(url, { + method: opts.method ?? 'GET', + headers: opts.headers, + body: opts.body, + signal: controller.signal, + }); + clearTimeout(timer); + return response; + } catch (err) { + clearTimeout(timer); + throw classifyError(err, url); + } +} + +// --------------------------------------------------------------------------- +// Proxy peer selection & emergency sweep +// --------------------------------------------------------------------------- + +async function pickHealthyProxy(excludeUrl: string): Promise { + const ownUrl = process.env.BETTER_AUTH_URL!; + const [peer] = await db.select() + .from(serverRegistry) + .where( + and( + eq(serverRegistry.isHealthy, true), + ne(serverRegistry.url, excludeUrl), + ne(serverRegistry.url, ownUrl), + ), + ) + .orderBy(desc(serverRegistry.lastSeen)) + .limit(1); + + return peer ?? null; +} + +async function emergencySweep(excludeUrl: string): Promise { + debug('emergency sweep: pinging all unhealthy servers'); + const ownUrl = process.env.BETTER_AUTH_URL!; + + const unhealthyServers = await db.select() + .from(serverRegistry) + .where( + and( + eq(serverRegistry.isHealthy, false), + ne(serverRegistry.url, excludeUrl), + ne(serverRegistry.url, ownUrl), + ), + ) + .orderBy(desc(serverRegistry.lastSeen)); + + const checkable = unhealthyServers.filter(s => { + if (!s.unhealthyReason) return true; + const policy = getThreatPolicy(s.unhealthyReason as FederationErrorCode); + return policy.directHealthCheckable; + }); + + if (checkable.length === 0) { + debug('emergency sweep: no direct-checkable servers'); + return null; + } + + debug('emergency sweep: pinging %d servers in parallel (timeout %dms)', checkable.length, EMERGENCY_SWEEP_TIMEOUT); + + const results = await Promise.allSettled( + checkable.map(async (server) => { + const res = await fetch(server.url + '/discover', { + signal: AbortSignal.timeout(EMERGENCY_SWEEP_TIMEOUT), + }); + if (!res.ok) throw new Error(`HTTP ${res.status}`); + return server; + }), + ); + + const recovered: (typeof serverRegistry.$inferSelect)[] = []; + for (const result of results) { + if (result.status === 'fulfilled') { + recovered.push(result.value); + } + } + + if (recovered.length === 0) { + debug('emergency sweep: no servers recovered — federation is STRANDED'); + console.error('[federation] STRANDED: all known peers are unreachable. Inbound registration is the only recovery path.'); + return null; + } + + debug('emergency sweep: %d server(s) recovered', recovered.length); + for (const server of recovered) { + await markServerHealthy(server.url); + } + + return recovered[0]; +} + +// --------------------------------------------------------------------------- +// Proxy routing +// --------------------------------------------------------------------------- + +async function attemptProxyRoute( + url: string, + opts: FederationFetchOptions, + targetServerUrl: string, + proxyPeer: typeof serverRegistry.$inferSelect, +): Promise { + debug('proxy route: sending through %s → %s', proxyPeer.url, targetServerUrl); + + const [targetServer] = await db.select() + .from(serverRegistry) + .where(eq(serverRegistry.url, targetServerUrl)) + .limit(1); + + if (!targetServer) { + throw new Error(`Target server ${targetServerUrl} not found in registry for proxy routing`); + } + + const recipientKey = new Uint8Array(Buffer.from(targetServer.encryptionPublicKey, 'base64')); + const innerPayload = JSON.stringify({ + targetUrl: url, + method: opts.method ?? 'GET', + headers: opts.headers ?? {}, + body: opts.body ?? null, + }); + + const encrypted = encryptPayload(innerPayload, recipientKey); + const signature = signMessage(innerPayload, getOwnSigningSecretKey()); + + const proxyUrl = proxyPeer.url + '/proxy'; + const proxyBody = JSON.stringify({ + method: 'PROXY', + targetUrl: targetServerUrl + '/proxy', + publicSigningKey: process.env.FEDERATION_PUBLIC_KEY!, + publicEncryptionKey: process.env.FEDERATION_ENCRYPTION_PUBLIC_KEY!, + payload: encrypted, + signature, + }); + + const proxyResponse = await fetch(proxyUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Federation-Origin': process.env.BETTER_AUTH_URL!, + 'Origin': process.env.BETTER_AUTH_URL!, + }, + body: proxyBody, + signal: AbortSignal.timeout(opts.timeout ?? 15_000), + }); + + if (!proxyResponse.ok) { + throw new Error(`Proxy ${proxyPeer.url} returned ${proxyResponse.status}`); + } + + return { response: proxyResponse, proxied: true, proxyPeer: proxyPeer.url }; +} + +// --------------------------------------------------------------------------- +// Main entry point +// --------------------------------------------------------------------------- + +export async function federationFetch( + url: string, + opts: FederationFetchOptions = {}, +): Promise { + const serverUrl = extractServerUrl(url, opts.serverUrl); + + // Gate 0: direct fetch + try { + const response = await directFetch(url, opts); + return { response, proxied: false }; + } catch (err) { + if (!(err instanceof FederationError)) throw err; + + debug('direct fetch to %s failed: %s', url, err.code); + + if (!opts.skipHealthUpdate) { + await markServerUnhealthy(serverUrl, err.code).catch(e => + debug('failed to mark %s unhealthy: %O', serverUrl, e), + ); + } + + const policy = getThreatPolicy(err.code); + + // Gate 1: proxy fallback + if (opts.proxyFallback && policy.proxyEligible) { + let proxyPeer = await pickHealthyProxy(serverUrl); + + // If no healthy proxy is found, we'll do an emergency sweep to find a new proxy. + if (!proxyPeer) { + proxyPeer = await emergencySweep(serverUrl); + } + + if (proxyPeer) { + try { + return await attemptProxyRoute(url, opts, serverUrl, proxyPeer); + } catch (proxyErr) { + debug('proxy route through %s failed: %O', proxyPeer.url, proxyErr); + } + } else { + throw new Error("No healthy proxy found. Emergency sweep failed."); + } + } + + throw err; + } +} diff --git a/src/lib/federation/peer-registry-url.ts b/src/lib/federation/peer-registry-url.ts new file mode 100644 index 0000000..6204ecc --- /dev/null +++ b/src/lib/federation/peer-registry-url.ts @@ -0,0 +1,19 @@ +/** + * Values for `follows.follower_server_url` / `follows.following_server_url`, which FK to + * `server_registry.url`. This instance is intentionally absent from that table, so when the + * peer is local we persist null instead of violating the FK. + */ +export function peerRegistryUrlOrNull(peerUrl: string | null | undefined): string | null { + if (peerUrl == null || peerUrl === "") return null; + const own = process.env.BETTER_AUTH_URL; + let peerOrigin: string; + try { + peerOrigin = new URL(peerUrl).origin; + } catch { + return null; + } + if (!own) return peerOrigin; + const ownOrigin = new URL(own).origin; + if (peerOrigin === ownOrigin) return null; + return peerOrigin; +} diff --git a/src/lib/federation/proxy-helpers/federated-post.ts b/src/lib/federation/proxy-helpers/federated-post.ts new file mode 100644 index 0000000..f01b235 --- /dev/null +++ b/src/lib/federation/proxy-helpers/federated-post.ts @@ -0,0 +1,94 @@ +import db from "@/lib/db"; +import { posts } from "@/lib/db/schema"; +import { getOwnSigningSecretKey, signMessage, verifySignature } from "@/lib/federation/keytools"; +import { PostEnvelopeSchema } from "@/lib/zod/methods/PostFederationSchema"; +import { and, eq } from "drizzle-orm"; +import type { z } from "zod"; + +type Tx = Parameters[0]>[0]; + +export type FederatedPostSender = { + publicKey: string; + encryptionPublicKey: string; + url: string; +}; + +export type FederatedPostResult = + | { + ok: true; + innerPayload: string; + signature: string; + senderEncryptionPublicKeyB64: string; + } + | { ok: false; error: string; code: string; status: number }; + +export async function applyFederatedPostInTransaction( + tx: Tx, + envelope: z.infer, + bodySignature: string, + sender: FederatedPostSender, +): Promise { + const senderPublicKey = new Uint8Array(Buffer.from(sender.publicKey, "base64")); + if (!verifySignature(envelope._raw, bodySignature, senderPublicKey)) { + return { + ok: false, + error: "The provided signature is invalid. Please redo the discovery process and try again.", + code: "INVALID_SIGNATURE", + status: 403, + }; + } + + const [existing] = await tx + .select({ id: posts.id }) + .from(posts) + .where( + and( + eq(posts.federationUrl, envelope.federationUrl), + eq(posts.federationPostId, envelope.post.id), + ), + ) + .limit(1); + + if (existing) { + return { + ok: false, + error: "This post has already been federated to this server.", + code: "FEDERATED_POST_ALREADY_EXISTS", + status: 409, + }; + } + + const localId = crypto.randomUUID(); + const published = new Date(envelope.post.published); + + await tx.insert(posts).values({ + id: localId, + content: envelope.post.content, + authorId: null, + federatedAuthorId: envelope.post.authorId, + published, + isLocal: false, + isPrivate: envelope.post.isPrivate, + federationUrl: envelope.federationUrl, + federationPostId: envelope.post.id, + createdAt: new Date(), + }); + + const innerPayload = JSON.stringify({ + post: { + id: localId, + federationPostId: envelope.post.id, + }, + federationUrl: process.env.BETTER_AUTH_URL!, + method: "FEDERATE_POST" as const, + }); + + const signature = signMessage(innerPayload, getOwnSigningSecretKey()); + + return { + ok: true, + innerPayload, + signature, + senderEncryptionPublicKeyB64: sender.encryptionPublicKey, + }; +} diff --git a/src/lib/federation/registry.ts b/src/lib/federation/registry.ts index d8b7d97..308c823 100644 --- a/src/lib/federation/registry.ts +++ b/src/lib/federation/registry.ts @@ -1,5 +1,6 @@ import db from '@/lib/db'; import { serverRegistry } from '@/lib/db/schema'; +import { federationFetch, FederationError, type FederationErrorCode } from '@/lib/federation/fetch'; import { assertSafeUrl } from '@/lib/federation/url-guard'; import createDebug from 'debug'; import { eq } from 'drizzle-orm'; @@ -16,7 +17,43 @@ export async function upsertServer(url: string, publicKey: string, encryptionPub createdAt: new Date(), updatedAt: new Date(), isHealthy: true, - }).onConflictDoNothing(); + healthCheckAttempts: 0, + unhealthyReason: null, + }).onConflictDoUpdate({ + target: serverRegistry.url, + set: { + lastSeen: new Date(), + updatedAt: new Date(), + }, + }); +} + +export async function markServerUnhealthy(serverUrl: string, reason: FederationErrorCode): Promise { + debug('marking server %s as unhealthy (reason: %s)', serverUrl, reason); + await db.update(serverRegistry).set({ + isHealthy: false, + unhealthyReason: reason, + healthCheckAttempts: 0, + updatedAt: new Date(), + }).where(eq(serverRegistry.url, serverUrl)); + + try { + const { scheduleHealthCheck } = await import('@/lib/bull'); + await scheduleHealthCheck(serverUrl, 0); + } catch (err) { + debug('failed to schedule health check for %s: %O', serverUrl, err); + } +} + +export async function markServerHealthy(serverUrl: string): Promise { + debug('marking server %s as healthy', serverUrl); + await db.update(serverRegistry).set({ + isHealthy: true, + unhealthyReason: null, + healthCheckAttempts: 0, + lastSeen: new Date(), + updatedAt: new Date(), + }).where(eq(serverRegistry.url, serverUrl)); } export class DiscoveryError extends Error { @@ -38,15 +75,18 @@ export async function discoverAndRegister(serverUrl: string): Promise { let remote: { url?: string; publicKey?: string; encryptionPublicKey?: string }; try { - const res = await fetch(serverUrl + '/discover', { - signal: AbortSignal.timeout(10_000), + const { response } = await federationFetch(serverUrl + '/discover', { + serverUrl, }); - if (!res.ok) { - throw new DiscoveryError(`GET /discover returned ${res.status}`); + if (!response.ok) { + throw new DiscoveryError(`GET /discover returned ${response.status}`); } - remote = await res.json(); + remote = await response.json(); } catch (err) { if (err instanceof DiscoveryError) throw err; + if (err instanceof FederationError) { + throw new DiscoveryError(`Failed to reach ${serverUrl}/discover: ${err.code}`); + } throw new DiscoveryError(`Failed to reach ${serverUrl}/discover: ${err instanceof Error ? err.message : err}`); } @@ -72,7 +112,7 @@ export async function discoverAndRegister(serverUrl: string): Promise { debug('sending mutual REGISTER to %s', serverUrl); try { - await fetch(serverUrl + '/discover', { + await federationFetch(serverUrl + '/discover', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ @@ -81,7 +121,7 @@ export async function discoverAndRegister(serverUrl: string): Promise { publicKey: process.env.FEDERATION_PUBLIC_KEY!, encryptionPublicKey: process.env.FEDERATION_ENCRYPTION_PUBLIC_KEY!, }), - signal: AbortSignal.timeout(10_000), + serverUrl, }); } catch (err) { debug('mutual REGISTER to %s failed (non-fatal): %s', serverUrl, err instanceof Error ? err.message : err); diff --git a/src/lib/federation/threat-model.ts b/src/lib/federation/threat-model.ts new file mode 100644 index 0000000..909f8b7 --- /dev/null +++ b/src/lib/federation/threat-model.ts @@ -0,0 +1,57 @@ +import type { FederationErrorCode } from "./fetch"; + +export interface ThreatPolicy { + proxyEligible: boolean; + directHealthCheckable: boolean; + description: string; +} + +export const DEFAULT_THREAT_MODEL: Record = { + DNS_BLOCKED: { + proxyEligible: true, + directHealthCheckable: false, + description: "DNS unreachable -- likely censored, relay-eligible", + }, + TIMEOUT: { + proxyEligible: false, + directHealthCheckable: true, + description: "Timed out -- ambiguous, rely on existing BullMQ retries", + }, + CONN_REFUSED: { + proxyEligible: false, + directHealthCheckable: true, + description: "Connection refused -- server is down, proxy won't help", + }, + CONN_RESET: { + proxyEligible: false, + directHealthCheckable: true, + description: "Connection reset -- ambiguous, likely server-side", + }, + TLS_ERROR: { + proxyEligible: false, + directHealthCheckable: true, + description: "TLS failure -- possible MITM, do not proxy", + }, + UNKNOWN: { + proxyEligible: true, + directHealthCheckable: true, + description: "Unknown error -- safety default", + }, +}; + +export const EMERGENCY_SWEEP_TIMEOUT = 2_000; + +/** + * Admin overrides -- edit this to match your federation's threat model. + * Any key you set here overrides the corresponding default above. + * + * Example for a heavily censored region: + * TIMEOUT: { proxyEligible: true }, + */ +export const THREAT_MODEL_OVERRIDES: Partial>> = {}; + +export function getThreatPolicy(code: FederationErrorCode): ThreatPolicy { + const base = DEFAULT_THREAT_MODEL[code]; + const override = THREAT_MODEL_OVERRIDES[code]; + return override ? { ...base, ...override } : base; +} diff --git a/src/lib/federation/url-guard.ts b/src/lib/federation/url-guard.ts index 7065669..fc0fdfb 100644 --- a/src/lib/federation/url-guard.ts +++ b/src/lib/federation/url-guard.ts @@ -1,6 +1,6 @@ import createDebug from "debug"; -const debug = createDebug("federation:url-guard"); +const debug = createDebug("app:federation:url-guard"); const BLOCKED_HOSTNAMES = new Set([ "localhost", @@ -12,8 +12,21 @@ const BLOCKED_HOSTNAMES = new Set([ "169.254.169.254", ]); -const DEV_ALLOWED_HOSTNAMES = new Set(["localhost", "127.0.0.1", process.env.DEV_ALLOWED_HOSTNAMES!]); -debug("DEV_ALLOWED_HOSTNAMES: %o", DEV_ALLOWED_HOSTNAMES); +const SSRF_BYPASS = process.env.FEDERATION_ALLOW_PRIVATE_URLS === "true"; + +const DEV_ALLOWED_HOSTNAMES = new Set([ + "localhost", + "127.0.0.1", +]); + +if (typeof process.env.DEV_ALLOWED_HOSTNAMES === "string" && process.env.DEV_ALLOWED_HOSTNAMES.trim() !== "") { + for (const h of process.env.DEV_ALLOWED_HOSTNAMES.split(",")) { + const hostname = h.trim(); + if (hostname) DEV_ALLOWED_HOSTNAMES.add(hostname); + } +} + +debug("SSRF bypass: %s, DEV_ALLOWED_HOSTNAMES: %s", SSRF_BYPASS, [...DEV_ALLOWED_HOSTNAMES].join(", ")); function isPrivateIPv4(hostname: string): boolean { const parts = hostname.split(".").map(Number); if (parts.length !== 4 || parts.some((p) => isNaN(p))) return false; @@ -39,9 +52,8 @@ function isPrivateIPv6(hostname: string): boolean { /** * Throws if the URL points to a private/internal address or uses a - * non-HTTP(S) protocol. In development, localhost/127.0.0.1 are explicitly - * allowed for local federation testing while all other safety checks - * remain enforced. + * non-HTTP(S) protocol. Set FEDERATION_ALLOW_PRIVATE_URLS=true to + * allow localhost/127.0.0.1 for local federation testing. */ export function assertSafeUrl(url: string): void { let parsed: URL; @@ -57,7 +69,7 @@ export function assertSafeUrl(url: string): void { const hostname = parsed.hostname; - if (process.env.NODE_ENV === "development" && DEV_ALLOWED_HOSTNAMES.has(hostname)) { + if (SSRF_BYPASS && DEV_ALLOWED_HOSTNAMES.has(hostname)) { return; } diff --git a/src/lib/plugins/server/federation.ts b/src/lib/plugins/server/federation.ts index 3024555..46a2988 100644 --- a/src/lib/plugins/server/federation.ts +++ b/src/lib/plugins/server/federation.ts @@ -43,6 +43,17 @@ export const federation = () => { type: "boolean", required: true, index: false + }, + healthCheckAttempts: { + type: "number", + required: true, + index: false, + defaultValue: 0 + }, + unhealthyReason: { + type: "string", + required: false, + index: false } } }, diff --git a/src/lib/plugins/server/helpers/social/endpoints/follows.ts b/src/lib/plugins/server/helpers/social/endpoints/follows.ts index 2881433..6ec2653 100644 --- a/src/lib/plugins/server/helpers/social/endpoints/follows.ts +++ b/src/lib/plugins/server/helpers/social/endpoints/follows.ts @@ -1,8 +1,10 @@ import { getFederationQueue } from "@/lib/bull"; import db from "@/lib/db"; import { blacklistedServers, deliveryJobs, follows, serverRegistry, user } from "@/lib/db/schema"; -import { decryptPayload, getOwnEncryptionSecretKey, verifySignature } from "@/lib/federation/keytools"; +import { verifySignature } from "@/lib/federation/keytools"; +import { peerRegistryUrlOrNull } from "@/lib/federation/peer-registry-url"; import { discoverAndRegister, DiscoveryError } from "@/lib/federation/registry"; +import { FollowEnvelopeSchema } from "@/lib/zod/methods/FollowSchema"; import { createAuthEndpoint, getSessionFromCtx } from "better-auth/api"; import createDebug from "debug"; import { and, eq } from "drizzle-orm"; @@ -20,38 +22,7 @@ const followSchema = z.discriminatedUnion( z.object({ method: z.literal("FEDERATE"), signature: z.string(), - payload: z.object({ - ephemeralPublicKey: z.string(), - iv: z.string(), - ciphertext: z.string(), - authTag: z.string(), - }).transform((payload, ctx) => { - try { - const decrypted = decryptPayload(payload, getOwnEncryptionSecretKey()); - const parsedPayload = JSON.parse(decrypted); - - const parsedPayloadSchema = z.object({ - following: z.object({ - id: z.string(), - createdAt: z.coerce.date(), - followerId: z.string(), - followingId: z.string(), - accepted: z.boolean(), - followerServerUrl: z.string().nullable(), - }), - federationUrl: z.string(), - method: z.literal("FEDERATE"), - }).safeParse(parsedPayload); - if (!parsedPayloadSchema.success) { - ctx.addIssue({ code: "custom", message: "Invalid payload" }); - return z.never(); - } - return { ...parsedPayloadSchema.data, _raw: decrypted }; - } catch { - ctx.addIssue({ code: "custom", message: "Invalid payload" }); - return z.never(); - } - }), + payload: FollowEnvelopeSchema }), z.object({ method: z.literal("UNFOLLOW"), @@ -64,12 +35,11 @@ export const followUser = createAuthEndpoint("/social/follows", { method: "POST", body: followSchema, }, async (context) => { - debug("FOLLOW – %s", context.body.method); + const { method } = context.body; switch (method) { case "INSERT": { const session = await getSessionFromCtx(context); - debug("FOLLOW – user: %o", session); if (!session) { return context.json({ error: "Unauthorized" }, { status: 401 }); }; @@ -150,7 +120,7 @@ export const followUser = createAuthEndpoint("/social/follows", { followingId: userId, accepted: false, createdAt: new Date(), - followerServerUrl: serverUrl, + followerServerUrl: peerRegistryUrlOrNull(serverUrl), }).returning(); const job = await db.insert(deliveryJobs).values({ @@ -221,7 +191,7 @@ export const followUser = createAuthEndpoint("/social/follows", { followingId: following.followingId, accepted, createdAt: new Date(), - followingServerUrl: server.url, + followingServerUrl: peerRegistryUrlOrNull(server.url), }); return context.json({ status: "acknowledged", accepted }, { status: 200 }); diff --git a/src/lib/plugins/server/helpers/social/endpoints/posts.ts b/src/lib/plugins/server/helpers/social/endpoints/posts.ts index dc860ae..2935047 100644 --- a/src/lib/plugins/server/helpers/social/endpoints/posts.ts +++ b/src/lib/plugins/server/helpers/social/endpoints/posts.ts @@ -1,65 +1,182 @@ -import db from "@/lib/db"; -import { deliveryJobs, follows, posts } from "@/lib/db/schema"; import { getFederationQueue, type FederationDeliveryJob } from "@/lib/bull"; +import db from "@/lib/db"; +import { deliveryJobs, follows, posts, serverRegistry } from "@/lib/db/schema"; +import { encryptPayload } from "@/lib/federation/keytools"; +import { applyFederatedPostInTransaction } from "@/lib/federation/proxy-helpers/federated-post"; +import { EncryptedEnvelopeBaseSchema } from "@/lib/zod/EncryptedEnvelope"; +import { PostEnvelopeSchema } from "@/lib/zod/methods/PostFederationSchema"; import minioClient from "@/plugins/server/storage/minio.client"; import { createAuthEndpoint, getSessionFromCtx } from "better-auth/api"; +import createDebug from "debug"; import { and, eq } from "drizzle-orm"; import { z } from "zod"; import { postContentSchema } from "../social"; +const debug = createDebug("app:plugins:server:helpers:social:posts"); + +const federatedPostRequestSchema = z.object({ + method: z.literal("FEDERATE_POST"), + payload: EncryptedEnvelopeBaseSchema, + signature: z.string(), +}); + +const createPostBodySchema = z.union([federatedPostRequestSchema, postContentSchema]); + export const createPost = createAuthEndpoint("/social/posts", { method: "POST", - body: postContentSchema, + body: createPostBodySchema, }, async (context) => { - const content = context.body; - const user = await getSessionFromCtx(context) + const body = context.body; + + if (typeof body === "object" && body !== null && "method" in body && body.method === "FEDERATE_POST") { + const { payload: encryptedPayload, signature } = body; + + const parsedEnvelope = PostEnvelopeSchema.safeParse(encryptedPayload); + if (!parsedEnvelope.success) { + return context.json( + { error: "Invalid federated post payload", code: "INVALID_FEDERATED_POST_PAYLOAD" }, + { status: 400 }, + ); + } + + const envelope = parsedEnvelope.data; + const [server] = await db + .select({ + url: serverRegistry.url, + publicKey: serverRegistry.publicKey, + encryptionPublicKey: serverRegistry.encryptionPublicKey, + }) + .from(serverRegistry) + .where(eq(serverRegistry.url, envelope.federationUrl)) + .limit(1); + + if (!server) { + return context.json( + { + error: "Unknown federation server. Please redo the discovery process and try again.", + code: "UNKNOWN_FEDERATION_SERVER_INTERACTION", + }, + { status: 403 }, + ); + } + + const result = await db.transaction(async (tx) => + applyFederatedPostInTransaction(tx, envelope, signature, server), + ); + + if (!result.ok) { + return context.json({ error: result.error, code: result.code }, { status: result.status }); + } + + const recipientKey = new Uint8Array(Buffer.from(result.senderEncryptionPublicKeyB64, "base64")); + return context.json( + { + method: "PROXY_RESPONSE" as const, + status: "acknowledged", + data: encryptPayload(result.innerPayload, recipientKey), + signature: result.signature, + }, + { status: 200 }, + ); + } + + const content = body; + const user = await getSessionFromCtx(context); if (!user) { return context.json({ error: "Unauthorized" }, { status: 401 }); } - // Create post - const post = await db.insert(posts).values({ - id: crypto.randomUUID(), - content: content, - authorId: user.user.id, - published: new Date(), - isLocal: true, - createdAt: new Date(), - }).returning({ id: posts.id }); + const isPrivate = user.user.isPrivate; + const shouldPropagate = { + all: true, + followers: isPrivate, + none: false, + }[user.user.postPropagationPolicy as "all" | "followers" | "none"] ?? true; - // Enqueue federation delivery jobs for each follower's server - const followers = await db.select().from(follows).where(and(eq(follows.followingId, user.user.id), eq(follows.accepted, true))); - const uniqueUrls = [...new Set(followers.map(f => f.followerServerUrl).filter(Boolean))] as string[]; - const payload = JSON.stringify({ content }); + const postId = crypto.randomUUID(); + const published = new Date(); + const inserted = await db + .insert(posts) + .values({ + id: postId, + content, + authorId: user.user.id, + published, + isLocal: shouldPropagate, + isPrivate, + federationUrl: process.env.BETTER_AUTH_URL!, + federationPostId: postId, + createdAt: new Date(), + }) + .returning({ id: posts.id }); - const jobRows = uniqueUrls.map(url => ({ - id: crypto.randomUUID(), - targetUrl: url + "/social/posts", - serverUrl: url, - payload, - attempts: 0, - createdAt: new Date(), - })); + let federationDeliveriesQueued = 0; - if (jobRows.length > 0) { - await db.insert(deliveryJobs).values(jobRows); + if (shouldPropagate) { + const followers = await db + .select() + .from(follows) + .where(and(eq(follows.followingId, user.user.id), eq(follows.accepted, true))); + const following = await db + .select() + .from(follows) + .where(and(eq(follows.followerId, user.user.id), eq(follows.accepted, true))); - await getFederationQueue().addBulk( - jobRows.map(row => ({ - name: 'deliver-post' as const, - data: { - deliveryJobId: row.id, - targetUrl: row.targetUrl, - serverUrl: row.serverUrl, - payload: row.payload, - } satisfies FederationDeliveryJob, - })), - ); + debug("followers: %o", followers); + debug("following: %o", following); + + const uniqueUrls = [ + ...new Set([ + ...followers.map((f) => f.followingServerUrl).filter(Boolean), + ...following.map((f) => f.followerServerUrl).filter(Boolean), + ]), + ] as string[]; + + federationDeliveriesQueued = uniqueUrls.length; + + if (uniqueUrls.length > 0) { + const jobPayload = JSON.stringify({ + method: "FEDERATE_POST" as const, + federationUrl: process.env.BETTER_AUTH_URL!, + post: { + id: postId, + content, + authorId: user.user.id, + published: published.toISOString(), + isPrivate, + }, + }); + + const jobRows = uniqueUrls.map((url) => ({ + id: crypto.randomUUID(), + targetUrl: url + "/api/auth/social/posts", + serverUrl: url, + payload: jobPayload, + attempts: 0, + createdAt: new Date(), + })); + + await db.insert(deliveryJobs).values(jobRows); + + await getFederationQueue().addBulk( + jobRows.map((row) => ({ + name: "deliver-post" as const, + data: { + deliveryJobId: row.id, + targetUrl: row.targetUrl, + serverUrl: row.serverUrl, + payload: row.payload, + } satisfies FederationDeliveryJob, + })), + ); + } } - return context.json({ id: post[0].id }, { status: 200 }); - + return context.json( + { id: inserted[0].id, federationDeliveriesQueued }, + { status: 200 }, + ); }); export const getPost = createAuthEndpoint("/social/posts/:id", { @@ -107,4 +224,4 @@ export const uploadFile = createAuthEndpoint("/social/posts/files", { const objectUrl = `${protocol}://${process.env.MINIO_ENDPOINT}:${process.env.MINIO_PORT}/${process.env.MINIO_BUCKET}/${objectKey}`; return context.json({ presignedUrl, objectUrl, objectKey }, { status: 200 }); -}) \ No newline at end of file +}); diff --git a/src/lib/plugins/server/helpers/social/social.ts b/src/lib/plugins/server/helpers/social/social.ts index efae50a..61b8d0a 100644 --- a/src/lib/plugins/server/helpers/social/social.ts +++ b/src/lib/plugins/server/helpers/social/social.ts @@ -63,13 +63,18 @@ export default { }, authorId: { type: "string", - required: true, + required: false, index: false, references: { model: "user", field: "id" } }, + federatedAuthorId: { + type: "string", + required: false, + index: false, + }, published: { type: "date", required: true, @@ -84,6 +89,7 @@ export default { defaultValue: false, }, // "isPrivate" will be used to determine if the post should be visible only for the user's followers + // If "isLocal" is set to true and this to false, only users on the same server will be able to see the psot isPrivate: { type: "boolean", required: false, @@ -99,6 +105,12 @@ export default { type: "string", required: false, index: true, + }, + // This serves as a reference to the post on the original server this post came from + federationPostId: { + type: "string", + required: false, + index: true, } } }, diff --git a/src/lib/zod/EncryptedEnvelope.ts b/src/lib/zod/EncryptedEnvelope.ts new file mode 100644 index 0000000..f2793dd --- /dev/null +++ b/src/lib/zod/EncryptedEnvelope.ts @@ -0,0 +1,40 @@ +import { z, type ZodType } from "zod"; +import { decryptPayload, getOwnEncryptionSecretKey } from "../federation/keytools"; + +/** + * Raw envelope shape — validates the four ECIES fields without decrypting. + * Use this when you only need to confirm the envelope structure (e.g. a proxy + * that forwards opaque ciphertext without access to the recipient's key). + */ +export const EncryptedEnvelopeBaseSchema = z.object({ + ephemeralPublicKey: z.string(), + iv: z.string(), + ciphertext: z.string(), + authTag: z.string(), +}); + +/** + * Factory that returns an envelope schema which decrypts the ciphertext and + * validates the resulting plaintext against {@link innerSchema}. + * + * The output type is `z.infer & { _raw: string }` — the parsed inner + * payload plus the raw decrypted JSON string (useful for signature verification). + */ +export function createEncryptedEnvelopeSchema(innerSchema: T) { + return EncryptedEnvelopeBaseSchema.transform((payload, ctx) => { + try { + const decrypted = decryptPayload(payload, getOwnEncryptionSecretKey()); + const parsed = innerSchema.safeParse(JSON.parse(decrypted)); + + if (!parsed.success) { + ctx.addIssue({ code: "custom", message: parsed.error.issues.map(i => i.message).join("; ") }); + return z.NEVER; + } + + return Object.assign({}, parsed.data as z.infer, { _raw: decrypted }); + } catch { + ctx.addIssue({ code: "custom", message: "Decryption failed" }); + return z.NEVER; + } + }); +} diff --git a/src/lib/zod/methods/FollowSchema.ts b/src/lib/zod/methods/FollowSchema.ts new file mode 100644 index 0000000..d7a7b84 --- /dev/null +++ b/src/lib/zod/methods/FollowSchema.ts @@ -0,0 +1,18 @@ +import { z } from "zod"; +import { createEncryptedEnvelopeSchema } from "../EncryptedEnvelope"; + +const FollowInnerPayloadSchema = z.object({ + following: z.object({ + id: z.string(), + createdAt: z.coerce.date(), + followerId: z.string(), + followingId: z.string(), + accepted: z.boolean(), + followerServerUrl: z.string().nullable(), + }), + federationUrl: z.string(), + method: z.literal("FEDERATE"), +}); + +export const FollowEnvelopeSchema = createEncryptedEnvelopeSchema(FollowInnerPayloadSchema); +export default FollowInnerPayloadSchema; diff --git a/src/lib/zod/methods/PostFederationSchema.ts b/src/lib/zod/methods/PostFederationSchema.ts new file mode 100644 index 0000000..808be44 --- /dev/null +++ b/src/lib/zod/methods/PostFederationSchema.ts @@ -0,0 +1,18 @@ +import { postContentSchema } from "@/lib/plugins/server/helpers/social/social"; +import { z } from "zod"; +import { createEncryptedEnvelopeSchema } from "../EncryptedEnvelope"; + +export const PostInnerPayloadSchema = z.object({ + post: z.object({ + id: z.string(), + content: postContentSchema, + /** User id on the sending federation node (not required to exist on the recipient). */ + authorId: z.string(), + published: z.string(), + isPrivate: z.boolean(), + }), + federationUrl: z.string(), + method: z.literal("FEDERATE_POST"), +}); + +export const PostEnvelopeSchema = createEncryptedEnvelopeSchema(PostInnerPayloadSchema); diff --git a/tests/helpers/queue.ts b/tests/helpers/queue.ts new file mode 100644 index 0000000..7346439 --- /dev/null +++ b/tests/helpers/queue.ts @@ -0,0 +1,50 @@ +import { Queue, type Job } from "bullmq" +import Redis from "ioredis" + +function createRedis() { + return new Redis(process.env.REDIS_URL!, { maxRetriesPerRequest: null }) +} + +const HEALTH_CHECK_QUEUE = "federation-health-check" +const RETRY_QUEUE = "federation-retry" + +let _healthQueue: Queue | null = null +let _retryQueue: Queue | null = null + +export function getTestHealthCheckQueue(): Queue { + if (!_healthQueue) { + _healthQueue = new Queue(HEALTH_CHECK_QUEUE, { connection: createRedis() as never }) + } + return _healthQueue +} + +export function getTestRetryQueue(): Queue { + if (!_retryQueue) { + _retryQueue = new Queue(RETRY_QUEUE, { connection: createRedis() as never }) + } + return _retryQueue +} + +export async function getHealthCheckJobsFor(serverUrl: string): Promise { + const queue = getTestHealthCheckQueue() + const jobs = await queue.getJobs(["waiting", "delayed", "active", "completed", "failed"]) + return jobs.filter((j) => j.data?.serverUrl === serverUrl) +} + +export async function getRetryJobsFor(serverUrl: string): Promise { + const queue = getTestRetryQueue() + const jobs = await queue.getJobs(["waiting", "delayed", "active", "completed", "failed"]) + return jobs.filter((j) => j.data?.serverUrl === serverUrl) +} + +export async function drainAllQueues(): Promise { + const hq = getTestHealthCheckQueue() + const rq = getTestRetryQueue() + await hq.obliterate({ force: true }).catch(() => {}) + await rq.obliterate({ force: true }).catch(() => {}) +} + +export async function closeQueues(): Promise { + if (_healthQueue) { await _healthQueue.close(); _healthQueue = null } + if (_retryQueue) { await _retryQueue.close(); _retryQueue = null } +} diff --git a/tests/proxies/follow.ts b/tests/proxies/follow.ts new file mode 100644 index 0000000..2545508 --- /dev/null +++ b/tests/proxies/follow.ts @@ -0,0 +1,596 @@ +/** + * Manual proxy chain test script. + * + * You need 3 different instances up and running to use this test script. That includes yours. + * + * Exercises the full A → B → C → B → A proxy relay against real federation + * instances. Run this from Server A while Server B (proxy) and Server C + * (target) are already up. + * + * Usage: + * bun run testProxy.ts --proxy --target + * + * Examples: + * bun run testProxy.ts --proxy https://proxy.example.com --target https://target.example.com + * bun run testProxy.ts --proxy http://localhost:3001 --target http://localhost:3002 + */ + +import db from "@/lib/db"; +import { deliveryJobs, follows, serverRegistry } from "@/lib/db/schema"; +import { encryptPayload, fingerprintKey, signMessage } from "@/lib/federation/keytools"; +import { config } from "dotenv"; +import { desc, eq } from "drizzle-orm"; +import nacl from "tweetnacl"; + +config({ path: ".env.local" }); + +const FETCH_TIMEOUT_MS = 15_000; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +interface FedKeys { + signingPublicKey: string; + signingSecretKey: string; + encryptionPublicKey: string; + encryptionSecretKey: string; +} + +function generateKeypair(): FedKeys { + const signing = nacl.sign.keyPair(); + const encryption = nacl.box.keyPair(); + return { + signingPublicKey: Buffer.from(signing.publicKey).toString("base64"), + signingSecretKey: Buffer.from(signing.secretKey).toString("base64"), + encryptionPublicKey: Buffer.from(encryption.publicKey).toString("base64"), + encryptionSecretKey: Buffer.from(encryption.secretKey).toString("base64"), + }; +} + +async function readErrorBody(response: Response): Promise { + try { + const body = await response.json(); + return body?.error ?? body?.message ?? JSON.stringify(body); + } catch { + try { + return await response.text(); + } catch { + return response.statusText; + } + } +} + +interface TestResult { + name: string; + passed: boolean; + message: string; +} + +const results: TestResult[] = []; + +function pass(name: string, message = "OK") { + console.log(` ✔ ${name}`); + if (message !== "OK") console.log(` ${message}`); + results.push({ name, passed: true, message }); +} + +function fail(name: string, message: string) { + console.error(` ✘ ${name}`); + console.error(` ${message}`); + results.push({ name, passed: false, message }); +} + +// --------------------------------------------------------------------------- +// Validate environment +// --------------------------------------------------------------------------- + +const REQUIRED_ENV = [ + "FEDERATION_PUBLIC_KEY", + "FEDERATION_PRIVATE_KEY", + "FEDERATION_ENCRYPTION_PUBLIC_KEY", + "FEDERATION_ENCRYPTION_PRIVATE_KEY", + "BETTER_AUTH_URL", +] as const; + +const missing = REQUIRED_ENV.filter((k) => !process.env[k]); +if (missing.length > 0) { + console.error("Missing required environment variables:"); + missing.forEach((k) => console.error(` - ${k}`)); + console.error("Ensure .env.local is present and populated."); + process.exit(1); +} + +const ORIGIN = process.env.BETTER_AUTH_URL!; +const OWN_SIGNING_PUB = process.env.FEDERATION_PUBLIC_KEY!; +const OWN_ENCRYPTION_PUB = process.env.FEDERATION_ENCRYPTION_PUBLIC_KEY!; + +// --------------------------------------------------------------------------- +// Parse arguments +// --------------------------------------------------------------------------- + +function argAfter(flag: string): string | undefined { + const idx = process.argv.indexOf(flag); + return idx !== -1 ? process.argv[idx + 1] : undefined; +} + +const proxyUrl = argAfter("--proxy"); +const targetUrl = argAfter("--target"); +const bearerToken = argAfter("--bearer"); +const targetUserId = argAfter("--user"); + +if (!proxyUrl || !targetUrl) { + console.error("Usage: bun run testProxy.ts --proxy --target [options]"); + console.error(""); + console.error(" --proxy URL of Server B (the proxy)"); + console.error(" --target URL of Server C (the target)"); + console.error(" --test-fallback Enable proxy fallback test (requires C blocked from A)"); + console.error(" --bearer Bearer token for A's API (required for --test-fallback)"); + console.error(" --user User ID on Server C to follow (required for --test-fallback)"); + process.exit(1); +} + +console.log("Proxy chain test"); +console.log(` Server A (us): ${ORIGIN}`); +console.log(` Server B (proxy): ${proxyUrl}`); +console.log(` Server C (target): ${targetUrl}`); +console.log(` A signing key: ${fingerprintKey(OWN_SIGNING_PUB).slice(0, 16)}…`); +console.log(` A encryption key: ${fingerprintKey(OWN_ENCRYPTION_PUB).slice(0, 16)}…`); + +// --------------------------------------------------------------------------- +// 1. Discovery check +// --------------------------------------------------------------------------- + +interface DiscoverResponse { + url: string; + publicKey: string; + encryptionPublicKey: string; + peers: { url: string; isHealthy: boolean }[]; +} + +console.log("\n── Discovery ────────────────────────────────────────────"); + +let proxyInfo: DiscoverResponse; +let targetInfo: DiscoverResponse; + +try { + const res = await fetch(`${proxyUrl}/discover`, { + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + if (!res.ok) { + console.error(`Server B (${proxyUrl}) returned ${res.status}: ${await readErrorBody(res)}`); + process.exit(1); + } + proxyInfo = await res.json(); + console.log(` B: ${proxyInfo.url}`); + console.log(` signing: ${fingerprintKey(proxyInfo.publicKey).slice(0, 16)}…`); + console.log(` encryption: ${fingerprintKey(proxyInfo.encryptionPublicKey).slice(0, 16)}…`); + console.log(` peers: ${proxyInfo.peers.length}`); +} catch (err) { + console.error(`Cannot reach Server B at ${proxyUrl}/discover: ${err instanceof Error ? err.message : err}`); + process.exit(1); +} + +const isFallbackMode = process.argv.includes("--test-fallback"); + +if (isFallbackMode) { + // C is blocked from A — load C's info from A's local registry instead + const [cRecord] = await db.select().from(serverRegistry).where(eq(serverRegistry.url, targetUrl)).limit(1); + if (!cRecord) { + console.error(` Server C (${targetUrl}) not found in local registry. Run mutual discovery before blocking.`); + process.exit(1); + } + targetInfo = { + url: cRecord.url, + publicKey: cRecord.publicKey, + encryptionPublicKey: cRecord.encryptionPublicKey, + peers: [], + }; + console.log(` C: ${targetInfo.url} (from local registry — blocked)`); + console.log(` signing: ${fingerprintKey(targetInfo.publicKey).slice(0, 16)}…`); + console.log(` encryption: ${fingerprintKey(targetInfo.encryptionPublicKey).slice(0, 16)}…`); +} else { + try { + const res = await fetch(`${targetUrl}/discover`, { + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + if (!res.ok) { + console.error(`Server C (${targetUrl}) returned ${res.status}: ${await readErrorBody(res)}`); + process.exit(1); + } + targetInfo = await res.json(); + console.log(` C: ${targetInfo.url}`); + console.log(` signing: ${fingerprintKey(targetInfo.publicKey).slice(0, 16)}…`); + console.log(` encryption: ${fingerprintKey(targetInfo.encryptionPublicKey).slice(0, 16)}…`); + console.log(` peers: ${targetInfo.peers.length}`); + } catch (err) { + console.error(`Cannot reach Server C at ${targetUrl}/discover: ${err instanceof Error ? err.message : err}`); + process.exit(1); + } +} + +const aOnB = proxyInfo.peers.some((p) => p.url === ORIGIN); +console.log(` A registered on B: ${aOnB}`); + +if (!aOnB) { + console.error("\n A is not registered on B. Run mutual discovery first."); + process.exit(1); +} + +if (!isFallbackMode) { + const aOnC = targetInfo.peers.some((p) => p.url === ORIGIN); + console.log(` A registered on C: ${aOnC}`); + if (!aOnC) { + console.error("\n A is not registered on C. Run mutual discovery first."); + process.exit(1); + } +} + +// --------------------------------------------------------------------------- +// 2–5: Direct tests (skipped in --test-fallback mode since C is blocked) +// --------------------------------------------------------------------------- + +if (isFallbackMode) { + console.log("\n Skipping direct tests (2–5) — C is blocked in fallback mode."); +} + +if (!isFallbackMode) { + + // --------------------------------------------------------------------------- + // 2. Full proxy relay: A → B → C → B → A + // --------------------------------------------------------------------------- + + console.log("\n── Test: Full proxy relay (A → B → C → B → A) ─────────"); + + { + const testName = "full proxy relay"; + try { + const nonce = crypto.randomUUID(); + const innerPayload = JSON.stringify({ + action: "proxy-test", + nonce, + timestamp: Date.now(), + sender: ORIGIN, + }); + + const targetEncKey = new Uint8Array(Buffer.from(targetInfo.encryptionPublicKey, "base64")); + const encrypted = encryptPayload(innerPayload, targetEncKey); + const signature = signMessage(innerPayload, new Uint8Array(Buffer.from(process.env.FEDERATION_PRIVATE_KEY!, "base64"))); + + const proxyBody = { + method: "PROXY", + targetUrl: targetUrl + "/proxy", + publicSigningKey: OWN_SIGNING_PUB, + publicEncryptionKey: OWN_ENCRYPTION_PUB, + payload: encrypted, + signature, + }; + + const res = await fetch(`${proxyUrl}/proxy`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "X-Federation-Origin": ORIGIN, + "Origin": ORIGIN, + }, + body: JSON.stringify(proxyBody), + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + + const body = await res.json(); + + if (res.status !== 200) { + fail(testName, `expected 200, got ${res.status}: ${JSON.stringify(body)}`); + } else if (body.method !== "PROXY_RESPONSE") { + fail(testName, `expected method=PROXY_RESPONSE, got ${body.method}`); + } else if (!body.payload) { + fail(testName, "response missing payload (B did not relay C's response)"); + } else if (body.payload.method !== "PROXY_RESPONSE") { + fail(testName, `inner payload method=${body.payload.method}, expected PROXY_RESPONSE`); + } else { + pass(testName, `nonce=${nonce}, C responded: "${body.payload.message ?? JSON.stringify(body.payload)}"`); + } + } catch (err) { + fail(testName, `${err instanceof Error ? err.message : err}`); + } + } + + // --------------------------------------------------------------------------- + // 3. Direct TARGETED: A → C + // --------------------------------------------------------------------------- + + console.log("\n── Test: Direct TARGETED (A → C) ────────────────────────"); + + { + const testName = "direct TARGETED to C"; + try { + const innerPayload = JSON.stringify({ + action: "targeted-test", + nonce: crypto.randomUUID(), + sender: ORIGIN, + }); + + const targetEncKey = new Uint8Array(Buffer.from(targetInfo.encryptionPublicKey, "base64")); + const encrypted = encryptPayload(innerPayload, targetEncKey); + + const res = await fetch(`${targetUrl}/proxy`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "X-Federation-Origin": ORIGIN, + "Origin": ORIGIN, + }, + body: JSON.stringify({ + method: "TARGETED", + payload: encrypted, + }), + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + + const body = await res.json(); + + if (res.status !== 200) { + fail(testName, `expected 200, got ${res.status}: ${JSON.stringify(body)}`); + } else if (body.method !== "PROXY_RESPONSE") { + fail(testName, `expected method=PROXY_RESPONSE, got ${body.method}`); + } else { + pass(testName, `C says: "${body.message}"`); + } + } catch (err) { + fail(testName, `${err instanceof Error ? err.message : err}`); + } + } + + // --------------------------------------------------------------------------- + // 4. Sender validation — bad signing key + // --------------------------------------------------------------------------- + + console.log("\n── Test: Sender validation (bad keys → B) ──────────────"); + + { + const testName = "reject mismatched signing key"; + try { + const fakeKeys = generateKeypair(); + + const innerPayload = JSON.stringify({ action: "bad-key-test" }); + const targetEncKey = new Uint8Array(Buffer.from(targetInfo.encryptionPublicKey, "base64")); + const encrypted = encryptPayload(innerPayload, targetEncKey); + + const proxyBody = { + method: "PROXY", + targetUrl: targetUrl + "/proxy", + publicSigningKey: fakeKeys.signingPublicKey, + publicEncryptionKey: OWN_ENCRYPTION_PUB, + payload: encrypted, + }; + + const res = await fetch(`${proxyUrl}/proxy`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "X-Federation-Origin": ORIGIN, + "Origin": ORIGIN, + }, + body: JSON.stringify(proxyBody), + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + + const body = await res.json(); + + if (res.status === 403 && body.code === "INCORRECT_KEYS") { + pass(testName, `B correctly rejected: "${body.error}"`); + } else { + fail(testName, `expected 403/INCORRECT_KEYS, got ${res.status}/${body.code}: ${JSON.stringify(body)}`); + } + } catch (err) { + fail(testName, `${err instanceof Error ? err.message : err}`); + } + } + + // --------------------------------------------------------------------------- + // 5. Unknown sender + // --------------------------------------------------------------------------- + + console.log("\n── Test: Unknown sender (→ B) ────────────────────────────"); + + { + const testName = "reject unknown sender"; + try { + const unknownKeys = generateKeypair(); + const unknownOrigin = "https://totally-unknown-federation-" + crypto.randomUUID().slice(0, 8) + ".test"; + + const innerPayload = JSON.stringify({ action: "unknown-sender-test" }); + const targetEncKey = new Uint8Array(Buffer.from(targetInfo.encryptionPublicKey, "base64")); + const encrypted = encryptPayload(innerPayload, targetEncKey); + + const proxyBody = { + method: "PROXY", + targetUrl: targetUrl + "/proxy", + publicSigningKey: unknownKeys.signingPublicKey, + publicEncryptionKey: unknownKeys.encryptionPublicKey, + payload: encrypted, + }; + + const res = await fetch(`${proxyUrl}/proxy`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "X-Federation-Origin": unknownOrigin, + "Origin": unknownOrigin, + }, + body: JSON.stringify(proxyBody), + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + + const body = await res.json(); + + if (res.status === 403 && body.code === "UNKNOWN_FEDERATION_SERVER_INTERACTION") { + pass(testName, `B correctly rejected: "${body.error}"`); + } else { + fail(testName, `expected 403/UNKNOWN_FEDERATION_SERVER_INTERACTION, got ${res.status}/${body.code}: ${JSON.stringify(body)}`); + } + } catch (err) { + fail(testName, `${err instanceof Error ? err.message : err}`); + } + } + +} // end if (!isFallbackMode) + +// --------------------------------------------------------------------------- +// 6. Auto proxy fallback via real follow delivery pipeline +// Sends a follow request through A's API → BullMQ worker picks it up → +// federationFetch with proxyFallback:true → direct to C fails → proxied +// through B → C processes → worker updates follow.accepted +// +// Requires: +// - Server C blocked from A (firewall) +// - --bearer and --user flags +// +// Block C: netsh advfirewall firewall add rule name="Block Federation C" dir=out action=block remoteip= remoteport= protocol=tcp +// Unblock: netsh advfirewall firewall delete rule name="Block Federation C" +// --------------------------------------------------------------------------- + +if (isFallbackMode) { + console.log("\n── Test: Auto proxy fallback (follow delivery pipeline) ─"); + + if (!bearerToken || !targetUserId) { + console.error(" --test-fallback requires --bearer and --user "); + process.exit(1); + } + + // Step 1: verify C is unreachable directly + { + const testName = "direct fetch to C fails"; + try { + const res = await fetch(`${targetUrl}/discover`, { + signal: AbortSignal.timeout(5_000), + }); + fail(testName, `direct fetch succeeded (${res.status}) — C is not blocked from A. Block it first.`); + } catch { + pass(testName, "C is unreachable from A (blocked)"); + } + } + + // Step 2: send follow request through A's API + { + const testName = "follow delivery via proxy fallback"; + try { + console.log(` Sending follow request for user ${targetUserId} on ${targetUrl}...`); + + const followRes = await fetch(`${ORIGIN}/api/auth/social/follows`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "Authorization": `Bearer ${bearerToken}`, + }, + body: JSON.stringify({ + method: "INSERT", + userId: targetUserId, + federationUrl: targetUrl, + }), + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + + const followBody = await followRes.json(); + + if (!followRes.ok) { + fail(testName, `follow request failed (${followRes.status}): ${JSON.stringify(followBody)}`); + } else { + const followId = followBody.following?.[0]?.id; + if (!followId) { + fail(testName, `follow created but no ID returned: ${JSON.stringify(followBody)}`); + } else { + console.log(` Follow record created: ${followId}`); + console.log(" Waiting for BullMQ worker to process delivery job..."); + + // Step 3: poll until the delivery job completes (worker processes it) + const maxWait = 60_000; + const pollInterval = 2_000; + let elapsed = 0; + let delivered = false; + + while (elapsed < maxWait) { + await new Promise((r) => setTimeout(r, pollInterval)); + elapsed += pollInterval; + + // Check if delivery job for this target still exists (removed on success) + const pendingJobs = await db.select() + .from(deliveryJobs) + .where(eq(deliveryJobs.targetUrl, targetUrl + "/api/auth/social/follows")) + .orderBy(desc(deliveryJobs.createdAt)) + .limit(5); + + // Check if follow.accepted was updated (worker sets this on success) + const [followRecord] = await db.select() + .from(follows) + .where(eq(follows.id, followId)) + .limit(1); + + const jobCount = pendingJobs.length; + const accepted = followRecord?.accepted; + + process.stdout.write(`\r Polling... ${Math.round(elapsed / 1000)}s — jobs pending: ${jobCount}, accepted: ${accepted} `); + + if (accepted === true) { + delivered = true; + break; + } + } + + console.log(""); + + if (delivered) { + pass(testName, "follow delivered through proxy and accepted by C"); + } else { + // Check final state for diagnostics + const [finalFollow] = await db.select() + .from(follows) + .where(eq(follows.id, followId)) + .limit(1); + + const remainingJobs = await db.select() + .from(deliveryJobs) + .where(eq(deliveryJobs.targetUrl, targetUrl + "/api/auth/social/follows")) + .limit(5); + + fail(testName, + `timed out after ${maxWait / 1000}s. ` + + `follow.accepted=${finalFollow?.accepted}, ` + + `pending delivery jobs=${remainingJobs.length}. ` + + `Check worker logs (DEBUG=app:federation:*) for details.`, + ); + } + + // Cleanup: remove the test follow record + console.log(" Cleaning up test follow record..."); + // await db.delete(follows).where(eq(follows.id, followId)); + } + } + } catch (err) { + fail(testName, `${err instanceof Error ? err.message : err}`); + } + } +} else { + console.log("\n Skipping auto-fallback test (pass --test-fallback to enable)."); + console.log(" Requires: --test-fallback --bearer --user "); + console.log(" And C must be blocked from A's machine (firewall rule)."); +} + +// --------------------------------------------------------------------------- +// Summary +// --------------------------------------------------------------------------- + +const passed = results.filter((r) => r.passed); +const failed = results.filter((r) => !r.passed); + +console.log("\n════════════════════════════════════════════════════════"); +console.log(`Results: ${passed.length} passed, ${failed.length} failed out of ${results.length}`); + +if (failed.length > 0) { + console.error("\nFailed tests:"); + failed.forEach((f) => console.error(` ✘ ${f.name}: ${f.message}`)); + process.exit(1); +} + +console.log("\nAll tests passed."); +process.exit(0); diff --git a/tests/proxies/post.ts b/tests/proxies/post.ts new file mode 100644 index 0000000..3f6b92f --- /dev/null +++ b/tests/proxies/post.ts @@ -0,0 +1,409 @@ +/** + * Manual post proxy / federation test (Server A). + * + * Exercises the real path only — same as production: + * POST ${A}/api/auth/social/posts → BullMQ worker → federationFetch (direct or via proxy B) → C. + * + * Does not POST ciphertext to B’s `/proxy` by hand; the worker does that after your createPost. + * + * Usage: + * bun run tests/proxies/post.ts --proxy --target --bearer + * + * Prerequisites on A: + * - Bearer user must have at least one accepted follower whose `followerServerUrl` points at C + * (same base URL as `--target` / registry). + * - Propagation must enqueue jobs (e.g. policy `all`, or private + `followers`). + * + * Optional: + * --test-fallback C blocked from A: load C from A’s server_registry only; verify direct C fetch fails first + * --test-no-remote-followers Expect 200 with federationDeliveriesQueued === 0 (propagation on, no remote follower URLs) + * + * Examples: + * bun run tests/proxies/post.ts --proxy http://localhost:3001 --target http://host.docker.internal:3002 --bearer --test-fallback + */ + +import db from "@/lib/db"; +import { deliveryJobs, serverRegistry } from "@/lib/db/schema"; +import { fingerprintKey } from "@/lib/federation/keytools"; +import { config } from "dotenv"; +import { and, desc, eq, like } from "drizzle-orm"; + +config({ path: ".env.local" }); + +const FETCH_TIMEOUT_MS = 15_000; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +async function readErrorBody(response: Response): Promise { + try { + const body = await response.json(); + return body?.error ?? body?.message ?? JSON.stringify(body); + } catch { + try { + return await response.text(); + } catch { + return response.statusText; + } + } +} + +interface TestResult { + name: string; + passed: boolean; + message: string; +} + +const results: TestResult[] = []; + +function pass(name: string, message = "OK") { + console.log(` ✔ ${name}`); + if (message !== "OK") console.log(` ${message}`); + results.push({ name, passed: true, message }); +} + +function fail(name: string, message: string) { + console.error(` ✘ ${name}`); + console.error(` ${message}`); + results.push({ name, passed: false, message }); +} + +// --------------------------------------------------------------------------- +// Validate environment +// --------------------------------------------------------------------------- + +const REQUIRED_ENV = [ + "FEDERATION_PUBLIC_KEY", + "FEDERATION_PRIVATE_KEY", + "FEDERATION_ENCRYPTION_PUBLIC_KEY", + "FEDERATION_ENCRYPTION_PRIVATE_KEY", + "BETTER_AUTH_URL", +] as const; + +const missing = REQUIRED_ENV.filter((k) => !process.env[k]); +if (missing.length > 0) { + console.error("Missing required environment variables:"); + missing.forEach((k) => console.error(` - ${k}`)); + console.error("Ensure .env.local is present and populated."); + process.exit(1); +} + +const ORIGIN = process.env.BETTER_AUTH_URL!; + +// --------------------------------------------------------------------------- +// Parse arguments +// --------------------------------------------------------------------------- + +function argAfter(flag: string): string | undefined { + const idx = process.argv.indexOf(flag); + return idx !== -1 ? process.argv[idx + 1] : undefined; +} + +const proxyUrl = argAfter("--proxy"); +const targetUrl = argAfter("--target"); +const bearerToken = argAfter("--bearer"); +const testNoRemoteFollowers = process.argv.includes("--test-no-remote-followers"); +const isFallbackMode = process.argv.includes("--test-fallback"); + +if (!proxyUrl || !targetUrl || !bearerToken) { + console.error( + "Usage: bun run tests/proxies/post.ts --proxy --target --bearer [options]", + ); + console.error(""); + console.error(" --proxy Server B (proxy); used for /discover sanity check"); + console.error(" --target Server C base URL (must match server_registry.url on A for --test-fallback)"); + console.error(" --bearer Session token on A (required — test hits POST /api/auth/social/posts)"); + console.error(" --test-fallback C unreachable from A; load C from registry; verify block then deliver via API"); + console.error(" --test-no-remote-followers Expect createPost 400 NO_REMOTE_FOLLOWERS (runs after main test if set)"); + process.exit(1); +} + +if (testNoRemoteFollowers && !bearerToken) { + console.error("--test-no-remote-followers requires --bearer "); + process.exit(1); +} + +console.log("Post delivery test (A API → worker → federation/proxy)"); +console.log(` Server A (us): ${ORIGIN}`); +console.log(` Server B (proxy): ${proxyUrl}`); +console.log(` Server C (target): ${targetUrl}`); + +const DEFAULT_POST_CONTENT = [{ type: "text" as const, value: "proxy post test" }]; + +// --------------------------------------------------------------------------- +// 1. Discovery (B reachable from A; C from registry or live) +// --------------------------------------------------------------------------- + +interface DiscoverResponse { + url: string; + publicKey: string; + encryptionPublicKey: string; + peers: { url: string; isHealthy: boolean }[]; +} + +console.log("\n── Discovery ────────────────────────────────────────────"); + +let proxyInfo: DiscoverResponse; +let targetInfo: DiscoverResponse; + +try { + const res = await fetch(`${proxyUrl}/discover`, { + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + if (!res.ok) { + console.error(`Server B (${proxyUrl}) returned ${res.status}: ${await readErrorBody(res)}`); + process.exit(1); + } + proxyInfo = await res.json(); + console.log(` B: ${proxyInfo.url}`); + console.log(` signing: ${fingerprintKey(proxyInfo.publicKey).slice(0, 16)}…`); + console.log(` encryption: ${fingerprintKey(proxyInfo.encryptionPublicKey).slice(0, 16)}…`); + console.log(` peers: ${proxyInfo.peers.length}`); +} catch (err) { + console.error(`Cannot reach Server B at ${proxyUrl}/discover: ${err instanceof Error ? err.message : err}`); + process.exit(1); +} + +if (isFallbackMode) { + const [cRecord] = await db.select().from(serverRegistry).where(eq(serverRegistry.url, targetUrl)).limit(1); + if (!cRecord) { + console.error(` Server C (${targetUrl}) not found in local registry. Run mutual discovery before blocking.`); + process.exit(1); + } + targetInfo = { + url: cRecord.url, + publicKey: cRecord.publicKey, + encryptionPublicKey: cRecord.encryptionPublicKey, + peers: [], + }; + console.log(` C: ${targetInfo.url} (from local registry — blocked from A)`); + console.log(` signing: ${fingerprintKey(targetInfo.publicKey).slice(0, 16)}…`); + console.log(` encryption: ${fingerprintKey(targetInfo.encryptionPublicKey).slice(0, 16)}…`); +} else { + try { + const res = await fetch(`${targetUrl}/discover`, { + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + if (!res.ok) { + console.error(`Server C (${targetUrl}) returned ${res.status}: ${await readErrorBody(res)}`); + process.exit(1); + } + targetInfo = await res.json(); + console.log(` C: ${targetInfo.url}`); + console.log(` signing: ${fingerprintKey(targetInfo.publicKey).slice(0, 16)}…`); + console.log(` encryption: ${fingerprintKey(targetInfo.encryptionPublicKey).slice(0, 16)}…`); + console.log(` peers: ${targetInfo.peers.length}`); + } catch (err) { + console.error(`Cannot reach Server C at ${targetUrl}/discover: ${err instanceof Error ? err.message : err}`); + console.error("\n If C is firewalled from A, pass --test-fallback (load C from A’s registry)."); + process.exit(1); + } +} + +const aOnB = proxyInfo.peers.some((p) => p.url === ORIGIN); +console.log(` A registered on B: ${aOnB}`); + +if (!aOnB) { + console.error("\n A is not registered on B. Run mutual discovery first."); + process.exit(1); +} + +if (!isFallbackMode) { + const aOnC = targetInfo.peers.some((p) => p.url === ORIGIN); + console.log(` A registered on C: ${aOnC}`); + if (!aOnC) { + console.error("\n A is not registered on C. Run mutual discovery first."); + process.exit(1); + } +} + +const targetPostsUrl = `${targetInfo.url.replace(/\/$/, "")}/api/auth/social/posts`; + +// --------------------------------------------------------------------------- +// 2. Optional: confirm C is unreachable when --test-fallback +// --------------------------------------------------------------------------- + +if (isFallbackMode) { + console.log("\n── Test: direct fetch to C fails (blocked) ─────────────"); + + const testName = "direct fetch to C fails"; + try { + const res = await fetch(`${targetUrl}/discover`, { + signal: AbortSignal.timeout(5_000), + }); + fail(testName, `direct fetch succeeded (${res.status}) — C is not blocked from A. Block it first.`); + } catch { + pass(testName, "C is unreachable from A (blocked)"); + } +} + +// --------------------------------------------------------------------------- +// 3. Real createPost on A → worker → C (via proxy when needed) +// --------------------------------------------------------------------------- + +console.log("\n── Test: post delivery via A API + worker ──────────────"); + +{ + const testName = "POST /api/auth/social/posts → deliver-post job completes"; + try { + console.log(` Creating post on A; expecting delivery to ${targetPostsUrl}…`); + + const postRes = await fetch(`${ORIGIN}/api/auth/social/posts`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${bearerToken}`, + }, + body: JSON.stringify(DEFAULT_POST_CONTENT), + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + + const postBody = await postRes.json(); + + if (!postRes.ok) { + fail( + testName, + `createPost failed (${postRes.status}): ${JSON.stringify(postBody)} — need accepted remote followers on C and propagating policy`, + ); + } else { + const postId = postBody.id as string | undefined; + if (!postId) { + fail(testName, `no post id in response: ${JSON.stringify(postBody)}`); + } else { + console.log(` Post created: ${postId}`); + console.log(" Waiting for BullMQ worker to deliver FEDERATE_POST…"); + + await new Promise((r) => setTimeout(r, 300)); + + const jobsForPost = await db + .select() + .from(deliveryJobs) + .where(like(deliveryJobs.payload, `%${postId}%`)); + + if (jobsForPost.length === 0) { + fail( + testName, + "No delivery_jobs row for this post after createPost — propagation off, federationDeliveriesQueued was 0, or bug.", + ); + } else { + const forTarget = jobsForPost.filter((j) => j.targetUrl === targetPostsUrl); + if (forTarget.length === 0) { + const urls = [...new Set(jobsForPost.map((j) => j.targetUrl))].join(", "); + fail( + testName, + `Delivery job(s) target other URL(s): ${urls} — expected ${targetPostsUrl} (followerServerUrl must match C).`, + ); + } else { + const maxWait = 60_000; + const pollInterval = 2_000; + let elapsed = 300; + let delivered = false; + + while (elapsed < maxWait) { + await new Promise((r) => setTimeout(r, pollInterval)); + elapsed += pollInterval; + + const pendingJobs = await db + .select() + .from(deliveryJobs) + .where( + and( + eq(deliveryJobs.targetUrl, targetPostsUrl), + like(deliveryJobs.payload, `%${postId}%`), + ), + ) + .orderBy(desc(deliveryJobs.createdAt)) + .limit(5); + + process.stdout.write( + `\r Polling… ${Math.round(elapsed / 1000)}s — pending jobs for this post: ${pendingJobs.length} `, + ); + + if (pendingJobs.length === 0) { + delivered = true; + break; + } + } + + console.log(""); + + if (delivered) { + pass(testName, "delivery job finished (worker reached C, direct or via proxy)"); + } else { + fail( + testName, + `timed out after ${maxWait / 1000}s with jobs still pending. ` + + `Check worker (DEBUG=app:federation:*), Redis, proxy, and firewall.`, + ); + } + } + } + } + } + } catch (err) { + fail(testName, `${err instanceof Error ? err.message : err}`); + } +} + +// --------------------------------------------------------------------------- +// 4. Optional: propagation on but no remote URLs (200, zero deliveries) +// --------------------------------------------------------------------------- + +if (testNoRemoteFollowers) { + console.log("\n── Test: createPost 200 + federationDeliveriesQueued === 0 ─"); + + const testName = "createPost saves post but queues no federation deliveries"; + try { + const postRes = await fetch(`${ORIGIN}/api/auth/social/posts`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${bearerToken}`, + }, + body: JSON.stringify(DEFAULT_POST_CONTENT), + signal: AbortSignal.timeout(FETCH_TIMEOUT_MS), + }); + + const postBody = await postRes.json(); + + if ( + postRes.status === 200 && + postBody.id && + postBody.federationDeliveriesQueued === 0 + ) { + pass(testName, `post ${postBody.id} — no remote follower server URLs under propagation`); + } else if (postRes.ok && postBody.federationDeliveriesQueued > 0) { + fail( + testName, + `expected federationDeliveriesQueued === 0, got ${postBody.federationDeliveriesQueued} (user has remote followers or wrong test account).`, + ); + } else { + fail( + testName, + `expected 200 with id and federationDeliveriesQueued 0, got ${postRes.status}: ${JSON.stringify(postBody)}`, + ); + } + } catch (err) { + fail(testName, `${err instanceof Error ? err.message : err}`); + } +} + +// --------------------------------------------------------------------------- +// Summary +// --------------------------------------------------------------------------- + +const passed = results.filter((r) => r.passed); +const failed = results.filter((r) => !r.passed); + +console.log("\n════════════════════════════════════════════════════════"); +console.log(`Results: ${passed.length} passed, ${failed.length} failed out of ${results.length}`); + +if (failed.length > 0) { + console.error("\nFailed tests:"); + failed.forEach((f) => console.error(` ✘ ${f.name}: ${f.message}`)); + process.exit(1); +} + +console.log("\nAll tests passed."); +process.exit(0); diff --git a/tsconfig.json b/tsconfig.json index 06d4df8..641f35a 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,45 +1,46 @@ { - "compilerOptions": { - "target": "ES2017", - "lib": [ - "dom", - "dom.iterable", - "esnext" - ], - "allowJs": true, - "skipLibCheck": true, - "strict": true, - "noEmit": true, - "esModuleInterop": true, - "module": "esnext", - "moduleResolution": "bundler", - "resolveJsonModule": true, - "isolatedModules": true, - "jsx": "react-jsx", - "incremental": true, - "plugins": [ - { - "name": "next" - } - ], - "paths": { - "@/*": [ - "./src/*" - ], - "@/plugins/*": [ - "./src/lib/plugins/*" - ], - } - }, - "include": [ - "next-env.d.ts", - "**/*.ts", - "**/*.tsx", - ".next/types/**/*.ts", - ".next/dev/types/**/*.ts", - "**/*.mts" - ], - "exclude": [ - "node_modules" - ] -} \ No newline at end of file + "compilerOptions": { + "target": "ES2017", + "lib": [ + "dom", + "dom.iterable", + "esnext" + ], + "allowJs": true, + "skipLibCheck": true, + "strict": true, + "noEmit": true, + "esModuleInterop": true, + "module": "esnext", + "moduleResolution": "bundler", + "resolveJsonModule": true, + "isolatedModules": true, + "jsx": "react-jsx", + "incremental": true, + "plugins": [ + { + "name": "next" + } + ], + "paths": { + "@/*": [ + "./src/*" + ], + "@/plugins/*": [ + "./src/lib/plugins/*" + ] + } + }, + "include": [ + "next-env.d.ts", + "**/*.ts", + "**/*.tsx", + ".next/types/**/*.ts", + ".next/dev/types/**/*.ts", + "**/*.mts", + ".next/dev/dev/types/**/*.ts" + ], + "exclude": [ + "node_modules" + ] +}