You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					73 lines
				
				1.5 KiB
			
		
		
			
		
	
	
					73 lines
				
				1.5 KiB
			| 
								 
											4 years ago
										 
									 | 
							
								'use strict';
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								var RingBuffer = require('./ring_buffer');
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/**
								 * Wraps one method of a session object so that messages handed to `call`
								 * have their callbacks invoked in the same order the messages were
								 * submitted, even when the session method completes them out of order.
								 *
								 * @param {Object} session - object whose `method` is invoked as
								 *   `session[method](message, handler)`.
								 * @param {string} method - name of the session method to invoke.
								 */
								var Functor = function(session, method) {
								  this._session = session;
								  this._method  = method;
								  this._queue   = new RingBuffer(Functor.QUEUE_SIZE);
								  this._stopped = false;
								  // NOTE(review): nothing in this file increments `pending`; it is only
								  // reset/decremented here. Presumably the owner of this object counts
								  // submissions before calling `call` -- confirm against the caller.
								  this.pending  = 0;
								};

								// Size passed to the RingBuffer constructor for every new Functor.
								// NOTE(review): how RingBuffer behaves beyond this size is defined in
								// ./ring_buffer, not here -- confirm before relying on it as a hard cap.
								Functor.QUEUE_SIZE = 8;

								/**
								 * Submits one message for processing.
								 *
								 * Does nothing once the functor has been stopped by an earlier error.
								 * Otherwise a record is queued and, unless `error` is already set, the
								 * session method is invoked with `message` and a completion handler.
								 *
								 * @param {*} error - error from an earlier stage; when truthy the session
								 *   method is skipped, the functor stops, and the error is forwarded.
								 * @param {*} message - value handed to the session method.
								 * @param {Function} callback - invoked as `callback(error, message)` once
								 *   this record reaches the head of the queue and is done.
								 * @param {*} context - `this` value for `callback`.
								 * @returns {undefined}
								 */
								Functor.prototype.call = function(error, message, callback, context) {
								  if (this._stopped) return;

								  var self   = this,
								      called = false,
								      record = {
								        error:    error,
								        message:  message,
								        callback: callback,
								        context:  context,
								        done:     false
								      };

								  this._queue.push(record);

								  if (record.error) {
								    // Incoming error: nothing to process, just mark the record finished
								    // and let it drain once everything ahead of it has completed.
								    record.done = true;
								    this._stop();
								    return this._flushQueue();
								  }

								  var handler = function(err, msg) {
								    // Only the first invocation counts; the session method may call back
								    // more than once, or call back and then throw.
								    if (called) return;
								    called = true;

								    if (err) {
								      self._stop();
								      record.error   = err;
								      record.message = null;
								    } else {
								      record.message = msg;
								    }

								    record.done = true;
								    self._flushQueue();
								  };

								  try {
								    this._session[this._method](message, handler);
								  } catch (err) {
								    // A synchronous throw is reported through the same path as an
								    // error passed to the handler.
								    handler(err);
								  }
								};

								/**
								 * Marks the functor as stopped so later `call`s are ignored, and sets
								 * `pending` to the number of records currently queued.
								 */
								Functor.prototype._stop = function() {
								  this.pending  = this._queue.length;
								  this._stopped = true;
								};

								/**
								 * Delivers finished records from the head of the queue, in order, until
								 * the queue is empty or the head record is not done yet.
								 *
								 * A record carrying an error zeroes `pending` and clears the rest of the
								 * queue before its callback runs, so no later callback is delivered.
								 */
								Functor.prototype._flushQueue = function() {
								  var queue = this._queue, record;

								  while (queue.length > 0 && queue.peek().done) {
								    record = queue.shift();

								    if (record.error) {
								      this.pending = 0;
								      queue.clear();
								    } else {
								      this.pending -= 1;
								    }

								    record.callback.call(record.context, record.error, record.message);
								  }
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								module.exports = Functor;
							 |