@telefunc/rxjs
Beta — Telefunc Stream is in beta: breaking changes may occur in any version update.
The @telefunc/rxjs integration lets you pass RxJS Observable and Subject instances directly between client and server — in both directions, and every RxJS operator works across the boundary.
Automatic runtime type validation: all values sent from the client to the server are validated against your TypeScript types at runtime (no need for Zod).
The @telefunc/rxjs integration is powered by Telefunc Stream.
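To make the runtime validation mentioned above concrete, here is a minimal, dependency-free sketch of the kind of check that a type such as { x: number; y: number } implies. It is hand-written for illustration only and is not Telefunc's actual validator; the names Point and isPoint are hypothetical. Since Telefunc validates against your TypeScript types, you would not normally write this yourself.

```typescript
// Hypothetical illustration: a hand-written equivalent of validating a
// client-sent value against the TypeScript type { x: number; y: number }.
type Point = { x: number; y: number }

// Type guard: returns true only if `value` has numeric `x` and `y` fields.
function isPoint(value: unknown): value is Point {
  if (typeof value !== 'object' || value === null) return false
  const candidate = value as Record<string, unknown>
  return typeof candidate.x === 'number' && typeof candidate.y === 'number'
}

// A value shaped as the server expects, and one a tampered client might send.
const fromWellBehavedClient: unknown = { x: 12, y: 34 }
const fromTamperedClient: unknown = { x: '12', y: null }

const validResult = isPoint(fromWellBehavedClient)
const invalidResult = isPoint(fromTamperedClient)
```

The point of the sketch is that a static type alone says nothing about what arrives over the network; some check of this shape has to run on the server for each value.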
Install
npm install @telefunc/rxjs rxjs
pnpm add @telefunc/rxjs rxjs
bun add @telefunc/rxjs rxjs
yarn add @telefunc/rxjs rxjs
That's it.
The Telefunc bundler plugin (Vite, webpack, Next.js, Babel) detects @telefunc/rxjs in your dependencies and registers it automatically. Without a Telefunc bundler plugin, register it manually: import '@telefunc/rxjs/server' in your server entry and import '@telefunc/rxjs/client' in your client entry.
Example: live stock ticker
The server pushes prices every second. The client filters and limits them locally with standard RxJS operators.
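// StockPrice.telefunc.js
// Environment: server
import { interval, map } from 'rxjs'

export async function onStockPrice(symbol) {
  // Emit a fresh price snapshot once per second
  return interval(1000).pipe(
    map(() => ({ symbol, price: getLatestPrice(symbol), time: Date.now() }))
  )
}

// StockPrice.telefunc.ts
// Environment: server
import { interval, map } from 'rxjs'

export async function onStockPrice(symbol: string) {
  // Emit a fresh price snapshot once per second
  return interval(1000).pipe(
    map(() => ({ symbol, price: getLatestPrice(symbol), time: Date.now() }))
  )
}

// StockPrice.jsx
// Environment: client
import { filter, take } from 'rxjs'
// Import path assumes the telefunc file sits next to this one
import { onStockPrice } from './StockPrice.telefunc'

const price$ = await onStockPrice('AAPL')
price$
  .pipe(
    filter((p) => p.price > 150), // keep only prices above 150
    take(10) // complete after 10 matching values
  )
  .subscribe((p) => updateChart(p))

// StockPrice.tsx
// Environment: client
import { filter, take } from 'rxjs'
// Import path assumes the telefunc file sits next to this one
import { onStockPrice } from './StockPrice.telefunc'

const price$ = await onStockPrice('AAPL')
price$.pipe(
  filter(p => p.price > 150), // keep only prices above 150
  take(10) // complete after 10 matching values
).subscribe(p => updateChart(p))

Example: collaborative editor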
Return a shared Subject to multiple clients and it multicasts among them: when one client emits with next(), every other client's subscribers receive the value through the server. The emitting client's own subscribers receive it locally too — the server doesn't echo a client's next() back to that same client.
// Editor.telefunc.js
// Environment: server
import { Subject } from 'rxjs'

// Module-level Subject shared by every client that joins
const edits = new Subject()

export async function onJoinEditor() {
  return edits
}

// Editor.telefunc.ts
// Environment: server
import { Subject } from 'rxjs'

// Module-level Subject shared by every client that joins
const edits = new Subject<{ userId: string; text: string }>()

export async function onJoinEditor() {
  return edits
}

// Editor.jsx
// Environment: client
// Import path assumes the telefunc file sits next to this one
import { onJoinEditor } from './Editor.telefunc'

const edits = await onJoinEditor()
edits.subscribe((edit) => applyEdit(edit))
edits.next({ userId: 'alice', text: 'Hello' })

// Editor.tsx
// Environment: client
// Import path assumes the telefunc file sits next to this one
import { onJoinEditor } from './Editor.telefunc'

const edits = await onJoinEditor()
edits.subscribe(edit => applyEdit(edit))
edits.next({ userId: 'alice', text: 'Hello' })

Single server. A module-level Subject lives in one server process. These multicast examples (the editor above and Live cursors below) work as-is on a single instance. Across multiple instances, each server process has its own Subject, so route shared state through a broadcast transport instead; see Stream at Scale.
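The multicast behavior described for the shared Subject (local delivery to the sender, relay to every other client, no echo back) and the single-server caveat can be modeled in a few lines. The sketch below is a dependency-free conceptual model, not Telefunc's implementation: RelayHub and join are hypothetical names, a plain listener array stands in for an RxJS Subject, and each RelayHub instance stands in for one server process.

```typescript
type Edit = { userId: string; text: string }
type Listener = (edit: Edit) => void

// Hypothetical stand-in for one server process holding one shared Subject.
class RelayHub {
  private clients = new Map<string, Listener[]>()

  // Register a client and hand back a Subject-like handle for it.
  join(clientId: string) {
    const listeners: Listener[] = []
    this.clients.set(clientId, listeners)
    return {
      subscribe: (fn: Listener): void => {
        listeners.push(fn)
      },
      next: (edit: Edit): void => {
        // Local delivery: the sender's own subscribers get the value once.
        listeners.forEach((fn) => fn(edit))
        // Relay: every other client on this hub, never echoed to the sender.
        this.clients.forEach((others, id) => {
          if (id !== clientId) others.forEach((fn) => fn(edit))
        })
      },
    }
  }
}

// Two hubs model two server instances that do not share state.
const serverA = new RelayHub()
const serverB = new RelayHub()

const alice = serverA.join('alice')
const bob = serverA.join('bob')
const carol = serverB.join('carol')

const seenByAlice: string[] = []
const seenByBob: string[] = []
const seenByCarol: string[] = []
alice.subscribe((e) => seenByAlice.push(e.text))
bob.subscribe((e) => seenByBob.push(e.text))
carol.subscribe((e) => seenByCarol.push(e.text))

alice.next({ userId: 'alice', text: 'Hello' })
```

In this model alice and bob each see the edit exactly once, while carol, connected to a different instance, sees nothing. That gap is what a broadcast transport would have to bridge in a multi-instance deployment.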
Example: click heatmap (client → server)
Pass an Observable as a telefunction argument. The server subscribes and processes the stream — a pattern useful for telemetry, analytics, or any client-driven event stream.
// Heatmap.telefunc.js
// Environment: server
import { bufferTime, filter } from 'rxjs'

export async function onTrackClicks(clicks$) {
  clicks$
    .pipe(
      bufferTime(5000), // collect clicks into 5-second batches
      filter((batch) => batch.length > 0) // skip empty batches
    )
    .subscribe((batch) => saveHeatmapBatch(batch))
}

// Heatmap.telefunc.ts
// Environment: server
import { Observable, bufferTime, filter } from 'rxjs'

export async function onTrackClicks(clicks$: Observable<{ x: number; y: number }>) {
  clicks$.pipe(
    bufferTime(5000), // collect clicks into 5-second batches
    filter(batch => batch.length > 0) // skip empty batches
  ).subscribe(batch => saveHeatmapBatch(batch))
}

// Heatmap.jsx
// Environment: client
import { fromEvent, map } from 'rxjs'
// Import path assumes the telefunc file sits next to this one
import { onTrackClicks } from './Heatmap.telefunc'

const clicks$ = fromEvent(document, 'click').pipe(map((e) => ({ x: e.clientX, y: e.clientY })))
await onTrackClicks(clicks$)

// Heatmap.tsx
// Environment: client
import { fromEvent, map } from 'rxjs'
// Import path assumes the telefunc file sits next to this one
import { onTrackClicks } from './Heatmap.telefunc'

const clicks$ = fromEvent<MouseEvent>(document, 'click').pipe(
  map(e => ({ x: e.clientX, y: e.clientY }))
)
await onTrackClicks(clicks$)

Example: live cursors
A shared Subject multicasts cursor positions among all connected users. The server attaches the user identity from context.
// Whiteboard.telefunc.js
// Environment: server
import { Subject } from 'rxjs'
import { getContext } from 'telefunc'

// Shared by all connected users
const cursors = new Subject()

export async function onCursors() {
  const { user, onClose } = getContext()
  // Per-client input: the server stamps each position with the user's id
  const input = new Subject()
  const sub = input.subscribe((pos) => cursors.next({ ...pos, userId: user.id }))
  onClose(() => sub.unsubscribe()) // release the per-client subscription on disconnect
  return { cursors, input }
}

// Whiteboard.telefunc.ts
// Environment: server
import { Subject } from 'rxjs'
import { getContext } from 'telefunc'

// Shared by all connected users
const cursors = new Subject<{ userId: string; x: number; y: number }>()

export async function onCursors() {
  const { user, onClose } = getContext()
  // Per-client input: the server stamps each position with the user's id
  const input = new Subject<{ x: number; y: number }>()
  const sub = input.subscribe(pos => cursors.next({ ...pos, userId: user.id }))
  onClose(() => sub.unsubscribe()) // release the per-client subscription on disconnect
  return { cursors, input }
}

// Whiteboard.jsx
// Environment: client
// Import path assumes the telefunc file sits next to this one
import { onCursors } from './Whiteboard.telefunc'

const { cursors, input } = await onCursors()
cursors.subscribe((pos) => drawCursor(pos.userId, pos.x, pos.y))
document.addEventListener('mousemove', (e) => {
  input.next({ x: e.clientX, y: e.clientY })
})

// Whiteboard.tsx
// Environment: client
// Import path assumes the telefunc file sits next to this one
import { onCursors } from './Whiteboard.telefunc'

const { cursors, input } = await onCursors()
cursors.subscribe(pos => drawCursor(pos.userId, pos.x, pos.y))
document.addEventListener('mousemove', e => {
  input.next({ x: e.clientX, y: e.clientY })
})

Example: Angular
Angular uses RxJS extensively. With @telefunc/rxjs, telefunctions return Observables directly — pipe them into templates with | async, use them in services, or compose them with the rest of your RxJS code.
// Dashboard.telefunc.js
// Environment: server
import { interval, map } from 'rxjs'

export async function onMetrics() {
  return interval(1000).pipe(map(() => ({ cpu: getCpu(), memory: getMemory() })))
}

// Dashboard.telefunc.ts
// Environment: server
import { interval, map } from 'rxjs'

export async function onMetrics() {
  return interval(1000).pipe(
    map(() => ({ cpu: getCpu(), memory: getMemory() }))
  )
}

The | async pipe subscribes and auto-unsubscribes when the component is destroyed:

// dashboard.component.js
// Environment: client
// Import path assumes the telefunc file sits next to this one
import { onMetrics } from './Dashboard.telefunc'

export class DashboardComponent {
  async ngOnInit() {
    this.metrics$ = await onMetrics()
  }
}

// dashboard.component.ts
// Environment: client
import type { Observable } from 'rxjs'
// Import path assumes the telefunc file sits next to this one
import { onMetrics } from './Dashboard.telefunc'

export class DashboardComponent {
  metrics$!: Observable<{ cpu: number; memory: number }>

  async ngOnInit() {
    this.metrics$ = await onMetrics()
  }
}

<!-- dashboard.component.html -->
<!-- Environment: client -->
@if (metrics$ | async; as m) {
  CPU: {{ m.cpu }}% — Memory: {{ m.memory }}MB
}

Lifecycle
Unsubscribing stops the data flow immediately. The underlying channel is cleaned up automatically by garbage collection (GC), or immediately if you call close().