From a480fa3ba4e658609b77d4d0bdcde43860877df9 Mon Sep 17 00:00:00 2001 From: Bas van den Aakster Date: Sun, 14 Dec 2025 16:52:57 +0100 Subject: [PATCH] Add durable WebRTC connections with auto-reconnect and message buffering (v0.18.8) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add connection state machine with proper lifecycle management - Implement automatic reconnection with exponential backoff - Add message buffering during disconnections - Create RondevuConnection base class with state tracking - Create OffererConnection and AnswererConnection classes - Fix ICE polling lifecycle (now stops when connected) - Add fillOffers() semaphore to prevent exceeding maxOffers - Implement answer fingerprinting to prevent duplicate processing - Add dual ICE state monitoring (iceConnectionState + connectionState) - Fix data channel handler timing issues - Add comprehensive event system (20+ events) - Add connection timeouts and proper cleanup Breaking changes: - connectToService() now returns AnswererConnection instead of ConnectionContext - connection:opened event signature changed: (offerId, dc) → (offerId, connection) - Direct DataChannel access replaced with connection wrapper API See MIGRATION.md for upgrade guide. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- MIGRATION.md | 832 ++++++++++++++++--------------------- package.json | 2 +- src/answerer-connection.ts | 183 ++++++++ src/connection-config.ts | 64 +++ src/connection-events.ts | 102 +++++ src/connection.ts | 567 +++++++++++++++++++++++++ src/exponential-backoff.ts | 59 +++ src/index.ts | 32 ++ src/message-buffer.ts | 125 ++++++ src/offerer-connection.ts | 213 ++++++++++ src/rondevu.ts | 525 +++++++++-------------- 11 files changed, 1905 insertions(+), 799 deletions(-) create mode 100644 src/answerer-connection.ts create mode 100644 src/connection-config.ts create mode 100644 src/connection-events.ts create mode 100644 src/connection.ts create mode 100644 src/exponential-backoff.ts create mode 100644 src/message-buffer.ts create mode 100644 src/offerer-connection.ts diff --git a/MIGRATION.md b/MIGRATION.md index 067803b..7741271 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -1,547 +1,441 @@ -# Migration Guide: v0.8.x → v0.9.0 +# Migration Guide: v0.18.x → v0.18.8 -This guide helps you migrate from Rondevu Client v0.8.x to v0.9.0. +Version 0.18.8 introduces significant improvements to connection durability and reliability. While we've maintained backward compatibility where possible, there are some breaking changes to be aware of. -## Overview +## Overview of Changes -v0.9.0 is a **breaking change** that completely replaces low-level APIs with high-level durable connections featuring automatic reconnection and message queuing. +### New Features +- **Automatic reconnection** with exponential backoff +- **Message buffering** during disconnections +- **Connection state machine** with proper lifecycle management +- **Rich event system** for connection monitoring +- **ICE polling lifecycle** (stops when connected, no more resource leaks) -### What's New +### Breaking Changes +- `connectToService()` now returns `AnswererConnection` instead of `ConnectionContext` +- `connection:opened` event signature changed for offerer side +- Direct DataChannel access replaced with connection wrapper API -✅ **Durable Connections**: Automatic reconnection on network drops -✅ **Message Queuing**: Messages sent during disconnections are queued and flushed on reconnect -✅ **Durable Channels**: RTCDataChannel wrappers that survive connection drops -✅ **TTL Auto-Refresh**: Services automatically republish before expiration -✅ **Simplified API**: Direct methods on main client instead of nested APIs +--- -### What's Removed +## Migration Steps -❌ **Low-level APIs**: `client.services.*`, `client.discovery.*`, `client.createPeer()` no longer exported -❌ **Manual Connection Management**: No need to handle WebRTC peer lifecycle manually -❌ **Service Handles**: Replaced with DurableService instances +### 1. Answerer Side (connectToService) -## Breaking Changes +#### Old API (v0.18.7 and earlier) -### 1. Service Exposure - -#### v0.8.x (Old) ```typescript -import { Rondevu } from '@xtr-dev/rondevu-client'; +const context = await rondevu.connectToService({ + serviceFqn: 'chat:1.0.0@alice', + onConnection: ({ dc, pc, peerUsername }) => { + console.log('Connected to', peerUsername) -const client = new Rondevu(); -await client.register(); + dc.addEventListener('message', (event) => { + console.log('Received:', event.data) + }) -const handle = await client.services.exposeService({ - username: 'alice', - privateKey: keypair.privateKey, - serviceFqn: 'chat@1.0.0', - isPublic: true, - handler: (channel, peer) => { - channel.onmessage = (e) => { - console.log('Received:', e.data); - channel.send(`Echo: ${e.data}`); - }; + dc.addEventListener('open', () => { + dc.send('Hello!') + }) } -}); +}) -// Unpublish -await handle.unpublish(); +// Access peer connection +context.pc.getStats() ``` -#### v0.9.0 (New) +#### New API (v0.18.8) + ```typescript -import { Rondevu } from '@xtr-dev/rondevu-client'; - -const client = new Rondevu(); -await client.register(); - -const service = await client.exposeService({ - username: 'alice', - privateKey: keypair.privateKey, - serviceFqn: 'chat@1.0.0', - isPublic: true, - poolSize: 10, // NEW: Handle multiple concurrent connections - handler: (channel, connectionId) => { - // NEW: DurableChannel with event emitters - channel.on('message', (data) => { - console.log('Received:', data); - channel.send(`Echo: ${data}`); - }); +const connection = await rondevu.connectToService({ + serviceFqn: 'chat:1.0.0@alice', + connectionConfig: { + reconnectEnabled: true, // Optional: enable auto-reconnect + bufferEnabled: true, // Optional: enable message buffering + connectionTimeout: 30000 // Optional: connection timeout (ms) } -}); - -// NEW: Start the service -await service.start(); - -// NEW: Stop the service -await service.stop(); -``` - -**Key Differences:** -- `client.services.exposeService()` → `client.exposeService()` -- Returns `DurableService` instead of `ServiceHandle` -- Handler receives `DurableChannel` instead of `RTCDataChannel` -- Handler receives `connectionId` string instead of `RondevuPeer` -- DurableChannel uses `.on('message', ...)` instead of `.onmessage = ...` -- Must call `service.start()` to begin accepting connections -- Use `service.stop()` instead of `handle.unpublish()` - -### 2. Connecting to Services - -#### v0.8.x (Old) -```typescript -// Connect by username + FQN -const { peer, channel } = await client.discovery.connect( - 'alice', - 'chat@1.0.0' -); - -channel.onmessage = (e) => { - console.log('Received:', e.data); -}; - -channel.onopen = () => { - channel.send('Hello!'); -}; - -peer.on('connected', () => { - console.log('Connected'); -}); - -peer.on('failed', (error) => { - console.error('Failed:', error); -}); -``` - -#### v0.9.0 (New) -```typescript -// Connect by username + FQN -const connection = await client.connect('alice', 'chat@1.0.0', { - maxReconnectAttempts: 10 // NEW: Configurable reconnection -}); - -// NEW: Create durable channel -const channel = connection.createChannel('main'); - -channel.on('message', (data) => { - console.log('Received:', data); -}); - -channel.on('open', () => { - channel.send('Hello!'); -}); - -// NEW: Connection lifecycle events -connection.on('connected', () => { - console.log('Connected'); -}); - -connection.on('reconnecting', (attempt, max, delay) => { - console.log(`Reconnecting (${attempt}/${max})...`); -}); - -connection.on('failed', (error) => { - console.error('Failed permanently:', error); -}); - -// NEW: Must explicitly connect -await connection.connect(); -``` - -**Key Differences:** -- `client.discovery.connect()` → `client.connect()` -- Returns `DurableConnection` instead of `{ peer, channel }` -- Must create channels with `connection.createChannel()` -- Must call `connection.connect()` to establish connection -- Automatic reconnection with configurable retry limits -- Messages sent during disconnection are automatically queued - -### 3. Connecting by UUID - -#### v0.8.x (Old) -```typescript -const { peer, channel } = await client.discovery.connectByUuid('service-uuid'); - -channel.onmessage = (e) => { - console.log('Received:', e.data); -}; -``` - -#### v0.9.0 (New) -```typescript -const connection = await client.connectByUuid('service-uuid', { - maxReconnectAttempts: 5 -}); - -const channel = connection.createChannel('main'); - -channel.on('message', (data) => { - console.log('Received:', data); -}); - -await connection.connect(); -``` - -**Key Differences:** -- `client.discovery.connectByUuid()` → `client.connectByUuid()` -- Returns `DurableConnection` instead of `{ peer, channel }` -- Must create channels and connect explicitly - -### 4. Multi-Connection Services (Offer Pooling) - -#### v0.8.x (Old) -```typescript -const handle = await client.services.exposeService({ - username: 'alice', - privateKey: keypair.privateKey, - serviceFqn: 'chat@1.0.0', - poolSize: 5, - pollingInterval: 2000, - handler: (channel, peer, connectionId) => { - console.log(`Connection: ${connectionId}`); - }, - onPoolStatus: (status) => { - console.log('Pool status:', status); - } -}); - -const status = handle.getStatus(); -await handle.addOffers(3); -``` - -#### v0.9.0 (New) -```typescript -const service = await client.exposeService({ - username: 'alice', - privateKey: keypair.privateKey, - serviceFqn: 'chat@1.0.0', - poolSize: 5, // SAME: Pool size - pollingInterval: 2000, // SAME: Polling interval - handler: (channel, connectionId) => { - console.log(`Connection: ${connectionId}`); - } -}); - -await service.start(); - -// Get active connections -const connections = service.getActiveConnections(); +}) // Listen for connection events -service.on('connection', (connectionId) => { - console.log('New connection:', connectionId); -}); +connection.on('connected', () => { + console.log('Connected!') + connection.send('Hello!') +}) + +connection.on('message', (data) => { + console.log('Received:', data) +}) + +// Optional: monitor reconnection +connection.on('reconnecting', (attempt) => { + console.log(`Reconnecting, attempt ${attempt}`) +}) + +connection.on('reconnect:success', () => { + console.log('Reconnection successful!') +}) + +// Access peer connection if needed +const pc = connection.getPeerConnection() +const dc = connection.getDataChannel() ``` -**Key Differences:** -- `onPoolStatus` callback removed (use `service.on('connection')` instead) -- `handle.getStatus()` replaced with `service.getActiveConnections()` -- `handle.addOffers()` removed (pool auto-manages offers) -- Handler receives `DurableChannel` instead of `RTCDataChannel` +**Key Changes:** +- ❌ Removed `onConnection` callback +- ✅ Use event listeners instead: `connection.on('connected', ...)` +- ❌ Removed direct `dc.send()` access +- ✅ Use `connection.send()` for automatic buffering support +- ✅ Added automatic reconnection and message buffering -## Feature Comparison +--- -| Feature | v0.8.x | v0.9.0 | -|---------|--------|--------| -| Service exposure | `client.services.exposeService()` | `client.exposeService()` | -| Connection | `client.discovery.connect()` | `client.connect()` | -| Connection by UUID | `client.discovery.connectByUuid()` | `client.connectByUuid()` | -| Channel type | `RTCDataChannel` | `DurableChannel` | -| Event handling | `.onmessage`, `.onopen`, etc. | `.on('message')`, `.on('open')`, etc. | -| Automatic reconnection | ❌ No | ✅ Yes (configurable) | -| Message queuing | ❌ No | ✅ Yes (during disconnections) | -| TTL auto-refresh | ❌ No | ✅ Yes (configurable) | -| Peer lifecycle | Manual | Automatic | -| Connection pooling | ✅ Yes | ✅ Yes (same API) | +### 2. Offerer Side (publishService) -## API Mapping - -### Removed Exports - -These are no longer exported in v0.9.0: +#### Old API (v0.18.7 and earlier) ```typescript -// ❌ Removed -import { - RondevuServices, - RondevuDiscovery, - RondevuPeer, - ServiceHandle, - PooledServiceHandle, - ConnectResult -} from '@xtr-dev/rondevu-client'; +await rondevu.publishService({ + service: 'chat:1.0.0', + maxOffers: 5 +}) + +await rondevu.startFilling() + +// Handle connections +rondevu.on('connection:opened', (offerId, dc) => { + console.log('New connection:', offerId) + + dc.addEventListener('message', (event) => { + console.log('Received:', event.data) + }) + + dc.send('Welcome!') +}) ``` -### New Exports - -These are new in v0.9.0: +#### New API (v0.18.8) ```typescript -// ✅ New -import { - DurableConnection, - DurableChannel, - DurableService, - DurableConnectionState, - DurableChannelState, - DurableConnectionConfig, - DurableChannelConfig, - DurableServiceConfig, - DurableConnectionEvents, - DurableChannelEvents, - DurableServiceEvents, - ConnectionInfo, - ServiceInfo, - QueuedMessage -} from '@xtr-dev/rondevu-client'; -``` - -### Unchanged Exports - -These work the same in both versions: - -```typescript -// ✅ Unchanged -import { - Rondevu, - RondevuAuth, - RondevuUsername, - Credentials, - UsernameClaimResult, - UsernameCheckResult -} from '@xtr-dev/rondevu-client'; -``` - -## Configuration Options - -### New Connection Options - -v0.9.0 adds extensive configuration for automatic reconnection and message queuing: - -```typescript -const connection = await client.connect('alice', 'chat@1.0.0', { - // Reconnection - maxReconnectAttempts: 10, // default: 10 - reconnectBackoffBase: 1000, // default: 1000ms - reconnectBackoffMax: 30000, // default: 30000ms (30 seconds) - reconnectJitter: 0.2, // default: 0.2 (±20%) - connectionTimeout: 30000, // default: 30000ms - - // Message queuing - maxQueueSize: 1000, // default: 1000 messages - maxMessageAge: 60000, // default: 60000ms (1 minute) - - // WebRTC - rtcConfig: { - iceServers: [...] +await rondevu.publishService({ + service: 'chat:1.0.0', + maxOffers: 5, + connectionConfig: { + reconnectEnabled: true, + bufferEnabled: true } -}); +}) + +await rondevu.startFilling() + +// Handle connections - signature changed! +rondevu.on('connection:opened', (offerId, connection) => { + console.log('New connection:', offerId) + + connection.on('message', (data) => { + console.log('Received:', data) + }) + + connection.on('disconnected', () => { + console.log('Connection lost, will auto-reconnect') + }) + + connection.send('Welcome!') +}) ``` -### New Service Options +**Key Changes:** +- ⚠️ Event signature changed: `(offerId, dc)` → `(offerId, connection)` +- ❌ Removed direct DataChannel access +- ✅ Use `connection.send()` and `connection.on('message', ...)` +- ✅ Connection object provides lifecycle events -Services can now auto-refresh TTL: +--- + +## New Connection Configuration + +All connection-related options are now configured via `connectionConfig`: ```typescript -const service = await client.exposeService({ - username: 'alice', - privateKey: keypair.privateKey, - serviceFqn: 'chat@1.0.0', +interface ConnectionConfig { + // Timeouts + connectionTimeout: number // Default: 30000ms (30s) + iceGatheringTimeout: number // Default: 10000ms (10s) - // TTL auto-refresh (NEW) - ttl: 300000, // default: 300000ms (5 minutes) - ttlRefreshMargin: 0.2, // default: 0.2 (refresh at 80% of TTL) + // Reconnection + reconnectEnabled: boolean // Default: true + maxReconnectAttempts: number // Default: 5 + reconnectBackoffBase: number // Default: 1000ms + reconnectBackoffMax: number // Default: 30000ms (30s) - // All connection options also apply to incoming connections - maxReconnectAttempts: 10, - maxQueueSize: 1000, - // ... -}); + // Message buffering + bufferEnabled: boolean // Default: true + maxBufferSize: number // Default: 100 messages + maxBufferAge: number // Default: 60000ms (1 min) + + // Debug + debug: boolean // Default: false +} ``` -## Migration Checklist +### Example Usage -- [ ] Replace `client.services.exposeService()` with `client.exposeService()` -- [ ] Add `await service.start()` after creating service -- [ ] Replace `handle.unpublish()` with `service.stop()` -- [ ] Replace `client.discovery.connect()` with `client.connect()` -- [ ] Replace `client.discovery.connectByUuid()` with `client.connectByUuid()` -- [ ] Create channels with `connection.createChannel()` instead of receiving them directly -- [ ] Add `await connection.connect()` to establish connection -- [ ] Update handlers from `(channel, peer, connectionId?)` to `(channel, connectionId)` -- [ ] Replace `.onmessage` with `.on('message', ...)` -- [ ] Replace `.onopen` with `.on('open', ...)` -- [ ] Replace `.onclose` with `.on('close', ...)` -- [ ] Replace `.onerror` with `.on('error', ...)` -- [ ] Add reconnection event handlers (`connection.on('reconnecting', ...)`) -- [ ] Review and configure reconnection options if needed -- [ ] Review and configure message queue limits if needed -- [ ] Update TypeScript imports to use new types -- [ ] Test automatic reconnection behavior -- [ ] Test message queuing during disconnections +```typescript +const connection = await rondevu.connectToService({ + serviceFqn: 'chat:1.0.0@alice', + connectionConfig: { + // Disable auto-reconnect if you want manual control + reconnectEnabled: false, + + // Disable buffering if messages are time-sensitive + bufferEnabled: false, + + // Increase timeout for slow networks + connectionTimeout: 60000, + + // Reduce retry attempts + maxReconnectAttempts: 3 + } +}) +``` + +--- + +## New Event System + +### Connection Lifecycle Events + +```typescript +connection.on('state:changed', ({ oldState, newState, reason }) => {}) +connection.on('connecting', () => {}) +connection.on('connected', () => {}) +connection.on('disconnected', (reason) => {}) +connection.on('failed', (error) => {}) +connection.on('closed', (reason) => {}) +``` + +### Reconnection Events + +```typescript +connection.on('reconnect:scheduled', ({ attempt, delay, maxAttempts }) => {}) +connection.on('reconnect:attempting', (attempt) => {}) +connection.on('reconnect:success', () => {}) +connection.on('reconnect:failed', (error) => {}) +connection.on('reconnect:exhausted', (attempts) => {}) +``` + +### Message Events + +```typescript +connection.on('message', (data) => {}) +connection.on('message:sent', (data, buffered) => {}) +connection.on('message:buffered', (data) => {}) +connection.on('message:replayed', (message) => {}) +connection.on('message:buffer:overflow', (discardedMessage) => {}) +``` + +### ICE Events + +```typescript +connection.on('ice:candidate:local', (candidate) => {}) +connection.on('ice:candidate:remote', (candidate) => {}) +connection.on('ice:connection:state', (state) => {}) +connection.on('ice:polling:started', () => {}) +connection.on('ice:polling:stopped', () => {}) +``` + +--- ## Common Migration Patterns -### Pattern 1: Simple Echo Service +### Pattern 1: Simple Message Handler -#### Before (v0.8.x) +**Before:** ```typescript -await client.services.exposeService({ - username: 'alice', - privateKey: keypair.privateKey, - serviceFqn: 'echo@1.0.0', - handler: (channel) => { - channel.onmessage = (e) => { - channel.send(`Echo: ${e.data}`); - }; - } -}); +dc.addEventListener('message', (event) => { + console.log(event.data) +}) +dc.send('Hello') ``` -#### After (v0.9.0) +**After:** ```typescript -const service = await client.exposeService({ - username: 'alice', - privateKey: keypair.privateKey, - serviceFqn: 'echo@1.0.0', - handler: (channel) => { - channel.on('message', (data) => { - channel.send(`Echo: ${data}`); - }); - } -}); - -await service.start(); +connection.on('message', (data) => { + console.log(data) +}) +connection.send('Hello') ``` -### Pattern 2: Connection with Error Handling +--- -#### Before (v0.8.x) +### Pattern 2: Connection State Monitoring + +**Before:** ```typescript -try { - const { peer, channel } = await client.discovery.connect('alice', 'chat@1.0.0'); - - channel.onopen = () => { - channel.send('Hello!'); - }; - - peer.on('failed', (error) => { - console.error('Connection failed:', error); - // Manual reconnection logic here - }); -} catch (error) { - console.error('Failed to connect:', error); +pc.oniceconnectionstatechange = () => { + console.log('ICE state:', pc.iceConnectionState) } ``` -#### After (v0.9.0) +**After:** ```typescript -const connection = await client.connect('alice', 'chat@1.0.0', { - maxReconnectAttempts: 5 -}); +connection.on('ice:connection:state', (state) => { + console.log('ICE state:', state) +}) -const channel = connection.createChannel('main'); +// Or use higher-level events +connection.on('connected', () => console.log('Connected!')) +connection.on('disconnected', () => console.log('Disconnected!')) +``` -channel.on('open', () => { - channel.send('Hello!'); -}); +--- -connection.on('reconnecting', (attempt, max, delay) => { - console.log(`Reconnecting (${attempt}/${max}) in ${delay}ms`); -}); +### Pattern 3: Handling Connection Failures -connection.on('failed', (error) => { - console.error('Connection failed permanently:', error); -}); - -try { - await connection.connect(); -} catch (error) { - console.error('Initial connection failed:', error); +**Before:** +```typescript +pc.oniceconnectionstatechange = () => { + if (pc.iceConnectionState === 'failed') { + // Manual reconnection logic + pc.close() + await setupNewConnection() + } } ``` -### Pattern 3: Multi-User Chat Server - -#### Before (v0.8.x) +**After:** ```typescript -const connections = new Map(); +// Automatic reconnection built-in! +connection.on('reconnecting', (attempt) => { + console.log(`Reconnecting... attempt ${attempt}`) +}) -await client.services.exposeService({ - username: 'alice', - privateKey: keypair.privateKey, - serviceFqn: 'chat@1.0.0', - poolSize: 10, - handler: (channel, peer, connectionId) => { - connections.set(connectionId, channel); +connection.on('reconnect:success', () => { + console.log('Back online!') +}) - channel.onmessage = (e) => { - // Broadcast to all - for (const [id, ch] of connections) { - if (id !== connectionId) { - ch.send(e.data); - } - } - }; - - channel.onclose = () => { - connections.delete(connectionId); - }; - } -}); +connection.on('reconnect:exhausted', (attempts) => { + console.log(`Failed after ${attempts} attempts`) + // Fallback logic here +}) ``` -#### After (v0.9.0) +--- + +### Pattern 4: Accessing Raw RTCPeerConnection/DataChannel + +If you need low-level access: + ```typescript -const channels = new Map(); +const connection = await rondevu.connectToService({ ... }) -const service = await client.exposeService({ - username: 'alice', - privateKey: keypair.privateKey, - serviceFqn: 'chat@1.0.0', - poolSize: 10, - handler: (channel, connectionId) => { - channels.set(connectionId, channel); +// Get raw objects if needed +const pc = connection.getPeerConnection() +const dc = connection.getDataChannel() - channel.on('message', (data) => { - // Broadcast to all - for (const [id, ch] of channels) { - if (id !== connectionId) { - ch.send(data); - } - } - }); - - channel.on('close', () => { - channels.delete(connectionId); - }); - } -}); - -await service.start(); - -// Optional: Track connections -service.on('connection', (connectionId) => { - console.log(`User ${connectionId} joined`); -}); - -service.on('disconnection', (connectionId) => { - console.log(`User ${connectionId} left`); -}); +// Use them directly (bypasses buffering/reconnection features) +if (dc) { + dc.addEventListener('message', (event) => { + console.log(event.data) + }) +} ``` -## Benefits of Migration +**Note:** Using raw DataChannel bypasses automatic buffering and reconnection features. -1. **Reliability**: Automatic reconnection handles network hiccups transparently -2. **Simplicity**: No need to manage WebRTC peer lifecycle manually -3. **Durability**: Messages sent during disconnections are queued and delivered when connection restores -4. **Uptime**: Services automatically refresh TTL before expiration -5. **Type Safety**: Better TypeScript types with DurableChannel event emitters -6. **Debugging**: Queue size monitoring, connection state tracking, and detailed events +--- -## Getting Help +## Backward Compatibility Notes -If you encounter issues during migration: -1. Check the [README](./README.md) for complete API documentation -2. Review the examples for common patterns -3. Open an issue on [GitHub](https://github.com/xtr-dev/rondevu-client/issues) +### What Still Works +✅ `publishService()` API (just add `connectionConfig` optionally) +✅ `findService()` API (unchanged) +✅ All RondevuAPI methods (unchanged) +✅ ICE server presets (unchanged) +✅ Username and keypair management (unchanged) + +### What Changed +⚠️ `connectToService()` return type: `ConnectionContext` → `AnswererConnection` +⚠️ `connection:opened` event signature: `(offerId, dc)` → `(offerId, connection)` +⚠️ Direct DataChannel access replaced with connection wrapper + +### What's New +✨ Automatic reconnection with exponential backoff +✨ Message buffering during disconnections +✨ Rich event system (20+ events) +✨ Connection state machine +✨ ICE polling lifecycle management (no more resource leaks) + +--- + +## Troubleshooting + +### Issue: "connection.send is not a function" + +You're trying to use the old `dc.send()` API. Update to: + +```typescript +// Old +dc.send('Hello') + +// New +connection.send('Hello') +``` + +--- + +### Issue: "Cannot read property 'addEventListener' of undefined" + +You're trying to access `dc` directly. Update to event listeners: + +```typescript +// Old +dc.addEventListener('message', (event) => { + console.log(event.data) +}) + +// New +connection.on('message', (data) => { + console.log(data) +}) +``` + +--- + +### Issue: Messages not being delivered + +Check if buffering is enabled and connection is established: + +```typescript +connection.on('connected', () => { + // Only send after connected + connection.send('Hello') +}) + +// Monitor buffer +connection.on('message:buffered', (data) => { + console.log('Message buffered, will send when reconnected') +}) +``` + +--- + +## Need Help? + +- Check the updated README for full API documentation +- See examples in the `demo/` directory +- File issues at: https://github.com/xtr-dev/rondevu/issues + +--- + +## Summary Checklist + +When migrating from v0.18.7 to v0.18.8: + +- [ ] Update `connectToService()` to use returned `AnswererConnection` +- [ ] Replace `dc.addEventListener('message', ...)` with `connection.on('message', ...)` +- [ ] Replace `dc.send()` with `connection.send()` +- [ ] Update `connection:opened` event handler signature +- [ ] Consider adding reconnection event handlers +- [ ] Optionally configure `connectionConfig` for your use case +- [ ] Test connection resilience (disconnect network, should auto-reconnect) +- [ ] Remove manual reconnection logic (now built-in) diff --git a/package.json b/package.json index fdf8360..f696472 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@xtr-dev/rondevu-client", - "version": "0.18.7", + "version": "0.18.8", "description": "TypeScript client for Rondevu with durable WebRTC connections, automatic reconnection, and message queuing", "type": "module", "main": "dist/index.js", diff --git a/src/answerer-connection.ts b/src/answerer-connection.ts new file mode 100644 index 0000000..1aac8f1 --- /dev/null +++ b/src/answerer-connection.ts @@ -0,0 +1,183 @@ +/** + * Answerer-side WebRTC connection with answer creation and offer processing + */ + +import { RondevuConnection } from './connection.js' +import { ConnectionState } from './connection-events.js' +import { RondevuAPI } from './api.js' +import { ConnectionConfig } from './connection-config.js' + +export interface AnswererOptions { + api: RondevuAPI + serviceFqn: string + offerId: string + offerSdp: string + rtcConfig?: RTCConfiguration + config?: Partial +} + +/** + * Answerer connection - processes offers and creates answers + */ +export class AnswererConnection extends RondevuConnection { + private api: RondevuAPI + private serviceFqn: string + private offerId: string + private offerSdp: string + + constructor(options: AnswererOptions) { + super(options.rtcConfig, options.config) + this.api = options.api + this.serviceFqn = options.serviceFqn + this.offerId = options.offerId + this.offerSdp = options.offerSdp + } + + /** + * Initialize the connection by processing offer and creating answer + */ + async initialize(): Promise { + this.debug('Initializing answerer connection') + + // Create peer connection + this.createPeerConnection() + if (!this.pc) throw new Error('Peer connection not created') + + // Setup ondatachannel handler BEFORE setting remote description + // This is critical to avoid race conditions + this.pc.ondatachannel = (event) => { + this.debug('Received data channel') + this.dc = event.channel + this.setupDataChannelHandlers(this.dc) + } + + // Start connection timeout + this.startConnectionTimeout() + + // Set remote description (offer) + await this.pc.setRemoteDescription({ + type: 'offer', + sdp: this.offerSdp, + }) + + this.transitionTo(ConnectionState.SIGNALING, 'Offer received, creating answer') + + // Create and set local description (answer) + const answer = await this.pc.createAnswer() + await this.pc.setLocalDescription(answer) + + this.debug('Answer created, sending to server') + + // Send answer to server + await this.api.answerOffer(this.serviceFqn, this.offerId, answer.sdp!) + + this.debug('Answer sent successfully') + } + + /** + * Handle local ICE candidate generation + */ + protected onLocalIceCandidate(candidate: RTCIceCandidate): void { + this.debug('Generated local ICE candidate') + + // For answerer, we add ICE candidates to the offer + // The server will make them available for the offerer to poll + this.api + .addOfferIceCandidates(this.serviceFqn, this.offerId, [ + { + candidate: candidate.candidate, + sdpMLineIndex: candidate.sdpMLineIndex, + sdpMid: candidate.sdpMid, + }, + ]) + .catch((error) => { + this.debug('Failed to send ICE candidate:', error) + }) + } + + /** + * Poll for remote ICE candidates (from offerer) + */ + protected pollIceCandidates(): void { + this.api + .getOfferIceCandidates(this.serviceFqn, this.offerId, this.lastIcePollTime) + .then((result) => { + if (result.candidates.length > 0) { + this.debug(`Received ${result.candidates.length} remote ICE candidates`) + + for (const iceCandidate of result.candidates) { + // Only process ICE candidates from the offerer + if (iceCandidate.role === 'offerer' && iceCandidate.candidate && this.pc) { + const candidate = iceCandidate.candidate + this.pc + .addIceCandidate(new RTCIceCandidate(candidate)) + .then(() => { + this.emit('ice:candidate:remote', new RTCIceCandidate(candidate)) + }) + .catch((error) => { + this.debug('Failed to add ICE candidate:', error) + }) + } + + // Update last poll time + if (iceCandidate.createdAt > this.lastIcePollTime) { + this.lastIcePollTime = iceCandidate.createdAt + } + } + } + }) + .catch((error) => { + this.debug('Failed to poll ICE candidates:', error) + }) + } + + /** + * Attempt to reconnect + */ + protected attemptReconnect(): void { + this.debug('Attempting to reconnect') + + // For answerer, we need to fetch a new offer and create a new answer + // Clean up old connection + if (this.pc) { + this.pc.close() + this.pc = null + } + if (this.dc) { + this.dc.close() + this.dc = null + } + + // Fetch new offer from service + this.api + .getService(this.serviceFqn) + .then((service) => { + if (!service || !service.offers || service.offers.length === 0) { + throw new Error('No offers available for reconnection') + } + + // Pick a random offer + const offer = service.offers[Math.floor(Math.random() * service.offers.length)] + this.offerId = offer.offerId + this.offerSdp = offer.sdp + + // Reinitialize with new offer + return this.initialize() + }) + .then(() => { + this.emit('reconnect:success') + }) + .catch((error) => { + this.debug('Reconnection failed:', error) + this.emit('reconnect:failed', error as Error) + this.scheduleReconnect() + }) + } + + /** + * Get the offer ID we're answering + */ + getOfferId(): string { + return this.offerId + } +} diff --git a/src/connection-config.ts b/src/connection-config.ts new file mode 100644 index 0000000..b02816c --- /dev/null +++ b/src/connection-config.ts @@ -0,0 +1,64 @@ +/** + * Connection configuration interfaces and defaults + */ + +export interface ConnectionConfig { + // Timeouts + connectionTimeout: number // Maximum time to wait for connection establishment (ms) + iceGatheringTimeout: number // Maximum time to wait for ICE gathering to complete (ms) + + // Reconnection + reconnectEnabled: boolean // Enable automatic reconnection on failures + maxReconnectAttempts: number // Maximum number of reconnection attempts (0 = infinite) + reconnectBackoffBase: number // Base delay for exponential backoff (ms) + reconnectBackoffMax: number // Maximum delay between reconnection attempts (ms) + reconnectJitter: number // Jitter factor for backoff (0-1, adds randomness to prevent thundering herd) + + // Message buffering + bufferEnabled: boolean // Enable automatic message buffering during disconnections + maxBufferSize: number // Maximum number of messages to buffer + maxBufferAge: number // Maximum age of buffered messages (ms) + preserveBufferOnClose: boolean // Keep buffer on explicit close (vs. clearing it) + + // ICE polling + icePollingInterval: number // Interval for polling remote ICE candidates (ms) + icePollingTimeout: number // Maximum time to poll for ICE candidates (ms) + + // Debug + debug: boolean // Enable debug logging +} + +export const DEFAULT_CONNECTION_CONFIG: ConnectionConfig = { + // Timeouts + connectionTimeout: 30000, // 30 seconds + iceGatheringTimeout: 10000, // 10 seconds + + // Reconnection + reconnectEnabled: true, + maxReconnectAttempts: 5, // 5 attempts before giving up + reconnectBackoffBase: 1000, // Start with 1 second + reconnectBackoffMax: 30000, // Cap at 30 seconds + reconnectJitter: 0.1, // 10% jitter + + // Message buffering + bufferEnabled: true, + maxBufferSize: 100, // 100 messages + maxBufferAge: 60000, // 1 minute + preserveBufferOnClose: false, // Clear buffer on close + + // ICE polling + icePollingInterval: 500, // Poll every 500ms + icePollingTimeout: 30000, // Stop polling after 30s + + // Debug + debug: false, +} + +export function mergeConnectionConfig( + userConfig?: Partial +): ConnectionConfig { + return { + ...DEFAULT_CONNECTION_CONFIG, + ...userConfig, + } +} diff --git a/src/connection-events.ts b/src/connection-events.ts new file mode 100644 index 0000000..ec64969 --- /dev/null +++ b/src/connection-events.ts @@ -0,0 +1,102 @@ +/** + * TypeScript event type definitions for RondevuConnection + */ + +export enum ConnectionState { + INITIALIZING = 'initializing', // Creating peer connection + GATHERING = 'gathering', // ICE gathering in progress + SIGNALING = 'signaling', // Exchanging offer/answer + CHECKING = 'checking', // ICE connectivity checks + CONNECTING = 'connecting', // ICE connection attempts + CONNECTED = 'connected', // Data channel open, working + DISCONNECTED = 'disconnected', // Temporarily disconnected + RECONNECTING = 'reconnecting', // Attempting reconnection + FAILED = 'failed', // Connection failed + CLOSED = 'closed', // Connection closed permanently +} + +export interface BufferedMessage { + id: string + data: string | ArrayBuffer | Blob + timestamp: number + attempts: number +} + +export interface ReconnectInfo { + attempt: number + delay: number + maxAttempts: number +} + +export interface StateChangeInfo { + oldState: ConnectionState + newState: ConnectionState + reason?: string +} + +/** + * Event map for RondevuConnection + * Maps event names to their payload types + */ +export interface ConnectionEventMap { + // Lifecycle events + 'state:changed': [StateChangeInfo] + 'connecting': [] + 'connected': [] + 'disconnected': [reason?: string] + 'failed': [error: Error] + 'closed': [reason?: string] + + // Reconnection events + 'reconnect:scheduled': [ReconnectInfo] + 'reconnect:attempting': [attempt: number] + 'reconnect:success': [] + 'reconnect:failed': [error: Error] + 'reconnect:exhausted': [attempts: number] + + // Message events + 'message': [data: string | ArrayBuffer | Blob] + 'message:sent': [data: string | ArrayBuffer | Blob, buffered: boolean] + 'message:buffered': [data: string | ArrayBuffer | Blob] + 'message:replayed': [message: BufferedMessage] + 'message:buffer:overflow': [discardedMessage: BufferedMessage] + 'message:buffer:expired': [message: BufferedMessage] + + // ICE events + 'ice:candidate:local': [candidate: RTCIceCandidate | null] + 'ice:candidate:remote': [candidate: RTCIceCandidate | null] + 'ice:connection:state': [state: RTCIceConnectionState] + 'ice:gathering:state': [state: RTCIceGatheringState] + 'ice:polling:started': [] + 'ice:polling:stopped': [] + + // Answer processing events (offerer only) + 'answer:processed': [offerId: string, answererId: string] + 'answer:duplicate': [offerId: string] + + // Data channel events + 'datachannel:open': [] + 'datachannel:close': [] + 'datachannel:error': [error: Event] + + // Cleanup events + 'cleanup:started': [] + 'cleanup:complete': [] + + // Connection events (mirrors RTCPeerConnection.connectionState) + 'connection:state': [state: RTCPeerConnectionState] + + // Timeout events + 'connection:timeout': [] + 'ice:gathering:timeout': [] +} + +/** + * Helper type to extract event names from the event map + */ +export type ConnectionEventName = keyof ConnectionEventMap + +/** + * Helper type to extract event arguments for a specific event + */ +export type ConnectionEventArgs = ConnectionEventMap[T] diff --git a/src/connection.ts b/src/connection.ts new file mode 100644 index 0000000..cf940f9 --- /dev/null +++ b/src/connection.ts @@ -0,0 +1,567 @@ +/** + * Base connection class with state machine, reconnection, and message buffering + */ + +import { EventEmitter } from 'eventemitter3' +import { ConnectionConfig, mergeConnectionConfig } from './connection-config.js' +import { + ConnectionState, + ConnectionEventMap, + ConnectionEventName, + ConnectionEventArgs, + BufferedMessage, +} from './connection-events.js' +import { ExponentialBackoff } from './exponential-backoff.js' +import { MessageBuffer } from './message-buffer.js' + +/** + * Abstract base class for WebRTC connections with durability features + */ +export abstract class RondevuConnection extends EventEmitter { + protected pc: RTCPeerConnection | null = null + protected dc: RTCDataChannel | null = null + protected state: ConnectionState = ConnectionState.INITIALIZING + protected config: ConnectionConfig + + // Message buffering + protected messageBuffer: MessageBuffer | null = null + + // Reconnection + protected backoff: ExponentialBackoff | null = null + protected reconnectTimeout: ReturnType | null = null + protected reconnectAttempts = 0 + + // Timeouts + protected connectionTimeout: ReturnType | null = null + protected iceGatheringTimeout: ReturnType | null = null + + // ICE polling + protected icePollingInterval: ReturnType | null = null + protected lastIcePollTime = 0 + + // Answer fingerprinting (for offerer) + protected answerProcessed = false + protected answerSdpFingerprint: string | null = null + + constructor( + protected rtcConfig?: RTCConfiguration, + userConfig?: Partial + ) { + super() + this.config = mergeConnectionConfig(userConfig) + + // Initialize message buffer if enabled + if (this.config.bufferEnabled) { + this.messageBuffer = new MessageBuffer({ + maxSize: this.config.maxBufferSize, + maxAge: this.config.maxBufferAge, + }) + } + + // Initialize backoff if reconnection enabled + if (this.config.reconnectEnabled) { + this.backoff = new ExponentialBackoff({ + base: this.config.reconnectBackoffBase, + max: this.config.reconnectBackoffMax, + jitter: this.config.reconnectJitter, + }) + } + } + + /** + * Transition to a new state and emit events + */ + protected transitionTo(newState: ConnectionState, reason?: string): void { + if (this.state === newState) return + + const oldState = this.state + this.state = newState + + this.debug(`State transition: ${oldState} → ${newState}${reason ? ` (${reason})` : ''}`) + + this.emit('state:changed', { oldState, newState, reason }) + + // Emit specific lifecycle events + switch (newState) { + case ConnectionState.CONNECTING: + this.emit('connecting') + break + case ConnectionState.CONNECTED: + this.emit('connected') + break + case ConnectionState.DISCONNECTED: + this.emit('disconnected', reason) + break + case ConnectionState.FAILED: + this.emit('failed', new Error(reason || 'Connection failed')) + break + case ConnectionState.CLOSED: + this.emit('closed', reason) + break + } + } + + /** + * Create and configure RTCPeerConnection + */ + protected createPeerConnection(): RTCPeerConnection { + this.pc = new RTCPeerConnection(this.rtcConfig) + + // Setup event handlers BEFORE any signaling + this.pc.onicecandidate = (event) => this.handleIceCandidate(event) + this.pc.oniceconnectionstatechange = () => this.handleIceConnectionStateChange() + this.pc.onconnectionstatechange = () => this.handleConnectionStateChange() + this.pc.onicegatheringstatechange = () => this.handleIceGatheringStateChange() + + return this.pc + } + + /** + * Setup data channel event handlers + */ + protected setupDataChannelHandlers(dc: RTCDataChannel): void { + dc.onopen = () => this.handleDataChannelOpen() + dc.onclose = () => this.handleDataChannelClose() + dc.onerror = (error) => this.handleDataChannelError(error) + dc.onmessage = (event) => this.handleMessage(event) + } + + /** + * Handle local ICE candidate generation + */ + protected handleIceCandidate(event: RTCPeerConnectionIceEvent): void { + this.emit('ice:candidate:local', event.candidate) + if (event.candidate) { + this.onLocalIceCandidate(event.candidate) + } + } + + /** + * Handle ICE connection state changes (primary state driver) + */ + protected handleIceConnectionStateChange(): void { + if (!this.pc) return + + const iceState = this.pc.iceConnectionState + this.emit('ice:connection:state', iceState) + this.debug(`ICE connection state: ${iceState}`) + + switch (iceState) { + case 'checking': + if (this.state === ConnectionState.SIGNALING) { + this.transitionTo(ConnectionState.CHECKING, 'ICE checking started') + } + this.startIcePolling() + break + + case 'connected': + case 'completed': + this.stopIcePolling() + // Wait for data channel to open before transitioning to CONNECTED + if (this.dc?.readyState === 'open') { + this.transitionTo(ConnectionState.CONNECTED, 'ICE connected and data channel open') + this.onConnected() + } + break + + case 'disconnected': + if (this.state === ConnectionState.CONNECTED) { + this.transitionTo(ConnectionState.DISCONNECTED, 'ICE disconnected') + this.scheduleReconnect() + } + break + + case 'failed': + this.stopIcePolling() + this.transitionTo(ConnectionState.FAILED, 'ICE connection failed') + this.scheduleReconnect() + break + + case 'closed': + this.stopIcePolling() + this.transitionTo(ConnectionState.CLOSED, 'ICE connection closed') + break + } + } + + /** + * Handle connection state changes (backup validation) + */ + protected handleConnectionStateChange(): void { + if (!this.pc) return + + const connState = this.pc.connectionState + this.emit('connection:state', connState) + this.debug(`Connection state: ${connState}`) + + // Connection state provides backup validation + if (connState === 'failed' && this.state !== ConnectionState.FAILED) { + this.transitionTo(ConnectionState.FAILED, 'PeerConnection failed') + this.scheduleReconnect() + } else if (connState === 'closed' && this.state !== ConnectionState.CLOSED) { + this.transitionTo(ConnectionState.CLOSED, 'PeerConnection closed') + } + } + + /** + * Handle ICE gathering state changes + */ + protected handleIceGatheringStateChange(): void { + if (!this.pc) return + + const gatheringState = this.pc.iceGatheringState + this.emit('ice:gathering:state', gatheringState) + this.debug(`ICE gathering state: ${gatheringState}`) + + if (gatheringState === 'gathering' && this.state === ConnectionState.INITIALIZING) { + this.transitionTo(ConnectionState.GATHERING, 'ICE gathering started') + this.startIceGatheringTimeout() + } else if (gatheringState === 'complete') { + this.clearIceGatheringTimeout() + } + } + + /** + * Handle data channel open event + */ + protected handleDataChannelOpen(): void { + this.debug('Data channel opened') + this.emit('datachannel:open') + + // Only transition to CONNECTED if ICE is also connected + if (this.pc && (this.pc.iceConnectionState === 'connected' || this.pc.iceConnectionState === 'completed')) { + this.transitionTo(ConnectionState.CONNECTED, 'Data channel opened and ICE connected') + this.onConnected() + } + } + + /** + * Handle data channel close event + */ + protected handleDataChannelClose(): void { + this.debug('Data channel closed') + this.emit('datachannel:close') + + if (this.state === ConnectionState.CONNECTED) { + this.transitionTo(ConnectionState.DISCONNECTED, 'Data channel closed') + this.scheduleReconnect() + } + } + + /** + * Handle data channel error event + */ + protected handleDataChannelError(error: Event): void { + this.debug('Data channel error:', error) + this.emit('datachannel:error', error) + } + + /** + * Handle incoming message + */ + protected handleMessage(event: MessageEvent): void { + this.emit('message', event.data) + } + + /** + * Called when connection is successfully established + */ + protected onConnected(): void { + this.clearConnectionTimeout() + this.reconnectAttempts = 0 + this.backoff?.reset() + + // Replay buffered messages + if (this.messageBuffer && !this.messageBuffer.isEmpty()) { + const messages = this.messageBuffer.getValid() + this.debug(`Replaying ${messages.length} buffered messages`) + + for (const message of messages) { + try { + this.sendDirect(message.data) + this.emit('message:replayed', message) + this.messageBuffer.remove(message.id) + } catch (error) { + this.debug('Failed to replay message:', error) + } + } + + // Remove expired messages + const expired = this.messageBuffer.getExpired() + for (const msg of expired) { + this.emit('message:buffer:expired', msg) + } + } + } + + /** + * Start ICE candidate polling + */ + protected startIcePolling(): void { + if (this.icePollingInterval) return + + this.debug('Starting ICE polling') + this.emit('ice:polling:started') + + this.lastIcePollTime = Date.now() + + this.icePollingInterval = setInterval(() => { + const elapsed = Date.now() - this.lastIcePollTime + if (elapsed > this.config.icePollingTimeout) { + this.debug('ICE polling timeout') + this.stopIcePolling() + return + } + + this.pollIceCandidates() + }, this.config.icePollingInterval) + } + + /** + * Stop ICE candidate polling + */ + protected stopIcePolling(): void { + if (!this.icePollingInterval) return + + this.debug('Stopping ICE polling') + clearInterval(this.icePollingInterval) + this.icePollingInterval = null + this.emit('ice:polling:stopped') + } + + /** + * Start connection timeout + */ + protected startConnectionTimeout(): void { + this.clearConnectionTimeout() + + this.connectionTimeout = setTimeout(() => { + if (this.state !== ConnectionState.CONNECTED) { + this.debug('Connection timeout') + this.emit('connection:timeout') + this.transitionTo(ConnectionState.FAILED, 'Connection timeout') + this.scheduleReconnect() + } + }, this.config.connectionTimeout) + } + + /** + * Clear connection timeout + */ + protected clearConnectionTimeout(): void { + if (this.connectionTimeout) { + clearTimeout(this.connectionTimeout) + this.connectionTimeout = null + } + } + + /** + * Start ICE gathering timeout + */ + protected startIceGatheringTimeout(): void { + this.clearIceGatheringTimeout() + + this.iceGatheringTimeout = setTimeout(() => { + if (this.pc && this.pc.iceGatheringState !== 'complete') { + this.debug('ICE gathering timeout') + this.emit('ice:gathering:timeout') + } + }, this.config.iceGatheringTimeout) + } + + /** + * Clear ICE gathering timeout + */ + protected clearIceGatheringTimeout(): void { + if (this.iceGatheringTimeout) { + clearTimeout(this.iceGatheringTimeout) + this.iceGatheringTimeout = null + } + } + + /** + * Schedule reconnection attempt + */ + protected scheduleReconnect(): void { + if (!this.config.reconnectEnabled || !this.backoff) return + + // Check if we've exceeded max attempts + if (this.config.maxReconnectAttempts > 0 && this.reconnectAttempts >= this.config.maxReconnectAttempts) { + this.debug('Max reconnection attempts reached') + this.emit('reconnect:exhausted', this.reconnectAttempts) + return + } + + const delay = this.backoff.next() + this.reconnectAttempts++ + + this.debug(`Scheduling reconnection attempt ${this.reconnectAttempts} in ${delay}ms`) + + this.emit('reconnect:scheduled', { + attempt: this.reconnectAttempts, + delay, + maxAttempts: this.config.maxReconnectAttempts, + }) + + this.transitionTo(ConnectionState.RECONNECTING, `Attempt ${this.reconnectAttempts}`) + + this.reconnectTimeout = setTimeout(() => { + this.emit('reconnect:attempting', this.reconnectAttempts) + this.attemptReconnect() + }, delay) + } + + /** + * Cancel scheduled reconnection + */ + protected cancelReconnect(): void { + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout) + this.reconnectTimeout = null + } + } + + /** + * Send a message directly (bypasses buffer) + */ + protected sendDirect(data: string | ArrayBuffer | Blob): void { + if (!this.dc || this.dc.readyState !== 'open') { + throw new Error('Data channel is not open') + } + + // Handle different data types explicitly + this.dc.send(data as any) + } + + /** + * Send a message with automatic buffering + */ + send(data: string | ArrayBuffer | Blob): void { + if (this.state === ConnectionState.CONNECTED && this.dc?.readyState === 'open') { + // Send directly + try { + this.sendDirect(data) + this.emit('message:sent', data, false) + } catch (error) { + this.debug('Failed to send message:', error) + this.bufferMessage(data) + } + } else { + // Buffer for later + this.bufferMessage(data) + } + } + + /** + * Buffer a message for later delivery + */ + protected bufferMessage(data: string | ArrayBuffer | Blob): void { + if (!this.messageBuffer) { + this.debug('Message buffering disabled, message dropped') + return + } + + if (this.messageBuffer.isFull()) { + const oldest = this.messageBuffer.getAll()[0] + this.emit('message:buffer:overflow', oldest) + } + + const message = this.messageBuffer.add(data) + this.emit('message:buffered', data) + this.emit('message:sent', data, true) + this.debug(`Message buffered (${this.messageBuffer.size()}/${this.config.maxBufferSize})`) + } + + /** + * Get current connection state + */ + getState(): ConnectionState { + return this.state + } + + /** + * Get the data channel + */ + getDataChannel(): RTCDataChannel | null { + return this.dc + } + + /** + * Get the peer connection + */ + getPeerConnection(): RTCPeerConnection | null { + return this.pc + } + + /** + * Close the connection + */ + close(): void { + this.debug('Closing connection') + this.transitionTo(ConnectionState.CLOSED, 'User requested close') + this.cleanup() + } + + /** + * Complete cleanup of all resources + */ + protected cleanup(): void { + this.debug('Cleaning up connection') + this.emit('cleanup:started') + + // Clear all timeouts + this.clearConnectionTimeout() + this.clearIceGatheringTimeout() + this.cancelReconnect() + + // Stop ICE polling + this.stopIcePolling() + + // Close data channel + if (this.dc) { + this.dc.onopen = null + this.dc.onclose = null + this.dc.onerror = null + this.dc.onmessage = null + + if (this.dc.readyState !== 'closed') { + this.dc.close() + } + this.dc = null + } + + // Close peer connection + if (this.pc) { + this.pc.onicecandidate = null + this.pc.oniceconnectionstatechange = null + this.pc.onconnectionstatechange = null + this.pc.onicegatheringstatechange = null + + if (this.pc.connectionState !== 'closed') { + this.pc.close() + } + this.pc = null + } + + // Clear message buffer if not preserving + if (this.messageBuffer && !this.config.preserveBufferOnClose) { + this.messageBuffer.clear() + } + + this.emit('cleanup:complete') + } + + /** + * Debug logging helper + */ + protected debug(...args: any[]): void { + if (this.config.debug) { + console.log('[RondevuConnection]', ...args) + } + } + + // Abstract methods to be implemented by subclasses + protected abstract onLocalIceCandidate(candidate: RTCIceCandidate): void + protected abstract pollIceCandidates(): void + protected abstract attemptReconnect(): void +} diff --git a/src/exponential-backoff.ts b/src/exponential-backoff.ts new file mode 100644 index 0000000..8e71053 --- /dev/null +++ b/src/exponential-backoff.ts @@ -0,0 +1,59 @@ +/** + * Exponential backoff utility for connection reconnection + */ + +export interface BackoffConfig { + base: number // Base delay in milliseconds + max: number // Maximum delay in milliseconds + jitter: number // Jitter factor (0-1) to add randomness +} + +export class ExponentialBackoff { + private attempt: number = 0 + + constructor(private config: BackoffConfig) { + if (config.jitter < 0 || config.jitter > 1) { + throw new Error('Jitter must be between 0 and 1') + } + } + + /** + * Calculate the next delay based on the current attempt number + * Formula: min(base * 2^attempt, max) with jitter + */ + next(): number { + const exponentialDelay = this.config.base * Math.pow(2, this.attempt) + const cappedDelay = Math.min(exponentialDelay, this.config.max) + + // Add jitter: delay ± (jitter * delay) + const jitterAmount = cappedDelay * this.config.jitter + const jitter = (Math.random() * 2 - 1) * jitterAmount // Random value between -jitterAmount and +jitterAmount + const finalDelay = Math.max(0, cappedDelay + jitter) + + this.attempt++ + return Math.round(finalDelay) + } + + /** + * Get the current attempt number + */ + getAttempt(): number { + return this.attempt + } + + /** + * Reset the backoff state + */ + reset(): void { + this.attempt = 0 + } + + /** + * Peek at what the next delay would be without incrementing + */ + peek(): number { + const exponentialDelay = this.config.base * Math.pow(2, this.attempt) + const cappedDelay = Math.min(exponentialDelay, this.config.max) + return cappedDelay + } +} diff --git a/src/index.ts b/src/index.ts index cbdce88..cefc1ee 100644 --- a/src/index.ts +++ b/src/index.ts @@ -7,6 +7,15 @@ export { Rondevu, RondevuError, NetworkError, ValidationError, ConnectionError } export { RondevuAPI } from './api.js' export { RpcBatcher } from './rpc-batcher.js' +// Export connection classes +export { RondevuConnection } from './connection.js' +export { OffererConnection } from './offerer-connection.js' +export { AnswererConnection } from './answerer-connection.js' + +// Export utilities +export { ExponentialBackoff } from './exponential-backoff.js' +export { MessageBuffer } from './message-buffer.js' + // Export crypto adapters export { WebCryptoAdapter } from './web-crypto-adapter.js' export { NodeCryptoAdapter } from './node-crypto-adapter.js' @@ -41,3 +50,26 @@ export type { export type { CryptoAdapter } from './crypto-adapter.js' +// Export connection types +export type { + ConnectionConfig, +} from './connection-config.js' + +export type { + ConnectionState, + BufferedMessage, + ReconnectInfo, + StateChangeInfo, + ConnectionEventMap, + ConnectionEventName, + ConnectionEventArgs, +} from './connection-events.js' + +export type { + OffererOptions, +} from './offerer-connection.js' + +export type { + AnswererOptions, +} from './answerer-connection.js' + diff --git a/src/message-buffer.ts b/src/message-buffer.ts new file mode 100644 index 0000000..bb8bfd5 --- /dev/null +++ b/src/message-buffer.ts @@ -0,0 +1,125 @@ +/** + * Message buffering system for storing messages during disconnections + */ + +import { BufferedMessage } from './connection-events.js' + +export interface MessageBufferConfig { + maxSize: number // Maximum number of messages to buffer + maxAge: number // Maximum age of messages in milliseconds +} + +export class MessageBuffer { + private buffer: BufferedMessage[] = [] + private messageIdCounter = 0 + + constructor(private config: MessageBufferConfig) {} + + /** + * Add a message to the buffer + * Returns the buffered message with metadata + */ + add(data: string | ArrayBuffer | Blob): BufferedMessage { + const message: BufferedMessage = { + id: `msg_${Date.now()}_${this.messageIdCounter++}`, + data, + timestamp: Date.now(), + attempts: 0, + } + + // Check if buffer is full + if (this.buffer.length >= this.config.maxSize) { + // Remove oldest message + const discarded = this.buffer.shift() + if (discarded) { + return message // Signal overflow by returning the new message + } + } + + this.buffer.push(message) + return message + } + + /** + * Get all messages in the buffer + */ + getAll(): BufferedMessage[] { + return [...this.buffer] + } + + /** + * Get messages that haven't exceeded max age + */ + getValid(): BufferedMessage[] { + const now = Date.now() + return this.buffer.filter((msg) => now - msg.timestamp < this.config.maxAge) + } + + /** + * Get and remove expired messages + */ + getExpired(): BufferedMessage[] { + const now = Date.now() + const expired: BufferedMessage[] = [] + this.buffer = this.buffer.filter((msg) => { + if (now - msg.timestamp >= this.config.maxAge) { + expired.push(msg) + return false + } + return true + }) + return expired + } + + /** + * Remove a specific message by ID + */ + remove(messageId: string): BufferedMessage | null { + const index = this.buffer.findIndex((msg) => msg.id === messageId) + if (index === -1) return null + + const [removed] = this.buffer.splice(index, 1) + return removed + } + + /** + * Clear all messages from the buffer + */ + clear(): BufferedMessage[] { + const cleared = [...this.buffer] + this.buffer = [] + return cleared + } + + /** + * Increment attempt count for a message + */ + incrementAttempt(messageId: string): boolean { + const message = this.buffer.find((msg) => msg.id === messageId) + if (!message) return false + + message.attempts++ + return true + } + + /** + * Get the current size of the buffer + */ + size(): number { + return this.buffer.length + } + + /** + * Check if buffer is empty + */ + isEmpty(): boolean { + return this.buffer.length === 0 + } + + /** + * Check if buffer is full + */ + isFull(): boolean { + return this.buffer.length >= this.config.maxSize + } +} diff --git a/src/offerer-connection.ts b/src/offerer-connection.ts new file mode 100644 index 0000000..889cc86 --- /dev/null +++ b/src/offerer-connection.ts @@ -0,0 +1,213 @@ +/** + * Offerer-side WebRTC connection with offer creation and answer processing + */ + +import { RondevuConnection } from './connection.js' +import { ConnectionState } from './connection-events.js' +import { RondevuAPI } from './api.js' +import { ConnectionConfig } from './connection-config.js' + +export interface OffererOptions { + api: RondevuAPI + serviceFqn: string + offerId: string + pc: RTCPeerConnection // Accept already-created peer connection + dc?: RTCDataChannel // Accept already-created data channel (optional) + config?: Partial +} + +/** + * Offerer connection - manages already-created offers and waits for answers + */ +export class OffererConnection extends RondevuConnection { + private api: RondevuAPI + private serviceFqn: string + private offerId: string + + constructor(options: OffererOptions) { + super(undefined, options.config) // rtcConfig not needed, PC already created + this.api = options.api + this.serviceFqn = options.serviceFqn + this.offerId = options.offerId + + // Use the already-created peer connection and data channel + this.pc = options.pc + this.dc = options.dc || null + } + + /** + * Initialize the connection - setup handlers for already-created offer + */ + async initialize(): Promise { + this.debug('Initializing offerer connection') + + if (!this.pc) throw new Error('Peer connection not provided') + + // Setup peer connection event handlers + this.pc.onicecandidate = (event) => this.handleIceCandidate(event) + this.pc.oniceconnectionstatechange = () => this.handleIceConnectionStateChange() + this.pc.onconnectionstatechange = () => this.handleConnectionStateChange() + this.pc.onicegatheringstatechange = () => this.handleIceGatheringStateChange() + + // Setup data channel handlers if we have one + if (this.dc) { + this.setupDataChannelHandlers(this.dc) + } + + // Start connection timeout + this.startConnectionTimeout() + + // Transition to signaling state (offer already created and published) + this.transitionTo(ConnectionState.SIGNALING, 'Offer published, waiting for answer') + } + + /** + * Process an answer from the answerer + */ + async processAnswer(sdp: string, answererId: string): Promise { + if (!this.pc) { + this.debug('Cannot process answer: peer connection not initialized') + return + } + + // Generate SDP fingerprint for deduplication + const fingerprint = await this.hashSdp(sdp) + + // Check for duplicate answer + if (this.answerProcessed) { + if (this.answerSdpFingerprint === fingerprint) { + this.debug('Duplicate answer detected (same fingerprint), skipping') + this.emit('answer:duplicate', this.offerId) + return + } else { + throw new Error('Received different answer after already processing one (protocol violation)') + } + } + + // Validate state + if (this.state !== ConnectionState.SIGNALING && this.state !== ConnectionState.CHECKING) { + this.debug(`Cannot process answer in state ${this.state}`) + return + } + + // Mark as processed BEFORE setRemoteDescription to prevent race conditions + this.answerProcessed = true + this.answerSdpFingerprint = fingerprint + + try { + await this.pc.setRemoteDescription({ + type: 'answer', + sdp, + }) + + this.debug(`Answer processed successfully from ${answererId}`) + this.emit('answer:processed', this.offerId, answererId) + } catch (error) { + // Reset flags on error so we can try again + this.answerProcessed = false + this.answerSdpFingerprint = null + this.debug('Failed to set remote description:', error) + throw error + } + } + + /** + * Generate a hash fingerprint of SDP for deduplication + */ + private async hashSdp(sdp: string): Promise { + // Simple hash using built-in crypto if available + if (typeof crypto !== 'undefined' && crypto.subtle) { + const encoder = new TextEncoder() + const data = encoder.encode(sdp) + const hashBuffer = await crypto.subtle.digest('SHA-256', data) + const hashArray = Array.from(new Uint8Array(hashBuffer)) + return hashArray.map((b) => b.toString(16).padStart(2, '0')).join('') + } else { + // Fallback: use simple string hash + let hash = 0 + for (let i = 0; i < sdp.length; i++) { + const char = sdp.charCodeAt(i) + hash = (hash << 5) - hash + char + hash = hash & hash + } + return hash.toString(16) + } + } + + /** + * Handle local ICE candidate generation + */ + protected onLocalIceCandidate(candidate: RTCIceCandidate): void { + this.debug('Generated local ICE candidate') + + // Send ICE candidate to server + this.api + .addOfferIceCandidates(this.serviceFqn, this.offerId, [ + { + candidate: candidate.candidate, + sdpMLineIndex: candidate.sdpMLineIndex, + sdpMid: candidate.sdpMid, + }, + ]) + .catch((error) => { + this.debug('Failed to send ICE candidate:', error) + }) + } + + /** + * Poll for remote ICE candidates + */ + protected pollIceCandidates(): void { + this.api + .getOfferIceCandidates(this.serviceFqn, this.offerId, this.lastIcePollTime) + .then((result) => { + if (result.candidates.length > 0) { + this.debug(`Received ${result.candidates.length} remote ICE candidates`) + + for (const iceCandidate of result.candidates) { + if (iceCandidate.candidate && this.pc) { + const candidate = iceCandidate.candidate + this.pc + .addIceCandidate(new RTCIceCandidate(candidate)) + .then(() => { + this.emit('ice:candidate:remote', new RTCIceCandidate(candidate)) + }) + .catch((error) => { + this.debug('Failed to add ICE candidate:', error) + }) + } + + // Update last poll time + if (iceCandidate.createdAt > this.lastIcePollTime) { + this.lastIcePollTime = iceCandidate.createdAt + } + } + } + }) + .catch((error) => { + this.debug('Failed to poll ICE candidates:', error) + }) + } + + /** + * Attempt to reconnect + * + * Note: For offerer connections, reconnection is handled by the Rondevu instance + * creating a new offer via fillOffers(). This method is a no-op. + */ + protected attemptReconnect(): void { + this.debug('Reconnection not applicable for offerer - new offer will be created by Rondevu instance') + + // Offerer reconnection is handled externally by Rondevu.fillOffers() + // which creates entirely new offers. We don't reconnect the same offer. + // Just emit failure and let the parent handle it. + this.emit('reconnect:failed', new Error('Offerer reconnection handled by parent')) + } + + /** + * Get the offer ID + */ + getOfferId(): string { + return this.offerId + } +} diff --git a/src/rondevu.ts b/src/rondevu.ts index ba0b6b2..46cea28 100644 --- a/src/rondevu.ts +++ b/src/rondevu.ts @@ -1,6 +1,9 @@ import { RondevuAPI, Keypair, IceCandidate, BatcherOptions } from './api.js' import { CryptoAdapter } from './crypto-adapter.js' import { EventEmitter } from 'eventemitter3' +import { OffererConnection } from './offerer-connection.js' +import { AnswererConnection } from './answerer-connection.js' +import { ConnectionConfig } from './connection-config.js' // ICE server preset names export type IceServerPreset = 'ipv4-turn' | 'hostname-turns' | 'google-stun' | 'relay-only' @@ -83,6 +86,7 @@ export interface PublishServiceOptions { maxOffers: number // Maximum number of concurrent offers to maintain offerFactory?: OfferFactory // Optional: custom offer creation (defaults to simple data channel) ttl?: number // Time-to-live for offers in milliseconds (default: 300000) + connectionConfig?: Partial // Optional: connection durability configuration } export interface ConnectionContext { @@ -97,8 +101,8 @@ export interface ConnectToServiceOptions { serviceFqn?: string // Full FQN like 'chat:2.0.0@alice' service?: string // Service without username (for discovery) username?: string // Target username (combined with service) - onConnection?: (context: ConnectionContext) => void | Promise // Called when data channel opens rtcConfig?: RTCConfiguration // Optional: override default ICE servers + connectionConfig?: Partial // Optional: connection durability configuration } export interface ActiveOffer { @@ -178,14 +182,13 @@ export class ConnectionError extends RondevuError { } /** - * Rondevu - Complete WebRTC signaling client + * Rondevu - Complete WebRTC signaling client with durable connections * - * Provides a unified API for: - * - Implicit username claiming (auto-claimed on first authenticated request) - * - Service publishing with automatic signature generation - * - Service discovery (direct, random, paginated) - * - WebRTC signaling (offer/answer exchange, ICE relay) - * - Keypair management + * v1.0.0 introduces breaking changes: + * - connectToService() now returns AnswererConnection instead of ConnectionContext + * - Automatic reconnection and message buffering built-in + * - Connection objects expose .send() method instead of raw DataChannel + * - Rich event system for connection lifecycle (connected, disconnected, reconnecting, etc.) * * @example * ```typescript @@ -196,39 +199,39 @@ export class ConnectionError extends RondevuError { * iceServers: 'ipv4-turn' // Use preset: 'ipv4-turn', 'hostname-turns', 'google-stun', or 'relay-only' * }) * - * // Or use custom ICE servers - * const rondevu2 = await Rondevu.connect({ - * apiUrl: 'https://signal.example.com', - * username: 'bob', - * iceServers: [ - * { urls: 'stun:stun.l.google.com:19302' }, - * { urls: 'turn:turn.example.com:3478', username: 'user', credential: 'pass' } - * ] - * }) - * * // Publish a service with automatic offer management * await rondevu.publishService({ * service: 'chat:2.0.0', - * maxOffers: 5, // Maintain up to 5 concurrent offers - * offerFactory: async (pc) => { - * // pc is created by Rondevu with ICE handlers already attached - * const dc = pc.createDataChannel('chat') - * const offer = await pc.createOffer() - * await pc.setLocalDescription(offer) - * return { dc, offer } - * } + * maxOffers: 5 // Maintain up to 5 concurrent offers * }) * * // Start accepting connections (auto-fills offers and polls) * await rondevu.startFilling() * - * // Access active connections - * for (const offer of rondevu.getActiveOffers()) { - * offer.dc?.addEventListener('message', (e) => console.log(e.data)) - * } + * // Listen for connections (v1.0.0 API) + * rondevu.on('connection:opened', (offerId, connection) => { + * connection.on('connected', () => console.log('Connected!')) + * connection.on('message', (data) => console.log('Received:', data)) + * connection.send('Hello!') + * }) * - * // Stop when done - * rondevu.stopFilling() + * // Connect to a service (v1.0.0 - returns AnswererConnection) + * const connection = await rondevu.connectToService({ + * serviceFqn: 'chat:2.0.0@bob' + * }) + * + * connection.on('connected', () => { + * console.log('Connected!') + * connection.send('Hello!') + * }) + * + * connection.on('message', (data) => { + * console.log('Received:', data) + * }) + * + * connection.on('reconnecting', (attempt) => { + * console.log(`Reconnecting, attempt ${attempt}`) + * }) * ``` */ export class Rondevu extends EventEmitter { @@ -253,13 +256,14 @@ export class Rondevu extends EventEmitter { private maxOffers = 0 private offerFactory: OfferFactory | null = null private ttl = Rondevu.DEFAULT_TTL_MS - private activeOffers = new Map() + private activeConnections = new Map() + private connectionConfig?: Partial // Polling private filling = false + private fillingSemaphore = false // Semaphore to prevent concurrent fillOffers calls private pollingInterval: ReturnType | null = null private lastPollTimestamp = 0 - private isPolling = false // Guard against concurrent poll execution private constructor( apiUrl: string, @@ -433,62 +437,30 @@ export class Rondevu extends EventEmitter { * ```typescript * await rondevu.publishService({ * service: 'chat:2.0.0', - * maxOffers: 5 + * maxOffers: 5, + * connectionConfig: { + * reconnectEnabled: true, + * bufferEnabled: true + * } * }) * await rondevu.startFilling() * ``` */ async publishService(options: PublishServiceOptions): Promise { - const { service, maxOffers, offerFactory, ttl } = options + const { service, maxOffers, offerFactory, ttl, connectionConfig } = options this.currentService = service this.maxOffers = maxOffers this.offerFactory = offerFactory || this.defaultOfferFactory.bind(this) this.ttl = ttl || Rondevu.DEFAULT_TTL_MS + this.connectionConfig = connectionConfig this.debug(`Publishing service: ${service} with maxOffers: ${maxOffers}`) this.usernameClaimed = true } /** - * Set up ICE candidate handler to send candidates to the server - * - * Note: This is used by connectToService() where the offerId is already known. - * For createOffer(), we use inline ICE handling with early candidate queuing - * since the offerId isn't available until after the factory completes. - */ - private setupIceCandidateHandler( - pc: RTCPeerConnection, - serviceFqn: string, - offerId: string - ): void { - pc.onicecandidate = async (event) => { - if (event.candidate) { - try { - // Handle both browser and Node.js (wrtc) environments - // Browser: candidate.toJSON() exists - // Node.js wrtc: candidate is already a plain object - const candidateData = typeof event.candidate.toJSON === 'function' - ? event.candidate.toJSON() - : event.candidate - - // Emit local ICE candidate event - this.emit('ice:candidate:local', offerId, candidateData) - - await this.api.addOfferIceCandidates( - serviceFqn, - offerId, - [candidateData] - ) - } catch (err) { - console.error('[Rondevu] Failed to send ICE candidate:', err) - } - } - } - } - - /** - * Create a single offer and publish it to the server + * Create a single offer and publish it to the server using OffererConnection */ private async createOffer(): Promise { if (!this.currentService || !this.offerFactory) { @@ -504,45 +476,10 @@ export class Rondevu extends EventEmitter { this.debug('Creating new offer...') - // 1. Create the RTCPeerConnection - Rondevu controls this to set up handlers early + // 1. Create RTCPeerConnection using factory (for now, keep compatibility) const pc = new RTCPeerConnection(rtcConfig) - // 2. Set up ICE candidate handler with queuing BEFORE the factory runs - // This ensures we capture all candidates, even those generated immediately - // when setLocalDescription() is called in the factory - const earlyIceCandidates: RTCIceCandidateInit[] = [] - let offerId: string | undefined - - pc.onicecandidate = async (event) => { - if (event.candidate) { - // Handle both browser and Node.js (wrtc) environments - const candidateData = typeof event.candidate.toJSON === 'function' - ? event.candidate.toJSON() - : event.candidate - - // Emit local ICE candidate event - if (offerId) { - this.emit('ice:candidate:local', offerId, candidateData) - } - - if (offerId) { - // We have the offerId, send directly - try { - await this.api.addOfferIceCandidates(serviceFqn, offerId, [candidateData]) - } catch (err) { - console.error('[Rondevu] Failed to send ICE candidate:', err) - } - } else { - // Queue for later - we don't have the offerId yet - this.debug('Queuing early ICE candidate') - earlyIceCandidates.push(candidateData) - } - } - } - - // 3. Call the factory with the pc - factory creates data channel and offer - // When factory calls setLocalDescription(), ICE gathering starts and - // candidates are captured by the handler we set up above + // 2. Call the factory to create offer let dc: RTCDataChannel | undefined let offer: RTCSessionDescriptionInit try { @@ -550,12 +487,11 @@ export class Rondevu extends EventEmitter { dc = factoryResult.dc offer = factoryResult.offer } catch (err) { - // Clean up the connection if factory fails pc.close() throw err } - // 4. Publish to server to get offerId + // 3. Publish to server to get offerId const result = await this.api.publishService({ serviceFqn, offers: [{ sdp: offer.sdp! }], @@ -564,68 +500,77 @@ export class Rondevu extends EventEmitter { message: '', }) - offerId = result.offers[0].offerId + const offerId = result.offers[0].offerId - // 5. Store active offer - this.activeOffers.set(offerId, { - offerId, + // 4. Create OffererConnection instance with already-created PC and DC + const connection = new OffererConnection({ + api: this.api, serviceFqn, - pc, - dc, - answered: false, - createdAt: Date.now() + offerId, + pc, // Pass the peer connection from factory + dc, // Pass the data channel from factory + config: { + ...this.connectionConfig, + debug: this.debugEnabled, + }, }) + // Setup connection event handlers + connection.on('connected', () => { + this.debug(`Connection established for offer ${offerId}`) + this.emit('connection:opened', offerId, connection) + }) + + connection.on('failed', (error) => { + this.debug(`Connection failed for offer ${offerId}:`, error) + this.activeConnections.delete(offerId) + this.fillOffers() // Replace failed offer + }) + + connection.on('closed', () => { + this.debug(`Connection closed for offer ${offerId}`) + this.activeConnections.delete(offerId) + this.fillOffers() // Replace closed offer + }) + + // Store active connection + this.activeConnections.set(offerId, connection) + + // Initialize the connection + await connection.initialize() + this.debug(`Offer created: ${offerId}`) this.emit('offer:created', offerId, serviceFqn) - - // Set up data channel open handler (offerer side) - if (dc) { - dc.onopen = () => { - this.debug(`Data channel opened for offer ${offerId}`) - this.emit('connection:opened', offerId, dc) - } - } - - // 6. Send any queued early ICE candidates - if (earlyIceCandidates.length > 0) { - this.debug(`Sending ${earlyIceCandidates.length} early ICE candidates`) - try { - await this.api.addOfferIceCandidates(serviceFqn, offerId, earlyIceCandidates) - } catch (err) { - console.error('[Rondevu] Failed to send early ICE candidates:', err) - } - } - - // 7. Monitor connection state - pc.onconnectionstatechange = () => { - this.debug(`Offer ${offerId} connection state: ${pc.connectionState}`) - - if (pc.connectionState === 'failed' || pc.connectionState === 'closed') { - this.emit('connection:closed', offerId!) - this.activeOffers.delete(offerId!) - this.fillOffers() // Try to replace failed offer - } - } } /** - * Fill offers to reach maxOffers count + * Fill offers to reach maxOffers count with semaphore protection */ private async fillOffers(): Promise { if (!this.filling || !this.currentService) return - const currentCount = this.activeOffers.size - const needed = this.maxOffers - currentCount + // Semaphore to prevent concurrent fills + if (this.fillingSemaphore) { + this.debug('fillOffers already in progress, skipping') + return + } - this.debug(`Filling offers: current=${currentCount}, needed=${needed}`) + this.fillingSemaphore = true + try { + const currentCount = this.activeConnections.size + const needed = this.maxOffers - currentCount - for (let i = 0; i < needed; i++) { - try { - await this.createOffer() - } catch (err) { - console.error('[Rondevu] Failed to create offer:', err) + this.debug(`Filling offers: current=${currentCount}, needed=${needed}`) + + for (let i = 0; i < needed; i++) { + try { + await this.createOffer() + } catch (err) { + console.error('[Rondevu] Failed to create offer:', err) + } } + } finally { + this.fillingSemaphore = false } } @@ -635,55 +580,26 @@ export class Rondevu extends EventEmitter { private async pollInternal(): Promise { if (!this.filling) return - // Prevent concurrent poll execution to avoid duplicate answer processing - if (this.isPolling) { - this.debug('Poll already in progress, skipping') - return - } - - this.isPolling = true try { const result = await this.api.poll(this.lastPollTimestamp) - // Process answers + // Process answers - delegate to OffererConnections for (const answer of result.answers) { - const activeOffer = this.activeOffers.get(answer.offerId) - if (activeOffer && !activeOffer.answered) { - this.debug(`Received answer for offer ${answer.offerId}`) + const connection = this.activeConnections.get(answer.offerId) + if (connection) { + try { + await connection.processAnswer(answer.sdp, answer.answererId) + this.lastPollTimestamp = Math.max(this.lastPollTimestamp, answer.answeredAt) - await activeOffer.pc.setRemoteDescription({ - type: 'answer', - sdp: answer.sdp - }) - - activeOffer.answered = true - this.lastPollTimestamp = answer.answeredAt - this.emit('offer:answered', answer.offerId, answer.answererId) - - // Create replacement offer - this.fillOffers() - } - } - - // Process ICE candidates - for (const [offerId, candidates] of Object.entries(result.iceCandidates)) { - const activeOffer = this.activeOffers.get(offerId) - if (activeOffer) { - const answererCandidates = candidates.filter(c => c.role === 'answerer') - - for (const item of answererCandidates) { - if (item.candidate) { - this.emit('ice:candidate:remote', offerId, item.candidate, item.role) - await activeOffer.pc.addIceCandidate(new RTCIceCandidate(item.candidate)) - this.lastPollTimestamp = Math.max(this.lastPollTimestamp, item.createdAt) - } + // Create replacement offer + this.fillOffers() + } catch (err) { + this.debug(`Failed to process answer for offer ${answer.offerId}:`, err) } } } } catch (err) { console.error('[Rondevu] Polling error:', err) - } finally { - this.isPolling = false } } @@ -720,7 +636,7 @@ export class Rondevu extends EventEmitter { stopFilling(): void { this.debug('Stopping offer filling and polling') this.filling = false - this.isPolling = false // Reset polling guard + this.fillingSemaphore = false // Stop polling if (this.pollingInterval) { @@ -729,13 +645,12 @@ export class Rondevu extends EventEmitter { } // Close all active connections - for (const [offerId, offer] of this.activeOffers.entries()) { - this.debug(`Closing offer ${offerId}`) - offer.dc?.close() - offer.pc.close() + for (const [offerId, connection] of this.activeConnections.entries()) { + this.debug(`Closing connection ${offerId}`) + connection.close() } - this.activeOffers.clear() + this.activeConnections.clear() } /** @@ -743,17 +658,17 @@ export class Rondevu extends EventEmitter { * @returns Number of active offers */ getOfferCount(): number { - return this.activeOffers.size + return this.activeConnections.size } /** * Check if an offer is currently connected * @param offerId - The offer ID to check - * @returns True if the offer exists and has been answered + * @returns True if the offer exists and is connected */ isConnected(offerId: string): boolean { - const offer = this.activeOffers.get(offerId) - return offer ? offer.answered : false + const connection = this.activeConnections.get(offerId) + return connection ? connection.getState() === 'connected' : false } /** @@ -762,12 +677,11 @@ export class Rondevu extends EventEmitter { */ async disconnectAll(): Promise { this.debug('Disconnecting all offers') - for (const [offerId, offer] of this.activeOffers.entries()) { - this.debug(`Closing offer ${offerId}`) - offer.dc?.close() - offer.pc.close() + for (const [offerId, connection] of this.activeConnections.entries()) { + this.debug(`Closing connection ${offerId}`) + connection.close() } - this.activeOffers.clear() + this.activeConnections.clear() } /** @@ -777,7 +691,7 @@ export class Rondevu extends EventEmitter { getServiceStatus(): { active: boolean; offerCount: number; maxOffers: number; filling: boolean } { return { active: this.currentService !== null, - offerCount: this.activeOffers.size, + offerCount: this.activeConnections.size, maxOffers: this.maxOffers, filling: this.filling } @@ -805,63 +719,43 @@ export class Rondevu extends EventEmitter { } /** - * Start polling for remote ICE candidates - * Returns the polling interval ID - */ - private startIcePolling( - pc: RTCPeerConnection, - serviceFqn: string, - offerId: string - ): ReturnType { - let lastIceTimestamp = 0 - - return setInterval(async () => { - try { - const result = await this.api.getOfferIceCandidates( - serviceFqn, - offerId, - lastIceTimestamp - ) - for (const item of result.candidates) { - if (item.candidate) { - this.emit('ice:candidate:remote', offerId, item.candidate, item.role) - await pc.addIceCandidate(new RTCIceCandidate(item.candidate)) - lastIceTimestamp = item.createdAt - } - } - } catch (err) { - console.error('[Rondevu] Failed to poll ICE candidates:', err) - } - }, Rondevu.POLLING_INTERVAL_MS) - } - - /** - * Automatically connect to a service (answerer side) - * Handles the entire connection flow: discovery, WebRTC setup, answer exchange, ICE candidates + * Connect to a service (answerer side) - v1.0.0 API + * Returns an AnswererConnection with automatic reconnection and buffering + * + * BREAKING CHANGE: This now returns AnswererConnection instead of ConnectionContext * * @example * ```typescript * // Connect to specific user * const connection = await rondevu.connectToService({ * serviceFqn: 'chat:2.0.0@alice', - * onConnection: ({ dc, peerUsername }) => { - * console.log('Connected to', peerUsername) - * dc.addEventListener('message', (e) => console.log(e.data)) - * dc.addEventListener('open', () => dc.send('Hello!')) + * connectionConfig: { + * reconnectEnabled: true, + * bufferEnabled: true * } * }) * + * connection.on('connected', () => { + * console.log('Connected!') + * connection.send('Hello!') + * }) + * + * connection.on('message', (data) => { + * console.log('Received:', data) + * }) + * + * connection.on('reconnecting', (attempt) => { + * console.log(`Reconnecting, attempt ${attempt}`) + * }) + * * // Discover random service * const connection = await rondevu.connectToService({ - * service: 'chat:2.0.0', - * onConnection: ({ dc, peerUsername }) => { - * console.log('Connected to', peerUsername) - * } + * service: 'chat:2.0.0' * }) * ``` */ - async connectToService(options: ConnectToServiceOptions): Promise { - const { onConnection, rtcConfig } = options + async connectToService(options: ConnectToServiceOptions): Promise { + const { rtcConfig, connectionConfig } = options // Validate inputs if (options.serviceFqn !== undefined && typeof options.serviceFqn === 'string' && !options.serviceFqn.trim()) { @@ -878,87 +772,32 @@ export class Rondevu extends EventEmitter { const fqn = await this.resolveServiceFqn(options) this.debug(`Connecting to service: ${fqn}`) - // 1. Get service offer + // Get service offer const serviceData = await this.api.getService(fqn) this.debug(`Found service from @${serviceData.username}`) - // 2. Create RTCPeerConnection + // Create RTCConfiguration const rtcConfiguration = rtcConfig || { iceServers: this.iceServers } - const pc = new RTCPeerConnection(rtcConfiguration) - // 3. Set up data channel handler (answerer receives it from offerer) - let dc: RTCDataChannel | null = null - const dataChannelPromise = new Promise((resolve) => { - pc.ondatachannel = (event) => { - this.debug('Data channel received from offerer') - dc = event.channel - this.emit('connection:opened', serviceData.offerId, dc) - resolve(dc) - } - }) - - // 4. Set up ICE candidate exchange - this.setupIceCandidateHandler(pc, serviceData.serviceFqn, serviceData.offerId) - - // 5. Poll for remote ICE candidates - const icePollInterval = this.startIcePolling(pc, serviceData.serviceFqn, serviceData.offerId) - - // 6. Set remote description - await pc.setRemoteDescription({ - type: 'offer', - sdp: serviceData.sdp - }) - - // 7. Create and send answer - const answer = await pc.createAnswer() - await pc.setLocalDescription(answer) - await this.api.answerOffer( - serviceData.serviceFqn, - serviceData.offerId, - answer.sdp! - ) - - // 8. Wait for data channel to be established - dc = await dataChannelPromise - - // Create connection context - const context: ConnectionContext = { - pc, - dc, + // Create AnswererConnection + const connection = new AnswererConnection({ + api: this.api, serviceFqn: serviceData.serviceFqn, offerId: serviceData.offerId, - peerUsername: serviceData.username - } + offerSdp: serviceData.sdp, + rtcConfig: rtcConfiguration, + config: { + ...connectionConfig, + debug: this.debugEnabled, + }, + }) - // 9. Set up connection state monitoring - pc.onconnectionstatechange = () => { - this.debug(`Connection state: ${pc.connectionState}`) - if (pc.connectionState === 'failed' || pc.connectionState === 'closed') { - clearInterval(icePollInterval) - } - } + // Initialize the connection + await connection.initialize() - // 10. Wait for data channel to open and call onConnection - if (dc.readyState === 'open') { - this.debug('Data channel already open') - if (onConnection) { - await onConnection(context) - } - } else { - await new Promise((resolve) => { - dc!.addEventListener('open', async () => { - this.debug('Data channel opened') - if (onConnection) { - await onConnection(context) - } - resolve() - }) - }) - } - - return context + return connection } // ============================================ @@ -968,8 +807,6 @@ export class Rondevu extends EventEmitter { /** * Find a service - unified discovery method * - * Replaces getService(), discoverService(), and discoverServices() with a single method. - * * @param serviceFqn - Service identifier (e.g., 'chat:1.0.0' or 'chat:1.0.0@alice') * @param options - Discovery options * @@ -1101,6 +938,36 @@ export class Rondevu extends EventEmitter { return this.keypair.publicKey } + /** + * Get active connections (for offerer side) + */ + getActiveConnections(): Map { + return this.activeConnections + } + + /** + * Get all active offers (legacy compatibility) + * @deprecated Use getActiveConnections() instead + */ + getActiveOffers(): ActiveOffer[] { + const offers: ActiveOffer[] = [] + for (const [offerId, connection] of this.activeConnections.entries()) { + const pc = connection.getPeerConnection() + const dc = connection.getDataChannel() + if (pc) { + offers.push({ + offerId, + serviceFqn: this.currentService ? `${this.currentService}@${this.username}` : '', + pc, + dc: dc || undefined, + answered: connection.getState() === 'connected', + createdAt: Date.now(), + }) + } + } + return offers + } + /** * Access to underlying API for advanced operations * @deprecated Use direct methods on Rondevu instance instead