The Streams API provides a web-standard interface for reading, writing, and transforming streaming data. It enables efficient processing of large datasets and real-time data without loading everything into memory.
Overview
The Streams API in Andromeda allows you to:
- Create and consume readable streams
- Write data to writable streams
- Transform data with transform streams
- Handle backpressure and flow control
- Process data in chunks for memory efficiency
Stream Types
ReadableStream
A source of data that can be read in chunks.
WritableStream
A destination for data that can be written in chunks.
TransformStream
Transforms data as it flows from a readable stream to a writable stream.
Basic Usage
Creating a ReadableStream
// Create a simple readable stream
const readableStream = new ReadableStream({
start(controller) {
// Called immediately when the stream is created
console.log("Stream started");
},
pull(controller) {
// Called when the stream needs more data
const chunk = generateData();
if (chunk) {
controller.enqueue(chunk);
} else {
controller.close();
}
},
cancel(reason) {
// Called when the stream is cancelled
console.log("Stream cancelled:", reason);
},
});
Reading from a Stream
const reader = readableStream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log("Stream finished");
break;
}
console.log("Received chunk:", value);
}
} finally {
reader.releaseLock();
}
Creating a WritableStream
const writableStream = new WritableStream({
start(controller) {
console.log("Writable stream started");
},
write(chunk, controller) {
console.log("Writing chunk:", chunk);
// Process the chunk (save to file, send over network, etc.)
return processChunk(chunk);
},
close() {
console.log("Stream closed");
},
abort(reason) {
console.log("Stream aborted:", reason);
},
});
Writing to a Stream
const writer = writableStream.getWriter();
try {
await writer.write("Hello, ");
await writer.write("World!");
await writer.close();
} finally {
writer.releaseLock();
}
API Reference
ReadableStream Class
ReadableStream Constructor
new ReadableStream(underlyingSource?, strategy?)
Parameters:
underlyingSource
(object, optional): Source configurationstrategy
(object, optional): Queuing strategy
Underlying Source:
start(controller)
: Called when stream is createdpull(controller)
: Called when more data is neededcancel(reason)
: Called when stream is cancelled
ReadableStream Methods
getReader(options?)
Gets a reader for the stream.
Parameters:
options
(object, optional): Reader optionsmode
: "byob" for bring-your-own-buffer mode
Returns:
ReadableStreamDefaultReader | ReadableStreamBYOBReader
const reader = stream.getReader();
pipeThrough(transform, options?)
Pipes the stream through a transform stream.
Parameters:
transform
(object): Transform stream withreadable
andwritable
propertiesoptions
(object, optional): Pipe options
Returns:
ReadableStream
: The readable side of the transform
const transformedStream = sourceStream.pipeThrough(transformStream);
pipeTo(destination, options?)
Pipes the stream to a writable stream.
Parameters:
destination
(WritableStream): Target writable streamoptions
(object, optional): Pipe optionspreventClose
: Don't close destination when source endspreventAbort
: Don't abort destination on errorpreventCancel
: Don't cancel source on error
Returns:
Promise<void>
: Resolves when piping completes
await sourceStream.pipeTo(destinationStream);
tee()
Creates two identical copies of the stream.
Returns:
[ReadableStream, ReadableStream]
: Array of two identical streams
const [stream1, stream2] = originalStream.tee();
WritableStream Class
WritableStream Constructor
new WritableStream(underlyingSink?, strategy?)
Parameters:
underlyingSink
(object, optional): Sink configurationstrategy
(object, optional): Queuing strategy
Underlying Sink:
start(controller)
: Called when stream is createdwrite(chunk, controller)
: Called for each chunkclose()
: Called when stream is closingabort(reason)
: Called when stream is aborted
WritableStream Methods
getWriter()
Gets a writer for the stream.
Returns:
WritableStreamDefaultWriter
const writer = stream.getWriter();
TransformStream Class
TransformStream Constructor
new TransformStream(transformer?, writableStrategy?, readableStrategy?)
Parameters:
transformer
(object, optional): Transform configurationwritableStrategy
(object, optional): Writable side queuing strategyreadableStrategy
(object, optional): Readable side queuing strategy
Transformer:
start(controller)
: Called when stream is createdtransform(chunk, controller)
: Called for each chunkflush(controller)
: Called when stream is closing
Properties
readable
The readable side of the transform stream.
Type: ReadableStream
writable
The writable side of the transform stream.
Type: WritableStream
const { readable, writable } = new TransformStream();
Examples
Data Generator Stream
function createNumberStream(
start: number,
end: number,
): ReadableStream<number> {
let current = start;
return new ReadableStream({
pull(controller) {
if (current <= end) {
controller.enqueue(current);
current++;
} else {
controller.close();
}
},
});
}
// Usage
const numberStream = createNumberStream(1, 10);
const reader = numberStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log("Number:", value);
}
File Processing Stream
async function createFileStream(
filename: string,
): Promise<ReadableStream<Uint8Array>> {
const file = await Andromeda.open(filename, { read: true });
const chunkSize = 64 * 1024; // 64KB chunks
return new ReadableStream({
async pull(controller) {
try {
const buffer = new Uint8Array(chunkSize);
const bytesRead = await file.read(buffer);
if (bytesRead === 0) {
// End of file
await file.close();
controller.close();
} else {
controller.enqueue(buffer.slice(0, bytesRead));
}
} catch (error) {
controller.error(error);
}
},
async cancel() {
await file.close();
},
});
}
// Usage
const fileStream = await createFileStream("large-file.txt");
await fileStream.pipeTo(
new WritableStream({
write(chunk) {
console.log(`Read ${chunk.length} bytes`);
},
}),
);
Data Transformation Pipeline
// Transform stream that converts text to uppercase
class UpperCaseTransform extends TransformStream {
constructor() {
super({
transform(chunk, controller) {
if (typeof chunk === "string") {
controller.enqueue(chunk.toUpperCase());
} else {
controller.enqueue(chunk);
}
},
});
}
}
// Transform stream that adds line numbers
class LineNumberTransform extends TransformStream {
private lineNumber = 1;
constructor() {
super({
transform(chunk, controller) {
if (typeof chunk === "string") {
const lines = chunk.split("\n");
const numberedLines = lines.map((line) =>
line ? `${this.lineNumber++}: ${line}` : line
);
controller.enqueue(numberedLines.join("\n"));
} else {
controller.enqueue(chunk);
}
},
});
}
}
// Create processing pipeline
async function processText(input: string): Promise<string> {
const inputStream = new ReadableStream({
start(controller) {
controller.enqueue(input);
controller.close();
},
});
let result = "";
const outputStream = new WritableStream({
write(chunk) {
result += chunk;
},
});
await inputStream
.pipeThrough(new UpperCaseTransform())
.pipeThrough(new LineNumberTransform())
.pipeTo(outputStream);
return result;
}
// Usage
const processed = await processText("hello\nworld\nhow are you?");
console.log("Processed text:", processed);
HTTP Response Streaming
async function streamResponse(url: string): Promise<void> {
const response = await fetch(url);
if (!response.body) {
throw new Error("No response body");
}
// Create a transform stream to process chunks
const processor = new TransformStream({
transform(chunk, controller) {
// Process each chunk (e.g., parse JSON lines)
const text = new TextDecoder().decode(chunk);
const lines = text.split("\n");
for (const line of lines) {
if (line.trim()) {
try {
const data = JSON.parse(line);
controller.enqueue(data);
} catch (error) {
console.warn("Invalid JSON line:", line);
}
}
}
},
});
// Create output stream
const output = new WritableStream({
write(data) {
console.log("Received data:", data);
},
});
// Process the response stream
await response.body
.pipeThrough(processor)
.pipeTo(output);
}
Compression Stream
class SimpleCompressionTransform extends TransformStream {
constructor() {
super({
transform(chunk, controller) {
// Simple compression: replace repeated characters
if (typeof chunk === "string") {
const compressed = chunk.replace(/(.)\1+/g, (match, char) => {
return `${char}${match.length}`;
});
controller.enqueue(compressed);
} else {
controller.enqueue(chunk);
}
},
});
}
}
class SimpleDecompressionTransform extends TransformStream {
constructor() {
super({
transform(chunk, controller) {
if (typeof chunk === "string") {
const decompressed = chunk.replace(/(.)\d+/g, (match, char) => {
const count = parseInt(match.slice(1));
return char.repeat(count);
});
controller.enqueue(decompressed);
} else {
controller.enqueue(chunk);
}
},
});
}
}
// Usage
const originalText = "aaabbbccccdddd";
let compressedText = "";
let decompressedText = "";
// Compression pipeline
const inputStream = new ReadableStream({
start(controller) {
controller.enqueue(originalText);
controller.close();
},
});
await inputStream
.pipeThrough(new SimpleCompressionTransform())
.pipeTo(
new WritableStream({
write(chunk) {
compressedText += chunk;
},
}),
);
console.log("Original:", originalText);
console.log("Compressed:", compressedText);
// Decompression pipeline
const compressedStream = new ReadableStream({
start(controller) {
controller.enqueue(compressedText);
controller.close();
},
});
await compressedStream
.pipeThrough(new SimpleDecompressionTransform())
.pipeTo(
new WritableStream({
write(chunk) {
decompressedText += chunk;
},
}),
);
console.log("Decompressed:", decompressedText);
Real-time Data Processing
class DataAnalyzer {
private count = 0;
private sum = 0;
private min = Infinity;
private max = -Infinity;
createAnalysisStream(): TransformStream<number, any> {
return new TransformStream({
transform: (value, controller) => {
this.count++;
this.sum += value;
this.min = Math.min(this.min, value);
this.max = Math.max(this.max, value);
const stats = {
count: this.count,
sum: this.sum,
average: this.sum / this.count,
min: this.min,
max: this.max,
current: value,
};
controller.enqueue(stats);
},
});
}
}
// Simulate real-time data
function createRandomDataStream(): ReadableStream<number> {
return new ReadableStream({
start(controller) {
const interval = setInterval(() => {
const value = Math.random() * 100;
controller.enqueue(value);
}, 100);
// Stop after 5 seconds
setTimeout(() => {
clearInterval(interval);
controller.close();
}, 5000);
},
});
}
// Process real-time data
const analyzer = new DataAnalyzer();
const dataStream = createRandomDataStream();
await dataStream
.pipeThrough(analyzer.createAnalysisStream())
.pipeTo(
new WritableStream({
write(stats) {
console.log(
`Stats: avg=${stats.average.toFixed(2)}, min=${
stats.min.toFixed(2)
}, max=${stats.max.toFixed(2)}, count=${stats.count}`,
);
},
}),
);
Stream Multiplexing
function createMultiplexer<T>(): {
input: WritableStream<T>;
outputs: ReadableStream<T>[];
} {
const outputs: ReadableStreamDefaultController<T>[] = [];
const input = new WritableStream({
write(chunk) {
// Send chunk to all outputs
outputs.forEach((controller) => {
try {
controller.enqueue(chunk);
} catch (error) {
console.warn("Failed to enqueue to output:", error);
}
});
},
close() {
outputs.forEach((controller) => {
try {
controller.close();
} catch (error) {
console.warn("Failed to close output:", error);
}
});
},
});
function createOutput(): ReadableStream<T> {
return new ReadableStream({
start(controller) {
outputs.push(controller);
},
cancel() {
const index = outputs.indexOf(controller);
if (index >= 0) {
outputs.splice(index, 1);
}
},
});
}
return {
input,
outputs: [createOutput(), createOutput(), createOutput()],
};
}
// Usage
const multiplexer = createMultiplexer<string>();
// Set up multiple consumers
multiplexer.outputs[0].pipeTo(
new WritableStream({
write(chunk) {
console.log("Output 1:", chunk);
},
}),
);
multiplexer.outputs[1].pipeTo(
new WritableStream({
write(chunk) {
console.log("Output 2:", chunk.toUpperCase());
},
}),
);
multiplexer.outputs[2].pipeTo(
new WritableStream({
write(chunk) {
console.log("Output 3 length:", chunk.length);
},
}),
);
// Send data to all outputs
const writer = multiplexer.input.getWriter();
await writer.write("Hello");
await writer.write("World");
await writer.close();
Best Practices
Error Handling
async function robustStreamProcessing(stream: ReadableStream) {
const reader = stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
try {
await processChunk(value);
} catch (chunkError) {
console.error("Chunk processing failed:", chunkError);
// Continue with next chunk
}
}
} catch (streamError) {
console.error("Stream error:", streamError);
throw streamError;
} finally {
reader.releaseLock();
}
}
Resource Management
class ManagedStream {
private resources: Set<any> = new Set();
createStream<T>(): ReadableStream<T> {
return new ReadableStream({
start: (controller) => {
// Track resources
},
cancel: () => {
this.cleanup();
},
});
}
private cleanup() {
for (const resource of this.resources) {
try {
if (resource.close) {
resource.close();
}
} catch (error) {
console.warn("Resource cleanup failed:", error);
}
}
this.resources.clear();
}
}
Backpressure Handling
function createControlledStream<T>(): ReadableStream<T> {
const queue: T[] = [];
let controller: ReadableStreamDefaultController<T>;
return new ReadableStream({
start(c) {
controller = c;
},
pull() {
// Only add more data when requested
if (queue.length > 0) {
controller.enqueue(queue.shift()!);
}
},
});
}
Notes
- Streams are designed for efficient memory usage with large datasets
- Always properly release stream locks when done
- Use backpressure mechanisms to prevent memory overflow
- Transform streams can be chained for complex processing pipelines
- Error handling in streams should be implemented at multiple levels
See Also
- File API - File objects with stream support
- Fetch API - HTTP responses as streams
- File System API - File streaming operations