Add topics endpoint and refactor to topic-based discovery

- Add GET /topics endpoint with pagination and peer counts
- Refactor offers to support multiple topics per offer
- Add stateless authentication with AES-256-GCM
- Add bloom filter support for peer exclusion
- Update database schema for topic-based discovery
- Add comprehensive API documentation to README

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-11-14 18:31:17 +01:00
parent 377d12b820
commit 685692dee1
13 changed files with 1660 additions and 440 deletions

166
README.md
View File

@@ -1,20 +1,25 @@
# Rondevu
# Rondevu Server
🎯 **Simple WebRTC peer signaling**
🌐 **Topic-based peer discovery and WebRTC signaling**
Direct peer-to-peer connections via offer/answer exchange.
Scalable peer-to-peer connection establishment with topic-based discovery, stateless authentication, and complete WebRTC signaling.
**Related repositories:**
- [rondevu-client](https://github.com/xtr-dev/rondevu-client) - TypeScript client library
- [rondevu-demo](https://github.com/xtr-dev/rondevu-demo) - Interactive demo
- [@xtr-dev/rondevu-client](https://www.npmjs.com/package/@xtr-dev/rondevu-client) - TypeScript client library
- [rondevu-demo](https://rondevu-demo.pages.dev) - Interactive demo
---
## Rondevu Server
## Features
HTTP signaling server for WebRTC peer connection establishment. Supports SQLite (Node.js/Docker) and Cloudflare D1 (Workers) storage backends.
- **Topic-Based Discovery**: Tag offers with topics (e.g., torrent infohashes) for efficient peer finding
- **Stateless Authentication**: AES-256-GCM encrypted credentials, no server-side sessions
- **Bloom Filters**: Client-side peer exclusion for efficient discovery
- **Multi-Offer Support**: Create multiple offers per peer simultaneously
- **Complete WebRTC Signaling**: Offer/answer exchange and ICE candidate relay
- **Dual Storage**: SQLite (Node.js/Docker) and Cloudflare D1 (Workers) backends
### Quick Start
## Quick Start
**Node.js:**
```bash
@@ -31,37 +36,148 @@ docker build -t rondevu . && docker run -p 3000:3000 -e STORAGE_PATH=:memory: ro
npx wrangler deploy
```
### API
## API Endpoints
```bash
# Create offer
POST /offer {"peerId":"alice","offer":"...","code":"my-room"}
### Public Endpoints
# Send answer/candidates
POST /answer {"code":"my-room","answer":"...","side":"answerer"}
#### `GET /`
Returns server version and info
# Poll for updates
POST /poll {"code":"my-room","side":"offerer"}
#### `GET /health`
Health check endpoint with version
# Health check with version
GET /health
#### `POST /register`
Register a new peer and receive credentials (peerId + secret)
# Version info
GET /
**Response:**
```json
{
"peerId": "f17c195f067255e357232e34cf0735d9",
"secret": "DdorTR8QgSn9yngn+4qqR8cs1aMijvX..."
}
```
### Configuration
#### `GET /topics?limit=50&offset=0`
List all topics with active peer counts (paginated)
**Query Parameters:**
- `limit` (optional): Maximum number of topics to return (default: 50, max: 200)
- `offset` (optional): Number of topics to skip (default: 0)
**Response:**
```json
{
"topics": [
{"topic": "movie-xyz", "activePeers": 42},
{"topic": "torrent-abc", "activePeers": 15}
],
"total": 123,
"limit": 50,
"offset": 0
}
```
#### `GET /offers/by-topic/:topic?limit=50&bloom=...`
Find offers by topic with optional bloom filter exclusion
**Query Parameters:**
- `limit` (optional): Maximum offers to return (default: 50, max: 200)
- `bloom` (optional): Base64-encoded bloom filter to exclude known peers
**Response:**
```json
{
"topic": "movie-xyz",
"offers": [
{
"id": "offer-id",
"peerId": "peer-id",
"sdp": "v=0...",
"topics": ["movie-xyz", "hd-content"],
"expiresAt": 1234567890,
"lastSeen": 1234567890
}
],
"total": 42,
"returned": 10
}
```
#### `GET /peers/:peerId/offers`
View all offers from a specific peer
### Authenticated Endpoints
All authenticated endpoints require `Authorization: Bearer {peerId}:{secret}` header.
#### `POST /offers`
Create one or more offers
**Request:**
```json
{
"offers": [
{
"sdp": "v=0...",
"topics": ["movie-xyz", "hd-content"],
"ttl": 300000
}
]
}
```
#### `GET /offers/mine`
List all offers owned by authenticated peer
#### `PUT /offers/:offerId/heartbeat`
Update last_seen timestamp for an offer
#### `DELETE /offers/:offerId`
Delete a specific offer
#### `POST /offers/:offerId/answer`
Answer an offer (locks it to answerer)
**Request:**
```json
{
"sdp": "v=0..."
}
```
#### `GET /offers/answers`
Poll for answers to your offers
#### `POST /offers/:offerId/ice-candidates`
Post ICE candidates for an offer
**Request:**
```json
{
"candidates": ["candidate:1 1 UDP..."]
}
```
#### `GET /offers/:offerId/ice-candidates?since=1234567890`
Get ICE candidates from the other peer
## Configuration
Environment variables:
| Variable | Default | Description |
|----------|---------|-------------|
| `PORT` | `3000` | Server port (Node.js/Docker) |
| `OFFER_TIMEOUT` | `60000` | Offer timeout in milliseconds (1 minute) |
| `CORS_ORIGINS` | `*` | Comma-separated allowed origins |
| `STORAGE_PATH` | `./offers.db` | SQLite database path (use `:memory:` for in-memory) |
| `VERSION` | `0.0.1` | Server version (semver) |
| `STORAGE_PATH` | `./rondevu.db` | SQLite database path (use `:memory:` for in-memory) |
| `VERSION` | `0.4.0` | Server version (semver) |
| `AUTH_SECRET` | Random 32-byte hex | Secret key for credential encryption |
| `OFFER_DEFAULT_TTL` | `300000` | Default offer TTL in ms (5 minutes) |
| `OFFER_MIN_TTL` | `60000` | Minimum offer TTL in ms (1 minute) |
| `OFFER_MAX_TTL` | `3600000` | Maximum offer TTL in ms (1 hour) |
| `MAX_OFFERS_PER_REQUEST` | `10` | Maximum offers per create request |
| `MAX_TOPICS_PER_OFFER` | `20` | Maximum topics per offer |
### License
## License
MIT

View File

@@ -1,7 +1,7 @@
{
"name": "@xtr-dev/rondevu-server",
"version": "0.0.1",
"description": "Open signaling and tracking server for peer discovery in distributed P2P applications",
"version": "0.1.0",
"description": "Topic-based peer discovery and signaling server for distributed P2P applications",
"main": "dist/index.js",
"scripts": {
"build": "node build.js",

View File

@@ -1,20 +1,21 @@
import { Hono } from 'hono';
import { cors } from 'hono/cors';
import { Storage } from './storage/types.ts';
import { Config } from './config.ts';
import { createAuthMiddleware, getAuthenticatedPeerId } from './middleware/auth.ts';
import { generatePeerId, encryptPeerId } from './crypto.ts';
import { parseBloomFilter } from './bloom.ts';
import type { Context } from 'hono';
export interface AppConfig {
offerTimeout: number;
corsOrigins: string[];
version?: string;
}
/**
* Creates the Hono application with WebRTC signaling endpoints
* Creates the Hono application with topic-based WebRTC signaling endpoints
*/
export function createApp(storage: Storage, config: AppConfig) {
export function createApp(storage: Storage, config: Config) {
const app = new Hono();
// Create auth middleware
const authMiddleware = createAuthMiddleware(config.authSecret);
// Enable CORS with dynamic origin handling
app.use('/*', cors({
origin: (origin) => {
@@ -29,8 +30,8 @@ export function createApp(storage: Storage, config: AppConfig) {
// Default to first allowed origin
return config.corsOrigins[0];
},
allowMethods: ['GET', 'POST', 'OPTIONS'],
allowHeaders: ['Content-Type', 'Origin'],
allowMethods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
allowHeaders: ['Content-Type', 'Origin', 'Authorization'],
exposeHeaders: ['Content-Type'],
maxAge: 600,
credentials: true,
@@ -42,7 +43,9 @@ export function createApp(storage: Storage, config: AppConfig) {
*/
app.get('/', (c) => {
return c.json({
version: config.version || 'unknown'
version: config.version,
name: 'Rondevu',
description: 'Topic-based peer discovery and signaling server'
});
});
@@ -54,161 +57,467 @@ export function createApp(storage: Storage, config: AppConfig) {
return c.json({
status: 'ok',
timestamp: Date.now(),
version: config.version || 'unknown'
version: config.version
});
});
/**
* POST /offer
* Creates a new offer and returns a unique code
* Body: { peerId: string, offer: string, code?: string }
* POST /register
* Register a new peer and receive credentials
*/
app.post('/offer', async (c) => {
app.post('/register', async (c) => {
try {
const body = await c.req.json();
const { peerId, offer, code: customCode } = body;
// Generate new peer ID
const peerId = generatePeerId();
if (!peerId || typeof peerId !== 'string') {
return c.json({ error: 'Missing or invalid required parameter: peerId' }, 400);
}
// Encrypt peer ID with server secret (async operation)
const secret = await encryptPeerId(peerId, config.authSecret);
if (peerId.length > 1024) {
return c.json({ error: 'PeerId string must be 1024 characters or less' }, 400);
}
if (!offer || typeof offer !== 'string') {
return c.json({ error: 'Missing or invalid required parameter: offer' }, 400);
}
const expiresAt = Date.now() + config.offerTimeout;
const code = await storage.createOffer(peerId, offer, expiresAt, customCode);
return c.json({ code }, 200);
return c.json({
peerId,
secret
}, 200);
} catch (err) {
console.error('Error creating offer:', err);
// Check if it's a code clash error
if (err instanceof Error && err.message.includes('already exists')) {
return c.json({ error: err.message }, 409);
}
console.error('Error registering peer:', err);
return c.json({ error: 'Internal server error' }, 500);
}
});
/**
* POST /answer
* Responds to an existing offer or sends ICE candidates
* Body: { code: string, answer?: string, candidate?: string, side: 'offerer' | 'answerer' }
* POST /offers
* Creates one or more offers with topics
* Requires authentication
*/
app.post('/answer', async (c) => {
app.post('/offers', authMiddleware, async (c) => {
try {
const body = await c.req.json();
const { code, answer, candidate, side } = body;
const { offers } = body;
if (!code || typeof code !== 'string') {
return c.json({ error: 'Missing or invalid required parameter: code' }, 400);
if (!Array.isArray(offers) || offers.length === 0) {
return c.json({ error: 'Missing or invalid required parameter: offers (must be non-empty array)' }, 400);
}
if (!side || (side !== 'offerer' && side !== 'answerer')) {
return c.json({ error: 'Invalid or missing parameter: side (must be "offerer" or "answerer")' }, 400);
if (offers.length > config.maxOffersPerRequest) {
return c.json({ error: `Too many offers. Maximum ${config.maxOffersPerRequest} per request` }, 400);
}
if (!answer && !candidate) {
return c.json({ error: 'Missing required parameter: answer or candidate' }, 400);
const peerId = getAuthenticatedPeerId(c);
// Validate and prepare offers
const offerRequests = [];
for (const offer of offers) {
// Validate SDP
if (!offer.sdp || typeof offer.sdp !== 'string') {
return c.json({ error: 'Each offer must have an sdp field' }, 400);
}
if (answer && candidate) {
return c.json({ error: 'Cannot provide both answer and candidate' }, 400);
if (offer.sdp.length > 65536) {
return c.json({ error: 'SDP must be 64KB or less' }, 400);
}
const offer = await storage.getOffer(code);
// Validate topics
if (!Array.isArray(offer.topics) || offer.topics.length === 0) {
return c.json({ error: 'Each offer must have a non-empty topics array' }, 400);
}
if (offer.topics.length > config.maxTopicsPerOffer) {
return c.json({ error: `Too many topics. Maximum ${config.maxTopicsPerOffer} per offer` }, 400);
}
for (const topic of offer.topics) {
if (typeof topic !== 'string' || topic.length === 0 || topic.length > 256) {
return c.json({ error: 'Each topic must be a string between 1 and 256 characters' }, 400);
}
}
// Validate and clamp TTL
let ttl = offer.ttl || config.offerDefaultTtl;
if (ttl < config.offerMinTtl) {
ttl = config.offerMinTtl;
}
if (ttl > config.offerMaxTtl) {
ttl = config.offerMaxTtl;
}
offerRequests.push({
id: offer.id,
peerId,
sdp: offer.sdp,
topics: offer.topics,
expiresAt: Date.now() + ttl,
});
}
// Create offers
const createdOffers = await storage.createOffers(offerRequests);
// Return simplified response
return c.json({
offers: createdOffers.map(o => ({
id: o.id,
peerId: o.peerId,
topics: o.topics,
expiresAt: o.expiresAt
}))
}, 200);
} catch (err) {
console.error('Error creating offers:', err);
return c.json({ error: 'Internal server error' }, 500);
}
});
/**
* GET /offers/by-topic/:topic
* Find offers by topic with optional bloom filter exclusion
* Public endpoint (no auth required)
*/
app.get('/offers/by-topic/:topic', async (c) => {
try {
const topic = c.req.param('topic');
const bloomParam = c.req.query('bloom');
const limitParam = c.req.query('limit');
const limit = limitParam ? Math.min(parseInt(limitParam, 10), 200) : 50;
// Parse bloom filter if provided
let excludePeerIds: string[] = [];
if (bloomParam) {
const bloom = parseBloomFilter(bloomParam);
if (!bloom) {
return c.json({ error: 'Invalid bloom filter format' }, 400);
}
// Get all offers for topic first
const allOffers = await storage.getOffersByTopic(topic);
// Test each peer ID against bloom filter
const excludeSet = new Set<string>();
for (const offer of allOffers) {
if (bloom.test(offer.peerId)) {
excludeSet.add(offer.peerId);
}
}
excludePeerIds = Array.from(excludeSet);
}
// Get filtered offers
let offers = await storage.getOffersByTopic(topic, excludePeerIds.length > 0 ? excludePeerIds : undefined);
// Apply limit
const total = offers.length;
offers = offers.slice(0, limit);
return c.json({
topic,
offers: offers.map(o => ({
id: o.id,
peerId: o.peerId,
sdp: o.sdp,
topics: o.topics,
expiresAt: o.expiresAt,
lastSeen: o.lastSeen
})),
total: bloomParam ? total + excludePeerIds.length : total,
returned: offers.length
}, 200);
} catch (err) {
console.error('Error fetching offers by topic:', err);
return c.json({ error: 'Internal server error' }, 500);
}
});
/**
* GET /topics
* List all topics with active peer counts (paginated)
* Public endpoint (no auth required)
*/
app.get('/topics', async (c) => {
try {
const limitParam = c.req.query('limit');
const offsetParam = c.req.query('offset');
const limit = limitParam ? Math.min(parseInt(limitParam, 10), 200) : 50;
const offset = offsetParam ? parseInt(offsetParam, 10) : 0;
const result = await storage.getTopics(limit, offset);
return c.json({
topics: result.topics,
total: result.total,
limit,
offset
}, 200);
} catch (err) {
console.error('Error fetching topics:', err);
return c.json({ error: 'Internal server error' }, 500);
}
});
/**
* GET /peers/:peerId/offers
* View all offers from a specific peer
* Public endpoint
*/
app.get('/peers/:peerId/offers', async (c) => {
try {
const peerId = c.req.param('peerId');
const offers = await storage.getOffersByPeerId(peerId);
// Collect unique topics
const topicsSet = new Set<string>();
offers.forEach(o => o.topics.forEach(t => topicsSet.add(t)));
return c.json({
peerId,
offers: offers.map(o => ({
id: o.id,
sdp: o.sdp,
topics: o.topics,
expiresAt: o.expiresAt,
lastSeen: o.lastSeen
})),
topics: Array.from(topicsSet)
}, 200);
} catch (err) {
console.error('Error fetching peer offers:', err);
return c.json({ error: 'Internal server error' }, 500);
}
});
/**
* GET /offers/mine
* List all offers owned by authenticated peer
* Requires authentication
*/
app.get('/offers/mine', authMiddleware, async (c) => {
try {
const peerId = getAuthenticatedPeerId(c);
const offers = await storage.getOffersByPeerId(peerId);
return c.json({
peerId,
offers: offers.map(o => ({
id: o.id,
sdp: o.sdp,
topics: o.topics,
createdAt: o.createdAt,
expiresAt: o.expiresAt,
lastSeen: o.lastSeen,
answererPeerId: o.answererPeerId,
answeredAt: o.answeredAt
}))
}, 200);
} catch (err) {
console.error('Error fetching own offers:', err);
return c.json({ error: 'Internal server error' }, 500);
}
});
/**
* PUT /offers/:offerId/heartbeat
* Update last_seen timestamp for an offer
* Requires authentication and ownership
*/
app.put('/offers/:offerId/heartbeat', authMiddleware, async (c) => {
try {
const offerId = c.req.param('offerId');
const peerId = getAuthenticatedPeerId(c);
// Verify ownership
const offer = await storage.getOfferById(offerId);
if (!offer) {
return c.json({ error: 'Offer not found or expired' }, 404);
}
if (answer) {
await storage.updateOffer(code, { answer });
if (offer.peerId !== peerId) {
return c.json({ error: 'Not authorized to update this offer' }, 403);
}
if (candidate) {
if (side === 'offerer') {
const updatedCandidates = [...offer.offerCandidates, candidate];
await storage.updateOffer(code, { offerCandidates: updatedCandidates });
} else {
const updatedCandidates = [...offer.answerCandidates, candidate];
await storage.updateOffer(code, { answerCandidates: updatedCandidates });
}
}
const now = Date.now();
await storage.updateOfferLastSeen(offerId, now);
return c.json({ success: true }, 200);
return c.json({
id: offerId,
lastSeen: now
}, 200);
} catch (err) {
console.error('Error handling answer:', err);
console.error('Error updating offer heartbeat:', err);
return c.json({ error: 'Internal server error' }, 500);
}
});
/**
* POST /poll
* Polls for offer data (offer, answer, ICE candidates)
* Body: { code: string, side: 'offerer' | 'answerer' }
* DELETE /offers/:offerId
* Delete a specific offer
* Requires authentication and ownership
*/
app.post('/poll', async (c) => {
app.delete('/offers/:offerId', authMiddleware, async (c) => {
try {
const offerId = c.req.param('offerId');
const peerId = getAuthenticatedPeerId(c);
const deleted = await storage.deleteOffer(offerId, peerId);
if (!deleted) {
return c.json({ error: 'Offer not found or not authorized' }, 404);
}
return c.json({ deleted: true }, 200);
} catch (err) {
console.error('Error deleting offer:', err);
return c.json({ error: 'Internal server error' }, 500);
}
});
/**
* POST /offers/:offerId/answer
* Answer a specific offer (locks it to answerer)
* Requires authentication
*/
app.post('/offers/:offerId/answer', authMiddleware, async (c) => {
try {
const offerId = c.req.param('offerId');
const peerId = getAuthenticatedPeerId(c);
const body = await c.req.json();
const { code, side } = body;
const { sdp } = body;
if (!code || typeof code !== 'string') {
return c.json({ error: 'Missing or invalid required parameter: code' }, 400);
if (!sdp || typeof sdp !== 'string') {
return c.json({ error: 'Missing or invalid required parameter: sdp' }, 400);
}
if (!side || (side !== 'offerer' && side !== 'answerer')) {
return c.json({ error: 'Invalid or missing parameter: side (must be "offerer" or "answerer")' }, 400);
if (sdp.length > 65536) {
return c.json({ error: 'SDP must be 64KB or less' }, 400);
}
const offer = await storage.getOffer(code);
const result = await storage.answerOffer(offerId, peerId, sdp);
if (!result.success) {
return c.json({ error: result.error }, 400);
}
return c.json({
offerId,
answererId: peerId,
answeredAt: Date.now()
}, 200);
} catch (err) {
console.error('Error answering offer:', err);
return c.json({ error: 'Internal server error' }, 500);
}
});
/**
* GET /offers/answers
* Poll for answers to all of authenticated peer's offers
* Requires authentication (offerer)
*/
app.get('/offers/answers', authMiddleware, async (c) => {
try {
const peerId = getAuthenticatedPeerId(c);
const offers = await storage.getAnsweredOffers(peerId);
return c.json({
answers: offers.map(o => ({
offerId: o.id,
answererId: o.answererPeerId,
sdp: o.answerSdp,
answeredAt: o.answeredAt,
topics: o.topics
}))
}, 200);
} catch (err) {
console.error('Error fetching answers:', err);
return c.json({ error: 'Internal server error' }, 500);
}
});
/**
* POST /offers/:offerId/ice-candidates
* Post ICE candidates for an offer
* Requires authentication (must be offerer or answerer)
*/
app.post('/offers/:offerId/ice-candidates', authMiddleware, async (c) => {
try {
const offerId = c.req.param('offerId');
const peerId = getAuthenticatedPeerId(c);
const body = await c.req.json();
const { candidates } = body;
if (!Array.isArray(candidates) || candidates.length === 0) {
return c.json({ error: 'Missing or invalid required parameter: candidates (must be non-empty array)' }, 400);
}
// Verify offer exists and caller is offerer or answerer
const offer = await storage.getOfferById(offerId);
if (!offer) {
return c.json({ error: 'Offer not found or expired' }, 404);
}
if (side === 'offerer') {
return c.json({
answer: offer.answer || null,
answerCandidates: offer.answerCandidates,
});
let role: 'offerer' | 'answerer';
if (offer.peerId === peerId) {
role = 'offerer';
} else if (offer.answererPeerId === peerId) {
role = 'answerer';
} else {
return c.json({
offer: offer.offer,
offerCandidates: offer.offerCandidates,
});
return c.json({ error: 'Not authorized to post ICE candidates for this offer' }, 403);
}
const added = await storage.addIceCandidates(offerId, peerId, role, candidates);
return c.json({
offerId,
candidatesAdded: added
}, 200);
} catch (err) {
console.error('Error polling offer:', err);
console.error('Error adding ICE candidates:', err);
return c.json({ error: 'Internal server error' }, 500);
}
});
/**
* POST /leave
* Ends a session by deleting the offer
* Body: { code: string }
* GET /offers/:offerId/ice-candidates
* Poll for ICE candidates from the other peer
* Requires authentication (must be offerer or answerer)
*/
app.post('/leave', async (c) => {
app.get('/offers/:offerId/ice-candidates', authMiddleware, async (c) => {
try {
const body = await c.req.json();
const { code } = body;
const offerId = c.req.param('offerId');
const peerId = getAuthenticatedPeerId(c);
const sinceParam = c.req.query('since');
if (!code || typeof code !== 'string') {
return c.json({ error: 'Missing or invalid required parameter: code' }, 400);
const since = sinceParam ? parseInt(sinceParam, 10) : undefined;
// Verify offer exists and caller is offerer or answerer
const offer = await storage.getOfferById(offerId);
if (!offer) {
return c.json({ error: 'Offer not found or expired' }, 404);
}
await storage.deleteOffer(code);
let targetRole: 'offerer' | 'answerer';
if (offer.peerId === peerId) {
// Offerer wants answerer's candidates
targetRole = 'answerer';
} else if (offer.answererPeerId === peerId) {
// Answerer wants offerer's candidates
targetRole = 'offerer';
} else {
return c.json({ error: 'Not authorized to view ICE candidates for this offer' }, 403);
}
return c.json({ success: true }, 200);
const candidates = await storage.getIceCandidates(offerId, targetRole, since);
return c.json({
offerId,
candidates: candidates.map(c => ({
candidate: c.candidate,
peerId: c.peerId,
role: c.role,
createdAt: c.createdAt
}))
}, 200);
} catch (err) {
console.error('Error leaving session:', err);
console.error('Error fetching ICE candidates:', err);
return c.json({ error: 'Internal server error' }, 500);
}
});

62
src/bloom.ts Normal file
View File

@@ -0,0 +1,62 @@
/**
* Bloom filter utility for testing if peer IDs might be in a set
* Used to filter out known peers from discovery results
*/
export class BloomFilter {
private bits: Uint8Array;
private size: number;
private numHashes: number;
/**
* Creates a bloom filter from a base64 encoded bit array
*/
constructor(base64Data: string, numHashes: number = 3) {
// Decode base64 to buffer
const buffer = Buffer.from(base64Data, 'base64');
this.bits = new Uint8Array(buffer);
this.size = this.bits.length * 8;
this.numHashes = numHashes;
}
/**
* Test if a peer ID might be in the filter
* Returns true if possibly in set, false if definitely not in set
*/
test(peerId: string): boolean {
for (let i = 0; i < this.numHashes; i++) {
const hash = this.hash(peerId, i);
const index = hash % this.size;
const byteIndex = Math.floor(index / 8);
const bitIndex = index % 8;
if (!(this.bits[byteIndex] & (1 << bitIndex))) {
return false;
}
}
return true;
}
/**
* Simple hash function (FNV-1a variant)
*/
private hash(str: string, seed: number): number {
let hash = 2166136261 ^ seed;
for (let i = 0; i < str.length; i++) {
hash ^= str.charCodeAt(i);
hash += (hash << 1) + (hash << 4) + (hash << 7) + (hash << 8) + (hash << 24);
}
return hash >>> 0;
}
}
/**
* Helper to parse bloom filter from base64 string
*/
export function parseBloomFilter(base64: string): BloomFilter | null {
try {
return new BloomFilter(base64);
} catch {
return null;
}
}

View File

@@ -1,3 +1,5 @@
import { generateSecretKey } from './crypto.ts';
/**
* Application configuration
* Reads from environment variables with sensible defaults
@@ -6,23 +8,44 @@ export interface Config {
port: number;
storageType: 'sqlite' | 'memory';
storagePath: string;
offerTimeout: number;
corsOrigins: string[];
version: string;
authSecret: string;
offerDefaultTtl: number;
offerMaxTtl: number;
offerMinTtl: number;
cleanupInterval: number;
maxOffersPerRequest: number;
maxTopicsPerOffer: number;
}
/**
* Loads configuration from environment variables
*/
export function loadConfig(): Config {
// Generate or load auth secret
let authSecret = process.env.AUTH_SECRET;
if (!authSecret) {
authSecret = generateSecretKey();
console.warn('WARNING: No AUTH_SECRET provided. Generated temporary secret:', authSecret);
console.warn('All peer credentials will be invalidated on server restart.');
console.warn('Set AUTH_SECRET environment variable to persist credentials across restarts.');
}
return {
port: parseInt(process.env.PORT || '3000', 10),
storageType: (process.env.STORAGE_TYPE || 'sqlite') as 'sqlite' | 'memory',
storagePath: process.env.STORAGE_PATH || ':memory:',
offerTimeout: parseInt(process.env.OFFER_TIMEOUT || '60000', 10),
corsOrigins: process.env.CORS_ORIGINS
? process.env.CORS_ORIGINS.split(',').map(o => o.trim())
: ['*'],
version: process.env.VERSION || 'unknown',
authSecret,
offerDefaultTtl: parseInt(process.env.OFFER_DEFAULT_TTL || '60000', 10),
offerMaxTtl: parseInt(process.env.OFFER_MAX_TTL || '86400000', 10),
offerMinTtl: parseInt(process.env.OFFER_MIN_TTL || '60000', 10),
cleanupInterval: parseInt(process.env.CLEANUP_INTERVAL || '60000', 10),
maxOffersPerRequest: parseInt(process.env.MAX_OFFERS_PER_REQUEST || '100', 10),
maxTopicsPerOffer: parseInt(process.env.MAX_TOPICS_PER_OFFER || '50', 10),
};
}

149
src/crypto.ts Normal file
View File

@@ -0,0 +1,149 @@
/**
* Crypto utilities for stateless peer authentication
* Uses Web Crypto API for compatibility with both Node.js and Cloudflare Workers
*/
const ALGORITHM = 'AES-GCM';
const IV_LENGTH = 12; // 96 bits for GCM
const KEY_LENGTH = 32; // 256 bits
/**
* Generates a random peer ID (16 bytes = 32 hex chars)
*/
export function generatePeerId(): string {
const bytes = crypto.getRandomValues(new Uint8Array(16));
return Array.from(bytes).map(b => b.toString(16).padStart(2, '0')).join('');
}
/**
* Generates a random secret key for encryption (32 bytes = 64 hex chars)
*/
export function generateSecretKey(): string {
const bytes = crypto.getRandomValues(new Uint8Array(KEY_LENGTH));
return Array.from(bytes).map(b => b.toString(16).padStart(2, '0')).join('');
}
/**
* Convert hex string to Uint8Array
*/
function hexToBytes(hex: string): Uint8Array {
const bytes = new Uint8Array(hex.length / 2);
for (let i = 0; i < hex.length; i += 2) {
bytes[i / 2] = parseInt(hex.substring(i, i + 2), 16);
}
return bytes;
}
/**
* Convert Uint8Array to base64 string
*/
function bytesToBase64(bytes: Uint8Array): string {
const binString = Array.from(bytes, (byte) =>
String.fromCodePoint(byte)
).join('');
return btoa(binString);
}
/**
* Convert base64 string to Uint8Array
*/
function base64ToBytes(base64: string): Uint8Array {
const binString = atob(base64);
return Uint8Array.from(binString, (char) => char.codePointAt(0)!);
}
/**
* Encrypts a peer ID using the server secret key
* Returns base64-encoded encrypted data (IV + ciphertext)
*/
export async function encryptPeerId(peerId: string, secretKeyHex: string): Promise<string> {
const keyBytes = hexToBytes(secretKeyHex);
if (keyBytes.length !== KEY_LENGTH) {
throw new Error(`Secret key must be ${KEY_LENGTH * 2} hex characters (${KEY_LENGTH} bytes)`);
}
// Import key
const key = await crypto.subtle.importKey(
'raw',
keyBytes,
{ name: ALGORITHM, length: 256 },
false,
['encrypt']
);
// Generate random IV
const iv = crypto.getRandomValues(new Uint8Array(IV_LENGTH));
// Encrypt peer ID
const encoder = new TextEncoder();
const data = encoder.encode(peerId);
const encrypted = await crypto.subtle.encrypt(
{ name: ALGORITHM, iv },
key,
data
);
// Combine IV + ciphertext and encode as base64
const combined = new Uint8Array(iv.length + encrypted.byteLength);
combined.set(iv, 0);
combined.set(new Uint8Array(encrypted), iv.length);
return bytesToBase64(combined);
}
/**
* Decrypts an encrypted peer ID secret
* Returns the plaintext peer ID or throws if decryption fails
*/
export async function decryptPeerId(encryptedSecret: string, secretKeyHex: string): Promise<string> {
try {
const keyBytes = hexToBytes(secretKeyHex);
if (keyBytes.length !== KEY_LENGTH) {
throw new Error(`Secret key must be ${KEY_LENGTH * 2} hex characters (${KEY_LENGTH} bytes)`);
}
// Decode base64
const combined = base64ToBytes(encryptedSecret);
// Extract IV and ciphertext
const iv = combined.slice(0, IV_LENGTH);
const ciphertext = combined.slice(IV_LENGTH);
// Import key
const key = await crypto.subtle.importKey(
'raw',
keyBytes,
{ name: ALGORITHM, length: 256 },
false,
['decrypt']
);
// Decrypt
const decrypted = await crypto.subtle.decrypt(
{ name: ALGORITHM, iv },
key,
ciphertext
);
const decoder = new TextDecoder();
return decoder.decode(decrypted);
} catch (err) {
throw new Error('Failed to decrypt peer ID: invalid secret or secret key');
}
}
/**
* Validates that a peer ID and secret match
* Returns true if valid, false otherwise
*/
export async function validateCredentials(peerId: string, encryptedSecret: string, secretKey: string): Promise<boolean> {
try {
const decryptedPeerId = await decryptPeerId(encryptedSecret, secretKey);
return decryptedPeerId === peerId;
} catch {
return false;
}
}

View File

@@ -15,7 +15,12 @@ async function main() {
port: config.port,
storageType: config.storageType,
storagePath: config.storagePath,
offerTimeout: `${config.offerTimeout}ms`,
offerDefaultTtl: `${config.offerDefaultTtl}ms`,
offerMaxTtl: `${config.offerMaxTtl}ms`,
offerMinTtl: `${config.offerMinTtl}ms`,
cleanupInterval: `${config.cleanupInterval}ms`,
maxOffersPerRequest: config.maxOffersPerRequest,
maxTopicsPerOffer: config.maxTopicsPerOffer,
corsOrigins: config.corsOrigins,
version: config.version,
});
@@ -29,11 +34,20 @@ async function main() {
throw new Error('Unsupported storage type');
}
const app = createApp(storage, {
offerTimeout: config.offerTimeout,
corsOrigins: config.corsOrigins,
version: config.version,
});
// Start periodic cleanup of expired offers
const cleanupInterval = setInterval(async () => {
try {
const now = Date.now();
const deleted = await storage.deleteExpiredOffers(now);
if (deleted > 0) {
console.log(`Cleanup: Deleted ${deleted} expired offer(s)`);
}
} catch (err) {
console.error('Cleanup error:', err);
}
}, config.cleanupInterval);
const app = createApp(storage, config);
const server = serve({
fetch: app.fetch,
@@ -41,18 +55,18 @@ async function main() {
});
console.log(`Server running on http://localhost:${config.port}`);
console.log('Ready to accept connections');
process.on('SIGINT', async () => {
// Graceful shutdown handler
const shutdown = async () => {
console.log('\nShutting down gracefully...');
clearInterval(cleanupInterval);
await storage.close();
process.exit(0);
});
};
process.on('SIGTERM', async () => {
console.log('\nShutting down gracefully...');
await storage.close();
process.exit(0);
});
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
}
main().catch((err) => {

51
src/middleware/auth.ts Normal file
View File

@@ -0,0 +1,51 @@
import { Context, Next } from 'hono';
import { validateCredentials } from '../crypto.ts';
/**
* Authentication middleware for Rondevu
* Validates Bearer token in format: {peerId}:{encryptedSecret}
*/
export function createAuthMiddleware(authSecret: string) {
return async (c: Context, next: Next) => {
const authHeader = c.req.header('Authorization');
if (!authHeader) {
return c.json({ error: 'Missing Authorization header' }, 401);
}
// Expect format: Bearer {peerId}:{secret}
const parts = authHeader.split(' ');
if (parts.length !== 2 || parts[0] !== 'Bearer') {
return c.json({ error: 'Invalid Authorization header format. Expected: Bearer {peerId}:{secret}' }, 401);
}
const credentials = parts[1].split(':');
if (credentials.length !== 2) {
return c.json({ error: 'Invalid credentials format. Expected: {peerId}:{secret}' }, 401);
}
const [peerId, encryptedSecret] = credentials;
// Validate credentials (async operation)
const isValid = await validateCredentials(peerId, encryptedSecret, authSecret);
if (!isValid) {
return c.json({ error: 'Invalid credentials' }, 401);
}
// Attach peer ID to context for use in handlers
c.set('peerId', peerId);
await next();
};
}
/**
* Helper to get authenticated peer ID from context
*/
export function getAuthenticatedPeerId(c: Context): string {
const peerId = c.get('peerId');
if (!peerId) {
throw new Error('No authenticated peer ID in context');
}
return peerId;
}

View File

@@ -1,4 +1,4 @@
import { Storage, Offer } from './types.ts';
import { Storage, Offer, IceCandidate, CreateOfferRequest, TopicInfo } from './types.ts';
// Generate a UUID v4
function generateUUID(): string {
@@ -6,7 +6,8 @@ function generateUUID(): string {
}
/**
* D1 storage adapter for offer management using Cloudflare D1
* D1 storage adapter for topic-based offer management using Cloudflare D1
* NOTE: This implementation is a placeholder and needs to be fully tested
*/
export class D1Storage implements Storage {
private db: D1Database;
@@ -20,161 +21,337 @@ export class D1Storage implements Storage {
}
/**
* Initializes database schema
* Initializes database schema with new topic-based structure
* This should be run once during setup, not on every request
*/
async initializeDatabase(): Promise<void> {
await this.db.exec(`
CREATE TABLE IF NOT EXISTS offers (
code TEXT PRIMARY KEY,
peer_id TEXT NOT NULL CHECK(length(peer_id) <= 1024),
offer TEXT NOT NULL,
answer TEXT,
offer_candidates TEXT NOT NULL DEFAULT '[]',
answer_candidates TEXT NOT NULL DEFAULT '[]',
id TEXT PRIMARY KEY,
peer_id TEXT NOT NULL,
sdp TEXT NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL
expires_at INTEGER NOT NULL,
last_seen INTEGER NOT NULL,
answerer_peer_id TEXT,
answer_sdp TEXT,
answered_at INTEGER
);
CREATE INDEX IF NOT EXISTS idx_offers_expires_at ON offers(expires_at);
CREATE INDEX IF NOT EXISTS idx_offers_peer ON offers(peer_id);
CREATE INDEX IF NOT EXISTS idx_offers_expires ON offers(expires_at);
CREATE INDEX IF NOT EXISTS idx_offers_last_seen ON offers(last_seen);
CREATE INDEX IF NOT EXISTS idx_offers_answerer ON offers(answerer_peer_id);
CREATE TABLE IF NOT EXISTS offer_topics (
offer_id TEXT NOT NULL,
topic TEXT NOT NULL,
PRIMARY KEY (offer_id, topic),
FOREIGN KEY (offer_id) REFERENCES offers(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_topics_topic ON offer_topics(topic);
CREATE INDEX IF NOT EXISTS idx_topics_offer ON offer_topics(offer_id);
CREATE TABLE IF NOT EXISTS ice_candidates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
offer_id TEXT NOT NULL,
peer_id TEXT NOT NULL,
role TEXT NOT NULL CHECK(role IN ('offerer', 'answerer')),
candidate TEXT NOT NULL,
created_at INTEGER NOT NULL,
FOREIGN KEY (offer_id) REFERENCES offers(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_ice_offer ON ice_candidates(offer_id);
CREATE INDEX IF NOT EXISTS idx_ice_peer ON ice_candidates(peer_id);
CREATE INDEX IF NOT EXISTS idx_ice_created ON ice_candidates(created_at);
`);
}
async createOffer(
peerId: string,
offer: string,
expiresAt: number,
customCode?: string
): Promise<string> {
let code: string;
let attempts = 0;
const maxAttempts = 10;
async createOffers(offers: CreateOfferRequest[]): Promise<Offer[]> {
const created: Offer[] = [];
// Generate unique code or use custom
do {
code = customCode || generateUUID();
attempts++;
// D1 doesn't support true transactions yet, so we do this sequentially
for (const offer of offers) {
const id = offer.id || generateUUID();
const now = Date.now();
if (attempts > maxAttempts) {
throw new Error('Failed to generate unique offer code');
}
try {
// Insert offer
await this.db.prepare(`
INSERT INTO offers (code, peer_id, offer, created_at, expires_at)
VALUES (?, ?, ?, ?, ?)
`).bind(code, peerId, offer, Date.now(), expiresAt).run();
INSERT INTO offers (id, peer_id, sdp, created_at, expires_at, last_seen)
VALUES (?, ?, ?, ?, ?, ?)
`).bind(id, offer.peerId, offer.sdp, now, offer.expiresAt, now).run();
break;
} catch (err: any) {
// If unique constraint failed with custom code, throw error
if (err.message?.includes('UNIQUE constraint failed')) {
if (customCode) {
throw new Error(`Offer code '${customCode}' already exists`);
}
// Try again with new generated code
continue;
}
throw err;
}
} while (true);
return code;
// Insert topics
for (const topic of offer.topics) {
await this.db.prepare(`
INSERT INTO offer_topics (offer_id, topic)
VALUES (?, ?)
`).bind(id, topic).run();
}
async getOffer(code: string): Promise<Offer | null> {
try {
created.push({
id,
peerId: offer.peerId,
sdp: offer.sdp,
topics: offer.topics,
createdAt: now,
expiresAt: offer.expiresAt,
lastSeen: now,
});
}
return created;
}
async getOffersByTopic(topic: string, excludePeerIds?: string[]): Promise<Offer[]> {
let query = `
SELECT DISTINCT o.*
FROM offers o
INNER JOIN offer_topics ot ON o.id = ot.offer_id
WHERE ot.topic = ? AND o.expires_at > ?
`;
const params: any[] = [topic, Date.now()];
if (excludePeerIds && excludePeerIds.length > 0) {
const placeholders = excludePeerIds.map(() => '?').join(',');
query += ` AND o.peer_id NOT IN (${placeholders})`;
params.push(...excludePeerIds);
}
query += ' ORDER BY o.last_seen DESC';
const result = await this.db.prepare(query).bind(...params).all();
if (!result.results) {
return [];
}
return Promise.all(result.results.map(row => this.rowToOffer(row as any)));
}
async getOffersByPeerId(peerId: string): Promise<Offer[]> {
const result = await this.db.prepare(`
SELECT * FROM offers
WHERE code = ? AND expires_at > ?
`).bind(code, Date.now()).first();
WHERE peer_id = ? AND expires_at > ?
ORDER BY last_seen DESC
`).bind(peerId, Date.now()).all();
if (!result.results) {
return [];
}
return Promise.all(result.results.map(row => this.rowToOffer(row as any)));
}
async getOfferById(offerId: string): Promise<Offer | null> {
const result = await this.db.prepare(`
SELECT * FROM offers
WHERE id = ? AND expires_at > ?
`).bind(offerId, Date.now()).first();
if (!result) {
return null;
}
const row: any = result;
return {
code: row.code,
peerId: row.peer_id,
offer: row.offer,
answer: row.answer || undefined,
offerCandidates: JSON.parse(row.offer_candidates || '[]'),
answerCandidates: JSON.parse(row.answer_candidates || '[]'),
createdAt: row.created_at,
expiresAt: row.expires_at,
};
} catch (error) {
console.error('[D1] getOffer error:', error);
throw error;
}
return this.rowToOffer(result as any);
}
async updateOffer(code: string, update: Partial<Offer>): Promise<void> {
// Verify offer exists
const current = await this.getOffer(code);
if (!current) {
throw new Error('Offer not found');
}
// Build update query dynamically based on what fields are being updated
const updates: string[] = [];
const values: any[] = [];
if (update.answer !== undefined) {
updates.push('answer = ?');
values.push(update.answer);
}
if (update.offerCandidates !== undefined) {
updates.push('offer_candidates = ?');
values.push(JSON.stringify(update.offerCandidates));
}
if (update.answerCandidates !== undefined) {
updates.push('answer_candidates = ?');
values.push(JSON.stringify(update.answerCandidates));
}
if (updates.length === 0) {
return; // Nothing to update
}
// Add WHERE clause values
values.push(code);
// D1 provides strong consistency, so this update is atomic and immediately visible
const query = `
UPDATE offers
SET ${updates.join(', ')}
WHERE code = ?
`;
await this.db.prepare(query).bind(...values).run();
}
async deleteOffer(code: string): Promise<void> {
async updateOfferLastSeen(offerId: string, lastSeen: number): Promise<void> {
await this.db.prepare(`
DELETE FROM offers WHERE code = ?
`).bind(code).run();
UPDATE offers
SET last_seen = ?
WHERE id = ? AND expires_at > ?
`).bind(lastSeen, offerId, Date.now()).run();
}
async cleanupExpiredOffers(): Promise<number> {
async deleteOffer(offerId: string, ownerPeerId: string): Promise<boolean> {
const result = await this.db.prepare(`
DELETE FROM offers WHERE expires_at <= ?
`).bind(Date.now()).run();
DELETE FROM offers
WHERE id = ? AND peer_id = ?
`).bind(offerId, ownerPeerId).run();
return (result.meta.changes || 0) > 0;
}
async deleteExpiredOffers(now: number): Promise<number> {
const result = await this.db.prepare(`
DELETE FROM offers WHERE expires_at < ?
`).bind(now).run();
return result.meta.changes || 0;
}
async cleanup(): Promise<void> {
await this.cleanupExpiredOffers();
async answerOffer(
offerId: string,
answererPeerId: string,
answerSdp: string
): Promise<{ success: boolean; error?: string }> {
// Check if offer exists and is not expired
const offer = await this.getOfferById(offerId);
if (!offer) {
return {
success: false,
error: 'Offer not found or expired'
};
}
// Check if offer already has an answerer
if (offer.answererPeerId) {
return {
success: false,
error: 'Offer already answered'
};
}
// Update offer with answer
const result = await this.db.prepare(`
UPDATE offers
SET answerer_peer_id = ?, answer_sdp = ?, answered_at = ?
WHERE id = ? AND answerer_peer_id IS NULL
`).bind(answererPeerId, answerSdp, Date.now(), offerId).run();
if ((result.meta.changes || 0) === 0) {
return {
success: false,
error: 'Offer already answered (race condition)'
};
}
return { success: true };
}
async getAnsweredOffers(offererPeerId: string): Promise<Offer[]> {
const result = await this.db.prepare(`
SELECT * FROM offers
WHERE peer_id = ? AND answerer_peer_id IS NOT NULL AND expires_at > ?
ORDER BY answered_at DESC
`).bind(offererPeerId, Date.now()).all();
if (!result.results) {
return [];
}
return Promise.all(result.results.map(row => this.rowToOffer(row as any)));
}
async addIceCandidates(
offerId: string,
peerId: string,
role: 'offerer' | 'answerer',
candidates: string[]
): Promise<number> {
// D1 doesn't have transactions, so insert one by one
for (const candidate of candidates) {
await this.db.prepare(`
INSERT INTO ice_candidates (offer_id, peer_id, role, candidate, created_at)
VALUES (?, ?, ?, ?, ?)
`).bind(offerId, peerId, role, candidate, Date.now()).run();
}
return candidates.length;
}
async getIceCandidates(
offerId: string,
targetRole: 'offerer' | 'answerer',
since?: number
): Promise<IceCandidate[]> {
let query = `
SELECT * FROM ice_candidates
WHERE offer_id = ? AND role = ?
`;
const params: any[] = [offerId, targetRole];
if (since !== undefined) {
query += ' AND created_at > ?';
params.push(since);
}
query += ' ORDER BY created_at ASC';
const result = await this.db.prepare(query).bind(...params).all();
if (!result.results) {
return [];
}
return result.results.map((row: any) => ({
id: row.id,
offerId: row.offer_id,
peerId: row.peer_id,
role: row.role,
candidate: row.candidate,
createdAt: row.created_at,
}));
}
async getTopics(limit: number, offset: number): Promise<{
topics: TopicInfo[];
total: number;
}> {
// Get total count of topics with active offers
const countResult = await this.db.prepare(`
SELECT COUNT(DISTINCT ot.topic) as count
FROM offer_topics ot
INNER JOIN offers o ON ot.offer_id = o.id
WHERE o.expires_at > ?
`).bind(Date.now()).first();
const total = (countResult as any)?.count || 0;
// Get topics with peer counts (paginated)
const topicsResult = await this.db.prepare(`
SELECT
ot.topic,
COUNT(DISTINCT o.peer_id) as active_peers
FROM offer_topics ot
INNER JOIN offers o ON ot.offer_id = o.id
WHERE o.expires_at > ?
GROUP BY ot.topic
ORDER BY active_peers DESC, ot.topic ASC
LIMIT ? OFFSET ?
`).bind(Date.now(), limit, offset).all();
const topics = (topicsResult.results || []).map((row: any) => ({
topic: row.topic,
activePeers: row.active_peers,
}));
return { topics, total };
}
async close(): Promise<void> {
// D1 doesn't require explicit connection closing
// Connections are managed by the Cloudflare Workers runtime
}
/**
* Helper method to convert database row to Offer object with topics
*/
private async rowToOffer(row: any): Promise<Offer> {
// Get topics for this offer
const topicResult = await this.db.prepare(`
SELECT topic FROM offer_topics WHERE offer_id = ?
`).bind(row.id).all();
const topics = topicResult.results?.map((t: any) => t.topic) || [];
return {
id: row.id,
peerId: row.peer_id,
sdp: row.sdp,
topics,
createdAt: row.created_at,
expiresAt: row.expires_at,
lastSeen: row.last_seen,
answererPeerId: row.answerer_peer_id || undefined,
answerSdp: row.answer_sdp || undefined,
answeredAt: row.answered_at || undefined,
};
}
}

View File

@@ -1,9 +1,9 @@
import Database from 'better-sqlite3';
import { randomUUID } from 'crypto';
import { Storage, Offer } from './types.ts';
import { Storage, Offer, IceCandidate, CreateOfferRequest, TopicInfo } from './types.ts';
/**
* SQLite storage adapter for offer management
* SQLite storage adapter for topic-based offer management
* Supports both file-based and in-memory databases
*/
export class SQLiteStorage implements Storage {
@@ -16,167 +16,356 @@ export class SQLiteStorage implements Storage {
constructor(path: string = ':memory:') {
this.db = new Database(path);
this.initializeDatabase();
this.startCleanupInterval();
}
/**
* Initializes database schema
* Initializes database schema with new topic-based structure
*/
private initializeDatabase(): void {
this.db.exec(`
CREATE TABLE IF NOT EXISTS offers (
code TEXT PRIMARY KEY,
peer_id TEXT NOT NULL CHECK(length(peer_id) <= 1024),
offer TEXT NOT NULL,
answer TEXT,
offer_candidates TEXT NOT NULL DEFAULT '[]',
answer_candidates TEXT NOT NULL DEFAULT '[]',
id TEXT PRIMARY KEY,
peer_id TEXT NOT NULL,
sdp TEXT NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL
expires_at INTEGER NOT NULL,
last_seen INTEGER NOT NULL,
answerer_peer_id TEXT,
answer_sdp TEXT,
answered_at INTEGER
);
CREATE INDEX IF NOT EXISTS idx_offers_expires_at ON offers(expires_at);
CREATE INDEX IF NOT EXISTS idx_offers_peer ON offers(peer_id);
CREATE INDEX IF NOT EXISTS idx_offers_expires ON offers(expires_at);
CREATE INDEX IF NOT EXISTS idx_offers_last_seen ON offers(last_seen);
CREATE INDEX IF NOT EXISTS idx_offers_answerer ON offers(answerer_peer_id);
CREATE TABLE IF NOT EXISTS offer_topics (
offer_id TEXT NOT NULL,
topic TEXT NOT NULL,
PRIMARY KEY (offer_id, topic),
FOREIGN KEY (offer_id) REFERENCES offers(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_topics_topic ON offer_topics(topic);
CREATE INDEX IF NOT EXISTS idx_topics_offer ON offer_topics(offer_id);
CREATE TABLE IF NOT EXISTS ice_candidates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
offer_id TEXT NOT NULL,
peer_id TEXT NOT NULL,
role TEXT NOT NULL CHECK(role IN ('offerer', 'answerer')),
candidate TEXT NOT NULL,
created_at INTEGER NOT NULL,
FOREIGN KEY (offer_id) REFERENCES offers(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_ice_offer ON ice_candidates(offer_id);
CREATE INDEX IF NOT EXISTS idx_ice_peer ON ice_candidates(peer_id);
CREATE INDEX IF NOT EXISTS idx_ice_created ON ice_candidates(created_at);
`);
// Enable foreign keys
this.db.pragma('foreign_keys = ON');
}
/**
* Starts periodic cleanup of expired offers
*/
private startCleanupInterval(): void {
// Run cleanup every minute
setInterval(() => {
this.cleanup().catch(err => {
console.error('Cleanup error:', err);
async createOffers(offers: CreateOfferRequest[]): Promise<Offer[]> {
const created: Offer[] = [];
// Use transaction for atomic creation
const transaction = this.db.transaction((offers: CreateOfferRequest[]) => {
const offerStmt = this.db.prepare(`
INSERT INTO offers (id, peer_id, sdp, created_at, expires_at, last_seen)
VALUES (?, ?, ?, ?, ?, ?)
`);
const topicStmt = this.db.prepare(`
INSERT INTO offer_topics (offer_id, topic)
VALUES (?, ?)
`);
for (const offer of offers) {
const id = offer.id || randomUUID();
const now = Date.now();
// Insert offer
offerStmt.run(
id,
offer.peerId,
offer.sdp,
now,
offer.expiresAt,
now
);
// Insert topics
for (const topic of offer.topics) {
topicStmt.run(id, topic);
}
created.push({
id,
peerId: offer.peerId,
sdp: offer.sdp,
topics: offer.topics,
createdAt: now,
expiresAt: offer.expiresAt,
lastSeen: now,
});
}, 60000);
}
});
transaction(offers);
return created;
}
/**
* Generates a unique code using UUID
*/
private generateCode(): string {
return randomUUID();
async getOffersByTopic(topic: string, excludePeerIds?: string[]): Promise<Offer[]> {
let query = `
SELECT DISTINCT o.*
FROM offers o
INNER JOIN offer_topics ot ON o.id = ot.offer_id
WHERE ot.topic = ? AND o.expires_at > ?
`;
const params: any[] = [topic, Date.now()];
if (excludePeerIds && excludePeerIds.length > 0) {
const placeholders = excludePeerIds.map(() => '?').join(',');
query += ` AND o.peer_id NOT IN (${placeholders})`;
params.push(...excludePeerIds);
}
async createOffer(peerId: string, offer: string, expiresAt: number, customCode?: string): Promise<string> {
// Validate peerId length
if (peerId.length > 1024) {
throw new Error('PeerId string must be 1024 characters or less');
query += ' ORDER BY o.last_seen DESC';
const stmt = this.db.prepare(query);
const rows = stmt.all(...params) as any[];
return Promise.all(rows.map(row => this.rowToOffer(row)));
}
let code: string;
let attempts = 0;
const maxAttempts = 10;
// Try to generate or use custom code
do {
code = customCode || this.generateCode();
attempts++;
if (attempts > maxAttempts) {
throw new Error('Failed to generate unique offer code');
}
try {
async getOffersByPeerId(peerId: string): Promise<Offer[]> {
const stmt = this.db.prepare(`
INSERT INTO offers (code, peer_id, offer, created_at, expires_at)
VALUES (?, ?, ?, ?, ?)
SELECT * FROM offers
WHERE peer_id = ? AND expires_at > ?
ORDER BY last_seen DESC
`);
stmt.run(code, peerId, offer, Date.now(), expiresAt);
break;
} catch (err: any) {
// If unique constraint failed with custom code, throw error
if (err.code === 'SQLITE_CONSTRAINT_PRIMARYKEY') {
if (customCode) {
throw new Error(`Offer code '${customCode}' already exists`);
}
// Try again with new generated code
continue;
}
throw err;
}
} while (true);
return code;
const rows = stmt.all(peerId, Date.now()) as any[];
return Promise.all(rows.map(row => this.rowToOffer(row)));
}
async getOffer(code: string): Promise<Offer | null> {
async getOfferById(offerId: string): Promise<Offer | null> {
const stmt = this.db.prepare(`
SELECT * FROM offers WHERE code = ? AND expires_at > ?
SELECT * FROM offers
WHERE id = ? AND expires_at > ?
`);
const row = stmt.get(code, Date.now()) as any;
const row = stmt.get(offerId, Date.now()) as any;
if (!row) {
return null;
}
return this.rowToOffer(row);
}
async updateOfferLastSeen(offerId: string, lastSeen: number): Promise<void> {
const stmt = this.db.prepare(`
UPDATE offers
SET last_seen = ?
WHERE id = ? AND expires_at > ?
`);
stmt.run(lastSeen, offerId, Date.now());
}
async deleteOffer(offerId: string, ownerPeerId: string): Promise<boolean> {
const stmt = this.db.prepare(`
DELETE FROM offers
WHERE id = ? AND peer_id = ?
`);
const result = stmt.run(offerId, ownerPeerId);
return result.changes > 0;
}
async deleteExpiredOffers(now: number): Promise<number> {
const stmt = this.db.prepare('DELETE FROM offers WHERE expires_at < ?');
const result = stmt.run(now);
return result.changes;
}
async answerOffer(
offerId: string,
answererPeerId: string,
answerSdp: string
): Promise<{ success: boolean; error?: string }> {
// Check if offer exists and is not expired
const offer = await this.getOfferById(offerId);
if (!offer) {
return {
code: row.code,
peerId: row.peer_id,
offer: row.offer,
answer: row.answer || undefined,
offerCandidates: JSON.parse(row.offer_candidates),
answerCandidates: JSON.parse(row.answer_candidates),
createdAt: row.created_at,
expiresAt: row.expires_at,
success: false,
error: 'Offer not found or expired'
};
}
async updateOffer(code: string, update: Partial<Offer>): Promise<void> {
const current = await this.getOffer(code);
if (!current) {
throw new Error('Offer not found');
// Check if offer already has an answerer
if (offer.answererPeerId) {
return {
success: false,
error: 'Offer already answered'
};
}
const updates: string[] = [];
const values: any[] = [];
if (update.answer !== undefined) {
updates.push('answer = ?');
values.push(update.answer);
}
if (update.offerCandidates !== undefined) {
updates.push('offer_candidates = ?');
values.push(JSON.stringify(update.offerCandidates));
}
if (update.answerCandidates !== undefined) {
updates.push('answer_candidates = ?');
values.push(JSON.stringify(update.answerCandidates));
}
if (updates.length === 0) {
return;
}
values.push(code);
// Update offer with answer
const stmt = this.db.prepare(`
UPDATE offers SET ${updates.join(', ')} WHERE code = ?
UPDATE offers
SET answerer_peer_id = ?, answer_sdp = ?, answered_at = ?
WHERE id = ? AND answerer_peer_id IS NULL
`);
stmt.run(...values);
const result = stmt.run(answererPeerId, answerSdp, Date.now(), offerId);
if (result.changes === 0) {
return {
success: false,
error: 'Offer already answered (race condition)'
};
}
async deleteOffer(code: string): Promise<void> {
const stmt = this.db.prepare('DELETE FROM offers WHERE code = ?');
stmt.run(code);
return { success: true };
}
async cleanup(): Promise<void> {
const stmt = this.db.prepare('DELETE FROM offers WHERE expires_at <= ?');
const result = stmt.run(Date.now());
async getAnsweredOffers(offererPeerId: string): Promise<Offer[]> {
const stmt = this.db.prepare(`
SELECT * FROM offers
WHERE peer_id = ? AND answerer_peer_id IS NOT NULL AND expires_at > ?
ORDER BY answered_at DESC
`);
if (result.changes > 0) {
console.log(`Cleaned up ${result.changes} expired offer(s)`);
const rows = stmt.all(offererPeerId, Date.now()) as any[];
return Promise.all(rows.map(row => this.rowToOffer(row)));
}
async addIceCandidates(
offerId: string,
peerId: string,
role: 'offerer' | 'answerer',
candidates: string[]
): Promise<number> {
const stmt = this.db.prepare(`
INSERT INTO ice_candidates (offer_id, peer_id, role, candidate, created_at)
VALUES (?, ?, ?, ?, ?)
`);
const transaction = this.db.transaction((candidates: string[]) => {
for (const candidate of candidates) {
stmt.run(offerId, peerId, role, candidate, Date.now());
}
});
transaction(candidates);
return candidates.length;
}
async getIceCandidates(
offerId: string,
targetRole: 'offerer' | 'answerer',
since?: number
): Promise<IceCandidate[]> {
let query = `
SELECT * FROM ice_candidates
WHERE offer_id = ? AND role = ?
`;
const params: any[] = [offerId, targetRole];
if (since !== undefined) {
query += ' AND created_at > ?';
params.push(since);
}
query += ' ORDER BY created_at ASC';
const stmt = this.db.prepare(query);
const rows = stmt.all(...params) as any[];
return rows.map(row => ({
id: row.id,
offerId: row.offer_id,
peerId: row.peer_id,
role: row.role,
candidate: row.candidate,
createdAt: row.created_at,
}));
}
async getTopics(limit: number, offset: number): Promise<{
topics: TopicInfo[];
total: number;
}> {
// Get total count of topics with active offers
const countStmt = this.db.prepare(`
SELECT COUNT(DISTINCT ot.topic) as count
FROM offer_topics ot
INNER JOIN offers o ON ot.offer_id = o.id
WHERE o.expires_at > ?
`);
const countRow = countStmt.get(Date.now()) as any;
const total = countRow.count;
// Get topics with peer counts (paginated)
const topicsStmt = this.db.prepare(`
SELECT
ot.topic,
COUNT(DISTINCT o.peer_id) as active_peers
FROM offer_topics ot
INNER JOIN offers o ON ot.offer_id = o.id
WHERE o.expires_at > ?
GROUP BY ot.topic
ORDER BY active_peers DESC, ot.topic ASC
LIMIT ? OFFSET ?
`);
const rows = topicsStmt.all(Date.now(), limit, offset) as any[];
const topics = rows.map(row => ({
topic: row.topic,
activePeers: row.active_peers,
}));
return { topics, total };
}
async close(): Promise<void> {
this.db.close();
}
/**
* Helper method to convert database row to Offer object with topics
*/
private async rowToOffer(row: any): Promise<Offer> {
// Get topics for this offer
const topicStmt = this.db.prepare(`
SELECT topic FROM offer_topics WHERE offer_id = ?
`);
const topicRows = topicStmt.all(row.id) as any[];
const topics = topicRows.map(t => t.topic);
return {
id: row.id,
peerId: row.peer_id,
sdp: row.sdp,
topics,
createdAt: row.created_at,
expiresAt: row.expires_at,
lastSeen: row.last_seen,
answererPeerId: row.answerer_peer_id || undefined,
answerSdp: row.answer_sdp || undefined,
answeredAt: row.answered_at || undefined,
};
}
}

View File

@@ -1,57 +1,163 @@
/**
* Represents a WebRTC signaling offer
* Represents a WebRTC signaling offer with topic-based discovery
*/
export interface Offer {
code: string;
id: string;
peerId: string;
offer: string;
answer?: string;
offerCandidates: string[];
answerCandidates: string[];
sdp: string;
topics: string[];
createdAt: number;
expiresAt: number;
lastSeen: number;
answererPeerId?: string;
answerSdp?: string;
answeredAt?: number;
}
/**
* Represents an ICE candidate for WebRTC signaling
*/
export interface IceCandidate {
id: number;
offerId: string;
peerId: string;
role: 'offerer' | 'answerer';
candidate: string;
createdAt: number;
}
/**
* Represents a topic with active peer count
*/
export interface TopicInfo {
topic: string;
activePeers: number;
}
/**
* Request to create a new offer
*/
export interface CreateOfferRequest {
id?: string;
peerId: string;
sdp: string;
topics: string[];
expiresAt: number;
}
/**
* Storage interface for offer management
* Storage interface for offer management with topic-based discovery
* Implementations can use different backends (SQLite, D1, Memory, etc.)
*/
export interface Storage {
/**
* Creates a new offer
* @param peerId Peer identifier string (max 1024 chars)
* @param offer The WebRTC SDP offer message
* @param expiresAt Unix timestamp when the offer should expire
* @param customCode Optional custom code (if not provided, generates UUID)
* @returns The unique offer code
* Creates one or more offers
* @param offers Array of offer creation requests
* @returns Array of created offers with IDs
*/
createOffer(peerId: string, offer: string, expiresAt: number, customCode?: string): Promise<string>;
createOffers(offers: CreateOfferRequest[]): Promise<Offer[]>;
/**
* Retrieves an offer by its code
* @param code The offer code
* Retrieves offers by topic with optional peer ID exclusion
* @param topic Topic to search for
* @param excludePeerIds Optional array of peer IDs to exclude
* @returns Array of offers matching the topic
*/
getOffersByTopic(topic: string, excludePeerIds?: string[]): Promise<Offer[]>;
/**
* Retrieves all offers from a specific peer
* @param peerId Peer identifier
* @returns Array of offers from the peer
*/
getOffersByPeerId(peerId: string): Promise<Offer[]>;
/**
* Retrieves a specific offer by ID
* @param offerId Offer identifier
* @returns The offer if found, null otherwise
*/
getOffer(code: string): Promise<Offer | null>;
getOfferById(offerId: string): Promise<Offer | null>;
/**
* Updates an existing offer with new data
* @param code The offer code
* @param update Partial offer data to update
* Updates the last_seen timestamp for an offer (heartbeat)
* @param offerId Offer identifier
* @param lastSeen New last_seen timestamp
*/
updateOffer(code: string, update: Partial<Offer>): Promise<void>;
updateOfferLastSeen(offerId: string, lastSeen: number): Promise<void>;
/**
* Deletes an offer
* @param code The offer code
* Deletes an offer (with ownership verification)
* @param offerId Offer identifier
* @param ownerPeerId Peer ID of the owner (for verification)
* @returns true if deleted, false if not found or not owned
*/
deleteOffer(code: string): Promise<void>;
deleteOffer(offerId: string, ownerPeerId: string): Promise<boolean>;
/**
* Removes expired offers
* Should be called periodically to clean up old data
* Deletes all expired offers
* @param now Current timestamp
* @returns Number of offers deleted
*/
cleanup(): Promise<void>;
deleteExpiredOffers(now: number): Promise<number>;
/**
* Answers an offer (locks it to the answerer)
* @param offerId Offer identifier
* @param answererPeerId Answerer's peer ID
* @param answerSdp WebRTC answer SDP
* @returns Success status and optional error message
*/
answerOffer(offerId: string, answererPeerId: string, answerSdp: string): Promise<{
success: boolean;
error?: string;
}>;
/**
* Retrieves all answered offers for a specific offerer
* @param offererPeerId Offerer's peer ID
* @returns Array of answered offers
*/
getAnsweredOffers(offererPeerId: string): Promise<Offer[]>;
/**
* Adds ICE candidates for an offer
* @param offerId Offer identifier
* @param peerId Peer ID posting the candidates
* @param role Role of the peer (offerer or answerer)
* @param candidates Array of ICE candidate strings
* @returns Number of candidates added
*/
addIceCandidates(
offerId: string,
peerId: string,
role: 'offerer' | 'answerer',
candidates: string[]
): Promise<number>;
/**
* Retrieves ICE candidates for an offer
* @param offerId Offer identifier
* @param targetRole Role to retrieve candidates for (offerer or answerer)
* @param since Optional timestamp - only return candidates after this time
* @returns Array of ICE candidates
*/
getIceCandidates(
offerId: string,
targetRole: 'offerer' | 'answerer',
since?: number
): Promise<IceCandidate[]>;
/**
* Retrieves topics with active peer counts (paginated)
* @param limit Maximum number of topics to return
* @param offset Number of topics to skip
* @returns Object with topics array and total count
*/
getTopics(limit: number, offset: number): Promise<{
topics: TopicInfo[];
total: number;
}>;
/**
* Closes the storage connection and releases resources

View File

@@ -1,12 +1,19 @@
import { createApp } from './app.ts';
import { D1Storage } from './storage/d1.ts';
import { generateSecretKey } from './crypto.ts';
import { Config } from './config.ts';
/**
* Cloudflare Workers environment bindings
*/
export interface Env {
DB: D1Database;
OFFER_TIMEOUT?: string;
AUTH_SECRET?: string;
OFFER_DEFAULT_TTL?: string;
OFFER_MAX_TTL?: string;
OFFER_MIN_TTL?: string;
MAX_OFFERS_PER_REQUEST?: string;
MAX_TOPICS_PER_OFFER?: string;
CORS_ORIGINS?: string;
VERSION?: string;
}
@@ -19,21 +26,29 @@ export default {
// Initialize D1 storage
const storage = new D1Storage(env.DB);
// Parse configuration
const offerTimeout = env.OFFER_TIMEOUT
? parseInt(env.OFFER_TIMEOUT, 10)
: 60000; // 1 minute default
// Generate or use provided auth secret
const authSecret = env.AUTH_SECRET || generateSecretKey();
const corsOrigins = env.CORS_ORIGINS
// Build config from environment
const config: Config = {
port: 0, // Not used in Workers
storageType: 'sqlite', // D1 is SQLite-compatible
storagePath: '', // Not used with D1
corsOrigins: env.CORS_ORIGINS
? env.CORS_ORIGINS.split(',').map(o => o.trim())
: ['*'];
: ['*'],
version: env.VERSION || 'unknown',
authSecret,
offerDefaultTtl: env.OFFER_DEFAULT_TTL ? parseInt(env.OFFER_DEFAULT_TTL, 10) : 60000,
offerMaxTtl: env.OFFER_MAX_TTL ? parseInt(env.OFFER_MAX_TTL, 10) : 86400000,
offerMinTtl: env.OFFER_MIN_TTL ? parseInt(env.OFFER_MIN_TTL, 10) : 60000,
cleanupInterval: 60000, // Not used in Workers (scheduled handler instead)
maxOffersPerRequest: env.MAX_OFFERS_PER_REQUEST ? parseInt(env.MAX_OFFERS_PER_REQUEST, 10) : 100,
maxTopicsPerOffer: env.MAX_TOPICS_PER_OFFER ? parseInt(env.MAX_TOPICS_PER_OFFER, 10) : 50,
};
// Create Hono app
const app = createApp(storage, {
offerTimeout,
corsOrigins,
version: env.VERSION || 'unknown',
});
const app = createApp(storage, config);
// Handle request
return app.fetch(request, env, ctx);
@@ -41,15 +56,15 @@ export default {
/**
* Scheduled handler for cron triggers
* Runs every minute to clean up expired offers
* Runs periodically to clean up expired offers
*/
async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext): Promise<void> {
const storage = new D1Storage(env.DB);
const now = Date.now();
try {
// Delete expired offers using the storage method
const deletedCount = await storage.cleanupExpiredOffers();
// Delete expired offers
const deletedCount = await storage.deleteExpiredOffers(now);
console.log(`Cleaned up ${deletedCount} expired offers at ${new Date(now).toISOString()}`);
} catch (error) {

View File

@@ -1,6 +1,7 @@
name = "rondevu"
main = "src/worker.ts"
compatibility_date = "2024-01-01"
compatibility_flags = ["nodejs_compat"]
# D1 Database binding
[[d1_databases]]
@@ -10,9 +11,17 @@ database_id = "b94e3f71-816d-455b-a89d-927fa49532d0"
# Environment variables
[vars]
OFFER_TIMEOUT = "60000" # 1 minute in milliseconds
OFFER_DEFAULT_TTL = "60000" # Default offer TTL: 1 minute
OFFER_MAX_TTL = "86400000" # Max offer TTL: 24 hours
OFFER_MIN_TTL = "60000" # Min offer TTL: 1 minute
MAX_OFFERS_PER_REQUEST = "100" # Max offers per request
MAX_TOPICS_PER_OFFER = "50" # Max topics per offer
CORS_ORIGINS = "*" # Comma-separated list of allowed origins
VERSION = "0.0.1" # Semantic version
VERSION = "0.1.0" # Semantic version
# AUTH_SECRET should be set as a secret, not a var
# Run: npx wrangler secret put AUTH_SECRET
# Enter a 64-character hex string (32 bytes)
# Build configuration
[build]