mirror of
https://github.com/xtr-dev/rondevu-client.git
synced 2025-12-10 10:53:24 +00:00
Fix critical bug: track and use data channels from offers
This fixes the root cause of all connection failures. The service pool was creating data channels but discarding the references, then trying to wait for a 'datachannel' event that would never fire. Changes: - Add dataChannel tracking to OfferPool and ServicePool - Save data channel references when creating offers - Pass channels through the answer flow - Use the existing channel instead of waiting for an event - Wait for channel.onopen instead of ondatachannel The offerer (service pool) creates the data channel and must keep that reference. The 'ondatachannel' event only fires on the answerer side. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -8,6 +8,7 @@ export interface AnsweredOffer {
|
|||||||
answererId: string;
|
answererId: string;
|
||||||
sdp: string; // Answer SDP
|
sdp: string; // Answer SDP
|
||||||
peerConnection: RTCPeerConnection; // Original peer connection
|
peerConnection: RTCPeerConnection; // Original peer connection
|
||||||
|
dataChannel?: RTCDataChannel; // Data channel created with offer
|
||||||
answeredAt: number;
|
answeredAt: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -25,7 +26,7 @@ export interface OfferPoolOptions {
|
|||||||
onAnswered: (answer: AnsweredOffer) => Promise<void>;
|
onAnswered: (answer: AnsweredOffer) => Promise<void>;
|
||||||
|
|
||||||
/** Callback to create new offers when refilling the pool */
|
/** Callback to create new offers when refilling the pool */
|
||||||
onRefill: (count: number) => Promise<{ offers: Offer[], peerConnections: RTCPeerConnection[] }>;
|
onRefill: (count: number) => Promise<{ offers: Offer[], peerConnections: RTCPeerConnection[], dataChannels: RTCDataChannel[] }>;
|
||||||
|
|
||||||
/** Error handler for pool operations */
|
/** Error handler for pool operations */
|
||||||
onError: (error: Error, context: string) => void;
|
onError: (error: Error, context: string) => void;
|
||||||
@@ -41,6 +42,7 @@ export interface OfferPoolOptions {
|
|||||||
export class OfferPool {
|
export class OfferPool {
|
||||||
private offers: Map<string, Offer> = new Map();
|
private offers: Map<string, Offer> = new Map();
|
||||||
private peerConnections: Map<string, RTCPeerConnection> = new Map();
|
private peerConnections: Map<string, RTCPeerConnection> = new Map();
|
||||||
|
private dataChannels: Map<string, RTCDataChannel> = new Map();
|
||||||
private polling: boolean = false;
|
private polling: boolean = false;
|
||||||
private pollingTimer?: ReturnType<typeof setInterval>;
|
private pollingTimer?: ReturnType<typeof setInterval>;
|
||||||
private lastPollTime: number = 0;
|
private lastPollTime: number = 0;
|
||||||
@@ -54,15 +56,18 @@ export class OfferPool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add offers to the pool with their peer connections
|
* Add offers to the pool with their peer connections and data channels
|
||||||
*/
|
*/
|
||||||
async addOffers(offers: Offer[], peerConnections?: RTCPeerConnection[]): Promise<void> {
|
async addOffers(offers: Offer[], peerConnections?: RTCPeerConnection[], dataChannels?: RTCDataChannel[]): Promise<void> {
|
||||||
for (let i = 0; i < offers.length; i++) {
|
for (let i = 0; i < offers.length; i++) {
|
||||||
const offer = offers[i];
|
const offer = offers[i];
|
||||||
this.offers.set(offer.id, offer);
|
this.offers.set(offer.id, offer);
|
||||||
if (peerConnections && peerConnections[i]) {
|
if (peerConnections && peerConnections[i]) {
|
||||||
this.peerConnections.set(offer.id, peerConnections[i]);
|
this.peerConnections.set(offer.id, peerConnections[i]);
|
||||||
}
|
}
|
||||||
|
if (dataChannels && dataChannels[i]) {
|
||||||
|
this.dataChannels.set(offer.id, dataChannels[i]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -116,9 +121,10 @@ export class OfferPool {
|
|||||||
|
|
||||||
// Process each answer
|
// Process each answer
|
||||||
for (const answer of myAnswers) {
|
for (const answer of myAnswers) {
|
||||||
// Get the original offer and peer connection
|
// Get the original offer, peer connection, and data channel
|
||||||
const offer = this.offers.get(answer.offerId);
|
const offer = this.offers.get(answer.offerId);
|
||||||
const pc = this.peerConnections.get(answer.offerId);
|
const pc = this.peerConnections.get(answer.offerId);
|
||||||
|
const channel = this.dataChannels.get(answer.offerId);
|
||||||
|
|
||||||
if (!offer || !pc) {
|
if (!offer || !pc) {
|
||||||
continue; // Offer or peer connection already consumed, skip
|
continue; // Offer or peer connection already consumed, skip
|
||||||
@@ -127,13 +133,15 @@ export class OfferPool {
|
|||||||
// Remove from pool BEFORE processing to prevent duplicate processing
|
// Remove from pool BEFORE processing to prevent duplicate processing
|
||||||
this.offers.delete(answer.offerId);
|
this.offers.delete(answer.offerId);
|
||||||
this.peerConnections.delete(answer.offerId);
|
this.peerConnections.delete(answer.offerId);
|
||||||
|
this.dataChannels.delete(answer.offerId);
|
||||||
|
|
||||||
// Notify ServicePool with answer and original peer connection
|
// Notify ServicePool with answer, original peer connection, and data channel
|
||||||
await this.options.onAnswered({
|
await this.options.onAnswered({
|
||||||
offerId: answer.offerId,
|
offerId: answer.offerId,
|
||||||
answererId: answer.answererId,
|
answererId: answer.answererId,
|
||||||
sdp: answer.sdp,
|
sdp: answer.sdp,
|
||||||
peerConnection: pc,
|
peerConnection: pc,
|
||||||
|
dataChannel: channel,
|
||||||
answeredAt: answer.answeredAt
|
answeredAt: answer.answeredAt
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -144,7 +152,7 @@ export class OfferPool {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
const result = await this.options.onRefill(needed);
|
const result = await this.options.onRefill(needed);
|
||||||
await this.addOffers(result.offers, result.peerConnections);
|
await this.addOffers(result.offers, result.peerConnections, result.dataChannels);
|
||||||
} catch (refillError) {
|
} catch (refillError) {
|
||||||
this.options.onError(
|
this.options.onError(
|
||||||
refillError as Error,
|
refillError as Error,
|
||||||
|
|||||||
@@ -138,11 +138,13 @@ export class ServicePool {
|
|||||||
// 2. Create additional offers for pool (poolSize - 1)
|
// 2. Create additional offers for pool (poolSize - 1)
|
||||||
const additionalOffers: Offer[] = [];
|
const additionalOffers: Offer[] = [];
|
||||||
const additionalPeerConnections: RTCPeerConnection[] = [];
|
const additionalPeerConnections: RTCPeerConnection[] = [];
|
||||||
|
const additionalDataChannels: RTCDataChannel[] = [];
|
||||||
if (poolSize > 1) {
|
if (poolSize > 1) {
|
||||||
try {
|
try {
|
||||||
const result = await this.createOffers(poolSize - 1);
|
const result = await this.createOffers(poolSize - 1);
|
||||||
additionalOffers.push(...result.offers);
|
additionalOffers.push(...result.offers);
|
||||||
additionalPeerConnections.push(...result.peerConnections);
|
additionalPeerConnections.push(...result.peerConnections);
|
||||||
|
additionalDataChannels.push(...result.dataChannels);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.handleError(error as Error, 'initial-offer-creation');
|
this.handleError(error as Error, 'initial-offer-creation');
|
||||||
}
|
}
|
||||||
@@ -157,7 +159,7 @@ export class ServicePool {
|
|||||||
onError: (err, ctx) => this.handleError(err, ctx)
|
onError: (err, ctx) => this.handleError(err, ctx)
|
||||||
});
|
});
|
||||||
|
|
||||||
// Add all offers to pool with their peer connections
|
// Add all offers to pool with their peer connections and data channels
|
||||||
const allOffers = [
|
const allOffers = [
|
||||||
{ id: service.offerId, peerId: this.credentials.peerId, sdp: service.offerSdp, topics: [], expiresAt: service.expiresAt, lastSeen: Date.now() },
|
{ id: service.offerId, peerId: this.credentials.peerId, sdp: service.offerSdp, topics: [], expiresAt: service.expiresAt, lastSeen: Date.now() },
|
||||||
...additionalOffers
|
...additionalOffers
|
||||||
@@ -166,7 +168,11 @@ export class ServicePool {
|
|||||||
service.peerConnection,
|
service.peerConnection,
|
||||||
...additionalPeerConnections
|
...additionalPeerConnections
|
||||||
];
|
];
|
||||||
await this.offerPool.addOffers(allOffers, allPeerConnections);
|
const allDataChannels = [
|
||||||
|
service.dataChannel,
|
||||||
|
...additionalDataChannels
|
||||||
|
];
|
||||||
|
await this.offerPool.addOffers(allOffers, allPeerConnections, allDataChannels);
|
||||||
|
|
||||||
// 4. Start polling
|
// 4. Start polling
|
||||||
await this.offerPool.start();
|
await this.offerPool.start();
|
||||||
@@ -296,33 +302,32 @@ export class ServicePool {
|
|||||||
sdp: answer.sdp
|
sdp: answer.sdp
|
||||||
});
|
});
|
||||||
|
|
||||||
// Wait for data channel (answerer creates it, we receive it)
|
// Use the data channel we created when making the offer
|
||||||
const channel = await new Promise<RTCDataChannel>((resolve, reject) => {
|
if (!answer.dataChannel) {
|
||||||
|
throw new Error('No data channel found for answered offer');
|
||||||
|
}
|
||||||
|
|
||||||
|
const channel = answer.dataChannel;
|
||||||
|
|
||||||
|
// Wait for the channel to open (it was created when we made the offer)
|
||||||
|
if (channel.readyState !== 'open') {
|
||||||
|
await new Promise<void>((resolve, reject) => {
|
||||||
const timeout = setTimeout(
|
const timeout = setTimeout(
|
||||||
() => reject(new Error('Timeout waiting for data channel')),
|
() => reject(new Error('Timeout waiting for data channel to open')),
|
||||||
30000
|
30000
|
||||||
);
|
);
|
||||||
|
|
||||||
peer.on('datachannel', (ch: RTCDataChannel) => {
|
channel.onopen = () => {
|
||||||
clearTimeout(timeout);
|
clearTimeout(timeout);
|
||||||
resolve(ch);
|
resolve();
|
||||||
});
|
};
|
||||||
|
|
||||||
// Also check if channel already exists
|
channel.onerror = (error) => {
|
||||||
if (peer.pc.ondatachannel) {
|
|
||||||
const existingHandler = peer.pc.ondatachannel;
|
|
||||||
peer.pc.ondatachannel = (event) => {
|
|
||||||
clearTimeout(timeout);
|
clearTimeout(timeout);
|
||||||
resolve(event.channel);
|
reject(new Error('Data channel error'));
|
||||||
if (existingHandler) existingHandler.call(peer.pc, event);
|
|
||||||
};
|
};
|
||||||
} else {
|
|
||||||
peer.pc.ondatachannel = (event) => {
|
|
||||||
clearTimeout(timeout);
|
|
||||||
resolve(event.channel);
|
|
||||||
};
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Register connection
|
// Register connection
|
||||||
this.connections.set(connectionId, {
|
this.connections.set(connectionId, {
|
||||||
@@ -366,15 +371,16 @@ export class ServicePool {
|
|||||||
/**
|
/**
|
||||||
* Create multiple offers
|
* Create multiple offers
|
||||||
*/
|
*/
|
||||||
private async createOffers(count: number): Promise<{ offers: Offer[], peerConnections: RTCPeerConnection[] }> {
|
private async createOffers(count: number): Promise<{ offers: Offer[], peerConnections: RTCPeerConnection[], dataChannels: RTCDataChannel[] }> {
|
||||||
if (count <= 0) {
|
if (count <= 0) {
|
||||||
return { offers: [], peerConnections: [] };
|
return { offers: [], peerConnections: [], dataChannels: [] };
|
||||||
}
|
}
|
||||||
|
|
||||||
// Server supports max 10 offers per request
|
// Server supports max 10 offers per request
|
||||||
const batchSize = Math.min(count, 10);
|
const batchSize = Math.min(count, 10);
|
||||||
const offers: Offer[] = [];
|
const offers: Offer[] = [];
|
||||||
const peerConnections: RTCPeerConnection[] = [];
|
const peerConnections: RTCPeerConnection[] = [];
|
||||||
|
const dataChannels: RTCDataChannel[] = [];
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Create peer connections and generate offers
|
// Create peer connections and generate offers
|
||||||
@@ -384,8 +390,9 @@ export class ServicePool {
|
|||||||
iceServers: [{ urls: 'stun:stun.l.google.com:19302' }]
|
iceServers: [{ urls: 'stun:stun.l.google.com:19302' }]
|
||||||
});
|
});
|
||||||
|
|
||||||
// Create data channel (required for offers)
|
// Create data channel (required for offers) and save reference
|
||||||
pc.createDataChannel('rondevu-service');
|
const channel = pc.createDataChannel('rondevu-service');
|
||||||
|
dataChannels.push(channel);
|
||||||
|
|
||||||
// Create offer
|
// Create offer
|
||||||
const offer = await pc.createOffer();
|
const offer = await pc.createOffer();
|
||||||
@@ -418,7 +425,7 @@ export class ServicePool {
|
|||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
|
|
||||||
return { offers, peerConnections };
|
return { offers, peerConnections, dataChannels };
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -431,6 +438,7 @@ export class ServicePool {
|
|||||||
offerSdp: string;
|
offerSdp: string;
|
||||||
expiresAt: number;
|
expiresAt: number;
|
||||||
peerConnection: RTCPeerConnection;
|
peerConnection: RTCPeerConnection;
|
||||||
|
dataChannel: RTCDataChannel;
|
||||||
}> {
|
}> {
|
||||||
const { username, privateKey, serviceFqn, rtcConfig, isPublic, metadata, ttl } = this.options;
|
const { username, privateKey, serviceFqn, rtcConfig, isPublic, metadata, ttl } = this.options;
|
||||||
|
|
||||||
@@ -439,7 +447,7 @@ export class ServicePool {
|
|||||||
iceServers: [{ urls: 'stun:stun.l.google.com:19302' }]
|
iceServers: [{ urls: 'stun:stun.l.google.com:19302' }]
|
||||||
});
|
});
|
||||||
|
|
||||||
pc.createDataChannel('rondevu-service');
|
const dataChannel = pc.createDataChannel('rondevu-service');
|
||||||
|
|
||||||
// Create offer
|
// Create offer
|
||||||
const offer = await pc.createOffer();
|
const offer = await pc.createOffer();
|
||||||
@@ -491,7 +499,8 @@ export class ServicePool {
|
|||||||
offerId: data.offerId,
|
offerId: data.offerId,
|
||||||
offerSdp,
|
offerSdp,
|
||||||
expiresAt: data.expiresAt,
|
expiresAt: data.expiresAt,
|
||||||
peerConnection: pc // Keep peer connection alive
|
peerConnection: pc, // Keep peer connection alive
|
||||||
|
dataChannel // Keep data channel alive
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -504,7 +513,7 @@ export class ServicePool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const result = await this.createOffers(count);
|
const result = await this.createOffers(count);
|
||||||
await this.offerPool.addOffers(result.offers, result.peerConnections);
|
await this.offerPool.addOffers(result.offers, result.peerConnections, result.dataChannels);
|
||||||
this.updateStatus();
|
this.updateStatus();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user