Streaming

Every AI application deals with streaming data. LLM responses arrive token by token, progress updates flow continuously, and data accumulates over time. Most frameworks treat streaming as infrastructure you wire up yourself: WebSocket event handlers, state synchronization logic, cleanup callbacks. You spend your time building plumbing instead of features.

Idyllic takes a different approach. When you declare a field with stream<T>, you're telling the framework that this field receives incremental updates that should be pushed to clients in real-time. The framework sets up the WebSocket infrastructure, handles buffering, and ensures updates arrive in order. You just call append() with each chunk.
@field result = stream<string>('');  // This field streams
@field title = '';                    // This field doesn't
That single type annotation encapsulates the entire streaming system: transport, synchronization, and lifecycle management.

The Streaming Interface

A streamable value has different interfaces for reading and writing. Clients see a read-only interface; the server sees mutation methods for appending data, completing the stream, or signaling failures.

Client Interface

interface StreamableValue<T> {
  readonly current: T;
  readonly status: 'idle' | 'streaming' | 'done' | 'error';
  readonly error?: Error;
  value(): Promise<T>;
}
The current property holds the accumulated value, updating automatically as chunks arrive. The status indicates lifecycle state: 'idle' means not started, 'streaming' means data is arriving, 'done' means completed successfully, 'error' means failure occurred. The value() method returns a promise that resolves with the final accumulated value once streaming completes, or rejects if the stream encounters an error.

Server Interface

interface WritableStreamableValue<T> extends StreamableValue<T> {
  append(chunk: T extends string ? string : Partial<T>): void;
  set(value: T): void;
  complete(): void;
  fail(error: Error): void;
  reset(): void;
}
Five mutation methods control how data flows into the stream:
  • append(chunk) adds data incrementally. Strings concatenate, objects shallow-merge, arrays concatenate.
  • set(value) replaces the entire value at once.
  • complete() marks the stream as finished, resolving any pending value() promises.
  • fail(error) marks the stream as failed, rejecting any pending value() promises.
  • reset() clears accumulated value, sets status to 'idle', prepares for reuse.

Writing to Streams

The typical pattern: reset the stream, append chunks as they arrive, mark complete when done.
@action()
async generate(prompt: string) {
  this.result.reset();

  for await (const chunk of ai.stream(prompt)) {
    this.result.append(chunk);
  }

  this.result.complete();
}
Each append() immediately updates current, broadcasts to all connected clients, and returns without blocking. Clients see their UI update progressively as tokens arrive.

Error Handling

Wrap streaming in try-catch and call fail() to communicate errors to clients:
@action()
async generate(prompt: string) {
  this.result.reset();

  try {
    for await (const chunk of ai.stream(prompt)) {
      this.result.append(chunk);
    }
    this.result.complete();
  } catch (error) {
    // In strict TypeScript the caught value is `unknown`; narrow it before fail()
    this.result.fail(error instanceof Error ? error : new Error(String(error)));
  }
}
When you call fail(), status becomes 'error', pending value() promises reject, and current retains whatever partial content arrived before failure.

Replacing Values

Use set() when each update represents a complete new state rather than additional data:
@action()
async analyze(data: Data) {
  this.analysis.set({ score: 0, stage: 'parsing' });
  await parseData(data);

  this.analysis.set({ score: 0.3, stage: 'evaluating' });
  const score = await evaluate(data);

  this.analysis.set({ score, stage: 'complete' });
  this.analysis.complete();
}

Reading Streams in React

Switch on status to show appropriate UI for each lifecycle state:
function Output() {
  const { result } = useSystem<Writer>();

  return (
    <div>
      {result.status === 'idle' && (
        <span className="text-gray-400">Ready</span>
      )}

      {result.status === 'streaming' && (
        <>
          <span>{result.current}</span>
          <span className="animate-pulse">|</span>
        </>
      )}

      {result.status === 'done' && (
        <span>{result.current}</span>
      )}

      {result.status === 'error' && (
        <span className="text-red-500">Error: {result.error?.message}</span>
      )}
    </div>
  );
}
React re-renders each time a chunk arrives. The blinking cursor during streaming gives users visual feedback that content is arriving.

Awaiting Completion

When you need the final value rather than reacting to incremental updates:
const final = await result.value();
This promise resolves when complete() is called, rejects when fail() is called, and remains pending during streaming.
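One way to picture these semantics is a deferred promise that settles when the stream does. A minimal sketch follows; this is assumed behavior for illustration, not Idyllic's actual implementation:

```typescript
// Sketch of value() as a deferred promise: pending while streaming,
// resolved by complete(), rejected by fail().
class MiniStream {
  current = '';
  status: 'idle' | 'streaming' | 'done' | 'error' = 'idle';
  private err?: Error;
  private settlers: Array<{ resolve(v: string): void; reject(e: Error): void }> = [];

  append(chunk: string) {
    this.status = 'streaming';
    this.current += chunk;
  }

  complete() {
    this.status = 'done';
    this.settlers.forEach(s => s.resolve(this.current)); // settle pending value() calls
    this.settlers = [];
  }

  fail(error: Error) {
    this.err = error;
    this.status = 'error';
    this.settlers.forEach(s => s.reject(error));
    this.settlers = [];
  }

  value(): Promise<string> {
    if (this.status === 'done') return Promise.resolve(this.current);
    if (this.status === 'error') return Promise.reject(this.err ?? new Error('stream failed'));
    return new Promise((resolve, reject) => this.settlers.push({ resolve, reject }));
  }
}
```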

Object and Array Streams

Object Streams

For objects, append() performs shallow merge. Each call merges only the properties you include:
@field data = stream<{ title: string; body: string }>({ title: '', body: '' });

// In an action
this.data.append({ title: 'My Document' });  // { title: 'My Document', body: '' }
this.data.append({ body: 'Content...' });    // { title: 'My Document', body: 'Content...' }
Note: the merge operates at the property level, not within property values. To append to a string property within an object, read current value and concatenate, or use a separate string stream.
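The read-then-concatenate workaround can be sketched as a small helper. appendToBody and DocStream here are hypothetical names for illustration, not part of the Idyllic API:

```typescript
// Grow a string property inside an object stream: read the current value,
// concatenate, then shallow-merge the result back in via append().
interface DocShape { title: string; body: string }
interface DocStream {
  readonly current: DocShape;
  append(chunk: Partial<DocShape>): void;
}

function appendToBody(stream: DocStream, chunk: string) {
  stream.append({ body: stream.current.body + chunk });
}
```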

Array Streams

For arrays, append() concatenates items to the end:
@field items = stream<string[]>([]);

this.items.append(['first']);           // ['first']
this.items.append(['second', 'third']); // ['first', 'second', 'third']

Patterns

Parallel Streams

Multiple streams can update simultaneously. Each operates independently:
@field title = stream<string>('');
@field body = stream<string>('');

@action()
async generate(topic: string) {
  await Promise.all([
    this.streamTitle(topic),
    this.streamBody(topic)
  ]);
}

private async streamTitle(topic: string) {
  this.title.reset();
  for await (const chunk of ai.stream(`Title for: ${topic}`)) {
    this.title.append(chunk);
  }
  this.title.complete();
}
Clients see both streams updating in real-time, with chunks interleaved as they arrive.

Await All Streams

Wait for multiple streams before proceeding:
const [title, body] = await Promise.all([
  this.title.value(),
  this.body.value()
]);
await saveDocument({ title, body });

Conditional Streaming

Bypass streaming when data is cached:
@action()
async respond(query: string) {
  this.result.reset();

  if (shouldUseCache(query)) {
    this.result.set(getCachedResponse(query));
    this.result.complete();
    return;
  }

  for await (const chunk of ai.stream(query)) {
    this.result.append(chunk);
  }
  this.result.complete();
}

Edge Cases

Scenario                     Behavior
Client connects mid-stream   Receives the accumulated value, then continues receiving appends
Server errors mid-stream     Status becomes 'error'; current contains the partial content
Multiple clients             All receive the same chunks in the same order
append() after complete()    Throws an error; call reset() first
complete() called twice      No-op (idempotent)

FAQ

What happens if append() is called very fast?

Idyllic batches rapid updates for network efficiency. If you call append() 100 times in a tight loop, the framework may combine updates into fewer WebSocket messages. Clients still receive all content in order—batching is transparent.

Can I have multiple active streams on the same field?

No. Each stream field supports one active stream at a time. Call reset() to start a new stream. For multiple simultaneous streams, use separate fields.

What if I forget to call complete()?

The stream remains in 'streaming' status indefinitely. Any value() promises never resolve. Always ensure streaming code paths call either complete() or fail(). Consider a try-finally pattern.
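One shape for that pattern, written against the documented mutation API; runStream and its parameter type are illustrative names, not Idyllic exports:

```typescript
// try/catch/finally ensures the stream always settles: fail() on error,
// complete() in finally if nothing has settled it yet.
interface Settleable {
  status: 'idle' | 'streaming' | 'done' | 'error';
  append(chunk: string): void;
  complete(): void;
  fail(error: Error): void;
}

async function runStream(stream: Settleable, chunks: AsyncIterable<string>) {
  try {
    for await (const chunk of chunks) {
      stream.append(chunk);
    }
  } catch (error) {
    stream.fail(error instanceof Error ? error : new Error(String(error)));
  } finally {
    // Only settle if neither complete() nor fail() already ran
    if (stream.status !== 'done' && stream.status !== 'error') {
      stream.complete();
    }
  }
}
```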

What’s the difference between stream<T> and regular fields?

Regular fields synchronize their values when assigned but don’t have lifecycle semantics or incremental updates. Stream fields provide: current that updates progressively, status tracking, value() promises, and mutation methods. Use stream<T> when clients need to see data arriving progressively.

Next: Actions

How methods become typed RPC endpoints