diff --git a/D1_SETUP.md b/D1_SETUP.md new file mode 100644 index 0000000..505682d --- /dev/null +++ b/D1_SETUP.md @@ -0,0 +1,97 @@ +# D1 Database Setup + +This project uses Cloudflare D1 for storage instead of KV to avoid eventual consistency issues. + +## Local Development + +For local development, Wrangler automatically creates a local D1 database: + +```bash +npx wrangler dev +``` + +## Production Setup + +### 1. Create the D1 Database + +```bash +npx wrangler d1 create rondevu-sessions +``` + +This will output something like: + +``` +✅ Successfully created DB 'rondevu-sessions' + +[[d1_databases]] +binding = "DB" +database_name = "rondevu-sessions" +database_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" +``` + +### 2. Update wrangler.toml + +Copy the `database_id` from the output and update it in `wrangler.toml`: + +```toml +[[d1_databases]] +binding = "DB" +database_name = "rondevu-sessions" +database_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" # Replace with your actual ID +``` + +### 3. Initialize the Database Schema + +```bash +npx wrangler d1 execute rondevu-sessions --remote --file=./migrations/schema.sql +``` + +### 4. Deploy + +```bash +npx wrangler deploy +``` + +## Database Migrations + +To run migrations on the remote database: + +```bash +npx wrangler d1 execute rondevu-sessions --remote --file=./migrations/schema.sql +``` + +To run migrations on the local database: + +```bash +npx wrangler d1 execute rondevu-sessions --local --file=./migrations/schema.sql +``` + +## Querying the Database + +### Remote Database + +```bash +# List all sessions +npx wrangler d1 execute rondevu-sessions --remote --command="SELECT * FROM sessions" + +# Count sessions +npx wrangler d1 execute rondevu-sessions --remote --command="SELECT COUNT(*) FROM sessions" + +# Delete expired sessions +npx wrangler d1 execute rondevu-sessions --remote --command="DELETE FROM sessions WHERE expires_at <= $(date +%s)000" +``` + +### Local Database + +Replace `--remote` with `--local` for local queries. + +## Why D1 Instead of KV? + +D1 provides: +- **Strong consistency** - No race conditions from eventual consistency +- **ACID transactions** - Atomic updates prevent data corruption +- **SQL queries** - More powerful query capabilities +- **Relational data** - Better for complex queries and joins +- **No propagation delay** - Immediate read-after-write consistency + +KV's eventual consistency was causing race conditions where ICE candidate updates would overwrite answers with stale data. diff --git a/DEPLOYMENT.md b/DEPLOYMENT.md index 4c4f15d..8db32ff 100644 --- a/DEPLOYMENT.md +++ b/DEPLOYMENT.md @@ -12,7 +12,7 @@ This guide covers deploying Rondevu to various platforms. ## Cloudflare Workers -Deploy to Cloudflare's edge network using Cloudflare Workers and KV storage. +Deploy to Cloudflare's edge network using Cloudflare Workers and D1 storage. ### Prerequisites @@ -27,26 +27,34 @@ npm install -g wrangler wrangler login ``` -2. **Create KV Namespace** +2. **Create D1 Database** ```bash # For production - wrangler kv:namespace create SESSIONS + npx wrangler d1 create rondevu-sessions - # This will output something like: - # { binding = "SESSIONS", id = "abc123..." } + # This will output: + # database_name = "rondevu-sessions" + # database_id = "abc123..." ``` 3. **Update wrangler.toml** - Edit `wrangler.toml` and replace `YOUR_KV_NAMESPACE_ID` with the ID from step 2: + Edit `wrangler.toml` and replace the `database_id` with the ID from step 2: ```toml - [[kv_namespaces]] - binding = "SESSIONS" - id = "abc123..." # Your actual KV namespace ID + [[d1_databases]] + binding = "DB" + database_name = "rondevu-sessions" + database_id = "abc123..." # Your actual D1 database ID ``` -4. **Configure Environment Variables** (Optional) +4. **Run Database Migration** + ```bash + # Run the migration on the remote database + npx wrangler d1 execute rondevu-sessions --remote --file=./migrations/0001_add_peer_id.sql + ``` + +5. **Configure Environment Variables** (Optional) Update `wrangler.toml` to customize settings: @@ -64,7 +72,7 @@ npx wrangler dev # The local development server will: # - Start on http://localhost:8787 -# - Use a local KV namespace automatically +# - Use a local D1 database automatically # - Hot-reload on file changes ``` @@ -109,9 +117,9 @@ npx wrangler tail Cloudflare Workers Free Tier includes: - 100,000 requests/day - 10ms CPU time per request -- KV: 100,000 reads/day, 1,000 writes/day +- D1: 5 GB storage, 5 million reads/day, 100,000 writes/day -For higher usage, see [Cloudflare Workers pricing](https://workers.cloudflare.com/#plans). +For higher usage, see [Cloudflare Workers pricing](https://workers.cloudflare.com/#plans) and [D1 pricing](https://developers.cloudflare.com/d1/platform/pricing/). ### Advantages diff --git a/README.md b/README.md index 3c12aaa..386e394 100644 --- a/README.md +++ b/README.md @@ -1,242 +1,62 @@ # Rondevu -An open signaling and tracking server for peer discovery. Enables peers to find each other through a topic-based HTTP API with Origin isolation for organizing peer-to-peer applications. +🎯 Meet WebRTC peers by topic, by peer ID, or by connection ID. -## Features +## Rondevu Server -- 🚀 **Fast & Lightweight** - Built with [Hono](https://hono.dev/) framework -- 📂 **Topic-Based Organization** - Group sessions by topic for easy peer discovery -- 🔒 **Origin Isolation** - Sessions are isolated by HTTP Origin header to group topics by domain -- 🏷️ **Peer Identification** - Info field prevents duplicate connections to same peer -- 🔌 **Pluggable Storage** - Storage interface supports SQLite and in-memory adapters -- 🐳 **Docker Ready** - Minimal Alpine-based Docker image -- ⏱️ **Session Timeout** - Configurable session expiration from initiation time -- 🔐 **Type Safe** - Written in TypeScript with full type definitions +A simple HTTP server for WebRTC peer signaling and discovery. -## Quick Start +**Three ways to connect:** by topic, by peer ID, or by connection ID. -### Using Node.js +### Quick Start +**Node.js:** ```bash -# Install dependencies -npm install - -# Run in development mode -npm run dev - -# Build and run in production -npm run build -npm start +npm install && npm start ``` -### Using Docker - +**Docker:** ```bash -# Build the image -docker build -t rondevu . - -# Run with default settings (SQLite database) -docker run -p 3000:3000 rondevu - -# Run with in-memory storage -docker run -p 3000:3000 -e STORAGE_TYPE=memory rondevu - -# Run with custom timeout (10 minutes) -docker run -p 3000:3000 -e SESSION_TIMEOUT=600000 rondevu +docker build -t rondevu . && docker run -p 3000:3000 rondevu ``` -### Using Cloudflare Workers - +**Cloudflare Workers:** ```bash -# Install Wrangler CLI -npm install -g wrangler - -# Login to Cloudflare -wrangler login - -# Create KV namespace -wrangler kv:namespace create SESSIONS - -# Update wrangler.toml with the KV namespace ID - -# Deploy to Cloudflare's edge network npx wrangler deploy ``` -See [DEPLOYMENT.md](./DEPLOYMENT.md#cloudflare-workers) for detailed instructions. +See [DEPLOYMENT.md](./DEPLOYMENT.md) for details. -## Configuration +### API -Configuration is done through environment variables: +```bash +# Create offer +POST /:topic/offer {"peerId":"alice","offer":"..."} -| Variable | Description | Default | -|--------------------|--------------------------------------------------|-------------| -| `PORT` | Server port | `3000` | -| `STORAGE_TYPE` | Storage backend: `sqlite` or `memory` | `sqlite` | -| `STORAGE_PATH` | Path to SQLite database file | `./data.db` | -| `SESSION_TIMEOUT` | Session timeout in milliseconds | `300000` | -| `CORS_ORIGINS` | Comma-separated list of allowed origins | `*` | +# List sessions +GET /:topic/sessions -### Example .env file +# Send answer +POST /answer {"code":"...","answer":"..."} + +# Poll for updates +POST /poll {"code":"...","side":"offerer|answerer"} +``` + +See [API.md](./API.md) for details. + +### Configuration ```env PORT=3000 -STORAGE_TYPE=sqlite -STORAGE_PATH=./sessions.db SESSION_TIMEOUT=300000 -CORS_ORIGINS=https://example.com,https://app.example.com +CORS_ORIGINS=* ``` -## API Documentation +### Storage -See [API.md](./API.md) for complete API documentation. +Supports SQLite (Node.js/Docker) or D1 (Cloudflare Workers). -### Quick Overview - -**List all active topics (with pagination):** -```bash -curl -X GET http://localhost:3000/ \ - -H "Origin: https://example.com" -# Returns: {"topics":[{"topic":"my-room","count":3}],"pagination":{...}} -``` - -**Create an offer (announce yourself as available):** -```bash -curl -X POST http://localhost:3000/my-room/offer \ - -H "Content-Type: application/json" \ - -H "Origin: https://example.com" \ - -d '{"info":"peer-123","offer":""}' -# Returns: {"code":"550e8400-e29b-41d4-a716-446655440000"} -``` - -**List available peers in a topic:** -```bash -curl -X GET http://localhost:3000/my-room/sessions \ - -H "Origin: https://example.com" -# Returns: {"sessions":[...]} -``` - -**Connect to a peer:** -```bash -curl -X POST http://localhost:3000/answer \ - -H "Content-Type: application/json" \ - -H "Origin: https://example.com" \ - -d '{"code":"550e8400-...","answer":"","side":"answerer"}' -# Returns: {"success":true} -``` - -## Architecture - -### Storage Interface - -The storage layer is abstracted through a simple interface, making it easy to implement custom storage backends: - -```typescript -interface Storage { - createSession(origin: string, topic: string, info: string, offer: string, expiresAt: number): Promise; - listSessionsByTopic(origin: string, topic: string): Promise; - getSession(code: string, origin: string): Promise; - updateSession(code: string, origin: string, update: Partial): Promise; - deleteSession(code: string): Promise; - cleanup(): Promise; - close(): Promise; -} -``` - -### Built-in Storage Adapters - -**SQLite Storage** (`sqlite.ts`) -- For Node.js/Docker deployments -- Persistent file-based or in-memory -- Automatic session cleanup -- Simple and reliable - -**Cloudflare KV Storage** (`kv.ts`) -- For Cloudflare Workers deployments -- Global edge storage -- Automatic TTL-based expiration -- Distributed and highly available - -### Custom Storage Adapters - -You can implement your own storage adapter by implementing the `Storage` interface: - -```typescript -import { Storage, Session } from './storage/types'; - -export class CustomStorage implements Storage { - async createSession(offer: string, expiresAt: number): Promise { - // Your implementation - } - // ... implement other methods -} -``` - -## Development - -### Project Structure - -``` -rondevu/ -├── src/ -│ ├── index.ts # Node.js server entry point -│ ├── app.ts # Hono application -│ ├── config.ts # Configuration -│ └── storage/ -│ ├── types.ts # Storage interface -│ ├── sqlite.ts # SQLite adapter -│ └── codeGenerator.ts # Code generation utility -├── Dockerfile # Docker build configuration -├── build.js # Build script -├── API.md # API documentation -└── README.md # This file -``` - -### Building - -```bash -# Build TypeScript -npm run build - -# Run built version -npm start -``` - -### Docker Build - -```bash -# Build the image -docker build -t rondevu . - -# Run with volume for persistent storage -docker run -p 3000:3000 -v $(pwd)/data:/app/data rondevu -``` - -## How It Works - -1. **Discover topics** (optional): Call `GET /` to see all active topics and peer counts -2. **Peer A** announces availability by posting to `/:topic/offer` with peer identifier and signaling data -3. Server generates a unique UUID code and stores the session (bucketed by Origin and topic) -4. **Peer B** discovers available peers using `GET /:topic/sessions` -5. **Peer B** filters out their own session using the info field to avoid self-connection -6. **Peer B** selects a peer and posts their connection data to `POST /answer` with the session code -7. Both peers exchange signaling data through `POST /answer` endpoint -8. Both peers poll for updates using `POST /poll` to retrieve connection information -9. Sessions automatically expire after the configured timeout - -This allows peers in distributed systems to discover each other without requiring a centralized registry, while maintaining isolation between different applications through Origin headers. - -### Origin Isolation - -Sessions are isolated by the HTTP `Origin` header, ensuring that: -- Peers can only see sessions from their own origin -- Session codes cannot be accessed cross-origin -- Topics are organized by application domain - -## License +### License MIT - -## Contributing - -Contributions are welcome! Please feel free to submit a Pull Request. diff --git a/migrations/0001_add_peer_id.sql b/migrations/0001_add_peer_id.sql new file mode 100644 index 0000000..963b2a9 --- /dev/null +++ b/migrations/0001_add_peer_id.sql @@ -0,0 +1,21 @@ +-- Drop old sessions table with 'info' column +DROP TABLE IF EXISTS sessions; + +-- Create sessions table with peer_id column +CREATE TABLE sessions ( + code TEXT PRIMARY KEY, + origin TEXT NOT NULL, + topic TEXT NOT NULL, + peer_id TEXT NOT NULL CHECK(length(peer_id) <= 1024), + offer TEXT NOT NULL, + answer TEXT, + offer_candidates TEXT NOT NULL DEFAULT '[]', + answer_candidates TEXT NOT NULL DEFAULT '[]', + created_at INTEGER NOT NULL, + expires_at INTEGER NOT NULL +); + +-- Create indexes for efficient queries +CREATE INDEX idx_expires_at ON sessions(expires_at); +CREATE INDEX idx_origin_topic ON sessions(origin, topic); +CREATE INDEX idx_origin_topic_expires ON sessions(origin, topic, expires_at); diff --git a/migrations/schema.sql b/migrations/schema.sql new file mode 100644 index 0000000..0fdfc51 --- /dev/null +++ b/migrations/schema.sql @@ -0,0 +1,18 @@ +-- Create sessions table +CREATE TABLE IF NOT EXISTS sessions ( + code TEXT PRIMARY KEY, + origin TEXT NOT NULL, + topic TEXT NOT NULL, + peer_id TEXT NOT NULL CHECK(length(peer_id) <= 1024), + offer TEXT NOT NULL, + answer TEXT, + offer_candidates TEXT NOT NULL DEFAULT '[]', + answer_candidates TEXT NOT NULL DEFAULT '[]', + created_at INTEGER NOT NULL, + expires_at INTEGER NOT NULL +); + +-- Create indexes for efficient queries +CREATE INDEX IF NOT EXISTS idx_expires_at ON sessions(expires_at); +CREATE INDEX IF NOT EXISTS idx_origin_topic ON sessions(origin, topic); +CREATE INDEX IF NOT EXISTS idx_origin_topic_expires ON sessions(origin, topic, expires_at); diff --git a/package.json b/package.json index 14d049b..17a8fce 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,8 @@ "typecheck": "tsc", "dev": "ts-node src/index.ts", "start": "node dist/index.js", - "test": "echo \"Error: no test specified\" && exit 1" + "test": "echo \"Error: no test specified\" && exit 1", + "deploy": "npm run build && npx wrangler deploy" }, "devDependencies": { "@cloudflare/workers-types": "^4.20251014.0", diff --git a/src/app.ts b/src/app.ts index c65eb2d..fb4a963 100644 --- a/src/app.ts +++ b/src/app.ts @@ -67,8 +67,8 @@ export function createApp(storage: Storage, config: AppConfig) { return c.json({ error: 'Missing required parameter: topic' }, 400); } - if (topic.length > 256) { - return c.json({ error: 'Topic string must be 256 characters or less' }, 400); + if (topic.length > 1024) { + return c.json({ error: 'Topic string must be 1024 characters or less' }, 400); } const sessions = await storage.listSessionsByTopic(origin, topic); @@ -76,7 +76,7 @@ export function createApp(storage: Storage, config: AppConfig) { return c.json({ sessions: sessions.map(s => ({ code: s.code, - info: s.info, + peerId: s.peerId, offer: s.offer, offerCandidates: s.offerCandidates, createdAt: s.createdAt, @@ -92,29 +92,29 @@ export function createApp(storage: Storage, config: AppConfig) { /** * POST /:topic/offer * Creates a new offer and returns a unique session code - * Body: { info: string, offer: string } + * Body: { peerId: string, offer: string } */ app.post('/:topic/offer', async (c) => { try { const origin = c.req.header('Origin') || c.req.header('origin') || 'unknown'; const topic = c.req.param('topic'); const body = await c.req.json(); - const { info, offer } = body; + const { peerId, offer, code: customCode } = body; if (!topic || typeof topic !== 'string') { return c.json({ error: 'Missing or invalid required parameter: topic' }, 400); } - if (topic.length > 256) { - return c.json({ error: 'Topic string must be 256 characters or less' }, 400); + if (topic.length > 1024) { + return c.json({ error: 'Topic string must be 1024 characters or less' }, 400); } - if (!info || typeof info !== 'string') { - return c.json({ error: 'Missing or invalid required parameter: info' }, 400); + if (!peerId || typeof peerId !== 'string') { + return c.json({ error: 'Missing or invalid required parameter: peerId' }, 400); } - if (info.length > 1024) { - return c.json({ error: 'Info string must be 1024 characters or less' }, 400); + if (peerId.length > 1024) { + return c.json({ error: 'PeerId string must be 1024 characters or less' }, 400); } if (!offer || typeof offer !== 'string') { @@ -122,7 +122,7 @@ export function createApp(storage: Storage, config: AppConfig) { } const expiresAt = Date.now() + config.sessionTimeout; - const code = await storage.createSession(origin, topic, info, offer, expiresAt); + const code = await storage.createSession(origin, topic, peerId, offer, expiresAt, customCode); return c.json({ code }, 200); } catch (err) { diff --git a/src/storage/d1.ts b/src/storage/d1.ts new file mode 100644 index 0000000..a5897b8 --- /dev/null +++ b/src/storage/d1.ts @@ -0,0 +1,286 @@ +import { Storage, Session } from './types.ts'; + +// Generate a UUID v4 +function generateUUID(): string { + return crypto.randomUUID(); +} + +/** + * D1 storage adapter for session management using Cloudflare D1 + */ +export class D1Storage implements Storage { + private db: D1Database; + + /** + * Creates a new D1 storage instance + * @param db D1Database instance from Cloudflare Workers environment + */ + constructor(db: D1Database) { + this.db = db; + } + + /** + * Initializes database schema + * This should be run once during setup, not on every request + */ + async initializeDatabase(): Promise { + await this.db.exec(` + CREATE TABLE IF NOT EXISTS sessions ( + code TEXT PRIMARY KEY, + origin TEXT NOT NULL, + topic TEXT NOT NULL, + peer_id TEXT NOT NULL CHECK(length(peer_id) <= 1024), + offer TEXT NOT NULL, + answer TEXT, + offer_candidates TEXT NOT NULL DEFAULT '[]', + answer_candidates TEXT NOT NULL DEFAULT '[]', + created_at INTEGER NOT NULL, + expires_at INTEGER NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_expires_at ON sessions(expires_at); + CREATE INDEX IF NOT EXISTS idx_origin_topic ON sessions(origin, topic); + CREATE INDEX IF NOT EXISTS idx_origin_topic_expires ON sessions(origin, topic, expires_at); + `); + } + + async listTopics(origin: string, page: number = 1, limit: number = 100): Promise<{ + topics: Array<{ topic: string; count: number }>; + pagination: { + page: number; + limit: number; + total: number; + hasMore: boolean; + }; + }> { + // Clamp limit to maximum of 1000 + const effectiveLimit = Math.min(limit, 1000); + const offset = (page - 1) * effectiveLimit; + + try { + // Get total count of topics for this origin + const countResult = await this.db.prepare(` + SELECT COUNT(DISTINCT topic) as total + FROM sessions + WHERE origin = ? AND expires_at > ? AND answer IS NULL + `).bind(origin, Date.now()).first(); + + const total = countResult ? Number(countResult.total) : 0; + + // Get paginated topics + const result = await this.db.prepare(` + SELECT topic, COUNT(*) as count + FROM sessions + WHERE origin = ? AND expires_at > ? AND answer IS NULL + GROUP BY topic + ORDER BY topic ASC + LIMIT ? OFFSET ? + `).bind(origin, Date.now(), effectiveLimit, offset).all(); + + // D1 returns results in the results array, or empty array if no results + if (!result.results) { + console.error('[D1] listTopics: No results property in response:', result); + return { + topics: [], + pagination: { + page, + limit: effectiveLimit, + total: 0, + hasMore: false, + }, + }; + } + + const topics = result.results.map((row: any) => ({ + topic: row.topic, + count: Number(row.count), + })); + + return { + topics, + pagination: { + page, + limit: effectiveLimit, + total, + hasMore: offset + topics.length < total, + }, + }; + } catch (error) { + console.error('[D1] listTopics error:', error); + throw error; + } + } + + async listSessionsByTopic(origin: string, topic: string): Promise { + try { + const result = await this.db.prepare(` + SELECT * FROM sessions + WHERE origin = ? AND topic = ? AND expires_at > ? AND answer IS NULL + ORDER BY created_at DESC + `).bind(origin, topic, Date.now()).all(); + + if (!result.results) { + console.error('[D1] listSessionsByTopic: No results property in response:', result); + return []; + } + + return result.results.map((row: any) => ({ + code: row.code, + origin: row.origin, + topic: row.topic, + peerId: row.peer_id, + offer: row.offer, + answer: row.answer || undefined, + offerCandidates: JSON.parse(row.offer_candidates), + answerCandidates: JSON.parse(row.answer_candidates), + createdAt: row.created_at, + expiresAt: row.expires_at, + })); + } catch (error) { + console.error('[D1] listSessionsByTopic error:', error); + throw error; + } + } + + async createSession( + origin: string, + topic: string, + peerId: string, + offer: string, + expiresAt: number, + customCode?: string + ): Promise { + let code: string; + let attempts = 0; + const maxAttempts = 10; + + // Generate unique code or use custom + do { + code = customCode || generateUUID(); + attempts++; + + if (attempts > maxAttempts) { + throw new Error('Failed to generate unique session code'); + } + + try { + await this.db.prepare(` + INSERT INTO sessions (code, origin, topic, peer_id, offer, created_at, expires_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + `).bind(code, origin, topic, peerId, offer, Date.now(), expiresAt).run(); + + break; + } catch (err: any) { + // If unique constraint failed with custom code, throw error + if (err.message?.includes('UNIQUE constraint failed')) { + if (customCode) { + throw new Error(`Session code '${customCode}' already exists`); + } + // Try again with new generated code + continue; + } + throw err; + } + } while (true); + + return code; + } + + async getSession(code: string, origin: string): Promise { + try { + const result = await this.db.prepare(` + SELECT * FROM sessions + WHERE code = ? AND origin = ? AND expires_at > ? + `).bind(code, origin, Date.now()).first(); + + if (!result) { + return null; + } + + const row: any = result; + + return { + code: row.code, + origin: row.origin, + topic: row.topic, + peerId: row.peer_id, + offer: row.offer, + answer: row.answer || undefined, + offerCandidates: JSON.parse(row.offer_candidates || '[]'), + answerCandidates: JSON.parse(row.answer_candidates || '[]'), + createdAt: row.created_at, + expiresAt: row.expires_at, + }; + } catch (error) { + console.error('[D1] getSession error:', error); + throw error; + } + } + + async updateSession(code: string, origin: string, update: Partial): Promise { + // Verify session exists and origin matches + const current = await this.getSession(code, origin); + + if (!current) { + throw new Error('Session not found or origin mismatch'); + } + + // Build update query dynamically based on what fields are being updated + const updates: string[] = []; + const values: any[] = []; + + if (update.answer !== undefined) { + updates.push('answer = ?'); + values.push(update.answer); + } + + if (update.offerCandidates !== undefined) { + updates.push('offer_candidates = ?'); + values.push(JSON.stringify(update.offerCandidates)); + } + + if (update.answerCandidates !== undefined) { + updates.push('answer_candidates = ?'); + values.push(JSON.stringify(update.answerCandidates)); + } + + if (updates.length === 0) { + return; // Nothing to update + } + + // Add WHERE clause values + values.push(code, origin); + + // D1 provides strong consistency, so this update is atomic and immediately visible + const query = ` + UPDATE sessions + SET ${updates.join(', ')} + WHERE code = ? AND origin = ? + `; + + await this.db.prepare(query).bind(...values).run(); + } + + async deleteSession(code: string): Promise { + await this.db.prepare(` + DELETE FROM sessions WHERE code = ? + `).bind(code).run(); + } + + async cleanupExpiredSessions(): Promise { + const result = await this.db.prepare(` + DELETE FROM sessions WHERE expires_at <= ? + `).bind(Date.now()).run(); + + return result.meta.changes || 0; + } + + async cleanup(): Promise { + await this.cleanupExpiredSessions(); + } + + async close(): Promise { + // D1 doesn't require explicit connection closing + // Connections are managed by the Cloudflare Workers runtime + } +} diff --git a/src/storage/kv.ts b/src/storage/kv.ts deleted file mode 100644 index 8890b5c..0000000 --- a/src/storage/kv.ts +++ /dev/null @@ -1,241 +0,0 @@ -import { Storage, Session } from './types.ts'; - -/** - * Cloudflare KV storage adapter for session management - */ -export class KVStorage implements Storage { - private kv: KVNamespace; - - /** - * Creates a new KV storage instance - * @param kv Cloudflare KV namespace binding - */ - constructor(kv: KVNamespace) { - this.kv = kv; - } - - /** - * Generates a unique code using Web Crypto API - */ - private generateCode(): string { - return crypto.randomUUID(); - } - - /** - * Gets the key for storing a session - */ - private sessionKey(code: string): string { - return `session:${code}`; - } - - /** - * Gets the key for the topic index - */ - private topicIndexKey(origin: string, topic: string): string { - return `index:${origin}:${topic}`; - } - - async createSession(origin: string, topic: string, info: string, offer: string, expiresAt: number): Promise { - // Validate info length - if (info.length > 1024) { - throw new Error('Info string must be 1024 characters or less'); - } - - const code = this.generateCode(); - const createdAt = Date.now(); - - const session: Session = { - code, - origin, - topic, - info, - offer, - answer: undefined, - offerCandidates: [], - answerCandidates: [], - createdAt, - expiresAt, - }; - - // Calculate TTL in seconds for KV - const ttl = Math.max(60, Math.floor((expiresAt - createdAt) / 1000)); - - // Store the session - await this.kv.put( - this.sessionKey(code), - JSON.stringify(session), - { expirationTtl: ttl } - ); - - // Update the topic index - const indexKey = this.topicIndexKey(origin, topic); - const existingIndex = await this.kv.get(indexKey, 'json') as string[] | null; - const updatedIndex = existingIndex ? [...existingIndex, code] : [code]; - - // Set index TTL to slightly longer than session TTL to avoid race conditions - await this.kv.put( - indexKey, - JSON.stringify(updatedIndex), - { expirationTtl: ttl + 300 } - ); - - return code; - } - - async listSessionsByTopic(origin: string, topic: string): Promise { - const indexKey = this.topicIndexKey(origin, topic); - const codes = await this.kv.get(indexKey, 'json') as string[] | null; - - if (!codes || codes.length === 0) { - return []; - } - - // Fetch all sessions in parallel - const sessionPromises = codes.map(async (code) => { - const sessionData = await this.kv.get(this.sessionKey(code), 'json') as Session | null; - return sessionData; - }); - - const sessions = await Promise.all(sessionPromises); - - // Filter out expired or answered sessions, and null values - const now = Date.now(); - const validSessions = sessions.filter( - (session): session is Session => - session !== null && - session.expiresAt > now && - session.answer === undefined - ); - - // Sort by creation time (newest first) - return validSessions.sort((a, b) => b.createdAt - a.createdAt); - } - - async listTopics(origin: string, page: number, limit: number): Promise<{ - topics: Array<{ topic: string; count: number }>; - pagination: { - page: number; - limit: number; - total: number; - hasMore: boolean; - }; - }> { - // Ensure limit doesn't exceed 1000 - const safeLimit = Math.min(Math.max(1, limit), 1000); - const safePage = Math.max(1, page); - - const prefix = `index:${origin}:`; - const topicCounts = new Map(); - - // List all index keys for this origin - const list = await this.kv.list({ prefix }); - - // Process each topic index - for (const key of list.keys) { - // Extract topic from key: "index:{origin}:{topic}" - const topic = key.name.substring(prefix.length); - - // Get the session codes for this topic - const codes = await this.kv.get(key.name, 'json') as string[] | null; - - if (!codes || codes.length === 0) { - continue; - } - - // Fetch sessions to count only valid ones (unexpired and unanswered) - const sessionPromises = codes.map(async (code) => { - const sessionData = await this.kv.get(this.sessionKey(code), 'json') as Session | null; - return sessionData; - }); - - const sessions = await Promise.all(sessionPromises); - - // Count valid sessions - const now = Date.now(); - const validCount = sessions.filter( - (session) => - session !== null && - session.expiresAt > now && - session.answer === undefined - ).length; - - if (validCount > 0) { - topicCounts.set(topic, validCount); - } - } - - // Convert to array and sort by topic name - const allTopics = Array.from(topicCounts.entries()) - .map(([topic, count]) => ({ topic, count })) - .sort((a, b) => a.topic.localeCompare(b.topic)); - - // Apply pagination - const total = allTopics.length; - const offset = (safePage - 1) * safeLimit; - const topics = allTopics.slice(offset, offset + safeLimit); - - return { - topics, - pagination: { - page: safePage, - limit: safeLimit, - total, - hasMore: offset + topics.length < total, - }, - }; - } - - async getSession(code: string, origin: string): Promise { - const sessionData = await this.kv.get(this.sessionKey(code), 'json') as Session | null; - - if (!sessionData) { - return null; - } - - // Validate origin and expiration - if (sessionData.origin !== origin || sessionData.expiresAt <= Date.now()) { - return null; - } - - return sessionData; - } - - async updateSession(code: string, origin: string, update: Partial): Promise { - const current = await this.getSession(code, origin); - - if (!current) { - throw new Error('Session not found or origin mismatch'); - } - - // Merge updates - const updated: Session = { - ...current, - ...(update.answer !== undefined && { answer: update.answer }), - ...(update.offerCandidates !== undefined && { offerCandidates: update.offerCandidates }), - ...(update.answerCandidates !== undefined && { answerCandidates: update.answerCandidates }), - }; - - // Calculate remaining TTL - const ttl = Math.max(60, Math.floor((updated.expiresAt - Date.now()) / 1000)); - - // Update the session - await this.kv.put( - this.sessionKey(code), - JSON.stringify(updated), - { expirationTtl: ttl } - ); - } - - async deleteSession(code: string): Promise { - await this.kv.delete(this.sessionKey(code)); - } - - async cleanup(): Promise { - // KV automatically expires keys based on TTL - // No manual cleanup needed - } - - async close(): Promise { - // No connection to close for KV - } -} diff --git a/src/storage/sqlite.ts b/src/storage/sqlite.ts index e73dafb..f21a195 100644 --- a/src/storage/sqlite.ts +++ b/src/storage/sqlite.ts @@ -28,7 +28,7 @@ export class SQLiteStorage implements Storage { code TEXT PRIMARY KEY, origin TEXT NOT NULL, topic TEXT NOT NULL, - info TEXT NOT NULL CHECK(length(info) <= 1024), + peer_id TEXT NOT NULL CHECK(length(peer_id) <= 1024), offer TEXT NOT NULL, answer TEXT, offer_candidates TEXT NOT NULL DEFAULT '[]', @@ -62,19 +62,19 @@ export class SQLiteStorage implements Storage { return randomUUID(); } - async createSession(origin: string, topic: string, info: string, offer: string, expiresAt: number): Promise { - // Validate info length - if (info.length > 1024) { - throw new Error('Info string must be 1024 characters or less'); + async createSession(origin: string, topic: string, peerId: string, offer: string, expiresAt: number, customCode?: string): Promise { + // Validate peerId length + if (peerId.length > 1024) { + throw new Error('PeerId string must be 1024 characters or less'); } let code: string; let attempts = 0; const maxAttempts = 10; - // Try to generate a unique code + // Try to generate or use custom code do { - code = this.generateCode(); + code = customCode || this.generateCode(); attempts++; if (attempts > maxAttempts) { @@ -83,15 +83,19 @@ export class SQLiteStorage implements Storage { try { const stmt = this.db.prepare(` - INSERT INTO sessions (code, origin, topic, info, offer, created_at, expires_at) + INSERT INTO sessions (code, origin, topic, peer_id, offer, created_at, expires_at) VALUES (?, ?, ?, ?, ?, ?, ?) `); - stmt.run(code, origin, topic, info, offer, Date.now(), expiresAt); + stmt.run(code, origin, topic, peerId, offer, Date.now(), expiresAt); break; } catch (err: any) { - // If unique constraint failed, try again + // If unique constraint failed with custom code, throw error if (err.code === 'SQLITE_CONSTRAINT_PRIMARYKEY') { + if (customCode) { + throw new Error(`Session code '${customCode}' already exists`); + } + // Try again with new generated code continue; } throw err; @@ -114,7 +118,7 @@ export class SQLiteStorage implements Storage { code: row.code, origin: row.origin, topic: row.topic, - info: row.info, + peerId: row.peer_id, offer: row.offer, answer: row.answer || undefined, offerCandidates: JSON.parse(row.offer_candidates), @@ -189,7 +193,7 @@ export class SQLiteStorage implements Storage { code: row.code, origin: row.origin, topic: row.topic, - info: row.info, + peerId: row.peer_id, offer: row.offer, answer: row.answer || undefined, offerCandidates: JSON.parse(row.offer_candidates), diff --git a/src/storage/types.ts b/src/storage/types.ts index a420e0f..1cb9ab3 100644 --- a/src/storage/types.ts +++ b/src/storage/types.ts @@ -5,7 +5,7 @@ export interface Session { code: string; origin: string; topic: string; - info: string; + peerId: string; offer: string; answer?: string; offerCandidates: string[]; @@ -23,12 +23,13 @@ export interface Storage { * Creates a new session with the given offer * @param origin The Origin header from the request * @param topic The topic to post the offer to - * @param info User info string (max 1024 chars) + * @param peerId Peer identifier string (max 1024 chars) * @param offer The WebRTC SDP offer message * @param expiresAt Unix timestamp when the session should expire + * @param customCode Optional custom code (if not provided, generates UUID) * @returns The unique session code */ - createSession(origin: string, topic: string, info: string, offer: string, expiresAt: number): Promise; + createSession(origin: string, topic: string, peerId: string, offer: string, expiresAt: number, customCode?: string): Promise; /** * Lists all unanswered sessions for a given origin and topic diff --git a/src/worker.ts b/src/worker.ts index 3696545..1006120 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,11 +1,11 @@ import { createApp } from './app.ts'; -import { KVStorage } from './storage/kv.ts'; +import { D1Storage } from './storage/d1.ts'; /** * Cloudflare Workers environment bindings */ export interface Env { - SESSIONS: KVNamespace; + DB: D1Database; SESSION_TIMEOUT?: string; CORS_ORIGINS?: string; } @@ -15,8 +15,8 @@ export interface Env { */ export default { async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise { - // Initialize KV storage - const storage = new KVStorage(env.SESSIONS); + // Initialize D1 storage + const storage = new D1Storage(env.DB); // Parse configuration const sessionTimeout = env.SESSION_TIMEOUT