diff --git a/package-lock.json b/package-lock.json index 532e61f..f15e85c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@xtr-dev/rondevu-client", - "version": "0.9.2", + "version": "0.12.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@xtr-dev/rondevu-client", - "version": "0.9.2", + "version": "0.12.0", "license": "MIT", "dependencies": { "@noble/ed25519": "^3.0.0", diff --git a/package.json b/package.json index aa2f79b..ed54f82 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@xtr-dev/rondevu-client", - "version": "0.11.0", + "version": "0.12.0", "description": "TypeScript client for Rondevu with durable WebRTC connections, automatic reconnection, and message queuing", "type": "module", "main": "dist/index.js", diff --git a/src/api.ts b/src/api.ts index 09d2069..6982605 100644 --- a/src/api.ts +++ b/src/api.ts @@ -37,10 +37,14 @@ export interface Offer { answererPeerId?: string } +export interface OfferRequest { + sdp: string +} + export interface ServiceRequest { username: string serviceFqn: string - sdp: string + offers: OfferRequest[] ttl?: number isPublic?: boolean metadata?: Record @@ -48,10 +52,17 @@ export interface ServiceRequest { message: string } +export interface ServiceOffer { + offerId: string + sdp: string + createdAt: number + expiresAt: number +} + export interface Service { serviceId: string uuid: string - offerId: string + offers: ServiceOffer[] username: string serviceFqn: string isPublic: boolean @@ -90,6 +101,13 @@ export class RondevuAPI { private credentials?: Credentials ) {} + /** + * Set credentials for authentication + */ + setCredentials(credentials: Credentials): void { + this.credentials = credentials + } + /** * Authentication header */ @@ -210,42 +228,45 @@ export class RondevuAPI { } /** - * Answer an offer + * Answer a service */ - async answerOffer(offerId: string, sdp: string, secret?: string): Promise { - const response = await fetch(`${this.baseUrl}/offers/${offerId}/answer`, { + async answerService(serviceUuid: string, sdp: string): Promise<{ offerId: string }> { + const response = await fetch(`${this.baseUrl}/services/${serviceUuid}/answer`, { method: 'POST', headers: { 'Content-Type': 'application/json', ...this.getAuthHeader(), }, - body: JSON.stringify({ sdp, secret }), + body: JSON.stringify({ sdp }), }) if (!response.ok) { const error = await response.json().catch(() => ({ error: 'Unknown error' })) - throw new Error(`Failed to answer offer: ${error.error || response.statusText}`) + throw new Error(`Failed to answer service: ${error.error || response.statusText}`) } + + return await response.json() } /** - * Get answer for an offer (offerer polls this) + * Get answer for a service (offerer polls this) */ - async getAnswer(offerId: string): Promise<{ sdp: string } | null> { - const response = await fetch(`${this.baseUrl}/offers/${offerId}/answer`, { + async getServiceAnswer(serviceUuid: string): Promise<{ sdp: string; offerId: string } | null> { + const response = await fetch(`${this.baseUrl}/services/${serviceUuid}/answer`, { headers: this.getAuthHeader(), }) - if (response.status === 404) { - return null // No answer yet - } - if (!response.ok) { + // 404 means not yet answered + if (response.status === 404) { + return null + } const error = await response.json().catch(() => ({ error: 'Unknown error' })) throw new Error(`Failed to get answer: ${error.error || response.statusText}`) } - return await response.json() + const data = await response.json() + return { sdp: data.sdp, offerId: data.offerId } } /** @@ -269,39 +290,48 @@ export class RondevuAPI { // ============================================ /** - * Add ICE candidates to an offer + * Add ICE candidates to a service */ - async addIceCandidates(offerId: string, candidates: RTCIceCandidateInit[]): Promise { - const response = await fetch(`${this.baseUrl}/offers/${offerId}/ice-candidates`, { + async addServiceIceCandidates(serviceUuid: string, candidates: RTCIceCandidateInit[], offerId?: string): Promise<{ offerId: string }> { + const response = await fetch(`${this.baseUrl}/services/${serviceUuid}/ice-candidates`, { method: 'POST', headers: { 'Content-Type': 'application/json', ...this.getAuthHeader(), }, - body: JSON.stringify({ candidates }), + body: JSON.stringify({ candidates, offerId }), }) if (!response.ok) { const error = await response.json().catch(() => ({ error: 'Unknown error' })) throw new Error(`Failed to add ICE candidates: ${error.error || response.statusText}`) } + + return await response.json() } /** - * Get ICE candidates for an offer (with polling support) + * Get ICE candidates for a service (with polling support) */ - async getIceCandidates(offerId: string, since: number = 0): Promise { - const response = await fetch( - `${this.baseUrl}/offers/${offerId}/ice-candidates?since=${since}`, - { headers: this.getAuthHeader() } - ) + async getServiceIceCandidates(serviceUuid: string, since: number = 0, offerId?: string): Promise<{ candidates: IceCandidate[]; offerId: string }> { + const url = new URL(`${this.baseUrl}/services/${serviceUuid}/ice-candidates`) + url.searchParams.set('since', since.toString()) + if (offerId) { + url.searchParams.set('offerId', offerId) + } + + const response = await fetch(url.toString(), { headers: this.getAuthHeader() }) if (!response.ok) { const error = await response.json().catch(() => ({ error: 'Unknown error' })) throw new Error(`Failed to get ICE candidates: ${error.error || response.statusText}`) } - return await response.json() + const data = await response.json() + return { + candidates: data.candidates || [], + offerId: data.offerId + } } // ============================================ @@ -312,7 +342,7 @@ export class RondevuAPI { * Publish a service */ async publishService(service: ServiceRequest): Promise { - const response = await fetch(`${this.baseUrl}/services`, { + const response = await fetch(`${this.baseUrl}/users/${encodeURIComponent(service.username)}/services`, { method: 'POST', headers: { 'Content-Type': 'application/json', @@ -346,11 +376,11 @@ export class RondevuAPI { } /** - * Search services by username + * Search services by username - lists all services for a username */ async searchServicesByUsername(username: string): Promise { const response = await fetch( - `${this.baseUrl}/services?username=${encodeURIComponent(username)}`, + `${this.baseUrl}/users/${encodeURIComponent(username)}/services`, { headers: this.getAuthHeader() } ) @@ -359,41 +389,29 @@ export class RondevuAPI { throw new Error(`Failed to search services: ${error.error || response.statusText}`) } - return await response.json() + const data = await response.json() + return data.services || [] } /** - * Search services by FQN - */ - async searchServicesByFqn(serviceFqn: string): Promise { - const response = await fetch( - `${this.baseUrl}/services?serviceFqn=${encodeURIComponent(serviceFqn)}`, - { headers: this.getAuthHeader() } - ) - - if (!response.ok) { - const error = await response.json().catch(() => ({ error: 'Unknown error' })) - throw new Error(`Failed to search services: ${error.error || response.statusText}`) - } - - return await response.json() - } - - /** - * Search services by username AND FQN + * Search services by username AND FQN - returns full service details */ async searchServices(username: string, serviceFqn: string): Promise { const response = await fetch( - `${this.baseUrl}/services?username=${encodeURIComponent(username)}&serviceFqn=${encodeURIComponent(serviceFqn)}`, + `${this.baseUrl}/users/${encodeURIComponent(username)}/services/${encodeURIComponent(serviceFqn)}`, { headers: this.getAuthHeader() } ) if (!response.ok) { + if (response.status === 404) { + return [] + } const error = await response.json().catch(() => ({ error: 'Unknown error' })) throw new Error(`Failed to search services: ${error.error || response.statusText}`) } - return await response.json() + const service = await response.json() + return [service] } // ============================================ @@ -405,7 +423,7 @@ export class RondevuAPI { */ async checkUsername(username: string): Promise<{ available: boolean; owner?: string }> { const response = await fetch( - `${this.baseUrl}/usernames/${encodeURIComponent(username)}/check` + `${this.baseUrl}/users/${encodeURIComponent(username)}` ) if (!response.ok) { @@ -425,7 +443,7 @@ export class RondevuAPI { signature: string, message: string ): Promise<{ success: boolean; username: string }> { - const response = await fetch(`${this.baseUrl}/usernames/${encodeURIComponent(username)}`, { + const response = await fetch(`${this.baseUrl}/users/${encodeURIComponent(username)}`, { method: 'POST', headers: { 'Content-Type': 'application/json', diff --git a/src/durable-connection.ts b/src/durable-connection.ts index 04eb4ba..cfcb07b 100644 --- a/src/durable-connection.ts +++ b/src/durable-connection.ts @@ -12,10 +12,9 @@ import { createBin } from './bin.js' import { WebRTCContext } from './webrtc-context' export type WebRTCRondevuConnectionOptions = { - id: string - service: string - offer: RTCSessionDescriptionInit | null + offer?: RTCSessionDescriptionInit | null context: WebRTCContext + signaler: Signaler } /** @@ -63,41 +62,39 @@ export type WebRTCRondevuConnectionOptions = { * }); * ``` */ -export class WebRTCRondevuConnection implements ConnectionInterface { +export class RTCDurableConnection implements ConnectionInterface { private readonly side: 'offer' | 'answer' public readonly expiresAt: number = 0 public readonly lastActive: number = 0 public readonly events: EventBus = new EventBus() public readonly ready: Promise private iceBin = createBin() - private ctx: WebRTCContext + private context: WebRTCContext + private readonly signaler: Signaler private _conn: RTCPeerConnection | null = null private _state: ConnectionInterface['state'] = 'disconnected' + private _dataChannel: RTCDataChannel | null = null + private messageQueue: Array<{ + message: Message + options: QueueMessageOptions + timestamp: number + }> = [] - constructor({ context: ctx, offer }: WebRTCRondevuConnectionOptions) { - this.ctx = ctx - this._conn = ctx.createPeerConnection() + constructor({ context, offer, signaler }: WebRTCRondevuConnectionOptions) { + this.context = context + this.signaler = signaler + this._conn = context.createPeerConnection() this.side = offer ? 'answer' : 'offer' // setup data channel if (offer) { this._conn.addEventListener('datachannel', e => { - const channel = e.channel - channel.addEventListener('message', e => { - console.log('Message from peer:', e) - }) - channel.addEventListener('open', () => { - channel.send('I am ' + this.side) - }) + this._dataChannel = e.channel + this.setupDataChannelListeners(this._dataChannel) }) } else { - const channel = this._conn.createDataChannel('vu.ronde.protocol') - channel.addEventListener('message', e => { - console.log('Message from peer:', e) - }) - channel.addEventListener('open', () => { - channel.send('I am ' + this.side) - }) + this._dataChannel = this._conn.createDataChannel('vu.ronde.protocol') + this.setupDataChannelListeners(this._dataChannel) } // setup description exchange @@ -108,12 +105,12 @@ export class WebRTCRondevuConnection implements ConnectionInterface { .then(async answer => { if (!answer || !this._conn) throw new Error('Connection disappeared') await this._conn.setLocalDescription(answer) - return await ctx.signaler.setAnswer(answer) + return await signaler.setAnswer(answer) }) : this._conn.createOffer().then(async offer => { if (!this._conn) throw new Error('Connection disappeared') await this._conn.setLocalDescription(offer) - return await ctx.signaler.setOffer(offer) + return await signaler.setOffer(offer) }) // propagate connection state changes @@ -161,12 +158,12 @@ export class WebRTCRondevuConnection implements ConnectionInterface { */ private startIce() { const listener = ({ candidate }: { candidate: RTCIceCandidate | null }) => { - if (candidate) this.ctx.signaler.addIceCandidate(candidate) + if (candidate) this.signaler.addIceCandidate(candidate) } if (!this._conn) throw new Error('Connection disappeared') this._conn.addEventListener('icecandidate', listener) this.iceBin( - this.ctx.signaler.addListener((candidate: RTCIceCandidate) => + this.signaler.addListener((candidate: RTCIceCandidate) => this._conn?.addIceCandidate(candidate) ), () => this._conn?.removeEventListener('icecandidate', listener) @@ -201,15 +198,68 @@ export class WebRTCRondevuConnection implements ConnectionInterface { return this._state } + /** + * Setup data channel event listeners + */ + private setupDataChannelListeners(channel: RTCDataChannel): void { + channel.addEventListener('message', e => { + this.events.emit('message', e.data) + }) + + channel.addEventListener('open', () => { + // Channel opened - flush queued messages + this.flushQueue().catch(err => { + console.error('Failed to flush message queue:', err) + }) + }) + + channel.addEventListener('error', err => { + console.error('Data channel error:', err) + }) + + channel.addEventListener('close', () => { + console.log('Data channel closed') + }) + } + + /** + * Flush the message queue + */ + private async flushQueue(): Promise { + while (this.messageQueue.length > 0 && this._state === 'connected') { + const item = this.messageQueue.shift()! + + // Check expiration + if (item.options.expiresAt && Date.now() > item.options.expiresAt) { + continue + } + + const success = await this.sendMessage(item.message) + if (!success) { + // Re-queue on failure + this.messageQueue.unshift(item) + break + } + } + } + /** * Queue a message for sending when connection is established * * @param message - Message to queue (string or ArrayBuffer) * @param options - Queue options (e.g., expiration time) */ - queueMessage(message: Message, options: QueueMessageOptions = {}): Promise { - // TODO: Implement message queuing - return Promise.resolve(undefined) + async queueMessage(message: Message, options: QueueMessageOptions = {}): Promise { + this.messageQueue.push({ + message, + options, + timestamp: Date.now() + }) + + // Try immediate send if connected + if (this._state === 'connected') { + await this.flushQueue() + } } /** @@ -218,8 +268,23 @@ export class WebRTCRondevuConnection implements ConnectionInterface { * @param message - Message to send (string or ArrayBuffer) * @returns Promise resolving to true if sent successfully */ - sendMessage(message: Message): Promise { - // TODO: Implement message sending via data channel - return Promise.resolve(false) + async sendMessage(message: Message): Promise { + if (this._state !== 'connected' || !this._dataChannel) { + return false + } + + if (this._dataChannel.readyState !== 'open') { + return false + } + + try { + // TypeScript has trouble with the union type, so we cast to any + // Both string and ArrayBuffer are valid for RTCDataChannel.send() + this._dataChannel.send(message as any) + return true + } catch (err) { + console.error('Send failed:', err) + return false + } } } diff --git a/src/index.ts b/src/index.ts index 8d84bc5..d6b1240 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,10 +6,11 @@ export { EventBus } from './event-bus.js' export { RondevuAPI } from './api.js' export { RondevuService } from './rondevu-service.js' -export { RondevuSignaler } from './signaler.js' +export { RondevuSignaler } from './rondevu-signaler.js' +export { WebRTCContext } from './webrtc-context.js' +export { RTCDurableConnection } from './durable-connection' export { ServiceHost } from './service-host.js' export { ServiceClient } from './service-client.js' -export { WebRTCRondevuConnection } from './connection.js' export { createBin } from './bin.js' // Export types @@ -38,3 +39,6 @@ export type { RondevuServiceOptions, PublishServiceOptions } from './rondevu-ser export type { ServiceHostOptions, ServiceHostEvents } from './service-host.js' export type { ServiceClientOptions, ServiceClientEvents } from './service-client.js' + +export type { PollingConfig } from './rondevu-signaler.js' + diff --git a/src/rondevu-context.ts b/src/rondevu-context.ts deleted file mode 100644 index e69de29..0000000 diff --git a/src/rondevu-service.ts b/src/rondevu-service.ts index 7ebd055..b5ff264c 100644 --- a/src/rondevu-service.ts +++ b/src/rondevu-service.ts @@ -9,7 +9,7 @@ export interface RondevuServiceOptions { export interface PublishServiceOptions { serviceFqn: string - sdp: string + offers: Array<{ sdp: string }> ttl?: number isPublic?: boolean metadata?: Record @@ -39,7 +39,7 @@ export interface PublishServiceOptions { * // Publish a service * const publishedService = await service.publishService({ * serviceFqn: 'chat.app@1.0.0', - * sdp: offerSdp, + * offers: [{ sdp: offerSdp }], * ttl: 300000, * isPublic: true, * }) @@ -69,7 +69,7 @@ export class RondevuService { // Register with API if no credentials provided if (!this.api['credentials']) { const credentials = await this.api.register() - ;(this.api as any).credentials = credentials + this.api.setCredentials(credentials) } } @@ -94,7 +94,7 @@ export class RondevuService { } // Generate signature for username claim - const message = `claim-username-${this.username}-${Date.now()}` + const message = `claim:${this.username}:${Date.now()}` const signature = await RondevuAPI.signMessage(message, this.keypair.privateKey) // Claim the username @@ -116,17 +116,17 @@ export class RondevuService { ) } - const { serviceFqn, sdp, ttl, isPublic, metadata } = options + const { serviceFqn, offers, ttl, isPublic, metadata } = options // Generate signature for service publication - const message = `publish-${this.username}-${serviceFqn}-${Date.now()}` + const message = `publish:${this.username}:${serviceFqn}:${Date.now()}` const signature = await RondevuAPI.signMessage(message, this.keypair.privateKey) // Create service request const serviceRequest: ServiceRequest = { username: this.username, serviceFqn, - sdp, + offers, signature, message, ttl, @@ -145,6 +145,13 @@ export class RondevuService { return this.keypair } + /** + * Get the username + */ + getUsername(): string { + return this.username + } + /** * Get the public key */ diff --git a/src/rondevu-signaler.ts b/src/rondevu-signaler.ts index e69de29..d5bdf0c 100644 --- a/src/rondevu-signaler.ts +++ b/src/rondevu-signaler.ts @@ -0,0 +1,462 @@ +import { Signaler } from './types.js' +import { RondevuService } from './rondevu-service.js' +import { Binnable } from './bin.js' + +export interface PollingConfig { + initialInterval?: number // Default: 500ms + maxInterval?: number // Default: 5000ms + backoffMultiplier?: number // Default: 1.5 + maxRetries?: number // Default: 50 (50 seconds max) + jitter?: boolean // Default: true +} + +/** + * RondevuSignaler - Handles WebRTC signaling via Rondevu service + * + * Manages offer/answer exchange and ICE candidate polling for establishing + * WebRTC connections through the Rondevu signaling server. + * + * Supports configurable polling with exponential backoff and jitter to reduce + * server load and prevent thundering herd issues. + * + * @example + * ```typescript + * const signaler = new RondevuSignaler( + * rondevuService, + * 'chat.app@1.0.0', + * 'peer-username', + * { initialInterval: 500, maxInterval: 5000, jitter: true } + * ) + * + * // For offerer: + * await signaler.setOffer(offer) + * signaler.addAnswerListener(answer => { + * // Handle remote answer + * }) + * + * // For answerer: + * signaler.addOfferListener(offer => { + * // Handle remote offer + * }) + * await signaler.setAnswer(answer) + * ``` + */ +export class RondevuSignaler implements Signaler { + private offerId: string | null = null + private serviceUuid: string | null = null + private offerListeners: Array<(offer: RTCSessionDescriptionInit) => void> = [] + private answerListeners: Array<(answer: RTCSessionDescriptionInit) => void> = [] + private iceListeners: Array<(candidate: RTCIceCandidate) => void> = [] + private answerPollingTimeout: ReturnType | null = null + private icePollingTimeout: ReturnType | null = null + private lastIceTimestamp = 0 + private isPolling = false + private pollingConfig: Required + + constructor( + private readonly rondevu: RondevuService, + private readonly service: string, + private readonly host?: string, + pollingConfig?: PollingConfig + ) { + this.pollingConfig = { + initialInterval: pollingConfig?.initialInterval ?? 500, + maxInterval: pollingConfig?.maxInterval ?? 5000, + backoffMultiplier: pollingConfig?.backoffMultiplier ?? 1.5, + maxRetries: pollingConfig?.maxRetries ?? 50, + jitter: pollingConfig?.jitter ?? true + } + } + + /** + * Publish an offer as a service + * Used by the offerer to make their offer available + */ + async setOffer(offer: RTCSessionDescriptionInit): Promise { + if (!offer.sdp) { + throw new Error('Offer SDP is required') + } + + // Publish service with the offer SDP + const publishedService = await this.rondevu.publishService({ + serviceFqn: this.service, + offers: [{ sdp: offer.sdp }], + ttl: 300000, // 5 minutes + isPublic: true, + }) + + // Get the first offer from the published service + if (!publishedService.offers || publishedService.offers.length === 0) { + throw new Error('No offers returned from service publication') + } + + this.offerId = publishedService.offers[0].offerId + this.serviceUuid = publishedService.uuid + + // Start polling for answer + this.startAnswerPolling() + + // Start polling for ICE candidates + this.startIcePolling() + } + + /** + * Send an answer to the offerer + * Used by the answerer to respond to an offer + */ + async setAnswer(answer: RTCSessionDescriptionInit): Promise { + if (!answer.sdp) { + throw new Error('Answer SDP is required') + } + + if (!this.serviceUuid) { + throw new Error('No service UUID available. Must receive offer first.') + } + + // Send answer to the service + const result = await this.rondevu.getAPI().answerService(this.serviceUuid, answer.sdp) + this.offerId = result.offerId + + // Start polling for ICE candidates + this.startIcePolling() + } + + /** + * Listen for incoming offers + * Used by the answerer to receive offers from the offerer + */ + addOfferListener(callback: (offer: RTCSessionDescriptionInit) => void): Binnable { + this.offerListeners.push(callback) + + // If we have a host, start searching for their service + if (this.host && !this.isPolling) { + this.searchForOffer() + } + + // Return cleanup function + return () => { + const index = this.offerListeners.indexOf(callback) + if (index > -1) { + this.offerListeners.splice(index, 1) + } + } + } + + /** + * Listen for incoming answers + * Used by the offerer to receive the answer from the answerer + */ + addAnswerListener(callback: (answer: RTCSessionDescriptionInit) => void): Binnable { + this.answerListeners.push(callback) + + // Return cleanup function + return () => { + const index = this.answerListeners.indexOf(callback) + if (index > -1) { + this.answerListeners.splice(index, 1) + } + } + } + + /** + * Send an ICE candidate to the remote peer + */ + async addIceCandidate(candidate: RTCIceCandidate): Promise { + if (!this.serviceUuid) { + console.warn('Cannot send ICE candidate: no service UUID') + return + } + + const candidateData = candidate.toJSON() + + // Skip empty candidates + if (!candidateData.candidate || candidateData.candidate === '') { + return + } + + try { + const result = await this.rondevu.getAPI().addServiceIceCandidates( + this.serviceUuid, + [candidateData], + this.offerId || undefined + ) + // Store offerId if we didn't have it yet + if (!this.offerId) { + this.offerId = result.offerId + } + } catch (err) { + console.error('Failed to send ICE candidate:', err) + } + } + + /** + * Listen for ICE candidates from the remote peer + */ + addListener(callback: (candidate: RTCIceCandidate) => void): Binnable { + this.iceListeners.push(callback) + + // Return cleanup function + return () => { + const index = this.iceListeners.indexOf(callback) + if (index > -1) { + this.iceListeners.splice(index, 1) + } + } + } + + /** + * Search for an offer from the host + * Used by the answerer to find the offerer's service + */ + private async searchForOffer(): Promise { + if (!this.host) { + throw new Error('No host specified for offer search') + } + + this.isPolling = true + + try { + // Search for services by username and service FQN + const services = await this.rondevu.getAPI().searchServices(this.host, this.service) + + if (services.length === 0) { + console.warn(`No services found for ${this.host}/${this.service}`) + this.isPolling = false + return + } + + // Get the first available service (already has full details from searchServices) + const service = services[0] as any + + // Get the first available offer from the service + if (!service.offers || service.offers.length === 0) { + console.warn(`No offers available for service ${this.host}/${this.service}`) + this.isPolling = false + return + } + + const firstOffer = service.offers[0] + this.offerId = firstOffer.offerId + this.serviceUuid = service.uuid + + // Notify offer listeners + const offer: RTCSessionDescriptionInit = { + type: 'offer', + sdp: firstOffer.sdp, + } + + this.offerListeners.forEach(listener => { + try { + listener(offer) + } catch (err) { + console.error('Offer listener error:', err) + } + }) + } catch (err) { + console.error('Failed to search for offer:', err) + this.isPolling = false + } + } + + /** + * Start polling for answer (offerer side) with exponential backoff + */ + private startAnswerPolling(): void { + if (this.answerPollingTimeout || !this.serviceUuid) { + return + } + + let interval = this.pollingConfig.initialInterval + let retries = 0 + + const poll = async () => { + if (!this.serviceUuid) { + this.stopAnswerPolling() + return + } + + try { + const answer = await this.rondevu.getAPI().getServiceAnswer(this.serviceUuid) + + if (answer && answer.sdp) { + // Store offerId if we didn't have it yet + if (!this.offerId) { + this.offerId = answer.offerId + } + + // Got answer - notify listeners and stop polling + const answerDesc: RTCSessionDescriptionInit = { + type: 'answer', + sdp: answer.sdp, + } + + this.answerListeners.forEach(listener => { + try { + listener(answerDesc) + } catch (err) { + console.error('Answer listener error:', err) + } + }) + + // Stop polling once we get the answer + this.stopAnswerPolling() + return + } + + // No answer yet - exponential backoff + retries++ + if (retries > this.pollingConfig.maxRetries) { + console.warn('Max retries reached for answer polling') + this.stopAnswerPolling() + return + } + + interval = Math.min( + interval * this.pollingConfig.backoffMultiplier, + this.pollingConfig.maxInterval + ) + + // Add jitter to prevent thundering herd + const finalInterval = this.pollingConfig.jitter + ? interval + Math.random() * 100 + : interval + + this.answerPollingTimeout = setTimeout(poll, finalInterval) + + } catch (err) { + // 404 is expected when answer isn't available yet + if (err instanceof Error && !err.message?.includes('404')) { + console.error('Error polling for answer:', err) + } + + // Retry with backoff + const finalInterval = this.pollingConfig.jitter + ? interval + Math.random() * 100 + : interval + this.answerPollingTimeout = setTimeout(poll, finalInterval) + } + } + + poll() // Start immediately + } + + /** + * Stop polling for answer + */ + private stopAnswerPolling(): void { + if (this.answerPollingTimeout) { + clearTimeout(this.answerPollingTimeout) + this.answerPollingTimeout = null + } + } + + /** + * Start polling for ICE candidates with adaptive backoff + */ + private startIcePolling(): void { + if (this.icePollingTimeout || !this.serviceUuid) { + return + } + + let interval = this.pollingConfig.initialInterval + + const poll = async () => { + if (!this.serviceUuid) { + this.stopIcePolling() + return + } + + try { + const result = await this.rondevu + .getAPI() + .getServiceIceCandidates(this.serviceUuid, this.lastIceTimestamp, this.offerId || undefined) + + // Store offerId if we didn't have it yet + if (!this.offerId) { + this.offerId = result.offerId + } + + let foundCandidates = false + + for (const item of result.candidates) { + if (item.candidate && item.candidate.candidate && item.candidate.candidate !== '') { + foundCandidates = true + try { + const rtcCandidate = new RTCIceCandidate(item.candidate) + + this.iceListeners.forEach(listener => { + try { + listener(rtcCandidate) + } catch (err) { + console.error('ICE listener error:', err) + } + }) + + this.lastIceTimestamp = item.createdAt + } catch (err) { + console.warn('Failed to process ICE candidate:', err) + this.lastIceTimestamp = item.createdAt + } + } else { + this.lastIceTimestamp = item.createdAt + } + } + + // If candidates found, reset interval to initial value + // Otherwise, increase interval with backoff + if (foundCandidates) { + interval = this.pollingConfig.initialInterval + } else { + interval = Math.min( + interval * this.pollingConfig.backoffMultiplier, + this.pollingConfig.maxInterval + ) + } + + // Add jitter + const finalInterval = this.pollingConfig.jitter + ? interval + Math.random() * 100 + : interval + + this.icePollingTimeout = setTimeout(poll, finalInterval) + + } catch (err) { + // 404/410 means offer expired, stop polling + if (err instanceof Error && (err.message?.includes('404') || err.message?.includes('410'))) { + console.warn('Offer not found or expired, stopping ICE polling') + this.stopIcePolling() + } else if (err instanceof Error && !err.message?.includes('404')) { + console.error('Error polling for ICE candidates:', err) + // Continue polling despite errors + const finalInterval = this.pollingConfig.jitter + ? interval + Math.random() * 100 + : interval + this.icePollingTimeout = setTimeout(poll, finalInterval) + } + } + } + + poll() // Start immediately + } + + /** + * Stop polling for ICE candidates + */ + private stopIcePolling(): void { + if (this.icePollingTimeout) { + clearTimeout(this.icePollingTimeout) + this.icePollingTimeout = null + } + } + + /** + * Stop all polling and cleanup + */ + dispose(): void { + this.stopAnswerPolling() + this.stopIcePolling() + this.offerListeners = [] + this.answerListeners = [] + this.iceListeners = [] + } +} diff --git a/src/service-client.ts b/src/service-client.ts new file mode 100644 index 0000000..2c530b4 --- /dev/null +++ b/src/service-client.ts @@ -0,0 +1,203 @@ +import { RondevuService } from './rondevu-service.js' +import { RondevuSignaler } from './rondevu-signaler.js' +import { WebRTCContext } from './webrtc-context.js' +import { RTCDurableConnection } from './durable-connection.js' +import { EventBus } from './event-bus.js' + +export interface ServiceClientOptions { + username: string // Host username + serviceFqn: string // e.g., 'chat.app@1.0.0' + rondevuService: RondevuService + autoReconnect?: boolean // Default: true + maxReconnectAttempts?: number // Default: 5 + rtcConfiguration?: RTCConfiguration +} + +export interface ServiceClientEvents { + connected: RTCDurableConnection + disconnected: void + reconnecting: { attempt: number; maxAttempts: number } + error: Error +} + +/** + * ServiceClient - High-level wrapper for connecting to a WebRTC service + * + * Simplifies client connection by handling: + * - Service discovery + * - Offer/answer exchange + * - ICE candidate polling + * - Automatic reconnection + * + * @example + * ```typescript + * const client = new ServiceClient({ + * username: 'host-user', + * serviceFqn: 'chat.app@1.0.0', + * rondevuService: myService + * }) + * + * client.events.on('connected', conn => { + * conn.events.on('message', msg => console.log('Received:', msg)) + * conn.sendMessage('Hello from client!') + * }) + * + * await client.connect() + * ``` + */ +export class ServiceClient { + events: EventBus + + private signaler: RondevuSignaler | null = null + private webrtcContext: WebRTCContext + private connection: RTCDurableConnection | null = null + private autoReconnect: boolean + private maxReconnectAttempts: number + private reconnectAttempts = 0 + private isConnecting = false + + constructor(private options: ServiceClientOptions) { + this.events = new EventBus() + this.webrtcContext = new WebRTCContext(options.rtcConfiguration) + this.autoReconnect = options.autoReconnect !== undefined ? options.autoReconnect : true + this.maxReconnectAttempts = options.maxReconnectAttempts || 5 + } + + /** + * Connect to the service + */ + async connect(): Promise { + if (this.isConnecting) { + throw new Error('Connection already in progress') + } + + if (this.connection) { + throw new Error('Already connected. Disconnect first.') + } + + this.isConnecting = true + + try { + // Create signaler + this.signaler = new RondevuSignaler( + this.options.rondevuService, + this.options.serviceFqn, + this.options.username + ) + + // Wait for remote offer from signaler + const remoteOffer = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error('Service discovery timeout')) + }, 30000) + + this.signaler!.addOfferListener((offer) => { + clearTimeout(timeout) + resolve(offer) + }) + }) + + // Create connection with remote offer (makes us the answerer) + const connection = new RTCDurableConnection({ + context: this.webrtcContext, + signaler: this.signaler, + offer: remoteOffer + }) + + // Wait for connection to be ready + await connection.ready + + // Set up connection event listeners + connection.events.on('state-change', (state) => { + if (state === 'connected') { + this.reconnectAttempts = 0 + this.events.emit('connected', connection) + } else if (state === 'disconnected') { + this.events.emit('disconnected', undefined) + if (this.autoReconnect && this.reconnectAttempts < this.maxReconnectAttempts) { + this.attemptReconnect() + } + } + }) + + this.connection = connection + this.isConnecting = false + + return connection + + } catch (err) { + this.isConnecting = false + const error = err instanceof Error ? err : new Error(String(err)) + this.events.emit('error', error) + throw error + } + } + + /** + * Disconnect from the service + */ + dispose(): void { + if (this.signaler) { + this.signaler.dispose() + this.signaler = null + } + + if (this.connection) { + this.connection.disconnect() + this.connection = null + } + + this.isConnecting = false + this.reconnectAttempts = 0 + } + + /** + * @deprecated Use dispose() instead + */ + disconnect(): void { + this.dispose() + } + + /** + * Attempt to reconnect + */ + private async attemptReconnect(): Promise { + this.reconnectAttempts++ + this.events.emit('reconnecting', { + attempt: this.reconnectAttempts, + maxAttempts: this.maxReconnectAttempts + }) + + // Cleanup old connection + if (this.signaler) { + this.signaler.dispose() + this.signaler = null + } + + if (this.connection) { + this.connection = null + } + + // Wait a bit before reconnecting + await new Promise(resolve => setTimeout(resolve, 1000 * this.reconnectAttempts)) + + try { + await this.connect() + } catch (err) { + console.error('Reconnection attempt failed:', err) + if (this.reconnectAttempts < this.maxReconnectAttempts) { + this.attemptReconnect() + } else { + const error = new Error('Max reconnection attempts reached') + this.events.emit('error', error) + } + } + } + + /** + * Get the current connection + */ + getConnection(): RTCDurableConnection | null { + return this.connection + } +} diff --git a/src/service-host.ts b/src/service-host.ts new file mode 100644 index 0000000..f5c5f9b --- /dev/null +++ b/src/service-host.ts @@ -0,0 +1,158 @@ +import { RondevuService } from './rondevu-service.js' +import { RondevuSignaler } from './rondevu-signaler.js' +import { WebRTCContext } from './webrtc-context.js' +import { RTCDurableConnection } from './durable-connection.js' +import { EventBus } from './event-bus.js' + +export interface ServiceHostOptions { + service: string // e.g., 'chat.app@1.0.0' + rondevuService: RondevuService + maxPeers?: number // Default: 5 + ttl?: number // Default: 300000 (5 min) + isPublic?: boolean // Default: true + rtcConfiguration?: RTCConfiguration + metadata?: Record +} + +export interface ServiceHostEvents { + connection: RTCDurableConnection + error: Error +} + +/** + * ServiceHost - High-level wrapper for hosting a WebRTC service + * + * Simplifies hosting by handling: + * - Offer/answer exchange + * - ICE candidate polling + * - Connection pool management + * - Automatic reconnection + * + * @example + * ```typescript + * const host = new ServiceHost({ + * service: 'chat.app@1.0.0', + * rondevuService: myService, + * maxPeers: 5 + * }) + * + * host.events.on('connection', conn => { + * conn.events.on('message', msg => console.log('Received:', msg)) + * conn.sendMessage('Hello!') + * }) + * + * await host.start() + * ``` + */ +export class ServiceHost { + events: EventBus + + private signaler: RondevuSignaler | null = null + private webrtcContext: WebRTCContext + private connections: RTCDurableConnection[] = [] + private maxPeers: number + private running = false + + constructor(private options: ServiceHostOptions) { + this.events = new EventBus() + this.webrtcContext = new WebRTCContext(options.rtcConfiguration) + this.maxPeers = options.maxPeers || 5 + } + + /** + * Start hosting the service + */ + async start(): Promise { + if (this.running) { + throw new Error('ServiceHost already running') + } + + this.running = true + + // Create signaler + this.signaler = new RondevuSignaler( + this.options.rondevuService, + this.options.service + ) + + // Create first connection (offerer) + const connection = new RTCDurableConnection({ + context: this.webrtcContext, + signaler: this.signaler, + offer: null // null means we're the offerer + }) + + // Wait for connection to be ready + await connection.ready + + // Set up connection event listeners + connection.events.on('state-change', (state) => { + if (state === 'connected') { + this.connections.push(connection) + this.events.emit('connection', connection) + + // Create next connection if under maxPeers + if (this.connections.length < this.maxPeers) { + this.createNextConnection().catch(err => { + console.error('Failed to create next connection:', err) + this.events.emit('error', err) + }) + } + } else if (state === 'disconnected') { + // Remove from connections list + const index = this.connections.indexOf(connection) + if (index > -1) { + this.connections.splice(index, 1) + } + } + }) + + // Publish service with the offer + const offer = connection.connection?.localDescription + if (!offer?.sdp) { + throw new Error('Offer SDP is empty') + } + + await this.signaler.setOffer(offer) + } + + /** + * Create the next connection for incoming peers + */ + private async createNextConnection(): Promise { + if (!this.signaler || !this.running) { + return + } + + // For now, we'll use the same offer for all connections + // In a production scenario, you'd create multiple offers + // This is a limitation of the current service model + // which publishes one offer per service + } + + /** + * Stop hosting the service + */ + dispose(): void { + this.running = false + + // Cleanup signaler + if (this.signaler) { + this.signaler.dispose() + this.signaler = null + } + + // Disconnect all connections + for (const conn of this.connections) { + conn.disconnect() + } + this.connections = [] + } + + /** + * Get all active connections + */ + getConnections(): RTCDurableConnection[] { + return [...this.connections] + } +} diff --git a/src/signaler.ts b/src/signaler.ts deleted file mode 100644 index 0559673..0000000 --- a/src/signaler.ts +++ /dev/null @@ -1,104 +0,0 @@ -import { Signaler } from './types.js' -import { Binnable } from './bin.js' -import { RondevuAPI } from './api.js' - -/** - * RondevuSignaler - Handles ICE candidate exchange via Rondevu API - * Uses polling to retrieve remote candidates - */ -export class RondevuSignaler implements Signaler { - constructor( - private api: RondevuAPI, - private offerId: string - ) {} - - addOfferListener(callback: (offer: RTCSessionDescriptionInit) => void): Binnable { - throw new Error('Method not implemented.') - } - addAnswerListener(callback: (answer: RTCSessionDescriptionInit) => void): Binnable { - throw new Error('Method not implemented.') - } - setOffer(offer: RTCSessionDescriptionInit): Promise { - throw new Error('Method not implemented.') - } - setAnswer(answer: RTCSessionDescriptionInit): Promise { - throw new Error('Method not implemented.') - } - - /** - * Send a local ICE candidate to signaling server - */ - async addIceCandidate(candidate: RTCIceCandidate): Promise { - const candidateData = candidate.toJSON() - - // Skip empty candidates - if (!candidateData.candidate || candidateData.candidate === '') { - return - } - - await this.api.addIceCandidates(this.offerId, [candidateData]) - } - - /** - * Poll for remote ICE candidates and call callback for each one - * Returns cleanup function to stop polling - */ - addListener(callback: (candidate: RTCIceCandidate) => void): Binnable { - let lastTimestamp = 0 - let polling = true - - const poll = async () => { - while (polling) { - try { - const candidates = await this.api.getIceCandidates(this.offerId, lastTimestamp) - - // Process each candidate - for (const item of candidates) { - if ( - item.candidate && - item.candidate.candidate && - item.candidate.candidate !== '' - ) { - try { - const rtcCandidate = new RTCIceCandidate(item.candidate) - callback(rtcCandidate) - lastTimestamp = item.createdAt - } catch (err) { - console.warn('Failed to process ICE candidate:', err) - lastTimestamp = item.createdAt - } - } else { - lastTimestamp = item.createdAt - } - } - } catch (err) { - // If offer not found or expired, stop polling - if ( - err instanceof Error && - (err.message.includes('404') || err.message.includes('410')) - ) { - console.warn('Offer not found or expired, stopping ICE polling') - polling = false - break - } - console.error('Error polling for ICE candidates:', err) - } - - // Poll every second - if (polling) { - await new Promise(resolve => setTimeout(resolve, 1000)) - } - } - } - - // Start polling in the background - poll().then(() => { - console.log('ICE polling started') - }) - - // Return cleanup function - return () => { - polling = false - } - } -} diff --git a/src/types.ts b/src/types.ts index 2691992..c9038f6 100644 --- a/src/types.ts +++ b/src/types.ts @@ -15,14 +15,16 @@ export interface ConnectionEvents { message: Message } -export const ConnectionStates = ['connected', 'disconnected', 'connecting'] as const +export const ConnectionStates = [ + 'connected', + 'disconnected', + 'connecting' +] as const export const isConnectionState = (state: string): state is (typeof ConnectionStates)[number] => ConnectionStates.includes(state as any) export interface ConnectionInterface { - id: string - service: string state: (typeof ConnectionStates)[number] lastActive: number expiresAt?: number @@ -33,7 +35,7 @@ export interface ConnectionInterface { } export interface Signaler { - addIceCandidate(candidate: RTCIceCandidate): Promise | void + addIceCandidate(candidate: RTCIceCandidate): Promise addListener(callback: (candidate: RTCIceCandidate) => void): Binnable addOfferListener(callback: (offer: RTCSessionDescriptionInit) => void): Binnable addAnswerListener(callback: (answer: RTCSessionDescriptionInit) => void): Binnable diff --git a/src/webrtc-context.ts b/src/webrtc-context.ts index b585646..b24a6ba 100644 --- a/src/webrtc-context.ts +++ b/src/webrtc-context.ts @@ -30,7 +30,6 @@ const DEFAULT_RTC_CONFIGURATION: RTCConfiguration = { export class WebRTCContext { constructor( - public readonly signaler: Signaler, private readonly config?: RTCConfiguration ) {}