diff --git a/src/offer-pool.ts b/src/offer-pool.ts index 86149fb..eefcb96 100644 --- a/src/offer-pool.ts +++ b/src/offer-pool.ts @@ -8,6 +8,7 @@ export interface AnsweredOffer { answererId: string; sdp: string; // Answer SDP peerConnection: RTCPeerConnection; // Original peer connection + dataChannel?: RTCDataChannel; // Data channel created with offer answeredAt: number; } @@ -25,7 +26,7 @@ export interface OfferPoolOptions { onAnswered: (answer: AnsweredOffer) => Promise; /** Callback to create new offers when refilling the pool */ - onRefill: (count: number) => Promise<{ offers: Offer[], peerConnections: RTCPeerConnection[] }>; + onRefill: (count: number) => Promise<{ offers: Offer[], peerConnections: RTCPeerConnection[], dataChannels: RTCDataChannel[] }>; /** Error handler for pool operations */ onError: (error: Error, context: string) => void; @@ -41,6 +42,7 @@ export interface OfferPoolOptions { export class OfferPool { private offers: Map = new Map(); private peerConnections: Map = new Map(); + private dataChannels: Map = new Map(); private polling: boolean = false; private pollingTimer?: ReturnType; private lastPollTime: number = 0; @@ -54,15 +56,18 @@ export class OfferPool { } /** - * Add offers to the pool with their peer connections + * Add offers to the pool with their peer connections and data channels */ - async addOffers(offers: Offer[], peerConnections?: RTCPeerConnection[]): Promise { + async addOffers(offers: Offer[], peerConnections?: RTCPeerConnection[], dataChannels?: RTCDataChannel[]): Promise { for (let i = 0; i < offers.length; i++) { const offer = offers[i]; this.offers.set(offer.id, offer); if (peerConnections && peerConnections[i]) { this.peerConnections.set(offer.id, peerConnections[i]); } + if (dataChannels && dataChannels[i]) { + this.dataChannels.set(offer.id, dataChannels[i]); + } } } @@ -116,9 +121,10 @@ export class OfferPool { // Process each answer for (const answer of myAnswers) { - // Get the original offer and peer connection + // Get the original offer, peer connection, and data channel const offer = this.offers.get(answer.offerId); const pc = this.peerConnections.get(answer.offerId); + const channel = this.dataChannels.get(answer.offerId); if (!offer || !pc) { continue; // Offer or peer connection already consumed, skip @@ -127,13 +133,15 @@ export class OfferPool { // Remove from pool BEFORE processing to prevent duplicate processing this.offers.delete(answer.offerId); this.peerConnections.delete(answer.offerId); + this.dataChannels.delete(answer.offerId); - // Notify ServicePool with answer and original peer connection + // Notify ServicePool with answer, original peer connection, and data channel await this.options.onAnswered({ offerId: answer.offerId, answererId: answer.answererId, sdp: answer.sdp, peerConnection: pc, + dataChannel: channel, answeredAt: answer.answeredAt }); } @@ -144,7 +152,7 @@ export class OfferPool { try { const result = await this.options.onRefill(needed); - await this.addOffers(result.offers, result.peerConnections); + await this.addOffers(result.offers, result.peerConnections, result.dataChannels); } catch (refillError) { this.options.onError( refillError as Error, diff --git a/src/service-pool.ts b/src/service-pool.ts index 95d6ea8..e2319a0 100644 --- a/src/service-pool.ts +++ b/src/service-pool.ts @@ -138,11 +138,13 @@ export class ServicePool { // 2. Create additional offers for pool (poolSize - 1) const additionalOffers: Offer[] = []; const additionalPeerConnections: RTCPeerConnection[] = []; + const additionalDataChannels: RTCDataChannel[] = []; if (poolSize > 1) { try { const result = await this.createOffers(poolSize - 1); additionalOffers.push(...result.offers); additionalPeerConnections.push(...result.peerConnections); + additionalDataChannels.push(...result.dataChannels); } catch (error) { this.handleError(error as Error, 'initial-offer-creation'); } @@ -157,7 +159,7 @@ export class ServicePool { onError: (err, ctx) => this.handleError(err, ctx) }); - // Add all offers to pool with their peer connections + // Add all offers to pool with their peer connections and data channels const allOffers = [ { id: service.offerId, peerId: this.credentials.peerId, sdp: service.offerSdp, topics: [], expiresAt: service.expiresAt, lastSeen: Date.now() }, ...additionalOffers @@ -166,7 +168,11 @@ export class ServicePool { service.peerConnection, ...additionalPeerConnections ]; - await this.offerPool.addOffers(allOffers, allPeerConnections); + const allDataChannels = [ + service.dataChannel, + ...additionalDataChannels + ]; + await this.offerPool.addOffers(allOffers, allPeerConnections, allDataChannels); // 4. Start polling await this.offerPool.start(); @@ -296,33 +302,32 @@ export class ServicePool { sdp: answer.sdp }); - // Wait for data channel (answerer creates it, we receive it) - const channel = await new Promise((resolve, reject) => { - const timeout = setTimeout( - () => reject(new Error('Timeout waiting for data channel')), - 30000 - ); + // Use the data channel we created when making the offer + if (!answer.dataChannel) { + throw new Error('No data channel found for answered offer'); + } - peer.on('datachannel', (ch: RTCDataChannel) => { - clearTimeout(timeout); - resolve(ch); + const channel = answer.dataChannel; + + // Wait for the channel to open (it was created when we made the offer) + if (channel.readyState !== 'open') { + await new Promise((resolve, reject) => { + const timeout = setTimeout( + () => reject(new Error('Timeout waiting for data channel to open')), + 30000 + ); + + channel.onopen = () => { + clearTimeout(timeout); + resolve(); + }; + + channel.onerror = (error) => { + clearTimeout(timeout); + reject(new Error('Data channel error')); + }; }); - - // Also check if channel already exists - if (peer.pc.ondatachannel) { - const existingHandler = peer.pc.ondatachannel; - peer.pc.ondatachannel = (event) => { - clearTimeout(timeout); - resolve(event.channel); - if (existingHandler) existingHandler.call(peer.pc, event); - }; - } else { - peer.pc.ondatachannel = (event) => { - clearTimeout(timeout); - resolve(event.channel); - }; - } - }); + } // Register connection this.connections.set(connectionId, { @@ -366,15 +371,16 @@ export class ServicePool { /** * Create multiple offers */ - private async createOffers(count: number): Promise<{ offers: Offer[], peerConnections: RTCPeerConnection[] }> { + private async createOffers(count: number): Promise<{ offers: Offer[], peerConnections: RTCPeerConnection[], dataChannels: RTCDataChannel[] }> { if (count <= 0) { - return { offers: [], peerConnections: [] }; + return { offers: [], peerConnections: [], dataChannels: [] }; } // Server supports max 10 offers per request const batchSize = Math.min(count, 10); const offers: Offer[] = []; const peerConnections: RTCPeerConnection[] = []; + const dataChannels: RTCDataChannel[] = []; try { // Create peer connections and generate offers @@ -384,8 +390,9 @@ export class ServicePool { iceServers: [{ urls: 'stun:stun.l.google.com:19302' }] }); - // Create data channel (required for offers) - pc.createDataChannel('rondevu-service'); + // Create data channel (required for offers) and save reference + const channel = pc.createDataChannel('rondevu-service'); + dataChannels.push(channel); // Create offer const offer = await pc.createOffer(); @@ -418,7 +425,7 @@ export class ServicePool { throw error; } - return { offers, peerConnections }; + return { offers, peerConnections, dataChannels }; } /** @@ -431,6 +438,7 @@ export class ServicePool { offerSdp: string; expiresAt: number; peerConnection: RTCPeerConnection; + dataChannel: RTCDataChannel; }> { const { username, privateKey, serviceFqn, rtcConfig, isPublic, metadata, ttl } = this.options; @@ -439,7 +447,7 @@ export class ServicePool { iceServers: [{ urls: 'stun:stun.l.google.com:19302' }] }); - pc.createDataChannel('rondevu-service'); + const dataChannel = pc.createDataChannel('rondevu-service'); // Create offer const offer = await pc.createOffer(); @@ -491,7 +499,8 @@ export class ServicePool { offerId: data.offerId, offerSdp, expiresAt: data.expiresAt, - peerConnection: pc // Keep peer connection alive + peerConnection: pc, // Keep peer connection alive + dataChannel // Keep data channel alive }; } @@ -504,7 +513,7 @@ export class ServicePool { } const result = await this.createOffers(count); - await this.offerPool.addOffers(result.offers, result.peerConnections); + await this.offerPool.addOffers(result.offers, result.peerConnections, result.dataChannels); this.updateStatus(); }