@telefunc/rxjs

Beta: Telefunc Stream is in beta; breaking changes may occur in any version update.

The @telefunc/rxjs integration lets you pass RxJS Observable and Subject instances directly between client and server — in both directions, and every RxJS operator works across the boundary.

Automatic runtime type validation: all values sent from the client to the server are validated against your TypeScript types at runtime (no need for Zod).
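As a rough illustration of the boilerplate this removes, the sketch below shows the kind of hand-written check you would otherwise need for the click payload used later on this page. It is not how Telefunc implements validation; `Click` and `isClick` are hypothetical names introduced only for this example.

```typescript
// Hypothetical hand-written guard: NOT Telefunc's implementation, only an
// illustration of the kind of check that runtime validation performs for you.
type Click = { x: number; y: number }

function isClick(value: unknown): value is Click {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  return typeof v.x === 'number' && typeof v.y === 'number'
}

// A well-formed payload passes; a payload with a string coordinate is rejected.
const accepted = isClick({ x: 10, y: 20 })
const rejected = isClick({ x: '10', y: 20 })
```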

The @telefunc/rxjs integration is powered by Telefunc Stream.

Install

npm install @telefunc/rxjs rxjs
pnpm add @telefunc/rxjs rxjs
bun add @telefunc/rxjs rxjs
yarn add @telefunc/rxjs rxjs

That's it.

The Telefunc bundler plugin (Vite, webpack, Next.js, Babel) detects @telefunc/rxjs in your dependencies and registers it automatically. Without a Telefunc bundler plugin, register it manually — import '@telefunc/rxjs/server' in your server entry and import '@telefunc/rxjs/client' in your client entry.

Example: live stock ticker

The server pushes prices every second. The client filters and limits them locally with standard RxJS operators.

// StockPrice.telefunc.js
// Environment: server
 
import { interval, map } from 'rxjs'
 
export async function onStockPrice(symbol) {
  return interval(1000).pipe(
    map(() => ({ symbol, price: getLatestPrice(symbol), time: Date.now() }))
  )
}
// StockPrice.telefunc.ts
// Environment: server
 
import { interval, map } from 'rxjs'
 
export async function onStockPrice(symbol: string) {
  return interval(1000).pipe(
    map(() => ({ symbol, price: getLatestPrice(symbol), time: Date.now() }))
  )
}
// StockPrice.jsx
// Environment: client
 
import { filter, take } from 'rxjs'
import { onStockPrice } from './StockPrice.telefunc'
 
const price$ = await onStockPrice('AAPL')
price$
  .pipe(
    filter((p) => p.price > 150),
    take(10)
  )
  .subscribe((p) => updateChart(p))
// StockPrice.tsx
// Environment: client
 
import { filter, take } from 'rxjs'
import { onStockPrice } from './StockPrice.telefunc'
 
const price$ = await onStockPrice('AAPL')
price$.pipe(
  filter(p => p.price > 150),
  take(10)
).subscribe(p => updateChart(p))
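To try the client-side pipeline without a running server, the stream can be swapped for a local stand-in. The sketch below assumes made-up sample prices and replaces `updateChart` with an in-memory array; only the `filter` and `take` steps match the example above.

```typescript
import { from, filter, take } from 'rxjs'

type Price = { symbol: string; price: number; time: number }

// Made-up sample data standing in for the stream returned by onStockPrice().
const samples: Price[] = [
  { symbol: 'AAPL', price: 149, time: 1 },
  { symbol: 'AAPL', price: 151, time: 2 },
  { symbol: 'AAPL', price: 150, time: 3 },
  { symbol: 'AAPL', price: 155, time: 4 },
  { symbol: 'AAPL', price: 160, time: 5 }
]

// Stand-in for updateChart(): collect the prices that would be charted.
const charted: number[] = []

from(samples).pipe(
  filter(p => p.price > 150), // strictly greater, so 150 itself is dropped
  take(2)                     // stop after two matching prices
).subscribe(p => charted.push(p.price))
```

Here `take(2)` is used instead of `take(10)` only to keep the sample data short.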

Example: collaborative editor

Return a shared Subject to multiple clients and it multicasts among them: when one client emits with next(), every other client's subscribers receive the value through the server. The emitting client's own subscribers receive it locally too — the server doesn't echo a client's next() back to that same client.

// Editor.telefunc.js
// Environment: server
 
import { Subject } from 'rxjs'
 
const edits = new Subject()
 
export async function onJoinEditor() {
  return edits
}
// Editor.telefunc.ts
// Environment: server
 
import { Subject } from 'rxjs'
 
const edits = new Subject<{ userId: string; text: string }>()
 
export async function onJoinEditor() {
  return edits
}
// Editor.jsx
// Environment: client
 
import { onJoinEditor } from './Editor.telefunc'

const edits = await onJoinEditor()
 
edits.subscribe((edit) => applyEdit(edit))
edits.next({ userId: 'alice', text: 'Hello' })
// Editor.tsx
// Environment: client
 
import { onJoinEditor } from './Editor.telefunc'

const edits = await onJoinEditor()
 
edits.subscribe(edit => applyEdit(edit))
edits.next({ userId: 'alice', text: 'Hello' })

Single server. A module-level Subject lives in one server process. These multicast examples — the editor above and Live cursors below — work as-is on a single instance. Across multiple instances, each server process has its own Subject, so route shared state through a broadcast transport instead — see Stream at Scale.
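The single-instance limitation can be reproduced locally. In the sketch below, two separate `Subject` instances stand in for the module-level Subject of two server processes; the variable names and the three users are assumptions made for illustration.

```typescript
import { Subject } from 'rxjs'

type Edit = { userId: string; text: string }

// Each variable stands in for the module-level Subject of one server process.
const editsOnServerA = new Subject<Edit>()
const editsOnServerB = new Subject<Edit>()

const seenByAlice: Edit[] = [] // connected to server A
const seenByBob: Edit[] = []   // connected to server A
const seenByCarol: Edit[] = [] // connected to server B

editsOnServerA.subscribe(e => seenByAlice.push(e))
editsOnServerA.subscribe(e => seenByBob.push(e))
editsOnServerB.subscribe(e => seenByCarol.push(e))

// An edit emitted on server A reaches only the subscribers of that Subject.
editsOnServerA.next({ userId: 'alice', text: 'Hello' })
```

After the `next()` call, `seenByAlice` and `seenByBob` each hold one edit while `seenByCarol` stays empty, which is the gap a broadcast transport has to close.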

Example: click heatmap (client → server)

Pass an Observable as a telefunction argument. The server subscribes and processes the stream — a pattern useful for telemetry, analytics, or any client-driven event stream.

// Heatmap.telefunc.js
// Environment: server
 
import { bufferTime, filter } from 'rxjs'
 
export async function onTrackClicks(clicks$) {
  clicks$
    .pipe(
      bufferTime(5000),
      filter((batch) => batch.length > 0)
    )
    .subscribe((batch) => saveHeatmapBatch(batch))
}
// Heatmap.telefunc.ts
// Environment: server
 
import { Observable, bufferTime, filter } from 'rxjs'
 
export async function onTrackClicks(clicks$: Observable<{ x: number; y: number }>) {
  clicks$.pipe(
    bufferTime(5000),
    filter(batch => batch.length > 0)
  ).subscribe(batch => saveHeatmapBatch(batch))
}
// Heatmap.jsx
// Environment: client
 
import { fromEvent, map } from 'rxjs'
import { onTrackClicks } from './Heatmap.telefunc'
 
const clicks$ = fromEvent(document, 'click').pipe(map((e) => ({ x: e.clientX, y: e.clientY })))
await onTrackClicks(clicks$)
// Heatmap.tsx
// Environment: client
 
import { fromEvent, map } from 'rxjs'
import { onTrackClicks } from './Heatmap.telefunc'
 
const clicks$ = fromEvent<MouseEvent>(document, 'click').pipe(
  map(e => ({ x: e.clientX, y: e.clientY }))
)
await onTrackClicks(clicks$)
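Because the server-side logic is an ordinary RxJS pipeline, it can be exercised with a local `Subject` in place of the client stream. The sketch below assumes a hypothetical in-memory `saveHeatmapBatch` and a `trackClicks` function that mirrors the body of `onTrackClicks`; it relies on `bufferTime` flushing its pending buffer when the source completes.

```typescript
import { Observable, Subject, bufferTime, filter } from 'rxjs'

type Click = { x: number; y: number }

// Hypothetical stand-in for saveHeatmapBatch(): records batches in memory.
const saved: Click[][] = []
function saveHeatmapBatch(batch: Click[]): void {
  saved.push(batch)
}

// Same pipeline as onTrackClicks(), minus the telefunction wrapper.
function trackClicks(clicks$: Observable<Click>) {
  return clicks$.pipe(
    bufferTime(5000),
    filter(batch => batch.length > 0)
  ).subscribe(batch => saveHeatmapBatch(batch))
}

// A stream with two clicks: completing it flushes the pending buffer
// without waiting for the 5-second window to elapse.
const clicks$ = new Subject<Click>()
trackClicks(clicks$)
clicks$.next({ x: 1, y: 2 })
clicks$.next({ x: 3, y: 4 })
clicks$.complete()

// A stream with no clicks: the flushed buffer is empty and the filter drops it.
const idle$ = new Subject<Click>()
trackClicks(idle$)
idle$.complete()
```

Only the first stream produces a saved batch, so the `filter` step is what keeps idle windows from reaching storage.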

Example: live cursors

A shared Subject multicasts cursor positions among all connected users. The server attaches the user identity from context.

// Whiteboard.telefunc.js
// Environment: server
 
import { Subject } from 'rxjs'
import { getContext } from 'telefunc'
 
const cursors = new Subject()
 
export async function onCursors() {
  const { user, onClose } = getContext()
  const input = new Subject()
  const sub = input.subscribe((pos) => cursors.next({ ...pos, userId: user.id }))
  onClose(() => sub.unsubscribe()) // release the per-client subscription on disconnect
  return { cursors, input }
}
// Whiteboard.telefunc.ts
// Environment: server
 
import { Subject } from 'rxjs'
import { getContext } from 'telefunc'
 
const cursors = new Subject<{ userId: string; x: number; y: number }>()
 
export async function onCursors() {
  const { user, onClose } = getContext()
  const input = new Subject<{ x: number; y: number }>()
  const sub = input.subscribe(pos => cursors.next({ ...pos, userId: user.id }))
  onClose(() => sub.unsubscribe()) // release the per-client subscription on disconnect
  return { cursors, input }
}
// Whiteboard.jsx
// Environment: client
 
import { onCursors } from './Whiteboard.telefunc'

const { cursors, input } = await onCursors()
 
cursors.subscribe((pos) => drawCursor(pos.userId, pos.x, pos.y))
 
document.addEventListener('mousemove', (e) => {
  input.next({ x: e.clientX, y: e.clientY })
})
// Whiteboard.tsx
// Environment: client
 
import { onCursors } from './Whiteboard.telefunc'

const { cursors, input } = await onCursors()
 
cursors.subscribe(pos => drawCursor(pos.userId, pos.x, pos.y))
 
document.addEventListener('mousemove', e => {
  input.next({ x: e.clientX, y: e.clientY })
})
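The per-client `input` Subject and its cleanup can be sketched without a server. In the code below, `join` is a hypothetical local stand-in for `onCursors`: the user id is passed in directly instead of being read from `getContext()`, and the returned `close` function plays the role of the `onClose` callback.

```typescript
import { Subject } from 'rxjs'

type Position = { x: number; y: number }
type Cursor = Position & { userId: string }

// Shared stream that every user subscribes to.
const cursors = new Subject<Cursor>()

// Hypothetical stand-in for onCursors(): one input Subject per user, with the
// user id attached on the "server" side before forwarding to the shared stream.
function join(userId: string) {
  const input = new Subject<Position>()
  const sub = input.subscribe(pos => cursors.next({ ...pos, userId }))
  return { input, close: () => sub.unsubscribe() }
}

// Stand-in for drawCursor(): record what would be drawn.
const drawn: Cursor[] = []
cursors.subscribe(c => drawn.push(c))

const alice = join('alice')
const bob = join('bob')
alice.input.next({ x: 1, y: 1 })
bob.input.next({ x: 2, y: 2 })

bob.close()                    // simulate bob disconnecting
bob.input.next({ x: 9, y: 9 }) // no longer forwarded to the shared stream
```

Clients never send a `userId` themselves, so one user cannot move another user's cursor, and releasing the subscription on disconnect keeps the shared Subject from accumulating dead forwarders.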

Example: Angular

Angular uses RxJS extensively. With @telefunc/rxjs, telefunctions return Observables directly — pipe them into templates with | async, use them in services, or compose them with the rest of your RxJS code.

// Dashboard.telefunc.js
// Environment: server
 
import { interval, map } from 'rxjs'
 
export async function onMetrics() {
  return interval(1000).pipe(map(() => ({ cpu: getCpu(), memory: getMemory() })))
}
// Dashboard.telefunc.ts
// Environment: server
 
import { interval, map } from 'rxjs'
 
export async function onMetrics() {
  return interval(1000).pipe(
    map(() => ({ cpu: getCpu(), memory: getMemory() }))
  )
}

The | async pipe subscribes and auto-unsubscribes when the component is destroyed:

// dashboard.component.js
// Environment: client
 
import { onMetrics } from './Dashboard.telefunc'

export class DashboardComponent {
  async ngOnInit() {
    this.metrics$ = await onMetrics()
  }
}
// dashboard.component.ts
// Environment: client
 
import type { Observable } from 'rxjs'
import { onMetrics } from './Dashboard.telefunc'

export class DashboardComponent {
  metrics$!: Observable<{ cpu: number; memory: number }>
 
  async ngOnInit() {
    this.metrics$ = await onMetrics()
  }
}
<!-- dashboard.component.html -->
<!-- Environment: client -->
 
@if (metrics$ | async; as m) {
  CPU: {{ m.cpu }}% — Memory: {{ m.memory }}MB
}

Lifecycle

Unsubscribing stops the data flow immediately. The underlying channel is cleaned up automatically by garbage collection, or immediately if you call close().
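The unsubscribe behavior follows standard RxJS semantics, sketched below with a local `Subject` standing in for a stream returned by a telefunction. `close()` is not shown because this section does not describe where it is called.

```typescript
import { Subject } from 'rxjs'

// Stand-in for a stream returned by a telefunction.
const metrics$ = new Subject<number>()
const received: number[] = []

const subscription = metrics$.subscribe(v => received.push(v))
metrics$.next(1)            // delivered
subscription.unsubscribe()  // stop receiving values
metrics$.next(2)            // not delivered: the subscriber is gone
```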
