Add WebRTC connection manager and fix race condition

- Add RondevuConnection class for high-level WebRTC management
- Handles offer/answer exchange, ICE candidates, and data channels
- Fix race condition in answer() method (register answerer before sending ICE)
- Add event-driven API (connecting, connected, disconnected, error, datachannel, track)
- Update README with connection manager examples
- Export new connection types and classes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-11-14 18:30:47 +01:00
parent e1ca8e1c16
commit 5a47e0a397
8 changed files with 1279 additions and 598 deletions

View File

@@ -1,332 +1,346 @@
import { EventEmitter } from './event-emitter.js';
import { RondevuAPI } from './client.js';
import { RondevuConnectionParams, WebRTCPolyfill } from './types.js';
import { RondevuOffers } from './offers.js';
/**
* Represents a WebRTC connection with automatic signaling and ICE exchange
* Events emitted by RondevuConnection
*/
export class RondevuConnection extends EventEmitter {
readonly id: string;
readonly role: 'offerer' | 'answerer';
readonly remotePeerId: string;
export interface RondevuConnectionEvents {
'connecting': () => void;
'connected': () => void;
'disconnected': () => void;
'error': (error: Error) => void;
'datachannel': (channel: RTCDataChannel) => void;
'track': (event: RTCTrackEvent) => void;
}
/**
* Options for creating a WebRTC connection
*/
export interface ConnectionOptions {
/**
* RTCConfiguration for the peer connection
* @default { iceServers: [{ urls: 'stun:stun.l.google.com:19302' }] }
*/
rtcConfig?: RTCConfiguration;
/**
* Topics to advertise this connection under
*/
topics: string[];
/**
* How long the offer should live (milliseconds)
* @default 300000 (5 minutes)
*/
ttl?: number;
/**
* Whether to create a data channel automatically (for offerer)
* @default true
*/
createDataChannel?: boolean;
/**
* Label for the automatically created data channel
* @default 'data'
*/
dataChannelLabel?: string;
}
/**
* High-level WebRTC connection manager for Rondevu
* Handles offer/answer exchange, ICE candidates, and connection lifecycle
*/
export class RondevuConnection {
private pc: RTCPeerConnection;
private client: RondevuAPI;
private localPeerId: string;
private dataChannels: Map<string, RTCDataChannel>;
private pollingInterval?: ReturnType<typeof setInterval>;
private pollingIntervalMs: number;
private connectionTimeoutMs: number;
private connectionTimer?: ReturnType<typeof setTimeout>;
private isPolling: boolean = false;
private isClosed: boolean = false;
private hasConnected: boolean = false;
private wrtc?: WebRTCPolyfill;
private RTCIceCandidate: typeof RTCIceCandidate;
private offersApi: RondevuOffers;
private offerId?: string;
private role?: 'offerer' | 'answerer';
private icePollingInterval?: ReturnType<typeof setInterval>;
private answerPollingInterval?: ReturnType<typeof setInterval>;
private lastIceTimestamp: number = Date.now();
private eventListeners: Map<keyof RondevuConnectionEvents, Set<Function>> = new Map();
private dataChannel?: RTCDataChannel;
constructor(params: RondevuConnectionParams, client: RondevuAPI) {
super();
this.id = params.id;
this.role = params.role;
this.pc = params.pc;
this.localPeerId = params.localPeerId;
this.remotePeerId = params.remotePeerId;
this.client = client;
this.dataChannels = new Map();
this.pollingIntervalMs = params.pollingInterval;
this.connectionTimeoutMs = params.connectionTimeout;
this.wrtc = params.wrtc;
// Use injected WebRTC polyfill or fall back to global
this.RTCIceCandidate = params.wrtc?.RTCIceCandidate || globalThis.RTCIceCandidate;
this.setupEventHandlers();
this.startConnectionTimeout();
/**
* Current connection state
*/
get connectionState(): RTCPeerConnectionState {
return this.pc.connectionState;
}
/**
* Setup RTCPeerConnection event handlers
* The offer ID for this connection
*/
private setupEventHandlers(): void {
// ICE candidate gathering
this.pc.onicecandidate = (event) => {
if (event.candidate && !this.isClosed) {
this.sendIceCandidate(event.candidate).catch((err) => {
this.emit('error', new Error(`Failed to send ICE candidate: ${err.message}`));
});
get id(): string | undefined {
return this.offerId;
}
/**
* Get the primary data channel (if created)
*/
get channel(): RTCDataChannel | undefined {
return this.dataChannel;
}
constructor(
offersApi: RondevuOffers,
private rtcConfig: RTCConfiguration = {
iceServers: [
{ urls: 'stun:stun.l.google.com:19302' },
{ urls: 'stun:stun1.l.google.com:19302' }
]
}
) {
this.offersApi = offersApi;
this.pc = new RTCPeerConnection(rtcConfig);
this.setupPeerConnection();
}
/**
* Set up peer connection event handlers
*/
private setupPeerConnection(): void {
this.pc.onicecandidate = async (event) => {
if (event.candidate && this.offerId) {
try {
await this.offersApi.addIceCandidates(this.offerId, [event.candidate.candidate]);
} catch (err) {
console.error('Error sending ICE candidate:', err);
}
}
};
// Connection state changes
this.pc.onconnectionstatechange = () => {
this.handleConnectionStateChange();
};
// Remote data channels
this.pc.ondatachannel = (event) => {
this.handleRemoteDataChannel(event.channel);
};
// Remote media streams
this.pc.ontrack = (event) => {
if (event.streams && event.streams[0]) {
this.emit('stream', event.streams[0]);
}
};
// ICE connection state changes
this.pc.oniceconnectionstatechange = () => {
const state = this.pc.iceConnectionState;
if (state === 'failed' || state === 'closed') {
this.emit('error', new Error(`ICE connection ${state}`));
if (state === 'failed') {
this.close();
}
}
};
}
/**
* Handle RTCPeerConnection state changes
*/
private handleConnectionStateChange(): void {
const state = this.pc.connectionState;
switch (state) {
case 'connected':
if (!this.hasConnected) {
this.hasConnected = true;
this.clearConnectionTimeout();
switch (this.pc.connectionState) {
case 'connecting':
this.emit('connecting');
break;
case 'connected':
this.emit('connected');
break;
case 'disconnected':
case 'failed':
case 'closed':
this.emit('disconnected');
this.stopPolling();
this.emit('connect');
}
break;
break;
}
};
case 'disconnected':
this.emit('disconnect');
break;
this.pc.ondatachannel = (event) => {
this.dataChannel = event.channel;
this.emit('datachannel', event.channel);
};
case 'failed':
this.emit('error', new Error('Connection failed'));
this.close();
break;
this.pc.ontrack = (event) => {
this.emit('track', event);
};
case 'closed':
this.emit('disconnect');
break;
}
this.pc.onicecandidateerror = (event) => {
console.error('ICE candidate error:', event);
};
}
/**
* Send an ICE candidate to the remote peer via signaling server
* Create an offer and advertise on topics
*/
private async sendIceCandidate(candidate: RTCIceCandidate): Promise<void> {
try {
await this.client.sendAnswer({
code: this.id,
candidate: JSON.stringify(candidate.toJSON()),
side: this.role,
});
} catch (err: any) {
throw new Error(`Failed to send ICE candidate: ${err.message}`);
async createOffer(options: ConnectionOptions): Promise<string> {
this.role = 'offerer';
// Create data channel if requested
if (options.createDataChannel !== false) {
this.dataChannel = this.pc.createDataChannel(
options.dataChannelLabel || 'data'
);
this.emit('datachannel', this.dataChannel);
}
// Create WebRTC offer
const offer = await this.pc.createOffer();
await this.pc.setLocalDescription(offer);
// Create offer on Rondevu server
const offers = await this.offersApi.create([{
sdp: offer.sdp!,
topics: options.topics,
ttl: options.ttl || 300000
}]);
this.offerId = offers[0].id;
// Start polling for answers
this.startAnswerPolling();
return this.offerId;
}
/**
* Start polling for remote session data (answer/candidates)
* Answer an existing offer
*/
startPolling(): void {
if (this.isPolling || this.isClosed) {
return;
}
async answer(offerId: string, offerSdp: string): Promise<void> {
this.role = 'answerer';
this.isPolling = true;
// Poll immediately
this.poll().catch((err) => {
this.emit('error', new Error(`Poll error: ${err.message}`));
// Set remote description
await this.pc.setRemoteDescription({
type: 'offer',
sdp: offerSdp
});
// Set up interval polling
this.pollingInterval = setInterval(() => {
this.poll().catch((err) => {
this.emit('error', new Error(`Poll error: ${err.message}`));
});
}, this.pollingIntervalMs);
// Create answer
const answer = await this.pc.createAnswer();
await this.pc.setLocalDescription(answer);
// Send answer to server FIRST
// This registers us as the answerer before ICE candidates arrive
await this.offersApi.answer(offerId, answer.sdp!);
// Now set offerId to enable ICE candidate sending
// This prevents a race condition where ICE candidates arrive before answer is registered
this.offerId = offerId;
// Start polling for ICE candidates
this.startIcePolling();
}
/**
* Stop polling
* Start polling for answers (offerer only)
*/
private stopPolling(): void {
this.isPolling = false;
if (this.pollingInterval) {
clearInterval(this.pollingInterval);
this.pollingInterval = undefined;
}
}
private startAnswerPolling(): void {
if (this.role !== 'offerer' || !this.offerId) return;
/**
* Poll the signaling server for remote data
*/
private async poll(): Promise<void> {
if (this.isClosed) {
this.stopPolling();
return;
}
this.answerPollingInterval = setInterval(async () => {
try {
const answers = await this.offersApi.getAnswers();
const myAnswer = answers.find(a => a.offerId === this.offerId);
try {
const response = await this.client.poll(this.id, this.role);
if (this.role === 'offerer') {
const offererResponse = response as { answer: string | null; answerCandidates: string[] };
// Apply answer if received and not yet applied
if (offererResponse.answer && !this.pc.currentRemoteDescription) {
if (myAnswer) {
// Set remote description
await this.pc.setRemoteDescription({
type: 'answer',
sdp: offererResponse.answer,
sdp: myAnswer.sdp
});
// Stop answer polling, start ICE polling
this.stopAnswerPolling();
this.startIcePolling();
}
} catch (err) {
console.error('Error polling for answers:', err);
}
}, 2000);
}
// Apply ICE candidates
if (offererResponse.answerCandidates && offererResponse.answerCandidates.length > 0) {
for (const candidateStr of offererResponse.answerCandidates) {
try {
const candidate = JSON.parse(candidateStr);
await this.pc.addIceCandidate(new this.RTCIceCandidate(candidate));
} catch (err) {
console.warn('Failed to add ICE candidate:', err);
}
}
}
} else {
// Answerer role
const answererResponse = response as { offer: string; offerCandidates: string[] };
// Apply ICE candidates from offerer
if (answererResponse.offerCandidates && answererResponse.offerCandidates.length > 0) {
for (const candidateStr of answererResponse.offerCandidates) {
try {
const candidate = JSON.parse(candidateStr);
await this.pc.addIceCandidate(new this.RTCIceCandidate(candidate));
} catch (err) {
console.warn('Failed to add ICE candidate:', err);
}
}
/**
* Start polling for ICE candidates
*/
private startIcePolling(): void {
if (!this.offerId) return;
this.icePollingInterval = setInterval(async () => {
if (!this.offerId) return;
try {
const candidates = await this.offersApi.getIceCandidates(
this.offerId,
this.lastIceTimestamp
);
for (const candidate of candidates) {
await this.pc.addIceCandidate({
candidate: candidate.candidate,
sdpMLineIndex: 0,
sdpMid: '0'
});
this.lastIceTimestamp = candidate.createdAt;
}
} catch (err) {
console.error('Error polling for ICE candidates:', err);
}
} catch (err: any) {
// Session not found or expired
if (err.message.includes('404') || err.message.includes('not found')) {
this.emit('error', new Error('Session not found or expired'));
this.close();
}
throw err;
}, 1000);
}
/**
* Stop answer polling
*/
private stopAnswerPolling(): void {
if (this.answerPollingInterval) {
clearInterval(this.answerPollingInterval);
this.answerPollingInterval = undefined;
}
}
/**
* Handle remotely created data channel
* Stop ICE polling
*/
private handleRemoteDataChannel(channel: RTCDataChannel): void {
this.dataChannels.set(channel.label, channel);
this.emit('datachannel', channel);
}
/**
* Get or create a data channel
*/
dataChannel(label: string, options?: RTCDataChannelInit): RTCDataChannel {
let channel = this.dataChannels.get(label);
if (!channel) {
channel = this.pc.createDataChannel(label, options);
this.dataChannels.set(label, channel);
}
return channel;
}
/**
* Add a local media stream to the connection
*/
addStream(stream: MediaStream): void {
stream.getTracks().forEach(track => {
this.pc.addTrack(track, stream);
});
}
/**
* Get the underlying RTCPeerConnection for advanced usage
*/
getPeerConnection(): RTCPeerConnection {
return this.pc;
}
/**
* Start connection timeout
*/
private startConnectionTimeout(): void {
this.connectionTimer = setTimeout(() => {
if (this.pc.connectionState !== 'connected') {
this.emit('error', new Error('Connection timeout'));
this.close();
}
}, this.connectionTimeoutMs);
}
/**
* Clear connection timeout
*/
private clearConnectionTimeout(): void {
if (this.connectionTimer) {
clearTimeout(this.connectionTimer);
this.connectionTimer = undefined;
private stopIcePolling(): void {
if (this.icePollingInterval) {
clearInterval(this.icePollingInterval);
this.icePollingInterval = undefined;
}
}
/**
* Leave the session by deleting the offer on the server and closing the connection
* This ends the session for all connected peers
* Stop all polling
*/
async leave(): Promise<void> {
try {
await this.client.leave(this.id);
} catch (err) {
// Ignore errors - session might already be expired
console.debug('Leave error (ignored):', err);
}
this.close();
private stopPolling(): void {
this.stopAnswerPolling();
this.stopIcePolling();
}
/**
* Close the connection and cleanup resources
* Add event listener
*/
on<K extends keyof RondevuConnectionEvents>(
event: K,
listener: RondevuConnectionEvents[K]
): void {
if (!this.eventListeners.has(event)) {
this.eventListeners.set(event, new Set());
}
this.eventListeners.get(event)!.add(listener);
}
/**
* Remove event listener
*/
off<K extends keyof RondevuConnectionEvents>(
event: K,
listener: RondevuConnectionEvents[K]
): void {
const listeners = this.eventListeners.get(event);
if (listeners) {
listeners.delete(listener);
}
}
/**
* Emit event
*/
private emit<K extends keyof RondevuConnectionEvents>(
event: K,
...args: Parameters<RondevuConnectionEvents[K]>
): void {
const listeners = this.eventListeners.get(event);
if (listeners) {
listeners.forEach(listener => {
(listener as any)(...args);
});
}
}
/**
* Add a media track to the connection
*/
addTrack(track: MediaStreamTrack, ...streams: MediaStream[]): RTCRtpSender {
return this.pc.addTrack(track, ...streams);
}
/**
* Close the connection and clean up
*/
close(): void {
if (this.isClosed) {
return;
}
this.isClosed = true;
this.stopPolling();
this.clearConnectionTimeout();
// Close all data channels
this.dataChannels.forEach(dc => {
if (dc.readyState === 'open' || dc.readyState === 'connecting') {
dc.close();
}
});
this.dataChannels.clear();
// Close peer connection
if (this.pc.connectionState !== 'closed') {
this.pc.close();
}
this.emit('disconnect');
this.pc.close();
this.eventListeners.clear();
}
}