You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							92 lines
						
					
					
						
							1.7 KiB
						
					
					
				
			
		
		
	
	
							92 lines
						
					
					
						
							1.7 KiB
						
					
					
				'use strict'; | 
						|
const Observable = require('any-observable'); | 
						|
 | 
						|
function or(option, alternate, required) { | 
						|
	const result = option === false ? false : option || alternate; | 
						|
 | 
						|
	if ((required && !result) || (result && typeof result !== 'string')) { | 
						|
		throw new TypeError(alternate + 'Event must be a string.'); | 
						|
	} | 
						|
 | 
						|
	return result; | 
						|
} | 
						|
 | 
						|
module.exports = (stream, opts) => { | 
						|
	opts = opts || {}; | 
						|
 | 
						|
	let complete = false; | 
						|
	let dataListeners = []; | 
						|
	const awaited = opts.await; | 
						|
	const dataEvent = or(opts.dataEvent, 'data', true); | 
						|
	const errorEvent = or(opts.errorEvent, 'error'); | 
						|
	const endEvent = or(opts.endEvent, 'end'); | 
						|
 | 
						|
	function cleanup() { | 
						|
		complete = true; | 
						|
		dataListeners.forEach(listener => { | 
						|
			stream.removeListener(dataEvent, listener); | 
						|
		}); | 
						|
		dataListeners = null; | 
						|
	} | 
						|
 | 
						|
	const completion = new Promise((resolve, reject) => { | 
						|
		function onEnd(result) { | 
						|
			if (awaited) { | 
						|
				awaited.then(resolve); | 
						|
			} else { | 
						|
				resolve(result); | 
						|
			} | 
						|
		} | 
						|
 | 
						|
		if (endEvent) { | 
						|
			stream.once(endEvent, onEnd); | 
						|
		} else if (awaited) { | 
						|
			onEnd(); | 
						|
		} | 
						|
 | 
						|
		if (errorEvent) { | 
						|
			stream.once(errorEvent, reject); | 
						|
		} | 
						|
 | 
						|
		if (awaited) { | 
						|
			awaited.catch(reject); | 
						|
		} | 
						|
	}).catch(err => { | 
						|
		cleanup(); | 
						|
		throw err; | 
						|
	}).then(result => { | 
						|
		cleanup(); | 
						|
		return result; | 
						|
	}); | 
						|
 | 
						|
	return new Observable(observer => { | 
						|
		completion | 
						|
			.then(observer.complete.bind(observer)) | 
						|
			.catch(observer.error.bind(observer)); | 
						|
 | 
						|
		if (complete) { | 
						|
			return null; | 
						|
		} | 
						|
 | 
						|
		const onData = data => { | 
						|
			observer.next(data); | 
						|
		}; | 
						|
 | 
						|
		stream.on(dataEvent, onData); | 
						|
		dataListeners.push(onData); | 
						|
 | 
						|
		return () => { | 
						|
			stream.removeListener(dataEvent, onData); | 
						|
 | 
						|
			if (complete) { | 
						|
				return; | 
						|
			} | 
						|
 | 
						|
			const idx = dataListeners.indexOf(onData); | 
						|
 | 
						|
			if (idx !== -1) { | 
						|
				dataListeners.splice(idx, 1); | 
						|
			} | 
						|
		}; | 
						|
	}); | 
						|
};
 | 
						|
 |