Channels
So far our operations have been one-way streets: run, return a value, done. But real applications need ongoing conversations, with streams of data flowing between concurrent operations. For example:
- Messages between workers
- Events in an internal event bus
- Data flowing through a pipeline
Think of a Channel like a hallway intercom system. Operations can broadcast messages, and any operation that's listening gets a copy. Effection's Channels make this communication structured and safe.
What's a Channel?
A Channel is a pub/sub system for Effection operations. One operation sends messages, others receive them.
import type { Operation, Channel, Subscription } from "effection";
import { main, createChannel, spawn, sleep } from "effection";

await main(function* () {
  // Create a channel that sends strings
  const channel: Channel<string, void> = createChannel<string, void>();

  // Subscribe to the channel
  // When you yield* a channel, you get a Subscription - your personal message queue
  const subscription: Subscription<string, void> = yield* channel;

  // Send some messages in the background
  yield* spawn(function* (): Operation<void> {
    yield* channel.send("hello");
    yield* sleep(100);
    yield* channel.send("world");
    yield* sleep(100);
    yield* channel.close(); // Close the channel
  });

  // Receive messages
  let result = yield* subscription.next();
  while (!result.done) {
    console.log("Received:", result.value);
    result = yield* subscription.next();
  }
  console.log("Channel closed");
});
Output:
Received: hello
Received: world
Channel closed
Important: Subscribe Before Sending
Channels are not buffered. A message is delivered only to the subscriptions that exist at the moment it is sent; if no one is subscribed, it is dropped:
import type { Channel, Subscription } from "effection";
import { main, createChannel } from "effection";

await main(function* () {
  const channel: Channel<string, void> = createChannel<string, void>();

  // Send before subscribing - message is LOST!
  yield* channel.send("this is lost");

  // Now subscribe
  const subscription: Subscription<string, void> = yield* channel;
  yield* channel.send("this is received");

  const result = yield* subscription.next();
  console.log(result.value); // 'this is received'
});
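The same rule applies per subscription, not just to the first one. The following is a minimal sketch, assuming a subscription only sees messages sent after it was created. The names `drain` and `lateSubscriberDemo` are hypothetical helpers introduced for illustration, and `run` is used instead of `main` only so the collected result can be returned as a promise.

```typescript
import type { Operation, Subscription } from "effection";
import { run, createChannel } from "effection";

// Hypothetical helper: read a subscription until the channel closes,
// tagging every message with the subscriber's name.
function* drain(
  name: string,
  subscription: Subscription<string, void>,
): Operation<string[]> {
  const seen: string[] = [];
  let result = yield* subscription.next();
  while (!result.done) {
    seen.push(`${name}:${result.value}`);
    result = yield* subscription.next();
  }
  return seen;
}

// Hypothetical demo: one subscription created before the first send,
// one created after it.
function lateSubscriberDemo(): Promise<string[]> {
  return run(function* () {
    const channel = createChannel<string, void>();

    const early = yield* channel; // subscribed before any message
    yield* channel.send("first");

    const late = yield* channel; // subscribed after "first" was sent
    yield* channel.send("second");
    yield* channel.close();

    const fromEarly = yield* drain("early", early);
    const fromLate = yield* drain("late", late);
    return [...fromEarly, ...fromLate];
  });
}

lateSubscriberDemo().then((seen) => console.log(seen));
```

Under the stated assumption, the early subscription reports both messages while the late one reports only `second`, because `first` was sent before it existed.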
The for yield* each Pattern
Manually calling next() is tedious. Use each for cleaner iteration:
import type { Operation, Channel } from "effection";
import { main, createChannel, spawn, sleep, each } from "effection";

await main(function* () {
  const channel: Channel<number, void> = createChannel<number, void>();

  // Producer
  yield* spawn(function* (): Operation<void> {
    for (let i = 1; i <= 5; i++) {
      yield* channel.send(i);
      yield* sleep(100);
    }
    yield* channel.close();
  });

  // Consumer with each()
  for (const value of yield* each(channel)) {
    console.log("Got:", value);
    yield* each.next(); // REQUIRED!
  }
  console.log("Done");
});
Output:
Got: 1
Got: 2
Got: 3
Got: 4
Got: 5
Done
Important: You MUST call yield* each.next() at the end of each loop iteration!
Why yield* each.next()?
This might seem strange, but it lets you do asynchronous work between receiving a value and requesting the next one:
import type { Operation, Channel } from "effection";
import { main, createChannel, spawn, sleep, each } from "effection";

await main(function* () {
  const channel: Channel<string, void> = createChannel<string, void>();

  yield* spawn(function* (): Operation<void> {
    yield* channel.send("task-1");
    yield* channel.send("task-2");
    yield* channel.send("task-3");
    yield* channel.close();
  });

  for (const task of yield* each(channel)) {
    console.log("Processing:", task);
    yield* sleep(500); // Simulate slow processing
    console.log("Finished:", task);
    yield* each.next(); // Now request next item
  }
});
This gives you consumer-side backpressure control - you only request the next item when you're ready, and anything sent in the meantime waits in your subscription's queue.
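To see the pacing without reading console timestamps, the sketch below records what happens into an array. It is an illustration under assumptions rather than a statement about Effection's internals: `pacingDemo` is a hypothetical name, `run` replaces `main` so the log can be returned, and the producer sleeps briefly first so the consumer is certain to be subscribed before anything is sent.

```typescript
import type { Operation } from "effection";
import { run, createChannel, spawn, sleep, each } from "effection";

// Hypothetical demo: a fast producer and a deliberately slow consumer.
function pacingDemo(): Promise<string[]> {
  return run(function* () {
    const channel = createChannel<string, void>();
    const log: string[] = [];

    yield* spawn(function* (): Operation<void> {
      yield* sleep(10); // let the consumer subscribe first
      for (const task of ["task-1", "task-2", "task-3"]) {
        log.push(`sent ${task}`);
        yield* channel.send(task);
      }
      yield* channel.close();
    });

    for (const task of yield* each(channel)) {
      log.push(`start ${task}`);
      yield* sleep(20); // simulate slow processing
      log.push(`done ${task}`);
      yield* each.next(); // only now ask for the next task
    }
    return log;
  });
}

pacingDemo().then((log) => console.log(log.join("\n")));
```

The consumer entries should always appear as strict start/done pairs, one task at a time, no matter how quickly the producer sends.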
Multiple Subscribers
Channels support multiple subscribers - each gets their own copy of every message:
import type { Operation, Channel } from "effection";
import { main, createChannel, spawn, sleep, each } from "effection";

await main(function* () {
  const channel: Channel<string, void> = createChannel<string, void>();

  // Two subscribers
  yield* spawn(function* (): Operation<void> {
    console.log("Subscriber A starting");
    for (const msg of yield* each(channel)) {
      console.log("A received:", msg);
      yield* each.next();
    }
    console.log("Subscriber A done");
  });

  yield* spawn(function* (): Operation<void> {
    console.log("Subscriber B starting");
    for (const msg of yield* each(channel)) {
      console.log("B received:", msg);
      yield* each.next();
    }
    console.log("Subscriber B done");
  });

  // Give subscribers time to start
  yield* sleep(10);

  // Send messages
  yield* channel.send("hello");
  yield* channel.send("world");
  yield* channel.close();
  yield* sleep(100);
});
Output:
Subscriber A starting
Subscriber B starting
A received: hello
B received: hello
A received: world
B received: world
Subscriber A done
Subscriber B done
Each subscriber has their own queue and receives all messages independently.
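One way to picture "own queue, own copy" is the toy model below. It is plain TypeScript and deliberately not Effection's implementation: `ToyChannel` is a hypothetical class with no operations, no closing, and no cleanup. It only shows why a send with zero subscribers disappears and why one subscriber consuming a message does not affect another.

```typescript
// Toy model only: NOT how Effection implements channels.
class ToyChannel<T> {
  // One queue per subscriber.
  private queues: T[][] = [];

  // Each subscriber gets its own, initially empty, queue.
  subscribe(): T[] {
    const queue: T[] = [];
    this.queues.push(queue);
    return queue;
  }

  // Push the message onto every current subscriber's queue.
  // With no subscribers the loop body never runs, so the message is dropped.
  send(message: T): void {
    for (const queue of this.queues) {
      queue.push(message);
    }
  }
}

const toy = new ToyChannel<string>();
toy.send("lost"); // nobody is subscribed yet

const queueA = toy.subscribe();
const queueB = toy.subscribe();
toy.send("hello");
toy.send("world");

queueA.shift(); // A consumes "hello"; B's queue is untouched

console.log(queueA); // [ 'world' ]
console.log(queueB); // [ 'hello', 'world' ]
```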
Practical Example: Event Bus
Use a channel as an internal event bus:
import type { Operation, Channel } from "effection";
import { main, createChannel, spawn, sleep, each } from "effection";

interface AppEvent {
  type: string;
  payload: unknown;
}

// Create a global event bus
const eventBus: Channel<AppEvent, void> = createChannel<AppEvent, void>();

// Logger that prints all events
function* eventLogger(): Operation<void> {
  for (const event of yield* each(eventBus)) {
    console.log(`[LOG] ${event.type}:`, event.payload);
    yield* each.next();
  }
}

// Analytics that counts events
function* analytics(): Operation<void> {
  const counts: Record<string, number> = {};
  for (const event of yield* each(eventBus)) {
    counts[event.type] = (counts[event.type] || 0) + 1;
    console.log(`[ANALYTICS] Event counts:`, counts);
    yield* each.next();
  }
}

await main(function* () {
  // Start consumers
  yield* spawn(eventLogger);
  yield* spawn(analytics);
  yield* sleep(10);

  // Emit some events
  yield* eventBus.send({ type: "user.login", payload: { userId: 1 } });
  yield* eventBus.send({ type: "page.view", payload: { page: "/home" } });
  yield* eventBus.send({ type: "user.login", payload: { userId: 2 } });
  yield* sleep(100);
});
Output:
[LOG] user.login: { userId: 1 }
[ANALYTICS] Event counts: { 'user.login': 1 }
[LOG] page.view: { page: '/home' }
[ANALYTICS] Event counts: { 'user.login': 1, 'page.view': 1 }
[LOG] user.login: { userId: 2 }
[ANALYTICS] Event counts: { 'user.login': 2, 'page.view': 1 }
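Since every subscriber receives every event, a consumer that cares about only one event type has to filter for itself. The variation below is a sketch under assumptions: it swaps the loose `AppEvent` interface for a discriminated union so the payload type narrows with `type`, passes the bus as a parameter instead of using a global, and closes the bus so the consumer's loop ends. `collectLogins` and `filteredBusDemo` are hypothetical names.

```typescript
import type { Channel, Operation } from "effection";
import { run, createChannel, spawn, sleep, each } from "effection";

// Assumed event shapes for this sketch.
type AppEvent =
  | { type: "user.login"; payload: { userId: number } }
  | { type: "page.view"; payload: { page: string } };

// Hypothetical consumer: records user ids from login events, ignores the rest.
function* collectLogins(
  bus: Channel<AppEvent, void>,
  out: number[],
): Operation<void> {
  for (const event of yield* each(bus)) {
    if (event.type === "user.login") {
      out.push(event.payload.userId); // payload is narrowed to { userId }
    }
    yield* each.next();
  }
}

function filteredBusDemo(): Promise<number[]> {
  return run(function* () {
    const bus = createChannel<AppEvent, void>();
    const logins: number[] = [];

    const consumer = yield* spawn(() => collectLogins(bus, logins));
    yield* sleep(10); // give the subscriber time to start

    yield* bus.send({ type: "user.login", payload: { userId: 1 } });
    yield* bus.send({ type: "page.view", payload: { page: "/home" } });
    yield* bus.send({ type: "user.login", payload: { userId: 2 } });
    yield* bus.close();

    yield* consumer; // wait until the consumer's loop has finished
    return logins;
  });
}

filteredBusDemo().then((logins) => console.log(logins)); // [ 1, 2 ]
```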
But Wait... What About Callbacks?
There's a limitation we haven't addressed:
// This doesn't work!
await main(function* () {
  const channel = createChannel<MouseEvent, void>();

  document.addEventListener("click", (event) => {
    yield* channel.send(event); // SyntaxError! Can't yield* in a callback
  });
});
channel.send() is an operation - you can only call it with yield*. But callbacks are plain JavaScript functions!
This is a fundamental problem:
- Channels work great when both producer and consumer are Effection operations
- But external events (DOM clicks, Node.js EventEmitters, timers) come from callbacks
The next chapter introduces Signals - which solve this problem by providing a synchronous send() function that can be called from anywhere.
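As a small preview only, here is a sketch of that idea using `createSignal` from the `effection` package. It makes several assumptions for illustration: a `setTimeout` callback stands in for a DOM or EventEmitter listener, the messages are plain strings rather than real events, and `signalPreview` is a hypothetical name. The next chapter covers Signals properly.

```typescript
import { run, createSignal } from "effection";

// Hypothetical demo: a plain callback feeds an Effection consumer.
function signalPreview(): Promise<string[]> {
  return run(function* () {
    const clicks = createSignal<string, void>();
    const subscription = yield* clicks; // subscribe before anything is sent

    // A plain JavaScript callback: no yield* is needed to send.
    setTimeout(() => {
      clicks.send("click-1");
      clicks.send("click-2");
      clicks.close();
    }, 10);

    const seen: string[] = [];
    let result = yield* subscription.next();
    while (!result.done) {
      seen.push(result.value);
      result = yield* subscription.next();
    }
    return seen;
  });
}

signalPreview().then((seen) => console.log(seen)); // [ 'click-1', 'click-2' ]
```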
Key Takeaways
Channels are the hallway intercom for your operations:
- Channels are internal pub/sub - structured communication between Effection operations
- Subscribe before sending - if nobody's listening, messages vanish into the void
- Use for yield* each - cleaner than manual next() calls
- Always call yield* each.next() - explicitly request the next message
- Multiple subscribers - everyone on the intercom hears every message