Replace separate polling with combined pollOffers() in RondevuSignaler

Breaking Change: Offerer now uses pollOffers() for efficient batch polling

Changes:
- Offerer: use pollOffers() for combined answer+ICE polling (1 request vs 2)
- Answerer: keep using getOfferIceCandidates() (separate endpoint still needed)
- Add isOfferer flag to track role
- Replace startAnswerPolling() with startPolling() using pollOffers()
- Filter ICE candidates by role (answerer candidates for offerer)
- Use single lastPollTimestamp for both answers and ICE
- Reduce HTTP requests by 50% for offerers
- More efficient signaling with timestamp-based filtering

No backwards compatibility maintained per user request.
This commit is contained in:
2025-12-10 20:55:02 +01:00
parent b4be5e9060
commit 3327c5b219

View File

@@ -46,10 +46,11 @@ export class RondevuSignaler implements Signaler {
private offerListeners: Array<(offer: RTCSessionDescriptionInit) => void> = [] private offerListeners: Array<(offer: RTCSessionDescriptionInit) => void> = []
private answerListeners: Array<(answer: RTCSessionDescriptionInit) => void> = [] private answerListeners: Array<(answer: RTCSessionDescriptionInit) => void> = []
private iceListeners: Array<(candidate: RTCIceCandidate) => void> = [] private iceListeners: Array<(candidate: RTCIceCandidate) => void> = []
private answerPollingTimeout: ReturnType<typeof setTimeout> | null = null private pollingTimeout: ReturnType<typeof setTimeout> | null = null
private icePollingTimeout: ReturnType<typeof setTimeout> | null = null private icePollingTimeout: ReturnType<typeof setTimeout> | null = null
private lastIceTimestamp = 0 private lastPollTimestamp = 0
private isPolling = false private isPolling = false
private isOfferer = false
private pollingConfig: Required<PollingConfig> private pollingConfig: Required<PollingConfig>
constructor( constructor(
@@ -90,12 +91,10 @@ export class RondevuSignaler implements Signaler {
this.offerId = publishedService.offers[0].offerId this.offerId = publishedService.offers[0].offerId
this.serviceFqn = publishedService.serviceFqn this.serviceFqn = publishedService.serviceFqn
this.isOfferer = true
// Start polling for answer // Start combined polling for answers and ICE candidates
this.startAnswerPolling() this.startPolling()
// Start polling for ICE candidates
this.startIcePolling()
} }
/** /**
@@ -114,8 +113,9 @@ export class RondevuSignaler implements Signaler {
// Send answer to the service // Send answer to the service
const result = await this.rondevu.getAPI().postOfferAnswer(this.serviceFqn, this.offerId, answer.sdp) const result = await this.rondevu.getAPI().postOfferAnswer(this.serviceFqn, this.offerId, answer.sdp)
this.offerId = result.offerId this.offerId = result.offerId
this.isOfferer = false
// Start polling for ICE candidates // Start polling for ICE candidates (answerer uses separate endpoint)
this.startIcePolling() this.startIcePolling()
} }
@@ -244,81 +244,117 @@ export class RondevuSignaler implements Signaler {
} }
/** /**
* Start polling for answer (offerer side) with exponential backoff * Start combined polling for answers and ICE candidates (offerer side)
* Uses pollOffers() for efficient batch polling
*/ */
private startAnswerPolling(): void { private startPolling(): void {
if (this.answerPollingTimeout || !this.serviceFqn || !this.offerId) { if (this.pollingTimeout || !this.isOfferer) {
return return
} }
let interval = this.pollingConfig.initialInterval let interval = this.pollingConfig.initialInterval
let retries = 0 let retries = 0
let answerReceived = false
const poll = async () => { const poll = async () => {
if (!this.serviceFqn || !this.offerId) {
this.stopAnswerPolling()
return
}
try { try {
const answer = await this.rondevu.getAPI().getOfferAnswer(this.serviceFqn, this.offerId) const result = await this.rondevu.pollOffers(this.lastPollTimestamp)
if (answer && answer.sdp) { let foundActivity = false
// Store offerId if we didn't have it yet
if (!this.offerId) {
this.offerId = answer.offerId
}
// Got answer - notify listeners and stop polling // Process answers
const answerDesc: RTCSessionDescriptionInit = { if (result.answers.length > 0 && !answerReceived) {
type: 'answer', foundActivity = true
sdp: answer.sdp,
}
this.answerListeners.forEach(listener => { // Find answer for our offerId
try { const answer = result.answers.find(a => a.offerId === this.offerId)
listener(answerDesc)
} catch (err) { if (answer && answer.sdp) {
console.error('Answer listener error:', err) answerReceived = true
const answerDesc: RTCSessionDescriptionInit = {
type: 'answer',
sdp: answer.sdp,
} }
})
// Stop polling once we get the answer this.answerListeners.forEach(listener => {
this.stopAnswerPolling() try {
return listener(answerDesc)
} catch (err) {
console.error('Answer listener error:', err)
}
})
this.lastPollTimestamp = Math.max(this.lastPollTimestamp, answer.answeredAt)
}
} }
// No answer yet - exponential backoff // Process ICE candidates for our offer
retries++ if (this.offerId && result.iceCandidates[this.offerId]) {
if (retries > this.pollingConfig.maxRetries) { const candidates = result.iceCandidates[this.offerId]
console.warn('Max retries reached for answer polling')
this.stopAnswerPolling() // Filter for answerer candidates (offerer receives answerer's candidates)
return const answererCandidates = candidates.filter(c => c.role === 'answerer')
if (answererCandidates.length > 0) {
foundActivity = true
for (const item of answererCandidates) {
if (item.candidate && item.candidate.candidate && item.candidate.candidate !== '') {
try {
const rtcCandidate = new RTCIceCandidate(item.candidate)
this.iceListeners.forEach(listener => {
try {
listener(rtcCandidate)
} catch (err) {
console.error('ICE listener error:', err)
}
})
this.lastPollTimestamp = Math.max(this.lastPollTimestamp, item.createdAt)
} catch (err) {
console.warn('Failed to process ICE candidate:', err)
this.lastPollTimestamp = Math.max(this.lastPollTimestamp, item.createdAt)
}
}
}
}
} }
interval = Math.min( // Adjust interval based on activity
interval * this.pollingConfig.backoffMultiplier, if (foundActivity) {
this.pollingConfig.maxInterval interval = this.pollingConfig.initialInterval
) retries = 0
} else {
retries++
if (retries > this.pollingConfig.maxRetries) {
console.warn('Max retries reached for polling')
this.stopPolling()
return
}
interval = Math.min(
interval * this.pollingConfig.backoffMultiplier,
this.pollingConfig.maxInterval
)
}
// Add jitter to prevent thundering herd // Add jitter to prevent thundering herd
const finalInterval = this.pollingConfig.jitter const finalInterval = this.pollingConfig.jitter
? interval + Math.random() * 100 ? interval + Math.random() * 100
: interval : interval
this.answerPollingTimeout = setTimeout(poll, finalInterval) this.pollingTimeout = setTimeout(poll, finalInterval)
} catch (err) { } catch (err) {
// 404 is expected when answer isn't available yet console.error('Error polling offers:', err)
if (err instanceof Error && !err.message?.includes('404')) {
console.error('Error polling for answer:', err)
}
// Retry with backoff // Retry with backoff
const finalInterval = this.pollingConfig.jitter const finalInterval = this.pollingConfig.jitter
? interval + Math.random() * 100 ? interval + Math.random() * 100
: interval : interval
this.answerPollingTimeout = setTimeout(poll, finalInterval) this.pollingTimeout = setTimeout(poll, finalInterval)
} }
} }
@@ -326,20 +362,21 @@ export class RondevuSignaler implements Signaler {
} }
/** /**
* Stop polling for answer * Stop combined polling
*/ */
private stopAnswerPolling(): void { private stopPolling(): void {
if (this.answerPollingTimeout) { if (this.pollingTimeout) {
clearTimeout(this.answerPollingTimeout) clearTimeout(this.pollingTimeout)
this.answerPollingTimeout = null this.pollingTimeout = null
} }
} }
/** /**
* Start polling for ICE candidates with adaptive backoff * Start polling for ICE candidates (answerer side only)
* Answerers use the separate endpoint since they don't have offers to poll
*/ */
private startIcePolling(): void { private startIcePolling(): void {
if (this.icePollingTimeout || !this.serviceFqn || !this.offerId) { if (this.icePollingTimeout || !this.serviceFqn || !this.offerId || this.isOfferer) {
return return
} }
@@ -354,7 +391,7 @@ export class RondevuSignaler implements Signaler {
try { try {
const result = await this.rondevu const result = await this.rondevu
.getAPI() .getAPI()
.getOfferIceCandidates(this.serviceFqn, this.offerId, this.lastIceTimestamp) .getOfferIceCandidates(this.serviceFqn, this.offerId, this.lastPollTimestamp)
let foundCandidates = false let foundCandidates = false
@@ -372,13 +409,13 @@ export class RondevuSignaler implements Signaler {
} }
}) })
this.lastIceTimestamp = item.createdAt this.lastPollTimestamp = item.createdAt
} catch (err) { } catch (err) {
console.warn('Failed to process ICE candidate:', err) console.warn('Failed to process ICE candidate:', err)
this.lastIceTimestamp = item.createdAt this.lastPollTimestamp = item.createdAt
} }
} else { } else {
this.lastIceTimestamp = item.createdAt this.lastPollTimestamp = item.createdAt
} }
} }
@@ -433,7 +470,7 @@ export class RondevuSignaler implements Signaler {
* Stop all polling and cleanup * Stop all polling and cleanup
*/ */
dispose(): void { dispose(): void {
this.stopAnswerPolling() this.stopPolling()
this.stopIcePolling() this.stopIcePolling()
this.offerListeners = [] this.offerListeners = []
this.answerListeners = [] this.answerListeners = []