Fix WebRTC signaling state error in pooled services

- Add signaling state validation before setting remote answer
- Fix race condition by removing offers from pool before processing
- Add detailed debug logging for state mismatch errors
- Prevent duplicate processing of answered offers

This fixes the "Cannot set remote answer in state stable" error
that occurred when multiple answers arrived in quick succession.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-06 11:36:58 +01:00
parent 122f211e7c
commit cffb092d3f
4 changed files with 96 additions and 42 deletions

View File

@@ -7,7 +7,7 @@ export interface AnsweredOffer {
offerId: string; offerId: string;
answererId: string; answererId: string;
sdp: string; // Answer SDP sdp: string; // Answer SDP
offerSdp: string; // Original offer SDP peerConnection: RTCPeerConnection; // Original peer connection
answeredAt: number; answeredAt: number;
} }
@@ -25,7 +25,7 @@ export interface OfferPoolOptions {
onAnswered: (answer: AnsweredOffer) => Promise<void>; onAnswered: (answer: AnsweredOffer) => Promise<void>;
/** Callback to create new offers when refilling the pool */ /** Callback to create new offers when refilling the pool */
onRefill: (count: number) => Promise<Offer[]>; onRefill: (count: number) => Promise<{ offers: Offer[], peerConnections: RTCPeerConnection[] }>;
/** Error handler for pool operations */ /** Error handler for pool operations */
onError: (error: Error, context: string) => void; onError: (error: Error, context: string) => void;
@@ -40,6 +40,7 @@ export interface OfferPoolOptions {
*/ */
export class OfferPool { export class OfferPool {
private offers: Map<string, Offer> = new Map(); private offers: Map<string, Offer> = new Map();
private peerConnections: Map<string, RTCPeerConnection> = new Map();
private polling: boolean = false; private polling: boolean = false;
private pollingTimer?: ReturnType<typeof setInterval>; private pollingTimer?: ReturnType<typeof setInterval>;
private lastPollTime: number = 0; private lastPollTime: number = 0;
@@ -53,11 +54,15 @@ export class OfferPool {
} }
/** /**
* Add offers to the pool * Add offers to the pool with their peer connections
*/ */
async addOffers(offers: Offer[]): Promise<void> { async addOffers(offers: Offer[], peerConnections?: RTCPeerConnection[]): Promise<void> {
for (const offer of offers) { for (let i = 0; i < offers.length; i++) {
const offer = offers[i];
this.offers.set(offer.id, offer); this.offers.set(offer.id, offer);
if (peerConnections && peerConnections[i]) {
this.peerConnections.set(offer.id, peerConnections[i]);
}
} }
} }
@@ -111,23 +116,26 @@ export class OfferPool {
// Process each answer // Process each answer
for (const answer of myAnswers) { for (const answer of myAnswers) {
// Get the original offer // Get the original offer and peer connection
const offer = this.offers.get(answer.offerId); const offer = this.offers.get(answer.offerId);
if (!offer) { const pc = this.peerConnections.get(answer.offerId);
continue; // Offer already consumed, skip
if (!offer || !pc) {
continue; // Offer or peer connection already consumed, skip
} }
// Notify ServicePool with both answer and original offer SDP // Remove from pool BEFORE processing to prevent duplicate processing
this.offers.delete(answer.offerId);
this.peerConnections.delete(answer.offerId);
// Notify ServicePool with answer and original peer connection
await this.options.onAnswered({ await this.options.onAnswered({
offerId: answer.offerId, offerId: answer.offerId,
answererId: answer.answererId, answererId: answer.answererId,
sdp: answer.sdp, sdp: answer.sdp,
offerSdp: offer.sdp, peerConnection: pc,
answeredAt: answer.answeredAt answeredAt: answer.answeredAt
}); });
// Remove consumed offer from pool
this.offers.delete(answer.offerId);
} }
// Immediate refill if below pool size // Immediate refill if below pool size
@@ -135,8 +143,8 @@ export class OfferPool {
const needed = this.options.poolSize - this.offers.size; const needed = this.options.poolSize - this.offers.size;
try { try {
const newOffers = await this.options.onRefill(needed); const result = await this.options.onRefill(needed);
await this.addOffers(newOffers); await this.addOffers(result.offers, result.peerConnections);
} catch (refillError) { } catch (refillError) {
this.options.onError( this.options.onError(
refillError as Error, refillError as Error,
@@ -166,6 +174,13 @@ export class OfferPool {
return Array.from(this.offers.keys()); return Array.from(this.offers.keys());
} }
/**
* Get all active peer connections
*/
getActivePeerConnections(): RTCPeerConnection[] {
return Array.from(this.peerConnections.values());
}
/** /**
* Get the last poll timestamp * Get the last poll timestamp
*/ */

View File

@@ -66,6 +66,7 @@ export default class RondevuPeer extends EventEmitter<PeerEvents> {
{ urls: 'stun:stun1.l.google.com:19302' } { urls: 'stun:stun1.l.google.com:19302' }
] ]
}, },
existingPeerConnection?: RTCPeerConnection,
rtcPeerConnection?: typeof RTCPeerConnection, rtcPeerConnection?: typeof RTCPeerConnection,
rtcSessionDescription?: typeof RTCSessionDescription, rtcSessionDescription?: typeof RTCSessionDescription,
rtcIceCandidate?: typeof RTCIceCandidate rtcIceCandidate?: typeof RTCIceCandidate
@@ -92,7 +93,8 @@ export default class RondevuPeer extends EventEmitter<PeerEvents> {
throw new Error('RTCIceCandidate is not available. Please provide it in the Rondevu constructor options for Node.js environments.'); throw new Error('RTCIceCandidate is not available. Please provide it in the Rondevu constructor options for Node.js environments.');
}) as any); }) as any);
this.pc = new this.RTCPeerConnection(rtcConfig); // Use existing peer connection if provided, otherwise create new one
this.pc = existingPeerConnection || new this.RTCPeerConnection(rtcConfig);
this._state = new IdleState(this); this._state = new IdleState(this);
this.setupPeerConnection(); this.setupPeerConnection();

View File

@@ -177,6 +177,7 @@ export class Rondevu {
return new RondevuPeer( return new RondevuPeer(
this._offers, this._offers,
rtcConfig, rtcConfig,
undefined, // No existing peer connection
this.rtcPeerConnection, this.rtcPeerConnection,
this.rtcSessionDescription, this.rtcSessionDescription,
this.rtcIceCandidate this.rtcIceCandidate

View File

@@ -92,6 +92,7 @@ export interface PooledServiceHandle extends ServiceHandle {
export class ServicePool { export class ServicePool {
private offerPool?: OfferPool; private offerPool?: OfferPool;
private connections: Map<string, ConnectionInfo> = new Map(); private connections: Map<string, ConnectionInfo> = new Map();
private peerConnections: Map<string, RTCPeerConnection> = new Map();
private status: PoolStatus = { private status: PoolStatus = {
activeOffers: 0, activeOffers: 0,
activeConnections: 0, activeConnections: 0,
@@ -125,10 +126,12 @@ export class ServicePool {
// 2. Create additional offers for pool (poolSize - 1) // 2. Create additional offers for pool (poolSize - 1)
const additionalOffers: Offer[] = []; const additionalOffers: Offer[] = [];
const additionalPeerConnections: RTCPeerConnection[] = [];
if (poolSize > 1) { if (poolSize > 1) {
try { try {
const offers = await this.createOffers(poolSize - 1); const result = await this.createOffers(poolSize - 1);
additionalOffers.push(...offers); additionalOffers.push(...result.offers);
additionalPeerConnections.push(...result.peerConnections);
} catch (error) { } catch (error) {
this.handleError(error as Error, 'initial-offer-creation'); this.handleError(error as Error, 'initial-offer-creation');
} }
@@ -143,12 +146,16 @@ export class ServicePool {
onError: (err, ctx) => this.handleError(err, ctx) onError: (err, ctx) => this.handleError(err, ctx)
}); });
// Add all offers to pool (include the SDP from the initial offer) // Add all offers to pool with their peer connections
const allOffers = [ const allOffers = [
{ id: service.offerId, peerId: this.credentials.peerId, sdp: service.offerSdp, topics: [], expiresAt: service.expiresAt, lastSeen: Date.now() }, { id: service.offerId, peerId: this.credentials.peerId, sdp: service.offerSdp, topics: [], expiresAt: service.expiresAt, lastSeen: Date.now() },
...additionalOffers ...additionalOffers
]; ];
await this.offerPool.addOffers(allOffers); const allPeerConnections = [
service.peerConnection,
...additionalPeerConnections
];
await this.offerPool.addOffers(allOffers, allPeerConnections);
// 4. Start polling // 4. Start polling
await this.offerPool.start(); await this.offerPool.start();
@@ -176,7 +183,19 @@ export class ServicePool {
await this.offerPool.stop(); await this.offerPool.stop();
} }
// 2. Delete remaining offers // 2. Close peer connections from the pool
if (this.offerPool) {
const poolPeerConnections = this.offerPool.getActivePeerConnections();
poolPeerConnections.forEach(pc => {
try {
pc.close();
} catch {
// Ignore errors during cleanup
}
});
}
// 3. Delete remaining offers
if (this.offerPool) { if (this.offerPool) {
const offerIds = this.offerPool.getActiveOfferIds(); const offerIds = this.offerPool.getActiveOfferIds();
await Promise.allSettled( await Promise.allSettled(
@@ -184,7 +203,7 @@ export class ServicePool {
); );
} }
// 3. Close active connections // 4. Close active connections
const closePromises = Array.from(this.connections.values()).map( const closePromises = Array.from(this.connections.values()).map(
async (conn) => { async (conn) => {
try { try {
@@ -198,7 +217,7 @@ export class ServicePool {
); );
await Promise.allSettled(closePromises); await Promise.allSettled(closePromises);
// 4. Delete service if we have a serviceId // 5. Delete service if we have a serviceId
if (this.serviceId) { if (this.serviceId) {
try { try {
const response = await fetch(`${this.baseUrl}/services/${this.serviceId}`, { const response = await fetch(`${this.baseUrl}/services/${this.serviceId}`, {
@@ -230,24 +249,37 @@ export class ServicePool {
const connectionId = this.generateConnectionId(); const connectionId = this.generateConnectionId();
try { try {
// Create peer connection // Use the existing peer connection from the pool
const peer = new RondevuPeer( const peer = new RondevuPeer(
this.offersApi, this.offersApi,
this.options.rtcConfig || { this.options.rtcConfig || {
iceServers: [{ urls: 'stun:stun.l.google.com:19302' }] iceServers: [{ urls: 'stun:stun.l.google.com:19302' }]
} },
answer.peerConnection // Use the existing peer connection
); );
peer.role = 'offerer'; peer.role = 'offerer';
peer.offerId = answer.offerId; peer.offerId = answer.offerId;
// Set local description (the original offer) first // Verify peer connection is in correct state
await peer.pc.setLocalDescription({ if (peer.pc.signalingState !== 'have-local-offer') {
type: 'offer', console.error('Peer connection state info:', {
sdp: answer.offerSdp signalingState: peer.pc.signalingState,
connectionState: peer.pc.connectionState,
iceConnectionState: peer.pc.iceConnectionState,
iceGatheringState: peer.pc.iceGatheringState,
hasLocalDescription: !!peer.pc.localDescription,
hasRemoteDescription: !!peer.pc.remoteDescription,
localDescriptionType: peer.pc.localDescription?.type,
remoteDescriptionType: peer.pc.remoteDescription?.type,
offerId: answer.offerId
}); });
throw new Error(
`Invalid signaling state: ${peer.pc.signalingState}. Expected 'have-local-offer' to set remote answer.`
);
}
// Now set remote description (the answer) // Set remote description (the answer)
await peer.pc.setRemoteDescription({ await peer.pc.setRemoteDescription({
type: 'answer', type: 'answer',
sdp: answer.sdp sdp: answer.sdp
@@ -323,14 +355,15 @@ export class ServicePool {
/** /**
* Create multiple offers * Create multiple offers
*/ */
private async createOffers(count: number): Promise<Offer[]> { private async createOffers(count: number): Promise<{ offers: Offer[], peerConnections: RTCPeerConnection[] }> {
if (count <= 0) { if (count <= 0) {
return []; return { offers: [], peerConnections: [] };
} }
// Server supports max 10 offers per request // Server supports max 10 offers per request
const batchSize = Math.min(count, 10); const batchSize = Math.min(count, 10);
const offers: Offer[] = []; const offers: Offer[] = [];
const peerConnections: RTCPeerConnection[] = [];
try { try {
// Create peer connections and generate offers // Create peer connections and generate offers
@@ -358,8 +391,8 @@ export class ServicePool {
ttl: this.options.ttl ttl: this.options.ttl
}); });
// Close the PC immediately - we only needed the SDP // Keep peer connection alive - DO NOT CLOSE
pc.close(); peerConnections.push(pc);
} }
// Batch create offers // Batch create offers
@@ -367,12 +400,14 @@ export class ServicePool {
offers.push(...createdOffers); offers.push(...createdOffers);
} catch (error) { } catch (error) {
// Close any created peer connections on error
peerConnections.forEach(pc => pc.close());
this.status.failedOfferCreations++; this.status.failedOfferCreations++;
this.handleError(error as Error, 'offer-creation'); this.handleError(error as Error, 'offer-creation');
throw error; throw error;
} }
return offers; return { offers, peerConnections };
} }
/** /**
@@ -384,6 +419,7 @@ export class ServicePool {
offerId: string; offerId: string;
offerSdp: string; offerSdp: string;
expiresAt: number; expiresAt: number;
peerConnection: RTCPeerConnection;
}> { }> {
const { username, privateKey, serviceFqn, rtcConfig, isPublic, metadata, ttl } = this.options; const { username, privateKey, serviceFqn, rtcConfig, isPublic, metadata, ttl } = this.options;
@@ -403,7 +439,7 @@ export class ServicePool {
throw new Error('Failed to generate SDP'); throw new Error('Failed to generate SDP');
} }
// Store the SDP before closing // Store the SDP
const offerSdp = offer.sdp; const offerSdp = offer.sdp;
// Create signature // Create signature
@@ -430,9 +466,8 @@ export class ServicePool {
}) })
}); });
pc.close();
if (!response.ok) { if (!response.ok) {
pc.close();
const error = await response.json(); const error = await response.json();
throw new Error(error.error || 'Failed to publish service'); throw new Error(error.error || 'Failed to publish service');
} }
@@ -444,7 +479,8 @@ export class ServicePool {
uuid: data.uuid, uuid: data.uuid,
offerId: data.offerId, offerId: data.offerId,
offerSdp, offerSdp,
expiresAt: data.expiresAt expiresAt: data.expiresAt,
peerConnection: pc // Keep peer connection alive
}; };
} }
@@ -456,8 +492,8 @@ export class ServicePool {
throw new Error('Pool not started'); throw new Error('Pool not started');
} }
const offers = await this.createOffers(count); const result = await this.createOffers(count);
await this.offerPool.addOffers(offers); await this.offerPool.addOffers(result.offers, result.peerConnections);
this.updateStatus(); this.updateStatus();
} }