You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
283 lines
5.9 KiB
283 lines
5.9 KiB
'use strict' |
|
|
|
const check = require('check-types') |
|
const eventify = require('./eventify') |
|
const events = require('./events') |
|
const JsonStream = require('./jsonstream') |
|
const Hoopy = require('hoopy') |
|
const promise = require('./promise') |
|
const tryer = require('tryer') |
|
|
|
const DEFAULT_BUFFER_LENGTH = 1024 |
|
|
|
module.exports = streamify |
|
|
|
/** |
|
* Public function `streamify`. |
|
* |
|
* Asynchronously serialises a data structure to a stream of JSON |
|
* data. Sanely handles promises, buffers, maps and other iterables. |
|
* |
|
* @param data: The data to transform. |
|
* |
|
* @option space: Indentation string, or the number of spaces |
|
* to indent each nested level by. |
|
* |
|
* @option promises: 'resolve' or 'ignore', default is 'resolve'. |
|
* |
|
* @option buffers: 'toString' or 'ignore', default is 'toString'. |
|
* |
|
* @option maps: 'object' or 'ignore', default is 'object'. |
|
* |
|
* @option iterables: 'array' or 'ignore', default is 'array'. |
|
* |
|
* @option circular: 'error' or 'ignore', default is 'error'. |
|
* |
|
* @option yieldRate: The number of data items to process per timeslice, |
|
* default is 16384. |
|
* |
|
* @option bufferLength: The length of the buffer, default is 1024. |
|
* |
|
* @option highWaterMark: If set, will be passed to the readable stream constructor |
|
* as the value for the highWaterMark option. |
|
* |
|
* @option Promise: The promise constructor to use, defaults to bluebird. |
|
**/ |
|
function streamify (data, options = {}) { |
|
const emitter = eventify(data, options) |
|
const json = new Hoopy(options.bufferLength || DEFAULT_BUFFER_LENGTH) |
|
const Promise = promise(options) |
|
const space = normaliseSpace(options) |
|
let streamOptions |
|
const { highWaterMark } = options |
|
if (highWaterMark) { |
|
streamOptions = { highWaterMark } |
|
} |
|
const stream = new JsonStream(read, streamOptions) |
|
|
|
let awaitPush = true |
|
let index = 0 |
|
let indentation = '' |
|
let isEnded |
|
let isPaused = false |
|
let isProperty |
|
let length = 0 |
|
let mutex = Promise.resolve() |
|
let needsComma |
|
|
|
emitter.on(events.array, noRacing(array)) |
|
emitter.on(events.object, noRacing(object)) |
|
emitter.on(events.property, noRacing(property)) |
|
emitter.on(events.string, noRacing(string)) |
|
emitter.on(events.number, noRacing(value)) |
|
emitter.on(events.literal, noRacing(value)) |
|
emitter.on(events.endArray, noRacing(endArray)) |
|
emitter.on(events.endObject, noRacing(endObject)) |
|
emitter.on(events.end, noRacing(end)) |
|
emitter.on(events.error, noRacing(error)) |
|
emitter.on(events.dataError, noRacing(dataError)) |
|
|
|
return stream |
|
|
|
function read () { |
|
if (awaitPush) { |
|
awaitPush = false |
|
|
|
if (isEnded) { |
|
if (length > 0) { |
|
after() |
|
} |
|
|
|
return endStream() |
|
} |
|
} |
|
|
|
if (isPaused) { |
|
after() |
|
} |
|
} |
|
|
|
function after () { |
|
if (awaitPush) { |
|
return |
|
} |
|
|
|
let i |
|
|
|
for (i = 0; i < length && ! awaitPush; ++i) { |
|
if (! stream.push(json[i + index], 'utf8')) { |
|
awaitPush = true |
|
} |
|
} |
|
|
|
if (i === length) { |
|
index = length = 0 |
|
} else { |
|
length -= i |
|
index += i |
|
} |
|
} |
|
|
|
function endStream () { |
|
if (! awaitPush) { |
|
stream.push(null) |
|
} |
|
} |
|
|
|
function noRacing (handler) { |
|
return eventData => mutex = mutex.then(() => handler(eventData)) |
|
} |
|
|
|
function array () { |
|
return beforeScope() |
|
.then(() => addJson('[')) |
|
.then(() => afterScope()) |
|
} |
|
|
|
function beforeScope () { |
|
return before(true) |
|
} |
|
|
|
function before (isScope) { |
|
if (isProperty) { |
|
isProperty = false |
|
|
|
if (space) { |
|
return addJson(' ') |
|
} |
|
|
|
return Promise.resolve() |
|
} |
|
|
|
return Promise.resolve() |
|
.then(() => { |
|
if (needsComma) { |
|
if (isScope) { |
|
needsComma = false |
|
} |
|
|
|
return addJson(',') |
|
} |
|
|
|
if (! isScope) { |
|
needsComma = true |
|
} |
|
}) |
|
.then(() => { |
|
if (space && indentation) { |
|
return indent() |
|
} |
|
}) |
|
} |
|
|
|
function addJson (chunk) { |
|
if (length + 1 <= json.length) { |
|
json[index + length++] = chunk |
|
after() |
|
return Promise.resolve() |
|
} |
|
|
|
isPaused = true |
|
return new Promise(resolve => { |
|
const unpause = emitter.pause() |
|
tryer({ |
|
interval: -10, |
|
until () { |
|
return length + 1 <= json.length |
|
}, |
|
pass () { |
|
isPaused = false |
|
json[index + length++] = chunk |
|
resolve() |
|
setImmediate(unpause) |
|
} |
|
}) |
|
}) |
|
} |
|
|
|
function indent () { |
|
return addJson(`\n${indentation}`) |
|
} |
|
|
|
function afterScope () { |
|
needsComma = false |
|
|
|
if (space) { |
|
indentation += space |
|
} |
|
} |
|
|
|
function object () { |
|
return beforeScope() |
|
.then(() => addJson('{')) |
|
.then(() => afterScope()) |
|
} |
|
|
|
function property (name) { |
|
return before() |
|
.then(() => addJson(`"${name}":`)) |
|
.then(() => { |
|
isProperty = true |
|
}) |
|
} |
|
|
|
function string (s) { |
|
return value(`"${s}"`) |
|
} |
|
|
|
function value (v) { |
|
return before() |
|
.then(() => addJson(`${v}`)) |
|
} |
|
|
|
function endArray () { |
|
return beforeScopeEnd() |
|
.then(() => addJson(']')) |
|
.then(() => afterScopeEnd()) |
|
} |
|
|
|
function beforeScopeEnd () { |
|
if (space) { |
|
indentation = indentation.substr(space.length) |
|
|
|
return indent() |
|
} |
|
|
|
return Promise.resolve() |
|
} |
|
|
|
function afterScopeEnd () { |
|
needsComma = true |
|
} |
|
|
|
function endObject () { |
|
return beforeScopeEnd() |
|
.then(() => addJson('}')) |
|
.then(() => afterScopeEnd()) |
|
} |
|
|
|
function end () { |
|
after() |
|
|
|
isEnded = true |
|
endStream() |
|
} |
|
|
|
function error (err) { |
|
stream.emit('error', err) |
|
} |
|
|
|
function dataError (err) { |
|
stream.emit('dataError', err) |
|
} |
|
} |
|
|
|
function normaliseSpace (options) { |
|
if (check.positive(options.space)) { |
|
return new Array(options.space + 1).join(' ') |
|
} |
|
|
|
if (check.nonEmptyString(options.space)) { |
|
return options.space |
|
} |
|
}
|
|
|