Pipelines connect streams together, automatically handling backpressure and cleanup. The pipeline() function is the recommended way to pipe streams in Node.js: unlike manually chaining .pipe() calls, it forwards errors from every stream in the chain and destroys all of them when any one fails.
Import pipeline and stream utilities

import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip, createGunzip } from 'node:zlib';
import { Transform } from 'node:stream';
Basic Pipeline

pipeline() connects a source stream through optional transforms to a destination and returns a promise.
async function basicPipeline() {
  await pipeline(
    createReadStream('./input.txt'),
    createWriteStream('./output.txt')
  );
  console.log('File copied successfully');
}
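For comparison, the callback form of pipeline() from 'node:stream' does the same work without promises; it can be useful in codebases that have not adopted async/await. A minimal sketch:

import { pipeline as pipelineCallback } from 'node:stream';

// Callback form: the final argument is invoked once the pipeline
// finishes or fails, instead of returning a promise.
pipelineCallback(
  createReadStream('./input.txt'),
  createWriteStream('./output.txt'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('File copied successfully');
    }
  }
);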
Pipeline with Compression

You can include transform streams in the pipeline, such as gzip compression from node:zlib.
async function compressFile() {
  await pipeline(
    createReadStream('./data.txt'),
    createGzip(),
    createWriteStream('./data.txt.gz')
  );
  console.log('File compressed');
}

async function decompressFile() {
  await pipeline(
    createReadStream('./data.txt.gz'),
    createGunzip(),
    createWriteStream('./data-restored.txt')
  );
  console.log('File decompressed');
}
Custom Transform Streams

Transform streams process data as it flows through the pipeline.
const uppercase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

async function transformPipeline() {
  await pipeline(
    createReadStream('./input.txt'),
    uppercase,
    createWriteStream('./uppercase.txt')
  );
  console.log('File transformed to uppercase');
}
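One caveat: a stream instance is single-use. Once a pipeline finishes (or fails), its streams are ended and destroyed, so calling transformPipeline() a second time with the shared uppercase instance would fail. A common pattern, also used by the log processor example later in this section, is a factory function that returns a fresh Transform per run. A minimal sketch (createUppercase is an illustrative name):

// Factory: returns a fresh, single-use Transform for each pipeline run.
function createUppercase() {
  return new Transform({
    transform(chunk, encoding, callback) {
      // callback(null, data) both pushes data and signals completion.
      callback(null, chunk.toString().toUpperCase());
    }
  });
}

async function transformPipelineReusable() {
  await pipeline(
    createReadStream('./input.txt'),
    createUppercase(),
    createWriteStream('./uppercase.txt')
  );
}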
Multiple Transforms

Chain multiple transforms together for complex processing.
const addLineNumbers = new Transform({
  transform(chunk, encoding, callback) {
    // Note: this assumes each chunk ends on a line boundary; see the
    // boundary-safe sketch after this example.
    this.lineNumber = this.lineNumber || 0;
    const lines = chunk.toString().split('\n');
    const numbered = lines
      .map(line => line ? `${++this.lineNumber}: ${line}` : '')
      .join('\n');
    this.push(numbered);
    callback();
  }
});

const addTimestamp = new Transform({
  transform(chunk, encoding, callback) {
    // Prefixes each chunk (not each line) with a timestamp.
    const timestamp = new Date().toISOString();
    this.push(`[${timestamp}]\n${chunk}`);
    callback();
  }
});

async function multiTransformPipeline() {
  await pipeline(
    createReadStream('./log.txt'),
    addLineNumbers,
    addTimestamp,
    createWriteStream('./processed-log.txt')
  );
}
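A file stream makes no guarantee that chunks end on line boundaries, so a line can arrive split across two chunks. A boundary-safe line numberer buffers the trailing partial line between calls and emits it from flush() once the source ends. A minimal sketch (createLineNumberer is an illustrative name):

// Buffers the trailing partial line so lines split across chunk
// boundaries are still numbered correctly.
function createLineNumberer() {
  let remainder = '';
  let lineNumber = 0;
  return new Transform({
    transform(chunk, encoding, callback) {
      const lines = (remainder + chunk.toString()).split('\n');
      remainder = lines.pop(); // may be a partial line; hold it back
      for (const line of lines) {
        this.push(`${++lineNumber}: ${line}\n`);
      }
      callback();
    },
    flush(callback) {
      // Emit whatever is left once the source has ended.
      if (remainder) this.push(`${++lineNumber}: ${remainder}\n`);
      callback();
    }
  });
}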
Async Generator in Pipeline

You can use async generators as sources or transforms in pipelines.
async function* generateLines() {
  for (let i = 1; i <= 100; i++) {
    yield `Line ${i}\n`;
  }
}

async function generatorPipeline() {
  await pipeline(
    generateLines,
    createWriteStream('./generated.txt')
  );
  console.log('Generated file from async iterator');
}
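When pipeline() is given a function as the source, it invokes that function with an options object containing an AbortSignal, so the generator can stop producing when the pipeline is aborted. A minimal sketch combining this with the AbortController pattern shown later in this section (function names are illustrative):

// The source function receives { signal } from pipeline(), letting the
// generator observe cancellation instead of looping forever.
async function* generateUntilAborted({ signal }) {
  for (let i = 1; !signal.aborted; i++) {
    yield `Line ${i}\n`;
  }
}

async function abortableGeneratorPipeline() {
  const controller = new AbortController();
  setTimeout(() => controller.abort(), 100);
  try {
    await pipeline(
      generateUntilAborted,
      createWriteStream('./generated.txt'),
      { signal: controller.signal }
    );
  } catch (err) {
    if (err.name !== 'AbortError') throw err;
    console.log('Generator pipeline aborted');
  }
}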
Async Transform with Generator

An async generator placed between two streams acts as a transform: pipeline() calls it with the upstream stream as its argument, and whatever it yields flows downstream.
async function* doubleNumbers(source) {
  for await (const chunk of source) {
    // Note: assumes a number is never split across chunk boundaries.
    const numbers = chunk.toString().trim().split('\n');
    for (const num of numbers) {
      if (!num) continue; // skip blank lines
      const doubled = parseInt(num, 10) * 2;
      yield `${doubled}\n`;
    }
  }
}

async function asyncTransformPipeline() {
  await pipeline(
    createReadStream('./numbers.txt'),
    doubleNumbers,
    createWriteStream('./doubled.txt')
  );
}
Error Handling

pipeline() automatically propagates errors and destroys every stream in the chain, so a single try/catch around the await is enough.
async function errorHandlingPipeline() {
  try {
    await pipeline(
      createReadStream('./nonexistent.txt'),
      createWriteStream('./output.txt')
    );
  } catch (err) {
    console.error('Pipeline failed:', err.message);
  }
}
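Note that pipeline() destroys the streams but does not undo their side effects: if the destination file was partially written before the failure, those bytes stay on disk. A minimal sketch of cleaning up the partial output (copyOrCleanUp is an illustrative name):

import { unlink } from 'node:fs/promises';

async function copyOrCleanUp(src, dest) {
  try {
    await pipeline(createReadStream(src), createWriteStream(dest));
  } catch (err) {
    // Best-effort removal of the partially written destination file.
    await unlink(dest).catch(() => {});
    throw err;
  }
}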
AbortController for Cancellation

Use an AbortController to cancel a pipeline in progress: pass its signal in the options object.
async function cancellablePipeline() {
  const controller = new AbortController();
  setTimeout(() => controller.abort(), 1000);
  try {
    await pipeline(
      createReadStream('./large-file.txt'),
      createWriteStream('./output.txt'),
      { signal: controller.signal }
    );
  } catch (err) {
    if (err.name === 'AbortError') {
      console.log('Pipeline was cancelled');
    } else {
      throw err;
    }
  }
}
Practical Example: Log Processor

A complete example that reads a log file, keeps only the lines containing ERROR, compresses the result, and writes it to a new file.
function createErrorFilter() {
  return new Transform({
    transform(chunk, encoding, callback) {
      const lines = chunk.toString().split('\n');
      const errors = lines
        .filter(line => line.includes('ERROR'))
        .join('\n');
      if (errors) {
        this.push(errors + '\n');
      }
      callback();
    }
  });
}

async function processLogs() {
  await pipeline(
    createReadStream('./application.log'),
    createErrorFilter(),
    createGzip(),
    createWriteStream('./errors.log.gz')
  );
  console.log('Error logs extracted and compressed');
}
Pipeline with Standard I/O

You can use process.stdin and process.stdout as pipeline endpoints, for example to build small command-line filters.
async function stdioTransform() {
  const reverse = new Transform({
    transform(chunk, encoding, callback) {
      // Reverse each chunk, keeping any trailing newline in place.
      const text = chunk.toString();
      const line = text.endsWith('\n') ? text.slice(0, -1) : text;
      callback(null, line.split('').reverse().join('') + '\n');
    }
  });
  await pipeline(process.stdin, reverse, process.stdout);
}
Run examples

basicPipeline().catch(console.error);
generatorPipeline().catch(console.error);