Stream

Beta: Telefunc Stream is in beta; breaking changes may occur in any version update.

Telefunc Stream supports streaming (one-way streams) and real-time (two-way streams) with:

  • Primitives
  • Integrations
  • Seamless DX

Here, the word stream is used in the broad sense (as Wikipedia defines it): not just a ReadableStream but any sequence of data made available over time.

Primitives

  1. AsyncGenerator (function*):

    // AiChat.telefunc.js
    // Environment: server
     
    export async function* onAiPrompt(prompt) {
      const stream = await ai.streamText({ prompt })
      // The messages from the AI reach the client over time as a stream of messages
      for await (const message of stream) {
        yield message
      }
    }
    // AiChat.telefunc.ts
    // Environment: server
     
    export async function* onAiPrompt(prompt: string): AsyncGenerator<string> {
      const stream = await ai.streamText({ prompt })
      // The messages from the AI reach the client over time as a stream of messages
      for await (const message of stream) {
        yield message
      }
    }
  2. Callback (function passing):

    // Notifications.telefunc.js
    // Environment: server
     
    export async function onSubscribe(onNotification) {
      // Client → server: the server calls the client's callback as events arrive
      const unsubscribe = notifications.subscribe((text) => onNotification(text))
      // Server → client: the client calls the returned function to stop listening
      return unsubscribe
    }
    // Notifications.telefunc.ts
    // Environment: server
     
    export async function onSubscribe(onNotification: (text: string) => void) {
      // Client → server: the server calls the client's callback as events arrive
      const unsubscribe = notifications.subscribe((text) => onNotification(text))
      // Server → client: the client calls the returned function to stop listening
      return unsubscribe
    }
  3. Multiple Promise:

    // Dashboard.telefunc.ts
    // Environment: server
     
    export async function onLoadDashboard() {
      // The client receives the three properties at different times
      return {
        title: 'Dashboard',             // Delivered immediately
        summary: fetchSummary(),        // Delivered quickly, but later
        report: fetchExtensiveReport()  // Delivered much later
      }
    }
  4. ReadableStream:

    // Compress.telefunc.js
    // Environment: server
     
    export function onCompress(data) {
      // A stream flows in, a transformed stream flows back out
      return data.pipeThrough(new CompressionStream('gzip'))
    }
    // Compress.telefunc.ts
    // Environment: server
     
    export function onCompress(data: ReadableStream<Uint8Array>): ReadableStream<Uint8Array> {
      // A stream flows in, a transformed stream flows back out
      return data.pipeThrough(new CompressionStream('gzip'))
    }
  5. Channel: API for advanced real-time use cases.

A single telefunction can mix primitives (generators, channels, promises, ...) side by side, each resolving independently.

Integrations

@telefunc/tanstack-query: automatically synced TanStack queries.
@telefunc/rxjs: reactive streams and operators.

Integrations provide an even more seamless DX: they handle more for you. They're powered by the primitives listed on this page.

Seamless DX

Runtime type validation: automatically validates every value sent from the client to the server against your TypeScript types (no need for Zod).
Transport: automatically picks the most performant available transport (HTTP, SSE, WebSocket).
Backpressure: automatic backpressure when the network is the bottleneck.
Reconnection: automatic recovery from network issues.
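
To make the term backpressure concrete, here is a minimal sketch that uses only the standard Web Streams API. It does not show Telefunc's internals; `pause` and `demoBackpressure` are hypothetical names chosen for this illustration. The producer could emit forever, yet it stops as soon as its queue is full and resumes only when the consumer reads.

```typescript
// Standard Web Streams API only (global in modern browsers and Node 18+)

// Hypothetical helper: wait long enough for pending stream callbacks to run
const pause = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

async function demoBackpressure(): Promise<number[]> {
  let produced = 0
  // A producer that could emit forever, limited only by the consumer
  const numbers = new ReadableStream<number>(
    {
      pull(controller) {
        // Called only while the queue holds fewer than highWaterMark values
        controller.enqueue(produced++)
      }
    },
    { highWaterMark: 2 }
  )

  const counts: number[] = []
  await pause(10)
  counts.push(produced) // nobody is reading yet: the producer has paused

  const reader = numbers.getReader()
  await reader.read() // the consumer takes one value
  await pause(10)
  counts.push(produced) // the producer refilled the freed slot and paused again

  await reader.cancel()
  return counts
}
```

With a highWaterMark of 2, `demoBackpressure()` resolves to `[2, 3]`: two values are produced up front, and one more after the single read.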

Primitive: AsyncGenerator (function*)

An async function* (AsyncGenerator) is a good fit for short-lived streaming: a finite sequence of values that arrive one at a time, after which the stream completes. Examples:

  • AI answer stream
  • Progress updates for a long-running task (e.g. upload)
  • Streaming search results as they're found
  • Rows of a large SQL query, streamed to the UI

It's a good fit because a generator closes itself when it completes. (Other streaming primitives like callbacks have no such completion signal.)

You can also use it for any other use case such as never-ending streams — the deciding factor is usually just DX. (Telefunc picks a different default transport depending on the primitive, but you can change the transport to match your needs.)
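
The completion signal mentioned above is plain JavaScript behavior and can be observed without a server. The sketch below is a local generator, not a telefunction; `countdownSketch`, `consumeUntil` and `demoCompletion` are hypothetical names. It shows that the `finally` block runs both when the generator finishes by itself and when the consumer stops early.

```typescript
// Plain async generator: standard JavaScript, no Telefunc involved
async function* countdownSketch(from: number, log: string[]): AsyncGenerator<number> {
  try {
    for (let i = from; i >= 0; i--) yield i
  } finally {
    // Runs when the loop finishes and also when the consumer stops early
    log.push('closed')
  }
}

async function consumeUntil(gen: AsyncGenerator<number>, stopAt: number): Promise<number[]> {
  const seen: number[] = []
  for await (const n of gen) {
    seen.push(n)
    if (n === stopAt) break // leaving the loop calls gen.return()
  }
  return seen
}

async function demoCompletion() {
  const log: string[] = []
  const all = await consumeUntil(countdownSketch(3, log), -1) // never matches: runs to the end
  const partial = await consumeUntil(countdownSketch(3, log), 2) // stops after two values
  return { all, partial, log }
}
```

`demoCompletion()` resolves with `all` equal to `[3, 2, 1, 0]`, `partial` equal to `[3, 2]`, and `log` containing `'closed'` twice, once per generator.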

Example: countdown

A simple example:

// Countdown.telefunc.js
// Environment: server
 
export async function* onCountdown(from) {
  for (let i = from; i >= 0; i--) {
    yield i
    await new Promise((resolve) => setTimeout(resolve, 1000))
  }
}
// Countdown.telefunc.ts
// Environment: server
 
export async function* onCountdown(from: number) {
  for (let i = from; i >= 0; i--) {
    yield i
    await new Promise((resolve) => setTimeout(resolve, 1000))
  }
}
// Countdown.jsx
// Environment: client
 
import { onCountdown } from './Countdown.telefunc'
 
for await (const n of onCountdown(3)) {
  console.log(n) // 3, 2, 1, 0 — one per second
}
// Countdown.tsx
// Environment: client
 
import { onCountdown } from './Countdown.telefunc'
 
for await (const n of onCountdown(3)) {
  console.log(n) // 3, 2, 1, 0 — one per second
}

Example: AI chat

A full-fledged example:

// AiChat.telefunc.js
// Environment: server
 
import { getContext } from 'telefunc'
 
export async function* onAiPrompt(prompt) {
  const context = getContext()
  const { stream, cancel } = await ai(prompt)
 
  // When the client stops the stream (e.g. the user presses "Stop"), cancel the
  // upstream AI request — otherwise it keeps running and burning AI tokens
  context.onClose(cancel)
 
  for await (const message of stream) {
    yield message
  }
}
// AiChat.telefunc.ts
// Environment: server
 
import { getContext } from 'telefunc'
 
export async function* onAiPrompt(prompt: string): AsyncGenerator<string> {
  const context = getContext()
  const { stream, cancel } = await ai(prompt)
 
  // When the client stops the stream (e.g. the user presses "Stop"), cancel the
  // upstream AI request — otherwise it keeps running and burning AI tokens
  context.onClose(cancel)
 
  for await (const message of stream) {
    yield message
  }
}
// AiChat.jsx
// Environment: client
 
import { onAiPrompt } from './AiChat.telefunc'
 
const gen = onAiPrompt('Tell me a joke')
 
// "Stop" button — close the stream early; this fires onClose() on the server
stopButton.onclick = () => gen.return()
 
for await (const message of gen) {
  console.log(message) // Each message arrives as it's yielded
}
// AiChat.tsx
// Environment: client
 
import { onAiPrompt } from './AiChat.telefunc'
 
const gen = onAiPrompt('Tell me a joke')
 
// "Stop" button — close the stream early; this fires onClose() on the server
stopButton.onclick = () => gen.return()
 
for await (const message of gen) {
  console.log(message) // Each message arrives as it's yielded
}

Primitive: Callback (function passing)

You can pass functions between client and server:

  • Client → server: pass a callback as a telefunction argument — the server calls it.
  • Server → client: return a function from a telefunction — the client calls it.
// Upload.telefunc.js
// Environment: server
 
export async function onUpload(file, onProgress) {
  let loaded = 0
  for await (const chunk of file.stream()) {
    writeChunk(chunk)
    loaded += chunk.byteLength
    onProgress(Math.round((loaded / file.size) * 100))
  }
}
// Upload.telefunc.ts
// Environment: server
 
export async function onUpload(file: File, onProgress: (percent: number) => void) {
  let loaded = 0
  for await (const chunk of file.stream()) {
    writeChunk(chunk)
    loaded += chunk.byteLength
    onProgress(Math.round((loaded / file.size) * 100))
  }
}
// Upload.jsx
// Environment: client
 
import { onUpload } from './Upload.telefunc'

await onUpload(file, (percent) => setProgress(percent))
// Upload.tsx
// Environment: client
 
import { onUpload } from './Upload.telefunc'

await onUpload(file, (percent) => setProgress(percent))

The underlying stream closes automatically once the client stops using (i.e. stops referencing) onProgress; you can also close it eagerly yourself, see API > close().

Under the hood, a passed function is just a Channel. A callback has the same lifecycle as a channel, only exposed with a nicer JavaScript DX.

Reach for a raw Channel if you need more than call-and-return — e.g. two-way messaging, broadcast, or binary.
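
Both directions of function passing can be tried locally with a small stand-in. In the sketch below, `notificationHub` is a hypothetical in-memory replacement for a real event source, and `onSubscribeSketch` is an ordinary async function with the same shape as a telefunction: it accepts a callback and returns a function. Nothing here goes over the network.

```typescript
type TextListener = (text: string) => void

// Hypothetical in-memory hub standing in for a real notification source
const hubListeners = new Set<TextListener>()
const notificationHub = {
  subscribe(listener: TextListener): () => void {
    hubListeners.add(listener)
    return () => {
      hubListeners.delete(listener)
    }
  },
  publish(text: string): void {
    hubListeners.forEach((listener) => listener(text))
  }
}

// Same shape as a telefunction that accepts a callback and returns a function
async function onSubscribeSketch(onNotification: TextListener): Promise<() => void> {
  const unsubscribe = notificationHub.subscribe((text) => onNotification(text))
  return unsubscribe
}

async function demoSubscribe(): Promise<string[]> {
  const received: string[] = []
  const unsubscribe = await onSubscribeSketch((text) => received.push(text))
  notificationHub.publish('first') // delivered
  unsubscribe()
  notificationHub.publish('second') // dropped: no longer subscribed
  return received
}
```

`demoSubscribe()` resolves to `['first']`, because the second notification is published after the returned function was called.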

Primitive: Multiple Promise

If you return promises without awaiting them, the client receives the rest of the object immediately, and each promise resolves on its own.

// Dashboard.telefunc.js
// Environment: server
 
export async function onLoadDashboard() {
  const title = 'Dashboard'
  const summaryPromise = fetchSummary()
  const reportPromise = fetchExtensiveReport()
  return {
    // Delivered to the client immediately
    title,
    // Delivered to the client when summaryPromise resolves
    summary: summaryPromise,
    // Delivered to the client when reportPromise resolves
    report: reportPromise
  }
}
// Dashboard.telefunc.ts
// Environment: server
 
export async function onLoadDashboard() {
  const title = 'Dashboard'
  const summaryPromise = fetchSummary()
  const reportPromise = fetchExtensiveReport()
  return {
    // Delivered to the client immediately
    title,
    // Delivered to the client when summaryPromise resolves
    summary: summaryPromise,
    // Delivered to the client when reportPromise resolves
    report: reportPromise
  }
}
// Dashboard.tsx
// Environment: client
 
import { onLoadDashboard } from './Dashboard.telefunc'
 
const res = await onLoadDashboard()
 
console.log(res.title)             // Available right away
 
const summary = await res.summary  // Await when needed
const report = await res.report

Don't await inside the telefunction — that blocks the entire response:

export async function onLoadDashboard() {
  // ❌ Blocks the entire call
  const data = await prepare()
  return { title: 'Dashboard', data }
}

Return the promise instead:

export async function onLoadDashboard() {
  // ✅ title delivered immediately, promise resolves in the background
  return {
    title: 'Dashboard',
    data: prepare()
  }
}

As with other streaming primitives, you can use onClose() and close() to eagerly cancel pending work (e.g. fetchExtensiveReport()).
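
The difference between awaiting and returning a promise can be checked locally. The sketch below simulates the two fetchers with timers; `resolveAfter`, `loadDashboardSketch` and `demoArrivalOrder` are hypothetical names, and the delays are arbitrary. It only illustrates the ordering and says nothing about how Telefunc transports the values.

```typescript
// Hypothetical helper: resolves with `value` after `ms` milliseconds
function resolveAfter<T>(ms: number, value: T): Promise<T> {
  return new Promise((resolve) => setTimeout(() => resolve(value), ms))
}

// Same shape as onLoadDashboard(); the two fetchers are simulated with timers
async function loadDashboardSketch() {
  return {
    title: 'Dashboard',
    summary: resolveAfter(20, 'summary'), // not awaited
    report: resolveAfter(60, 'report') // not awaited
  }
}

async function demoArrivalOrder(): Promise<string[]> {
  const order: string[] = []
  const res = await loadDashboardSketch()
  order.push(res.title) // usable before either promise settles
  await Promise.all([
    // Registered in reverse order on purpose: timing decides, not position
    res.report.then((value) => order.push(value)),
    res.summary.then((value) => order.push(value))
  ])
  return order
}
```

`demoArrivalOrder()` resolves to `['Dashboard', 'summary', 'report']`: the title is available first, and each promise settles independently.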

Primitive: ReadableStream

Stream raw bytes with a standard ReadableStream — in either direction.

Server → client

Return a ReadableStream:

// Download.telefunc.js
// Environment: server
 
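import fs from 'node:fs'
import path from 'node:path'
import { Readable } from 'node:stream'

export function onDownload(fileName) {
  // fileName comes from the client: keep only its base name to prevent path traversal
  const safeName = path.basename(fileName)
  return Readable.toWeb(fs.createReadStream(path.join('./files', safeName)))
}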
// Download.telefunc.ts
// Environment: server
 
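import fs from 'node:fs'
import path from 'node:path'
import { Readable } from 'node:stream'

export function onDownload(fileName: string): ReadableStream<Uint8Array> {
  // fileName comes from the client: keep only its base name to prevent path traversal
  const safeName = path.basename(fileName)
  return Readable.toWeb(fs.createReadStream(path.join('./files', safeName))) as ReadableStream<Uint8Array>
}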
// Download.jsx
// Environment: client
 
import { onDownload } from './Download.telefunc'
 
const stream = await onDownload('report.pdf')
const reader = stream.getReader()
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  // Process raw bytes
}
// Download.tsx
// Environment: client
 
import { onDownload } from './Download.telefunc'
 
const stream = await onDownload('report.pdf')
const reader = stream.getReader()
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  // Process raw bytes
}

You can also return a File or Blob instead, see: Guides > File download.
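
As one possible way to fill in the "Process raw bytes" step, the sketch below gathers all chunks of a ReadableStream into a single Uint8Array. `collectBytes` and `demoCollect` are hypothetical helpers built on standard APIs only. Note that collecting everything gives up the constant-memory benefit of streaming, so this suits small payloads.

```typescript
// Reads a byte stream to the end and returns all bytes as one array
async function collectBytes(stream: ReadableStream<Uint8Array>): Promise<Uint8Array> {
  const chunks: Uint8Array[] = []
  let total = 0
  const reader = stream.getReader()
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    chunks.push(value)
    total += value.byteLength
  }
  // Copy every chunk into one contiguous buffer
  const bytes = new Uint8Array(total)
  let offset = 0
  for (const chunk of chunks) {
    bytes.set(chunk, offset)
    offset += chunk.byteLength
  }
  return bytes
}

async function demoCollect(): Promise<number[]> {
  // A local two-chunk stream stands in for the stream returned by a telefunction
  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      controller.enqueue(new Uint8Array([1, 2]))
      controller.enqueue(new Uint8Array([3]))
      controller.close()
    }
  })
  return Array.from(await collectBytes(stream))
}
```

`demoCollect()` resolves to `[1, 2, 3]`, the two chunks joined in arrival order.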

Client → server

Pass a ReadableStream as a telefunction argument:

// Ingest.telefunc.js
// Environment: server
 
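import fs from 'node:fs'
import path from 'node:path'
import { Writable } from 'node:stream'

export async function onIngest(stream, filename) {
  // filename comes from the client: keep only its base name to prevent path traversal
  const dest = fs.createWriteStream(path.join('./uploads', path.basename(filename)))
  await stream.pipeTo(Writable.toWeb(dest))
}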
// Ingest.telefunc.ts
// Environment: server
 
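import fs from 'node:fs'
import path from 'node:path'
import { Writable } from 'node:stream'

export async function onIngest(stream: ReadableStream<Uint8Array>, filename: string) {
  // filename comes from the client: keep only its base name to prevent path traversal
  const dest = fs.createWriteStream(path.join('./uploads', path.basename(filename)))
  await stream.pipeTo(Writable.toWeb(dest))
}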
// Ingest.jsx
// Environment: client
 
import { onIngest } from './Ingest.telefunc'
 
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode('chunk 1'))
    controller.enqueue(new TextEncoder().encode('chunk 2'))
    controller.close()
  }
})
 
await onIngest(stream, 'data.txt')
// Ingest.tsx
// Environment: client
 
import { onIngest } from './Ingest.telefunc'
 
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode('chunk 1'))
    controller.enqueue(new TextEncoder().encode('chunk 2'))
    controller.close()
  }
})
 
await onIngest(stream, 'data.txt')

Both directions

Accept a ReadableStream and return one, transforming the bytes as they pass through.

A simple example transform with gzip compression:

// Compress.telefunc.js
// Environment: server
 
export function onCompress(data) {
  return data.pipeThrough(new CompressionStream('gzip'))
}
// Compress.telefunc.ts
// Environment: server
 
export function onCompress(data: ReadableStream<Uint8Array>): ReadableStream<Uint8Array> {
  return data.pipeThrough(new CompressionStream('gzip'))
}
// Compress.jsx
// Environment: client
 
import { onCompress } from './Compress.telefunc'
 
const compressed = await onCompress(data) // data: ReadableStream<Uint8Array>
const blob = await new Response(compressed).blob() // drain the gzipped bytes
// Compress.tsx
// Environment: client
 
import { onCompress } from './Compress.telefunc'
 
const compressed = await onCompress(data) // data: ReadableStream<Uint8Array>
const blob = await new Response(compressed).blob() // drain the gzipped bytes
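
The gzip transform itself can be verified locally, without a telefunction, by piping the compressed stream straight into a DecompressionStream. This is a minimal sketch that assumes a runtime with global CompressionStream, DecompressionStream, Blob and Response (modern browsers, Node 18+); `gzipRoundTrip` is a hypothetical helper.

```typescript
// Compresses text with gzip, decompresses it again, and returns the result
async function gzipRoundTrip(text: string): Promise<string> {
  const input = new Blob([text]).stream() // text as a byte stream
  const compressed = input.pipeThrough(new CompressionStream('gzip'))
  const restored = compressed.pipeThrough(new DecompressionStream('gzip'))
  return new Response(restored).text() // drain the stream back into a string
}
```

For any input string, `gzipRoundTrip(text)` resolves to the same string again, which confirms that the stream produced by pipeThrough() is valid gzip.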

The transform can be arbitrarily heavy — for example, compressing a large video into a smaller MP4 and streaming it back to the client as it encodes:

// CompressVideo.telefunc.js
// Environment: server
 
import { getContext } from 'telefunc'
import { spawn } from 'node:child_process'
import { Readable, Writable } from 'node:stream'
 
export function onCompressVideo(video) {
  const { onClose } = getContext()
 
  // Re-encode to a smaller H.264/MP4, on the fly
  const args = ['-i', 'pipe:0', '-movflags', 'frag_keyframe+empty_moov', '-f', 'mp4', 'pipe:1']
  const ffmpeg = spawn('ffmpeg', args)
  // Kill the encoder if the client disconnects mid-stream — otherwise the process leaks
  onClose(() => ffmpeg.kill())
 
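  // Feed the upload into ffmpeg; if the pipe fails (e.g. the encoder was killed), stop the encoder
  video.pipeTo(Writable.toWeb(ffmpeg.stdin)).catch(() => ffmpeg.kill())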
  return Readable.toWeb(ffmpeg.stdout)
}
// CompressVideo.telefunc.ts
// Environment: server
 
import { getContext } from 'telefunc'
import { spawn } from 'node:child_process'
import { Readable, Writable } from 'node:stream'
 
export function onCompressVideo(video: ReadableStream<Uint8Array>): ReadableStream<Uint8Array> {
  const { onClose } = getContext()
 
  // Re-encode to a smaller H.264/MP4, on the fly
  const args = ['-i', 'pipe:0', '-movflags', 'frag_keyframe+empty_moov', '-f', 'mp4', 'pipe:1']
  const ffmpeg = spawn('ffmpeg', args)
  // Kill the encoder if the client disconnects mid-stream — otherwise the process leaks
  onClose(() => ffmpeg.kill())
 
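  // Feed the upload into ffmpeg; if the pipe fails (e.g. the encoder was killed), stop the encoder
  video.pipeTo(Writable.toWeb(ffmpeg.stdin)).catch(() => ffmpeg.kill())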
  return Readable.toWeb(ffmpeg.stdout) as ReadableStream<Uint8Array>
}

Memory consumption stays constant regardless of file size, because the video is piped through the encoder chunk by chunk and never buffered in full.

A spawned process outlives the call and keeps burning CPU if the client navigates away mid-encode — make sure to terminate it in onClose(), see Cleanup.

Primitive: Channel

For ongoing two-way or broadcast messaging, you can use a channel.

This is the low-level primitive that powers most of Telefunc Stream.

Instead of directly using channels, most users reach for high-level primitives (e.g. callbacks) and high-level integrations (e.g. @telefunc/tanstack-query).

// Dashboard.telefunc.js
// Environment: server
 
import { Channel } from 'telefunc'
 
export async function onDashboard() {
  const channel = new Channel()
 
  const interval = setInterval(() => {
    channel.send(getMetrics())
  }, 1000)
 
  channel.onClose(() => clearInterval(interval))
 
  return channel.client
}
// Dashboard.jsx
// Environment: client
 
import { onDashboard } from './Dashboard.telefunc'
 
const channel = await onDashboard()
channel.listen((metrics) => updateCharts(metrics))

Cleanup

Environment: server.

Make sure you always clean up resources:

// Counter.telefunc.js
// Environment: server
 
import { getContext } from 'telefunc'
 
export async function onInit(onTick) {
  const context = getContext()
  let n = 0
  const interval = setInterval(() => onTick(++n), 1000)
  context.onClose(() => {
    // ⚠️ Must be cleared, otherwise the interval keeps running forever
    clearInterval(interval)
  })
}
// Counter.telefunc.ts
// Environment: server
 
import { getContext } from 'telefunc'
 
export async function onInit(onTick: (n: number) => void) {
  const context = getContext()
  let n = 0
  const interval = setInterval(() => onTick(++n), 1000)
  context.onClose(() => {
    // ⚠️ Must be cleared, otherwise the interval keeps running forever
    clearInterval(interval)
  })
}
⚠️ Any resource a telefunction opens (setInterval, an event subscription, a DB cursor, an upstream stream) outlives the telefunction call, so always release resources when the stream closes.

You can also use a signal (AbortSignal) and channel.onClose() to listen for when a stream closes.
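
The signal-based style of cleanup can be sketched with the standard AbortController. How Telefunc hands you the signal is not shown on this page, so the sketch creates one locally; `startTicker` and `demoTicker` are hypothetical names, and the intervals are shortened for the demo.

```typescript
// Starts a ticker and releases its timer as soon as the signal aborts
function startTicker(onTick: (n: number) => void, signal: AbortSignal, intervalMs = 1000): void {
  if (signal.aborted) return // already closed: never start the timer
  let n = 0
  const interval = setInterval(() => onTick(++n), intervalMs)
  signal.addEventListener('abort', () => clearInterval(interval), { once: true })
}

async function demoTicker(): Promise<{ beforeAbort: number; afterAbort: number }> {
  const controller = new AbortController() // stands in for the stream's close signal
  let ticks = 0
  startTicker(() => ticks++, controller.signal, 10)
  await new Promise((resolve) => setTimeout(resolve, 55))
  controller.abort() // simulate the stream closing
  const beforeAbort = ticks
  await new Promise((resolve) => setTimeout(resolve, 40))
  return { beforeAbort, afterAbort: ticks } // no ticks arrive after the abort
}
```

In `demoTicker()`, `afterAbort` equals `beforeAbort`: once the signal fires, the interval is cleared and nothing keeps running in the background.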

A stream automatically closes when the client stops using it, so you usually don't have to close streams manually.

Authorization

Environment: server.

A streaming telefunction authorizes like any other telefunction (see Guides > Permissions): check getContext() and throw Abort() at open time, before you stream anything back to the client. Here the client passes a callback and the server calls it:

// TeamFeed.telefunc.js
// Environment: server
 
import { getContext, Abort } from 'telefunc'
 
export async function onTeamFeed(teamId, onMessage) {
  const context = getContext()
  if (!context.user?.teams.includes(teamId)) throw Abort()
 
  // Authorized — push this team's messages to the client
  const unwatch = watchTeam(teamId, onMessage)
  // Stop watching when the client disconnects — otherwise the subscription leaks
  context.onClose(unwatch)
}
// TeamFeed.telefunc.ts
// Environment: server
 
import { getContext, Abort } from 'telefunc'
 
export async function onTeamFeed(teamId: string, onMessage: (text: string) => void) {
  const context = getContext()
  if (!context.user?.teams.includes(teamId)) throw Abort()
 
  // Authorized — push this team's messages to the client
  const unwatch = watchTeam(teamId, onMessage)
  // Stop watching when the client disconnects — otherwise the subscription leaks
  context.onClose(unwatch)
}
// TeamFeed.jsx
// Environment: client
 
import { onTeamFeed } from './TeamFeed.telefunc'
 
await onTeamFeed('acme', (text) => showMessage(text))
// TeamFeed.tsx
// Environment: client
 
import { onTeamFeed } from './TeamFeed.telefunc'
 
await onTeamFeed('acme', (text) => showMessage(text))

getContext() and throw Abort() only work before the telefunction returns — so authorize at open time. The context.user you capture stays valid inside callbacks which run later (via JavaScript closure).

Consider re-checking. For sensitive operations, you can re-check authorization with the captured context.user each time the callback runs, although most use cases don't need this.
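
One way to re-check on every call is to wrap the client's callback in a guard. The sketch below is an assumption about how you might structure this, not a Telefunc API: `withRecheck` is a hypothetical helper, and the plain `user` object stands in for the captured context.user. In a real application the check might query a database instead of reading an in-memory object.

```typescript
// Hypothetical helper: wraps a callback so that every call re-runs a check
function withRecheck<T>(isAllowed: () => boolean, callback: (value: T) => void): (value: T) => void {
  return (value) => {
    if (!isAllowed()) return // permission revoked since the stream opened: drop the value
    callback(value)
  }
}

function demoRecheck(): string[] {
  const user = { teams: ['acme'] } // stands in for the captured context.user
  const shown: string[] = []
  const onMessage = withRecheck<string>(
    () => user.teams.includes('acme'),
    (text) => shown.push(text)
  )
  onMessage('hello') // delivered
  user.teams = [] // the user leaves the team mid-stream
  onMessage('secret') // dropped
  return shown
}
```

`demoRecheck()` returns `['hello']`: the message sent after the membership change never reaches the callback.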

Error handling

Environment: server & client.

onClose((err) => {
  if (!err) {
    // Graceful close
  } else if (err instanceof Abort) {
    // Either side called abort(value)
    console.log(err.abortValue)
  } else if (err instanceof NetworkError) {
    // Connection lost (couldn't reconnect)
  }
})

See: API > onClose()

The expected-vs-bug distinction described at Guides > Error handling applies to streams too.

Cancelling

Environment: server & client.

You can cancel a stream at any time by manually closing it — you can then use the onClose() hook to clear resources (server-side).

// Search.jsx
// Environment: client
 
import { onSearch } from './Search.telefunc'

const results = onSearch('computers')
cancelButton.onclick = () => {
  // Close it
  results.return() // fires onClose() on the server
}
 
for await (const row of results) render(row)
// Search.tsx
// Environment: client
 
import { onSearch } from './Search.telefunc'

const results = onSearch('computers')
cancelButton.onclick = () => {
  // Close it
  results.return() // fires onClose() on the server
}
 
for await (const row of results) render(row)
// Search.telefunc.js
// Environment: server
 
import { getContext } from 'telefunc'
 
export async function* onSearch(query) {
  const context = getContext()
  const cursor = db.search(query)
  context.onClose(() => {
    // Release the database cursor
    cursor.close()
  })
 
  for await (const row of cursor) yield row
}
// Search.telefunc.ts
// Environment: server
 
import { getContext } from 'telefunc'
 
export async function* onSearch(query: string) {
  const context = getContext()
  const cursor = db.search(query)
  context.onClose(() => {
    // Release the database cursor
    cursor.close()
  })
 
  for await (const row of cursor) yield row
}

Scaling

You can scale Telefunc horizontally (multiple server instances across multiple processes, containers, or machines) by adding sticky sessions and a cross-instance broadcast transport.

See: Stream at Scale.

Cloudflare

On Cloudflare Workers, Telefunc routes channels through Durable Objects and fans out broadcasts across regions automatically — so neither a sticky load balancer nor a broadcast transport is needed.

See: Stream on Cloudflare.
