Replace KV storage with D1, add peer_id field, simplify README

Storage changes:
- Remove KV storage adapter
- Add D1 storage adapter for Cloudflare Workers
- Update schema to use peer_id instead of info field
- Add database migrations for D1

Documentation:
- Simplify README to be more concise
- Update deployment instructions for D1
- Add D1_SETUP.md explaining migration from KV
- Update DEPLOYMENT.md with D1 setup steps

API changes:
- Replace info field with peerId in session creation
- Update all storage interfaces and implementations

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-11-07 21:21:13 +01:00
parent d993d6dbfc
commit 02d460fa7e
12 changed files with 512 additions and 497 deletions

97
D1_SETUP.md Normal file
View File

@@ -0,0 +1,97 @@
# D1 Database Setup
This project uses Cloudflare D1 for storage instead of KV to avoid eventual consistency issues.
## Local Development
For local development, Wrangler automatically creates a local D1 database:
```bash
npx wrangler dev
```
## Production Setup
### 1. Create the D1 Database
```bash
npx wrangler d1 create rondevu-sessions
```
This will output something like:
```
✅ Successfully created DB 'rondevu-sessions'
[[d1_databases]]
binding = "DB"
database_name = "rondevu-sessions"
database_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
```
### 2. Update wrangler.toml
Copy the `database_id` from the output and update it in `wrangler.toml`:
```toml
[[d1_databases]]
binding = "DB"
database_name = "rondevu-sessions"
database_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" # Replace with your actual ID
```
### 3. Initialize the Database Schema
```bash
npx wrangler d1 execute rondevu-sessions --remote --file=./migrations/schema.sql
```
### 4. Deploy
```bash
npx wrangler deploy
```
## Database Migrations
To run migrations on the remote database:
```bash
npx wrangler d1 execute rondevu-sessions --remote --file=./migrations/schema.sql
```
To run migrations on the local database:
```bash
npx wrangler d1 execute rondevu-sessions --local --file=./migrations/schema.sql
```
## Querying the Database
### Remote Database
```bash
# List all sessions
npx wrangler d1 execute rondevu-sessions --remote --command="SELECT * FROM sessions"
# Count sessions
npx wrangler d1 execute rondevu-sessions --remote --command="SELECT COUNT(*) FROM sessions"
# Delete expired sessions
npx wrangler d1 execute rondevu-sessions --remote --command="DELETE FROM sessions WHERE expires_at <= $(date +%s)000"
```
### Local Database
Replace `--remote` with `--local` for local queries.
## Why D1 Instead of KV?
D1 provides:
- **Strong consistency** - No race conditions from eventual consistency
- **ACID transactions** - Atomic updates prevent data corruption
- **SQL queries** - More powerful query capabilities
- **Relational data** - Better for complex queries and joins
- **No propagation delay** - Immediate read-after-write consistency
KV's eventual consistency was causing race conditions where ICE candidate updates would overwrite answers with stale data.

View File

@@ -12,7 +12,7 @@ This guide covers deploying Rondevu to various platforms.
## Cloudflare Workers
Deploy to Cloudflare's edge network using Cloudflare Workers and KV storage.
Deploy to Cloudflare's edge network using Cloudflare Workers and D1 storage.
### Prerequisites
@@ -27,26 +27,34 @@ npm install -g wrangler
wrangler login
```
2. **Create KV Namespace**
2. **Create D1 Database**
```bash
# For production
wrangler kv:namespace create SESSIONS
npx wrangler d1 create rondevu-sessions
# This will output something like:
# { binding = "SESSIONS", id = "abc123..." }
# This will output:
# database_name = "rondevu-sessions"
# database_id = "abc123..."
```
3. **Update wrangler.toml**
Edit `wrangler.toml` and replace `YOUR_KV_NAMESPACE_ID` with the ID from step 2:
Edit `wrangler.toml` and replace the `database_id` with the ID from step 2:
```toml
[[kv_namespaces]]
binding = "SESSIONS"
id = "abc123..." # Your actual KV namespace ID
[[d1_databases]]
binding = "DB"
database_name = "rondevu-sessions"
database_id = "abc123..." # Your actual D1 database ID
```
4. **Configure Environment Variables** (Optional)
4. **Run Database Migration**
```bash
# Run the migration on the remote database
npx wrangler d1 execute rondevu-sessions --remote --file=./migrations/0001_add_peer_id.sql
```
5. **Configure Environment Variables** (Optional)
Update `wrangler.toml` to customize settings:
@@ -64,7 +72,7 @@ npx wrangler dev
# The local development server will:
# - Start on http://localhost:8787
# - Use a local KV namespace automatically
# - Use a local D1 database automatically
# - Hot-reload on file changes
```
@@ -109,9 +117,9 @@ npx wrangler tail
Cloudflare Workers Free Tier includes:
- 100,000 requests/day
- 10ms CPU time per request
- KV: 100,000 reads/day, 1,000 writes/day
- D1: 5 GB storage, 5 million reads/day, 100,000 writes/day
For higher usage, see [Cloudflare Workers pricing](https://workers.cloudflare.com/#plans).
For higher usage, see [Cloudflare Workers pricing](https://workers.cloudflare.com/#plans) and [D1 pricing](https://developers.cloudflare.com/d1/platform/pricing/).
### Advantages

242
README.md
View File

@@ -1,242 +1,62 @@
# Rondevu
An open signaling and tracking server for peer discovery. Enables peers to find each other through a topic-based HTTP API with Origin isolation for organizing peer-to-peer applications.
🎯 Meet WebRTC peers by topic, by peer ID, or by connection ID.
## Features
## Rondevu Server
- 🚀 **Fast & Lightweight** - Built with [Hono](https://hono.dev/) framework
- 📂 **Topic-Based Organization** - Group sessions by topic for easy peer discovery
- 🔒 **Origin Isolation** - Sessions are isolated by HTTP Origin header to group topics by domain
- 🏷️ **Peer Identification** - Info field prevents duplicate connections to same peer
- 🔌 **Pluggable Storage** - Storage interface supports SQLite and in-memory adapters
- 🐳 **Docker Ready** - Minimal Alpine-based Docker image
- ⏱️ **Session Timeout** - Configurable session expiration from initiation time
- 🔐 **Type Safe** - Written in TypeScript with full type definitions
A simple HTTP server for WebRTC peer signaling and discovery.
## Quick Start
**Three ways to connect:** by topic, by peer ID, or by connection ID.
### Using Node.js
### Quick Start
**Node.js:**
```bash
# Install dependencies
npm install
# Run in development mode
npm run dev
# Build and run in production
npm run build
npm start
npm install && npm start
```
### Using Docker
**Docker:**
```bash
# Build the image
docker build -t rondevu .
# Run with default settings (SQLite database)
docker run -p 3000:3000 rondevu
# Run with in-memory storage
docker run -p 3000:3000 -e STORAGE_TYPE=memory rondevu
# Run with custom timeout (10 minutes)
docker run -p 3000:3000 -e SESSION_TIMEOUT=600000 rondevu
docker build -t rondevu . && docker run -p 3000:3000 rondevu
```
### Using Cloudflare Workers
**Cloudflare Workers:**
```bash
# Install Wrangler CLI
npm install -g wrangler
# Login to Cloudflare
wrangler login
# Create KV namespace
wrangler kv:namespace create SESSIONS
# Update wrangler.toml with the KV namespace ID
# Deploy to Cloudflare's edge network
npx wrangler deploy
```
See [DEPLOYMENT.md](./DEPLOYMENT.md#cloudflare-workers) for detailed instructions.
See [DEPLOYMENT.md](./DEPLOYMENT.md) for details.
## Configuration
### API
Configuration is done through environment variables:
```bash
# Create offer
POST /:topic/offer {"peerId":"alice","offer":"..."}
| Variable | Description | Default |
|--------------------|--------------------------------------------------|-------------|
| `PORT` | Server port | `3000` |
| `STORAGE_TYPE` | Storage backend: `sqlite` or `memory` | `sqlite` |
| `STORAGE_PATH` | Path to SQLite database file | `./data.db` |
| `SESSION_TIMEOUT` | Session timeout in milliseconds | `300000` |
| `CORS_ORIGINS` | Comma-separated list of allowed origins | `*` |
# List sessions
GET /:topic/sessions
### Example .env file
# Send answer
POST /answer {"code":"...","answer":"..."}
# Poll for updates
POST /poll {"code":"...","side":"offerer|answerer"}
```
See [API.md](./API.md) for details.
### Configuration
```env
PORT=3000
STORAGE_TYPE=sqlite
STORAGE_PATH=./sessions.db
SESSION_TIMEOUT=300000
CORS_ORIGINS=https://example.com,https://app.example.com
CORS_ORIGINS=*
```
## API Documentation
### Storage
See [API.md](./API.md) for complete API documentation.
Supports SQLite (Node.js/Docker) or D1 (Cloudflare Workers).
### Quick Overview
**List all active topics (with pagination):**
```bash
curl -X GET http://localhost:3000/ \
-H "Origin: https://example.com"
# Returns: {"topics":[{"topic":"my-room","count":3}],"pagination":{...}}
```
**Create an offer (announce yourself as available):**
```bash
curl -X POST http://localhost:3000/my-room/offer \
-H "Content-Type: application/json" \
-H "Origin: https://example.com" \
-d '{"info":"peer-123","offer":"<SIGNALING_DATA>"}'
# Returns: {"code":"550e8400-e29b-41d4-a716-446655440000"}
```
**List available peers in a topic:**
```bash
curl -X GET http://localhost:3000/my-room/sessions \
-H "Origin: https://example.com"
# Returns: {"sessions":[...]}
```
**Connect to a peer:**
```bash
curl -X POST http://localhost:3000/answer \
-H "Content-Type: application/json" \
-H "Origin: https://example.com" \
-d '{"code":"550e8400-...","answer":"<SIGNALING_DATA>","side":"answerer"}'
# Returns: {"success":true}
```
## Architecture
### Storage Interface
The storage layer is abstracted through a simple interface, making it easy to implement custom storage backends:
```typescript
interface Storage {
createSession(origin: string, topic: string, info: string, offer: string, expiresAt: number): Promise<string>;
listSessionsByTopic(origin: string, topic: string): Promise<Session[]>;
getSession(code: string, origin: string): Promise<Session | null>;
updateSession(code: string, origin: string, update: Partial<Session>): Promise<void>;
deleteSession(code: string): Promise<void>;
cleanup(): Promise<void>;
close(): Promise<void>;
}
```
### Built-in Storage Adapters
**SQLite Storage** (`sqlite.ts`)
- For Node.js/Docker deployments
- Persistent file-based or in-memory
- Automatic session cleanup
- Simple and reliable
**Cloudflare KV Storage** (`kv.ts`)
- For Cloudflare Workers deployments
- Global edge storage
- Automatic TTL-based expiration
- Distributed and highly available
### Custom Storage Adapters
You can implement your own storage adapter by implementing the `Storage` interface:
```typescript
import { Storage, Session } from './storage/types';
export class CustomStorage implements Storage {
async createSession(offer: string, expiresAt: number): Promise<string> {
// Your implementation
}
// ... implement other methods
}
```
## Development
### Project Structure
```
rondevu/
├── src/
│ ├── index.ts # Node.js server entry point
│ ├── app.ts # Hono application
│ ├── config.ts # Configuration
│ └── storage/
│ ├── types.ts # Storage interface
│ ├── sqlite.ts # SQLite adapter
│ └── codeGenerator.ts # Code generation utility
├── Dockerfile # Docker build configuration
├── build.js # Build script
├── API.md # API documentation
└── README.md # This file
```
### Building
```bash
# Build TypeScript
npm run build
# Run built version
npm start
```
### Docker Build
```bash
# Build the image
docker build -t rondevu .
# Run with volume for persistent storage
docker run -p 3000:3000 -v $(pwd)/data:/app/data rondevu
```
## How It Works
1. **Discover topics** (optional): Call `GET /` to see all active topics and peer counts
2. **Peer A** announces availability by posting to `/:topic/offer` with peer identifier and signaling data
3. Server generates a unique UUID code and stores the session (bucketed by Origin and topic)
4. **Peer B** discovers available peers using `GET /:topic/sessions`
5. **Peer B** filters out their own session using the info field to avoid self-connection
6. **Peer B** selects a peer and posts their connection data to `POST /answer` with the session code
7. Both peers exchange signaling data through `POST /answer` endpoint
8. Both peers poll for updates using `POST /poll` to retrieve connection information
9. Sessions automatically expire after the configured timeout
This allows peers in distributed systems to discover each other without requiring a centralized registry, while maintaining isolation between different applications through Origin headers.
### Origin Isolation
Sessions are isolated by the HTTP `Origin` header, ensuring that:
- Peers can only see sessions from their own origin
- Session codes cannot be accessed cross-origin
- Topics are organized by application domain
## License
### License
MIT
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.

View File

@@ -0,0 +1,21 @@
-- Drop old sessions table with 'info' column
DROP TABLE IF EXISTS sessions;
-- Create sessions table with peer_id column
CREATE TABLE sessions (
code TEXT PRIMARY KEY,
origin TEXT NOT NULL,
topic TEXT NOT NULL,
peer_id TEXT NOT NULL CHECK(length(peer_id) <= 1024),
offer TEXT NOT NULL,
answer TEXT,
offer_candidates TEXT NOT NULL DEFAULT '[]',
answer_candidates TEXT NOT NULL DEFAULT '[]',
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL
);
-- Create indexes for efficient queries
CREATE INDEX idx_expires_at ON sessions(expires_at);
CREATE INDEX idx_origin_topic ON sessions(origin, topic);
CREATE INDEX idx_origin_topic_expires ON sessions(origin, topic, expires_at);

18
migrations/schema.sql Normal file
View File

@@ -0,0 +1,18 @@
-- Create sessions table
CREATE TABLE IF NOT EXISTS sessions (
code TEXT PRIMARY KEY,
origin TEXT NOT NULL,
topic TEXT NOT NULL,
peer_id TEXT NOT NULL CHECK(length(peer_id) <= 1024),
offer TEXT NOT NULL,
answer TEXT,
offer_candidates TEXT NOT NULL DEFAULT '[]',
answer_candidates TEXT NOT NULL DEFAULT '[]',
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL
);
-- Create indexes for efficient queries
CREATE INDEX IF NOT EXISTS idx_expires_at ON sessions(expires_at);
CREATE INDEX IF NOT EXISTS idx_origin_topic ON sessions(origin, topic);
CREATE INDEX IF NOT EXISTS idx_origin_topic_expires ON sessions(origin, topic, expires_at);

View File

@@ -8,7 +8,8 @@
"typecheck": "tsc",
"dev": "ts-node src/index.ts",
"start": "node dist/index.js",
"test": "echo \"Error: no test specified\" && exit 1"
"test": "echo \"Error: no test specified\" && exit 1",
"deploy": "npm run build && npx wrangler deploy"
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20251014.0",

View File

@@ -67,8 +67,8 @@ export function createApp(storage: Storage, config: AppConfig) {
return c.json({ error: 'Missing required parameter: topic' }, 400);
}
if (topic.length > 256) {
return c.json({ error: 'Topic string must be 256 characters or less' }, 400);
if (topic.length > 1024) {
return c.json({ error: 'Topic string must be 1024 characters or less' }, 400);
}
const sessions = await storage.listSessionsByTopic(origin, topic);
@@ -76,7 +76,7 @@ export function createApp(storage: Storage, config: AppConfig) {
return c.json({
sessions: sessions.map(s => ({
code: s.code,
info: s.info,
peerId: s.peerId,
offer: s.offer,
offerCandidates: s.offerCandidates,
createdAt: s.createdAt,
@@ -92,29 +92,29 @@ export function createApp(storage: Storage, config: AppConfig) {
/**
* POST /:topic/offer
* Creates a new offer and returns a unique session code
* Body: { info: string, offer: string }
* Body: { peerId: string, offer: string }
*/
app.post('/:topic/offer', async (c) => {
try {
const origin = c.req.header('Origin') || c.req.header('origin') || 'unknown';
const topic = c.req.param('topic');
const body = await c.req.json();
const { info, offer } = body;
const { peerId, offer, code: customCode } = body;
if (!topic || typeof topic !== 'string') {
return c.json({ error: 'Missing or invalid required parameter: topic' }, 400);
}
if (topic.length > 256) {
return c.json({ error: 'Topic string must be 256 characters or less' }, 400);
if (topic.length > 1024) {
return c.json({ error: 'Topic string must be 1024 characters or less' }, 400);
}
if (!info || typeof info !== 'string') {
return c.json({ error: 'Missing or invalid required parameter: info' }, 400);
if (!peerId || typeof peerId !== 'string') {
return c.json({ error: 'Missing or invalid required parameter: peerId' }, 400);
}
if (info.length > 1024) {
return c.json({ error: 'Info string must be 1024 characters or less' }, 400);
if (peerId.length > 1024) {
return c.json({ error: 'PeerId string must be 1024 characters or less' }, 400);
}
if (!offer || typeof offer !== 'string') {
@@ -122,7 +122,7 @@ export function createApp(storage: Storage, config: AppConfig) {
}
const expiresAt = Date.now() + config.sessionTimeout;
const code = await storage.createSession(origin, topic, info, offer, expiresAt);
const code = await storage.createSession(origin, topic, peerId, offer, expiresAt, customCode);
return c.json({ code }, 200);
} catch (err) {

286
src/storage/d1.ts Normal file
View File

@@ -0,0 +1,286 @@
import { Storage, Session } from './types.ts';
// Generate a UUID v4
function generateUUID(): string {
return crypto.randomUUID();
}
/**
* D1 storage adapter for session management using Cloudflare D1
*/
export class D1Storage implements Storage {
private db: D1Database;
/**
* Creates a new D1 storage instance
* @param db D1Database instance from Cloudflare Workers environment
*/
constructor(db: D1Database) {
this.db = db;
}
/**
* Initializes database schema
* This should be run once during setup, not on every request
*/
async initializeDatabase(): Promise<void> {
await this.db.exec(`
CREATE TABLE IF NOT EXISTS sessions (
code TEXT PRIMARY KEY,
origin TEXT NOT NULL,
topic TEXT NOT NULL,
peer_id TEXT NOT NULL CHECK(length(peer_id) <= 1024),
offer TEXT NOT NULL,
answer TEXT,
offer_candidates TEXT NOT NULL DEFAULT '[]',
answer_candidates TEXT NOT NULL DEFAULT '[]',
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_expires_at ON sessions(expires_at);
CREATE INDEX IF NOT EXISTS idx_origin_topic ON sessions(origin, topic);
CREATE INDEX IF NOT EXISTS idx_origin_topic_expires ON sessions(origin, topic, expires_at);
`);
}
async listTopics(origin: string, page: number = 1, limit: number = 100): Promise<{
topics: Array<{ topic: string; count: number }>;
pagination: {
page: number;
limit: number;
total: number;
hasMore: boolean;
};
}> {
// Clamp limit to maximum of 1000
const effectiveLimit = Math.min(limit, 1000);
const offset = (page - 1) * effectiveLimit;
try {
// Get total count of topics for this origin
const countResult = await this.db.prepare(`
SELECT COUNT(DISTINCT topic) as total
FROM sessions
WHERE origin = ? AND expires_at > ? AND answer IS NULL
`).bind(origin, Date.now()).first();
const total = countResult ? Number(countResult.total) : 0;
// Get paginated topics
const result = await this.db.prepare(`
SELECT topic, COUNT(*) as count
FROM sessions
WHERE origin = ? AND expires_at > ? AND answer IS NULL
GROUP BY topic
ORDER BY topic ASC
LIMIT ? OFFSET ?
`).bind(origin, Date.now(), effectiveLimit, offset).all();
// D1 returns results in the results array, or empty array if no results
if (!result.results) {
console.error('[D1] listTopics: No results property in response:', result);
return {
topics: [],
pagination: {
page,
limit: effectiveLimit,
total: 0,
hasMore: false,
},
};
}
const topics = result.results.map((row: any) => ({
topic: row.topic,
count: Number(row.count),
}));
return {
topics,
pagination: {
page,
limit: effectiveLimit,
total,
hasMore: offset + topics.length < total,
},
};
} catch (error) {
console.error('[D1] listTopics error:', error);
throw error;
}
}
async listSessionsByTopic(origin: string, topic: string): Promise<Session[]> {
try {
const result = await this.db.prepare(`
SELECT * FROM sessions
WHERE origin = ? AND topic = ? AND expires_at > ? AND answer IS NULL
ORDER BY created_at DESC
`).bind(origin, topic, Date.now()).all();
if (!result.results) {
console.error('[D1] listSessionsByTopic: No results property in response:', result);
return [];
}
return result.results.map((row: any) => ({
code: row.code,
origin: row.origin,
topic: row.topic,
peerId: row.peer_id,
offer: row.offer,
answer: row.answer || undefined,
offerCandidates: JSON.parse(row.offer_candidates),
answerCandidates: JSON.parse(row.answer_candidates),
createdAt: row.created_at,
expiresAt: row.expires_at,
}));
} catch (error) {
console.error('[D1] listSessionsByTopic error:', error);
throw error;
}
}
async createSession(
origin: string,
topic: string,
peerId: string,
offer: string,
expiresAt: number,
customCode?: string
): Promise<string> {
let code: string;
let attempts = 0;
const maxAttempts = 10;
// Generate unique code or use custom
do {
code = customCode || generateUUID();
attempts++;
if (attempts > maxAttempts) {
throw new Error('Failed to generate unique session code');
}
try {
await this.db.prepare(`
INSERT INTO sessions (code, origin, topic, peer_id, offer, created_at, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
`).bind(code, origin, topic, peerId, offer, Date.now(), expiresAt).run();
break;
} catch (err: any) {
// If unique constraint failed with custom code, throw error
if (err.message?.includes('UNIQUE constraint failed')) {
if (customCode) {
throw new Error(`Session code '${customCode}' already exists`);
}
// Try again with new generated code
continue;
}
throw err;
}
} while (true);
return code;
}
async getSession(code: string, origin: string): Promise<Session | null> {
try {
const result = await this.db.prepare(`
SELECT * FROM sessions
WHERE code = ? AND origin = ? AND expires_at > ?
`).bind(code, origin, Date.now()).first();
if (!result) {
return null;
}
const row: any = result;
return {
code: row.code,
origin: row.origin,
topic: row.topic,
peerId: row.peer_id,
offer: row.offer,
answer: row.answer || undefined,
offerCandidates: JSON.parse(row.offer_candidates || '[]'),
answerCandidates: JSON.parse(row.answer_candidates || '[]'),
createdAt: row.created_at,
expiresAt: row.expires_at,
};
} catch (error) {
console.error('[D1] getSession error:', error);
throw error;
}
}
async updateSession(code: string, origin: string, update: Partial<Session>): Promise<void> {
// Verify session exists and origin matches
const current = await this.getSession(code, origin);
if (!current) {
throw new Error('Session not found or origin mismatch');
}
// Build update query dynamically based on what fields are being updated
const updates: string[] = [];
const values: any[] = [];
if (update.answer !== undefined) {
updates.push('answer = ?');
values.push(update.answer);
}
if (update.offerCandidates !== undefined) {
updates.push('offer_candidates = ?');
values.push(JSON.stringify(update.offerCandidates));
}
if (update.answerCandidates !== undefined) {
updates.push('answer_candidates = ?');
values.push(JSON.stringify(update.answerCandidates));
}
if (updates.length === 0) {
return; // Nothing to update
}
// Add WHERE clause values
values.push(code, origin);
// D1 provides strong consistency, so this update is atomic and immediately visible
const query = `
UPDATE sessions
SET ${updates.join(', ')}
WHERE code = ? AND origin = ?
`;
await this.db.prepare(query).bind(...values).run();
}
async deleteSession(code: string): Promise<void> {
await this.db.prepare(`
DELETE FROM sessions WHERE code = ?
`).bind(code).run();
}
async cleanupExpiredSessions(): Promise<number> {
const result = await this.db.prepare(`
DELETE FROM sessions WHERE expires_at <= ?
`).bind(Date.now()).run();
return result.meta.changes || 0;
}
async cleanup(): Promise<void> {
await this.cleanupExpiredSessions();
}
async close(): Promise<void> {
// D1 doesn't require explicit connection closing
// Connections are managed by the Cloudflare Workers runtime
}
}

View File

@@ -1,241 +0,0 @@
import { Storage, Session } from './types.ts';
/**
* Cloudflare KV storage adapter for session management
*/
export class KVStorage implements Storage {
private kv: KVNamespace;
/**
* Creates a new KV storage instance
* @param kv Cloudflare KV namespace binding
*/
constructor(kv: KVNamespace) {
this.kv = kv;
}
/**
* Generates a unique code using Web Crypto API
*/
private generateCode(): string {
return crypto.randomUUID();
}
/**
* Gets the key for storing a session
*/
private sessionKey(code: string): string {
return `session:${code}`;
}
/**
* Gets the key for the topic index
*/
private topicIndexKey(origin: string, topic: string): string {
return `index:${origin}:${topic}`;
}
async createSession(origin: string, topic: string, info: string, offer: string, expiresAt: number): Promise<string> {
// Validate info length
if (info.length > 1024) {
throw new Error('Info string must be 1024 characters or less');
}
const code = this.generateCode();
const createdAt = Date.now();
const session: Session = {
code,
origin,
topic,
info,
offer,
answer: undefined,
offerCandidates: [],
answerCandidates: [],
createdAt,
expiresAt,
};
// Calculate TTL in seconds for KV
const ttl = Math.max(60, Math.floor((expiresAt - createdAt) / 1000));
// Store the session
await this.kv.put(
this.sessionKey(code),
JSON.stringify(session),
{ expirationTtl: ttl }
);
// Update the topic index
const indexKey = this.topicIndexKey(origin, topic);
const existingIndex = await this.kv.get(indexKey, 'json') as string[] | null;
const updatedIndex = existingIndex ? [...existingIndex, code] : [code];
// Set index TTL to slightly longer than session TTL to avoid race conditions
await this.kv.put(
indexKey,
JSON.stringify(updatedIndex),
{ expirationTtl: ttl + 300 }
);
return code;
}
async listSessionsByTopic(origin: string, topic: string): Promise<Session[]> {
const indexKey = this.topicIndexKey(origin, topic);
const codes = await this.kv.get(indexKey, 'json') as string[] | null;
if (!codes || codes.length === 0) {
return [];
}
// Fetch all sessions in parallel
const sessionPromises = codes.map(async (code) => {
const sessionData = await this.kv.get(this.sessionKey(code), 'json') as Session | null;
return sessionData;
});
const sessions = await Promise.all(sessionPromises);
// Filter out expired or answered sessions, and null values
const now = Date.now();
const validSessions = sessions.filter(
(session): session is Session =>
session !== null &&
session.expiresAt > now &&
session.answer === undefined
);
// Sort by creation time (newest first)
return validSessions.sort((a, b) => b.createdAt - a.createdAt);
}
async listTopics(origin: string, page: number, limit: number): Promise<{
topics: Array<{ topic: string; count: number }>;
pagination: {
page: number;
limit: number;
total: number;
hasMore: boolean;
};
}> {
// Ensure limit doesn't exceed 1000
const safeLimit = Math.min(Math.max(1, limit), 1000);
const safePage = Math.max(1, page);
const prefix = `index:${origin}:`;
const topicCounts = new Map<string, number>();
// List all index keys for this origin
const list = await this.kv.list({ prefix });
// Process each topic index
for (const key of list.keys) {
// Extract topic from key: "index:{origin}:{topic}"
const topic = key.name.substring(prefix.length);
// Get the session codes for this topic
const codes = await this.kv.get(key.name, 'json') as string[] | null;
if (!codes || codes.length === 0) {
continue;
}
// Fetch sessions to count only valid ones (unexpired and unanswered)
const sessionPromises = codes.map(async (code) => {
const sessionData = await this.kv.get(this.sessionKey(code), 'json') as Session | null;
return sessionData;
});
const sessions = await Promise.all(sessionPromises);
// Count valid sessions
const now = Date.now();
const validCount = sessions.filter(
(session) =>
session !== null &&
session.expiresAt > now &&
session.answer === undefined
).length;
if (validCount > 0) {
topicCounts.set(topic, validCount);
}
}
// Convert to array and sort by topic name
const allTopics = Array.from(topicCounts.entries())
.map(([topic, count]) => ({ topic, count }))
.sort((a, b) => a.topic.localeCompare(b.topic));
// Apply pagination
const total = allTopics.length;
const offset = (safePage - 1) * safeLimit;
const topics = allTopics.slice(offset, offset + safeLimit);
return {
topics,
pagination: {
page: safePage,
limit: safeLimit,
total,
hasMore: offset + topics.length < total,
},
};
}
async getSession(code: string, origin: string): Promise<Session | null> {
const sessionData = await this.kv.get(this.sessionKey(code), 'json') as Session | null;
if (!sessionData) {
return null;
}
// Validate origin and expiration
if (sessionData.origin !== origin || sessionData.expiresAt <= Date.now()) {
return null;
}
return sessionData;
}
async updateSession(code: string, origin: string, update: Partial<Session>): Promise<void> {
const current = await this.getSession(code, origin);
if (!current) {
throw new Error('Session not found or origin mismatch');
}
// Merge updates
const updated: Session = {
...current,
...(update.answer !== undefined && { answer: update.answer }),
...(update.offerCandidates !== undefined && { offerCandidates: update.offerCandidates }),
...(update.answerCandidates !== undefined && { answerCandidates: update.answerCandidates }),
};
// Calculate remaining TTL
const ttl = Math.max(60, Math.floor((updated.expiresAt - Date.now()) / 1000));
// Update the session
await this.kv.put(
this.sessionKey(code),
JSON.stringify(updated),
{ expirationTtl: ttl }
);
}
async deleteSession(code: string): Promise<void> {
await this.kv.delete(this.sessionKey(code));
}
async cleanup(): Promise<void> {
// KV automatically expires keys based on TTL
// No manual cleanup needed
}
async close(): Promise<void> {
// No connection to close for KV
}
}

View File

@@ -28,7 +28,7 @@ export class SQLiteStorage implements Storage {
code TEXT PRIMARY KEY,
origin TEXT NOT NULL,
topic TEXT NOT NULL,
info TEXT NOT NULL CHECK(length(info) <= 1024),
peer_id TEXT NOT NULL CHECK(length(peer_id) <= 1024),
offer TEXT NOT NULL,
answer TEXT,
offer_candidates TEXT NOT NULL DEFAULT '[]',
@@ -62,19 +62,19 @@ export class SQLiteStorage implements Storage {
return randomUUID();
}
async createSession(origin: string, topic: string, info: string, offer: string, expiresAt: number): Promise<string> {
// Validate info length
if (info.length > 1024) {
throw new Error('Info string must be 1024 characters or less');
async createSession(origin: string, topic: string, peerId: string, offer: string, expiresAt: number, customCode?: string): Promise<string> {
// Validate peerId length
if (peerId.length > 1024) {
throw new Error('PeerId string must be 1024 characters or less');
}
let code: string;
let attempts = 0;
const maxAttempts = 10;
// Try to generate a unique code
// Try to generate or use custom code
do {
code = this.generateCode();
code = customCode || this.generateCode();
attempts++;
if (attempts > maxAttempts) {
@@ -83,15 +83,19 @@ export class SQLiteStorage implements Storage {
try {
const stmt = this.db.prepare(`
INSERT INTO sessions (code, origin, topic, info, offer, created_at, expires_at)
INSERT INTO sessions (code, origin, topic, peer_id, offer, created_at, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
`);
stmt.run(code, origin, topic, info, offer, Date.now(), expiresAt);
stmt.run(code, origin, topic, peerId, offer, Date.now(), expiresAt);
break;
} catch (err: any) {
// If unique constraint failed, try again
// If unique constraint failed with custom code, throw error
if (err.code === 'SQLITE_CONSTRAINT_PRIMARYKEY') {
if (customCode) {
throw new Error(`Session code '${customCode}' already exists`);
}
// Try again with new generated code
continue;
}
throw err;
@@ -114,7 +118,7 @@ export class SQLiteStorage implements Storage {
code: row.code,
origin: row.origin,
topic: row.topic,
info: row.info,
peerId: row.peer_id,
offer: row.offer,
answer: row.answer || undefined,
offerCandidates: JSON.parse(row.offer_candidates),
@@ -189,7 +193,7 @@ export class SQLiteStorage implements Storage {
code: row.code,
origin: row.origin,
topic: row.topic,
info: row.info,
peerId: row.peer_id,
offer: row.offer,
answer: row.answer || undefined,
offerCandidates: JSON.parse(row.offer_candidates),

View File

@@ -5,7 +5,7 @@ export interface Session {
code: string;
origin: string;
topic: string;
info: string;
peerId: string;
offer: string;
answer?: string;
offerCandidates: string[];
@@ -23,12 +23,13 @@ export interface Storage {
* Creates a new session with the given offer
* @param origin The Origin header from the request
* @param topic The topic to post the offer to
* @param info User info string (max 1024 chars)
* @param peerId Peer identifier string (max 1024 chars)
* @param offer The WebRTC SDP offer message
* @param expiresAt Unix timestamp when the session should expire
* @param customCode Optional custom code (if not provided, generates UUID)
* @returns The unique session code
*/
createSession(origin: string, topic: string, info: string, offer: string, expiresAt: number): Promise<string>;
createSession(origin: string, topic: string, peerId: string, offer: string, expiresAt: number, customCode?: string): Promise<string>;
/**
* Lists all unanswered sessions for a given origin and topic

View File

@@ -1,11 +1,11 @@
import { createApp } from './app.ts';
import { KVStorage } from './storage/kv.ts';
import { D1Storage } from './storage/d1.ts';
/**
* Cloudflare Workers environment bindings
*/
export interface Env {
SESSIONS: KVNamespace;
DB: D1Database;
SESSION_TIMEOUT?: string;
CORS_ORIGINS?: string;
}
@@ -15,8 +15,8 @@ export interface Env {
*/
export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
// Initialize KV storage
const storage = new KVStorage(env.SESSIONS);
// Initialize D1 storage
const storage = new D1Storage(env.DB);
// Parse configuration
const sessionTimeout = env.SESSION_TIMEOUT