You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							128 lines
						
					
					
						
							3.3 KiB
						
					
					
				
			
		
		
	
	
							128 lines
						
					
					
						
							3.3 KiB
						
					
					
				const Minipass = require('minipass') | 
						|
const EE = require('events') | 
						|
const isStream = s => s && s instanceof EE && ( | 
						|
  typeof s.pipe === 'function' || // readable | 
						|
  (typeof s.write === 'function' && typeof s.end === 'function') // writable | 
						|
) | 
						|
 | 
						|
const _head = Symbol('_head') | 
						|
const _tail = Symbol('_tail') | 
						|
const _linkStreams = Symbol('_linkStreams') | 
						|
const _setHead = Symbol('_setHead') | 
						|
const _setTail = Symbol('_setTail') | 
						|
const _onError = Symbol('_onError') | 
						|
const _onData = Symbol('_onData') | 
						|
const _onEnd = Symbol('_onEnd') | 
						|
const _onDrain = Symbol('_onDrain') | 
						|
const _streams = Symbol('_streams') | 
						|
class Pipeline extends Minipass { | 
						|
  constructor (opts, ...streams) { | 
						|
    if (isStream(opts)) { | 
						|
      streams.unshift(opts) | 
						|
      opts = {} | 
						|
    } | 
						|
 | 
						|
    super(opts) | 
						|
    this[_streams] = [] | 
						|
    if (streams.length) | 
						|
      this.push(...streams) | 
						|
  } | 
						|
 | 
						|
  [_linkStreams] (streams) { | 
						|
    // reduce takes (left,right), and we return right to make it the | 
						|
    // new left value. | 
						|
    return streams.reduce((src, dest) => { | 
						|
      src.on('error', er => dest.emit('error', er)) | 
						|
      src.pipe(dest) | 
						|
      return dest | 
						|
    }) | 
						|
  } | 
						|
 | 
						|
  push (...streams) { | 
						|
    this[_streams].push(...streams) | 
						|
    if (this[_tail]) | 
						|
      streams.unshift(this[_tail]) | 
						|
 | 
						|
    const linkRet = this[_linkStreams](streams) | 
						|
 | 
						|
    this[_setTail](linkRet) | 
						|
    if (!this[_head]) | 
						|
      this[_setHead](streams[0]) | 
						|
  } | 
						|
 | 
						|
  unshift (...streams) { | 
						|
    this[_streams].unshift(...streams) | 
						|
    if (this[_head]) | 
						|
      streams.push(this[_head]) | 
						|
 | 
						|
    const linkRet = this[_linkStreams](streams) | 
						|
    this[_setHead](streams[0]) | 
						|
    if (!this[_tail]) | 
						|
      this[_setTail](linkRet) | 
						|
  } | 
						|
 | 
						|
  destroy (er) { | 
						|
    // set fire to the whole thing. | 
						|
    this[_streams].forEach(s => | 
						|
      typeof s.destroy === 'function' && s.destroy()) | 
						|
    return super.destroy(er) | 
						|
  } | 
						|
 | 
						|
  // readable interface -> tail | 
						|
  [_setTail] (stream) { | 
						|
    this[_tail] = stream | 
						|
    stream.on('error', er => this[_onError](stream, er)) | 
						|
    stream.on('data', chunk => this[_onData](stream, chunk)) | 
						|
    stream.on('end', () => this[_onEnd](stream)) | 
						|
    stream.on('finish', () => this[_onEnd](stream)) | 
						|
  } | 
						|
 | 
						|
  // errors proxied down the pipeline | 
						|
  // they're considered part of the "read" interface | 
						|
  [_onError] (stream, er) { | 
						|
    if (stream === this[_tail]) | 
						|
      this.emit('error', er) | 
						|
  } | 
						|
  [_onData] (stream, chunk) { | 
						|
    if (stream === this[_tail]) | 
						|
      super.write(chunk) | 
						|
  } | 
						|
  [_onEnd] (stream) { | 
						|
    if (stream === this[_tail]) | 
						|
      super.end() | 
						|
  } | 
						|
  pause () { | 
						|
    super.pause() | 
						|
    return this[_tail] && this[_tail].pause && this[_tail].pause() | 
						|
  } | 
						|
 | 
						|
  // NB: Minipass calls its internal private [RESUME] method during | 
						|
  // pipe drains, to avoid hazards where stream.resume() is overridden. | 
						|
  // Thus, we need to listen to the resume *event*, not override the | 
						|
  // resume() method, and proxy *that* to the tail. | 
						|
  emit (ev, ...args) { | 
						|
    if (ev === 'resume' && this[_tail] && this[_tail].resume) | 
						|
      this[_tail].resume() | 
						|
    return super.emit(ev, ...args) | 
						|
  } | 
						|
 | 
						|
  // writable interface -> head | 
						|
  [_setHead] (stream) { | 
						|
    this[_head] = stream | 
						|
    stream.on('drain', () => this[_onDrain](stream)) | 
						|
  } | 
						|
  [_onDrain] (stream) { | 
						|
    if (stream === this[_head]) | 
						|
      this.emit('drain') | 
						|
  } | 
						|
  write (chunk, enc, cb) { | 
						|
    return this[_head].write(chunk, enc, cb) && | 
						|
      (this.flowing || this.buffer.length === 0) | 
						|
  } | 
						|
  end (chunk, enc, cb) { | 
						|
    this[_head].end(chunk, enc, cb) | 
						|
    return this | 
						|
  } | 
						|
} | 
						|
 | 
						|
module.exports = Pipeline
 | 
						|
 |