You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							47 lines
						
					
					
						
							1.4 KiB
						
					
					
				
			
		
		
	
	
							47 lines
						
					
					
						
							1.4 KiB
						
					
					
'use strict';

var Cell   = require('./cell'),
    Pledge = require('./pledge');

/**
 * Passes messages through an ordered chain of cells, one cell per session.
 *
 * Incoming messages visit the cells from last to first; outgoing messages
 * visit them from first to last. Each direction can be stopped independently,
 * after which further messages in that direction are silently dropped.
 *
 * @constructor
 * @param {Array} sessions - Sessions to wrap; each one is handed to `new Cell`
 *   and the resulting cells keep the order of this array.
 */
var Pipeline = function(sessions) {
  this._cells   = sessions.map(function(session) { return new Cell(session); });
  // Per-direction flags; set by `_loop` when a cell reports an error and by
  // `close`, and checked before any new message is accepted.
  this._stopped = { incoming: false, outgoing: false };
};

/**
 * Runs an incoming message through the cells in reverse order (last cell
 * first). Does nothing, and never invokes `callback`, once the incoming
 * direction has been stopped.
 *
 * @param {*} message - Message handed to the last cell.
 * @param {Function} callback - Called as `callback(error, message)` after the
 *   first cell has finished.
 * @param {*} [context] - `this` value used when invoking `callback`.
 */
Pipeline.prototype.processIncomingMessage = function(message, callback, context) {
  if (this._stopped.incoming) return;
  this._loop('incoming', this._cells.length - 1, -1, -1, message, callback, context);
};

/**
 * Runs an outgoing message through the cells in forward order (first cell
 * first). Does nothing, and never invokes `callback`, once the outgoing
 * direction has been stopped.
 *
 * @param {*} message - Message handed to the first cell.
 * @param {Function} callback - Called as `callback(error, message)` after the
 *   last cell has finished.
 * @param {*} [context] - `this` value used when invoking `callback`.
 */
Pipeline.prototype.processOutgoingMessage = function(message, callback, context) {
  if (this._stopped.outgoing) return;
  this._loop('outgoing', 0, this._cells.length, 1, message, callback, context);
};

/**
 * Stops both directions and calls `close()` on every cell.
 *
 * @param {Function} [callback] - If given, invoked with no arguments from the
 *   `then` handler of `Pledge.all(closed)`, where `closed` holds the values
 *   returned by the cells' `close()` calls (presumably once every cell has
 *   finished closing - confirm against the Pledge implementation).
 * @param {*} [context] - `this` value used when invoking `callback`.
 */
Pipeline.prototype.close = function(callback, context) {
  this._stopped = { incoming: true, outgoing: true };

  var closed = this._cells.map(function(a) { return a.close(); });
  if (callback)
    Pledge.all(closed).then(function() { callback.call(context); });
};

/**
 * Drives one message through the cells in the given direction.
 *
 * @private
 * @param {string} direction - `'incoming'` or `'outgoing'`; also the name of
 *   the cell method invoked at each step.
 * @param {number} start - Index of the first cell to visit.
 * @param {number} end - Index at which traversal finishes (exclusive).
 * @param {number} step - Index increment per hop (`1` or `-1`).
 * @param {*} message - Message handed to the first visited cell.
 * @param {Function} callback - Called as `callback(error, message)` once
 *   `end` is reached.
 * @param {*} [context] - `this` value used when invoking `callback`.
 */
Pipeline.prototype._loop = function(direction, start, end, step, message, callback, context) {
  var cells = this._cells,
      n     = cells.length,
      self  = this;

  // Every cell is told a message is on its way before any cell handles it.
  while (n--) cells[n].pending(direction);

  var pipe = function(index, error, msg) {
    if (index === end) return callback.call(context, error, msg);

    cells[index][direction](error, msg, function(err, m) {
      // An error stops future messages in this direction, but the current
      // message keeps travelling so the error reaches `callback`.
      if (err) self._stopped[direction] = true;
      pipe(index + step, err, m);
    });
  };
  pipe(start, null, message);
};

// The Pipeline constructor is the value this module exports.
module.exports = Pipeline;