diff --git a/src/rondevu-signaler.ts b/src/rondevu-signaler.ts index f6fdf80..59fd9f8 100644 --- a/src/rondevu-signaler.ts +++ b/src/rondevu-signaler.ts @@ -46,10 +46,11 @@ export class RondevuSignaler implements Signaler { private offerListeners: Array<(offer: RTCSessionDescriptionInit) => void> = [] private answerListeners: Array<(answer: RTCSessionDescriptionInit) => void> = [] private iceListeners: Array<(candidate: RTCIceCandidate) => void> = [] - private answerPollingTimeout: ReturnType | null = null + private pollingTimeout: ReturnType | null = null private icePollingTimeout: ReturnType | null = null - private lastIceTimestamp = 0 + private lastPollTimestamp = 0 private isPolling = false + private isOfferer = false private pollingConfig: Required constructor( @@ -90,12 +91,10 @@ export class RondevuSignaler implements Signaler { this.offerId = publishedService.offers[0].offerId this.serviceFqn = publishedService.serviceFqn + this.isOfferer = true - // Start polling for answer - this.startAnswerPolling() - - // Start polling for ICE candidates - this.startIcePolling() + // Start combined polling for answers and ICE candidates + this.startPolling() } /** @@ -114,8 +113,9 @@ export class RondevuSignaler implements Signaler { // Send answer to the service const result = await this.rondevu.getAPI().postOfferAnswer(this.serviceFqn, this.offerId, answer.sdp) this.offerId = result.offerId + this.isOfferer = false - // Start polling for ICE candidates + // Start polling for ICE candidates (answerer uses separate endpoint) this.startIcePolling() } @@ -244,81 +244,117 @@ export class RondevuSignaler implements Signaler { } /** - * Start polling for answer (offerer side) with exponential backoff + * Start combined polling for answers and ICE candidates (offerer side) + * Uses pollOffers() for efficient batch polling */ - private startAnswerPolling(): void { - if (this.answerPollingTimeout || !this.serviceFqn || !this.offerId) { + private startPolling(): void { + if (this.pollingTimeout || !this.isOfferer) { return } let interval = this.pollingConfig.initialInterval let retries = 0 + let answerReceived = false const poll = async () => { - if (!this.serviceFqn || !this.offerId) { - this.stopAnswerPolling() - return - } - try { - const answer = await this.rondevu.getAPI().getOfferAnswer(this.serviceFqn, this.offerId) + const result = await this.rondevu.pollOffers(this.lastPollTimestamp) - if (answer && answer.sdp) { - // Store offerId if we didn't have it yet - if (!this.offerId) { - this.offerId = answer.offerId - } + let foundActivity = false - // Got answer - notify listeners and stop polling - const answerDesc: RTCSessionDescriptionInit = { - type: 'answer', - sdp: answer.sdp, - } + // Process answers + if (result.answers.length > 0 && !answerReceived) { + foundActivity = true - this.answerListeners.forEach(listener => { - try { - listener(answerDesc) - } catch (err) { - console.error('Answer listener error:', err) + // Find answer for our offerId + const answer = result.answers.find(a => a.offerId === this.offerId) + + if (answer && answer.sdp) { + answerReceived = true + + const answerDesc: RTCSessionDescriptionInit = { + type: 'answer', + sdp: answer.sdp, } - }) - // Stop polling once we get the answer - this.stopAnswerPolling() - return + this.answerListeners.forEach(listener => { + try { + listener(answerDesc) + } catch (err) { + console.error('Answer listener error:', err) + } + }) + + this.lastPollTimestamp = Math.max(this.lastPollTimestamp, answer.answeredAt) + } } - // No answer yet - exponential backoff - retries++ - if (retries > this.pollingConfig.maxRetries) { - console.warn('Max retries reached for answer polling') - this.stopAnswerPolling() - return + // Process ICE candidates for our offer + if (this.offerId && result.iceCandidates[this.offerId]) { + const candidates = result.iceCandidates[this.offerId] + + // Filter for answerer candidates (offerer receives answerer's candidates) + const answererCandidates = candidates.filter(c => c.role === 'answerer') + + if (answererCandidates.length > 0) { + foundActivity = true + + for (const item of answererCandidates) { + if (item.candidate && item.candidate.candidate && item.candidate.candidate !== '') { + try { + const rtcCandidate = new RTCIceCandidate(item.candidate) + + this.iceListeners.forEach(listener => { + try { + listener(rtcCandidate) + } catch (err) { + console.error('ICE listener error:', err) + } + }) + + this.lastPollTimestamp = Math.max(this.lastPollTimestamp, item.createdAt) + } catch (err) { + console.warn('Failed to process ICE candidate:', err) + this.lastPollTimestamp = Math.max(this.lastPollTimestamp, item.createdAt) + } + } + } + } } - interval = Math.min( - interval * this.pollingConfig.backoffMultiplier, - this.pollingConfig.maxInterval - ) + // Adjust interval based on activity + if (foundActivity) { + interval = this.pollingConfig.initialInterval + retries = 0 + } else { + retries++ + if (retries > this.pollingConfig.maxRetries) { + console.warn('Max retries reached for polling') + this.stopPolling() + return + } + + interval = Math.min( + interval * this.pollingConfig.backoffMultiplier, + this.pollingConfig.maxInterval + ) + } // Add jitter to prevent thundering herd const finalInterval = this.pollingConfig.jitter ? interval + Math.random() * 100 : interval - this.answerPollingTimeout = setTimeout(poll, finalInterval) + this.pollingTimeout = setTimeout(poll, finalInterval) } catch (err) { - // 404 is expected when answer isn't available yet - if (err instanceof Error && !err.message?.includes('404')) { - console.error('Error polling for answer:', err) - } + console.error('Error polling offers:', err) // Retry with backoff const finalInterval = this.pollingConfig.jitter ? interval + Math.random() * 100 : interval - this.answerPollingTimeout = setTimeout(poll, finalInterval) + this.pollingTimeout = setTimeout(poll, finalInterval) } } @@ -326,20 +362,21 @@ export class RondevuSignaler implements Signaler { } /** - * Stop polling for answer + * Stop combined polling */ - private stopAnswerPolling(): void { - if (this.answerPollingTimeout) { - clearTimeout(this.answerPollingTimeout) - this.answerPollingTimeout = null + private stopPolling(): void { + if (this.pollingTimeout) { + clearTimeout(this.pollingTimeout) + this.pollingTimeout = null } } /** - * Start polling for ICE candidates with adaptive backoff + * Start polling for ICE candidates (answerer side only) + * Answerers use the separate endpoint since they don't have offers to poll */ private startIcePolling(): void { - if (this.icePollingTimeout || !this.serviceFqn || !this.offerId) { + if (this.icePollingTimeout || !this.serviceFqn || !this.offerId || this.isOfferer) { return } @@ -354,7 +391,7 @@ export class RondevuSignaler implements Signaler { try { const result = await this.rondevu .getAPI() - .getOfferIceCandidates(this.serviceFqn, this.offerId, this.lastIceTimestamp) + .getOfferIceCandidates(this.serviceFqn, this.offerId, this.lastPollTimestamp) let foundCandidates = false @@ -372,13 +409,13 @@ export class RondevuSignaler implements Signaler { } }) - this.lastIceTimestamp = item.createdAt + this.lastPollTimestamp = item.createdAt } catch (err) { console.warn('Failed to process ICE candidate:', err) - this.lastIceTimestamp = item.createdAt + this.lastPollTimestamp = item.createdAt } } else { - this.lastIceTimestamp = item.createdAt + this.lastPollTimestamp = item.createdAt } } @@ -433,7 +470,7 @@ export class RondevuSignaler implements Signaler { * Stop all polling and cleanup */ dispose(): void { - this.stopAnswerPolling() + this.stopPolling() this.stopIcePolling() this.offerListeners = [] this.answerListeners = []