NodeJS by Example: Pipelines

Pipelines connect streams together, automatically handling backpressure and cleanup. The pipeline() function is the recommended way to pipe streams in Node.js.

Import pipeline and stream utilities

import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip, createGunzip } from 'node:zlib';
import { Transform } from 'node:stream';

Basic Pipeline
pipeline() connects a source stream through optional transforms to a destination. It returns a promise that resolves when the pipeline has finished.

async function basicPipeline() {
  await pipeline(
    createReadStream('./input.txt'),
    createWriteStream('./output.txt')
  );
  console.log('File copied successfully');
}
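
If you are not in an async context, the callback form of pipeline from 'node:stream' gives the same guarantees; a minimal sketch of the same copy, assuming the same input.txt and output.txt:

import { pipeline as pipelineCallback } from 'node:stream';

pipelineCallback(
  createReadStream('./input.txt'),
  createWriteStream('./output.txt'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err.message);
    } else {
      console.log('File copied successfully');
    }
  }
);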

Pipeline with Compression
You can include transform streams in the pipeline, such as gzip compression.

async function compressFile() {
  await pipeline(
    createReadStream('./data.txt'),
    createGzip(),
    createWriteStream('./data.txt.gz')
  );
  console.log('File compressed');
}

async function decompressFile() {
  await pipeline(
    createReadStream('./data.txt.gz'),
    createGunzip(),
    createWriteStream('./data-restored.txt')
  );
  console.log('File decompressed');
}
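
node:zlib also provides Brotli codecs, so the same pattern works with them; a minimal sketch (the .br extension here is just a convention):

import { createBrotliCompress } from 'node:zlib';

async function brotliCompressFile() {
  await pipeline(
    createReadStream('./data.txt'),
    createBrotliCompress(),
    createWriteStream('./data.txt.br')
  );
  console.log('File compressed with Brotli');
}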

Custom Transform Streams
Transform streams process data as it flows through the pipeline.

const uppercase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

async function transformPipeline() {
  await pipeline(
    createReadStream('./input.txt'),
    uppercase,
    createWriteStream('./uppercase.txt')
  );
  console.log('File transformed to uppercase');
}
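
Note that a Transform instance can only run through one pipeline; once it has ended it cannot be reused. If the same transform is needed in several pipelines, wrapping it in a factory (as the log-processor example later in this section does) is a common pattern; a sketch with a hypothetical createUppercase helper:

function createUppercase() {
  return new Transform({
    transform(chunk, encoding, callback) {
      // callback(null, data) is shorthand for this.push(data) followed by callback()
      callback(null, chunk.toString().toUpperCase());
    }
  });
}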

Multiple Transforms
Chain multiple transforms together for more complex processing.

// Note: transform() sees raw chunks, which may split a line across two chunks;
// that is fine for small examples, but exact numbering needs line buffering.
const addLineNumbers = new Transform({
  transform(chunk, encoding, callback) {
    this.lineNumber = this.lineNumber || 0; // persists across chunks
    const lines = chunk.toString().split('\n');
    const numbered = lines
      .map(line => line ? `${++this.lineNumber}: ${line}` : '')
      .join('\n');
    this.push(numbered);
    callback();
  }
});

// Note: transform() runs once per chunk, so the timestamp is prepended to every
// chunk, not just once per file.
const addTimestamp = new Transform({
  transform(chunk, encoding, callback) {
    const timestamp = new Date().toISOString();
    this.push(`[${timestamp}]\n${chunk}`);
    callback();
  }
});

async function multiTransformPipeline() {
  await pipeline(
    createReadStream('./log.txt'),
    addLineNumbers,
    addTimestamp,
    createWriteStream('./processed-log.txt')
  );
}

Async Generator in Pipeline
You can use async generators as sources or transforms in pipelines.

async function* generateLines() {
  for (let i = 1; i <= 100; i++) {
    yield `Line ${i}\n`;
  }
}

async function generatorPipeline() {
  await pipeline(
    generateLines, // pipeline() calls the function and consumes the returned async iterator
    createWriteStream('./generated.txt')
  );
  console.log('Generated file from async iterator');
}
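
Besides generator functions, pipeline() accepts any iterable or async iterable as the source; a small sketch (the from-array.txt filename is just for illustration):

async function arraySourcePipeline() {
  await pipeline(
    ['first\n', 'second\n', 'third\n'],
    createWriteStream('./from-array.txt')
  );
  console.log('Wrote file from an array source');
}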

Async Transform with Generator
Transform data using an async generator function.

async function* doubleNumbers(source) {
  for await (const chunk of source) {
    // Assumes each chunk holds whole lines of numbers (true for small files)
    const numbers = chunk.toString().trim().split('\n').filter(Boolean);
    for (const num of numbers) {
      const doubled = Number(num) * 2;
      yield `${doubled}\n`;
    }
  }
}

async function asyncTransformPipeline() {
  await pipeline(
    createReadStream('./numbers.txt'),
    doubleNumbers, // receives the read stream as its source argument
    createWriteStream('./doubled.txt')
  );
}
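
The final stage can also be a plain async function that consumes the data directly instead of writing to a stream; a sketch that reuses doubleNumbers from above (sumDoubledNumbers is just an illustrative name):

async function sumDoubledNumbers() {
  let sum = 0;
  await pipeline(
    createReadStream('./numbers.txt'),
    doubleNumbers,
    async function (source) {
      // The destination function receives the transformed chunks as an async iterable
      for await (const chunk of source) {
        sum += Number(chunk.toString());
      }
    }
  );
  console.log('Sum of doubled numbers:', sum);
}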

Error Handling
pipeline() automatically handles errors and cleans up all streams.

async function errorHandlingPipeline() {
  try {
    await pipeline(
      createReadStream('./nonexistent.txt'),
      createWriteStream('./output.txt')
    );
  } catch (err) {
    console.error('Pipeline failed:', err.message);
    // All streams are automatically destroyed
  }
}
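
For comparison, a rough sketch of the manual cleanup that .pipe() alone requires; each stream's errors have to be handled and the other stream destroyed by hand, which is exactly the work pipeline() automates:

const source = createReadStream('./nonexistent.txt');
const destination = createWriteStream('./output.txt');

source.pipe(destination); // forwards data, but does not forward errors
source.on('error', (err) => {
  destination.destroy(err); // without this, the write stream is left open
  console.error('Read failed:', err.message);
});
destination.on('error', (err) => {
  source.destroy(err);
  console.error('Write failed:', err.message);
});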

AbortController for Cancellation
Use an AbortController to cancel a pipeline in progress.

async function cancellablePipeline() {
  const controller = new AbortController();
  
  // Cancel after 1 second
  setTimeout(() => controller.abort(), 1000);
  
  try {
    await pipeline(
      createReadStream('./large-file.txt'),
      createWriteStream('./output.txt'),
      { signal: controller.signal }
    );
  } catch (err) {
    if (err.name === 'AbortError') {
      console.log('Pipeline was cancelled');
    } else {
      throw err;
    }
  }
}
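
On newer Node versions (17.3+), AbortSignal.timeout() is a convenient shorthand for the manual setTimeout above; a minimal sketch:

async function timeoutPipeline() {
  // Aborts the pipeline automatically if it is still running after one second
  await pipeline(
    createReadStream('./large-file.txt'),
    createWriteStream('./output.txt'),
    { signal: AbortSignal.timeout(1000) }
  );
}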

Practical Example: Log Processor
A complete example that reads a log file, keeps only the lines containing ERROR, and writes them to a compressed file.

function createErrorFilter() {
  return new Transform({
    transform(chunk, encoding, callback) {
      const lines = chunk.toString().split('\n');
      const errors = lines
        .filter(line => line.includes('ERROR'))
        .join('\n');
      if (errors) {
        this.push(errors + '\n');
      }
      callback();
    }
  });
}

async function processLogs() {
  await pipeline(
    createReadStream('./application.log'),
    createErrorFilter(),
    createGzip(),
    createWriteStream('./errors.log.gz')
  );
  console.log('Error logs extracted and compressed');
}

Pipeline with Standard I/O
You can use process.stdin and process.stdout in pipelines.

async function stdioTransform() {
  const reverse = new Transform({
    transform(chunk, encoding, callback) {
      const reversed = chunk.toString().split('').reverse().join('');
      this.push(reversed);
      callback();
    }
  });
  
  // This reads from stdin, reverses, and writes to stdout
  // Uncomment to use: await pipeline(process.stdin, reverse, process.stdout);
}

Run the examples

basicPipeline().catch(console.error);
generatorPipeline().catch(console.error);

Create test files

$ echo "Hello World" > input.txt
$ echo "some test data" > data.txt

Run the pipeline examples

$ node pipelines.js
# File copied successfully
# Generated file from async iterator

Verify the output

$ cat output.txt
# Hello World

$ cat generated.txt
# Line 1
# Line 2
# ...