6 Commits

Author SHA1 Message Date
5e673ac993 Add type-safe EventBus with generic event mapping
Implemented EventBus class with full TypeScript type inference:
- Generic type parameter TEvents for event name to payload mapping
- Type-safe on/once/off/emit methods with inferred data types
- Utility methods: clear, listenerCount, eventNames
- Complete JSDoc documentation with usage examples

Added core connection types:
- ConnectionIdentity, ConnectionState, ConnectionInterface
- QueueMessageOptions for message queuing
- Connection composite type

All types and classes exported from main index.

Example usage:
```typescript
interface MyEvents {
  'user:connected': { userId: string; timestamp: number };
  'message:received': string;
}

const bus = new EventBus<MyEvents>();

// TypeScript knows data is { userId: string; timestamp: number }
bus.on('user:connected', (data) => {
  console.log(data.userId, data.timestamp);
});
```

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-07 16:17:52 +01:00
511bac8033 Strip client to minimal skeleton with ConnectionManager
Removed all complex implementations and kept only the essentials:
- Removed durable/ directory (DurableConnection, DurableChannel, etc.)
- Removed peer/ directory (entire state machine)
- Removed service-pool.ts, offer-pool.ts, rondevu.ts
- Removed auth.ts, offers.ts, usernames.ts, event-emitter.ts
- Added empty ConnectionManager class as starting point

The client now contains just:
- src/connection-manager.ts - Empty class skeleton
- src/index.ts - Simple export

This provides a clean slate to rebuild the client with a simpler
architecture focused on core WebRTC connection management.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-07 13:30:55 +01:00
eb2c61bdb8 Release v0.9.2: Fix service pool ICE candidate collection 2025-12-07 11:31:33 +01:00
3139897b25 Fix service pool ICE candidate collection and logging
Fixed critical timing issue where ICE candidates were generated before
the handler was attached, causing them to be lost:

- Set up onicecandidate handler BEFORE setLocalDescription()
- Collect candidates in array while waiting for offer ID
- Send all pending candidates once offer ID is available
- Add detailed logging for service pool ICE candidates
- Log candidate type (host/srflx/relay) for debugging

This ensures all ICE candidates are captured and sent to the signaling
server, and provides visibility into what types of candidates are being
generated (especially important for diagnosing TURN relay issues).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-07 11:31:24 +01:00
a550641993 Release v0.9.1: Add detailed ICE candidate exchange logging 2025-12-07 11:13:32 +01:00
04603cfe2d Add detailed ICE candidate exchange logging
Added comprehensive logging to track WebRTC ICE candidate exchange:
- Log local candidate generation with type (host/srflx/relay)
- Log when candidates are sent to signaling server
- Log remote candidate reception and addition
- Log ICE gathering state changes
- Log ICE connection state changes
- Enhanced ICE error logging with details

This will help diagnose connection issues and TURN server problems.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-07 11:13:24 +01:00
30 changed files with 280 additions and 3993 deletions

120
EVENTBUS_EXAMPLE.md Normal file
View File

@@ -0,0 +1,120 @@
# EventBus Usage Examples
## Type-Safe Event Bus
The `EventBus` class provides fully type-safe event handling with TypeScript type inference.
### Basic Usage
```typescript
import { EventBus } from '@xtr-dev/rondevu-client';
// Define your event mapping
interface AppEvents {
'user:connected': { userId: string; timestamp: number };
'user:disconnected': { userId: string };
'message:received': string;
'connection:error': Error;
}
// Create the event bus
const events = new EventBus<AppEvents>();
// Subscribe to events - TypeScript knows the exact data type!
events.on('user:connected', (data) => {
// data is { userId: string; timestamp: number }
console.log(`User ${data.userId} connected at ${data.timestamp}`);
});
events.on('message:received', (data) => {
// data is string
console.log(data.toUpperCase());
});
// Emit events - TypeScript validates the data type
events.emit('user:connected', {
userId: '123',
timestamp: Date.now()
});
events.emit('message:received', 'Hello World');
// Type errors caught at compile time:
// events.emit('user:connected', 'wrong type'); // ❌ Error!
// events.emit('message:received', { wrong: 'type' }); // ❌ Error!
```
### One-Time Listeners
```typescript
// Subscribe once - handler auto-unsubscribes after first call
events.once('connection:error', (error) => {
console.error('Connection failed:', error.message);
});
```
### Unsubscribing
```typescript
const handler = (data: string) => {
console.log('Message:', data);
};
events.on('message:received', handler);
// Later, unsubscribe
events.off('message:received', handler);
```
### Utility Methods
```typescript
// Clear all handlers for a specific event
events.clear('message:received');
// Clear all handlers for all events
events.clear();
// Get listener count
const count = events.listenerCount('user:connected');
// Get all event names with handlers
const eventNames = events.eventNames();
```
## Connection Events Example
```typescript
interface ConnectionEvents {
'connection:state': { state: 'connected' | 'disconnected' | 'connecting' };
'connection:message': { from: string; data: string | ArrayBuffer };
'connection:error': { code: string; message: string };
}
class ConnectionManager {
private events = new EventBus<ConnectionEvents>();
on<K extends keyof ConnectionEvents>(
event: K,
handler: (data: ConnectionEvents[K]) => void
) {
this.events.on(event, handler);
}
private handleStateChange(state: 'connected' | 'disconnected' | 'connecting') {
this.events.emit('connection:state', { state });
}
private handleMessage(from: string, data: string | ArrayBuffer) {
this.events.emit('connection:message', { from, data });
}
}
```
## Benefits
-**Full type safety** - TypeScript validates event names and data types
-**IntelliSense support** - Auto-completion for event names and data properties
-**Compile-time errors** - Catch type mismatches before runtime
-**Self-documenting** - Event interface serves as documentation
-**Refactoring-friendly** - Rename events or change types with confidence

17
package-lock.json generated
View File

@@ -1,16 +1,16 @@
{
"name": "@xtr-dev/rondevu-client",
"version": "0.7.12",
"version": "0.9.2",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@xtr-dev/rondevu-client",
"version": "0.7.12",
"version": "0.9.2",
"license": "MIT",
"dependencies": {
"@noble/ed25519": "^3.0.0",
"@xtr-dev/rondevu-client": "^0.5.1"
"@xtr-dev/rondevu-client": "^0.9.2"
},
"devDependencies": {
"typescript": "^5.9.3"
@@ -26,10 +26,13 @@
}
},
"node_modules/@xtr-dev/rondevu-client": {
"version": "0.5.1",
"resolved": "https://registry.npmjs.org/@xtr-dev/rondevu-client/-/rondevu-client-0.5.1.tgz",
"integrity": "sha512-110ejMCizPUPkHwwwNvcdCSZceLaHeFbf1LNkXvbG6pnLBqCf2uoGOOaRkArb7HNNFABFB+HXzm/AVzNdadosw==",
"license": "MIT"
"version": "0.9.2",
"resolved": "https://registry.npmjs.org/@xtr-dev/rondevu-client/-/rondevu-client-0.9.2.tgz",
"integrity": "sha512-DVow5AOPU40dqQtlfQK7J2GNX8dz2/4UzltMqublaPZubbkRYgocvp0b76NQu5F6v150IstMV2N49uxAYqogVw==",
"license": "MIT",
"dependencies": {
"@noble/ed25519": "^3.0.0"
}
},
"node_modules/typescript": {
"version": "5.9.3",

View File

@@ -1,6 +1,6 @@
{
"name": "@xtr-dev/rondevu-client",
"version": "0.9.0",
"version": "0.9.2",
"description": "TypeScript client for Rondevu with durable WebRTC connections, automatic reconnection, and message queuing",
"type": "module",
"main": "dist/index.js",
@@ -27,6 +27,7 @@
"README.md"
],
"dependencies": {
"@noble/ed25519": "^3.0.0"
"@noble/ed25519": "^3.0.0",
"@xtr-dev/rondevu-client": "^0.9.2"
}
}

View File

@@ -1,62 +0,0 @@
export interface Credentials {
peerId: string;
secret: string;
}
// Fetch-compatible function type
export type FetchFunction = (
input: RequestInfo | URL,
init?: RequestInit
) => Promise<Response>;
export class RondevuAuth {
private fetchFn: FetchFunction;
constructor(
private baseUrl: string,
fetchFn?: FetchFunction
) {
// Use provided fetch or fall back to global fetch
this.fetchFn = fetchFn || ((...args) => {
if (typeof globalThis.fetch === 'function') {
return globalThis.fetch(...args);
}
throw new Error(
'fetch is not available. Please provide a fetch implementation in the constructor options.'
);
});
}
/**
* Register a new peer and receive credentials
* Generates a cryptographically random peer ID (128-bit)
* @throws Error if registration fails
*/
async register(): Promise<Credentials> {
const response = await this.fetchFn(`${this.baseUrl}/register`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({}),
});
if (!response.ok) {
const error = await response.json().catch(() => ({ error: 'Unknown error' }));
throw new Error(`Registration failed: ${error.error || response.statusText}`);
}
const data = await response.json();
return {
peerId: data.peerId,
secret: data.secret,
};
}
/**
* Create Authorization header value
*/
static createAuthHeader(credentials: Credentials): string {
return `Bearer ${credentials.peerId}:${credentials.secret}`;
}
}

View File

@@ -0,0 +1,9 @@
/**
* ConnectionManager - Manages WebRTC peer connections
*/
export class ConnectionManager {
constructor() {
// TODO: Initialize connection manager
}
}

View File

@@ -1,361 +0,0 @@
/**
* DurableChannel - Message queueing wrapper for RTCDataChannel
*
* Provides automatic message queuing during disconnections and transparent
* flushing when the connection is re-established.
*/
import { EventEmitter } from '../event-emitter.js';
import {
DurableChannelState
} from './types.js';
import type {
DurableChannelConfig,
DurableChannelEvents,
QueuedMessage
} from './types.js';
/**
* Default configuration for durable channels
*/
const DEFAULT_CONFIG = {
maxQueueSize: 1000,
maxMessageAge: 60000, // 1 minute
ordered: true,
maxRetransmits: undefined
} as const;
/**
* Durable channel that survives WebRTC peer connection drops
*
* The DurableChannel wraps an RTCDataChannel and provides:
* - Automatic message queuing during disconnections
* - Queue flushing on reconnection
* - Configurable queue size and message age limits
* - RTCDataChannel-compatible API
*
* @example
* ```typescript
* const channel = new DurableChannel('chat', connection, {
* maxQueueSize: 500,
* maxMessageAge: 30000
* });
*
* channel.on('message', (data) => {
* console.log('Received:', data);
* });
*
* channel.on('open', () => {
* channel.send('Hello!');
* });
*
* // Messages sent during disconnection are automatically queued
* channel.send('This will be queued if disconnected');
* ```
*/
export class DurableChannel extends EventEmitter<DurableChannelEvents> {
readonly label: string;
readonly config: DurableChannelConfig;
private _state: DurableChannelState;
private underlyingChannel?: RTCDataChannel;
private messageQueue: QueuedMessage[] = [];
private queueProcessing: boolean = false;
private _bufferedAmountLowThreshold: number = 0;
// Event handlers that need cleanup
private openHandler?: () => void;
private messageHandler?: (event: MessageEvent) => void;
private errorHandler?: (event: Event) => void;
private closeHandler?: () => void;
private bufferedAmountLowHandler?: () => void;
constructor(
label: string,
config?: DurableChannelConfig
) {
super();
this.label = label;
this.config = { ...DEFAULT_CONFIG, ...config };
this._state = DurableChannelState.CONNECTING;
}
/**
* Current channel state
*/
get readyState(): DurableChannelState {
return this._state;
}
/**
* Buffered amount from underlying channel (0 if no channel)
*/
get bufferedAmount(): number {
return this.underlyingChannel?.bufferedAmount ?? 0;
}
/**
* Buffered amount low threshold
*/
get bufferedAmountLowThreshold(): number {
return this._bufferedAmountLowThreshold;
}
set bufferedAmountLowThreshold(value: number) {
this._bufferedAmountLowThreshold = value;
if (this.underlyingChannel) {
this.underlyingChannel.bufferedAmountLowThreshold = value;
}
}
/**
* Send data through the channel
*
* If the channel is open, sends immediately. Otherwise, queues the message
* for delivery when the channel reconnects.
*
* @param data - Data to send
*/
send(data: string | Blob | ArrayBuffer | ArrayBufferView): void {
if (this._state === DurableChannelState.OPEN && this.underlyingChannel) {
// Channel is open - send immediately
try {
this.underlyingChannel.send(data as any);
} catch (error) {
// Send failed - queue the message
this.enqueueMessage(data);
this.emit('error', error as Error);
}
} else if (this._state !== DurableChannelState.CLOSED) {
// Channel is not open but not closed - queue the message
this.enqueueMessage(data);
} else {
// Channel is closed - throw error
throw new Error('Cannot send on closed channel');
}
}
/**
* Close the channel
*/
close(): void {
if (this._state === DurableChannelState.CLOSED ||
this._state === DurableChannelState.CLOSING) {
return;
}
this._state = DurableChannelState.CLOSING;
if (this.underlyingChannel) {
this.underlyingChannel.close();
}
this._state = DurableChannelState.CLOSED;
this.emit('close');
}
/**
* Attach to an underlying RTCDataChannel
*
* This is called when a WebRTC connection is established (or re-established).
* The channel will flush any queued messages and forward events.
*
* @param channel - RTCDataChannel to attach to
* @internal
*/
attachToChannel(channel: RTCDataChannel): void {
// Detach from any existing channel first
this.detachFromChannel();
this.underlyingChannel = channel;
// Set buffered amount low threshold
channel.bufferedAmountLowThreshold = this._bufferedAmountLowThreshold;
// Setup event handlers
this.openHandler = () => {
this._state = DurableChannelState.OPEN;
this.emit('open');
// Flush queued messages
this.flushQueue().catch(error => {
this.emit('error', error);
});
};
this.messageHandler = (event: MessageEvent) => {
this.emit('message', event.data);
};
this.errorHandler = (event: Event) => {
this.emit('error', new Error(`Channel error: ${event.type}`));
};
this.closeHandler = () => {
if (this._state !== DurableChannelState.CLOSING &&
this._state !== DurableChannelState.CLOSED) {
// Unexpected close - transition to connecting (will reconnect)
this._state = DurableChannelState.CONNECTING;
}
};
this.bufferedAmountLowHandler = () => {
this.emit('bufferedAmountLow');
};
// Attach handlers
channel.addEventListener('open', this.openHandler);
channel.addEventListener('message', this.messageHandler);
channel.addEventListener('error', this.errorHandler);
channel.addEventListener('close', this.closeHandler);
channel.addEventListener('bufferedamountlow', this.bufferedAmountLowHandler);
// If channel is already open, trigger open event
if (channel.readyState === 'open') {
this.openHandler();
} else if (channel.readyState === 'connecting') {
this._state = DurableChannelState.CONNECTING;
}
}
/**
* Detach from the underlying RTCDataChannel
*
* This is called when a WebRTC connection drops. The channel remains alive
* and continues queuing messages.
*
* @internal
*/
detachFromChannel(): void {
if (!this.underlyingChannel) {
return;
}
// Remove event listeners
if (this.openHandler) {
this.underlyingChannel.removeEventListener('open', this.openHandler);
}
if (this.messageHandler) {
this.underlyingChannel.removeEventListener('message', this.messageHandler);
}
if (this.errorHandler) {
this.underlyingChannel.removeEventListener('error', this.errorHandler);
}
if (this.closeHandler) {
this.underlyingChannel.removeEventListener('close', this.closeHandler);
}
if (this.bufferedAmountLowHandler) {
this.underlyingChannel.removeEventListener('bufferedamountlow', this.bufferedAmountLowHandler);
}
this.underlyingChannel = undefined;
this._state = DurableChannelState.CONNECTING;
}
/**
* Enqueue a message for later delivery
*/
private enqueueMessage(data: string | Blob | ArrayBuffer | ArrayBufferView): void {
// Prune old messages first
this.pruneOldMessages();
const message: QueuedMessage = {
data,
enqueuedAt: Date.now(),
id: `msg-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`
};
this.messageQueue.push(message);
// Handle overflow
const maxQueueSize = this.config.maxQueueSize ?? 1000;
if (this.messageQueue.length > maxQueueSize) {
const excess = this.messageQueue.length - maxQueueSize;
this.messageQueue.splice(0, excess);
this.emit('queueOverflow', excess);
console.warn(
`DurableChannel[${this.label}]: Dropped ${excess} messages due to queue overflow`
);
}
}
/**
* Flush all queued messages through the channel
*/
private async flushQueue(): Promise<void> {
if (this.queueProcessing || !this.underlyingChannel ||
this.underlyingChannel.readyState !== 'open') {
return;
}
this.queueProcessing = true;
try {
// Prune old messages before flushing
this.pruneOldMessages();
// Send all queued messages
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
if (!message) break;
try {
this.underlyingChannel.send(message.data as any);
} catch (error) {
// Send failed - re-queue message
this.messageQueue.unshift(message);
throw error;
}
// If buffer is getting full, wait for it to drain
if (this.underlyingChannel.bufferedAmount > 16 * 1024 * 1024) { // 16MB
await new Promise<void>((resolve) => {
const checkBuffer = () => {
if (!this.underlyingChannel ||
this.underlyingChannel.bufferedAmount < 8 * 1024 * 1024) {
resolve();
} else {
setTimeout(checkBuffer, 100);
}
};
checkBuffer();
});
}
}
} finally {
this.queueProcessing = false;
}
}
/**
* Remove messages older than maxMessageAge from the queue
*/
private pruneOldMessages(): void {
const maxMessageAge = this.config.maxMessageAge ?? 60000;
if (maxMessageAge === Infinity || maxMessageAge <= 0) {
return;
}
const now = Date.now();
const cutoff = now - maxMessageAge;
const originalLength = this.messageQueue.length;
this.messageQueue = this.messageQueue.filter(msg => msg.enqueuedAt >= cutoff);
const pruned = originalLength - this.messageQueue.length;
if (pruned > 0) {
console.warn(
`DurableChannel[${this.label}]: Pruned ${pruned} old messages (older than ${maxMessageAge}ms)`
);
}
}
/**
* Get the current queue size
*
* @internal
*/
getQueueSize(): number {
return this.messageQueue.length;
}
}

View File

@@ -1,453 +0,0 @@
/**
* DurableConnection - WebRTC connection with automatic reconnection
*
* Manages the WebRTC peer lifecycle and automatically reconnects on
* connection drops with exponential backoff.
*/
import { EventEmitter } from '../event-emitter.js';
import RondevuPeer from '../peer/index.js';
import type { RondevuOffers } from '../offers.js';
import { DurableChannel } from './channel.js';
import { createReconnectionScheduler, type ReconnectionScheduler } from './reconnection.js';
import {
DurableConnectionState
} from './types.js';
import type {
DurableConnectionConfig,
DurableConnectionEvents,
ConnectionInfo
} from './types.js';
/**
* Default configuration for durable connections
*/
const DEFAULT_CONFIG: Required<DurableConnectionConfig> = {
maxReconnectAttempts: 10,
reconnectBackoffBase: 1000,
reconnectBackoffMax: 30000,
reconnectJitter: 0.2,
connectionTimeout: 30000,
maxQueueSize: 1000,
maxMessageAge: 60000,
rtcConfig: {
iceServers: [
{ urls: 'stun:stun.l.google.com:19302' },
{ urls: 'stun:stun1.l.google.com:19302' }
]
}
};
/**
* Durable WebRTC connection that automatically reconnects
*
* The DurableConnection manages the lifecycle of a WebRTC peer connection
* and provides:
* - Automatic reconnection with exponential backoff
* - Multiple durable channels that survive reconnections
* - Configurable retry limits and timeouts
* - High-level connection state events
*
* @example
* ```typescript
* const connection = new DurableConnection(
* offersApi,
* { username: 'alice', serviceFqn: 'chat@1.0.0' },
* { maxReconnectAttempts: 5 }
* );
*
* connection.on('connected', () => {
* console.log('Connected!');
* });
*
* connection.on('reconnecting', (attempt, max, delay) => {
* console.log(`Reconnecting... (${attempt}/${max}, retry in ${delay}ms)`);
* });
*
* const channel = connection.createChannel('chat');
* channel.on('message', (data) => {
* console.log('Received:', data);
* });
*
* await connection.connect();
* ```
*/
export class DurableConnection extends EventEmitter<DurableConnectionEvents> {
readonly connectionId: string;
readonly config: Required<DurableConnectionConfig>;
readonly connectionInfo: ConnectionInfo;
private _state: DurableConnectionState;
private currentPeer?: RondevuPeer;
private channels: Map<string, DurableChannel> = new Map();
private reconnectionScheduler?: ReconnectionScheduler;
// Track peer event handlers for cleanup
private peerConnectedHandler?: () => void;
private peerDisconnectedHandler?: () => void;
private peerFailedHandler?: (error: Error) => void;
private peerDataChannelHandler?: (channel: RTCDataChannel) => void;
constructor(
private offersApi: RondevuOffers,
connectionInfo: ConnectionInfo,
config?: DurableConnectionConfig
) {
super();
this.connectionId = `conn-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
this.config = { ...DEFAULT_CONFIG, ...config };
this.connectionInfo = connectionInfo;
this._state = DurableConnectionState.CONNECTING;
}
/**
* Current connection state
*/
getState(): DurableConnectionState {
return this._state;
}
/**
* Check if connection is currently connected
*/
isConnected(): boolean {
return this._state === DurableConnectionState.CONNECTED;
}
/**
* Create a durable channel on this connection
*
* The channel will be created on the current peer connection if available,
* otherwise it will be created when the connection is established.
*
* @param label - Channel label
* @param options - RTCDataChannel init options
* @returns DurableChannel instance
*/
createChannel(label: string, options?: RTCDataChannelInit): DurableChannel {
// Check if channel already exists
if (this.channels.has(label)) {
throw new Error(`Channel with label '${label}' already exists`);
}
// Create durable channel
const durableChannel = new DurableChannel(label, {
maxQueueSize: this.config.maxQueueSize,
maxMessageAge: this.config.maxMessageAge,
ordered: options?.ordered ?? true,
maxRetransmits: options?.maxRetransmits
});
this.channels.set(label, durableChannel);
// If we have a current peer, attach the channel
if (this.currentPeer && this._state === DurableConnectionState.CONNECTED) {
this.createAndAttachChannel(durableChannel, options);
}
return durableChannel;
}
/**
* Get an existing channel by label
*/
getChannel(label: string): DurableChannel | undefined {
return this.channels.get(label);
}
/**
* Establish the initial connection
*
* @returns Promise that resolves when connected
*/
async connect(): Promise<void> {
if (this._state !== DurableConnectionState.CONNECTING) {
throw new Error(`Cannot connect from state: ${this._state}`);
}
try {
await this.establishConnection();
} catch (error) {
this._state = DurableConnectionState.DISCONNECTED;
await this.handleDisconnection();
throw error;
}
}
/**
* Close the connection gracefully
*/
async close(): Promise<void> {
if (this._state === DurableConnectionState.CLOSED) {
return;
}
const previousState = this._state;
this._state = DurableConnectionState.CLOSED;
// Cancel any ongoing reconnection
if (this.reconnectionScheduler) {
this.reconnectionScheduler.cancel();
}
// Close all channels
for (const channel of this.channels.values()) {
channel.close();
}
// Close peer connection
if (this.currentPeer) {
await this.currentPeer.close();
this.currentPeer = undefined;
}
this.emit('state', this._state, previousState);
this.emit('closed');
}
/**
* Establish a WebRTC connection
*/
private async establishConnection(): Promise<void> {
// Create new peer
const peer = new RondevuPeer(this.offersApi, this.config.rtcConfig);
this.currentPeer = peer;
// Setup peer event handlers
this.setupPeerHandlers(peer);
// Determine connection method based on connection info
if (this.connectionInfo.uuid) {
// Connect by UUID
await this.connectByUuid(peer, this.connectionInfo.uuid);
} else if (this.connectionInfo.username && this.connectionInfo.serviceFqn) {
// Connect by username and service FQN
await this.connectByService(peer, this.connectionInfo.username, this.connectionInfo.serviceFqn);
} else {
throw new Error('Invalid connection info: must provide either uuid or (username + serviceFqn)');
}
// Wait for connection with timeout
await this.waitForConnection(peer);
// Connection established
this.transitionToConnected();
}
/**
* Connect to a service by UUID
*/
private async connectByUuid(peer: RondevuPeer, uuid: string): Promise<void> {
// Get service details
const response = await fetch(`${this.offersApi['baseUrl']}/services/${uuid}`);
if (!response.ok) {
throw new Error(`Service not found: ${uuid}`);
}
const service = await response.json();
// Answer the offer
await peer.answer(service.offerId, service.sdp, {
secret: this.offersApi['credentials'].secret,
topics: []
});
}
/**
* Connect to a service by username and service FQN
*/
private async connectByService(peer: RondevuPeer, username: string, serviceFqn: string): Promise<void> {
// Query service to get UUID
const response = await fetch(`${this.offersApi['baseUrl']}/index/${username}/query`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ serviceFqn })
});
if (!response.ok) {
throw new Error(`Service not found: ${username}/${serviceFqn}`);
}
const { uuid } = await response.json();
// Connect by UUID
await this.connectByUuid(peer, uuid);
}
/**
* Wait for peer connection to establish
*/
private async waitForConnection(peer: RondevuPeer): Promise<void> {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('Connection timeout'));
}, this.config.connectionTimeout);
const onConnected = () => {
clearTimeout(timeout);
peer.off('connected', onConnected);
peer.off('failed', onFailed);
resolve();
};
const onFailed = (error: Error) => {
clearTimeout(timeout);
peer.off('connected', onConnected);
peer.off('failed', onFailed);
reject(error);
};
peer.on('connected', onConnected);
peer.on('failed', onFailed);
});
}
/**
* Setup event handlers for peer
*/
private setupPeerHandlers(peer: RondevuPeer): void {
this.peerConnectedHandler = () => {
// Connection established - will be handled by waitForConnection
};
this.peerDisconnectedHandler = () => {
if (this._state !== DurableConnectionState.CLOSED) {
this.handleDisconnection();
}
};
this.peerFailedHandler = (error: Error) => {
if (this._state !== DurableConnectionState.CLOSED) {
console.error('Peer connection failed:', error);
this.handleDisconnection();
}
};
this.peerDataChannelHandler = (channel: RTCDataChannel) => {
// Find or create durable channel
let durableChannel = this.channels.get(channel.label);
if (!durableChannel) {
// Auto-create channel for incoming data channels
durableChannel = new DurableChannel(channel.label, {
maxQueueSize: this.config.maxQueueSize,
maxMessageAge: this.config.maxMessageAge
});
this.channels.set(channel.label, durableChannel);
}
// Attach the received channel
durableChannel.attachToChannel(channel);
};
peer.on('connected', this.peerConnectedHandler);
peer.on('disconnected', this.peerDisconnectedHandler);
peer.on('failed', this.peerFailedHandler);
peer.on('datachannel', this.peerDataChannelHandler);
}
/**
* Transition to connected state
*/
private transitionToConnected(): void {
const previousState = this._state;
this._state = DurableConnectionState.CONNECTED;
// Reset reconnection scheduler if it exists
if (this.reconnectionScheduler) {
this.reconnectionScheduler.reset();
}
// Attach all channels to the new peer connection
for (const [label, channel] of this.channels) {
if (this.currentPeer) {
this.createAndAttachChannel(channel);
}
}
this.emit('state', this._state, previousState);
this.emit('connected');
}
/**
* Create underlying RTCDataChannel and attach to durable channel
*/
private createAndAttachChannel(
durableChannel: DurableChannel,
options?: RTCDataChannelInit
): void {
if (!this.currentPeer) {
return;
}
// Check if peer already has this channel (received via datachannel event)
// If not, create it
const senders = (this.currentPeer.pc as any).getSenders?.() || [];
const existingChannel = Array.from(senders as RTCRtpSender[])
.map((sender) => (sender as any).channel as RTCDataChannel)
.find(ch => ch && ch.label === durableChannel.label);
if (existingChannel) {
durableChannel.attachToChannel(existingChannel);
} else {
// Create new channel on peer
const rtcChannel = this.currentPeer.createDataChannel(
durableChannel.label,
options
);
durableChannel.attachToChannel(rtcChannel);
}
}
/**
* Handle connection disconnection
*/
private async handleDisconnection(): Promise<void> {
if (this._state === DurableConnectionState.CLOSED ||
this._state === DurableConnectionState.FAILED) {
return;
}
const previousState = this._state;
this._state = DurableConnectionState.RECONNECTING;
this.emit('state', this._state, previousState);
this.emit('disconnected');
// Detach all channels (but keep them alive)
for (const channel of this.channels.values()) {
channel.detachFromChannel();
}
// Close old peer
if (this.currentPeer) {
await this.currentPeer.close();
this.currentPeer = undefined;
}
// Create or use existing reconnection scheduler
if (!this.reconnectionScheduler) {
this.reconnectionScheduler = createReconnectionScheduler({
maxAttempts: this.config.maxReconnectAttempts,
backoffBase: this.config.reconnectBackoffBase,
backoffMax: this.config.reconnectBackoffMax,
jitter: this.config.reconnectJitter,
onReconnect: async () => {
await this.establishConnection();
},
onMaxAttemptsExceeded: (error) => {
const prevState = this._state;
this._state = DurableConnectionState.FAILED;
this.emit('state', this._state, prevState);
this.emit('failed', error, true);
},
onBeforeAttempt: (attempt, max, delay) => {
this.emit('reconnecting', attempt, max, delay);
}
});
}
// Schedule reconnection
this.reconnectionScheduler.schedule();
}
}

View File

@@ -1,200 +0,0 @@
/**
* Reconnection utilities for durable connections
*
* This module provides utilities for managing reconnection logic with
* exponential backoff and jitter.
*/
/**
* Calculate exponential backoff delay with jitter
*
* @param attempt - Current attempt number (0-indexed)
* @param base - Base delay in milliseconds
* @param max - Maximum delay in milliseconds
* @param jitter - Jitter factor (0-1), e.g., 0.2 for ±20%
* @returns Delay in milliseconds with jitter applied
*
* @example
* ```typescript
* calculateBackoff(0, 1000, 30000, 0.2) // ~1000ms ± 20%
* calculateBackoff(1, 1000, 30000, 0.2) // ~2000ms ± 20%
* calculateBackoff(2, 1000, 30000, 0.2) // ~4000ms ± 20%
* calculateBackoff(5, 1000, 30000, 0.2) // ~30000ms ± 20% (capped at max)
* ```
*/
export function calculateBackoff(
attempt: number,
base: number,
max: number,
jitter: number
): number {
// Calculate exponential delay: base * 2^attempt
const exponential = base * Math.pow(2, attempt);
// Cap at maximum
const capped = Math.min(exponential, max);
// Apply jitter: ± (jitter * capped)
const jitterAmount = capped * jitter;
const randomJitter = (Math.random() * 2 - 1) * jitterAmount;
// Return delay with jitter, ensuring it's not negative
return Math.max(0, capped + randomJitter);
}
/**
* Configuration for reconnection scheduler
*/
export interface ReconnectionSchedulerConfig {
/** Maximum number of reconnection attempts */
maxAttempts: number;
/** Base delay for exponential backoff */
backoffBase: number;
/** Maximum delay between attempts */
backoffMax: number;
/** Jitter factor for randomizing delays */
jitter: number;
/** Callback invoked for each reconnection attempt */
onReconnect: () => Promise<void>;
/** Callback invoked when max attempts exceeded */
onMaxAttemptsExceeded: (error: Error) => void;
/** Optional callback invoked before each attempt */
onBeforeAttempt?: (attempt: number, maxAttempts: number, delay: number) => void;
}
/**
* Reconnection scheduler state
*/
export interface ReconnectionScheduler {
/** Current attempt number */
attempt: number;
/** Whether scheduler is active */
active: boolean;
/** Schedule next reconnection attempt */
schedule: () => void;
/** Cancel scheduled reconnection */
cancel: () => void;
/** Reset attempt counter */
reset: () => void;
}
/**
* Create a reconnection scheduler
*
* @param config - Scheduler configuration
* @returns Reconnection scheduler instance
*
* @example
* ```typescript
* const scheduler = createReconnectionScheduler({
* maxAttempts: 10,
* backoffBase: 1000,
* backoffMax: 30000,
* jitter: 0.2,
* onReconnect: async () => {
* await connect();
* },
* onMaxAttemptsExceeded: (error) => {
* console.error('Failed to reconnect:', error);
* },
* onBeforeAttempt: (attempt, max, delay) => {
* console.log(`Reconnecting in ${delay}ms (${attempt}/${max})...`);
* }
* });
*
* // Start reconnection
* scheduler.schedule();
*
* // Cancel reconnection
* scheduler.cancel();
* ```
*/
export function createReconnectionScheduler(
config: ReconnectionSchedulerConfig
): ReconnectionScheduler {
let attempt = 0;
let active = false;
let timer: ReturnType<typeof setTimeout> | undefined;
const schedule = () => {
// Cancel any existing timer
if (timer) {
clearTimeout(timer);
timer = undefined;
}
// Check if max attempts exceeded
if (attempt >= config.maxAttempts) {
active = false;
config.onMaxAttemptsExceeded(
new Error(`Max reconnection attempts exceeded (${config.maxAttempts})`)
);
return;
}
// Calculate delay
const delay = calculateBackoff(
attempt,
config.backoffBase,
config.backoffMax,
config.jitter
);
// Notify before attempt
if (config.onBeforeAttempt) {
config.onBeforeAttempt(attempt + 1, config.maxAttempts, delay);
}
// Mark as active
active = true;
// Schedule reconnection
timer = setTimeout(async () => {
attempt++;
try {
await config.onReconnect();
// Success - reset scheduler
attempt = 0;
active = false;
} catch (error) {
// Failure - schedule next attempt
schedule();
}
}, delay);
};
const cancel = () => {
if (timer) {
clearTimeout(timer);
timer = undefined;
}
active = false;
};
const reset = () => {
cancel();
attempt = 0;
};
return {
get attempt() {
return attempt;
},
get active() {
return active;
},
schedule,
cancel,
reset
};
}

View File

@@ -1,329 +0,0 @@
/**
* DurableService - Service with automatic TTL refresh
*
* Manages service publishing with automatic reconnection for incoming
* connections and TTL auto-refresh to prevent expiration.
*/
import { EventEmitter } from '../event-emitter.js';
import { ServicePool, type PoolStatus } from '../service-pool.js';
import type { RondevuOffers } from '../offers.js';
import { DurableChannel } from './channel.js';
import type {
DurableServiceConfig,
DurableServiceEvents,
ServiceInfo
} from './types.js';
/**
* Connection handler callback
*/
export type ConnectionHandler = (
channel: DurableChannel,
connectionId: string
) => void | Promise<void>;
/**
* Default configuration for durable services
*/
const DEFAULT_CONFIG = {
isPublic: false,
ttlRefreshMargin: 0.2,
poolSize: 1,
pollingInterval: 2000,
maxReconnectAttempts: 10,
reconnectBackoffBase: 1000,
reconnectBackoffMax: 30000,
reconnectJitter: 0.2,
connectionTimeout: 30000,
maxQueueSize: 1000,
maxMessageAge: 60000,
rtcConfig: {
iceServers: [
{ urls: 'stun:stun.l.google.com:19302' },
{ urls: 'stun:stun1.l.google.com:19302' }
]
}
};
/**
* Durable service that automatically refreshes TTL and handles reconnections
*
* The DurableService manages service publishing and provides:
* - Automatic TTL refresh before expiration
* - Durable connections for incoming peers
* - Connection pooling for multiple simultaneous connections
* - High-level connection lifecycle events
*
* @example
* ```typescript
* const service = new DurableService(
* offersApi,
* (channel, connectionId) => {
* channel.on('message', (data) => {
* console.log(`Message from ${connectionId}:`, data);
* channel.send(`Echo: ${data}`);
* });
* },
* {
* username: 'alice',
* privateKey: keypair.privateKey,
* serviceFqn: 'chat@1.0.0',
* poolSize: 10
* }
* );
*
* service.on('published', (serviceId, uuid) => {
* console.log(`Service published: ${uuid}`);
* });
*
* service.on('connection', (connectionId) => {
* console.log(`New connection: ${connectionId}`);
* });
*
* await service.start();
* ```
*/
export class DurableService extends EventEmitter<DurableServiceEvents> {
readonly config: Required<DurableServiceConfig>;
private serviceId?: string;
private uuid?: string;
private expiresAt?: number;
private ttlRefreshTimer?: ReturnType<typeof setTimeout>;
private servicePool?: ServicePool;
private activeChannels: Map<string, DurableChannel> = new Map();
constructor(
private offersApi: RondevuOffers,
private baseUrl: string,
private credentials: { peerId: string; secret: string },
private handler: ConnectionHandler,
config: DurableServiceConfig
) {
super();
this.config = { ...DEFAULT_CONFIG, ...config } as Required<DurableServiceConfig>;
}
/**
* Start the service
*
* Publishes the service and begins accepting connections.
*
* @returns Service information
*/
async start(): Promise<ServiceInfo> {
if (this.servicePool) {
throw new Error('Service already started');
}
// Create and start service pool
this.servicePool = new ServicePool(
this.baseUrl,
this.credentials,
{
username: this.config.username,
privateKey: this.config.privateKey,
serviceFqn: this.config.serviceFqn,
rtcConfig: this.config.rtcConfig,
isPublic: this.config.isPublic,
metadata: this.config.metadata,
ttl: this.config.ttl,
poolSize: this.config.poolSize,
pollingInterval: this.config.pollingInterval,
handler: (channel, peer, connectionId) => {
this.handleNewConnection(channel, connectionId);
},
onPoolStatus: (status) => {
// Could emit pool status event if needed
},
onError: (error, context) => {
this.emit('error', error, context);
}
}
);
const handle = await this.servicePool.start();
// Store service info
this.serviceId = handle.serviceId;
this.uuid = handle.uuid;
this.expiresAt = Date.now() + (this.config.ttl || 300000); // Default 5 minutes
this.emit('published', this.serviceId, this.uuid);
// Schedule TTL refresh
this.scheduleRefresh();
return {
serviceId: this.serviceId,
uuid: this.uuid,
expiresAt: this.expiresAt
};
}
/**
* Stop the service
*
* Unpublishes the service and closes all active connections.
*/
async stop(): Promise<void> {
// Cancel TTL refresh
if (this.ttlRefreshTimer) {
clearTimeout(this.ttlRefreshTimer);
this.ttlRefreshTimer = undefined;
}
// Close all active channels
for (const channel of this.activeChannels.values()) {
channel.close();
}
this.activeChannels.clear();
// Stop service pool
if (this.servicePool) {
await this.servicePool.stop();
this.servicePool = undefined;
}
this.emit('closed');
}
/**
* Get list of active connection IDs
*/
getActiveConnections(): string[] {
return Array.from(this.activeChannels.keys());
}
/**
* Get service information
*/
getServiceInfo(): ServiceInfo | null {
if (!this.serviceId || !this.uuid || !this.expiresAt) {
return null;
}
return {
serviceId: this.serviceId,
uuid: this.uuid,
expiresAt: this.expiresAt
};
}
/**
* Schedule TTL refresh
*/
private scheduleRefresh(): void {
if (!this.expiresAt || !this.config.ttl) {
return;
}
// Cancel existing timer
if (this.ttlRefreshTimer) {
clearTimeout(this.ttlRefreshTimer);
}
// Calculate refresh time (default: refresh at 80% of TTL)
const timeUntilExpiry = this.expiresAt - Date.now();
const refreshMargin = timeUntilExpiry * this.config.ttlRefreshMargin;
const refreshTime = Math.max(0, timeUntilExpiry - refreshMargin);
// Schedule refresh
this.ttlRefreshTimer = setTimeout(() => {
this.refreshServiceTTL().catch(error => {
this.emit('error', error, 'ttl-refresh');
// Retry after short delay
setTimeout(() => this.scheduleRefresh(), 5000);
});
}, refreshTime);
}
/**
* Refresh service TTL
*/
private async refreshServiceTTL(): Promise<void> {
if (!this.serviceId || !this.uuid) {
return;
}
// Delete old service
await this.servicePool?.stop();
// Recreate service pool (this republishes the service)
this.servicePool = new ServicePool(
this.baseUrl,
this.credentials,
{
username: this.config.username,
privateKey: this.config.privateKey,
serviceFqn: this.config.serviceFqn,
rtcConfig: this.config.rtcConfig,
isPublic: this.config.isPublic,
metadata: this.config.metadata,
ttl: this.config.ttl,
poolSize: this.config.poolSize,
pollingInterval: this.config.pollingInterval,
handler: (channel, peer, connectionId) => {
this.handleNewConnection(channel, connectionId);
},
onPoolStatus: (status) => {
// Could emit pool status event if needed
},
onError: (error, context) => {
this.emit('error', error, context);
}
}
);
const handle = await this.servicePool.start();
// Update service info
this.serviceId = handle.serviceId;
this.uuid = handle.uuid;
this.expiresAt = Date.now() + (this.config.ttl || 300000);
this.emit('ttl-refreshed', this.expiresAt);
// Schedule next refresh
this.scheduleRefresh();
}
/**
* Handle new incoming connection
*/
private handleNewConnection(channel: RTCDataChannel, connectionId: string): void {
// Create durable channel
const durableChannel = new DurableChannel(channel.label, {
maxQueueSize: this.config.maxQueueSize,
maxMessageAge: this.config.maxMessageAge
});
// Attach to underlying channel
durableChannel.attachToChannel(channel);
// Track channel
this.activeChannels.set(connectionId, durableChannel);
// Setup cleanup on close
durableChannel.on('close', () => {
this.activeChannels.delete(connectionId);
this.emit('disconnection', connectionId);
});
// Emit connection event
this.emit('connection', connectionId);
// Invoke user handler
try {
const result = this.handler(durableChannel, connectionId);
if (result && typeof result.then === 'function') {
result.catch(error => {
this.emit('error', error, 'handler');
});
}
} catch (error) {
this.emit('error', error as Error, 'handler');
}
}
}

View File

@@ -1,184 +0,0 @@
/**
* Type definitions for durable WebRTC connections
*
* This module defines all interfaces, enums, and types used by the durable
* connection system for automatic reconnection and message queuing.
*/
/**
* Connection state enum
*/
export enum DurableConnectionState {
CONNECTING = 'connecting',
CONNECTED = 'connected',
RECONNECTING = 'reconnecting',
DISCONNECTED = 'disconnected',
FAILED = 'failed',
CLOSED = 'closed'
}
/**
* Channel state enum
*/
export enum DurableChannelState {
CONNECTING = 'connecting',
OPEN = 'open',
CLOSING = 'closing',
CLOSED = 'closed'
}
/**
* Configuration for durable connections
*/
export interface DurableConnectionConfig {
/** Maximum number of reconnection attempts (default: 10) */
maxReconnectAttempts?: number;
/** Base delay for exponential backoff in milliseconds (default: 1000) */
reconnectBackoffBase?: number;
/** Maximum delay between reconnection attempts in milliseconds (default: 30000) */
reconnectBackoffMax?: number;
/** Jitter factor for randomizing reconnection delays (default: 0.2 = ±20%) */
reconnectJitter?: number;
/** Timeout for initial connection attempt in milliseconds (default: 30000) */
connectionTimeout?: number;
/** Maximum number of messages to queue during disconnection (default: 1000) */
maxQueueSize?: number;
/** Maximum age of queued messages in milliseconds (default: 60000) */
maxMessageAge?: number;
/** WebRTC configuration */
rtcConfig?: RTCConfiguration;
}
/**
* Configuration for durable channels
*/
export interface DurableChannelConfig {
/** Maximum number of messages to queue (default: 1000) */
maxQueueSize?: number;
/** Maximum age of queued messages in milliseconds (default: 60000) */
maxMessageAge?: number;
/** Whether messages should be delivered in order (default: true) */
ordered?: boolean;
/** Maximum retransmits for unordered channels (default: undefined) */
maxRetransmits?: number;
}
/**
* Configuration for durable services
*/
export interface DurableServiceConfig extends DurableConnectionConfig {
/** Username that owns the service */
username: string;
/** Private key for signing service operations */
privateKey: string;
/** Fully qualified service name (e.g., com.example.chat@1.0.0) */
serviceFqn: string;
/** Whether the service is publicly discoverable (default: false) */
isPublic?: boolean;
/** Optional metadata for the service */
metadata?: Record<string, any>;
/** Time-to-live for service in milliseconds (default: server default) */
ttl?: number;
/** Margin before TTL expiry to trigger refresh (default: 0.2 = refresh at 80%) */
ttlRefreshMargin?: number;
/** Number of simultaneous open offers to maintain (default: 1) */
poolSize?: number;
/** Polling interval for checking answers in milliseconds (default: 2000) */
pollingInterval?: number;
}
/**
* Queued message structure
*/
export interface QueuedMessage {
/** Message data */
data: string | Blob | ArrayBuffer | ArrayBufferView;
/** Timestamp when message was enqueued */
enqueuedAt: number;
/** Unique message ID */
id: string;
}
/**
* Event type map for DurableConnection
*/
export interface DurableConnectionEvents extends Record<string, (...args: any[]) => void> {
'state': (state: DurableConnectionState, previousState: DurableConnectionState) => void;
'connected': () => void;
'reconnecting': (attempt: number, maxAttempts: number, nextRetryIn: number) => void;
'disconnected': () => void;
'failed': (error: Error, permanent: boolean) => void;
'closed': () => void;
}
/**
* Event type map for DurableChannel
*/
export interface DurableChannelEvents extends Record<string, (...args: any[]) => void> {
'open': () => void;
'message': (data: any) => void;
'error': (error: Error) => void;
'close': () => void;
'bufferedAmountLow': () => void;
'queueOverflow': (droppedCount: number) => void;
}
/**
* Event type map for DurableService
*/
export interface DurableServiceEvents extends Record<string, (...args: any[]) => void> {
'published': (serviceId: string, uuid: string) => void;
'connection': (connectionId: string) => void;
'disconnection': (connectionId: string) => void;
'ttl-refreshed': (expiresAt: number) => void;
'error': (error: Error, context: string) => void;
'closed': () => void;
}
/**
* Information about a durable connection
*/
export interface ConnectionInfo {
/** Username (for username-based connections) */
username?: string;
/** Service FQN (for service-based connections) */
serviceFqn?: string;
/** UUID (for UUID-based connections) */
uuid?: string;
}
/**
* Service information returned when service is published
*/
export interface ServiceInfo {
/** Service ID */
serviceId: string;
/** Service UUID for discovery */
uuid: string;
/** Expiration timestamp */
expiresAt: number;
}

104
src/event-bus.ts Normal file
View File

@@ -0,0 +1,104 @@
/**
* Type-safe EventBus with event name to payload type mapping
*/
type EventHandler<T = any> = (data: T) => void;
/**
* EventBus - Type-safe event emitter with inferred event data types
*
* @example
* interface MyEvents {
* 'user:connected': { userId: string; timestamp: number };
* 'user:disconnected': { userId: string };
* 'message:received': string;
* }
*
* const bus = new EventBus<MyEvents>();
*
* // TypeScript knows data is { userId: string; timestamp: number }
* bus.on('user:connected', (data) => {
* console.log(data.userId, data.timestamp);
* });
*
* // TypeScript knows data is string
* bus.on('message:received', (data) => {
* console.log(data.toUpperCase());
* });
*/
export class EventBus<TEvents extends Record<string, any>> {
private handlers: Map<keyof TEvents, Set<EventHandler>>;
constructor() {
this.handlers = new Map();
}
/**
* Subscribe to an event
*/
on<K extends keyof TEvents>(event: K, handler: EventHandler<TEvents[K]>): void {
if (!this.handlers.has(event)) {
this.handlers.set(event, new Set());
}
this.handlers.get(event)!.add(handler);
}
/**
* Subscribe to an event once (auto-unsubscribe after first call)
*/
once<K extends keyof TEvents>(event: K, handler: EventHandler<TEvents[K]>): void {
const wrappedHandler = (data: TEvents[K]) => {
handler(data);
this.off(event, wrappedHandler);
};
this.on(event, wrappedHandler);
}
/**
* Unsubscribe from an event
*/
off<K extends keyof TEvents>(event: K, handler: EventHandler<TEvents[K]>): void {
const eventHandlers = this.handlers.get(event);
if (eventHandlers) {
eventHandlers.delete(handler);
if (eventHandlers.size === 0) {
this.handlers.delete(event);
}
}
}
/**
* Emit an event with data
*/
emit<K extends keyof TEvents>(event: K, data: TEvents[K]): void {
const eventHandlers = this.handlers.get(event);
if (eventHandlers) {
eventHandlers.forEach(handler => handler(data));
}
}
/**
* Remove all handlers for a specific event, or all handlers if no event specified
*/
clear<K extends keyof TEvents>(event?: K): void {
if (event !== undefined) {
this.handlers.delete(event);
} else {
this.handlers.clear();
}
}
/**
* Get count of handlers for an event
*/
listenerCount<K extends keyof TEvents>(event: K): number {
return this.handlers.get(event)?.size ?? 0;
}
/**
* Get all event names that have handlers
*/
eventNames(): Array<keyof TEvents> {
return Array.from(this.handlers.keys());
}
}

View File

@@ -1,109 +0,0 @@
/**
* Type-safe EventEmitter implementation for browser and Node.js compatibility
*
* @template EventMap - A type mapping event names to their handler signatures
*
* @example
* ```typescript
* interface MyEvents {
* 'data': (value: string) => void;
* 'error': (error: Error) => void;
* 'ready': () => void;
* }
*
* class MyClass extends EventEmitter<MyEvents> {
* doSomething() {
* this.emit('data', 'hello'); // Type-safe!
* this.emit('error', new Error('oops')); // Type-safe!
* this.emit('ready'); // Type-safe!
* }
* }
*
* const instance = new MyClass();
* instance.on('data', (value) => {
* console.log(value.toUpperCase()); // 'value' is typed as string
* });
* ```
*/
export class EventEmitter<EventMap extends Record<string, (...args: any[]) => void>> {
private events: Map<keyof EventMap, Set<Function>> = new Map();
/**
* Register an event listener
*/
on<K extends keyof EventMap>(event: K, listener: EventMap[K]): this {
if (!this.events.has(event)) {
this.events.set(event, new Set());
}
this.events.get(event)!.add(listener);
return this;
}
/**
* Register a one-time event listener
*/
once<K extends keyof EventMap>(event: K, listener: EventMap[K]): this {
const onceWrapper = (...args: Parameters<EventMap[K]>) => {
this.off(event, onceWrapper as EventMap[K]);
listener(...args);
};
return this.on(event, onceWrapper as EventMap[K]);
}
/**
* Remove an event listener
*/
off<K extends keyof EventMap>(event: K, listener: EventMap[K]): this {
const listeners = this.events.get(event);
if (listeners) {
listeners.delete(listener);
if (listeners.size === 0) {
this.events.delete(event);
}
}
return this;
}
/**
* Emit an event
*/
protected emit<K extends keyof EventMap>(
event: K,
...args: Parameters<EventMap[K]>
): boolean {
const listeners = this.events.get(event);
if (!listeners || listeners.size === 0) {
return false;
}
listeners.forEach(listener => {
try {
(listener as EventMap[K])(...args);
} catch (err) {
console.error(`Error in ${String(event)} event listener:`, err);
}
});
return true;
}
/**
* Remove all listeners for an event (or all events if not specified)
*/
removeAllListeners<K extends keyof EventMap>(event?: K): this {
if (event !== undefined) {
this.events.delete(event);
} else {
this.events.clear();
}
return this;
}
/**
* Get listener count for an event
*/
listenerCount<K extends keyof EventMap>(event: K): number {
const listeners = this.events.get(event);
return listeners ? listeners.size : 0;
}
}

View File

@@ -1,36 +1,16 @@
/**
* @xtr-dev/rondevu-client
* WebRTC peer signaling and discovery client with durable connections
* WebRTC peer signaling client
*/
// Export main client class
export { Rondevu } from './rondevu.js';
export type { RondevuOptions } from './rondevu.js';
export { ConnectionManager } from './connection-manager.js';
export { EventBus } from './event-bus.js';
// Export authentication
export { RondevuAuth } from './auth.js';
export type { Credentials, FetchFunction } from './auth.js';
// Export username API
export { RondevuUsername } from './usernames.js';
export type { UsernameClaimResult, UsernameCheckResult } from './usernames.js';
// Export durable connection APIs
export { DurableConnection } from './durable/connection.js';
export { DurableChannel } from './durable/channel.js';
export { DurableService } from './durable/service.js';
// Export durable connection types
// Export types
export type {
DurableConnectionState,
DurableChannelState,
DurableConnectionConfig,
DurableChannelConfig,
DurableServiceConfig,
QueuedMessage,
DurableConnectionEvents,
DurableChannelEvents,
DurableServiceEvents,
ConnectionInfo,
ServiceInfo
} from './durable/types.js';
ConnectionIdentity,
ConnectionState,
ConnectionInterface,
Connection,
QueueMessageOptions
} from './types.js';

View File

@@ -1,205 +0,0 @@
import { RondevuOffers, Offer } from './offers.js';
/**
* Represents an offer that has been answered
*/
export interface AnsweredOffer {
offerId: string;
answererId: string;
sdp: string; // Answer SDP
peerConnection: RTCPeerConnection; // Original peer connection
dataChannel?: RTCDataChannel; // Data channel created with offer
answeredAt: number;
}
/**
* Configuration options for the offer pool
*/
export interface OfferPoolOptions {
/** Number of simultaneous open offers to maintain */
poolSize: number;
/** Polling interval in milliseconds (default: 2000ms) */
pollingInterval?: number;
/** Callback invoked when an offer is answered */
onAnswered: (answer: AnsweredOffer) => Promise<void>;
/** Callback to create new offers when refilling the pool */
onRefill: (count: number) => Promise<{ offers: Offer[], peerConnections: RTCPeerConnection[], dataChannels: RTCDataChannel[] }>;
/** Error handler for pool operations */
onError: (error: Error, context: string) => void;
}
/**
* Manages a pool of offers with automatic polling and refill
*
* The OfferPool maintains a configurable number of simultaneous offers,
* polls for answers periodically, and automatically refills the pool
* when offers are consumed.
*/
export class OfferPool {
private offers: Map<string, Offer> = new Map();
private peerConnections: Map<string, RTCPeerConnection> = new Map();
private dataChannels: Map<string, RTCDataChannel> = new Map();
private polling: boolean = false;
private pollingTimer?: ReturnType<typeof setInterval>;
private lastPollTime: number = 0;
private readonly pollingInterval: number;
constructor(
private offersApi: RondevuOffers,
private options: OfferPoolOptions
) {
this.pollingInterval = options.pollingInterval || 2000;
}
/**
* Add offers to the pool with their peer connections and data channels
*/
async addOffers(offers: Offer[], peerConnections?: RTCPeerConnection[], dataChannels?: RTCDataChannel[]): Promise<void> {
for (let i = 0; i < offers.length; i++) {
const offer = offers[i];
this.offers.set(offer.id, offer);
if (peerConnections && peerConnections[i]) {
this.peerConnections.set(offer.id, peerConnections[i]);
}
if (dataChannels && dataChannels[i]) {
this.dataChannels.set(offer.id, dataChannels[i]);
}
}
}
/**
* Start polling for answers
*/
async start(): Promise<void> {
if (this.polling) {
return;
}
this.polling = true;
// Do an immediate poll
await this.poll().catch((error) => {
this.options.onError(error, 'initial-poll');
});
// Start polling interval
this.pollingTimer = setInterval(async () => {
if (this.polling) {
await this.poll().catch((error) => {
this.options.onError(error, 'poll');
});
}
}, this.pollingInterval);
}
/**
* Stop polling for answers
*/
async stop(): Promise<void> {
this.polling = false;
if (this.pollingTimer) {
clearInterval(this.pollingTimer);
this.pollingTimer = undefined;
}
}
/**
* Poll for answers and refill the pool if needed
*/
private async poll(): Promise<void> {
try {
// Get all answers from server
const answers = await this.offersApi.getAnswers();
// Filter for our pool's offers
const myAnswers = answers.filter(a => this.offers.has(a.offerId));
// Process each answer
for (const answer of myAnswers) {
// Get the original offer, peer connection, and data channel
const offer = this.offers.get(answer.offerId);
const pc = this.peerConnections.get(answer.offerId);
const channel = this.dataChannels.get(answer.offerId);
if (!offer || !pc) {
continue; // Offer or peer connection already consumed, skip
}
// Remove from pool BEFORE processing to prevent duplicate processing
this.offers.delete(answer.offerId);
this.peerConnections.delete(answer.offerId);
this.dataChannels.delete(answer.offerId);
// Notify ServicePool with answer, original peer connection, and data channel
await this.options.onAnswered({
offerId: answer.offerId,
answererId: answer.answererId,
sdp: answer.sdp,
peerConnection: pc,
dataChannel: channel,
answeredAt: answer.answeredAt
});
}
// Immediate refill if below pool size
if (this.offers.size < this.options.poolSize) {
const needed = this.options.poolSize - this.offers.size;
try {
const result = await this.options.onRefill(needed);
await this.addOffers(result.offers, result.peerConnections, result.dataChannels);
} catch (refillError) {
this.options.onError(
refillError as Error,
'refill'
);
}
}
this.lastPollTime = Date.now();
} catch (error) {
// Don't crash the pool on errors - let error handler deal with it
this.options.onError(error as Error, 'poll');
}
}
/**
* Get the current number of active offers in the pool
*/
getActiveOfferCount(): number {
return this.offers.size;
}
/**
* Get all active offer IDs
*/
getActiveOfferIds(): string[] {
return Array.from(this.offers.keys());
}
/**
* Get all active peer connections
*/
getActivePeerConnections(): RTCPeerConnection[] {
return Array.from(this.peerConnections.values());
}
/**
* Get the last poll timestamp
*/
getLastPollTime(): number {
return this.lastPollTime;
}
/**
* Check if the pool is currently polling
*/
isPolling(): boolean {
return this.polling;
}
}

View File

@@ -1,321 +0,0 @@
import { Credentials, FetchFunction } from './auth.js';
import { RondevuAuth } from './auth.js';
// Declare Buffer for Node.js compatibility
declare const Buffer: any;
export interface CreateOfferRequest {
sdp: string;
topics: string[];
ttl?: number;
secret?: string;
info?: string;
}
export interface Offer {
id: string;
peerId: string;
sdp: string;
topics: string[];
createdAt?: number;
expiresAt: number;
lastSeen: number;
secret?: string;
hasSecret?: boolean;
info?: string;
answererPeerId?: string;
answerSdp?: string;
answeredAt?: number;
}
export interface IceCandidate {
candidate: any; // Full candidate object as plain JSON - don't enforce structure
peerId: string;
role: 'offerer' | 'answerer';
createdAt: number;
}
export interface TopicInfo {
topic: string;
activePeers: number;
}
export class RondevuOffers {
private fetchFn: FetchFunction;
constructor(
private baseUrl: string,
private credentials: Credentials,
fetchFn?: FetchFunction
) {
// Use provided fetch or fall back to global fetch
this.fetchFn = fetchFn || ((...args) => {
if (typeof globalThis.fetch === 'function') {
return globalThis.fetch(...args);
}
throw new Error(
'fetch is not available. Please provide a fetch implementation in the constructor options.'
);
});
}
/**
* Create one or more offers
*/
async create(offers: CreateOfferRequest[]): Promise<Offer[]> {
const response = await this.fetchFn(`${this.baseUrl}/offers`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: RondevuAuth.createAuthHeader(this.credentials),
},
body: JSON.stringify({ offers }),
});
if (!response.ok) {
const error = await response.json().catch(() => ({ error: 'Unknown error' }));
throw new Error(`Failed to create offers: ${error.error || response.statusText}`);
}
const data = await response.json();
return data.offers;
}
/**
* Find offers by topic with optional bloom filter
*/
async findByTopic(
topic: string,
options?: {
bloomFilter?: Uint8Array;
limit?: number;
}
): Promise<Offer[]> {
const params = new URLSearchParams();
if (options?.bloomFilter) {
// Convert to base64
const binaryString = String.fromCharCode(...Array.from(options.bloomFilter));
const base64 = typeof btoa !== 'undefined'
? btoa(binaryString)
: (typeof Buffer !== 'undefined' ? Buffer.from(options.bloomFilter).toString('base64') : '');
params.set('bloom', base64);
}
if (options?.limit) {
params.set('limit', options.limit.toString());
}
const url = `${this.baseUrl}/offers/by-topic/${encodeURIComponent(topic)}${
params.toString() ? '?' + params.toString() : ''
}`;
const response = await this.fetchFn(url, {
method: 'GET',
});
if (!response.ok) {
const error = await response.json().catch(() => ({ error: 'Unknown error' }));
throw new Error(`Failed to find offers: ${error.error || response.statusText}`);
}
const data = await response.json();
return data.offers;
}
/**
* Get all offers from a specific peer
*/
async getByPeerId(peerId: string): Promise<{
offers: Offer[];
topics: string[];
}> {
const response = await this.fetchFn(`${this.baseUrl}/peers/${encodeURIComponent(peerId)}/offers`, {
method: 'GET',
});
if (!response.ok) {
const error = await response.json().catch(() => ({ error: 'Unknown error' }));
throw new Error(`Failed to get peer offers: ${error.error || response.statusText}`);
}
return await response.json();
}
/**
* Get topics with active peer counts (paginated)
*/
async getTopics(options?: {
limit?: number;
offset?: number;
startsWith?: string;
}): Promise<{
topics: TopicInfo[];
total: number;
limit: number;
offset: number;
startsWith?: string;
}> {
const params = new URLSearchParams();
if (options?.limit) {
params.set('limit', options.limit.toString());
}
if (options?.offset) {
params.set('offset', options.offset.toString());
}
if (options?.startsWith) {
params.set('startsWith', options.startsWith);
}
const url = `${this.baseUrl}/topics${
params.toString() ? '?' + params.toString() : ''
}`;
const response = await this.fetchFn(url, {
method: 'GET',
});
if (!response.ok) {
const error = await response.json().catch(() => ({ error: 'Unknown error' }));
throw new Error(`Failed to get topics: ${error.error || response.statusText}`);
}
return await response.json();
}
/**
* Get own offers
*/
async getMine(): Promise<Offer[]> {
const response = await this.fetchFn(`${this.baseUrl}/offers/mine`, {
method: 'GET',
headers: {
Authorization: RondevuAuth.createAuthHeader(this.credentials),
},
});
if (!response.ok) {
const error = await response.json().catch(() => ({ error: 'Unknown error' }));
throw new Error(`Failed to get own offers: ${error.error || response.statusText}`);
}
const data = await response.json();
return data.offers;
}
/**
* Delete an offer
*/
async delete(offerId: string): Promise<void> {
const response = await this.fetchFn(`${this.baseUrl}/offers/${encodeURIComponent(offerId)}`, {
method: 'DELETE',
headers: {
Authorization: RondevuAuth.createAuthHeader(this.credentials),
},
});
if (!response.ok) {
const error = await response.json().catch(() => ({ error: 'Unknown error' }));
throw new Error(`Failed to delete offer: ${error.error || response.statusText}`);
}
}
/**
* Answer an offer
*/
async answer(offerId: string, sdp: string, secret?: string): Promise<void> {
const response = await this.fetchFn(`${this.baseUrl}/offers/${encodeURIComponent(offerId)}/answer`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: RondevuAuth.createAuthHeader(this.credentials),
},
body: JSON.stringify({ sdp, secret }),
});
if (!response.ok) {
const error = await response.json().catch(() => ({ error: 'Unknown error' }));
throw new Error(`Failed to answer offer: ${error.error || response.statusText}`);
}
}
/**
* Get answers to your offers
*/
async getAnswers(): Promise<Array<{
offerId: string;
answererId: string;
sdp: string;
answeredAt: number;
topics: string[];
}>> {
const response = await this.fetchFn(`${this.baseUrl}/offers/answers`, {
method: 'GET',
headers: {
Authorization: RondevuAuth.createAuthHeader(this.credentials),
},
});
if (!response.ok) {
const error = await response.json().catch(() => ({ error: 'Unknown error' }));
throw new Error(`Failed to get answers: ${error.error || response.statusText}`);
}
const data = await response.json();
return data.answers;
}
/**
* Post ICE candidates for an offer
*/
async addIceCandidates(
offerId: string,
candidates: any[]
): Promise<void> {
const response = await this.fetchFn(`${this.baseUrl}/offers/${encodeURIComponent(offerId)}/ice-candidates`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: RondevuAuth.createAuthHeader(this.credentials),
},
body: JSON.stringify({ candidates }),
});
if (!response.ok) {
const error = await response.json().catch(() => ({ error: 'Unknown error' }));
throw new Error(`Failed to add ICE candidates: ${error.error || response.statusText}`);
}
}
/**
* Get ICE candidates for an offer
*/
async getIceCandidates(offerId: string, since?: number): Promise<IceCandidate[]> {
const params = new URLSearchParams();
if (since !== undefined) {
params.set('since', since.toString());
}
const url = `${this.baseUrl}/offers/${encodeURIComponent(offerId)}/ice-candidates${
params.toString() ? '?' + params.toString() : ''
}`;
const response = await this.fetchFn(url, {
method: 'GET',
headers: {
Authorization: RondevuAuth.createAuthHeader(this.credentials),
},
});
if (!response.ok) {
const error = await response.json().catch(() => ({ error: 'Unknown error' }));
throw new Error(`Failed to get ICE candidates: ${error.error || response.statusText}`);
}
const data = await response.json();
return data.candidates;
}
}

View File

@@ -1,49 +0,0 @@
import { PeerState } from './state.js';
import type { PeerOptions } from './types.js';
import type RondevuPeer from './index.js';
/**
* Answering an offer and sending to server
*/
export class AnsweringState extends PeerState {
constructor(peer: RondevuPeer) {
super(peer);
}
get name() { return 'answering'; }
async answer(offerId: string, offerSdp: string, options: PeerOptions): Promise<void> {
try {
this.peer.role = 'answerer';
this.peer.offerId = offerId;
// Set remote description
await this.peer.pc.setRemoteDescription({
type: 'offer',
sdp: offerSdp
});
// Create answer
const answer = await this.peer.pc.createAnswer();
// Send answer to server BEFORE setLocalDescription
// This registers us as the answerer so ICE candidates will be accepted
await this.peer.offersApi.answer(offerId, answer.sdp!, options.secret);
// Enable trickle ICE - set up handler before ICE gathering starts
this.setupIceCandidateHandler();
// Set local description - ICE gathering starts here
// Server already knows we're the answerer, so candidates will be accepted
await this.peer.pc.setLocalDescription(answer);
// Transition to exchanging ICE
const { ExchangingIceState } = await import('./exchanging-ice-state.js');
this.peer.setState(new ExchangingIceState(this.peer, offerId, options));
} catch (error) {
const { FailedState } = await import('./failed-state.js');
this.peer.setState(new FailedState(this.peer, error as Error));
throw error;
}
}
}

View File

@@ -1,12 +0,0 @@
import { PeerState } from './state.js';
/**
* Closed state - connection has been terminated
*/
export class ClosedState extends PeerState {
get name() { return 'closed'; }
cleanup(): void {
this.peer.pc.close();
}
}

View File

@@ -1,13 +0,0 @@
import { PeerState } from './state.js';
/**
* Connected state - peer connection is established
*/
export class ConnectedState extends PeerState {
get name() { return 'connected'; }
cleanup(): void {
// Keep connection alive, but stop any polling
// The peer connection will handle disconnects via onconnectionstatechange
}
}

View File

@@ -1,57 +0,0 @@
import { PeerState } from './state.js';
import type { PeerOptions } from './types.js';
import type RondevuPeer from './index.js';
/**
* Creating offer and sending to server
*/
export class CreatingOfferState extends PeerState {
constructor(peer: RondevuPeer, private options: PeerOptions) {
super(peer);
}
get name() { return 'creating-offer'; }
async createOffer(options: PeerOptions): Promise<string> {
try {
this.peer.role = 'offerer';
// Create data channel if requested
if (options.createDataChannel !== false) {
const channel = this.peer.pc.createDataChannel(
options.dataChannelLabel || 'data'
);
this.peer.emitEvent('datachannel', channel);
}
// Enable trickle ICE - set up handler before ICE gathering starts
// Handler will check this.peer.offerId before sending
this.setupIceCandidateHandler();
// Create WebRTC offer
const offer = await this.peer.pc.createOffer();
await this.peer.pc.setLocalDescription(offer); // ICE gathering starts here
// Send offer to server immediately (don't wait for ICE)
const offers = await this.peer.offersApi.create([{
sdp: offer.sdp!,
topics: options.topics,
ttl: options.ttl || 300000,
secret: options.secret
}]);
const offerId = offers[0].id;
this.peer.offerId = offerId; // Now handler can send candidates
// Transition to waiting for answer
const { WaitingForAnswerState } = await import('./waiting-for-answer-state.js');
this.peer.setState(new WaitingForAnswerState(this.peer, offerId, options));
return offerId;
} catch (error) {
const { FailedState } = await import('./failed-state.js');
this.peer.setState(new FailedState(this.peer, error as Error));
throw error;
}
}
}

View File

@@ -1,74 +0,0 @@
import { PeerState } from './state.js';
import type { PeerOptions } from './types.js';
import type RondevuPeer from './index.js';
/**
* Exchanging ICE candidates and waiting for connection
*/
export class ExchangingIceState extends PeerState {
private pollingInterval?: ReturnType<typeof setInterval>;
private timeout?: ReturnType<typeof setTimeout>;
private lastIceTimestamp = 0;
constructor(
peer: RondevuPeer,
private offerId: string,
private options: PeerOptions
) {
super(peer);
this.startPolling();
}
get name() { return 'exchanging-ice'; }
private startPolling(): void {
const connectionTimeout = this.options.timeouts?.iceConnection || 30000;
this.timeout = setTimeout(async () => {
this.cleanup();
const { FailedState } = await import('./failed-state.js');
this.peer.setState(new FailedState(
this.peer,
new Error('ICE connection timeout')
));
}, connectionTimeout);
this.pollingInterval = setInterval(async () => {
try {
const candidates = await this.peer.offersApi.getIceCandidates(
this.offerId,
this.lastIceTimestamp
);
for (const cand of candidates) {
if (cand.candidate && cand.candidate.candidate && cand.candidate.candidate !== '') {
try {
await this.peer.pc.addIceCandidate(new this.peer.RTCIceCandidate(cand.candidate));
this.lastIceTimestamp = cand.createdAt;
} catch (err) {
console.warn('Failed to add ICE candidate:', err);
this.lastIceTimestamp = cand.createdAt;
}
} else {
this.lastIceTimestamp = cand.createdAt;
}
}
} catch (err) {
console.error('Error polling for ICE candidates:', err);
if (err instanceof Error && err.message.includes('not found')) {
this.cleanup();
const { FailedState } = await import('./failed-state.js');
this.peer.setState(new FailedState(
this.peer,
new Error('Offer expired or not found')
));
}
}
}, 1000);
}
cleanup(): void {
if (this.pollingInterval) clearInterval(this.pollingInterval);
if (this.timeout) clearTimeout(this.timeout);
}
}

View File

@@ -1,18 +0,0 @@
import { PeerState } from './state.js';
/**
* Failed state - connection attempt failed
*/
export class FailedState extends PeerState {
constructor(peer: any, private error: Error) {
super(peer);
peer.emitEvent('failed', error);
}
get name() { return 'failed'; }
cleanup(): void {
// Connection is failed, clean up resources
this.peer.pc.close();
}
}

View File

@@ -1,18 +0,0 @@
import { PeerState } from './state.js';
import type { PeerOptions } from './types.js';
export class IdleState extends PeerState {
get name() { return 'idle'; }
async createOffer(options: PeerOptions): Promise<string> {
const { CreatingOfferState } = await import('./creating-offer-state.js');
this.peer.setState(new CreatingOfferState(this.peer, options));
return this.peer.state.createOffer(options);
}
async answer(offerId: string, offerSdp: string, options: PeerOptions): Promise<void> {
const { AnsweringState } = await import('./answering-state.js');
this.peer.setState(new AnsweringState(this.peer));
return this.peer.state.answer(offerId, offerSdp, options);
}
}

View File

@@ -1,214 +0,0 @@
import { RondevuOffers } from '../offers.js';
import { EventEmitter } from '../event-emitter.js';
import type { PeerOptions, PeerEvents } from './types.js';
import { PeerState } from './state.js';
import { IdleState } from './idle-state.js';
import { CreatingOfferState } from './creating-offer-state.js';
import { WaitingForAnswerState } from './waiting-for-answer-state.js';
import { AnsweringState } from './answering-state.js';
import { ExchangingIceState } from './exchanging-ice-state.js';
import { ConnectedState } from './connected-state.js';
import { FailedState } from './failed-state.js';
import { ClosedState } from './closed-state.js';
// Re-export types for external consumers
export type { PeerTimeouts, PeerOptions, PeerEvents } from './types.js';
/**
* High-level WebRTC peer connection manager with state-based lifecycle
* Handles offer/answer exchange, ICE candidates, timeouts, and error recovery
*/
export default class RondevuPeer extends EventEmitter<PeerEvents> {
pc: RTCPeerConnection;
offersApi: RondevuOffers;
offerId?: string;
role?: 'offerer' | 'answerer';
// WebRTC polyfills for Node.js compatibility
RTCPeerConnection: typeof RTCPeerConnection;
RTCSessionDescription: typeof RTCSessionDescription;
RTCIceCandidate: typeof RTCIceCandidate;
private _state: PeerState;
// Event handler references for cleanup
private connectionStateChangeHandler?: () => void;
private dataChannelHandler?: (event: RTCDataChannelEvent) => void;
private trackHandler?: (event: RTCTrackEvent) => void;
private iceCandidateErrorHandler?: (event: Event) => void;
/**
* Current connection state name
*/
get stateName(): string {
return this._state.name;
}
/**
* Current state object (internal use)
*/
get state(): PeerState {
return this._state;
}
/**
* RTCPeerConnection state
*/
get connectionState(): RTCPeerConnectionState {
return this.pc.connectionState;
}
constructor(
offersApi: RondevuOffers,
rtcConfig: RTCConfiguration = {
iceServers: [
{ urls: 'stun:stun.l.google.com:19302' },
{ urls: 'stun:stun1.l.google.com:19302' }
]
},
existingPeerConnection?: RTCPeerConnection,
rtcPeerConnection?: typeof RTCPeerConnection,
rtcSessionDescription?: typeof RTCSessionDescription,
rtcIceCandidate?: typeof RTCIceCandidate
) {
super();
this.offersApi = offersApi;
// Use provided polyfills or fall back to globals
this.RTCPeerConnection = rtcPeerConnection || (typeof globalThis.RTCPeerConnection !== 'undefined'
? globalThis.RTCPeerConnection
: (() => {
throw new Error('RTCPeerConnection is not available. Please provide it in the Rondevu constructor options for Node.js environments.');
}) as any);
this.RTCSessionDescription = rtcSessionDescription || (typeof globalThis.RTCSessionDescription !== 'undefined'
? globalThis.RTCSessionDescription
: (() => {
throw new Error('RTCSessionDescription is not available. Please provide it in the Rondevu constructor options for Node.js environments.');
}) as any);
this.RTCIceCandidate = rtcIceCandidate || (typeof globalThis.RTCIceCandidate !== 'undefined'
? globalThis.RTCIceCandidate
: (() => {
throw new Error('RTCIceCandidate is not available. Please provide it in the Rondevu constructor options for Node.js environments.');
}) as any);
// Use existing peer connection if provided, otherwise create new one
this.pc = existingPeerConnection || new this.RTCPeerConnection(rtcConfig);
this._state = new IdleState(this);
this.setupPeerConnection();
}
/**
* Set up peer connection event handlers
*/
private setupPeerConnection(): void {
this.connectionStateChangeHandler = () => {
switch (this.pc.connectionState) {
case 'connected':
this.setState(new ConnectedState(this));
this.emitEvent('connected');
break;
case 'disconnected':
this.emitEvent('disconnected');
break;
case 'failed':
this.setState(new FailedState(this, new Error('Connection failed')));
break;
case 'closed':
this.setState(new ClosedState(this));
this.emitEvent('disconnected');
break;
}
};
this.pc.addEventListener('connectionstatechange', this.connectionStateChangeHandler);
this.dataChannelHandler = (event: RTCDataChannelEvent) => {
this.emitEvent('datachannel', event.channel);
};
this.pc.addEventListener('datachannel', this.dataChannelHandler);
this.trackHandler = (event: RTCTrackEvent) => {
this.emitEvent('track', event);
};
this.pc.addEventListener('track', this.trackHandler);
this.iceCandidateErrorHandler = (event: Event) => {
console.error('ICE candidate error:', event);
};
this.pc.addEventListener('icecandidateerror', this.iceCandidateErrorHandler);
}
/**
* Set new state and emit state change event
*/
setState(newState: PeerState): void {
this._state.cleanup();
this._state = newState;
this.emitEvent('state', newState.name);
}
/**
* Emit event (exposed for PeerState classes)
* @internal
*/
emitEvent<K extends keyof PeerEvents>(
event: K,
...args: Parameters<PeerEvents[K]>
): void {
this.emit(event, ...args);
}
/**
* Create an offer and advertise on topics
*/
async createOffer(options: PeerOptions): Promise<string> {
return this._state.createOffer(options);
}
/**
* Answer an existing offer
*/
async answer(offerId: string, offerSdp: string, options: PeerOptions): Promise<void> {
return this._state.answer(offerId, offerSdp, options);
}
/**
* Add a media track to the connection
*/
addTrack(track: MediaStreamTrack, ...streams: MediaStream[]): RTCRtpSender {
return this.pc.addTrack(track, ...streams);
}
/**
* Create a data channel for sending and receiving arbitrary data
* This should typically be called by the offerer before creating the offer
* The answerer will receive the channel via the 'datachannel' event
*/
createDataChannel(label: string, options?: RTCDataChannelInit): RTCDataChannel {
return this.pc.createDataChannel(label, options);
}
/**
* Close the connection and clean up
*/
async close(): Promise<void> {
// Remove RTCPeerConnection event listeners
if (this.connectionStateChangeHandler) {
this.pc.removeEventListener('connectionstatechange', this.connectionStateChangeHandler);
}
if (this.dataChannelHandler) {
this.pc.removeEventListener('datachannel', this.dataChannelHandler);
}
if (this.trackHandler) {
this.pc.removeEventListener('track', this.trackHandler);
}
if (this.iceCandidateErrorHandler) {
this.pc.removeEventListener('icecandidateerror', this.iceCandidateErrorHandler);
}
await this._state.close();
this.removeAllListeners();
}
}

View File

@@ -1,66 +0,0 @@
import type { PeerOptions } from './types.js';
import type RondevuPeer from './index.js';
/**
* Base class for peer connection states
* Implements the State pattern for managing WebRTC connection lifecycle
*/
export abstract class PeerState {
protected iceCandidateHandler?: (event: RTCPeerConnectionIceEvent) => void;
constructor(protected peer: RondevuPeer) {}
abstract get name(): string;
async createOffer(options: PeerOptions): Promise<string> {
throw new Error(`Cannot create offer in ${this.name} state`);
}
async answer(offerId: string, offerSdp: string, options: PeerOptions): Promise<void> {
throw new Error(`Cannot answer in ${this.name} state`);
}
async handleAnswer(sdp: string): Promise<void> {
throw new Error(`Cannot handle answer in ${this.name} state`);
}
async handleIceCandidate(candidate: any): Promise<void> {
// ICE candidates can arrive in multiple states, so default is to add them
if (this.peer.pc.remoteDescription) {
await this.peer.pc.addIceCandidate(new this.peer.RTCIceCandidate(candidate));
}
}
/**
* Setup trickle ICE candidate handler
* Sends local ICE candidates to server as they are discovered
*/
protected setupIceCandidateHandler(): void {
this.iceCandidateHandler = async (event: RTCPeerConnectionIceEvent) => {
if (event.candidate && this.peer.offerId) {
const candidateData = event.candidate.toJSON();
if (candidateData.candidate && candidateData.candidate !== '') {
try {
await this.peer.offersApi.addIceCandidates(this.peer.offerId, [candidateData]);
} catch (err) {
console.error('Error sending ICE candidate:', err);
}
}
}
};
this.peer.pc.addEventListener('icecandidate', this.iceCandidateHandler);
}
cleanup(): void {
// Clean up ICE candidate handler if it exists
if (this.iceCandidateHandler) {
this.peer.pc.removeEventListener('icecandidate', this.iceCandidateHandler);
}
}
async close(): Promise<void> {
this.cleanup();
const { ClosedState } = await import('./closed-state.js');
this.peer.setState(new ClosedState(this.peer));
}
}

View File

@@ -1,45 +0,0 @@
/**
* Timeout configurations for different connection phases
*/
export interface PeerTimeouts {
/** Timeout for ICE gathering (default: 10000ms) */
iceGathering?: number;
/** Timeout for waiting for answer (default: 30000ms) */
waitingForAnswer?: number;
/** Timeout for creating answer (default: 10000ms) */
creatingAnswer?: number;
/** Timeout for ICE connection (default: 30000ms) */
iceConnection?: number;
}
/**
* Options for creating a peer connection
*/
export interface PeerOptions {
/** RTCConfiguration for the peer connection */
rtcConfig?: RTCConfiguration;
/** Topics to advertise this connection under */
topics: string[];
/** How long the offer should live (milliseconds) */
ttl?: number;
/** Optional secret to protect the offer (max 128 characters) */
secret?: string;
/** Whether to create a data channel automatically (for offerer) */
createDataChannel?: boolean;
/** Label for the automatically created data channel */
dataChannelLabel?: string;
/** Timeout configurations */
timeouts?: PeerTimeouts;
}
/**
* Events emitted by RondevuPeer
*/
export interface PeerEvents extends Record<string, (...args: any[]) => void> {
'state': (state: string) => void;
'connected': () => void;
'disconnected': () => void;
'failed': (error: Error) => void;
'datachannel': (channel: RTCDataChannel) => void;
'track': (event: RTCTrackEvent) => void;
}

View File

@@ -1,78 +0,0 @@
import { PeerState } from './state.js';
import type { PeerOptions } from './types.js';
import type RondevuPeer from './index.js';
/**
* Waiting for answer from another peer
*/
export class WaitingForAnswerState extends PeerState {
private pollingInterval?: ReturnType<typeof setInterval>;
private timeout?: ReturnType<typeof setTimeout>;
constructor(
peer: RondevuPeer,
private offerId: string,
private options: PeerOptions
) {
super(peer);
this.startPolling();
}
get name() { return 'waiting-for-answer'; }
private startPolling(): void {
const answerTimeout = this.options.timeouts?.waitingForAnswer || 30000;
this.timeout = setTimeout(async () => {
this.cleanup();
const { FailedState } = await import('./failed-state.js');
this.peer.setState(new FailedState(
this.peer,
new Error('Timeout waiting for answer')
));
}, answerTimeout);
this.pollingInterval = setInterval(async () => {
try {
const answers = await this.peer.offersApi.getAnswers();
const myAnswer = answers.find((a: any) => a.offerId === this.offerId);
if (myAnswer) {
this.cleanup();
await this.handleAnswer(myAnswer.sdp);
}
} catch (err) {
console.error('Error polling for answers:', err);
if (err instanceof Error && err.message.includes('not found')) {
this.cleanup();
const { FailedState } = await import('./failed-state.js');
this.peer.setState(new FailedState(
this.peer,
new Error('Offer expired or not found')
));
}
}
}, 2000);
}
async handleAnswer(sdp: string): Promise<void> {
try {
await this.peer.pc.setRemoteDescription({
type: 'answer',
sdp
});
// Transition to exchanging ICE
const { ExchangingIceState } = await import('./exchanging-ice-state.js');
this.peer.setState(new ExchangingIceState(this.peer, this.offerId, this.options));
} catch (error) {
const { FailedState } = await import('./failed-state.js');
this.peer.setState(new FailedState(this.peer, error as Error));
}
}
cleanup(): void {
if (this.pollingInterval) clearInterval(this.pollingInterval);
if (this.timeout) clearTimeout(this.timeout);
}
}

View File

@@ -1,296 +0,0 @@
import { RondevuAuth, Credentials, FetchFunction } from './auth.js';
import { RondevuOffers } from './offers.js';
import { RondevuUsername } from './usernames.js';
import RondevuPeer from './peer/index.js';
import { DurableService } from './durable/service.js';
import { DurableConnection } from './durable/connection.js';
import { DurableChannel } from './durable/channel.js';
import type {
DurableServiceConfig,
DurableConnectionConfig,
ConnectionInfo
} from './durable/types.js';
export interface RondevuOptions {
/**
* Base URL of the Rondevu server
* @default 'https://api.ronde.vu'
*/
baseUrl?: string;
/**
* Existing credentials (peerId + secret) to skip registration
*/
credentials?: Credentials;
/**
* Custom fetch implementation for environments without native fetch
* (Node.js < 18, some Workers environments, etc.)
*
* @example Node.js
* ```typescript
* import fetch from 'node-fetch';
* const client = new Rondevu({ fetch });
* ```
*/
fetch?: FetchFunction;
/**
* Custom RTCPeerConnection implementation for Node.js environments
* Required when using in Node.js with wrtc or similar polyfills
*
* @example Node.js with wrtc
* ```typescript
* import { RTCPeerConnection } from 'wrtc';
* const client = new Rondevu({ RTCPeerConnection });
* ```
*/
RTCPeerConnection?: typeof RTCPeerConnection;
/**
* Custom RTCSessionDescription implementation for Node.js environments
* Required when using in Node.js with wrtc or similar polyfills
*
* @example Node.js with wrtc
* ```typescript
* import { RTCSessionDescription } from 'wrtc';
* const client = new Rondevu({ RTCSessionDescription });
* ```
*/
RTCSessionDescription?: typeof RTCSessionDescription;
/**
* Custom RTCIceCandidate implementation for Node.js environments
* Required when using in Node.js with wrtc or similar polyfills
*
* @example Node.js with wrtc
* ```typescript
* import { RTCIceCandidate } from 'wrtc';
* const client = new Rondevu({ RTCIceCandidate });
* ```
*/
RTCIceCandidate?: typeof RTCIceCandidate;
}
export class Rondevu {
readonly auth: RondevuAuth;
readonly usernames: RondevuUsername;
private _offers?: RondevuOffers;
private credentials?: Credentials;
private baseUrl: string;
private fetchFn?: FetchFunction;
private rtcPeerConnection?: typeof RTCPeerConnection;
private rtcSessionDescription?: typeof RTCSessionDescription;
private rtcIceCandidate?: typeof RTCIceCandidate;
constructor(options: RondevuOptions = {}) {
this.baseUrl = options.baseUrl || 'https://api.ronde.vu';
this.fetchFn = options.fetch;
this.rtcPeerConnection = options.RTCPeerConnection;
this.rtcSessionDescription = options.RTCSessionDescription;
this.rtcIceCandidate = options.RTCIceCandidate;
this.auth = new RondevuAuth(this.baseUrl, this.fetchFn);
this.usernames = new RondevuUsername(this.baseUrl);
if (options.credentials) {
this.credentials = options.credentials;
this._offers = new RondevuOffers(this.baseUrl, this.credentials, this.fetchFn);
}
}
/**
* Get offers API (low-level access, requires authentication)
* For most use cases, use the durable connection APIs instead
*/
get offers(): RondevuOffers {
if (!this._offers) {
throw new Error('Not authenticated. Call register() first or provide credentials.');
}
return this._offers;
}
/**
* Register and initialize authenticated client
* Generates a cryptographically random peer ID (128-bit)
*/
async register(): Promise<Credentials> {
this.credentials = await this.auth.register();
// Create offers API instance
this._offers = new RondevuOffers(
this.baseUrl,
this.credentials,
this.fetchFn
);
return this.credentials;
}
/**
* Check if client is authenticated
*/
isAuthenticated(): boolean {
return !!this.credentials;
}
/**
* Get current credentials
*/
getCredentials(): Credentials | undefined {
return this.credentials;
}
/**
* Create a new WebRTC peer connection (requires authentication)
* This is a high-level helper that creates and manages WebRTC connections with state management
*
* @param rtcConfig Optional RTCConfiguration for the peer connection
* @returns RondevuPeer instance
*/
createPeer(rtcConfig?: RTCConfiguration): RondevuPeer {
if (!this._offers) {
throw new Error('Not authenticated. Call register() first or provide credentials.');
}
return new RondevuPeer(
this._offers,
rtcConfig,
undefined, // No existing peer connection
this.rtcPeerConnection,
this.rtcSessionDescription,
this.rtcIceCandidate
);
}
/**
* Expose a durable service with automatic reconnection and TTL refresh
*
* Creates a service that handles incoming connections with automatic
* reconnection and message queuing during network interruptions.
*
* @param config Service configuration
* @returns DurableService instance
*
* @example
* ```typescript
* const service = await client.exposeService({
* username: 'alice',
* privateKey: keypair.privateKey,
* serviceFqn: 'chat@1.0.0',
* poolSize: 10,
* handler: (channel, connectionId) => {
* channel.on('message', (data) => {
* console.log('Received:', data);
* channel.send(`Echo: ${data}`);
* });
* }
* });
*
* await service.start();
* ```
*/
async exposeService(
config: DurableServiceConfig & {
handler: (channel: DurableChannel, connectionId: string) => void | Promise<void>;
}
): Promise<DurableService> {
if (!this._offers || !this.credentials) {
throw new Error('Not authenticated. Call register() first or provide credentials.');
}
const service = new DurableService(
this._offers,
this.baseUrl,
this.credentials,
config.handler,
config
);
return service;
}
/**
* Create a durable connection to a service by username and service FQN
*
* Establishes a WebRTC connection with automatic reconnection and
* message queuing during network interruptions.
*
* @param username Username of the service provider
* @param serviceFqn Fully qualified service name
* @param config Optional connection configuration
* @returns DurableConnection instance
*
* @example
* ```typescript
* const connection = await client.connect('alice', 'chat@1.0.0', {
* maxReconnectAttempts: 5
* });
*
* const channel = connection.createChannel('main');
* channel.on('message', (data) => {
* console.log('Received:', data);
* });
*
* await connection.connect();
* channel.send('Hello!');
* ```
*/
async connect(
username: string,
serviceFqn: string,
config?: DurableConnectionConfig
): Promise<DurableConnection> {
if (!this._offers) {
throw new Error('Not authenticated. Call register() first or provide credentials.');
}
const connectionInfo: ConnectionInfo = {
username,
serviceFqn
};
return new DurableConnection(this._offers, connectionInfo, config);
}
/**
* Create a durable connection to a service by UUID
*
* Establishes a WebRTC connection with automatic reconnection and
* message queuing during network interruptions.
*
* @param uuid Service UUID
* @param config Optional connection configuration
* @returns DurableConnection instance
*
* @example
* ```typescript
* const connection = await client.connectByUuid('service-uuid-here', {
* maxReconnectAttempts: 5
* });
*
* const channel = connection.createChannel('main');
* channel.on('message', (data) => {
* console.log('Received:', data);
* });
*
* await connection.connect();
* channel.send('Hello!');
* ```
*/
async connectByUuid(
uuid: string,
config?: DurableConnectionConfig
): Promise<DurableConnection> {
if (!this._offers) {
throw new Error('Not authenticated. Call register() first or provide credentials.');
}
const connectionInfo: ConnectionInfo = {
uuid
};
return new DurableConnection(this._offers, connectionInfo, config);
}
}

View File

@@ -1,590 +0,0 @@
import { RondevuOffers, Offer } from './offers.js';
import { RondevuUsername } from './usernames.js';
import RondevuPeer from './peer/index.js';
import { OfferPool, AnsweredOffer } from './offer-pool.js';
/**
* Connection information for tracking active connections
*/
interface ConnectionInfo {
peer: RondevuPeer;
channel: RTCDataChannel;
connectedAt: number;
offerId: string;
}
/**
* Status information about the pool
*/
export interface PoolStatus {
/** Number of active offers in the pool */
activeOffers: number;
/** Number of currently connected peers */
activeConnections: number;
/** Total number of connections handled since start */
totalConnectionsHandled: number;
/** Number of failed offer creation attempts */
failedOfferCreations: number;
}
/**
* Configuration options for a pooled service
*/
export interface ServicePoolOptions {
/** Username that owns the service */
username: string;
/** Private key for signing service operations */
privateKey: string;
/** Fully qualified service name (e.g., com.example.chat@1.0.0) */
serviceFqn: string;
/** WebRTC configuration */
rtcConfig?: RTCConfiguration;
/** Whether the service is publicly discoverable */
isPublic?: boolean;
/** Optional metadata for the service */
metadata?: Record<string, any>;
/** Time-to-live for offers in milliseconds */
ttl?: number;
/** Handler invoked for each new connection */
handler: (channel: RTCDataChannel, peer: RondevuPeer, connectionId: string) => void;
/** Number of simultaneous open offers to maintain (default: 1) */
poolSize?: number;
/** Polling interval in milliseconds (default: 2000ms) */
pollingInterval?: number;
/** Callback for pool status updates */
onPoolStatus?: (status: PoolStatus) => void;
/** Error handler for pool operations */
onError?: (error: Error, context: string) => void;
}
/**
* Service handle with pool-specific methods
*/
export interface PooledServiceHandle {
/** Service ID */
serviceId: string;
/** Service UUID */
uuid: string;
/** Offer ID */
offerId: string;
/** Unpublish the service */
unpublish: () => Promise<void>;
/** Get current pool status */
getStatus: () => PoolStatus;
/** Manually add offers to the pool */
addOffers: (count: number) => Promise<void>;
}
/**
* Manages a pooled service with multiple concurrent connections
*
* ServicePool coordinates offer creation, answer polling, and connection
* management for services that need to handle multiple simultaneous connections.
*/
export class ServicePool {
private offerPool?: OfferPool;
private connections: Map<string, ConnectionInfo> = new Map();
private peerConnections: Map<string, RTCPeerConnection> = new Map();
private status: PoolStatus = {
activeOffers: 0,
activeConnections: 0,
totalConnectionsHandled: 0,
failedOfferCreations: 0
};
private serviceId?: string;
private uuid?: string;
private offersApi: RondevuOffers;
private usernameApi: RondevuUsername;
constructor(
private baseUrl: string,
private credentials: { peerId: string; secret: string },
private options: ServicePoolOptions
) {
this.offersApi = new RondevuOffers(baseUrl, credentials);
this.usernameApi = new RondevuUsername(baseUrl);
}
/**
* Start the pooled service
*/
async start(): Promise<PooledServiceHandle> {
const poolSize = this.options.poolSize || 1;
// 1. Create initial service (publishes first offer)
const service = await this.publishInitialService();
this.serviceId = service.serviceId;
this.uuid = service.uuid;
// 2. Create additional offers for pool (poolSize - 1)
const additionalOffers: Offer[] = [];
const additionalPeerConnections: RTCPeerConnection[] = [];
const additionalDataChannels: RTCDataChannel[] = [];
if (poolSize > 1) {
try {
const result = await this.createOffers(poolSize - 1);
additionalOffers.push(...result.offers);
additionalPeerConnections.push(...result.peerConnections);
additionalDataChannels.push(...result.dataChannels);
} catch (error) {
this.handleError(error as Error, 'initial-offer-creation');
}
}
// 3. Initialize OfferPool with all offers
this.offerPool = new OfferPool(this.offersApi, {
poolSize,
pollingInterval: this.options.pollingInterval || 2000,
onAnswered: (answer) => this.handleConnection(answer),
onRefill: (count) => this.createOffers(count),
onError: (err, ctx) => this.handleError(err, ctx)
});
// Add all offers to pool with their peer connections and data channels
const allOffers = [
{ id: service.offerId, peerId: this.credentials.peerId, sdp: service.offerSdp, topics: [], expiresAt: service.expiresAt, lastSeen: Date.now() },
...additionalOffers
];
const allPeerConnections = [
service.peerConnection,
...additionalPeerConnections
];
const allDataChannels = [
service.dataChannel,
...additionalDataChannels
];
await this.offerPool.addOffers(allOffers, allPeerConnections, allDataChannels);
// 4. Start polling
await this.offerPool.start();
// Update status
this.updateStatus();
// 5. Return handle
return {
serviceId: this.serviceId,
uuid: this.uuid,
offerId: service.offerId,
unpublish: () => this.stop(),
getStatus: () => this.getStatus(),
addOffers: (count) => this.manualRefill(count)
};
}
/**
* Stop the pooled service and clean up
*/
async stop(): Promise<void> {
// 1. Stop accepting new connections
if (this.offerPool) {
await this.offerPool.stop();
}
// 2. Close peer connections from the pool
if (this.offerPool) {
const poolPeerConnections = this.offerPool.getActivePeerConnections();
poolPeerConnections.forEach(pc => {
try {
pc.close();
} catch {
// Ignore errors during cleanup
}
});
}
// 3. Delete remaining offers
if (this.offerPool) {
const offerIds = this.offerPool.getActiveOfferIds();
await Promise.allSettled(
offerIds.map(id => this.offersApi.delete(id).catch(() => {}))
);
}
// 4. Close active connections
const closePromises = Array.from(this.connections.values()).map(
async (conn) => {
try {
// Give a brief moment for graceful closure
await new Promise(resolve => setTimeout(resolve, 100));
conn.peer.pc.close();
} catch {
// Ignore errors during cleanup
}
}
);
await Promise.allSettled(closePromises);
// 5. Delete service if we have a serviceId
if (this.serviceId) {
try {
const response = await fetch(`${this.baseUrl}/services/${this.serviceId}`, {
method: 'DELETE',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.credentials.peerId}:${this.credentials.secret}`
},
body: JSON.stringify({ username: this.options.username })
});
if (!response.ok) {
console.error('Failed to delete service:', await response.text());
}
} catch (error) {
console.error('Error deleting service:', error);
}
}
// Clear all state
this.connections.clear();
this.offerPool = undefined;
}
/**
* Handle an answered offer by setting up the connection
*/
private async handleConnection(answer: AnsweredOffer): Promise<void> {
const connectionId = this.generateConnectionId();
try {
// Use the existing peer connection from the pool
const peer = new RondevuPeer(
this.offersApi,
this.options.rtcConfig || {
iceServers: [{ urls: 'stun:stun.l.google.com:19302' }]
},
answer.peerConnection // Use the existing peer connection
);
peer.role = 'offerer';
peer.offerId = answer.offerId;
// Verify peer connection is in correct state
if (peer.pc.signalingState !== 'have-local-offer') {
console.error('Peer connection state info:', {
signalingState: peer.pc.signalingState,
connectionState: peer.pc.connectionState,
iceConnectionState: peer.pc.iceConnectionState,
iceGatheringState: peer.pc.iceGatheringState,
hasLocalDescription: !!peer.pc.localDescription,
hasRemoteDescription: !!peer.pc.remoteDescription,
localDescriptionType: peer.pc.localDescription?.type,
remoteDescriptionType: peer.pc.remoteDescription?.type,
offerId: answer.offerId
});
throw new Error(
`Invalid signaling state: ${peer.pc.signalingState}. Expected 'have-local-offer' to set remote answer.`
);
}
// Set remote description (the answer)
await peer.pc.setRemoteDescription({
type: 'answer',
sdp: answer.sdp
});
// Use the data channel we created when making the offer
if (!answer.dataChannel) {
throw new Error('No data channel found for answered offer');
}
const channel = answer.dataChannel;
// Wait for the channel to open (it was created when we made the offer)
if (channel.readyState !== 'open') {
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(
() => reject(new Error('Timeout waiting for data channel to open')),
30000
);
channel.onopen = () => {
clearTimeout(timeout);
resolve();
};
channel.onerror = (error) => {
clearTimeout(timeout);
reject(new Error('Data channel error'));
};
});
}
// Register connection
this.connections.set(connectionId, {
peer,
channel,
connectedAt: Date.now(),
offerId: answer.offerId
});
this.status.activeConnections++;
this.status.totalConnectionsHandled++;
// Setup cleanup on disconnect
peer.on('disconnected', () => {
this.connections.delete(connectionId);
this.status.activeConnections--;
this.updateStatus();
});
peer.on('failed', () => {
this.connections.delete(connectionId);
this.status.activeConnections--;
this.updateStatus();
});
// Update status
this.updateStatus();
// Invoke user handler (wrapped in try-catch)
try {
this.options.handler(channel, peer, connectionId);
} catch (handlerError) {
this.handleError(handlerError as Error, 'handler');
}
} catch (error) {
this.handleError(error as Error, 'connection-setup');
}
}
/**
* Create multiple offers
*/
private async createOffers(count: number): Promise<{ offers: Offer[], peerConnections: RTCPeerConnection[], dataChannels: RTCDataChannel[] }> {
if (count <= 0) {
return { offers: [], peerConnections: [], dataChannels: [] };
}
// Server supports max 10 offers per request
const batchSize = Math.min(count, 10);
const offers: Offer[] = [];
const peerConnections: RTCPeerConnection[] = [];
const dataChannels: RTCDataChannel[] = [];
try {
// Create peer connections and generate offers
const offerRequests = [];
for (let i = 0; i < batchSize; i++) {
const pc = new RTCPeerConnection(this.options.rtcConfig || {
iceServers: [{ urls: 'stun:stun.l.google.com:19302' }]
});
// Create data channel (required for offers) and save reference
const channel = pc.createDataChannel('rondevu-service');
dataChannels.push(channel);
// Create offer
const offer = await pc.createOffer();
await pc.setLocalDescription(offer);
if (!offer.sdp) {
pc.close();
throw new Error('Failed to generate SDP');
}
offerRequests.push({
sdp: offer.sdp,
topics: [], // V2 doesn't use topics
ttl: this.options.ttl
});
// Keep peer connection alive - DO NOT CLOSE
peerConnections.push(pc);
}
// Batch create offers
const createdOffers = await this.offersApi.create(offerRequests);
offers.push(...createdOffers);
// Set up ICE candidate handlers AFTER we have offer IDs
for (let i = 0; i < peerConnections.length; i++) {
const pc = peerConnections[i];
const offerId = createdOffers[i].id;
pc.onicecandidate = async (event) => {
if (event.candidate) {
const candidateData = event.candidate.toJSON();
if (candidateData.candidate && candidateData.candidate !== '') {
try {
await this.offersApi.addIceCandidates(offerId, [candidateData]);
} catch (err) {
console.error('Error sending ICE candidate:', err);
}
}
}
};
}
} catch (error) {
// Close any created peer connections on error
peerConnections.forEach(pc => pc.close());
this.status.failedOfferCreations++;
this.handleError(error as Error, 'offer-creation');
throw error;
}
return { offers, peerConnections, dataChannels };
}
/**
* Publish the initial service (creates first offer)
*/
private async publishInitialService(): Promise<{
serviceId: string;
uuid: string;
offerId: string;
offerSdp: string;
expiresAt: number;
peerConnection: RTCPeerConnection;
dataChannel: RTCDataChannel;
}> {
const { username, privateKey, serviceFqn, rtcConfig, isPublic, metadata, ttl } = this.options;
// Create peer connection for initial offer
const pc = new RTCPeerConnection(rtcConfig || {
iceServers: [{ urls: 'stun:stun.l.google.com:19302' }]
});
const dataChannel = pc.createDataChannel('rondevu-service');
// Create offer
const offer = await pc.createOffer();
await pc.setLocalDescription(offer);
if (!offer.sdp) {
pc.close();
throw new Error('Failed to generate SDP');
}
// Store the SDP
const offerSdp = offer.sdp;
// Create signature
const timestamp = Date.now();
const message = `publish:${username}:${serviceFqn}:${timestamp}`;
const signature = await this.usernameApi.signMessage(message, privateKey);
// Publish service
const response = await fetch(`${this.baseUrl}/services`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.credentials.peerId}:${this.credentials.secret}`
},
body: JSON.stringify({
username,
serviceFqn,
sdp: offerSdp,
ttl,
isPublic,
metadata,
signature,
message
})
});
if (!response.ok) {
pc.close();
const error = await response.json();
throw new Error(error.error || 'Failed to publish service');
}
const data = await response.json();
// Set up ICE candidate handler now that we have the offer ID
pc.onicecandidate = async (event) => {
if (event.candidate) {
const candidateData = event.candidate.toJSON();
if (candidateData.candidate && candidateData.candidate !== '') {
try {
await this.offersApi.addIceCandidates(data.offerId, [candidateData]);
} catch (err) {
console.error('Error sending ICE candidate:', err);
}
}
}
};
return {
serviceId: data.serviceId,
uuid: data.uuid,
offerId: data.offerId,
offerSdp,
expiresAt: data.expiresAt,
peerConnection: pc, // Keep peer connection alive
dataChannel // Keep data channel alive
};
}
/**
* Manually add offers to the pool
*/
private async manualRefill(count: number): Promise<void> {
if (!this.offerPool) {
throw new Error('Pool not started');
}
const result = await this.createOffers(count);
await this.offerPool.addOffers(result.offers, result.peerConnections, result.dataChannels);
this.updateStatus();
}
/**
* Get current pool status
*/
private getStatus(): PoolStatus {
return { ...this.status };
}
/**
* Update status and notify listeners
*/
private updateStatus(): void {
if (this.offerPool) {
this.status.activeOffers = this.offerPool.getActiveOfferCount();
}
if (this.options.onPoolStatus) {
this.options.onPoolStatus(this.getStatus());
}
}
/**
* Handle errors
*/
private handleError(error: Error, context: string): void {
if (this.options.onError) {
this.options.onError(error, context);
} else {
console.error(`ServicePool error (${context}):`, error);
}
}
/**
* Generate a unique connection ID
*/
private generateConnectionId(): string {
return `conn-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
}

24
src/types.ts Normal file
View File

@@ -0,0 +1,24 @@
/**
* Core connection types
*/
export interface ConnectionIdentity {
id: string;
hostUsername: string;
}
export interface ConnectionState {
state: 'connected' | 'disconnected' | 'connecting';
lastActive: number;
}
export interface QueueMessageOptions {
expiresAt?: number;
}
export interface ConnectionInterface {
queueMessage(message: string | ArrayBuffer, options?: QueueMessageOptions): void;
sendMessage(message: string | ArrayBuffer): void;
}
export type Connection = ConnectionIdentity & ConnectionState & ConnectionInterface;

View File

@@ -1,200 +0,0 @@
import * as ed25519 from '@noble/ed25519';
// Set SHA-512 hash function for ed25519 (required in @noble/ed25519 v3+)
// Uses built-in WebCrypto API which only provides async digest
// We use the async ed25519 functions (signAsync, verifyAsync, getPublicKeyAsync)
ed25519.hashes.sha512Async = async (message: Uint8Array) => {
return new Uint8Array(await crypto.subtle.digest('SHA-512', message as BufferSource));
};
/**
* Username claim result
*/
export interface UsernameClaimResult {
username: string;
publicKey: string;
privateKey: string;
claimedAt: number;
expiresAt: number;
}
/**
* Username availability check result
*/
export interface UsernameCheckResult {
username: string;
available: boolean;
claimedAt?: number;
expiresAt?: number;
publicKey?: string;
}
/**
* Convert Uint8Array to base64 string
*/
function bytesToBase64(bytes: Uint8Array): string {
const binString = Array.from(bytes, (byte) =>
String.fromCodePoint(byte)
).join('');
return btoa(binString);
}
/**
* Convert base64 string to Uint8Array
*/
function base64ToBytes(base64: string): Uint8Array {
const binString = atob(base64);
return Uint8Array.from(binString, (char) => char.codePointAt(0)!);
}
/**
* Rondevu Username API
* Handles username claiming with Ed25519 cryptographic proof
*/
export class RondevuUsername {
constructor(private baseUrl: string) {}
/**
* Generates an Ed25519 keypair for username claiming
*/
async generateKeypair(): Promise<{ publicKey: string; privateKey: string }> {
const privateKey = ed25519.utils.randomSecretKey();
const publicKey = await ed25519.getPublicKeyAsync(privateKey);
return {
publicKey: bytesToBase64(publicKey),
privateKey: bytesToBase64(privateKey)
};
}
/**
* Signs a message with an Ed25519 private key
*/
async signMessage(message: string, privateKeyBase64: string): Promise<string> {
const privateKey = base64ToBytes(privateKeyBase64);
const encoder = new TextEncoder();
const messageBytes = encoder.encode(message);
const signature = await ed25519.signAsync(messageBytes, privateKey);
return bytesToBase64(signature);
}
/**
* Claims a username
* Generates a new keypair if one is not provided
*/
async claimUsername(
username: string,
existingKeypair?: { publicKey: string; privateKey: string }
): Promise<UsernameClaimResult> {
// Generate or use existing keypair
const keypair = existingKeypair || await this.generateKeypair();
// Create signed message
const timestamp = Date.now();
const message = `claim:${username}:${timestamp}`;
const signature = await this.signMessage(message, keypair.privateKey);
// Send claim request
const response = await fetch(`${this.baseUrl}/usernames/claim`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
username,
publicKey: keypair.publicKey,
signature,
message
})
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.error || 'Failed to claim username');
}
const data = await response.json();
return {
username: data.username,
publicKey: keypair.publicKey,
privateKey: keypair.privateKey,
claimedAt: data.claimedAt,
expiresAt: data.expiresAt
};
}
/**
* Checks if a username is available
*/
async checkUsername(username: string): Promise<UsernameCheckResult> {
const response = await fetch(`${this.baseUrl}/usernames/${username}`);
if (!response.ok) {
throw new Error('Failed to check username');
}
const data = await response.json();
return {
username: data.username,
available: data.available,
claimedAt: data.claimedAt,
expiresAt: data.expiresAt,
publicKey: data.publicKey
};
}
/**
* Helper: Save keypair to localStorage
* WARNING: This stores the private key in localStorage which is not the most secure
* For production use, consider using IndexedDB with encryption or hardware security modules
*/
saveKeypairToStorage(username: string, publicKey: string, privateKey: string): void {
const data = { username, publicKey, privateKey, savedAt: Date.now() };
localStorage.setItem(`rondevu:keypair:${username}`, JSON.stringify(data));
}
/**
* Helper: Load keypair from localStorage
*/
loadKeypairFromStorage(username: string): { publicKey: string; privateKey: string } | null {
const stored = localStorage.getItem(`rondevu:keypair:${username}`);
if (!stored) return null;
try {
const data = JSON.parse(stored);
return { publicKey: data.publicKey, privateKey: data.privateKey };
} catch {
return null;
}
}
/**
* Helper: Delete keypair from localStorage
*/
deleteKeypairFromStorage(username: string): void {
localStorage.removeItem(`rondevu:keypair:${username}`);
}
/**
* Export keypair as JSON string (for backup)
*/
exportKeypair(publicKey: string, privateKey: string): string {
return JSON.stringify({
publicKey,
privateKey,
exportedAt: Date.now()
});
}
/**
* Import keypair from JSON string
*/
importKeypair(json: string): { publicKey: string; privateKey: string } {
const data = JSON.parse(json);
if (!data.publicKey || !data.privateKey) {
throw new Error('Invalid keypair format');
}
return { publicKey: data.publicKey, privateKey: data.privateKey };
}
}