Streams

We've now learned two ways to produce sequences of values:

  • Channels - for communication between Effection operations
  • Signals - for bridging external events into Effection

You may have noticed that both are consumed in exactly the same way:

// Consuming a Channel
for (const msg of yield* each(channel)) {
  console.log(msg);
  yield* each.next();
}

// Consuming a Signal
for (const event of yield* each(signal)) {
  console.log(event);
  yield* each.next();
}

That's because both Channels and Signals are Streams.

What is a Stream?

A Stream is a type you can import from Effection:

import type { Stream, Subscription } from "effection";

// Stream is just a type alias:
type Stream<T, TReturn> = Operation<Subscription<T, TReturn>>;

In plain English: a Stream is an Operation that, when you yield* it, gives you a Subscription.

  • Stream = stateless recipe (like an AsyncIterable)
  • Subscription = stateful queue you read from (like an AsyncIterator)

// Channel implements Stream
const channel: Channel<string, void> = createChannel();
const channelSub: Subscription<string, void> = yield* channel; // yield* a Stream → get a Subscription

// Signal implements Stream
const signal: Signal<string, void> = createSignal();
const signalSub: Subscription<string, void> = yield* signal; // same pattern!
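
The recipe-versus-queue split mirrors the one that plain JavaScript iterables already have. As a rough illustration only (this uses the built-in synchronous Iterable and Iterator protocols rather than Effection, and the helper name countTo is made up for the example), asking the same recipe for an iterator twice gives two cursors that advance independently:

```typescript
// A stateless "recipe": it holds no position of its own.
function countTo(limit: number): Iterable<number> {
  return {
    [Symbol.iterator](): Iterator<number> {
      let current = 0; // the state lives in the iterator, not in the recipe
      return {
        next(): IteratorResult<number> {
          if (current < limit) {
            current += 1;
            return { done: false, value: current };
          }
          return { done: true, value: undefined };
        },
      };
    },
  };
}

const recipe = countTo(3);

// Two independent cursors created from the same recipe
const first = recipe[Symbol.iterator]();
const second = recipe[Symbol.iterator]();

first.next(); // advances `first` only
const fromFirst = first.next().value; // 2
const fromSecond = second.next().value; // 1

console.log(fromFirst, fromSecond);
```

In Effection the roles are played by Stream (the recipe) and Subscription (the cursor), with yield* taking the place of the call to Symbol.iterator.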

Each yield* Creates a Fresh Subscription

This is the key insight: every time you yield* a Stream, you get a new, independent Subscription with its own queue.

import type { Channel, Subscription } from "effection";
import { main, createChannel, spawn, sleep } from "effection";

await main(function* () {
  const channel: Channel<number, void> = createChannel<number, void>();

  // Each yield* creates a NEW subscription with its own queue
  const sub1: Subscription<number, void> = yield* channel;
  const sub2: Subscription<number, void> = yield* channel;

  yield* spawn(function* () {
    yield* channel.send(1);
    yield* channel.send(2);
  });

  // Give the spawned task time to send both messages
  yield* sleep(10);

  // Both subscriptions receive ALL messages independently
  console.log("sub1:", (yield* sub1.next()).value); // 1
  console.log("sub1:", (yield* sub1.next()).value); // 2
  console.log("sub2:", (yield* sub2.next()).value); // 1
  console.log("sub2:", (yield* sub2.next()).value); // 2
});
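
One consequence is worth checking for yourself: a queue only exists once its Subscription has been created, so a subscriber that arrives late should not see messages that were sent before it subscribed. The sketch below is a minimal experiment, assuming Channels do not buffer on behalf of future subscribers. It uses run instead of main so the collected values can be inspected after the operation finishes, and the names early and late are illustrative.

```typescript
import { run, createChannel } from "effection";

const early: number[] = [];
const late: number[] = [];

await run(function* () {
  const channel = createChannel<number, void>();

  const earlySub = yield* channel; // subscribed before anything is sent
  yield* channel.send(1);

  const lateSub = yield* channel; // subscribed after the first send
  yield* channel.send(2);

  // earlySub was subscribed for both sends
  early.push((yield* earlySub.next()).value as number);
  early.push((yield* earlySub.next()).value as number);

  // lateSub was only subscribed for the second send
  late.push((yield* lateSub.next()).value as number);
});

console.log({ early, late });
```

If the assumption holds, early contains both messages and late contains only the second one.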

Built-in Stream Producers

Effection provides several functions that return Streams:

interval(ms) - Periodic Ticks

import { main, interval, each } from "effection";

await main(function* () {
  let count = 0;

  for (const _ of yield* each(interval(1000))) {
    console.log("tick", ++count);
    if (count >= 3) break;
    yield* each.next();
  }
});

Output:

tick 1
tick 2
tick 3

on(target, eventName) - DOM Events

For browser EventTarget objects:

import { main, on, each } from "effection";

await main(function* () {
  const button = document.querySelector("button")!;

  for (const event of yield* each(on(button, "click"))) {
    console.log("Clicked at:", event.clientX, event.clientY);
    yield* each.next();
  }
});
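
The same function should also work outside the browser wherever a standard EventTarget exists. A minimal sketch, assuming a runtime that provides global EventTarget and Event (recent Node.js or Deno); the event name "ping" is made up for the example, and run is used instead of main so the result can be inspected afterwards:

```typescript
import { run, on } from "effection";

const target = new EventTarget();
const seen: string[] = [];

await run(function* () {
  // yield* on(...) attaches the listener and hands back a Subscription
  const pings = yield* on(target, "ping");

  // Dispatched after subscribing, so the event lands in the subscription's queue
  target.dispatchEvent(new Event("ping"));

  const result = yield* pings.next();
  if (!result.done) {
    seen.push(result.value.type);
  }
});

console.log(seen);
```

Subscribing before dispatching matters here: an event fired before the listener is attached would simply be missed.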

Writing Functions That Accept Any Stream

Because Stream is a unifying type, you can write functions that work with Channels, Signals, intervals, or any other Stream:

import type { Operation, Stream } from "effection";
import { main, each, createChannel, createSignal, spawn, interval, suspend } from "effection";

// This function works with ANY Stream
function* logAll<T>(stream: Stream<T, unknown>): Operation<void> {
  for (const value of yield* each(stream)) {
    console.log("Value:", value);
    yield* each.next();
  }
}

await main(function* () {
  // Works with a Channel
  const channel = createChannel<string, void>();
  yield* spawn(() => logAll(channel));

  // Works with a Signal
  const signal = createSignal<number, void>();
  yield* spawn(() => logAll(signal));

  // Works with interval
  yield* spawn(() => logAll(interval(1000)));

  // Keep the parent alive; spawned tasks are halted when it returns
  yield* suspend();
});

Creating Custom Streams

You can create your own Stream by returning an Operation<Subscription<T, TClose>>. The easiest way is with a Signal inside a resource:

import type { Stream, Subscription, Signal } from "effection";
import { resource, createSignal, ensure } from "effection";
import { EventEmitter } from "events";

// A custom Stream factory for EventEmitter events
function eventsFrom<T>(
  emitter: EventEmitter,
  eventName: string,
): Stream<T, void> {
  return resource<Subscription<T, void>>(function* (provide) {
    const signal: Signal<T, void> = createSignal<T, void>();

    const handler = (value: T) => signal.send(value);
    emitter.on(eventName, handler);

    yield* ensure(() => {
      emitter.off(eventName, handler);
      signal.close();
    });

    const subscription: Subscription<T, void> = yield* signal;
    yield* provide(subscription);
  });
}
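
To see the custom Stream in use, the sketch below subscribes, emits two events, and reads them back. It repeats eventsFrom in condensed form so the snippet runs on its own, and uses run instead of main so the results can be inspected afterwards. The event name "message" and the received array are illustrative, not part of Effection.

```typescript
import type { Stream, Subscription } from "effection";
import { run, resource, createSignal, ensure } from "effection";
import { EventEmitter } from "node:events";

// Condensed copy of eventsFrom so this snippet is self-contained
function eventsFrom<T>(emitter: EventEmitter, eventName: string): Stream<T, void> {
  return resource<Subscription<T, void>>(function* (provide) {
    const signal = createSignal<T, void>();
    const handler = (value: T) => signal.send(value);
    emitter.on(eventName, handler);
    yield* ensure(() => {
      emitter.off(eventName, handler);
      signal.close();
    });
    yield* provide(yield* signal);
  });
}

const emitter = new EventEmitter();
const received: string[] = [];

await run(function* () {
  // Subscribing is what attaches the listener to the emitter
  const messages = yield* eventsFrom<string>(emitter, "message");

  emitter.emit("message", "hello");
  emitter.emit("message", "world");

  received.push((yield* messages.next()).value as string);
  received.push((yield* messages.next()).value as string);
});

// After the operation ends, the ensure() cleanup should have detached the listener
console.log(received, emitter.listenerCount("message"));
```

Because the listener is attached inside the resource, every yield* of the Stream gets its own handler and its own queue, and each one is removed when the subscribing scope exits.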

The Stream Type Hierarchy

Here's how it all fits together:

Stream<T, TClose>
  • Channel<T, TC> - createChannel()
  • Signal<T, TC> - createSignal()
  • Other producers - interval(ms), on(target, event), your custom streams!

Key Takeaways

  1. Stream is a type alias - Operation<Subscription<T, TClose>>
  2. Channels and Signals are both Streams - that's why consuming them looks identical
  3. Each yield* creates a fresh Subscription - independent queues
  4. Use Stream as a parameter type - to write functions that accept any stream
  5. Create custom Streams - with Signals inside resources