feat: update client to use service-based signaling endpoints

BREAKING CHANGE: Client API now uses service UUIDs for WebRTC signaling

- Replace answerOffer() with answerService()
- Replace getAnswer() with getServiceAnswer()
- Replace addIceCandidates() with addServiceIceCandidates()
- Replace getIceCandidates() with getServiceIceCandidates()
- Update RondevuSignaler to use service UUID instead of offer ID for signaling
- Automatically track offerId returned from service endpoints
- Bump version to 0.12.0

Matches server v0.4.0 service-based API refactor.
This commit is contained in:
2025-12-07 22:17:36 +01:00
parent d06b2166c1
commit 177ee2ec2d
13 changed files with 1019 additions and 205 deletions

4
package-lock.json generated
View File

@@ -1,12 +1,12 @@
{
"name": "@xtr-dev/rondevu-client",
"version": "0.9.2",
"version": "0.12.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@xtr-dev/rondevu-client",
"version": "0.9.2",
"version": "0.12.0",
"license": "MIT",
"dependencies": {
"@noble/ed25519": "^3.0.0",

View File

@@ -1,6 +1,6 @@
{
"name": "@xtr-dev/rondevu-client",
"version": "0.11.0",
"version": "0.12.0",
"description": "TypeScript client for Rondevu with durable WebRTC connections, automatic reconnection, and message queuing",
"type": "module",
"main": "dist/index.js",

View File

@@ -37,10 +37,14 @@ export interface Offer {
answererPeerId?: string
}
export interface OfferRequest {
sdp: string
}
export interface ServiceRequest {
username: string
serviceFqn: string
sdp: string
offers: OfferRequest[]
ttl?: number
isPublic?: boolean
metadata?: Record<string, any>
@@ -48,10 +52,17 @@ export interface ServiceRequest {
message: string
}
export interface ServiceOffer {
offerId: string
sdp: string
createdAt: number
expiresAt: number
}
export interface Service {
serviceId: string
uuid: string
offerId: string
offers: ServiceOffer[]
username: string
serviceFqn: string
isPublic: boolean
@@ -90,6 +101,13 @@ export class RondevuAPI {
private credentials?: Credentials
) {}
/**
* Set credentials for authentication
*/
setCredentials(credentials: Credentials): void {
this.credentials = credentials
}
/**
* Authentication header
*/
@@ -210,42 +228,45 @@ export class RondevuAPI {
}
/**
* Answer an offer
* Answer a service
*/
async answerOffer(offerId: string, sdp: string, secret?: string): Promise<void> {
const response = await fetch(`${this.baseUrl}/offers/${offerId}/answer`, {
async answerService(serviceUuid: string, sdp: string): Promise<{ offerId: string }> {
const response = await fetch(`${this.baseUrl}/services/${serviceUuid}/answer`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...this.getAuthHeader(),
},
body: JSON.stringify({ sdp, secret }),
body: JSON.stringify({ sdp }),
})
if (!response.ok) {
const error = await response.json().catch(() => ({ error: 'Unknown error' }))
throw new Error(`Failed to answer offer: ${error.error || response.statusText}`)
throw new Error(`Failed to answer service: ${error.error || response.statusText}`)
}
return await response.json()
}
/**
* Get answer for an offer (offerer polls this)
* Get answer for a service (offerer polls this)
*/
async getAnswer(offerId: string): Promise<{ sdp: string } | null> {
const response = await fetch(`${this.baseUrl}/offers/${offerId}/answer`, {
async getServiceAnswer(serviceUuid: string): Promise<{ sdp: string; offerId: string } | null> {
const response = await fetch(`${this.baseUrl}/services/${serviceUuid}/answer`, {
headers: this.getAuthHeader(),
})
if (response.status === 404) {
return null // No answer yet
}
if (!response.ok) {
// 404 means not yet answered
if (response.status === 404) {
return null
}
const error = await response.json().catch(() => ({ error: 'Unknown error' }))
throw new Error(`Failed to get answer: ${error.error || response.statusText}`)
}
return await response.json()
const data = await response.json()
return { sdp: data.sdp, offerId: data.offerId }
}
/**
@@ -269,39 +290,48 @@ export class RondevuAPI {
// ============================================
/**
* Add ICE candidates to an offer
* Add ICE candidates to a service
*/
async addIceCandidates(offerId: string, candidates: RTCIceCandidateInit[]): Promise<void> {
const response = await fetch(`${this.baseUrl}/offers/${offerId}/ice-candidates`, {
async addServiceIceCandidates(serviceUuid: string, candidates: RTCIceCandidateInit[], offerId?: string): Promise<{ offerId: string }> {
const response = await fetch(`${this.baseUrl}/services/${serviceUuid}/ice-candidates`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...this.getAuthHeader(),
},
body: JSON.stringify({ candidates }),
body: JSON.stringify({ candidates, offerId }),
})
if (!response.ok) {
const error = await response.json().catch(() => ({ error: 'Unknown error' }))
throw new Error(`Failed to add ICE candidates: ${error.error || response.statusText}`)
}
return await response.json()
}
/**
* Get ICE candidates for an offer (with polling support)
* Get ICE candidates for a service (with polling support)
*/
async getIceCandidates(offerId: string, since: number = 0): Promise<IceCandidate[]> {
const response = await fetch(
`${this.baseUrl}/offers/${offerId}/ice-candidates?since=${since}`,
{ headers: this.getAuthHeader() }
)
async getServiceIceCandidates(serviceUuid: string, since: number = 0, offerId?: string): Promise<{ candidates: IceCandidate[]; offerId: string }> {
const url = new URL(`${this.baseUrl}/services/${serviceUuid}/ice-candidates`)
url.searchParams.set('since', since.toString())
if (offerId) {
url.searchParams.set('offerId', offerId)
}
const response = await fetch(url.toString(), { headers: this.getAuthHeader() })
if (!response.ok) {
const error = await response.json().catch(() => ({ error: 'Unknown error' }))
throw new Error(`Failed to get ICE candidates: ${error.error || response.statusText}`)
}
return await response.json()
const data = await response.json()
return {
candidates: data.candidates || [],
offerId: data.offerId
}
}
// ============================================
@@ -312,7 +342,7 @@ export class RondevuAPI {
* Publish a service
*/
async publishService(service: ServiceRequest): Promise<Service> {
const response = await fetch(`${this.baseUrl}/services`, {
const response = await fetch(`${this.baseUrl}/users/${encodeURIComponent(service.username)}/services`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
@@ -346,11 +376,11 @@ export class RondevuAPI {
}
/**
* Search services by username
* Search services by username - lists all services for a username
*/
async searchServicesByUsername(username: string): Promise<Service[]> {
const response = await fetch(
`${this.baseUrl}/services?username=${encodeURIComponent(username)}`,
`${this.baseUrl}/users/${encodeURIComponent(username)}/services`,
{ headers: this.getAuthHeader() }
)
@@ -359,41 +389,29 @@ export class RondevuAPI {
throw new Error(`Failed to search services: ${error.error || response.statusText}`)
}
return await response.json()
const data = await response.json()
return data.services || []
}
/**
* Search services by FQN
*/
async searchServicesByFqn(serviceFqn: string): Promise<Service[]> {
const response = await fetch(
`${this.baseUrl}/services?serviceFqn=${encodeURIComponent(serviceFqn)}`,
{ headers: this.getAuthHeader() }
)
if (!response.ok) {
const error = await response.json().catch(() => ({ error: 'Unknown error' }))
throw new Error(`Failed to search services: ${error.error || response.statusText}`)
}
return await response.json()
}
/**
* Search services by username AND FQN
* Search services by username AND FQN - returns full service details
*/
async searchServices(username: string, serviceFqn: string): Promise<Service[]> {
const response = await fetch(
`${this.baseUrl}/services?username=${encodeURIComponent(username)}&serviceFqn=${encodeURIComponent(serviceFqn)}`,
`${this.baseUrl}/users/${encodeURIComponent(username)}/services/${encodeURIComponent(serviceFqn)}`,
{ headers: this.getAuthHeader() }
)
if (!response.ok) {
if (response.status === 404) {
return []
}
const error = await response.json().catch(() => ({ error: 'Unknown error' }))
throw new Error(`Failed to search services: ${error.error || response.statusText}`)
}
return await response.json()
const service = await response.json()
return [service]
}
// ============================================
@@ -405,7 +423,7 @@ export class RondevuAPI {
*/
async checkUsername(username: string): Promise<{ available: boolean; owner?: string }> {
const response = await fetch(
`${this.baseUrl}/usernames/${encodeURIComponent(username)}/check`
`${this.baseUrl}/users/${encodeURIComponent(username)}`
)
if (!response.ok) {
@@ -425,7 +443,7 @@ export class RondevuAPI {
signature: string,
message: string
): Promise<{ success: boolean; username: string }> {
const response = await fetch(`${this.baseUrl}/usernames/${encodeURIComponent(username)}`, {
const response = await fetch(`${this.baseUrl}/users/${encodeURIComponent(username)}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',

View File

@@ -12,10 +12,9 @@ import { createBin } from './bin.js'
import { WebRTCContext } from './webrtc-context'
export type WebRTCRondevuConnectionOptions = {
id: string
service: string
offer: RTCSessionDescriptionInit | null
offer?: RTCSessionDescriptionInit | null
context: WebRTCContext
signaler: Signaler
}
/**
@@ -63,41 +62,39 @@ export type WebRTCRondevuConnectionOptions = {
* });
* ```
*/
export class WebRTCRondevuConnection implements ConnectionInterface {
export class RTCDurableConnection implements ConnectionInterface {
private readonly side: 'offer' | 'answer'
public readonly expiresAt: number = 0
public readonly lastActive: number = 0
public readonly events: EventBus<ConnectionEvents> = new EventBus()
public readonly ready: Promise<void>
private iceBin = createBin()
private ctx: WebRTCContext
private context: WebRTCContext
private readonly signaler: Signaler
private _conn: RTCPeerConnection | null = null
private _state: ConnectionInterface['state'] = 'disconnected'
private _dataChannel: RTCDataChannel | null = null
private messageQueue: Array<{
message: Message
options: QueueMessageOptions
timestamp: number
}> = []
constructor({ context: ctx, offer }: WebRTCRondevuConnectionOptions) {
this.ctx = ctx
this._conn = ctx.createPeerConnection()
constructor({ context, offer, signaler }: WebRTCRondevuConnectionOptions) {
this.context = context
this.signaler = signaler
this._conn = context.createPeerConnection()
this.side = offer ? 'answer' : 'offer'
// setup data channel
if (offer) {
this._conn.addEventListener('datachannel', e => {
const channel = e.channel
channel.addEventListener('message', e => {
console.log('Message from peer:', e)
})
channel.addEventListener('open', () => {
channel.send('I am ' + this.side)
})
this._dataChannel = e.channel
this.setupDataChannelListeners(this._dataChannel)
})
} else {
const channel = this._conn.createDataChannel('vu.ronde.protocol')
channel.addEventListener('message', e => {
console.log('Message from peer:', e)
})
channel.addEventListener('open', () => {
channel.send('I am ' + this.side)
})
this._dataChannel = this._conn.createDataChannel('vu.ronde.protocol')
this.setupDataChannelListeners(this._dataChannel)
}
// setup description exchange
@@ -108,12 +105,12 @@ export class WebRTCRondevuConnection implements ConnectionInterface {
.then(async answer => {
if (!answer || !this._conn) throw new Error('Connection disappeared')
await this._conn.setLocalDescription(answer)
return await ctx.signaler.setAnswer(answer)
return await signaler.setAnswer(answer)
})
: this._conn.createOffer().then(async offer => {
if (!this._conn) throw new Error('Connection disappeared')
await this._conn.setLocalDescription(offer)
return await ctx.signaler.setOffer(offer)
return await signaler.setOffer(offer)
})
// propagate connection state changes
@@ -161,12 +158,12 @@ export class WebRTCRondevuConnection implements ConnectionInterface {
*/
private startIce() {
const listener = ({ candidate }: { candidate: RTCIceCandidate | null }) => {
if (candidate) this.ctx.signaler.addIceCandidate(candidate)
if (candidate) this.signaler.addIceCandidate(candidate)
}
if (!this._conn) throw new Error('Connection disappeared')
this._conn.addEventListener('icecandidate', listener)
this.iceBin(
this.ctx.signaler.addListener((candidate: RTCIceCandidate) =>
this.signaler.addListener((candidate: RTCIceCandidate) =>
this._conn?.addIceCandidate(candidate)
),
() => this._conn?.removeEventListener('icecandidate', listener)
@@ -201,15 +198,68 @@ export class WebRTCRondevuConnection implements ConnectionInterface {
return this._state
}
/**
* Setup data channel event listeners
*/
private setupDataChannelListeners(channel: RTCDataChannel): void {
channel.addEventListener('message', e => {
this.events.emit('message', e.data)
})
channel.addEventListener('open', () => {
// Channel opened - flush queued messages
this.flushQueue().catch(err => {
console.error('Failed to flush message queue:', err)
})
})
channel.addEventListener('error', err => {
console.error('Data channel error:', err)
})
channel.addEventListener('close', () => {
console.log('Data channel closed')
})
}
/**
* Flush the message queue
*/
private async flushQueue(): Promise<void> {
while (this.messageQueue.length > 0 && this._state === 'connected') {
const item = this.messageQueue.shift()!
// Check expiration
if (item.options.expiresAt && Date.now() > item.options.expiresAt) {
continue
}
const success = await this.sendMessage(item.message)
if (!success) {
// Re-queue on failure
this.messageQueue.unshift(item)
break
}
}
}
/**
* Queue a message for sending when connection is established
*
* @param message - Message to queue (string or ArrayBuffer)
* @param options - Queue options (e.g., expiration time)
*/
queueMessage(message: Message, options: QueueMessageOptions = {}): Promise<void> {
// TODO: Implement message queuing
return Promise.resolve(undefined)
async queueMessage(message: Message, options: QueueMessageOptions = {}): Promise<void> {
this.messageQueue.push({
message,
options,
timestamp: Date.now()
})
// Try immediate send if connected
if (this._state === 'connected') {
await this.flushQueue()
}
}
/**
@@ -218,8 +268,23 @@ export class WebRTCRondevuConnection implements ConnectionInterface {
* @param message - Message to send (string or ArrayBuffer)
* @returns Promise resolving to true if sent successfully
*/
sendMessage(message: Message): Promise<boolean> {
// TODO: Implement message sending via data channel
return Promise.resolve(false)
async sendMessage(message: Message): Promise<boolean> {
if (this._state !== 'connected' || !this._dataChannel) {
return false
}
if (this._dataChannel.readyState !== 'open') {
return false
}
try {
// TypeScript has trouble with the union type, so we cast to any
// Both string and ArrayBuffer are valid for RTCDataChannel.send()
this._dataChannel.send(message as any)
return true
} catch (err) {
console.error('Send failed:', err)
return false
}
}
}

View File

@@ -6,10 +6,11 @@
export { EventBus } from './event-bus.js'
export { RondevuAPI } from './api.js'
export { RondevuService } from './rondevu-service.js'
export { RondevuSignaler } from './signaler.js'
export { RondevuSignaler } from './rondevu-signaler.js'
export { WebRTCContext } from './webrtc-context.js'
export { RTCDurableConnection } from './durable-connection'
export { ServiceHost } from './service-host.js'
export { ServiceClient } from './service-client.js'
export { WebRTCRondevuConnection } from './connection.js'
export { createBin } from './bin.js'
// Export types
@@ -38,3 +39,6 @@ export type { RondevuServiceOptions, PublishServiceOptions } from './rondevu-ser
export type { ServiceHostOptions, ServiceHostEvents } from './service-host.js'
export type { ServiceClientOptions, ServiceClientEvents } from './service-client.js'
export type { PollingConfig } from './rondevu-signaler.js'

View File

View File

@@ -9,7 +9,7 @@ export interface RondevuServiceOptions {
export interface PublishServiceOptions {
serviceFqn: string
sdp: string
offers: Array<{ sdp: string }>
ttl?: number
isPublic?: boolean
metadata?: Record<string, any>
@@ -39,7 +39,7 @@ export interface PublishServiceOptions {
* // Publish a service
* const publishedService = await service.publishService({
* serviceFqn: 'chat.app@1.0.0',
* sdp: offerSdp,
* offers: [{ sdp: offerSdp }],
* ttl: 300000,
* isPublic: true,
* })
@@ -69,7 +69,7 @@ export class RondevuService {
// Register with API if no credentials provided
if (!this.api['credentials']) {
const credentials = await this.api.register()
;(this.api as any).credentials = credentials
this.api.setCredentials(credentials)
}
}
@@ -94,7 +94,7 @@ export class RondevuService {
}
// Generate signature for username claim
const message = `claim-username-${this.username}-${Date.now()}`
const message = `claim:${this.username}:${Date.now()}`
const signature = await RondevuAPI.signMessage(message, this.keypair.privateKey)
// Claim the username
@@ -116,17 +116,17 @@ export class RondevuService {
)
}
const { serviceFqn, sdp, ttl, isPublic, metadata } = options
const { serviceFqn, offers, ttl, isPublic, metadata } = options
// Generate signature for service publication
const message = `publish-${this.username}-${serviceFqn}-${Date.now()}`
const message = `publish:${this.username}:${serviceFqn}:${Date.now()}`
const signature = await RondevuAPI.signMessage(message, this.keypair.privateKey)
// Create service request
const serviceRequest: ServiceRequest = {
username: this.username,
serviceFqn,
sdp,
offers,
signature,
message,
ttl,
@@ -145,6 +145,13 @@ export class RondevuService {
return this.keypair
}
/**
* Get the username
*/
getUsername(): string {
return this.username
}
/**
* Get the public key
*/

View File

@@ -0,0 +1,462 @@
import { Signaler } from './types.js'
import { RondevuService } from './rondevu-service.js'
import { Binnable } from './bin.js'
export interface PollingConfig {
initialInterval?: number // Default: 500ms
maxInterval?: number // Default: 5000ms
backoffMultiplier?: number // Default: 1.5
maxRetries?: number // Default: 50 (50 seconds max)
jitter?: boolean // Default: true
}
/**
* RondevuSignaler - Handles WebRTC signaling via Rondevu service
*
* Manages offer/answer exchange and ICE candidate polling for establishing
* WebRTC connections through the Rondevu signaling server.
*
* Supports configurable polling with exponential backoff and jitter to reduce
* server load and prevent thundering herd issues.
*
* @example
* ```typescript
* const signaler = new RondevuSignaler(
* rondevuService,
* 'chat.app@1.0.0',
* 'peer-username',
* { initialInterval: 500, maxInterval: 5000, jitter: true }
* )
*
* // For offerer:
* await signaler.setOffer(offer)
* signaler.addAnswerListener(answer => {
* // Handle remote answer
* })
*
* // For answerer:
* signaler.addOfferListener(offer => {
* // Handle remote offer
* })
* await signaler.setAnswer(answer)
* ```
*/
export class RondevuSignaler implements Signaler {
private offerId: string | null = null
private serviceUuid: string | null = null
private offerListeners: Array<(offer: RTCSessionDescriptionInit) => void> = []
private answerListeners: Array<(answer: RTCSessionDescriptionInit) => void> = []
private iceListeners: Array<(candidate: RTCIceCandidate) => void> = []
private answerPollingTimeout: ReturnType<typeof setTimeout> | null = null
private icePollingTimeout: ReturnType<typeof setTimeout> | null = null
private lastIceTimestamp = 0
private isPolling = false
private pollingConfig: Required<PollingConfig>
constructor(
private readonly rondevu: RondevuService,
private readonly service: string,
private readonly host?: string,
pollingConfig?: PollingConfig
) {
this.pollingConfig = {
initialInterval: pollingConfig?.initialInterval ?? 500,
maxInterval: pollingConfig?.maxInterval ?? 5000,
backoffMultiplier: pollingConfig?.backoffMultiplier ?? 1.5,
maxRetries: pollingConfig?.maxRetries ?? 50,
jitter: pollingConfig?.jitter ?? true
}
}
/**
* Publish an offer as a service
* Used by the offerer to make their offer available
*/
async setOffer(offer: RTCSessionDescriptionInit): Promise<void> {
if (!offer.sdp) {
throw new Error('Offer SDP is required')
}
// Publish service with the offer SDP
const publishedService = await this.rondevu.publishService({
serviceFqn: this.service,
offers: [{ sdp: offer.sdp }],
ttl: 300000, // 5 minutes
isPublic: true,
})
// Get the first offer from the published service
if (!publishedService.offers || publishedService.offers.length === 0) {
throw new Error('No offers returned from service publication')
}
this.offerId = publishedService.offers[0].offerId
this.serviceUuid = publishedService.uuid
// Start polling for answer
this.startAnswerPolling()
// Start polling for ICE candidates
this.startIcePolling()
}
/**
* Send an answer to the offerer
* Used by the answerer to respond to an offer
*/
async setAnswer(answer: RTCSessionDescriptionInit): Promise<void> {
if (!answer.sdp) {
throw new Error('Answer SDP is required')
}
if (!this.serviceUuid) {
throw new Error('No service UUID available. Must receive offer first.')
}
// Send answer to the service
const result = await this.rondevu.getAPI().answerService(this.serviceUuid, answer.sdp)
this.offerId = result.offerId
// Start polling for ICE candidates
this.startIcePolling()
}
/**
* Listen for incoming offers
* Used by the answerer to receive offers from the offerer
*/
addOfferListener(callback: (offer: RTCSessionDescriptionInit) => void): Binnable {
this.offerListeners.push(callback)
// If we have a host, start searching for their service
if (this.host && !this.isPolling) {
this.searchForOffer()
}
// Return cleanup function
return () => {
const index = this.offerListeners.indexOf(callback)
if (index > -1) {
this.offerListeners.splice(index, 1)
}
}
}
/**
* Listen for incoming answers
* Used by the offerer to receive the answer from the answerer
*/
addAnswerListener(callback: (answer: RTCSessionDescriptionInit) => void): Binnable {
this.answerListeners.push(callback)
// Return cleanup function
return () => {
const index = this.answerListeners.indexOf(callback)
if (index > -1) {
this.answerListeners.splice(index, 1)
}
}
}
/**
* Send an ICE candidate to the remote peer
*/
async addIceCandidate(candidate: RTCIceCandidate): Promise<void> {
if (!this.serviceUuid) {
console.warn('Cannot send ICE candidate: no service UUID')
return
}
const candidateData = candidate.toJSON()
// Skip empty candidates
if (!candidateData.candidate || candidateData.candidate === '') {
return
}
try {
const result = await this.rondevu.getAPI().addServiceIceCandidates(
this.serviceUuid,
[candidateData],
this.offerId || undefined
)
// Store offerId if we didn't have it yet
if (!this.offerId) {
this.offerId = result.offerId
}
} catch (err) {
console.error('Failed to send ICE candidate:', err)
}
}
/**
* Listen for ICE candidates from the remote peer
*/
addListener(callback: (candidate: RTCIceCandidate) => void): Binnable {
this.iceListeners.push(callback)
// Return cleanup function
return () => {
const index = this.iceListeners.indexOf(callback)
if (index > -1) {
this.iceListeners.splice(index, 1)
}
}
}
/**
* Search for an offer from the host
* Used by the answerer to find the offerer's service
*/
private async searchForOffer(): Promise<void> {
if (!this.host) {
throw new Error('No host specified for offer search')
}
this.isPolling = true
try {
// Search for services by username and service FQN
const services = await this.rondevu.getAPI().searchServices(this.host, this.service)
if (services.length === 0) {
console.warn(`No services found for ${this.host}/${this.service}`)
this.isPolling = false
return
}
// Get the first available service (already has full details from searchServices)
const service = services[0] as any
// Get the first available offer from the service
if (!service.offers || service.offers.length === 0) {
console.warn(`No offers available for service ${this.host}/${this.service}`)
this.isPolling = false
return
}
const firstOffer = service.offers[0]
this.offerId = firstOffer.offerId
this.serviceUuid = service.uuid
// Notify offer listeners
const offer: RTCSessionDescriptionInit = {
type: 'offer',
sdp: firstOffer.sdp,
}
this.offerListeners.forEach(listener => {
try {
listener(offer)
} catch (err) {
console.error('Offer listener error:', err)
}
})
} catch (err) {
console.error('Failed to search for offer:', err)
this.isPolling = false
}
}
/**
* Start polling for answer (offerer side) with exponential backoff
*/
private startAnswerPolling(): void {
if (this.answerPollingTimeout || !this.serviceUuid) {
return
}
let interval = this.pollingConfig.initialInterval
let retries = 0
const poll = async () => {
if (!this.serviceUuid) {
this.stopAnswerPolling()
return
}
try {
const answer = await this.rondevu.getAPI().getServiceAnswer(this.serviceUuid)
if (answer && answer.sdp) {
// Store offerId if we didn't have it yet
if (!this.offerId) {
this.offerId = answer.offerId
}
// Got answer - notify listeners and stop polling
const answerDesc: RTCSessionDescriptionInit = {
type: 'answer',
sdp: answer.sdp,
}
this.answerListeners.forEach(listener => {
try {
listener(answerDesc)
} catch (err) {
console.error('Answer listener error:', err)
}
})
// Stop polling once we get the answer
this.stopAnswerPolling()
return
}
// No answer yet - exponential backoff
retries++
if (retries > this.pollingConfig.maxRetries) {
console.warn('Max retries reached for answer polling')
this.stopAnswerPolling()
return
}
interval = Math.min(
interval * this.pollingConfig.backoffMultiplier,
this.pollingConfig.maxInterval
)
// Add jitter to prevent thundering herd
const finalInterval = this.pollingConfig.jitter
? interval + Math.random() * 100
: interval
this.answerPollingTimeout = setTimeout(poll, finalInterval)
} catch (err) {
// 404 is expected when answer isn't available yet
if (err instanceof Error && !err.message?.includes('404')) {
console.error('Error polling for answer:', err)
}
// Retry with backoff
const finalInterval = this.pollingConfig.jitter
? interval + Math.random() * 100
: interval
this.answerPollingTimeout = setTimeout(poll, finalInterval)
}
}
poll() // Start immediately
}
/**
* Stop polling for answer
*/
private stopAnswerPolling(): void {
if (this.answerPollingTimeout) {
clearTimeout(this.answerPollingTimeout)
this.answerPollingTimeout = null
}
}
/**
* Start polling for ICE candidates with adaptive backoff
*/
private startIcePolling(): void {
if (this.icePollingTimeout || !this.serviceUuid) {
return
}
let interval = this.pollingConfig.initialInterval
const poll = async () => {
if (!this.serviceUuid) {
this.stopIcePolling()
return
}
try {
const result = await this.rondevu
.getAPI()
.getServiceIceCandidates(this.serviceUuid, this.lastIceTimestamp, this.offerId || undefined)
// Store offerId if we didn't have it yet
if (!this.offerId) {
this.offerId = result.offerId
}
let foundCandidates = false
for (const item of result.candidates) {
if (item.candidate && item.candidate.candidate && item.candidate.candidate !== '') {
foundCandidates = true
try {
const rtcCandidate = new RTCIceCandidate(item.candidate)
this.iceListeners.forEach(listener => {
try {
listener(rtcCandidate)
} catch (err) {
console.error('ICE listener error:', err)
}
})
this.lastIceTimestamp = item.createdAt
} catch (err) {
console.warn('Failed to process ICE candidate:', err)
this.lastIceTimestamp = item.createdAt
}
} else {
this.lastIceTimestamp = item.createdAt
}
}
// If candidates found, reset interval to initial value
// Otherwise, increase interval with backoff
if (foundCandidates) {
interval = this.pollingConfig.initialInterval
} else {
interval = Math.min(
interval * this.pollingConfig.backoffMultiplier,
this.pollingConfig.maxInterval
)
}
// Add jitter
const finalInterval = this.pollingConfig.jitter
? interval + Math.random() * 100
: interval
this.icePollingTimeout = setTimeout(poll, finalInterval)
} catch (err) {
// 404/410 means offer expired, stop polling
if (err instanceof Error && (err.message?.includes('404') || err.message?.includes('410'))) {
console.warn('Offer not found or expired, stopping ICE polling')
this.stopIcePolling()
} else if (err instanceof Error && !err.message?.includes('404')) {
console.error('Error polling for ICE candidates:', err)
// Continue polling despite errors
const finalInterval = this.pollingConfig.jitter
? interval + Math.random() * 100
: interval
this.icePollingTimeout = setTimeout(poll, finalInterval)
}
}
}
poll() // Start immediately
}
/**
* Stop polling for ICE candidates
*/
private stopIcePolling(): void {
if (this.icePollingTimeout) {
clearTimeout(this.icePollingTimeout)
this.icePollingTimeout = null
}
}
/**
* Stop all polling and cleanup
*/
dispose(): void {
this.stopAnswerPolling()
this.stopIcePolling()
this.offerListeners = []
this.answerListeners = []
this.iceListeners = []
}
}

203
src/service-client.ts Normal file
View File

@@ -0,0 +1,203 @@
import { RondevuService } from './rondevu-service.js'
import { RondevuSignaler } from './rondevu-signaler.js'
import { WebRTCContext } from './webrtc-context.js'
import { RTCDurableConnection } from './durable-connection.js'
import { EventBus } from './event-bus.js'
export interface ServiceClientOptions {
username: string // Host username
serviceFqn: string // e.g., 'chat.app@1.0.0'
rondevuService: RondevuService
autoReconnect?: boolean // Default: true
maxReconnectAttempts?: number // Default: 5
rtcConfiguration?: RTCConfiguration
}
export interface ServiceClientEvents {
connected: RTCDurableConnection
disconnected: void
reconnecting: { attempt: number; maxAttempts: number }
error: Error
}
/**
* ServiceClient - High-level wrapper for connecting to a WebRTC service
*
* Simplifies client connection by handling:
* - Service discovery
* - Offer/answer exchange
* - ICE candidate polling
* - Automatic reconnection
*
* @example
* ```typescript
* const client = new ServiceClient({
* username: 'host-user',
* serviceFqn: 'chat.app@1.0.0',
* rondevuService: myService
* })
*
* client.events.on('connected', conn => {
* conn.events.on('message', msg => console.log('Received:', msg))
* conn.sendMessage('Hello from client!')
* })
*
* await client.connect()
* ```
*/
export class ServiceClient {
events: EventBus<ServiceClientEvents>
private signaler: RondevuSignaler | null = null
private webrtcContext: WebRTCContext
private connection: RTCDurableConnection | null = null
private autoReconnect: boolean
private maxReconnectAttempts: number
private reconnectAttempts = 0
private isConnecting = false
constructor(private options: ServiceClientOptions) {
this.events = new EventBus<ServiceClientEvents>()
this.webrtcContext = new WebRTCContext(options.rtcConfiguration)
this.autoReconnect = options.autoReconnect !== undefined ? options.autoReconnect : true
this.maxReconnectAttempts = options.maxReconnectAttempts || 5
}
/**
* Connect to the service
*/
async connect(): Promise<RTCDurableConnection> {
if (this.isConnecting) {
throw new Error('Connection already in progress')
}
if (this.connection) {
throw new Error('Already connected. Disconnect first.')
}
this.isConnecting = true
try {
// Create signaler
this.signaler = new RondevuSignaler(
this.options.rondevuService,
this.options.serviceFqn,
this.options.username
)
// Wait for remote offer from signaler
const remoteOffer = await new Promise<RTCSessionDescriptionInit>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('Service discovery timeout'))
}, 30000)
this.signaler!.addOfferListener((offer) => {
clearTimeout(timeout)
resolve(offer)
})
})
// Create connection with remote offer (makes us the answerer)
const connection = new RTCDurableConnection({
context: this.webrtcContext,
signaler: this.signaler,
offer: remoteOffer
})
// Wait for connection to be ready
await connection.ready
// Set up connection event listeners
connection.events.on('state-change', (state) => {
if (state === 'connected') {
this.reconnectAttempts = 0
this.events.emit('connected', connection)
} else if (state === 'disconnected') {
this.events.emit('disconnected', undefined)
if (this.autoReconnect && this.reconnectAttempts < this.maxReconnectAttempts) {
this.attemptReconnect()
}
}
})
this.connection = connection
this.isConnecting = false
return connection
} catch (err) {
this.isConnecting = false
const error = err instanceof Error ? err : new Error(String(err))
this.events.emit('error', error)
throw error
}
}
/**
* Disconnect from the service
*/
dispose(): void {
if (this.signaler) {
this.signaler.dispose()
this.signaler = null
}
if (this.connection) {
this.connection.disconnect()
this.connection = null
}
this.isConnecting = false
this.reconnectAttempts = 0
}
/**
* @deprecated Use dispose() instead
*/
disconnect(): void {
this.dispose()
}
/**
* Attempt to reconnect
*/
private async attemptReconnect(): Promise<void> {
this.reconnectAttempts++
this.events.emit('reconnecting', {
attempt: this.reconnectAttempts,
maxAttempts: this.maxReconnectAttempts
})
// Cleanup old connection
if (this.signaler) {
this.signaler.dispose()
this.signaler = null
}
if (this.connection) {
this.connection = null
}
// Wait a bit before reconnecting
await new Promise(resolve => setTimeout(resolve, 1000 * this.reconnectAttempts))
try {
await this.connect()
} catch (err) {
console.error('Reconnection attempt failed:', err)
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.attemptReconnect()
} else {
const error = new Error('Max reconnection attempts reached')
this.events.emit('error', error)
}
}
}
/**
* Get the current connection
*/
getConnection(): RTCDurableConnection | null {
return this.connection
}
}

158
src/service-host.ts Normal file
View File

@@ -0,0 +1,158 @@
import { RondevuService } from './rondevu-service.js'
import { RondevuSignaler } from './rondevu-signaler.js'
import { WebRTCContext } from './webrtc-context.js'
import { RTCDurableConnection } from './durable-connection.js'
import { EventBus } from './event-bus.js'
export interface ServiceHostOptions {
service: string // e.g., 'chat.app@1.0.0'
rondevuService: RondevuService
maxPeers?: number // Default: 5
ttl?: number // Default: 300000 (5 min)
isPublic?: boolean // Default: true
rtcConfiguration?: RTCConfiguration
metadata?: Record<string, any>
}
export interface ServiceHostEvents {
connection: RTCDurableConnection
error: Error
}
/**
* ServiceHost - High-level wrapper for hosting a WebRTC service
*
* Simplifies hosting by handling:
* - Offer/answer exchange
* - ICE candidate polling
* - Connection pool management
* - Automatic reconnection
*
* @example
* ```typescript
* const host = new ServiceHost({
* service: 'chat.app@1.0.0',
* rondevuService: myService,
* maxPeers: 5
* })
*
* host.events.on('connection', conn => {
* conn.events.on('message', msg => console.log('Received:', msg))
* conn.sendMessage('Hello!')
* })
*
* await host.start()
* ```
*/
export class ServiceHost {
events: EventBus<ServiceHostEvents>
private signaler: RondevuSignaler | null = null
private webrtcContext: WebRTCContext
private connections: RTCDurableConnection[] = []
private maxPeers: number
private running = false
constructor(private options: ServiceHostOptions) {
this.events = new EventBus<ServiceHostEvents>()
this.webrtcContext = new WebRTCContext(options.rtcConfiguration)
this.maxPeers = options.maxPeers || 5
}
/**
* Start hosting the service
*/
async start(): Promise<void> {
if (this.running) {
throw new Error('ServiceHost already running')
}
this.running = true
// Create signaler
this.signaler = new RondevuSignaler(
this.options.rondevuService,
this.options.service
)
// Create first connection (offerer)
const connection = new RTCDurableConnection({
context: this.webrtcContext,
signaler: this.signaler,
offer: null // null means we're the offerer
})
// Wait for connection to be ready
await connection.ready
// Set up connection event listeners
connection.events.on('state-change', (state) => {
if (state === 'connected') {
this.connections.push(connection)
this.events.emit('connection', connection)
// Create next connection if under maxPeers
if (this.connections.length < this.maxPeers) {
this.createNextConnection().catch(err => {
console.error('Failed to create next connection:', err)
this.events.emit('error', err)
})
}
} else if (state === 'disconnected') {
// Remove from connections list
const index = this.connections.indexOf(connection)
if (index > -1) {
this.connections.splice(index, 1)
}
}
})
// Publish service with the offer
const offer = connection.connection?.localDescription
if (!offer?.sdp) {
throw new Error('Offer SDP is empty')
}
await this.signaler.setOffer(offer)
}
/**
* Create the next connection for incoming peers
*/
private async createNextConnection(): Promise<void> {
if (!this.signaler || !this.running) {
return
}
// For now, we'll use the same offer for all connections
// In a production scenario, you'd create multiple offers
// This is a limitation of the current service model
// which publishes one offer per service
}
/**
* Stop hosting the service
*/
dispose(): void {
this.running = false
// Cleanup signaler
if (this.signaler) {
this.signaler.dispose()
this.signaler = null
}
// Disconnect all connections
for (const conn of this.connections) {
conn.disconnect()
}
this.connections = []
}
/**
* Get all active connections
*/
getConnections(): RTCDurableConnection[] {
return [...this.connections]
}
}

View File

@@ -1,104 +0,0 @@
import { Signaler } from './types.js'
import { Binnable } from './bin.js'
import { RondevuAPI } from './api.js'
/**
* RondevuSignaler - Handles ICE candidate exchange via Rondevu API
* Uses polling to retrieve remote candidates
*/
export class RondevuSignaler implements Signaler {
constructor(
private api: RondevuAPI,
private offerId: string
) {}
addOfferListener(callback: (offer: RTCSessionDescriptionInit) => void): Binnable {
throw new Error('Method not implemented.')
}
addAnswerListener(callback: (answer: RTCSessionDescriptionInit) => void): Binnable {
throw new Error('Method not implemented.')
}
setOffer(offer: RTCSessionDescriptionInit): Promise<void> {
throw new Error('Method not implemented.')
}
setAnswer(answer: RTCSessionDescriptionInit): Promise<void> {
throw new Error('Method not implemented.')
}
/**
* Send a local ICE candidate to signaling server
*/
async addIceCandidate(candidate: RTCIceCandidate): Promise<void> {
const candidateData = candidate.toJSON()
// Skip empty candidates
if (!candidateData.candidate || candidateData.candidate === '') {
return
}
await this.api.addIceCandidates(this.offerId, [candidateData])
}
/**
* Poll for remote ICE candidates and call callback for each one
* Returns cleanup function to stop polling
*/
addListener(callback: (candidate: RTCIceCandidate) => void): Binnable {
let lastTimestamp = 0
let polling = true
const poll = async () => {
while (polling) {
try {
const candidates = await this.api.getIceCandidates(this.offerId, lastTimestamp)
// Process each candidate
for (const item of candidates) {
if (
item.candidate &&
item.candidate.candidate &&
item.candidate.candidate !== ''
) {
try {
const rtcCandidate = new RTCIceCandidate(item.candidate)
callback(rtcCandidate)
lastTimestamp = item.createdAt
} catch (err) {
console.warn('Failed to process ICE candidate:', err)
lastTimestamp = item.createdAt
}
} else {
lastTimestamp = item.createdAt
}
}
} catch (err) {
// If offer not found or expired, stop polling
if (
err instanceof Error &&
(err.message.includes('404') || err.message.includes('410'))
) {
console.warn('Offer not found or expired, stopping ICE polling')
polling = false
break
}
console.error('Error polling for ICE candidates:', err)
}
// Poll every second
if (polling) {
await new Promise(resolve => setTimeout(resolve, 1000))
}
}
}
// Start polling in the background
poll().then(() => {
console.log('ICE polling started')
})
// Return cleanup function
return () => {
polling = false
}
}
}

View File

@@ -15,14 +15,16 @@ export interface ConnectionEvents {
message: Message
}
export const ConnectionStates = ['connected', 'disconnected', 'connecting'] as const
export const ConnectionStates = [
'connected',
'disconnected',
'connecting'
] as const
export const isConnectionState = (state: string): state is (typeof ConnectionStates)[number] =>
ConnectionStates.includes(state as any)
export interface ConnectionInterface {
id: string
service: string
state: (typeof ConnectionStates)[number]
lastActive: number
expiresAt?: number
@@ -33,7 +35,7 @@ export interface ConnectionInterface {
}
export interface Signaler {
addIceCandidate(candidate: RTCIceCandidate): Promise<void> | void
addIceCandidate(candidate: RTCIceCandidate): Promise<void>
addListener(callback: (candidate: RTCIceCandidate) => void): Binnable
addOfferListener(callback: (offer: RTCSessionDescriptionInit) => void): Binnable
addAnswerListener(callback: (answer: RTCSessionDescriptionInit) => void): Binnable

View File

@@ -30,7 +30,6 @@ const DEFAULT_RTC_CONFIGURATION: RTCConfiguration = {
export class WebRTCContext {
constructor(
public readonly signaler: Signaler,
private readonly config?: RTCConfiguration
) {}