diff --git a/src/lib/bull/README.md b/src/lib/bull/README.md new file mode 100644 index 0000000..617677b --- /dev/null +++ b/src/lib/bull/README.md @@ -0,0 +1,353 @@ +# Bull — Federation Background Job Queue + +## Overview + +The `bull` module provides a Redis-backed background job processing system for the federation layer. It prevents long-running network operations — such as delivering activity payloads to remote servers or probing server health — from blocking the main request/response cycle. + +Built on [BullMQ](https://docs.bullmq.io/), it exposes two queues and their corresponding workers: + +- **Federation delivery queue** — encrypts and delivers activity payloads (follows, posts, inserts, unfollows) to remote servers, processes acknowledgment responses, and performs automatic cleanup of delivery records. +- **Health-check queue** — periodically probes servers marked as unhealthy, re-classifying them as healthy when they respond successfully, with exponential back-off and a configurable retry limit. + +Workers are designed to be started once at application bootstrap and run for the lifetime of the process. + +--- + +## Interfaces and Types + +### `FederationDeliveryJob` + +```ts +interface FederationDeliveryJob { + deliveryJobId: string; + targetUrl: string; + serverUrl: string; + payload: string; +} +``` + +Defines the data contract for a federation delivery job. + +| Field | Type | Description | +| -------------- | -------- | --------------------------------------------------------------------------- | +| `deliveryJobId`| `string` | Primary key of the corresponding row in the `deliveryJobs` database table. | +| `targetUrl` | `string` | Full URL of the remote server's federation inbox endpoint. | +| `serverUrl` | `string` | Origin URL of the target server (used for registry lookups and blacklisting).| +| `payload` | `string` | Serialized JSON string containing the activity method and associated data. 
| + +### `HealthCheckJob` + +```ts +interface HealthCheckJob { + serverUrl: string; +} +``` + +Defines the data contract for a health-check job. + +| Field | Type | Description | +| ----------- | -------- | ------------------------------------------- | +| `serverUrl` | `string` | The remote server URL to probe for health. | + +--- + +## Exported Symbols and Functions + +### `DELIVERY_QUEUE_NAME` + +```ts +const DELIVERY_QUEUE_NAME = 'federation-delivery'; +``` + +The Redis queue name for federation delivery jobs. + +--- + +### `HEALTH_CHECK_QUEUE_NAME` + +```ts +const HEALTH_CHECK_QUEUE_NAME = 'federation-health-check'; +``` + +The Redis queue name for health-check jobs. + +--- + +### `getRedisConnection()` + +```ts +function getRedisConnection(): Redis +``` + +Returns a singleton Redis connection (via `ioredis`) configured with `maxRetriesPerRequest: null` as required by BullMQ. The connection URL is read from the `REDIS_URL` environment variable. Intended for queue producers (`getFederationQueue`, `getHealthCheckQueue`). + +**Throws** if `REDIS_URL` is not set. + +--- + +### `getRedisWorkerConnection()` + +```ts +function getRedisWorkerConnection(): Redis +``` + +Returns a separate singleton Redis connection dedicated to BullMQ `Worker` instances. Keeping worker connections distinct from producer connections prevents back-pressure on queue-enqueue operations when workers are under high load. + +**Note:** `REDIS_URL` is read with a non-null assertion and is not validated by this function, so an unset variable is not rejected with an explicit error here. + +--- + +### `getFederationQueue()` + +```ts +function getFederationQueue(): Queue +``` + +Returns a singleton `Queue` instance backed by the `federation-delivery` queue. + +**Default job options:** + +| Option | Value | Rationale | +| ----------------- | ---------------- | ------------------------------------------------------------ | +| `attempts` | `5` | Up to 5 total attempts (the initial try plus 4 retries) before the job is marked as failed. | +| `backoff` | exponential, 5s | Delay doubles on each retry: 5s, 10s, 20s, 40s. 
| +| `removeOnComplete`| `{ age: 86400 }` | Completed jobs are pruned after 24 hours. | +| `removeOnFail` | `{ age: 604800 }` | Failed jobs are retained for 7 days for diagnostics. | + +--- + +### `getHealthCheckQueue()` + +```ts +function getHealthCheckQueue(): Queue +``` + +Returns a singleton `Queue` instance backed by the `federation-health-check` queue. No custom default job options are applied. + +--- + +### `scheduleHealthCheck()` + +```ts +function scheduleHealthCheck(serverUrl: string, attempt: number): Promise +``` + +Schedules a delayed health-check job for a remote server. + +**Parameters:** + +| Parameter | Type | Description | +| ----------- | -------- | --------------------------------------------------------------------- | +| `serverUrl` | `string` | The remote server URL to check. | +| `attempt` | `number` | Zero-based attempt counter; used to compute the delay and job ID. | + +**Internal logic:** + +1. Computes the delay as `(5 + attempt * 10)` minutes. +2. Derives a deterministic job ID from the server URL using SHA-256 (first 16 hex chars) to avoid collisions between URLs that differ only in non-alphanumeric characters. +3. Adds a single-shot job (auto-removed on completion or failure) to the health-check queue. + +**Returns:** `Promise` + +--- + +### `startFederationWorker()` + +```ts +function startFederationWorker(): { deliveryWorker: Worker; healthCheckWorker: Worker } +``` + +Creates and returns a pair of BullMQ workers that process the federation delivery and health-check queues. This function is idempotent: subsequent calls return the same worker instances. + +Workers use a dedicated Redis connection via `getRedisWorkerConnection()`, separate from the connection used by queue producers (`getRedisConnection()`). This prevents worker processing from starving queue-enqueue operations on the main thread. 
+ +**Delivery worker configuration:** + +| Option | Value | +| ------------- | ------- | +| `concurrency` | `10` | + +**Health-check worker configuration:** + +| Option | Value | +| ------------- | ------- | +| `concurrency` | `3` | + +**Lifecycle events:** + +| Worker | Event | Behavior | +| -------------- | ----------- | --------------------------------------------------------------------------- | +| Delivery | `ready` | Logs a confirmation that the worker is connected to Redis. | +| Delivery | `failed` | Logs the job ID, method, target URL, attempt count, remaining retries, and error. | +| Delivery | `completed` | Deletes the corresponding `deliveryJobs` database row. | +| Delivery | `error` | Logs a generic worker-level error to the console. | +| Health-check | `ready` | Logs a confirmation that the worker is connected to Redis. | +| Health-check | `failed` | Logs the job ID and error message. | +| Health-check | `error` | Logs a generic worker-level error to the console. | + +**Returns:** `{ deliveryWorker, healthCheckWorker }` + +--- + +### `processFederationDelivery(job)` + +```ts +function processFederationDelivery(job: Job): Promise +``` + +The core processor for federation delivery jobs. Executed by the delivery worker for each queue entry. + +**Processing steps:** + +1. **Method validation** — Parses the `payload` JSON and validates that the result is an object with a string-typed `method` field (guards against JSON primitives like `null`, `42`, or `"str"` that would pass JSON.parse but throw a TypeError on property access). If `method` is missing, non-string, or not one of `FEDERATE`, `FEDERATE_POST`, `INSERT`, `UNFOLLOW`, the job fails immediately with an `UnrecoverableError` and its `deliveryJobs` row is deleted. +2. **Blacklist check** — Queries the `blacklistedServers` table. If the `serverUrl` is blacklisted, the job is dropped with an `UnrecoverableError` and the row is cleaned up. +3. 
**Key resolution** — Looks up the target server in the `serverRegistry` table. If the server is not yet registered, automatic discovery is attempted via `discoverAndRegister()`. If discovery fails, a retryable error is thrown. +4. **Encryption** — Encodes the payload using the target server's `encryptionPublicKey` (base64-decoded into a `Uint8Array`) via `encryptPayload()`. +5. **Database update** — Sets `lastAttemptedAt` and increments `attempts` on the delivery job record. +6. **HTTP delivery** — Validates that `BETTER_AUTH_URL` is set (throws `UnrecoverableError` if missing). Signs the original plaintext payload with the local server's signing key and sends the encrypted payload via `federationFetch()` with a 15-second timeout and proxy fallback. A non-OK response throws a retryable error. +7. **Ack parsing** — Attempts to parse the response body as JSON (throws `UnrecoverableError` on non-JSON response). Inspects the payload for a `PROXY_RESPONSE` acknowledgment nested under `responseBody.payload`. +8. **Ack dispatch** — Routes the acknowledgment to a job-name-specific handler (e.g. `deliver-follow` → `handleFollowAck`). If no handler is registered, the ack is silently ignored. + +**Throws:** + +- `UnrecoverableError` — Malformed payload, missing or non-string method, invalid method, blacklisted server, missing `BETTER_AUTH_URL`, non-JSON response, or missing acknowledgment. +- `Error` — Auto-discovery failure or HTTP delivery failure (retryable by BullMQ). + +--- + +### `processHealthCheck(job)` + +```ts +function processHealthCheck(job: Job): Promise +``` + +The core processor for health-check jobs. Executed by the health-check worker. + +**Processing steps:** + +1. **Server lookup** — Queries the `serverRegistry` table. If the server is not found or is already marked healthy, the job exits early. +2. **Threat-policy check** — If the server has an `unhealthyReason`, the corresponding threat policy is consulted via `getThreatPolicy()`. 
If the reason is not `directHealthCheckable`, the job skips further processing. +3. **Probe** — Sends an HTTP `GET` to `/discover` with an 8-second timeout. +4. **Success** — If the response is OK, the server is marked healthy via `markServerHealthy()` and the job completes. +5. **Failure** — On HTTP error or network exception, the attempt counter is atomically incremented in the database via `sql` fragment (`healthCheckAttempts + 1`), avoiding read-modify-write races between concurrent worker instances. If fewer than `MAX_HEALTH_CHECK_ATTEMPTS` (5) have been made, a follow-up health-check job is scheduled via `scheduleHealthCheck()` with a delay that grows with the attempt number. Once exhausted, a warning is logged and no further checks are scheduled. + +**Returns:** `Promise<void>` + +--- + +### `handleFollowAck(ackPayload, serverUrl, cachedServerPublicKey, deliveryJobId, jobId)` + +```ts +function handleFollowAck( + ackPayload: AckPayload, + serverUrl: string, + cachedServerPublicKey: string | undefined, + deliveryJobId: string, + jobId: string | undefined, +): Promise<void> +``` + +Processes the acknowledgment (`PROXY_RESPONSE`) for a `deliver-follow` job. + +**Parameters:** + +| Parameter | Type | Description | +| -----------------------| ----------------------- | ------------------------------------------------ | +| `ackPayload` | `AckPayload` | The acknowledgment payload containing signature and decrypted data. | +| `serverUrl` | `string` | Origin URL of the remote server. | +| `cachedServerPublicKey`| `string \| undefined` | The server's signing public key, if already known from the registry at delivery time. | +| `deliveryJobId` | `string` | ID of the delivery job record for cleanup. | +| `jobId` | `string \| undefined` | BullMQ job ID for diagnostic logging. | + +**Internal logic:** + +1. Parses the decrypted payload against `FollowEnvelopeSchema`. Invalid payloads cause an `UnrecoverableError` and delete the delivery job record. +2. 
Resolves the remote server's signing public key (falls back to a registry lookup in the database if it was not cached from the delivery phase). +3. Verifies the cryptographic signature on the acknowledgment. A failed signature check throws `UnrecoverableError`. +4. Looks up the local `follows` row matching `followerId`, `followingId`, and `followerServerUrl`. If no matching row exists, the ack is silently ignored (the remote acknowledged a follow this node does not know about). +5. If the remote `accepted` is explicitly `false`, the local follow record is updated with `acknowledged: true` (the remote explicitly rejected the follow). +6. If the remote `accepted` is explicitly `true`, the local `accepted` column is updated to `true`. +7. If `accepted` is `undefined`/`null`, the local follow record is updated with `acknowledged: true` only (the remote acknowledged receipt without indicating an acceptance state). + +**Throws:** + +- `UnrecoverableError` — Invalid follow payload, missing signing public key, or signature verification failure. + +--- + +## Usage Example + +```ts +// app/bootstrap.ts +import { startFederationWorker, getFederationQueue, scheduleHealthCheck } from '@/lib/bull'; + +// ──────────────────────────────────────────── +// Start workers at application bootstrap +// ──────────────────────────────────────────── +const workers = startFederationWorker(); +// workers.deliveryWorker — processes federation-delivery queue +// workers.healthCheckWorker — processes federation-health-check queue + +// ──────────────────────────────────────────── +// Enqueue a federation delivery job +// ──────────────────────────────────────────── +const queue = getFederationQueue(); +await queue.add('deliver-follow', { + deliveryJobId: 'abc-123', + targetUrl: 'https://remote.example.com/inbox', + serverUrl: 'https://remote.example.com', + payload: JSON.stringify({ + method: 'FEDERATE', + // ... 
activity data + }), +}); + +// ──────────────────────────────────────────── +// Schedule a delayed health check +// ──────────────────────────────────────────── +await scheduleHealthCheck('https://remote.example.com', 0); +// Runs in ~5 minutes; each later attempt adds 10 minutes, per the (5 + attempt * 10) formula. +``` + +--- + +## Error Handling + +### Unrecoverable Errors (BullMQ `UnrecoverableError`) + +Jobs that throw `UnrecoverableError` are immediately marked as failed and **will not be retried**, even if the queue's `attempts` option is greater than 1. + +| Scenario | Thrown From | Description | +| --------------------------------- | ------------------------------ | ------------------------------------------------------- | +| Malformed payload JSON | `processFederationDelivery` | The job payload cannot be parsed as valid JSON. | +| Missing or non-string method | `processFederationDelivery` | The `method` field is missing, not a string, or not in the allowed set.| +| Blacklisted target server | `processFederationDelivery` | The target server is in the `blacklistedServers` table. | +| Missing `BETTER_AUTH_URL` | `processFederationDelivery` | The environment variable is not set; federation requests cannot be sent. | +| Non-JSON response from remote | `processFederationDelivery` | The remote returned an OK (2xx) response with a non-JSON body. | +| Missing acknowledgment | `processFederationDelivery` | The remote response does not contain a `PROXY_RESPONSE` payload.| +| Invalid follow ack payload | `handleFollowAck` | The decrypted payload fails `FollowEnvelopeSchema` validation. | +| Missing signing public key | `handleFollowAck` | The server has no `publicKey` in the registry to verify the ack signature. | +| Signature verification failure | `handleFollowAck` | The cryptographic signature on the ack does not match. | + +### Retryable Errors + +Jobs that throw a regular `Error` are returned to the queue and retried according to the queue's backoff configuration. 
+ +| Scenario | Thrown From | Description | +| --------------------------------- | ------------------------------ | ------------------------------------------------------- | +| Auto-discovery failure | `processFederationDelivery` | The server is not in the registry and `discoverAndRegister` throws a non-`DiscoveryError`. | +| HTTP delivery failure | `processFederationDelivery` | The remote endpoint returns a non-OK HTTP status code. | +| Network / fetch error | `processFederationDelivery` | `federationFetch` throws due to timeout, DNS failure, etc. | + +### Silent Skips (No Error) + +| Scenario | Location | Description | +| --------------------------------- | ----------------------------- | ------------------------------------------------------- | +| Unhealthy reason not checkable | `processHealthCheck` | The server's threat policy forbids direct health checks.| +| Server already healthy | `processHealthCheck` | The server is already marked healthy in the registry. | +| Server not in registry | `processHealthCheck` | The server was removed or never registered. | +| Unknown follow ack | `handleFollowAck` | The local `follows` table has no matching row for the acknowledged follow. | + +### Worker-Level Errors + +Worker-level errors (e.g. Redis connection loss) are emitted via the worker's `error` event and logged to the console. These do **not** affect individual jobs; BullMQ will re-establish the connection automatically. 
diff --git a/src/lib/bull/connection.ts b/src/lib/bull/connection.ts index 1dd42d4..ae23c82 100644 --- a/src/lib/bull/connection.ts +++ b/src/lib/bull/connection.ts @@ -1,6 +1,7 @@ import Redis from 'ioredis'; let _redis: Redis | null = null; +let _workerRedis: Redis | null = null; export function getRedisConnection(): Redis { if (!_redis) { @@ -8,3 +9,12 @@ export function getRedisConnection(): Redis { } return _redis; } + +export function getRedisWorkerConnection(): Redis { + if (!_workerRedis) { + _workerRedis = new Redis(process.env.REDIS_URL!, { + maxRetriesPerRequest: null, + }); + } + return _workerRedis; +} diff --git a/src/lib/bull/processors/delivery.ts b/src/lib/bull/processors/delivery.ts index c2d8fce..690384c 100644 --- a/src/lib/bull/processors/delivery.ts +++ b/src/lib/bull/processors/delivery.ts @@ -3,11 +3,11 @@ import { blacklistedServers, deliveryJobs, serverRegistry } from '@/lib/db/schem import { federationFetch } from '@/lib/federation/fetch'; import { encryptPayload, getOwnSigningSecretKey, signMessage } from '@/lib/federation/keytools'; import { discoverAndRegister, DiscoveryError } from '@/lib/federation/registry'; -import type { FederationDeliveryJob } from '../queues'; -import { handleFollowAck } from './handlers/follow'; import { UnrecoverableError, type Job } from 'bullmq'; import createDebug from 'debug'; import { eq } from 'drizzle-orm'; +import type { FederationDeliveryJob } from '../queues'; +import { handleFollowAck } from './handlers/follow'; const debug = createDebug('app:federation:worker'); @@ -31,6 +31,14 @@ const ackHandlers: Record = { 'deliver-follow': handleFollowAck, }; +function getFederationOrigin(): string { + const origin = process.env.BETTER_AUTH_URL; + if (!origin) { + throw new UnrecoverableError('BETTER_AUTH_URL environment variable is not set, cannot send federation requests'); + } + return origin; +} + // --------------------------------------------------------------------------- // Main processor // 
--------------------------------------------------------------------------- @@ -40,15 +48,22 @@ export async function processFederationDelivery(job: Job) debug('processing job %s (%s) → %s (attempt %d)', job.id, job.name, targetUrl, job.attemptsMade + 1); // 1. Validate method early — before any I/O. - let method: string; + let parsedPayload: Record; try { - method = JSON.parse(payload).method; + parsedPayload = JSON.parse(payload); } catch { await db.delete(deliveryJobs).where(eq(deliveryJobs.id, deliveryJobId)); throw new UnrecoverableError(`Malformed payload JSON, dropping job ${job.id}`); } - if (!method || !ALLOWED_METHODS.has(method)) { + if (typeof parsedPayload?.method !== 'string') { + await db.delete(deliveryJobs).where(eq(deliveryJobs.id, deliveryJobId)); + throw new UnrecoverableError(`Payload missing or non-string method, dropping job ${job.id}`); + } + + const method = parsedPayload.method; + + if (!ALLOWED_METHODS.has(method)) { debug('invalid method: %s, dropping job %s', method, job.id); await db.delete(deliveryJobs).where(eq(deliveryJobs.id, deliveryJobId)); throw new UnrecoverableError(`Invalid method: ${method}, dropping job ${job.id}`); @@ -111,12 +126,14 @@ export async function processFederationDelivery(job: Job) debug('sending encrypted payload to %s', targetUrl); const signature = signMessage(payload, getOwnSigningSecretKey()); + const origin = getFederationOrigin(); + const { response } = await federationFetch(targetUrl, { method: 'POST', headers: { 'Content-Type': 'application/json', - 'Origin': process.env.BETTER_AUTH_URL!, - 'X-Federation-Origin': process.env.BETTER_AUTH_URL!, + 'Origin': origin, + 'X-Federation-Origin': origin, 'X-Federation-Target': targetUrl, }, body: JSON.stringify({ method, payload: encrypted, signature }), @@ -131,20 +148,33 @@ export async function processFederationDelivery(job: Job) } // 6. Parse ack. 
- const responseBody = await response.json(); - debug('delivery to %s response body: %o', targetUrl, responseBody); + let responseBody: unknown; + try { + responseBody = await response.json(); + } catch { + throw new UnrecoverableError( + `Federation delivery to ${targetUrl} returned non-JSON response`, + ); + } + + debug('delivery to %s acknowledged (body length: %d)', targetUrl, JSON.stringify(responseBody).length); const ackPayload: AckPayload | null = - responseBody.payload?.method === 'PROXY_RESPONSE' - ? responseBody.payload - : responseBody.method === 'PROXY_RESPONSE' - ? responseBody - : null; + responseBody && typeof responseBody === 'object' && 'payload' in (responseBody as Record) && (responseBody as Record).payload !== null + ? ((responseBody as Record).payload as AckPayload | null) + : null; if (!ackPayload) { debug('delivery to %s not acknowledged', targetUrl); throw new UnrecoverableError( - `Federation delivery to ${targetUrl} not acknowledged: ${JSON.stringify(responseBody)}`, + `Federation delivery to ${targetUrl} not acknowledged`, + ); + } + + if (ackPayload.method !== 'PROXY_RESPONSE') { + debug('delivery to %s not acknowledged', targetUrl); + throw new UnrecoverableError( + `Federation delivery to ${targetUrl} not acknowledged`, ); } diff --git a/src/lib/bull/processors/handlers/follow.ts b/src/lib/bull/processors/handlers/follow.ts index e0621f3..6f66df4 100644 --- a/src/lib/bull/processors/handlers/follow.ts +++ b/src/lib/bull/processors/handlers/follow.ts @@ -74,7 +74,6 @@ export async function handleFollowAck( const followData = decrypted.data.following; - // Verify the row exists locally before applying the remote's accepted flag. 
const [existing] = await db .select({ id: follows.id }) @@ -98,8 +97,8 @@ export async function handleFollowAck( return; } - if (!followData?.accepted) { - debug('follow %s is not accepted but was acknowledged, setting acknowledged to true', followData.id); + if (followData.accepted === false) { + debug('follow %s rejected by remote, setting acknowledged', followData.id); await db.update(follows).set({ acknowledged: true }).where( and( eq(follows.followerId, followData.followerId), @@ -107,20 +106,29 @@ export async function handleFollowAck( eq(follows.followerServerUrl, serverUrl), ), ); - debug('follow %s acknowledged', existing.id); + debug('follow %s acknowledged (rejected)', existing.id); return; } - await db - .update(follows) - .set({ accepted: followData.accepted }) - .where( + if (followData.accepted === true) { + await db.update(follows).set({ accepted: true }).where( and( eq(follows.followerId, followData.followerId), eq(follows.followingId, followData.followingId), eq(follows.followerServerUrl, serverUrl), ), ); + debug('follow %s accepted by remote', followData.id); + return; + } - debug('updated follow %s accepted=%s', followData.id, followData.accepted); + // Remote acknowledged receipt without accepted state; just mark acknowledged. 
+ debug('follow %s acknowledged without accepted state', followData.id); + await db.update(follows).set({ acknowledged: true }).where( + and( + eq(follows.followerId, followData.followerId), + eq(follows.followingId, followData.followingId), + eq(follows.followerServerUrl, serverUrl), + ), + ); } diff --git a/src/lib/bull/processors/health-check.ts b/src/lib/bull/processors/health-check.ts index d1fb614..f7b3dcc 100644 --- a/src/lib/bull/processors/health-check.ts +++ b/src/lib/bull/processors/health-check.ts @@ -7,7 +7,7 @@ import { markServerHealthy } from '@/lib/federation/registry'; import { getThreatPolicy } from '@/lib/federation/threat-model'; import type { Job } from 'bullmq'; import createDebug from 'debug'; -import { eq } from 'drizzle-orm'; +import { eq, sql } from 'drizzle-orm'; import { scheduleHealthCheck, type HealthCheckJob } from '../queues'; const debug = createDebug('app:federation:worker'); @@ -40,7 +40,7 @@ export async function processHealthCheck(job: Job): Promise): Promise( DELIVERY_QUEUE_NAME, processFederationDelivery, { - connection: getRedisConnection() as never, + connection: getRedisWorkerConnection() as never, concurrency: 10, }, ); @@ -65,7 +64,7 @@ export function startFederationWorker(): WorkerHandles { HEALTH_CHECK_QUEUE_NAME, processHealthCheck, { - connection: getRedisConnection() as never, + connection: getRedisWorkerConnection() as never, concurrency: 3, }, );