208 lines
7.9 KiB
JavaScript
208 lines
7.9 KiB
JavaScript
"use strict";
|
|
Object.defineProperty(exports, "__esModule", { value: true });
|
|
exports.encode = exports.decode = void 0;
|
|
const flatten_js_1 = require("./flatten.js");
|
|
const unflatten_js_1 = require("./unflatten.js");
|
|
const utils_js_1 = require("./utils.js");
|
|
async function decode(readable, options) {
|
|
const { plugins } = options ?? {};
|
|
const done = new utils_js_1.Deferred();
|
|
const reader = readable
|
|
.pipeThrough((0, utils_js_1.createLineSplittingTransform)())
|
|
.getReader();
|
|
const decoder = {
|
|
values: [],
|
|
hydrated: [],
|
|
deferred: {},
|
|
plugins,
|
|
};
|
|
const decoded = await decodeInitial.call(decoder, reader);
|
|
let donePromise = done.promise;
|
|
if (decoded.done) {
|
|
done.resolve();
|
|
}
|
|
else {
|
|
donePromise = decodeDeferred
|
|
.call(decoder, reader)
|
|
.then(done.resolve)
|
|
.catch((reason) => {
|
|
for (const deferred of Object.values(decoder.deferred)) {
|
|
deferred.reject(reason);
|
|
}
|
|
done.reject(reason);
|
|
});
|
|
}
|
|
return {
|
|
done: donePromise.then(() => reader.closed),
|
|
value: decoded.value,
|
|
};
|
|
}
|
|
exports.decode = decode;
|
|
async function decodeInitial(reader) {
|
|
const read = await reader.read();
|
|
if (!read.value) {
|
|
throw new SyntaxError();
|
|
}
|
|
let line;
|
|
try {
|
|
line = JSON.parse(read.value);
|
|
}
|
|
catch (reason) {
|
|
throw new SyntaxError();
|
|
}
|
|
return {
|
|
done: read.done,
|
|
value: unflatten_js_1.unflatten.call(this, line),
|
|
};
|
|
}
|
|
async function decodeDeferred(reader) {
|
|
let read = await reader.read();
|
|
while (!read.done) {
|
|
if (!read.value)
|
|
continue;
|
|
const line = read.value;
|
|
switch (line[0]) {
|
|
case utils_js_1.TYPE_PROMISE: {
|
|
const colonIndex = line.indexOf(":");
|
|
const deferredId = Number(line.slice(1, colonIndex));
|
|
const deferred = this.deferred[deferredId];
|
|
if (!deferred) {
|
|
throw new Error(`Deferred ID ${deferredId} not found in stream`);
|
|
}
|
|
const lineData = line.slice(colonIndex + 1);
|
|
let jsonLine;
|
|
try {
|
|
jsonLine = JSON.parse(lineData);
|
|
}
|
|
catch (reason) {
|
|
throw new SyntaxError();
|
|
}
|
|
const value = unflatten_js_1.unflatten.call(this, jsonLine);
|
|
deferred.resolve(value);
|
|
break;
|
|
}
|
|
case utils_js_1.TYPE_ERROR: {
|
|
const colonIndex = line.indexOf(":");
|
|
const deferredId = Number(line.slice(1, colonIndex));
|
|
const deferred = this.deferred[deferredId];
|
|
if (!deferred) {
|
|
throw new Error(`Deferred ID ${deferredId} not found in stream`);
|
|
}
|
|
const lineData = line.slice(colonIndex + 1);
|
|
let jsonLine;
|
|
try {
|
|
jsonLine = JSON.parse(lineData);
|
|
}
|
|
catch (reason) {
|
|
throw new SyntaxError();
|
|
}
|
|
const value = unflatten_js_1.unflatten.call(this, jsonLine);
|
|
deferred.reject(value);
|
|
break;
|
|
}
|
|
default:
|
|
throw new SyntaxError();
|
|
}
|
|
read = await reader.read();
|
|
}
|
|
}
|
|
function encode(input, options) {
|
|
const { plugins, postPlugins, signal } = options ?? {};
|
|
const encoder = {
|
|
deferred: {},
|
|
index: 0,
|
|
indices: new Map(),
|
|
stringified: [],
|
|
plugins,
|
|
postPlugins,
|
|
signal,
|
|
};
|
|
const textEncoder = new TextEncoder();
|
|
let lastSentIndex = 0;
|
|
const readable = new ReadableStream({
|
|
async start(controller) {
|
|
const id = flatten_js_1.flatten.call(encoder, input);
|
|
if (Array.isArray(id)) {
|
|
throw new Error("This should never happen");
|
|
}
|
|
if (id < 0) {
|
|
controller.enqueue(textEncoder.encode(`${id}\n`));
|
|
}
|
|
else {
|
|
controller.enqueue(textEncoder.encode(`[${encoder.stringified.join(",")}]\n`));
|
|
lastSentIndex = encoder.stringified.length - 1;
|
|
}
|
|
const seenPromises = new WeakSet();
|
|
while (Object.keys(encoder.deferred).length > 0) {
|
|
for (const [deferredId, deferred] of Object.entries(encoder.deferred)) {
|
|
if (seenPromises.has(deferred))
|
|
continue;
|
|
seenPromises.add((encoder.deferred[Number(deferredId)] = raceSignal(deferred, encoder.signal)
|
|
.then((resolved) => {
|
|
const id = flatten_js_1.flatten.call(encoder, resolved);
|
|
if (Array.isArray(id)) {
|
|
controller.enqueue(textEncoder.encode(`${utils_js_1.TYPE_PROMISE}${deferredId}:[["${utils_js_1.TYPE_PREVIOUS_RESOLVED}",${id[0]}]]\n`));
|
|
encoder.index++;
|
|
lastSentIndex++;
|
|
}
|
|
else if (id < 0) {
|
|
controller.enqueue(textEncoder.encode(`${utils_js_1.TYPE_PROMISE}${deferredId}:${id}\n`));
|
|
}
|
|
else {
|
|
const values = encoder.stringified
|
|
.slice(lastSentIndex + 1)
|
|
.join(",");
|
|
controller.enqueue(textEncoder.encode(`${utils_js_1.TYPE_PROMISE}${deferredId}:[${values}]\n`));
|
|
lastSentIndex = encoder.stringified.length - 1;
|
|
}
|
|
}, (reason) => {
|
|
if (!reason ||
|
|
typeof reason !== "object" ||
|
|
!(reason instanceof Error)) {
|
|
reason = new Error("An unknown error occurred");
|
|
}
|
|
const id = flatten_js_1.flatten.call(encoder, reason);
|
|
if (Array.isArray(id)) {
|
|
controller.enqueue(textEncoder.encode(`${utils_js_1.TYPE_ERROR}${deferredId}:[["${utils_js_1.TYPE_PREVIOUS_RESOLVED}",${id[0]}]]\n`));
|
|
encoder.index++;
|
|
lastSentIndex++;
|
|
}
|
|
else if (id < 0) {
|
|
controller.enqueue(textEncoder.encode(`${utils_js_1.TYPE_ERROR}${deferredId}:${id}\n`));
|
|
}
|
|
else {
|
|
const values = encoder.stringified
|
|
.slice(lastSentIndex + 1)
|
|
.join(",");
|
|
controller.enqueue(textEncoder.encode(`${utils_js_1.TYPE_ERROR}${deferredId}:[${values}]\n`));
|
|
lastSentIndex = encoder.stringified.length - 1;
|
|
}
|
|
})
|
|
.finally(() => {
|
|
delete encoder.deferred[Number(deferredId)];
|
|
})));
|
|
}
|
|
await Promise.race(Object.values(encoder.deferred));
|
|
}
|
|
await Promise.all(Object.values(encoder.deferred));
|
|
controller.close();
|
|
},
|
|
});
|
|
return readable;
|
|
}
|
|
exports.encode = encode;
|
|
function raceSignal(promise, signal) {
|
|
if (!signal)
|
|
return promise;
|
|
if (signal.aborted)
|
|
return Promise.reject(signal.reason || new Error("Signal was aborted."));
|
|
const abort = new Promise((resolve, reject) => {
|
|
signal.addEventListener("abort", (event) => {
|
|
reject(signal.reason || new Error("Signal was aborted."));
|
|
});
|
|
promise.then(resolve).catch(reject);
|
|
});
|
|
abort.catch(() => { });
|
|
return Promise.race([abort, promise]);
|
|
}
|