// AiChat.telefunc.js// Environment: serverexport async function* onAiPrompt(prompt) { const stream = await ai.streamText({ prompt }) // The messages from the AI reach the client over time as a stream of messages for await (const message of stream) { yield message }}
// AiChat.telefunc.ts// Environment: serverexport async function* onAiPrompt(prompt: string): AsyncGenerator<string> { const stream = await ai.streamText({ prompt }) // The messages from the AI reach the client over time as a stream of messages for await (const message of stream) { yield message }}
// Notifications.telefunc.js// Environment: serverexport async function onSubscribe(onNotification) { // Client → server: the server calls the client's callback as events arrive const unsubscribe = notifications.subscribe((text) => onNotification(text)) // Server → client: the client calls the returned function to stop listening return unsubscribe}
// Notifications.telefunc.ts// Environment: serverexport async function onSubscribe(onNotification: (text: string) => void) { // Client → server: the server calls the client's callback as events arrive const unsubscribe = notifications.subscribe((text) => onNotification(text)) // Server → client: the client calls the returned function to stop listening return unsubscribe}
// Dashboard.telefunc.ts// Environment: serverexport async function onLoadDashboard() { // The client receives each property at three different times return { title: 'Dashboard', // Delivered immediately summary: fetchSummary(), // Delivered quickly, but later report: fetchExtensiveReport() // Delivered much later }}
// Compress.telefunc.js// Environment: serverexport function onCompress(data) { // A stream flows in, a transformed stream flows back out return data.pipeThrough(new CompressionStream('gzip'))}
// Compress.telefunc.ts// Environment: serverexport function onCompress(data: ReadableStream<Uint8Array>): ReadableStream<Uint8Array> { // A stream flows in, a transformed stream flows back out return data.pipeThrough(new CompressionStream('gzip'))}
An async function* (AsyncGenerator) is a good fit for short-lived streaming: a finite sequence of values that arrive one at a time and then completes. Examples:
AI answer stream
Progress updates for a long-running task (e.g. upload)
Streaming search results as they're found
Rows of a large SQL query, streamed to the UI
It's a good fit because a generator closes itself when it completes. (Other streaming primitives like callbacks have no such completion signal.)
You can also use it for any other use case such as never-ending streams — the deciding factor is usually just DX. (Telefunc picks a different default transport depending on the primitive, but you can change the transport to match your needs.)
Example: countdown
A simple example:
// Countdown.telefunc.js
// Environment: server

/**
 * Streams a countdown to the client: `from`, `from - 1`, …, `0`, one value per second.
 * Yields nothing when `from` is negative.
 */
export async function* onCountdown(from) {
  for (let i = from; i >= 0; i--) {
    yield i
    // Pause only *between* values: sleeping after the final `0` would keep the
    // stream open for an extra second before the generator completes.
    if (i > 0) await new Promise((resolve) => setTimeout(resolve, 1000))
  }
}
// Countdown.telefunc.ts
// Environment: server

/**
 * Streams a countdown to the client: `from`, `from - 1`, …, `0`, one value per second.
 * Yields nothing when `from` is negative.
 */
export async function* onCountdown(from: number) {
  for (let i = from; i >= 0; i--) {
    yield i
    // Pause only *between* values: sleeping after the final `0` would keep the
    // stream open for an extra second before the generator completes.
    if (i > 0) await new Promise((resolve) => setTimeout(resolve, 1000))
  }
}
JavaScript
TypeScript
// Countdown.jsx// Environment: clientimport { onCountdown } from './Countdown.telefunc'for await (const n of onCountdown(3)) { console.log(n) // 3, 2, 1, 0 — one per second}
// Countdown.tsx// Environment: clientimport { onCountdown } from './Countdown.telefunc'for await (const n of onCountdown(3)) { console.log(n) // 3, 2, 1, 0 — one per second}
Example: AI chat
A full-fledged example:
// AiChat.telefunc.js// Environment: serverimport { getContext } from 'telefunc'export async function* onAiPrompt(prompt) { const context = getContext() const { stream, cancel } = await ai(prompt) // When the client stops the stream (e.g. the user presses "Stop"), cancel the // upstream AI request — otherwise it keeps running and burning AI tokens context.onClose(cancel) for await (const message of stream) { yield message }}
// AiChat.telefunc.ts// Environment: serverimport { getContext } from 'telefunc'export async function* onAiPrompt(prompt: string): AsyncGenerator<string> { const context = getContext() const { stream, cancel } = await ai(prompt) // When the client stops the stream (e.g. the user presses "Stop"), cancel the // upstream AI request — otherwise it keeps running and burning AI tokens context.onClose(cancel) for await (const message of stream) { yield message }}
JavaScript
TypeScript
// AiChat.jsx// Environment: clientimport { onAiPrompt } from './AiChat.telefunc'const gen = onAiPrompt('Tell me a joke')// "Stop" button — close the stream early; this fires onClose() on the serverstopButton.onclick = () => gen.return()for await (const message of gen) { console.log(message) // Each message arrives as it's yielded}
// AiChat.tsx// Environment: clientimport { onAiPrompt } from './AiChat.telefunc'const gen = onAiPrompt('Tell me a joke')// "Stop" button — close the stream early; this fires onClose() on the serverstopButton.onclick = () => gen.return()for await (const message of gen) { console.log(message) // Each message arrives as it's yielded}
The underlying stream automatically closes itself when the client stops using (i.e. stops referencing) onProgress, but you can also manually close it (eagerly), see API > close().
Under the hood, a passed function is just a Channel. A callback has the same lifecycle as a channel, only exposed with a nicer JavaScript DX.
Reach for a raw Channel if you need more than call-and-return — e.g. two-way messaging, broadcast, or binary.
Primitive: Multiple Promise
If you return promises without awaiting them, the client receives the rest of the object immediately, and each promise resolves on its own.
// Dashboard.telefunc.js// Environment: serverexport async function onLoadDashboard() { const title = 'Dashboard' const summaryPromise = fetchSummary() const reportPromise = fetchExtensiveReport() return { // Delivered to the client immediately title, // Delivered to the client when summaryPromise resolves summary: summaryPromise, // Delivered to the client when reportPromise resolves report: reportPromise }}
// Dashboard.telefunc.ts// Environment: serverexport async function onLoadDashboard() { const title = 'Dashboard' const summaryPromise = fetchSummary() const reportPromise = fetchExtensiveReport() return { // Delivered to the client immediately title, // Delivered to the client when summaryPromise resolves summary: summaryPromise, // Delivered to the client when reportPromise resolves report: reportPromise }}
// Dashboard.tsx// Environment: clientimport { onLoadDashboard } from './Dashboard.telefunc'const res = await onLoadDashboard()console.log(res.title) // Available right awayconst summary = await res.summary // Await when neededconst report = await res.report
Don't await inside the telefunction — that blocks the entire response:
export async function onLoadDashboard() { // ❌ Blocks the entire call const data = await prepare() return { title: 'Dashboard', data }}
Return the promise instead:
export async function onLoadDashboard() { // ✅ title delivered immediately, promise resolves in the background return { title: 'Dashboard', data: prepare() }}
As with other streaming primitives, you can use onClose() and close() to eagerly cancel pending work (e.g. fetchExtensiveReport()) when the client no longer needs it.
Primitive: ReadableStream
Stream raw bytes with a standard ReadableStream — in either direction.
Server → client
Return a ReadableStream:
// Download.telefunc.js
// Environment: server
import fs from 'node:fs'
import { join, resolve, sep } from 'node:path'
import { Readable } from 'node:stream'

/**
 * Streams a file from `./files` to the client as raw bytes.
 *
 * `fileName` comes from the client and is therefore untrusted: it is
 * normalized and must stay inside `./files`.
 *
 * @throws Error if `fileName` is empty or resolves outside `./files` (e.g. `../secret`).
 */
export function onDownload(fileName) {
  const filesDir = resolve('./files')
  // join() collapses `..` segments, so a traversal attempt ends up outside filesDir
  const filePath = join(filesDir, fileName)
  if (!filePath.startsWith(filesDir + sep)) {
    throw new Error('Invalid file name')
  }
  return Readable.toWeb(fs.createReadStream(filePath))
}
// Download.telefunc.ts
// Environment: server
import fs from 'node:fs'
import { join, resolve, sep } from 'node:path'
import { Readable } from 'node:stream'

/**
 * Streams a file from `./files` to the client as raw bytes.
 *
 * `fileName` comes from the client and is therefore untrusted: it is
 * normalized and must stay inside `./files`.
 *
 * @throws Error if `fileName` is empty or resolves outside `./files` (e.g. `../secret`).
 */
export function onDownload(fileName: string): ReadableStream<Uint8Array> {
  const filesDir = resolve('./files')
  // join() collapses `..` segments, so a traversal attempt ends up outside filesDir
  const filePath = join(filesDir, fileName)
  if (!filePath.startsWith(filesDir + sep)) {
    throw new Error('Invalid file name')
  }
  return Readable.toWeb(fs.createReadStream(filePath)) as ReadableStream<Uint8Array>
}
JavaScript
TypeScript
// Download.jsx// Environment: clientimport { onDownload } from './Download.telefunc'const stream = await onDownload('report.pdf')const reader = stream.getReader()while (true) { const { done, value } = await reader.read() if (done) break // Process raw bytes}
// Download.tsx// Environment: clientimport { onDownload } from './Download.telefunc'const stream = await onDownload('report.pdf')const reader = stream.getReader()while (true) { const { done, value } = await reader.read() if (done) break // Process raw bytes}
// Compress.jsx// Environment: clientimport { onCompress } from './Compress.telefunc'const compressed = await onCompress(data) // data: ReadableStream<Uint8Array>const blob = await new Response(compressed).blob() // drain the gzipped bytes
// Compress.tsx// Environment: clientimport { onCompress } from './Compress.telefunc'const compressed = await onCompress(data) // data: ReadableStream<Uint8Array>const blob = await new Response(compressed).blob() // drain the gzipped bytes
The transform can be arbitrarily heavy — for example, compressing a large video into a smaller MP4 and streaming it back to the client as it encodes:
// CompressVideo.telefunc.js
// Environment: server
import { getContext } from 'telefunc'
import { spawn } from 'node:child_process'
import { Readable, Writable } from 'node:stream'

/**
 * Pipes the uploaded video through an `ffmpeg` child process and returns the
 * encoder's output as a stream, so the client receives the result while it encodes.
 */
export function onCompressVideo(video) {
  const { onClose } = getContext()
  // Re-encode to a smaller H.264/MP4, on the fly
  const args = ['-i', 'pipe:0', '-movflags', 'frag_keyframe+empty_moov', '-f', 'mp4', 'pipe:1']
  const ffmpeg = spawn('ffmpeg', args)
  // Without an 'error' listener, a failed spawn (e.g. ffmpeg isn't installed)
  // is thrown as an uncaught exception
  ffmpeg.on('error', (err) => console.error('ffmpeg failed:', err))
  // Kill the encoder if the client disconnects mid-stream — otherwise the process leaks
  onClose(() => ffmpeg.kill())
  // pipeTo() rejects when the upload breaks or the encoder exits early: handle the
  // rejection (otherwise it is an unhandled rejection) and stop the encoder
  video.pipeTo(Writable.toWeb(ffmpeg.stdin)).catch(() => ffmpeg.kill())
  return Readable.toWeb(ffmpeg.stdout)
}
// CompressVideo.telefunc.ts
// Environment: server
import { getContext } from 'telefunc'
import { spawn } from 'node:child_process'
import { Readable, Writable } from 'node:stream'

/**
 * Pipes the uploaded video through an `ffmpeg` child process and returns the
 * encoder's output as a stream, so the client receives the result while it encodes.
 */
export function onCompressVideo(video: ReadableStream<Uint8Array>): ReadableStream<Uint8Array> {
  const { onClose } = getContext()
  // Re-encode to a smaller H.264/MP4, on the fly
  const args = ['-i', 'pipe:0', '-movflags', 'frag_keyframe+empty_moov', '-f', 'mp4', 'pipe:1']
  const ffmpeg = spawn('ffmpeg', args)
  // Without an 'error' listener, a failed spawn (e.g. ffmpeg isn't installed)
  // is thrown as an uncaught exception
  ffmpeg.on('error', (err) => console.error('ffmpeg failed:', err))
  // Kill the encoder if the client disconnects mid-stream — otherwise the process leaks
  onClose(() => ffmpeg.kill())
  // pipeTo() rejects when the upload breaks or the encoder exits early: handle the
  // rejection (otherwise it is an unhandled rejection) and stop the encoder
  void video.pipeTo(Writable.toWeb(ffmpeg.stdin)).catch(() => ffmpeg.kill())
  return Readable.toWeb(ffmpeg.stdout) as ReadableStream<Uint8Array>
}
JavaScript
TypeScript
The memory consumption is constant regardless of file size.
A spawned process outlives the call and keeps burning CPU if the client navigates away mid-encode — make sure to terminate it in onClose(), see Cleanup.
Primitive: Channel
For ongoing two-way or broadcast messaging, you can use a channel, see:
This is the low-level primitive that powers most of Telefunc Stream.
Instead of directly using channels, most users reach for high-level primitives (e.g. callbacks) and high-level integrations (e.g. @telefunc/tanstack-query).
// Counter.telefunc.js// Environment: serverimport { getContext } from 'telefunc'export async function onInit(onTick) { const context = getContext() let n = 0 const interval = setInterval(() => onTick(++n), 1000) context.onClose(() => { // ⚠️ Must be released otherwise it runs forever clearInterval(interval) }) }
// Counter.telefunc.ts// Environment: serverimport { getContext } from 'telefunc'export async function onInit(onTick: (n: number) => void) { const context = getContext() let n = 0 const interval = setInterval(() => onTick(++n), 1000) context.onClose(() => { // ⚠️ Must be released otherwise it runs forever clearInterval(interval) }) }
JavaScript
TypeScript
⚠️
Any resource a telefunction opens (setInterval, an event subscription, a DB cursor, an upstream stream) outlives the telefunction call — make sure to always clear resources when streams close.
You can also use a signal (AbortSignal) and channel.onClose() to listen for when a stream closes, see:
A streaming telefunction authorizes like any other telefunction (see Guides > Permissions): check getContext() and throw Abort() at open time, before you stream anything back to the client. Here the client passes a callback and the server calls it:
// TeamFeed.telefunc.js// Environment: serverimport { getContext, Abort } from 'telefunc'export async function onTeamFeed(teamId, onMessage) { const context = getContext() if (!context.user?.teams.includes(teamId)) throw Abort() // Authorized — push this team's messages to the client const unwatch = watchTeam(teamId, onMessage) // Stop watching when the client disconnects — otherwise the subscription leaks context.onClose(unwatch)}
// TeamFeed.telefunc.ts// Environment: serverimport { getContext, Abort } from 'telefunc'export async function onTeamFeed(teamId: string, onMessage: (text: string) => void) { const context = getContext() if (!context.user?.teams.includes(teamId)) throw Abort() // Authorized — push this team's messages to the client const unwatch = watchTeam(teamId, onMessage) // Stop watching when the client disconnects — otherwise the subscription leaks context.onClose(unwatch)}
getContext() and throw Abort() only work before the telefunction returns — so authorize at open time. The context.user you capture stays valid inside callbacks which run later (via JavaScript closure).
Consider re-checking. For sensitive operations, you can re-check authorization with the captured context.user each time the callback runs, although most use cases don't need this.
Error handling
Environment: server & client.
onClose((err) => { if (!err) { // Graceful close } else if (err instanceof Abort) { // Either side called abort(value) console.log(err.abortValue) } else if (err instanceof NetworkError) { // Connection lost (couldn't reconnect) }})
// Search.jsx// Environment: clientconst results = onSearch('computers')cancelButton.onclick = () => { // Close it results.return() // fires onClose() on the server}for await (const row of results) render(row)
// Search.tsx// Environment: clientconst results = onSearch('computers')cancelButton.onclick = () => { // Close it results.return() // fires onClose() on the server}for await (const row of results) render(row)
// Search.telefunc.js// Environment: serverimport { getContext } from 'telefunc'export async function* onSearch(query) { const context = getContext() const cursor = db.search(query) context.onClose(() => { // Release the database cursor cursor.close() }) for await (const row of cursor) yield row}
// Search.telefunc.ts// Environment: serverimport { getContext } from 'telefunc'export async function* onSearch(query: string) { const context = getContext() const cursor = db.search(query) context.onClose(() => { // Release the database cursor cursor.close() }) for await (const row of cursor) yield row}
JavaScript
TypeScript
Scaling
You can scale Telefunc horizontally (multiple server instances across multiple processes, containers, or machines) by adding sticky sessions and a cross-instance broadcast transport.
On Cloudflare Workers, Telefunc routes channels through Durable Objects and fans out broadcasts across regions automatically — so neither a sticky load balancer nor a broadcast transport is needed.