diff --git a/README.md b/README.md index a22d2f1..e64383d 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,25 @@ -# Rondevu +# Rondevu Server -🎯 **Simple WebRTC peer signaling** +🌐 **Topic-based peer discovery and WebRTC signaling** -Direct peer-to-peer connections via offer/answer exchange. +Scalable peer-to-peer connection establishment with topic-based discovery, stateless authentication, and complete WebRTC signaling. **Related repositories:** -- [rondevu-client](https://github.com/xtr-dev/rondevu-client) - TypeScript client library -- [rondevu-demo](https://github.com/xtr-dev/rondevu-demo) - Interactive demo +- [@xtr-dev/rondevu-client](https://www.npmjs.com/package/@xtr-dev/rondevu-client) - TypeScript client library +- [rondevu-demo](https://rondevu-demo.pages.dev) - Interactive demo --- -## Rondevu Server +## Features -HTTP signaling server for WebRTC peer connection establishment. Supports SQLite (Node.js/Docker) and Cloudflare D1 (Workers) storage backends. +- **Topic-Based Discovery**: Tag offers with topics (e.g., torrent infohashes) for efficient peer finding +- **Stateless Authentication**: AES-256-GCM encrypted credentials, no server-side sessions +- **Bloom Filters**: Client-side peer exclusion for efficient discovery +- **Multi-Offer Support**: Create multiple offers per peer simultaneously +- **Complete WebRTC Signaling**: Offer/answer exchange and ICE candidate relay +- **Dual Storage**: SQLite (Node.js/Docker) and Cloudflare D1 (Workers) backends -### Quick Start +## Quick Start **Node.js:** ```bash @@ -31,37 +36,148 @@ docker build -t rondevu . && docker run -p 3000:3000 -e STORAGE_PATH=:memory: ro npx wrangler deploy ``` -### API +## API Endpoints -```bash -# Create offer -POST /offer {"peerId":"alice","offer":"...","code":"my-room"} +### Public Endpoints -# Send answer/candidates -POST /answer {"code":"my-room","answer":"...","side":"answerer"} +#### `GET /` +Returns server version and info -# Poll for updates -POST /poll {"code":"my-room","side":"offerer"} +#### `GET /health` +Health check endpoint with version -# Health check with version -GET /health +#### `POST /register` +Register a new peer and receive credentials (peerId + secret) -# Version info -GET / +**Response:** +```json +{ + "peerId": "f17c195f067255e357232e34cf0735d9", + "secret": "DdorTR8QgSn9yngn+4qqR8cs1aMijvX..." +} ``` -### Configuration +#### `GET /topics?limit=50&offset=0` +List all topics with active peer counts (paginated) + +**Query Parameters:** +- `limit` (optional): Maximum number of topics to return (default: 50, max: 200) +- `offset` (optional): Number of topics to skip (default: 0) + +**Response:** +```json +{ + "topics": [ + {"topic": "movie-xyz", "activePeers": 42}, + {"topic": "torrent-abc", "activePeers": 15} + ], + "total": 123, + "limit": 50, + "offset": 0 +} +``` + +#### `GET /offers/by-topic/:topic?limit=50&bloom=...` +Find offers by topic with optional bloom filter exclusion + +**Query Parameters:** +- `limit` (optional): Maximum offers to return (default: 50, max: 200) +- `bloom` (optional): Base64-encoded bloom filter to exclude known peers + +**Response:** +```json +{ + "topic": "movie-xyz", + "offers": [ + { + "id": "offer-id", + "peerId": "peer-id", + "sdp": "v=0...", + "topics": ["movie-xyz", "hd-content"], + "expiresAt": 1234567890, + "lastSeen": 1234567890 + } + ], + "total": 42, + "returned": 10 +} +``` + +#### `GET /peers/:peerId/offers` +View all offers from a specific peer + +### Authenticated Endpoints + +All authenticated endpoints require `Authorization: Bearer {peerId}:{secret}` header. + +#### `POST /offers` +Create one or more offers + +**Request:** +```json +{ + "offers": [ + { + "sdp": "v=0...", + "topics": ["movie-xyz", "hd-content"], + "ttl": 300000 + } + ] +} +``` + +#### `GET /offers/mine` +List all offers owned by authenticated peer + +#### `PUT /offers/:offerId/heartbeat` +Update last_seen timestamp for an offer + +#### `DELETE /offers/:offerId` +Delete a specific offer + +#### `POST /offers/:offerId/answer` +Answer an offer (locks it to answerer) + +**Request:** +```json +{ + "sdp": "v=0..." +} +``` + +#### `GET /offers/answers` +Poll for answers to your offers + +#### `POST /offers/:offerId/ice-candidates` +Post ICE candidates for an offer + +**Request:** +```json +{ + "candidates": ["candidate:1 1 UDP..."] +} +``` + +#### `GET /offers/:offerId/ice-candidates?since=1234567890` +Get ICE candidates from the other peer + +## Configuration Environment variables: | Variable | Default | Description | |----------|---------|-------------| | `PORT` | `3000` | Server port (Node.js/Docker) | -| `OFFER_TIMEOUT` | `60000` | Offer timeout in milliseconds (1 minute) | | `CORS_ORIGINS` | `*` | Comma-separated allowed origins | -| `STORAGE_PATH` | `./offers.db` | SQLite database path (use `:memory:` for in-memory) | -| `VERSION` | `0.0.1` | Server version (semver) | +| `STORAGE_PATH` | `./rondevu.db` | SQLite database path (use `:memory:` for in-memory) | +| `VERSION` | `0.4.0` | Server version (semver) | +| `AUTH_SECRET` | Random 32-byte hex | Secret key for credential encryption | +| `OFFER_DEFAULT_TTL` | `300000` | Default offer TTL in ms (5 minutes) | +| `OFFER_MIN_TTL` | `60000` | Minimum offer TTL in ms (1 minute) | +| `OFFER_MAX_TTL` | `3600000` | Maximum offer TTL in ms (1 hour) | +| `MAX_OFFERS_PER_REQUEST` | `10` | Maximum offers per create request | +| `MAX_TOPICS_PER_OFFER` | `20` | Maximum topics per offer | -### License +## License MIT diff --git a/package.json b/package.json index 4828cc3..7f65755 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@xtr-dev/rondevu-server", - "version": "0.0.1", - "description": "Open signaling and tracking server for peer discovery in distributed P2P applications", + "version": "0.1.0", + "description": "Topic-based peer discovery and signaling server for distributed P2P applications", "main": "dist/index.js", "scripts": { "build": "node build.js", diff --git a/src/app.ts b/src/app.ts index e9aad56..fd8ed36 100644 --- a/src/app.ts +++ b/src/app.ts @@ -1,20 +1,21 @@ import { Hono } from 'hono'; import { cors } from 'hono/cors'; import { Storage } from './storage/types.ts'; +import { Config } from './config.ts'; +import { createAuthMiddleware, getAuthenticatedPeerId } from './middleware/auth.ts'; +import { generatePeerId, encryptPeerId } from './crypto.ts'; +import { parseBloomFilter } from './bloom.ts'; import type { Context } from 'hono'; -export interface AppConfig { - offerTimeout: number; - corsOrigins: string[]; - version?: string; -} - /** - * Creates the Hono application with WebRTC signaling endpoints + * Creates the Hono application with topic-based WebRTC signaling endpoints */ -export function createApp(storage: Storage, config: AppConfig) { +export function createApp(storage: Storage, config: Config) { const app = new Hono(); + // Create auth middleware + const authMiddleware = createAuthMiddleware(config.authSecret); + // Enable CORS with dynamic origin handling app.use('/*', cors({ origin: (origin) => { @@ -29,8 +30,8 @@ export function createApp(storage: Storage, config: AppConfig) { // Default to first allowed origin return config.corsOrigins[0]; }, - allowMethods: ['GET', 'POST', 'OPTIONS'], - allowHeaders: ['Content-Type', 'Origin'], + allowMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'], + allowHeaders: ['Content-Type', 'Origin', 'Authorization'], exposeHeaders: ['Content-Type'], maxAge: 600, credentials: true, @@ -42,7 +43,9 @@ export function createApp(storage: Storage, config: AppConfig) { */ app.get('/', (c) => { return c.json({ - version: config.version || 'unknown' + version: config.version, + name: 'Rondevu', + description: 'Topic-based peer discovery and signaling server' }); }); @@ -54,161 +57,467 @@ export function createApp(storage: Storage, config: AppConfig) { return c.json({ status: 'ok', timestamp: Date.now(), - version: config.version || 'unknown' + version: config.version }); }); /** - * POST /offer - * Creates a new offer and returns a unique code - * Body: { peerId: string, offer: string, code?: string } + * POST /register + * Register a new peer and receive credentials */ - app.post('/offer', async (c) => { + app.post('/register', async (c) => { try { - const body = await c.req.json(); - const { peerId, offer, code: customCode } = body; + // Generate new peer ID + const peerId = generatePeerId(); - if (!peerId || typeof peerId !== 'string') { - return c.json({ error: 'Missing or invalid required parameter: peerId' }, 400); - } + // Encrypt peer ID with server secret (async operation) + const secret = await encryptPeerId(peerId, config.authSecret); - if (peerId.length > 1024) { - return c.json({ error: 'PeerId string must be 1024 characters or less' }, 400); - } - - if (!offer || typeof offer !== 'string') { - return c.json({ error: 'Missing or invalid required parameter: offer' }, 400); - } - - const expiresAt = Date.now() + config.offerTimeout; - const code = await storage.createOffer(peerId, offer, expiresAt, customCode); - - return c.json({ code }, 200); + return c.json({ + peerId, + secret + }, 200); } catch (err) { - console.error('Error creating offer:', err); - - // Check if it's a code clash error - if (err instanceof Error && err.message.includes('already exists')) { - return c.json({ error: err.message }, 409); - } - + console.error('Error registering peer:', err); return c.json({ error: 'Internal server error' }, 500); } }); /** - * POST /answer - * Responds to an existing offer or sends ICE candidates - * Body: { code: string, answer?: string, candidate?: string, side: 'offerer' | 'answerer' } + * POST /offers + * Creates one or more offers with topics + * Requires authentication */ - app.post('/answer', async (c) => { + app.post('/offers', authMiddleware, async (c) => { try { const body = await c.req.json(); - const { code, answer, candidate, side } = body; + const { offers } = body; - if (!code || typeof code !== 'string') { - return c.json({ error: 'Missing or invalid required parameter: code' }, 400); + if (!Array.isArray(offers) || offers.length === 0) { + return c.json({ error: 'Missing or invalid required parameter: offers (must be non-empty array)' }, 400); } - if (!side || (side !== 'offerer' && side !== 'answerer')) { - return c.json({ error: 'Invalid or missing parameter: side (must be "offerer" or "answerer")' }, 400); + if (offers.length > config.maxOffersPerRequest) { + return c.json({ error: `Too many offers. Maximum ${config.maxOffersPerRequest} per request` }, 400); } - if (!answer && !candidate) { - return c.json({ error: 'Missing required parameter: answer or candidate' }, 400); - } + const peerId = getAuthenticatedPeerId(c); - if (answer && candidate) { - return c.json({ error: 'Cannot provide both answer and candidate' }, 400); - } - - const offer = await storage.getOffer(code); - - if (!offer) { - return c.json({ error: 'Offer not found or expired' }, 404); - } - - if (answer) { - await storage.updateOffer(code, { answer }); - } - - if (candidate) { - if (side === 'offerer') { - const updatedCandidates = [...offer.offerCandidates, candidate]; - await storage.updateOffer(code, { offerCandidates: updatedCandidates }); - } else { - const updatedCandidates = [...offer.answerCandidates, candidate]; - await storage.updateOffer(code, { answerCandidates: updatedCandidates }); + // Validate and prepare offers + const offerRequests = []; + for (const offer of offers) { + // Validate SDP + if (!offer.sdp || typeof offer.sdp !== 'string') { + return c.json({ error: 'Each offer must have an sdp field' }, 400); } + + if (offer.sdp.length > 65536) { + return c.json({ error: 'SDP must be 64KB or less' }, 400); + } + + // Validate topics + if (!Array.isArray(offer.topics) || offer.topics.length === 0) { + return c.json({ error: 'Each offer must have a non-empty topics array' }, 400); + } + + if (offer.topics.length > config.maxTopicsPerOffer) { + return c.json({ error: `Too many topics. Maximum ${config.maxTopicsPerOffer} per offer` }, 400); + } + + for (const topic of offer.topics) { + if (typeof topic !== 'string' || topic.length === 0 || topic.length > 256) { + return c.json({ error: 'Each topic must be a string between 1 and 256 characters' }, 400); + } + } + + // Validate and clamp TTL + let ttl = offer.ttl || config.offerDefaultTtl; + if (ttl < config.offerMinTtl) { + ttl = config.offerMinTtl; + } + if (ttl > config.offerMaxTtl) { + ttl = config.offerMaxTtl; + } + + offerRequests.push({ + id: offer.id, + peerId, + sdp: offer.sdp, + topics: offer.topics, + expiresAt: Date.now() + ttl, + }); } - return c.json({ success: true }, 200); + // Create offers + const createdOffers = await storage.createOffers(offerRequests); + + // Return simplified response + return c.json({ + offers: createdOffers.map(o => ({ + id: o.id, + peerId: o.peerId, + topics: o.topics, + expiresAt: o.expiresAt + })) + }, 200); } catch (err) { - console.error('Error handling answer:', err); + console.error('Error creating offers:', err); return c.json({ error: 'Internal server error' }, 500); } }); /** - * POST /poll - * Polls for offer data (offer, answer, ICE candidates) - * Body: { code: string, side: 'offerer' | 'answerer' } + * GET /offers/by-topic/:topic + * Find offers by topic with optional bloom filter exclusion + * Public endpoint (no auth required) */ - app.post('/poll', async (c) => { + app.get('/offers/by-topic/:topic', async (c) => { try { - const body = await c.req.json(); - const { code, side } = body; + const topic = c.req.param('topic'); + const bloomParam = c.req.query('bloom'); + const limitParam = c.req.query('limit'); - if (!code || typeof code !== 'string') { - return c.json({ error: 'Missing or invalid required parameter: code' }, 400); + const limit = limitParam ? Math.min(parseInt(limitParam, 10), 200) : 50; + + // Parse bloom filter if provided + let excludePeerIds: string[] = []; + if (bloomParam) { + const bloom = parseBloomFilter(bloomParam); + if (!bloom) { + return c.json({ error: 'Invalid bloom filter format' }, 400); + } + + // Get all offers for topic first + const allOffers = await storage.getOffersByTopic(topic); + + // Test each peer ID against bloom filter + const excludeSet = new Set(); + for (const offer of allOffers) { + if (bloom.test(offer.peerId)) { + excludeSet.add(offer.peerId); + } + } + + excludePeerIds = Array.from(excludeSet); } - if (!side || (side !== 'offerer' && side !== 'answerer')) { - return c.json({ error: 'Invalid or missing parameter: side (must be "offerer" or "answerer")' }, 400); - } + // Get filtered offers + let offers = await storage.getOffersByTopic(topic, excludePeerIds.length > 0 ? excludePeerIds : undefined); - const offer = await storage.getOffer(code); + // Apply limit + const total = offers.length; + offers = offers.slice(0, limit); + return c.json({ + topic, + offers: offers.map(o => ({ + id: o.id, + peerId: o.peerId, + sdp: o.sdp, + topics: o.topics, + expiresAt: o.expiresAt, + lastSeen: o.lastSeen + })), + total: bloomParam ? total + excludePeerIds.length : total, + returned: offers.length + }, 200); + } catch (err) { + console.error('Error fetching offers by topic:', err); + return c.json({ error: 'Internal server error' }, 500); + } + }); + + /** + * GET /topics + * List all topics with active peer counts (paginated) + * Public endpoint (no auth required) + */ + app.get('/topics', async (c) => { + try { + const limitParam = c.req.query('limit'); + const offsetParam = c.req.query('offset'); + + const limit = limitParam ? Math.min(parseInt(limitParam, 10), 200) : 50; + const offset = offsetParam ? parseInt(offsetParam, 10) : 0; + + const result = await storage.getTopics(limit, offset); + + return c.json({ + topics: result.topics, + total: result.total, + limit, + offset + }, 200); + } catch (err) { + console.error('Error fetching topics:', err); + return c.json({ error: 'Internal server error' }, 500); + } + }); + + /** + * GET /peers/:peerId/offers + * View all offers from a specific peer + * Public endpoint + */ + app.get('/peers/:peerId/offers', async (c) => { + try { + const peerId = c.req.param('peerId'); + const offers = await storage.getOffersByPeerId(peerId); + + // Collect unique topics + const topicsSet = new Set(); + offers.forEach(o => o.topics.forEach(t => topicsSet.add(t))); + + return c.json({ + peerId, + offers: offers.map(o => ({ + id: o.id, + sdp: o.sdp, + topics: o.topics, + expiresAt: o.expiresAt, + lastSeen: o.lastSeen + })), + topics: Array.from(topicsSet) + }, 200); + } catch (err) { + console.error('Error fetching peer offers:', err); + return c.json({ error: 'Internal server error' }, 500); + } + }); + + /** + * GET /offers/mine + * List all offers owned by authenticated peer + * Requires authentication + */ + app.get('/offers/mine', authMiddleware, async (c) => { + try { + const peerId = getAuthenticatedPeerId(c); + const offers = await storage.getOffersByPeerId(peerId); + + return c.json({ + peerId, + offers: offers.map(o => ({ + id: o.id, + sdp: o.sdp, + topics: o.topics, + createdAt: o.createdAt, + expiresAt: o.expiresAt, + lastSeen: o.lastSeen, + answererPeerId: o.answererPeerId, + answeredAt: o.answeredAt + })) + }, 200); + } catch (err) { + console.error('Error fetching own offers:', err); + return c.json({ error: 'Internal server error' }, 500); + } + }); + + /** + * PUT /offers/:offerId/heartbeat + * Update last_seen timestamp for an offer + * Requires authentication and ownership + */ + app.put('/offers/:offerId/heartbeat', authMiddleware, async (c) => { + try { + const offerId = c.req.param('offerId'); + const peerId = getAuthenticatedPeerId(c); + + // Verify ownership + const offer = await storage.getOfferById(offerId); if (!offer) { return c.json({ error: 'Offer not found or expired' }, 404); } - if (side === 'offerer') { - return c.json({ - answer: offer.answer || null, - answerCandidates: offer.answerCandidates, - }); - } else { - return c.json({ - offer: offer.offer, - offerCandidates: offer.offerCandidates, - }); + if (offer.peerId !== peerId) { + return c.json({ error: 'Not authorized to update this offer' }, 403); } + + const now = Date.now(); + await storage.updateOfferLastSeen(offerId, now); + + return c.json({ + id: offerId, + lastSeen: now + }, 200); } catch (err) { - console.error('Error polling offer:', err); + console.error('Error updating offer heartbeat:', err); return c.json({ error: 'Internal server error' }, 500); } }); /** - * POST /leave - * Ends a session by deleting the offer - * Body: { code: string } + * DELETE /offers/:offerId + * Delete a specific offer + * Requires authentication and ownership */ - app.post('/leave', async (c) => { + app.delete('/offers/:offerId', authMiddleware, async (c) => { try { - const body = await c.req.json(); - const { code } = body; + const offerId = c.req.param('offerId'); + const peerId = getAuthenticatedPeerId(c); - if (!code || typeof code !== 'string') { - return c.json({ error: 'Missing or invalid required parameter: code' }, 400); + const deleted = await storage.deleteOffer(offerId, peerId); + + if (!deleted) { + return c.json({ error: 'Offer not found or not authorized' }, 404); } - await storage.deleteOffer(code); - - return c.json({ success: true }, 200); + return c.json({ deleted: true }, 200); } catch (err) { - console.error('Error leaving session:', err); + console.error('Error deleting offer:', err); + return c.json({ error: 'Internal server error' }, 500); + } + }); + + /** + * POST /offers/:offerId/answer + * Answer a specific offer (locks it to answerer) + * Requires authentication + */ + app.post('/offers/:offerId/answer', authMiddleware, async (c) => { + try { + const offerId = c.req.param('offerId'); + const peerId = getAuthenticatedPeerId(c); + const body = await c.req.json(); + const { sdp } = body; + + if (!sdp || typeof sdp !== 'string') { + return c.json({ error: 'Missing or invalid required parameter: sdp' }, 400); + } + + if (sdp.length > 65536) { + return c.json({ error: 'SDP must be 64KB or less' }, 400); + } + + const result = await storage.answerOffer(offerId, peerId, sdp); + + if (!result.success) { + return c.json({ error: result.error }, 400); + } + + return c.json({ + offerId, + answererId: peerId, + answeredAt: Date.now() + }, 200); + } catch (err) { + console.error('Error answering offer:', err); + return c.json({ error: 'Internal server error' }, 500); + } + }); + + /** + * GET /offers/answers + * Poll for answers to all of authenticated peer's offers + * Requires authentication (offerer) + */ + app.get('/offers/answers', authMiddleware, async (c) => { + try { + const peerId = getAuthenticatedPeerId(c); + const offers = await storage.getAnsweredOffers(peerId); + + return c.json({ + answers: offers.map(o => ({ + offerId: o.id, + answererId: o.answererPeerId, + sdp: o.answerSdp, + answeredAt: o.answeredAt, + topics: o.topics + })) + }, 200); + } catch (err) { + console.error('Error fetching answers:', err); + return c.json({ error: 'Internal server error' }, 500); + } + }); + + /** + * POST /offers/:offerId/ice-candidates + * Post ICE candidates for an offer + * Requires authentication (must be offerer or answerer) + */ + app.post('/offers/:offerId/ice-candidates', authMiddleware, async (c) => { + try { + const offerId = c.req.param('offerId'); + const peerId = getAuthenticatedPeerId(c); + const body = await c.req.json(); + const { candidates } = body; + + if (!Array.isArray(candidates) || candidates.length === 0) { + return c.json({ error: 'Missing or invalid required parameter: candidates (must be non-empty array)' }, 400); + } + + // Verify offer exists and caller is offerer or answerer + const offer = await storage.getOfferById(offerId); + if (!offer) { + return c.json({ error: 'Offer not found or expired' }, 404); + } + + let role: 'offerer' | 'answerer'; + if (offer.peerId === peerId) { + role = 'offerer'; + } else if (offer.answererPeerId === peerId) { + role = 'answerer'; + } else { + return c.json({ error: 'Not authorized to post ICE candidates for this offer' }, 403); + } + + const added = await storage.addIceCandidates(offerId, peerId, role, candidates); + + return c.json({ + offerId, + candidatesAdded: added + }, 200); + } catch (err) { + console.error('Error adding ICE candidates:', err); + return c.json({ error: 'Internal server error' }, 500); + } + }); + + /** + * GET /offers/:offerId/ice-candidates + * Poll for ICE candidates from the other peer + * Requires authentication (must be offerer or answerer) + */ + app.get('/offers/:offerId/ice-candidates', authMiddleware, async (c) => { + try { + const offerId = c.req.param('offerId'); + const peerId = getAuthenticatedPeerId(c); + const sinceParam = c.req.query('since'); + + const since = sinceParam ? parseInt(sinceParam, 10) : undefined; + + // Verify offer exists and caller is offerer or answerer + const offer = await storage.getOfferById(offerId); + if (!offer) { + return c.json({ error: 'Offer not found or expired' }, 404); + } + + let targetRole: 'offerer' | 'answerer'; + if (offer.peerId === peerId) { + // Offerer wants answerer's candidates + targetRole = 'answerer'; + } else if (offer.answererPeerId === peerId) { + // Answerer wants offerer's candidates + targetRole = 'offerer'; + } else { + return c.json({ error: 'Not authorized to view ICE candidates for this offer' }, 403); + } + + const candidates = await storage.getIceCandidates(offerId, targetRole, since); + + return c.json({ + offerId, + candidates: candidates.map(c => ({ + candidate: c.candidate, + peerId: c.peerId, + role: c.role, + createdAt: c.createdAt + })) + }, 200); + } catch (err) { + console.error('Error fetching ICE candidates:', err); return c.json({ error: 'Internal server error' }, 500); } }); diff --git a/src/bloom.ts b/src/bloom.ts new file mode 100644 index 0000000..bab94e2 --- /dev/null +++ b/src/bloom.ts @@ -0,0 +1,62 @@ +/** + * Bloom filter utility for testing if peer IDs might be in a set + * Used to filter out known peers from discovery results + */ + +export class BloomFilter { + private bits: Uint8Array; + private size: number; + private numHashes: number; + + /** + * Creates a bloom filter from a base64 encoded bit array + */ + constructor(base64Data: string, numHashes: number = 3) { + // Decode base64 to buffer + const buffer = Buffer.from(base64Data, 'base64'); + this.bits = new Uint8Array(buffer); + this.size = this.bits.length * 8; + this.numHashes = numHashes; + } + + /** + * Test if a peer ID might be in the filter + * Returns true if possibly in set, false if definitely not in set + */ + test(peerId: string): boolean { + for (let i = 0; i < this.numHashes; i++) { + const hash = this.hash(peerId, i); + const index = hash % this.size; + const byteIndex = Math.floor(index / 8); + const bitIndex = index % 8; + + if (!(this.bits[byteIndex] & (1 << bitIndex))) { + return false; + } + } + return true; + } + + /** + * Simple hash function (FNV-1a variant) + */ + private hash(str: string, seed: number): number { + let hash = 2166136261 ^ seed; + for (let i = 0; i < str.length; i++) { + hash ^= str.charCodeAt(i); + hash += (hash << 1) + (hash << 4) + (hash << 7) + (hash << 8) + (hash << 24); + } + return hash >>> 0; + } +} + +/** + * Helper to parse bloom filter from base64 string + */ +export function parseBloomFilter(base64: string): BloomFilter | null { + try { + return new BloomFilter(base64); + } catch { + return null; + } +} diff --git a/src/config.ts b/src/config.ts index 0a778f6..d22f95d 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,3 +1,5 @@ +import { generateSecretKey } from './crypto.ts'; + /** * Application configuration * Reads from environment variables with sensible defaults @@ -6,23 +8,44 @@ export interface Config { port: number; storageType: 'sqlite' | 'memory'; storagePath: string; - offerTimeout: number; corsOrigins: string[]; version: string; + authSecret: string; + offerDefaultTtl: number; + offerMaxTtl: number; + offerMinTtl: number; + cleanupInterval: number; + maxOffersPerRequest: number; + maxTopicsPerOffer: number; } /** * Loads configuration from environment variables */ export function loadConfig(): Config { + // Generate or load auth secret + let authSecret = process.env.AUTH_SECRET; + if (!authSecret) { + authSecret = generateSecretKey(); + console.warn('WARNING: No AUTH_SECRET provided. Generated temporary secret:', authSecret); + console.warn('All peer credentials will be invalidated on server restart.'); + console.warn('Set AUTH_SECRET environment variable to persist credentials across restarts.'); + } + return { port: parseInt(process.env.PORT || '3000', 10), storageType: (process.env.STORAGE_TYPE || 'sqlite') as 'sqlite' | 'memory', storagePath: process.env.STORAGE_PATH || ':memory:', - offerTimeout: parseInt(process.env.OFFER_TIMEOUT || '60000', 10), corsOrigins: process.env.CORS_ORIGINS ? process.env.CORS_ORIGINS.split(',').map(o => o.trim()) : ['*'], version: process.env.VERSION || 'unknown', + authSecret, + offerDefaultTtl: parseInt(process.env.OFFER_DEFAULT_TTL || '60000', 10), + offerMaxTtl: parseInt(process.env.OFFER_MAX_TTL || '86400000', 10), + offerMinTtl: parseInt(process.env.OFFER_MIN_TTL || '60000', 10), + cleanupInterval: parseInt(process.env.CLEANUP_INTERVAL || '60000', 10), + maxOffersPerRequest: parseInt(process.env.MAX_OFFERS_PER_REQUEST || '100', 10), + maxTopicsPerOffer: parseInt(process.env.MAX_TOPICS_PER_OFFER || '50', 10), }; } diff --git a/src/crypto.ts b/src/crypto.ts new file mode 100644 index 0000000..dc4afe0 --- /dev/null +++ b/src/crypto.ts @@ -0,0 +1,149 @@ +/** + * Crypto utilities for stateless peer authentication + * Uses Web Crypto API for compatibility with both Node.js and Cloudflare Workers + */ + +const ALGORITHM = 'AES-GCM'; +const IV_LENGTH = 12; // 96 bits for GCM +const KEY_LENGTH = 32; // 256 bits + +/** + * Generates a random peer ID (16 bytes = 32 hex chars) + */ +export function generatePeerId(): string { + const bytes = crypto.getRandomValues(new Uint8Array(16)); + return Array.from(bytes).map(b => b.toString(16).padStart(2, '0')).join(''); +} + +/** + * Generates a random secret key for encryption (32 bytes = 64 hex chars) + */ +export function generateSecretKey(): string { + const bytes = crypto.getRandomValues(new Uint8Array(KEY_LENGTH)); + return Array.from(bytes).map(b => b.toString(16).padStart(2, '0')).join(''); +} + +/** + * Convert hex string to Uint8Array + */ +function hexToBytes(hex: string): Uint8Array { + const bytes = new Uint8Array(hex.length / 2); + for (let i = 0; i < hex.length; i += 2) { + bytes[i / 2] = parseInt(hex.substring(i, i + 2), 16); + } + return bytes; +} + +/** + * Convert Uint8Array to base64 string + */ +function bytesToBase64(bytes: Uint8Array): string { + const binString = Array.from(bytes, (byte) => + String.fromCodePoint(byte) + ).join(''); + return btoa(binString); +} + +/** + * Convert base64 string to Uint8Array + */ +function base64ToBytes(base64: string): Uint8Array { + const binString = atob(base64); + return Uint8Array.from(binString, (char) => char.codePointAt(0)!); +} + +/** + * Encrypts a peer ID using the server secret key + * Returns base64-encoded encrypted data (IV + ciphertext) + */ +export async function encryptPeerId(peerId: string, secretKeyHex: string): Promise { + const keyBytes = hexToBytes(secretKeyHex); + + if (keyBytes.length !== KEY_LENGTH) { + throw new Error(`Secret key must be ${KEY_LENGTH * 2} hex characters (${KEY_LENGTH} bytes)`); + } + + // Import key + const key = await crypto.subtle.importKey( + 'raw', + keyBytes, + { name: ALGORITHM, length: 256 }, + false, + ['encrypt'] + ); + + // Generate random IV + const iv = crypto.getRandomValues(new Uint8Array(IV_LENGTH)); + + // Encrypt peer ID + const encoder = new TextEncoder(); + const data = encoder.encode(peerId); + + const encrypted = await crypto.subtle.encrypt( + { name: ALGORITHM, iv }, + key, + data + ); + + // Combine IV + ciphertext and encode as base64 + const combined = new Uint8Array(iv.length + encrypted.byteLength); + combined.set(iv, 0); + combined.set(new Uint8Array(encrypted), iv.length); + + return bytesToBase64(combined); +} + +/** + * Decrypts an encrypted peer ID secret + * Returns the plaintext peer ID or throws if decryption fails + */ +export async function decryptPeerId(encryptedSecret: string, secretKeyHex: string): Promise { + try { + const keyBytes = hexToBytes(secretKeyHex); + + if (keyBytes.length !== KEY_LENGTH) { + throw new Error(`Secret key must be ${KEY_LENGTH * 2} hex characters (${KEY_LENGTH} bytes)`); + } + + // Decode base64 + const combined = base64ToBytes(encryptedSecret); + + // Extract IV and ciphertext + const iv = combined.slice(0, IV_LENGTH); + const ciphertext = combined.slice(IV_LENGTH); + + // Import key + const key = await crypto.subtle.importKey( + 'raw', + keyBytes, + { name: ALGORITHM, length: 256 }, + false, + ['decrypt'] + ); + + // Decrypt + const decrypted = await crypto.subtle.decrypt( + { name: ALGORITHM, iv }, + key, + ciphertext + ); + + const decoder = new TextDecoder(); + return decoder.decode(decrypted); + } catch (err) { + throw new Error('Failed to decrypt peer ID: invalid secret or secret key'); + } +} + +/** + * Validates that a peer ID and secret match + * Returns true if valid, false otherwise + */ +export async function validateCredentials(peerId: string, encryptedSecret: string, secretKey: string): Promise { + try { + const decryptedPeerId = await decryptPeerId(encryptedSecret, secretKey); + return decryptedPeerId === peerId; + } catch { + return false; + } +} diff --git a/src/index.ts b/src/index.ts index ccb207f..dd5d302 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,7 +15,12 @@ async function main() { port: config.port, storageType: config.storageType, storagePath: config.storagePath, - offerTimeout: `${config.offerTimeout}ms`, + offerDefaultTtl: `${config.offerDefaultTtl}ms`, + offerMaxTtl: `${config.offerMaxTtl}ms`, + offerMinTtl: `${config.offerMinTtl}ms`, + cleanupInterval: `${config.cleanupInterval}ms`, + maxOffersPerRequest: config.maxOffersPerRequest, + maxTopicsPerOffer: config.maxTopicsPerOffer, corsOrigins: config.corsOrigins, version: config.version, }); @@ -29,11 +34,20 @@ async function main() { throw new Error('Unsupported storage type'); } - const app = createApp(storage, { - offerTimeout: config.offerTimeout, - corsOrigins: config.corsOrigins, - version: config.version, - }); + // Start periodic cleanup of expired offers + const cleanupInterval = setInterval(async () => { + try { + const now = Date.now(); + const deleted = await storage.deleteExpiredOffers(now); + if (deleted > 0) { + console.log(`Cleanup: Deleted ${deleted} expired offer(s)`); + } + } catch (err) { + console.error('Cleanup error:', err); + } + }, config.cleanupInterval); + + const app = createApp(storage, config); const server = serve({ fetch: app.fetch, @@ -41,18 +55,18 @@ async function main() { }); console.log(`Server running on http://localhost:${config.port}`); + console.log('Ready to accept connections'); - process.on('SIGINT', async () => { + // Graceful shutdown handler + const shutdown = async () => { console.log('\nShutting down gracefully...'); + clearInterval(cleanupInterval); await storage.close(); process.exit(0); - }); + }; - process.on('SIGTERM', async () => { - console.log('\nShutting down gracefully...'); - await storage.close(); - process.exit(0); - }); + process.on('SIGINT', shutdown); + process.on('SIGTERM', shutdown); } main().catch((err) => { diff --git a/src/middleware/auth.ts b/src/middleware/auth.ts new file mode 100644 index 0000000..b0e3586 --- /dev/null +++ b/src/middleware/auth.ts @@ -0,0 +1,51 @@ +import { Context, Next } from 'hono'; +import { validateCredentials } from '../crypto.ts'; + +/** + * Authentication middleware for Rondevu + * Validates Bearer token in format: {peerId}:{encryptedSecret} + */ +export function createAuthMiddleware(authSecret: string) { + return async (c: Context, next: Next) => { + const authHeader = c.req.header('Authorization'); + + if (!authHeader) { + return c.json({ error: 'Missing Authorization header' }, 401); + } + + // Expect format: Bearer {peerId}:{secret} + const parts = authHeader.split(' '); + if (parts.length !== 2 || parts[0] !== 'Bearer') { + return c.json({ error: 'Invalid Authorization header format. Expected: Bearer {peerId}:{secret}' }, 401); + } + + const credentials = parts[1].split(':'); + if (credentials.length !== 2) { + return c.json({ error: 'Invalid credentials format. Expected: {peerId}:{secret}' }, 401); + } + + const [peerId, encryptedSecret] = credentials; + + // Validate credentials (async operation) + const isValid = await validateCredentials(peerId, encryptedSecret, authSecret); + if (!isValid) { + return c.json({ error: 'Invalid credentials' }, 401); + } + + // Attach peer ID to context for use in handlers + c.set('peerId', peerId); + + await next(); + }; +} + +/** + * Helper to get authenticated peer ID from context + */ +export function getAuthenticatedPeerId(c: Context): string { + const peerId = c.get('peerId'); + if (!peerId) { + throw new Error('No authenticated peer ID in context'); + } + return peerId; +} diff --git a/src/storage/d1.ts b/src/storage/d1.ts index 40aa81e..7d50761 100644 --- a/src/storage/d1.ts +++ b/src/storage/d1.ts @@ -1,4 +1,4 @@ -import { Storage, Offer } from './types.ts'; +import { Storage, Offer, IceCandidate, CreateOfferRequest, TopicInfo } from './types.ts'; // Generate a UUID v4 function generateUUID(): string { @@ -6,7 +6,8 @@ function generateUUID(): string { } /** - * D1 storage adapter for offer management using Cloudflare D1 + * D1 storage adapter for topic-based offer management using Cloudflare D1 + * NOTE: This implementation is a placeholder and needs to be fully tested */ export class D1Storage implements Storage { private db: D1Database; @@ -20,161 +21,337 @@ export class D1Storage implements Storage { } /** - * Initializes database schema + * Initializes database schema with new topic-based structure * This should be run once during setup, not on every request */ async initializeDatabase(): Promise { await this.db.exec(` CREATE TABLE IF NOT EXISTS offers ( - code TEXT PRIMARY KEY, - peer_id TEXT NOT NULL CHECK(length(peer_id) <= 1024), - offer TEXT NOT NULL, - answer TEXT, - offer_candidates TEXT NOT NULL DEFAULT '[]', - answer_candidates TEXT NOT NULL DEFAULT '[]', + id TEXT PRIMARY KEY, + peer_id TEXT NOT NULL, + sdp TEXT NOT NULL, created_at INTEGER NOT NULL, - expires_at INTEGER NOT NULL + expires_at INTEGER NOT NULL, + last_seen INTEGER NOT NULL, + answerer_peer_id TEXT, + answer_sdp TEXT, + answered_at INTEGER ); - CREATE INDEX IF NOT EXISTS idx_offers_expires_at ON offers(expires_at); + CREATE INDEX IF NOT EXISTS idx_offers_peer ON offers(peer_id); + CREATE INDEX IF NOT EXISTS idx_offers_expires ON offers(expires_at); + CREATE INDEX IF NOT EXISTS idx_offers_last_seen ON offers(last_seen); + CREATE INDEX IF NOT EXISTS idx_offers_answerer ON offers(answerer_peer_id); + + CREATE TABLE IF NOT EXISTS offer_topics ( + offer_id TEXT NOT NULL, + topic TEXT NOT NULL, + PRIMARY KEY (offer_id, topic), + FOREIGN KEY (offer_id) REFERENCES offers(id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_topics_topic ON offer_topics(topic); + CREATE INDEX IF NOT EXISTS idx_topics_offer ON offer_topics(offer_id); + + CREATE TABLE IF NOT EXISTS ice_candidates ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + offer_id TEXT NOT NULL, + peer_id TEXT NOT NULL, + role TEXT NOT NULL CHECK(role IN ('offerer', 'answerer')), + candidate TEXT NOT NULL, + created_at INTEGER NOT NULL, + FOREIGN KEY (offer_id) REFERENCES offers(id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_ice_offer ON ice_candidates(offer_id); + CREATE INDEX IF NOT EXISTS idx_ice_peer ON ice_candidates(peer_id); + CREATE INDEX IF NOT EXISTS idx_ice_created ON ice_candidates(created_at); `); } - async createOffer( - peerId: string, - offer: string, - expiresAt: number, - customCode?: string - ): Promise { - let code: string; - let attempts = 0; - const maxAttempts = 10; + async createOffers(offers: CreateOfferRequest[]): Promise { + const created: Offer[] = []; - // Generate unique code or use custom - do { - code = customCode || generateUUID(); - attempts++; + // D1 doesn't support true transactions yet, so we do this sequentially + for (const offer of offers) { + const id = offer.id || generateUUID(); + const now = Date.now(); - if (attempts > maxAttempts) { - throw new Error('Failed to generate unique offer code'); - } + // Insert offer + await this.db.prepare(` + INSERT INTO offers (id, peer_id, sdp, created_at, expires_at, last_seen) + VALUES (?, ?, ?, ?, ?, ?) + `).bind(id, offer.peerId, offer.sdp, now, offer.expiresAt, now).run(); - try { + // Insert topics + for (const topic of offer.topics) { await this.db.prepare(` - INSERT INTO offers (code, peer_id, offer, created_at, expires_at) - VALUES (?, ?, ?, ?, ?) - `).bind(code, peerId, offer, Date.now(), expiresAt).run(); - - break; - } catch (err: any) { - // If unique constraint failed with custom code, throw error - if (err.message?.includes('UNIQUE constraint failed')) { - if (customCode) { - throw new Error(`Offer code '${customCode}' already exists`); - } - // Try again with new generated code - continue; - } - throw err; - } - } while (true); - - return code; - } - - async getOffer(code: string): Promise { - try { - const result = await this.db.prepare(` - SELECT * FROM offers - WHERE code = ? AND expires_at > ? - `).bind(code, Date.now()).first(); - - if (!result) { - return null; + INSERT INTO offer_topics (offer_id, topic) + VALUES (?, ?) + `).bind(id, topic).run(); } - const row: any = result; - - return { - code: row.code, - peerId: row.peer_id, - offer: row.offer, - answer: row.answer || undefined, - offerCandidates: JSON.parse(row.offer_candidates || '[]'), - answerCandidates: JSON.parse(row.answer_candidates || '[]'), - createdAt: row.created_at, - expiresAt: row.expires_at, - }; - } catch (error) { - console.error('[D1] getOffer error:', error); - throw error; + created.push({ + id, + peerId: offer.peerId, + sdp: offer.sdp, + topics: offer.topics, + createdAt: now, + expiresAt: offer.expiresAt, + lastSeen: now, + }); } + + return created; } - async updateOffer(code: string, update: Partial): Promise { - // Verify offer exists - const current = await this.getOffer(code); - - if (!current) { - throw new Error('Offer not found'); - } - - // Build update query dynamically based on what fields are being updated - const updates: string[] = []; - const values: any[] = []; - - if (update.answer !== undefined) { - updates.push('answer = ?'); - values.push(update.answer); - } - - if (update.offerCandidates !== undefined) { - updates.push('offer_candidates = ?'); - values.push(JSON.stringify(update.offerCandidates)); - } - - if (update.answerCandidates !== undefined) { - updates.push('answer_candidates = ?'); - values.push(JSON.stringify(update.answerCandidates)); - } - - if (updates.length === 0) { - return; // Nothing to update - } - - // Add WHERE clause values - values.push(code); - - // D1 provides strong consistency, so this update is atomic and immediately visible - const query = ` - UPDATE offers - SET ${updates.join(', ')} - WHERE code = ? + async getOffersByTopic(topic: string, excludePeerIds?: string[]): Promise { + let query = ` + SELECT DISTINCT o.* + FROM offers o + INNER JOIN offer_topics ot ON o.id = ot.offer_id + WHERE ot.topic = ? AND o.expires_at > ? `; - await this.db.prepare(query).bind(...values).run(); + const params: any[] = [topic, Date.now()]; + + if (excludePeerIds && excludePeerIds.length > 0) { + const placeholders = excludePeerIds.map(() => '?').join(','); + query += ` AND o.peer_id NOT IN (${placeholders})`; + params.push(...excludePeerIds); + } + + query += ' ORDER BY o.last_seen DESC'; + + const result = await this.db.prepare(query).bind(...params).all(); + + if (!result.results) { + return []; + } + + return Promise.all(result.results.map(row => this.rowToOffer(row as any))); } - async deleteOffer(code: string): Promise { - await this.db.prepare(` - DELETE FROM offers WHERE code = ? - `).bind(code).run(); - } - - async cleanupExpiredOffers(): Promise { + async getOffersByPeerId(peerId: string): Promise { const result = await this.db.prepare(` - DELETE FROM offers WHERE expires_at <= ? - `).bind(Date.now()).run(); + SELECT * FROM offers + WHERE peer_id = ? AND expires_at > ? + ORDER BY last_seen DESC + `).bind(peerId, Date.now()).all(); + + if (!result.results) { + return []; + } + + return Promise.all(result.results.map(row => this.rowToOffer(row as any))); + } + + async getOfferById(offerId: string): Promise { + const result = await this.db.prepare(` + SELECT * FROM offers + WHERE id = ? AND expires_at > ? + `).bind(offerId, Date.now()).first(); + + if (!result) { + return null; + } + + return this.rowToOffer(result as any); + } + + async updateOfferLastSeen(offerId: string, lastSeen: number): Promise { + await this.db.prepare(` + UPDATE offers + SET last_seen = ? + WHERE id = ? AND expires_at > ? + `).bind(lastSeen, offerId, Date.now()).run(); + } + + async deleteOffer(offerId: string, ownerPeerId: string): Promise { + const result = await this.db.prepare(` + DELETE FROM offers + WHERE id = ? AND peer_id = ? + `).bind(offerId, ownerPeerId).run(); + + return (result.meta.changes || 0) > 0; + } + + async deleteExpiredOffers(now: number): Promise { + const result = await this.db.prepare(` + DELETE FROM offers WHERE expires_at < ? + `).bind(now).run(); return result.meta.changes || 0; } - async cleanup(): Promise { - await this.cleanupExpiredOffers(); + async answerOffer( + offerId: string, + answererPeerId: string, + answerSdp: string + ): Promise<{ success: boolean; error?: string }> { + // Check if offer exists and is not expired + const offer = await this.getOfferById(offerId); + + if (!offer) { + return { + success: false, + error: 'Offer not found or expired' + }; + } + + // Check if offer already has an answerer + if (offer.answererPeerId) { + return { + success: false, + error: 'Offer already answered' + }; + } + + // Update offer with answer + const result = await this.db.prepare(` + UPDATE offers + SET answerer_peer_id = ?, answer_sdp = ?, answered_at = ? + WHERE id = ? AND answerer_peer_id IS NULL + `).bind(answererPeerId, answerSdp, Date.now(), offerId).run(); + + if ((result.meta.changes || 0) === 0) { + return { + success: false, + error: 'Offer already answered (race condition)' + }; + } + + return { success: true }; + } + + async getAnsweredOffers(offererPeerId: string): Promise { + const result = await this.db.prepare(` + SELECT * FROM offers + WHERE peer_id = ? AND answerer_peer_id IS NOT NULL AND expires_at > ? + ORDER BY answered_at DESC + `).bind(offererPeerId, Date.now()).all(); + + if (!result.results) { + return []; + } + + return Promise.all(result.results.map(row => this.rowToOffer(row as any))); + } + + async addIceCandidates( + offerId: string, + peerId: string, + role: 'offerer' | 'answerer', + candidates: string[] + ): Promise { + // D1 doesn't have transactions, so insert one by one + for (const candidate of candidates) { + await this.db.prepare(` + INSERT INTO ice_candidates (offer_id, peer_id, role, candidate, created_at) + VALUES (?, ?, ?, ?, ?) + `).bind(offerId, peerId, role, candidate, Date.now()).run(); + } + + return candidates.length; + } + + async getIceCandidates( + offerId: string, + targetRole: 'offerer' | 'answerer', + since?: number + ): Promise { + let query = ` + SELECT * FROM ice_candidates + WHERE offer_id = ? AND role = ? + `; + + const params: any[] = [offerId, targetRole]; + + if (since !== undefined) { + query += ' AND created_at > ?'; + params.push(since); + } + + query += ' ORDER BY created_at ASC'; + + const result = await this.db.prepare(query).bind(...params).all(); + + if (!result.results) { + return []; + } + + return result.results.map((row: any) => ({ + id: row.id, + offerId: row.offer_id, + peerId: row.peer_id, + role: row.role, + candidate: row.candidate, + createdAt: row.created_at, + })); + } + + async getTopics(limit: number, offset: number): Promise<{ + topics: TopicInfo[]; + total: number; + }> { + // Get total count of topics with active offers + const countResult = await this.db.prepare(` + SELECT COUNT(DISTINCT ot.topic) as count + FROM offer_topics ot + INNER JOIN offers o ON ot.offer_id = o.id + WHERE o.expires_at > ? + `).bind(Date.now()).first(); + + const total = (countResult as any)?.count || 0; + + // Get topics with peer counts (paginated) + const topicsResult = await this.db.prepare(` + SELECT + ot.topic, + COUNT(DISTINCT o.peer_id) as active_peers + FROM offer_topics ot + INNER JOIN offers o ON ot.offer_id = o.id + WHERE o.expires_at > ? + GROUP BY ot.topic + ORDER BY active_peers DESC, ot.topic ASC + LIMIT ? OFFSET ? + `).bind(Date.now(), limit, offset).all(); + + const topics = (topicsResult.results || []).map((row: any) => ({ + topic: row.topic, + activePeers: row.active_peers, + })); + + return { topics, total }; } async close(): Promise { // D1 doesn't require explicit connection closing // Connections are managed by the Cloudflare Workers runtime } + + /** + * Helper method to convert database row to Offer object with topics + */ + private async rowToOffer(row: any): Promise { + // Get topics for this offer + const topicResult = await this.db.prepare(` + SELECT topic FROM offer_topics WHERE offer_id = ? + `).bind(row.id).all(); + + const topics = topicResult.results?.map((t: any) => t.topic) || []; + + return { + id: row.id, + peerId: row.peer_id, + sdp: row.sdp, + topics, + createdAt: row.created_at, + expiresAt: row.expires_at, + lastSeen: row.last_seen, + answererPeerId: row.answerer_peer_id || undefined, + answerSdp: row.answer_sdp || undefined, + answeredAt: row.answered_at || undefined, + }; + } } diff --git a/src/storage/sqlite.ts b/src/storage/sqlite.ts index 39f06b7..a333038 100644 --- a/src/storage/sqlite.ts +++ b/src/storage/sqlite.ts @@ -1,9 +1,9 @@ import Database from 'better-sqlite3'; import { randomUUID } from 'crypto'; -import { Storage, Offer } from './types.ts'; +import { Storage, Offer, IceCandidate, CreateOfferRequest, TopicInfo } from './types.ts'; /** - * SQLite storage adapter for offer management + * SQLite storage adapter for topic-based offer management * Supports both file-based and in-memory databases */ export class SQLiteStorage implements Storage { @@ -16,167 +16,356 @@ export class SQLiteStorage implements Storage { constructor(path: string = ':memory:') { this.db = new Database(path); this.initializeDatabase(); - this.startCleanupInterval(); } /** - * Initializes database schema + * Initializes database schema with new topic-based structure */ private initializeDatabase(): void { this.db.exec(` CREATE TABLE IF NOT EXISTS offers ( - code TEXT PRIMARY KEY, - peer_id TEXT NOT NULL CHECK(length(peer_id) <= 1024), - offer TEXT NOT NULL, - answer TEXT, - offer_candidates TEXT NOT NULL DEFAULT '[]', - answer_candidates TEXT NOT NULL DEFAULT '[]', + id TEXT PRIMARY KEY, + peer_id TEXT NOT NULL, + sdp TEXT NOT NULL, created_at INTEGER NOT NULL, - expires_at INTEGER NOT NULL + expires_at INTEGER NOT NULL, + last_seen INTEGER NOT NULL, + answerer_peer_id TEXT, + answer_sdp TEXT, + answered_at INTEGER ); - CREATE INDEX IF NOT EXISTS idx_offers_expires_at ON offers(expires_at); + CREATE INDEX IF NOT EXISTS idx_offers_peer ON offers(peer_id); + CREATE INDEX IF NOT EXISTS idx_offers_expires ON offers(expires_at); + CREATE INDEX IF NOT EXISTS idx_offers_last_seen ON offers(last_seen); + CREATE INDEX IF NOT EXISTS idx_offers_answerer ON offers(answerer_peer_id); + + CREATE TABLE IF NOT EXISTS offer_topics ( + offer_id TEXT NOT NULL, + topic TEXT NOT NULL, + PRIMARY KEY (offer_id, topic), + FOREIGN KEY (offer_id) REFERENCES offers(id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_topics_topic ON offer_topics(topic); + CREATE INDEX IF NOT EXISTS idx_topics_offer ON offer_topics(offer_id); + + CREATE TABLE IF NOT EXISTS ice_candidates ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + offer_id TEXT NOT NULL, + peer_id TEXT NOT NULL, + role TEXT NOT NULL CHECK(role IN ('offerer', 'answerer')), + candidate TEXT NOT NULL, + created_at INTEGER NOT NULL, + FOREIGN KEY (offer_id) REFERENCES offers(id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_ice_offer ON ice_candidates(offer_id); + CREATE INDEX IF NOT EXISTS idx_ice_peer ON ice_candidates(peer_id); + CREATE INDEX IF NOT EXISTS idx_ice_created ON ice_candidates(created_at); `); + + // Enable foreign keys + this.db.pragma('foreign_keys = ON'); } - /** - * Starts periodic cleanup of expired offers - */ - private startCleanupInterval(): void { - // Run cleanup every minute - setInterval(() => { - this.cleanup().catch(err => { - console.error('Cleanup error:', err); - }); - }, 60000); + async createOffers(offers: CreateOfferRequest[]): Promise { + const created: Offer[] = []; + + // Use transaction for atomic creation + const transaction = this.db.transaction((offers: CreateOfferRequest[]) => { + const offerStmt = this.db.prepare(` + INSERT INTO offers (id, peer_id, sdp, created_at, expires_at, last_seen) + VALUES (?, ?, ?, ?, ?, ?) + `); + + const topicStmt = this.db.prepare(` + INSERT INTO offer_topics (offer_id, topic) + VALUES (?, ?) + `); + + for (const offer of offers) { + const id = offer.id || randomUUID(); + const now = Date.now(); + + // Insert offer + offerStmt.run( + id, + offer.peerId, + offer.sdp, + now, + offer.expiresAt, + now + ); + + // Insert topics + for (const topic of offer.topics) { + topicStmt.run(id, topic); + } + + created.push({ + id, + peerId: offer.peerId, + sdp: offer.sdp, + topics: offer.topics, + createdAt: now, + expiresAt: offer.expiresAt, + lastSeen: now, + }); + } + }); + + transaction(offers); + return created; } - /** - * Generates a unique code using UUID - */ - private generateCode(): string { - return randomUUID(); - } + async getOffersByTopic(topic: string, excludePeerIds?: string[]): Promise { + let query = ` + SELECT DISTINCT o.* + FROM offers o + INNER JOIN offer_topics ot ON o.id = ot.offer_id + WHERE ot.topic = ? AND o.expires_at > ? + `; - async createOffer(peerId: string, offer: string, expiresAt: number, customCode?: string): Promise { - // Validate peerId length - if (peerId.length > 1024) { - throw new Error('PeerId string must be 1024 characters or less'); + const params: any[] = [topic, Date.now()]; + + if (excludePeerIds && excludePeerIds.length > 0) { + const placeholders = excludePeerIds.map(() => '?').join(','); + query += ` AND o.peer_id NOT IN (${placeholders})`; + params.push(...excludePeerIds); } - let code: string; - let attempts = 0; - const maxAttempts = 10; + query += ' ORDER BY o.last_seen DESC'; - // Try to generate or use custom code - do { - code = customCode || this.generateCode(); - attempts++; + const stmt = this.db.prepare(query); + const rows = stmt.all(...params) as any[]; - if (attempts > maxAttempts) { - throw new Error('Failed to generate unique offer code'); - } - - try { - const stmt = this.db.prepare(` - INSERT INTO offers (code, peer_id, offer, created_at, expires_at) - VALUES (?, ?, ?, ?, ?) - `); - - stmt.run(code, peerId, offer, Date.now(), expiresAt); - break; - } catch (err: any) { - // If unique constraint failed with custom code, throw error - if (err.code === 'SQLITE_CONSTRAINT_PRIMARYKEY') { - if (customCode) { - throw new Error(`Offer code '${customCode}' already exists`); - } - // Try again with new generated code - continue; - } - throw err; - } - } while (true); - - return code; + return Promise.all(rows.map(row => this.rowToOffer(row))); } - async getOffer(code: string): Promise { + async getOffersByPeerId(peerId: string): Promise { const stmt = this.db.prepare(` - SELECT * FROM offers WHERE code = ? AND expires_at > ? + SELECT * FROM offers + WHERE peer_id = ? AND expires_at > ? + ORDER BY last_seen DESC `); - const row = stmt.get(code, Date.now()) as any; + const rows = stmt.all(peerId, Date.now()) as any[]; + return Promise.all(rows.map(row => this.rowToOffer(row))); + } + + async getOfferById(offerId: string): Promise { + const stmt = this.db.prepare(` + SELECT * FROM offers + WHERE id = ? AND expires_at > ? + `); + + const row = stmt.get(offerId, Date.now()) as any; if (!row) { return null; } - return { - code: row.code, - peerId: row.peer_id, - offer: row.offer, - answer: row.answer || undefined, - offerCandidates: JSON.parse(row.offer_candidates), - answerCandidates: JSON.parse(row.answer_candidates), - createdAt: row.created_at, - expiresAt: row.expires_at, - }; + return this.rowToOffer(row); } - async updateOffer(code: string, update: Partial): Promise { - const current = await this.getOffer(code); - - if (!current) { - throw new Error('Offer not found'); - } - - const updates: string[] = []; - const values: any[] = []; - - if (update.answer !== undefined) { - updates.push('answer = ?'); - values.push(update.answer); - } - - if (update.offerCandidates !== undefined) { - updates.push('offer_candidates = ?'); - values.push(JSON.stringify(update.offerCandidates)); - } - - if (update.answerCandidates !== undefined) { - updates.push('answer_candidates = ?'); - values.push(JSON.stringify(update.answerCandidates)); - } - - if (updates.length === 0) { - return; - } - - values.push(code); - + async updateOfferLastSeen(offerId: string, lastSeen: number): Promise { const stmt = this.db.prepare(` - UPDATE offers SET ${updates.join(', ')} WHERE code = ? + UPDATE offers + SET last_seen = ? + WHERE id = ? AND expires_at > ? `); - stmt.run(...values); + stmt.run(lastSeen, offerId, Date.now()); } - async deleteOffer(code: string): Promise { - const stmt = this.db.prepare('DELETE FROM offers WHERE code = ?'); - stmt.run(code); + async deleteOffer(offerId: string, ownerPeerId: string): Promise { + const stmt = this.db.prepare(` + DELETE FROM offers + WHERE id = ? AND peer_id = ? + `); + + const result = stmt.run(offerId, ownerPeerId); + return result.changes > 0; } - async cleanup(): Promise { - const stmt = this.db.prepare('DELETE FROM offers WHERE expires_at <= ?'); - const result = stmt.run(Date.now()); + async deleteExpiredOffers(now: number): Promise { + const stmt = this.db.prepare('DELETE FROM offers WHERE expires_at < ?'); + const result = stmt.run(now); + return result.changes; + } - if (result.changes > 0) { - console.log(`Cleaned up ${result.changes} expired offer(s)`); + async answerOffer( + offerId: string, + answererPeerId: string, + answerSdp: string + ): Promise<{ success: boolean; error?: string }> { + // Check if offer exists and is not expired + const offer = await this.getOfferById(offerId); + + if (!offer) { + return { + success: false, + error: 'Offer not found or expired' + }; } + + // Check if offer already has an answerer + if (offer.answererPeerId) { + return { + success: false, + error: 'Offer already answered' + }; + } + + // Update offer with answer + const stmt = this.db.prepare(` + UPDATE offers + SET answerer_peer_id = ?, answer_sdp = ?, answered_at = ? + WHERE id = ? AND answerer_peer_id IS NULL + `); + + const result = stmt.run(answererPeerId, answerSdp, Date.now(), offerId); + + if (result.changes === 0) { + return { + success: false, + error: 'Offer already answered (race condition)' + }; + } + + return { success: true }; + } + + async getAnsweredOffers(offererPeerId: string): Promise { + const stmt = this.db.prepare(` + SELECT * FROM offers + WHERE peer_id = ? AND answerer_peer_id IS NOT NULL AND expires_at > ? + ORDER BY answered_at DESC + `); + + const rows = stmt.all(offererPeerId, Date.now()) as any[]; + return Promise.all(rows.map(row => this.rowToOffer(row))); + } + + async addIceCandidates( + offerId: string, + peerId: string, + role: 'offerer' | 'answerer', + candidates: string[] + ): Promise { + const stmt = this.db.prepare(` + INSERT INTO ice_candidates (offer_id, peer_id, role, candidate, created_at) + VALUES (?, ?, ?, ?, ?) + `); + + const transaction = this.db.transaction((candidates: string[]) => { + for (const candidate of candidates) { + stmt.run(offerId, peerId, role, candidate, Date.now()); + } + }); + + transaction(candidates); + return candidates.length; + } + + async getIceCandidates( + offerId: string, + targetRole: 'offerer' | 'answerer', + since?: number + ): Promise { + let query = ` + SELECT * FROM ice_candidates + WHERE offer_id = ? AND role = ? + `; + + const params: any[] = [offerId, targetRole]; + + if (since !== undefined) { + query += ' AND created_at > ?'; + params.push(since); + } + + query += ' ORDER BY created_at ASC'; + + const stmt = this.db.prepare(query); + const rows = stmt.all(...params) as any[]; + + return rows.map(row => ({ + id: row.id, + offerId: row.offer_id, + peerId: row.peer_id, + role: row.role, + candidate: row.candidate, + createdAt: row.created_at, + })); + } + + async getTopics(limit: number, offset: number): Promise<{ + topics: TopicInfo[]; + total: number; + }> { + // Get total count of topics with active offers + const countStmt = this.db.prepare(` + SELECT COUNT(DISTINCT ot.topic) as count + FROM offer_topics ot + INNER JOIN offers o ON ot.offer_id = o.id + WHERE o.expires_at > ? + `); + + const countRow = countStmt.get(Date.now()) as any; + const total = countRow.count; + + // Get topics with peer counts (paginated) + const topicsStmt = this.db.prepare(` + SELECT + ot.topic, + COUNT(DISTINCT o.peer_id) as active_peers + FROM offer_topics ot + INNER JOIN offers o ON ot.offer_id = o.id + WHERE o.expires_at > ? + GROUP BY ot.topic + ORDER BY active_peers DESC, ot.topic ASC + LIMIT ? OFFSET ? + `); + + const rows = topicsStmt.all(Date.now(), limit, offset) as any[]; + + const topics = rows.map(row => ({ + topic: row.topic, + activePeers: row.active_peers, + })); + + return { topics, total }; } async close(): Promise { this.db.close(); } + + /** + * Helper method to convert database row to Offer object with topics + */ + private async rowToOffer(row: any): Promise { + // Get topics for this offer + const topicStmt = this.db.prepare(` + SELECT topic FROM offer_topics WHERE offer_id = ? + `); + + const topicRows = topicStmt.all(row.id) as any[]; + const topics = topicRows.map(t => t.topic); + + return { + id: row.id, + peerId: row.peer_id, + sdp: row.sdp, + topics, + createdAt: row.created_at, + expiresAt: row.expires_at, + lastSeen: row.last_seen, + answererPeerId: row.answerer_peer_id || undefined, + answerSdp: row.answer_sdp || undefined, + answeredAt: row.answered_at || undefined, + }; + } } diff --git a/src/storage/types.ts b/src/storage/types.ts index 34d02d7..9968f9b 100644 --- a/src/storage/types.ts +++ b/src/storage/types.ts @@ -1,57 +1,163 @@ /** - * Represents a WebRTC signaling offer + * Represents a WebRTC signaling offer with topic-based discovery */ export interface Offer { - code: string; + id: string; peerId: string; - offer: string; - answer?: string; - offerCandidates: string[]; - answerCandidates: string[]; + sdp: string; + topics: string[]; createdAt: number; expiresAt: number; + lastSeen: number; + answererPeerId?: string; + answerSdp?: string; + answeredAt?: number; +} + +/** + * Represents an ICE candidate for WebRTC signaling + */ +export interface IceCandidate { + id: number; + offerId: string; + peerId: string; + role: 'offerer' | 'answerer'; + candidate: string; + createdAt: number; +} + +/** + * Represents a topic with active peer count + */ +export interface TopicInfo { + topic: string; + activePeers: number; +} + +/** + * Request to create a new offer + */ +export interface CreateOfferRequest { + id?: string; + peerId: string; + sdp: string; + topics: string[]; + expiresAt: number; } /** - * Storage interface for offer management + * Storage interface for offer management with topic-based discovery * Implementations can use different backends (SQLite, D1, Memory, etc.) */ export interface Storage { /** - * Creates a new offer - * @param peerId Peer identifier string (max 1024 chars) - * @param offer The WebRTC SDP offer message - * @param expiresAt Unix timestamp when the offer should expire - * @param customCode Optional custom code (if not provided, generates UUID) - * @returns The unique offer code + * Creates one or more offers + * @param offers Array of offer creation requests + * @returns Array of created offers with IDs */ - createOffer(peerId: string, offer: string, expiresAt: number, customCode?: string): Promise; + createOffers(offers: CreateOfferRequest[]): Promise; /** - * Retrieves an offer by its code - * @param code The offer code + * Retrieves offers by topic with optional peer ID exclusion + * @param topic Topic to search for + * @param excludePeerIds Optional array of peer IDs to exclude + * @returns Array of offers matching the topic + */ + getOffersByTopic(topic: string, excludePeerIds?: string[]): Promise; + + /** + * Retrieves all offers from a specific peer + * @param peerId Peer identifier + * @returns Array of offers from the peer + */ + getOffersByPeerId(peerId: string): Promise; + + /** + * Retrieves a specific offer by ID + * @param offerId Offer identifier * @returns The offer if found, null otherwise */ - getOffer(code: string): Promise; + getOfferById(offerId: string): Promise; /** - * Updates an existing offer with new data - * @param code The offer code - * @param update Partial offer data to update + * Updates the last_seen timestamp for an offer (heartbeat) + * @param offerId Offer identifier + * @param lastSeen New last_seen timestamp */ - updateOffer(code: string, update: Partial): Promise; + updateOfferLastSeen(offerId: string, lastSeen: number): Promise; /** - * Deletes an offer - * @param code The offer code + * Deletes an offer (with ownership verification) + * @param offerId Offer identifier + * @param ownerPeerId Peer ID of the owner (for verification) + * @returns true if deleted, false if not found or not owned */ - deleteOffer(code: string): Promise; + deleteOffer(offerId: string, ownerPeerId: string): Promise; /** - * Removes expired offers - * Should be called periodically to clean up old data + * Deletes all expired offers + * @param now Current timestamp + * @returns Number of offers deleted */ - cleanup(): Promise; + deleteExpiredOffers(now: number): Promise; + + /** + * Answers an offer (locks it to the answerer) + * @param offerId Offer identifier + * @param answererPeerId Answerer's peer ID + * @param answerSdp WebRTC answer SDP + * @returns Success status and optional error message + */ + answerOffer(offerId: string, answererPeerId: string, answerSdp: string): Promise<{ + success: boolean; + error?: string; + }>; + + /** + * Retrieves all answered offers for a specific offerer + * @param offererPeerId Offerer's peer ID + * @returns Array of answered offers + */ + getAnsweredOffers(offererPeerId: string): Promise; + + /** + * Adds ICE candidates for an offer + * @param offerId Offer identifier + * @param peerId Peer ID posting the candidates + * @param role Role of the peer (offerer or answerer) + * @param candidates Array of ICE candidate strings + * @returns Number of candidates added + */ + addIceCandidates( + offerId: string, + peerId: string, + role: 'offerer' | 'answerer', + candidates: string[] + ): Promise; + + /** + * Retrieves ICE candidates for an offer + * @param offerId Offer identifier + * @param targetRole Role to retrieve candidates for (offerer or answerer) + * @param since Optional timestamp - only return candidates after this time + * @returns Array of ICE candidates + */ + getIceCandidates( + offerId: string, + targetRole: 'offerer' | 'answerer', + since?: number + ): Promise; + + /** + * Retrieves topics with active peer counts (paginated) + * @param limit Maximum number of topics to return + * @param offset Number of topics to skip + * @returns Object with topics array and total count + */ + getTopics(limit: number, offset: number): Promise<{ + topics: TopicInfo[]; + total: number; + }>; /** * Closes the storage connection and releases resources diff --git a/src/worker.ts b/src/worker.ts index 9189a25..d077408 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,12 +1,19 @@ import { createApp } from './app.ts'; import { D1Storage } from './storage/d1.ts'; +import { generateSecretKey } from './crypto.ts'; +import { Config } from './config.ts'; /** * Cloudflare Workers environment bindings */ export interface Env { DB: D1Database; - OFFER_TIMEOUT?: string; + AUTH_SECRET?: string; + OFFER_DEFAULT_TTL?: string; + OFFER_MAX_TTL?: string; + OFFER_MIN_TTL?: string; + MAX_OFFERS_PER_REQUEST?: string; + MAX_TOPICS_PER_OFFER?: string; CORS_ORIGINS?: string; VERSION?: string; } @@ -19,21 +26,29 @@ export default { // Initialize D1 storage const storage = new D1Storage(env.DB); - // Parse configuration - const offerTimeout = env.OFFER_TIMEOUT - ? parseInt(env.OFFER_TIMEOUT, 10) - : 60000; // 1 minute default + // Generate or use provided auth secret + const authSecret = env.AUTH_SECRET || generateSecretKey(); - const corsOrigins = env.CORS_ORIGINS - ? env.CORS_ORIGINS.split(',').map(o => o.trim()) - : ['*']; + // Build config from environment + const config: Config = { + port: 0, // Not used in Workers + storageType: 'sqlite', // D1 is SQLite-compatible + storagePath: '', // Not used with D1 + corsOrigins: env.CORS_ORIGINS + ? env.CORS_ORIGINS.split(',').map(o => o.trim()) + : ['*'], + version: env.VERSION || 'unknown', + authSecret, + offerDefaultTtl: env.OFFER_DEFAULT_TTL ? parseInt(env.OFFER_DEFAULT_TTL, 10) : 60000, + offerMaxTtl: env.OFFER_MAX_TTL ? parseInt(env.OFFER_MAX_TTL, 10) : 86400000, + offerMinTtl: env.OFFER_MIN_TTL ? parseInt(env.OFFER_MIN_TTL, 10) : 60000, + cleanupInterval: 60000, // Not used in Workers (scheduled handler instead) + maxOffersPerRequest: env.MAX_OFFERS_PER_REQUEST ? parseInt(env.MAX_OFFERS_PER_REQUEST, 10) : 100, + maxTopicsPerOffer: env.MAX_TOPICS_PER_OFFER ? parseInt(env.MAX_TOPICS_PER_OFFER, 10) : 50, + }; // Create Hono app - const app = createApp(storage, { - offerTimeout, - corsOrigins, - version: env.VERSION || 'unknown', - }); + const app = createApp(storage, config); // Handle request return app.fetch(request, env, ctx); @@ -41,15 +56,15 @@ export default { /** * Scheduled handler for cron triggers - * Runs every minute to clean up expired offers + * Runs periodically to clean up expired offers */ async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext): Promise { const storage = new D1Storage(env.DB); const now = Date.now(); try { - // Delete expired offers using the storage method - const deletedCount = await storage.cleanupExpiredOffers(); + // Delete expired offers + const deletedCount = await storage.deleteExpiredOffers(now); console.log(`Cleaned up ${deletedCount} expired offers at ${new Date(now).toISOString()}`); } catch (error) { diff --git a/wrangler.toml b/wrangler.toml index 5df1989..c389757 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -1,6 +1,7 @@ name = "rondevu" main = "src/worker.ts" compatibility_date = "2024-01-01" +compatibility_flags = ["nodejs_compat"] # D1 Database binding [[d1_databases]] @@ -10,9 +11,17 @@ database_id = "b94e3f71-816d-455b-a89d-927fa49532d0" # Environment variables [vars] -OFFER_TIMEOUT = "60000" # 1 minute in milliseconds +OFFER_DEFAULT_TTL = "60000" # Default offer TTL: 1 minute +OFFER_MAX_TTL = "86400000" # Max offer TTL: 24 hours +OFFER_MIN_TTL = "60000" # Min offer TTL: 1 minute +MAX_OFFERS_PER_REQUEST = "100" # Max offers per request +MAX_TOPICS_PER_OFFER = "50" # Max topics per offer CORS_ORIGINS = "*" # Comma-separated list of allowed origins -VERSION = "0.0.1" # Semantic version +VERSION = "0.1.0" # Semantic version + +# AUTH_SECRET should be set as a secret, not a var +# Run: npx wrangler secret put AUTH_SECRET +# Enter a 64-character hex string (32 bytes) # Build configuration [build]