You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							725 lines
						
					
					
						
							19 KiB
						
					
					
				
			
		
		
	
	
							725 lines
						
					
					
						
							19 KiB
						
					
					
				/** | 
						|
 * Support for concurrent task management and synchronization in web | 
						|
 * applications. | 
						|
 * | 
						|
 * @author Dave Longley | 
						|
 * @author David I. Lehn <dlehn@digitalbazaar.com> | 
						|
 * | 
						|
 * Copyright (c) 2009-2013 Digital Bazaar, Inc. | 
						|
 */ | 
						|
var forge = require('./forge'); | 
						|
require('./debug'); | 
						|
require('./log'); | 
						|
require('./util'); | 
						|
 | 
						|
// category used for this module's log messages and debug entries
var cat = 'forge.task';

// Verbosity of this module's debug logging.
//   0: off, 1: a little, 2: a whole lot
// Each verbose log call is guarded by a level check so that the logging
// code is not even invoked unless the level asks for it. For performance
// reasons level 2 should not be used in production.
// ex: if(sVL >= 2) forge.log.verbose(....)
var sVL = 0;

// tasks by id, tracked for debugging
var sTasks = {};
// id handed to the next task that is created
var sNextTaskId = 0;
// debug access to the task map
forge.debug.set(cat, 'tasks', sTasks);

// task queues, keyed by task type
var sTaskQueues = {};
// debug access to the queue map
forge.debug.set(cat, 'queues', sTaskQueues);

// name used for tasks that were not given one
var sNoTaskName = '?';

// how many doNext() recursions may happen before a context swap is forced
// FIXME: might need to tweak this based on the browser
var sMaxRecursions = 30;

// how long tasks may run (compared against elapsed milliseconds) before a
// context swap is forced
// FIXME: might need to tweak this based on the browser
var sTimeSlice = 20;
						|
 | 
						|
/**
 * Task states.
 *
 * READY: ready to start processing
 * RUNNING: task or a subtask is running
 * BLOCKED: task is waiting to acquire N permits to continue
 * SLEEPING: task is sleeping for a period of time
 * DONE: task is done
 * ERROR: task has an error
 */
var READY = 'ready';
var RUNNING = 'running';
var BLOCKED = 'blocked';
var SLEEPING = 'sleeping';
var DONE = 'done';
var ERROR = 'error';

/**
 * Task actions. Used to control state transitions.
 *
 * STOP: stop processing
 * START: start processing tasks
 * BLOCK: block task from continuing until 1 or more permits are released
 * UNBLOCK: release one or more permits
 * SLEEP: sleep for a period of time
 * WAKEUP: wakeup early from SLEEPING state
 * CANCEL: cancel further tasks
 * FAIL: a failure occurred
 */
var STOP = 'stop';
var START = 'start';
var BLOCK = 'block';
var UNBLOCK = 'unblock';
var SLEEP = 'sleep';
var WAKEUP = 'wakeup';
var CANCEL = 'cancel';
var FAIL = 'fail';

/**
 * State transition table.
 *
 * nextState = sStateTable[currentState][action]
 *
 * Each row below is written as a flat list of (action, nextState) pairs.
 * The READY row only defines STOP, START, CANCEL and FAIL; looking up any
 * other action for READY yields undefined.
 */
var sStateTable = (function() {
  var table = {};

  // adds the row for one state from a flat [action, nextState, ...] list
  var addRow = function(state, pairs) {
    var row = table[state] = {};
    for(var i = 0; i < pairs.length; i += 2) {
      row[pairs[i]] = pairs[i + 1];
    }
  };

  addRow(READY, [
    STOP, READY,
    START, RUNNING,
    CANCEL, DONE,
    FAIL, ERROR
  ]);

  addRow(RUNNING, [
    STOP, READY,
    START, RUNNING,
    BLOCK, BLOCKED,
    UNBLOCK, RUNNING,
    SLEEP, SLEEPING,
    WAKEUP, RUNNING,
    CANCEL, DONE,
    FAIL, ERROR
  ]);

  addRow(BLOCKED, [
    STOP, BLOCKED,
    START, BLOCKED,
    BLOCK, BLOCKED,
    UNBLOCK, BLOCKED,
    SLEEP, BLOCKED,
    WAKEUP, BLOCKED,
    CANCEL, DONE,
    FAIL, ERROR
  ]);

  addRow(SLEEPING, [
    STOP, SLEEPING,
    START, SLEEPING,
    BLOCK, SLEEPING,
    UNBLOCK, SLEEPING,
    SLEEP, SLEEPING,
    WAKEUP, SLEEPING,
    CANCEL, DONE,
    FAIL, ERROR
  ]);

  addRow(DONE, [
    STOP, DONE,
    START, DONE,
    BLOCK, DONE,
    UNBLOCK, DONE,
    SLEEP, DONE,
    WAKEUP, DONE,
    CANCEL, DONE,
    FAIL, ERROR
  ]);

  addRow(ERROR, [
    STOP, ERROR,
    START, ERROR,
    BLOCK, ERROR,
    UNBLOCK, ERROR,
    SLEEP, ERROR,
    WAKEUP, ERROR,
    CANCEL, ERROR,
    FAIL, ERROR
  ]);

  return table;
})();
						|
 | 
						|
/**
 * Creates a new task.
 *
 * The task is given the next free id and is recorded in the module's
 * task map under that id.
 *
 * @param options options for this task
 *   run: the run function for the task (required)
 *   name: a name for the task (optional, defaults to sNoTaskName)
 *   parent: parent of this task (optional, defaults to null)
 *
 * @return the empty task.
 */
var Task = function(options) {
  // placeholder id until the real one is assigned below
  this.id = -1;

  // name, falling back to the shared "no name" marker
  this.name = options.name || sNoTaskName;

  // parent task, null when there is none
  this.parent = options.parent || null;

  // function that performs the task's work
  this.run = options.run;

  // queue of subtasks to run
  this.subtasks = [];

  // error flag
  this.error = false;

  // every task begins in the READY state
  this.state = READY;

  // number of times the task has been blocked (also the number
  // of permits that must be released before it continues running)
  this.blocks = 0;

  // timeout id while sleeping
  this.timeoutId = null;

  // no swap time yet
  this.swapTime = null;

  // no user data
  this.userData = null;

  // assign the real id and register the task
  // FIXME: deal with overflow
  var id = sNextTaskId++;
  this.id = id;
  sTasks[id] = this;
  if(sVL >= 1) {
    forge.log.verbose(cat, '[%s][%s] init', id, this.name, this);
  }
};
						|
 | 
						|
/**
 * Logs debug information on this task and the system state: the task
 * itself, its number of queued subtasks and the task queues.
 *
 * @param msg a message to log ahead of the task details (optional,
 *          defaults to the empty string).
 */
Task.prototype.debug = function(msg) {
  var prefix = msg || '';
  var subtaskCount = this.subtasks.length;
  forge.log.debug(
    cat, prefix,
    '[%s][%s] task:', this.id, this.name, this,
    'subtasks:', subtaskCount,
    'queue:', sTaskQueues);
};
						|
 | 
						|
/**
 * Queues a subtask to run after task.doNext() or task.fail() is called.
 *
 * The subtask is created in the RUNNING state with this task as its
 * parent, and it takes over this task's type and callbacks.
 *
 * @param name human readable name for this task (optional).
 * @param subrun a function to run that takes the current task as
 *          its first parameter.
 *
 * @return the current task (useful for chaining next() calls).
 */
Task.prototype.next = function(name, subrun) {
  var childName = name;
  var childRun = subrun;

  // called as next(subrun): shift the arguments and reuse this task's name
  if(typeof name === 'function') {
    childRun = name;
    childName = this.name;
  }

  var child = new Task({
    run: childRun,
    name: childName,
    parent: this
  });

  // subtasks start out running and share the parent's type and callbacks
  child.state = RUNNING;
  child.type = this.type;
  child.successCallback = this.successCallback || null;
  child.failureCallback = this.failureCallback || null;

  this.subtasks.push(child);

  return this;
};
						|
 | 
						|
/**
 * Adds subtasks to run in parallel after task.doNext() or task.fail()
 * is called.
 *
 * @param name human readable name for this task (optional).
 * @param subrun functions to run that take the current task as
 *          their first parameter.
 *
 * @return the current task (useful for chaining next() calls).
 */
Task.prototype.parallel = function(name, subrun) {
  var groupName = name;
  var runFns = subrun;

  // called as parallel(subrun): shift the arguments and reuse this
  // task's name
  if(forge.util.isArray(name)) {
    runFns = name;
    groupName = this.name;
  }

  // The parallel tasks are wrapped in a regular subtask so that they are
  // started at the proper time.
  return this.next(groupName, function(task) {
    // every parallel task, whether it succeeds or fails, gives back one
    // of the permits taken below
    var release = function() {
      task.unblock();
    };

    // block the wrapping task until all parallel tasks have finished
    task.block(runFns.length);

    // The index is passed in as a parameter so each started task keeps
    // its own value instead of sharing the loop variable.
    var launch = function(index) {
      forge.task.start({
        // Type must be unique so the task starts in parallel:
        //    name + private string + task id + sub-task index
        type: groupName + '__parallel-' + task.id + '-' + index,
        run: function(ptask) {
          runFns[index](ptask);
        },
        success: release,
        failure: release
      });
    };

    for(var i = 0; i < runFns.length; i++) {
      launch(i);
    }
  });
};
						|
 | 
						|
/**
 * Stops a running task by applying the STOP action to its current state.
 */
Task.prototype.stop = function() {
  var transitions = sStateTable[this.state];
  this.state = transitions[STOP];
};
						|
 | 
						|
/**
 * Starts running a task.
 *
 * Clears the error flag and applies the START action to the current
 * state. If the task ends up RUNNING, its swap time is reset to now,
 * its run function is called with the task as argument and processing
 * of its subtasks begins.
 */
Task.prototype.start = function() {
  this.error = false;
  this.state = sStateTable[this.state][START];

  // try to restart
  if(this.state === RUNNING) {
    // Record when this run began. This must be stored in swapTime and
    // not in a property named "start": assigning this.start would hide
    // this method on the instance, so the task could never be started
    // again, and runNext() measures the time slice against swapTime.
    this.swapTime = +new Date();
    this.run(this);
    runNext(this, 0);
  }
};
						|
 | 
						|
/**
 * Blocks a task until one or more permits have been released. The
 * task will not resume until the requested number of permits have
 * been released with call(s) to unblock().
 *
 * @param n number of permits to wait for (default: 1).
 */
Task.prototype.block = function(n) {
  var permits = (typeof n === 'undefined') ? 1 : n;
  this.blocks += permits;

  // the BLOCK action is only applied while permits are outstanding
  if(this.blocks > 0) {
    var transitions = sStateTable[this.state];
    this.state = transitions[BLOCK];
  }
};
						|
 | 
						|
/**
 * Releases permits to unblock a task. If a task was blocked by
 * requesting N permits via block(), then it will only continue
 * running once enough permits have been released via unblock() calls.
 *
 * If multiple processes need to synchronize with a single task then
 * use a condition variable (see forge.task.createCondition). It is
 * an error to unblock a task more times than it has been blocked.
 *
 * @param n number of permits to release (default: 1).
 *
 * @return the current block count (task is unblocked when count is 0)
 */
Task.prototype.unblock = function(n) {
  var released = (typeof n === 'undefined') ? 1 : n;
  this.blocks -= released;

  // resume once the last permit is back, unless the task is already done
  var shouldResume = (this.blocks === 0) && (this.state !== DONE);
  if(shouldResume) {
    this.state = RUNNING;
    runNext(this, 0);
  }

  return this.blocks;
};
						|
 | 
						|
/**
 * Sleep for a period of time before resuming tasks.
 *
 * The SLEEP action is applied to the current state and a timer is set.
 * When the timer fires the task is put back into the RUNNING state and
 * processing continues.
 *
 * @param n number of milliseconds to sleep (default: 0).
 */
Task.prototype.sleep = function(n) {
  var delay = (typeof n === 'undefined') ? 0 : n;
  var task = this;

  task.state = sStateTable[task.state][SLEEP];

  // runs when the sleep period is over
  var resume = function() {
    task.timeoutId = null;
    task.state = RUNNING;
    runNext(task, 0);
  };

  task.timeoutId = setTimeout(resume, delay);
};
						|
 | 
						|
/**
 * Makes this task wait on a condition variable until it is notified.
 * The next task will not be scheduled until notification. Condition
 * variables are created with forge.task.createCondition().
 *
 * Once cond.notify() is called, the task will continue.
 *
 * @param cond the condition variable to wait on.
 */
Task.prototype.wait = function(cond) {
  // the condition variable does the blocking and the bookkeeping
  var task = this;
  cond.wait(task);
};
						|
 | 
						|
/**
 * If sleeping, wakeup and continue running tasks.
 *
 * The pending sleep timer is cleared so that it cannot fire later and
 * resume the task a second time. Calling this on a task that is not in
 * the SLEEPING state does nothing.
 */
Task.prototype.wakeup = function() {
  if(this.state === SLEEPING) {
    // clearTimeout is the counterpart of the setTimeout used by sleep()
    clearTimeout(this.timeoutId);
    this.timeoutId = null;
    this.state = RUNNING;
    runNext(this, 0);
  }
};
						|
 | 
						|
/**
 * Cancel all remaining subtasks of this task.
 *
 * Applies the CANCEL action to the current state, drops any outstanding
 * permits, clears a pending sleep timer and empties the subtask queue.
 */
Task.prototype.cancel = function() {
  this.state = sStateTable[this.state][CANCEL];
  // remove permits needed; "blocks" is the counter that block() and
  // unblock() maintain
  this.blocks = 0;
  // kept for code that may read this property; nothing else in this
  // module uses it
  this.permitsNeeded = 0;
  // cancel timeouts
  if(this.timeoutId !== null) {
    // clearTimeout is the counterpart of the setTimeout used by sleep()
    clearTimeout(this.timeoutId);
    this.timeoutId = null;
  }
  // remove subtasks
  this.subtasks = [];
};
						|
 | 
						|
/**
 * Finishes this task with failure and sets the error flag. The entire
 * task will be aborted unless the next task that should execute
 * is passed as a parameter. This allows levels of subtasks to be
 * skipped. For instance, to abort only this task's subtasks, call
 * fail(task.parent). To abort this task's subtasks and its parent's
 * subtasks, call fail(task.parent.parent). To abort all tasks and
 * simply call the task callback, call fail() or fail(null).
 *
 * The task callback (success or failure) will always, eventually, be
 * called.
 *
 * @param next the task to continue at, or null to abort entirely.
 */
Task.prototype.fail = function(next) {
  // set error flag
  this.error = true;

  // finish this task; callbacks are suppressed here
  finish(this, true);

  if(next) {
    // hand the task info over to the task that continues
    next.error = this.error;
    next.swapTime = this.swapTime;
    next.userData = this.userData;

    // do next task as specified
    runNext(next, 0);
    return;
  }

  // aborting entirely
  if(this.parent !== null) {
    // Walk up to the root task, copying the task info onto every
    // ancestor below the root, then finish the root (ensures it is
    // removed from the task queue).
    var ancestor;
    for(ancestor = this.parent;
      ancestor.parent !== null;
      ancestor = ancestor.parent) {
      ancestor.error = this.error;
      ancestor.swapTime = this.swapTime;
      ancestor.userData = this.userData;
    }
    finish(ancestor, true);
  }

  // call failure callback if one exists
  if(this.failureCallback) {
    this.failureCallback(this);
  }
};
						|
 | 
						|
/**
 * Asynchronously start a task.
 *
 * The error flag is cleared and the START action is applied right away;
 * the run function is called from a zero-delay timer, and only if the
 * task is in the RUNNING state at that point.
 *
 * @param task the task to start.
 */
var start = function(task) {
  task.error = false;
  task.state = sStateTable[task.state][START];

  // runs the task once the timer fires
  var begin = function() {
    if(task.state !== RUNNING) {
      return;
    }
    task.swapTime = +new Date();
    task.run(task);
    runNext(task, 0);
  };

  setTimeout(begin, 0);
};
						|
 | 
						|
/**
 * Run the next subtask or finish this task.
 *
 * Work is done synchronously unless a context swap is due, in which
 * case it is deferred with a zero-delay timer. A swap is due when the
 * recursion count exceeds sMaxRecursions or when more than sTimeSlice
 * milliseconds have passed since the task's last swap time.
 *
 * @param task the task to process.
 * @param recurse the recursion count.
 */
var runNext = function(task, recurse) {
  // get time since last context swap (ms), if enough time has passed set
  // swap to true to indicate that doNext was performed asynchronously
  // also, if recurse is too high do asynchronously
  var swap =
    (recurse > sMaxRecursions) ||
    (+new Date() - task.swapTime) > sTimeSlice;

  var doNext = function(recurse) {
    recurse++;
    if(task.state === RUNNING) {
      if(swap) {
        // update swap time
        task.swapTime = +new Date();
      }

      if(task.subtasks.length > 0) {
        // run next subtask, handing the task info down to it
        var subtask = task.subtasks.shift();
        subtask.error = task.error;
        subtask.swapTime = task.swapTime;
        subtask.userData = task.userData;
        subtask.run(subtask);
        if(!subtask.error) {
          runNext(subtask, recurse);
        }
      } else {
        finish(task);

        if(!task.error) {
          // chain back up and run parent
          if(task.parent !== null) {
            // propagate task info
            task.parent.error = task.error;
            task.parent.swapTime = task.swapTime;
            task.parent.userData = task.userData;

            // no subtasks left, call run next subtask on parent
            runNext(task.parent, recurse);
          }
        }
      }
    }
  };

  if(swap) {
    // We're swapping, so run asynchronously. The deferred call begins on
    // a fresh stack, so its recursion count restarts at 0. doNext must
    // not be handed to setTimeout directly: it would then be called
    // without a count, "recurse++" would produce NaN, and the
    // "recurse > sMaxRecursions" check could never be true again for
    // the rest of the chain.
    setTimeout(function() {
      doNext(0);
    }, 0);
  } else {
    // not swapping, so run synchronously
    doNext(recurse);
  }
};
						|
 | 
						|
/**
 * Finishes a task and looks for the next task in the queue to start.
 *
 * The task is marked DONE and removed from the task map. For root tasks
 * (tasks without a parent) the queue for the task's type is advanced
 * and, unless suppressed, the final success or failure callback is
 * called.
 *
 * @param task the task to finish.
 * @param suppressCallbacks true to suppress callbacks.
 */
var finish = function(task, suppressCallbacks) {
  // task is now done
  task.state = DONE;

  delete sTasks[task.id];
  if(sVL >= 1) {
    forge.log.verbose(cat, '[%s][%s] finish',
      task.id, task.name, task);
  }

  // queue processing and callbacks only apply to root tasks
  if(task.parent !== null) {
    return;
  }

  if(!(task.type in sTaskQueues)) {
    // report error if queue is missing
    forge.log.error(cat,
      '[%s][%s] task queue missing [%s]',
      task.id, task.name, task.type);
  } else {
    var queue = sTaskQueues[task.type];
    if(queue.length === 0) {
      // report error if queue is empty
      forge.log.error(cat,
        '[%s][%s] task queue empty [%s]',
        task.id, task.name, task.type);
    } else if(queue[0] !== task) {
      // report error if this task isn't the first in the queue
      forge.log.error(cat,
        '[%s][%s] task not first in queue [%s]',
        task.id, task.name, task.type);
    } else {
      // remove ourselves from the queue
      queue.shift();
      if(queue.length > 0) {
        // more tasks of this type are waiting: start the next one
        if(sVL >= 1) {
          forge.log.verbose(cat,
            '[%s][%s] queue start next [%s] remain:%s',
            task.id, task.name, task.type,
            queue.length);
        }
        queue[0].start();
      } else {
        // the queue is empty now, so clean it up
        if(sVL >= 1) {
          forge.log.verbose(cat, '[%s][%s] delete queue [%s]',
            task.id, task.name, task.type);
        }
        /* Note: Only a task can delete a queue of its own type. This
         is used as a way to synchronize tasks. If a queue for a certain
         task type exists, then a task of that type is running.
         */
        delete sTaskQueues[task.type];
      }
    }
  }

  if(suppressCallbacks) {
    return;
  }

  // call final callback if one exists
  if(task.error) {
    if(task.failureCallback) {
      task.failureCallback(task);
    }
  } else if(task.successCallback) {
    task.successCallback(task);
  }
};
						|
 | 
						|
/* Tasks API: reuse forge.task if it already exists, otherwise create it,
   and export the same object from this module. */
forge.task = forge.task || {};
module.exports = forge.task;
						|
 | 
						|
/**
 * Starts a new task that will run the passed function asynchronously.
 *
 * In order to finish the task, either task.doNext() or task.fail()
 * *must* be called.
 *
 * The task must have a type (a string identifier) that can be used to
 * synchronize it with other tasks of the same type. That type can also
 * be used to cancel tasks that haven't started yet.
 *
 * To start a task, the following object must be provided as a parameter
 * (each function takes a task object as its first parameter):
 *
 * {
 *   type: the type of task.
 *   run: the function to run to execute the task.
 *   success: a callback to call when the task succeeds (optional).
 *   failure: a callback to call when the task fails (optional).
 * }
 *
 * A name property, if present, is used as the task's name.
 *
 * @param options the object as described above.
 */
forge.task.start = function(options) {
  // create a new task
  var task = new Task({
    run: options.run,
    name: options.name || sNoTaskName
  });
  task.type = options.type;
  task.successCallback = options.success || null;
  task.failureCallback = options.failure || null;

  // A queue for this type already exists: append the task, it will be
  // run after a task with the same type completes.
  if(task.type in sTaskQueues) {
    sTaskQueues[options.type].push(task);
    return;
  }

  // No queue yet: create one holding the new task and start the task.
  if(sVL >= 1) {
    forge.log.verbose(cat, '[%s][%s] create queue [%s]',
      task.id, task.name, task.type);
  }
  sTaskQueues[task.type] = [task];
  start(task);
};
						|
 | 
						|
/**
 * Cancels all tasks of the given type that haven't started yet.
 *
 * Nothing happens if there is no queue for the type.
 *
 * @param type the type of task to cancel.
 */
forge.task.cancel = function(type) {
  if(!(type in sTaskQueues)) {
    return;
  }

  // keep only the task at the head of the queue (the current task)
  var current = sTaskQueues[type][0];
  sTaskQueues[type] = [current];
};
						|
 | 
						|
/**
 * Creates a condition variable to synchronize tasks. To make a task wait
 * on the condition variable, call task.wait(condition). To notify all
 * tasks that are waiting, call condition.notify().
 *
 * @return the condition variable.
 */
forge.task.createCondition = function() {
  // the tasks currently blocked on this condition, keyed by task id
  var cond = {
    tasks: {}
  };

  /**
   * Causes the given task to block until notify is called. If the task
   * is already waiting on this condition then this is a no-op.
   *
   * @param task the task to cause to wait.
   */
  cond.wait = function(task) {
    // only block once
    if(task.id in cond.tasks) {
      return;
    }
    task.block();
    cond.tasks[task.id] = task;
  };

  /**
   * Notifies all waiting tasks to wake up.
   */
  cond.notify = function() {
    // unblock() will run the next task from here, so the condition's
    // list of blocked tasks is swapped for an empty one before any
    // task is unblocked
    var waiting = cond.tasks;
    cond.tasks = {};
    for(var id in waiting) {
      waiting[id].unblock();
    }
  };

  return cond;
};
 | 
						|
 |