From cffb092d3f72fdd2730788c5000a1a93a26b8c72 Mon Sep 17 00:00:00 2001 From: Bas van den Aakster Date: Sat, 6 Dec 2025 11:36:58 +0100 Subject: [PATCH] Fix WebRTC signaling state error in pooled services MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add signaling state validation before setting remote answer - Fix race condition by removing offers from pool before processing - Add detailed debug logging for state mismatch errors - Prevent duplicate processing of answered offers This fixes the "Cannot set remote answer in state stable" error that occurred when multiple answers arrived in quick succession. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- src/offer-pool.ts | 45 +++++++++++++++-------- src/peer/index.ts | 4 ++- src/rondevu.ts | 1 + src/service-pool.ts | 88 +++++++++++++++++++++++++++++++-------------- 4 files changed, 96 insertions(+), 42 deletions(-) diff --git a/src/offer-pool.ts b/src/offer-pool.ts index da9f5e7..86149fb 100644 --- a/src/offer-pool.ts +++ b/src/offer-pool.ts @@ -7,7 +7,7 @@ export interface AnsweredOffer { offerId: string; answererId: string; sdp: string; // Answer SDP - offerSdp: string; // Original offer SDP + peerConnection: RTCPeerConnection; // Original peer connection answeredAt: number; } @@ -25,7 +25,7 @@ export interface OfferPoolOptions { onAnswered: (answer: AnsweredOffer) => Promise; /** Callback to create new offers when refilling the pool */ - onRefill: (count: number) => Promise; + onRefill: (count: number) => Promise<{ offers: Offer[], peerConnections: RTCPeerConnection[] }>; /** Error handler for pool operations */ onError: (error: Error, context: string) => void; @@ -40,6 +40,7 @@ export interface OfferPoolOptions { */ export class OfferPool { private offers: Map = new Map(); + private peerConnections: Map = new Map(); private polling: boolean = false; private pollingTimer?: ReturnType; private lastPollTime: number = 0; @@ -53,11 +54,15 @@ export class OfferPool { } /** - * Add offers to the pool + * Add offers to the pool with their peer connections */ - async addOffers(offers: Offer[]): Promise { - for (const offer of offers) { + async addOffers(offers: Offer[], peerConnections?: RTCPeerConnection[]): Promise { + for (let i = 0; i < offers.length; i++) { + const offer = offers[i]; this.offers.set(offer.id, offer); + if (peerConnections && peerConnections[i]) { + this.peerConnections.set(offer.id, peerConnections[i]); + } } } @@ -111,23 +116,26 @@ export class OfferPool { // Process each answer for (const answer of myAnswers) { - // Get the original offer + // Get the original offer and peer connection const offer = this.offers.get(answer.offerId); - if (!offer) { - continue; // Offer already consumed, skip + const pc = this.peerConnections.get(answer.offerId); + + if (!offer || !pc) { + continue; // Offer or peer connection already consumed, skip } - // Notify ServicePool with both answer and original offer SDP + // Remove from pool BEFORE processing to prevent duplicate processing + this.offers.delete(answer.offerId); + this.peerConnections.delete(answer.offerId); + + // Notify ServicePool with answer and original peer connection await this.options.onAnswered({ offerId: answer.offerId, answererId: answer.answererId, sdp: answer.sdp, - offerSdp: offer.sdp, + peerConnection: pc, answeredAt: answer.answeredAt }); - - // Remove consumed offer from pool - this.offers.delete(answer.offerId); } // Immediate refill if below pool size @@ -135,8 +143,8 @@ export class OfferPool { const needed = this.options.poolSize - this.offers.size; try { - const newOffers = await this.options.onRefill(needed); - await this.addOffers(newOffers); + const result = await this.options.onRefill(needed); + await this.addOffers(result.offers, result.peerConnections); } catch (refillError) { this.options.onError( refillError as Error, @@ -166,6 +174,13 @@ export class OfferPool { return Array.from(this.offers.keys()); } + /** + * Get all active peer connections + */ + getActivePeerConnections(): RTCPeerConnection[] { + return Array.from(this.peerConnections.values()); + } + /** * Get the last poll timestamp */ diff --git a/src/peer/index.ts b/src/peer/index.ts index cac1141..db54ad2 100644 --- a/src/peer/index.ts +++ b/src/peer/index.ts @@ -66,6 +66,7 @@ export default class RondevuPeer extends EventEmitter { { urls: 'stun:stun1.l.google.com:19302' } ] }, + existingPeerConnection?: RTCPeerConnection, rtcPeerConnection?: typeof RTCPeerConnection, rtcSessionDescription?: typeof RTCSessionDescription, rtcIceCandidate?: typeof RTCIceCandidate @@ -92,7 +93,8 @@ export default class RondevuPeer extends EventEmitter { throw new Error('RTCIceCandidate is not available. Please provide it in the Rondevu constructor options for Node.js environments.'); }) as any); - this.pc = new this.RTCPeerConnection(rtcConfig); + // Use existing peer connection if provided, otherwise create new one + this.pc = existingPeerConnection || new this.RTCPeerConnection(rtcConfig); this._state = new IdleState(this); this.setupPeerConnection(); diff --git a/src/rondevu.ts b/src/rondevu.ts index d2660ef..557ad97 100644 --- a/src/rondevu.ts +++ b/src/rondevu.ts @@ -177,6 +177,7 @@ export class Rondevu { return new RondevuPeer( this._offers, rtcConfig, + undefined, // No existing peer connection this.rtcPeerConnection, this.rtcSessionDescription, this.rtcIceCandidate diff --git a/src/service-pool.ts b/src/service-pool.ts index 837b72b..bace761 100644 --- a/src/service-pool.ts +++ b/src/service-pool.ts @@ -92,6 +92,7 @@ export interface PooledServiceHandle extends ServiceHandle { export class ServicePool { private offerPool?: OfferPool; private connections: Map = new Map(); + private peerConnections: Map = new Map(); private status: PoolStatus = { activeOffers: 0, activeConnections: 0, @@ -125,10 +126,12 @@ export class ServicePool { // 2. Create additional offers for pool (poolSize - 1) const additionalOffers: Offer[] = []; + const additionalPeerConnections: RTCPeerConnection[] = []; if (poolSize > 1) { try { - const offers = await this.createOffers(poolSize - 1); - additionalOffers.push(...offers); + const result = await this.createOffers(poolSize - 1); + additionalOffers.push(...result.offers); + additionalPeerConnections.push(...result.peerConnections); } catch (error) { this.handleError(error as Error, 'initial-offer-creation'); } @@ -143,12 +146,16 @@ export class ServicePool { onError: (err, ctx) => this.handleError(err, ctx) }); - // Add all offers to pool (include the SDP from the initial offer) + // Add all offers to pool with their peer connections const allOffers = [ { id: service.offerId, peerId: this.credentials.peerId, sdp: service.offerSdp, topics: [], expiresAt: service.expiresAt, lastSeen: Date.now() }, ...additionalOffers ]; - await this.offerPool.addOffers(allOffers); + const allPeerConnections = [ + service.peerConnection, + ...additionalPeerConnections + ]; + await this.offerPool.addOffers(allOffers, allPeerConnections); // 4. Start polling await this.offerPool.start(); @@ -176,7 +183,19 @@ export class ServicePool { await this.offerPool.stop(); } - // 2. Delete remaining offers + // 2. Close peer connections from the pool + if (this.offerPool) { + const poolPeerConnections = this.offerPool.getActivePeerConnections(); + poolPeerConnections.forEach(pc => { + try { + pc.close(); + } catch { + // Ignore errors during cleanup + } + }); + } + + // 3. Delete remaining offers if (this.offerPool) { const offerIds = this.offerPool.getActiveOfferIds(); await Promise.allSettled( @@ -184,7 +203,7 @@ export class ServicePool { ); } - // 3. Close active connections + // 4. Close active connections const closePromises = Array.from(this.connections.values()).map( async (conn) => { try { @@ -198,7 +217,7 @@ export class ServicePool { ); await Promise.allSettled(closePromises); - // 4. Delete service if we have a serviceId + // 5. Delete service if we have a serviceId if (this.serviceId) { try { const response = await fetch(`${this.baseUrl}/services/${this.serviceId}`, { @@ -230,24 +249,37 @@ export class ServicePool { const connectionId = this.generateConnectionId(); try { - // Create peer connection + // Use the existing peer connection from the pool const peer = new RondevuPeer( this.offersApi, this.options.rtcConfig || { iceServers: [{ urls: 'stun:stun.l.google.com:19302' }] - } + }, + answer.peerConnection // Use the existing peer connection ); peer.role = 'offerer'; peer.offerId = answer.offerId; - // Set local description (the original offer) first - await peer.pc.setLocalDescription({ - type: 'offer', - sdp: answer.offerSdp - }); + // Verify peer connection is in correct state + if (peer.pc.signalingState !== 'have-local-offer') { + console.error('Peer connection state info:', { + signalingState: peer.pc.signalingState, + connectionState: peer.pc.connectionState, + iceConnectionState: peer.pc.iceConnectionState, + iceGatheringState: peer.pc.iceGatheringState, + hasLocalDescription: !!peer.pc.localDescription, + hasRemoteDescription: !!peer.pc.remoteDescription, + localDescriptionType: peer.pc.localDescription?.type, + remoteDescriptionType: peer.pc.remoteDescription?.type, + offerId: answer.offerId + }); + throw new Error( + `Invalid signaling state: ${peer.pc.signalingState}. Expected 'have-local-offer' to set remote answer.` + ); + } - // Now set remote description (the answer) + // Set remote description (the answer) await peer.pc.setRemoteDescription({ type: 'answer', sdp: answer.sdp @@ -323,14 +355,15 @@ export class ServicePool { /** * Create multiple offers */ - private async createOffers(count: number): Promise { + private async createOffers(count: number): Promise<{ offers: Offer[], peerConnections: RTCPeerConnection[] }> { if (count <= 0) { - return []; + return { offers: [], peerConnections: [] }; } // Server supports max 10 offers per request const batchSize = Math.min(count, 10); const offers: Offer[] = []; + const peerConnections: RTCPeerConnection[] = []; try { // Create peer connections and generate offers @@ -358,8 +391,8 @@ export class ServicePool { ttl: this.options.ttl }); - // Close the PC immediately - we only needed the SDP - pc.close(); + // Keep peer connection alive - DO NOT CLOSE + peerConnections.push(pc); } // Batch create offers @@ -367,12 +400,14 @@ export class ServicePool { offers.push(...createdOffers); } catch (error) { + // Close any created peer connections on error + peerConnections.forEach(pc => pc.close()); this.status.failedOfferCreations++; this.handleError(error as Error, 'offer-creation'); throw error; } - return offers; + return { offers, peerConnections }; } /** @@ -384,6 +419,7 @@ export class ServicePool { offerId: string; offerSdp: string; expiresAt: number; + peerConnection: RTCPeerConnection; }> { const { username, privateKey, serviceFqn, rtcConfig, isPublic, metadata, ttl } = this.options; @@ -403,7 +439,7 @@ export class ServicePool { throw new Error('Failed to generate SDP'); } - // Store the SDP before closing + // Store the SDP const offerSdp = offer.sdp; // Create signature @@ -430,9 +466,8 @@ export class ServicePool { }) }); - pc.close(); - if (!response.ok) { + pc.close(); const error = await response.json(); throw new Error(error.error || 'Failed to publish service'); } @@ -444,7 +479,8 @@ export class ServicePool { uuid: data.uuid, offerId: data.offerId, offerSdp, - expiresAt: data.expiresAt + expiresAt: data.expiresAt, + peerConnection: pc // Keep peer connection alive }; } @@ -456,8 +492,8 @@ export class ServicePool { throw new Error('Pool not started'); } - const offers = await this.createOffers(count); - await this.offerPool.addOffers(offers); + const result = await this.createOffers(count); + await this.offerPool.addOffers(result.offers, result.peerConnections); this.updateStatus(); }