// JavaScript source, 1125 lines, 35 KiB
import {
|
|
getAddress,
|
|
getBufferAddress,
|
|
write,
|
|
compress,
|
|
lmdbError,
|
|
} from './native.js';
|
|
import { when } from './util/when.js';
|
|
// Lazily created Int32Array used with Atomics.wait to apply backpressure.
var backpressureArray;

// Status bits read from / written to the flag word of a write instruction.
const WAITING_OPERATION = 0x2000000;
const BACKPRESSURE_THRESHOLD = 300000;
const TXN_DELIMITER = 0x8000000;
const TXN_COMMITTED = 0x10000000;
const TXN_FLUSHED = 0x20000000;
const TXN_FAILED = 0x40000000;
export const FAILED_CONDITION = 0x4000000;

// Encoder modes passed to encoder.encode().
const REUSE_BUFFER_MODE = 512;
const RESET_BUFFER_MODE = 1024;

// Values stored in a resolution's `flag` field.
const NO_RESOLVE = 16;
const HAS_TXN = 8;

// NOTE(review): both conditional flags share the value 0x800 - confirm
// against the native implementation before changing either.
const CONDITIONAL_VERSION_LESS_THAN = 0x800;
const CONDITIONAL_ALLOW_NOTFOUND = 0x800;

// Pre-resolved promises tagged so that synchronous callers can read the
// outcome without awaiting.
const SYNC_PROMISE_SUCCESS = Object.assign(Promise.resolve(true), {
  isSync: true,
  result: true,
});
const SYNC_PROMISE_FAIL = Object.assign(Promise.resolve(false), {
  isSync: true,
  result: false,
});
const PROMISE_SUCCESS = Promise.resolve(true);
const arch = process.arch;

// random/unguessable numbers, which work across module/versions and native
export const ABORT = 4.452694326329068e-106;
export const IF_EXISTS = 3.542694326329068e-103;

// Sentinel stored in a txn callback slot when the callback threw.
const CALLBACK_THREW = {};

// Deno can't handle SharedArrayBuffer as an FFI argument due to
// https://github.com/denoland/deno/issues/12678, and sometimes electron
// doesn't have a SharedArrayBuffer at all; fall back to ArrayBuffer then.
const sharedBufferUnavailable =
  typeof Deno != 'undefined' || typeof SharedArrayBuffer == 'undefined';
const LocalSharedArrayBuffer = sharedBufferUnavailable
  ? ArrayBuffer
  : SharedArrayBuffer;

// Must remain a regular function: it is invoked with `new`.
const ByteArray =
  typeof Buffer != 'undefined'
    ? function (buffer) {
        return Buffer.from(buffer);
      }
    : Uint8Array;

// TODO: Or queueMicrotask?
const queueTask = typeof setImmediate != 'undefined' ? setImmediate : setTimeout;
//let debugLog = []
const WRITE_BUFFER_SIZE = 0x10000;
var log = [];
|
|
export function addWriteMethods(
|
|
LMDBStore,
|
|
{
|
|
env,
|
|
fixedBuffer,
|
|
resetReadTxn,
|
|
useWritemap,
|
|
maxKeySize,
|
|
eventTurnBatching,
|
|
txnStartThreshold,
|
|
batchStartThreshold,
|
|
overlappingSync,
|
|
commitDelay,
|
|
separateFlushed,
|
|
maxFlushDelay,
|
|
},
|
|
) {
|
|
// stands for write instructions
|
|
var dynamicBytes;
|
|
/**
 * Allocates a fresh instruction buffer, makes it the current `dynamicBytes`,
 * and (when `lastPosition` is given) writes a pointer instruction into the
 * previous buffer that refers to the new one.
 *
 * @param {number} [lastPosition] 64-bit word position in the previous buffer
 *   at which the pointer instruction is written.
 * @returns the new byte array (also stored in `dynamicBytes`).
 */
function allocateInstructionBuffer(lastPosition) {
  // Must use a shared buffer on older node in order to use Atomics, and it is
  // also more correct since it is accessed and modified from another thread
  // (in C). However, Deno can't handle it for FFI, hence the alias.
  const rawBuffer = new LocalSharedArrayBuffer(WRITE_BUFFER_SIZE);
  const previousBytes = dynamicBytes;
  dynamicBytes = new ByteArray(rawBuffer);

  const words = new Uint32Array(rawBuffer, 0, WRITE_BUFFER_SIZE >> 2);
  dynamicBytes.uint32 = words;
  words[2] = 0;
  dynamicBytes.float64 = new Float64Array(rawBuffer, 0, WRITE_BUFFER_SIZE >> 3);

  rawBuffer.address = getBufferAddress(dynamicBytes);
  words.address = rawBuffer.address + words.byteOffset;
  // we start at position 1 to save space for writing the txn id before the txn delimiter
  dynamicBytes.position = 1;

  if (lastPosition) {
    previousBytes.float64[lastPosition + 1] =
      dynamicBytes.uint32.address + (dynamicBytes.position << 3);
    previousBytes.uint32[lastPosition << 1] = 3; // pointer instruction
  }
  return dynamicBytes;
}
|
|
// Position (in 64-bit words) beyond which a new instruction buffer is allocated.
// need to reserve more room if we do inline values
var newBufferThreshold = (WRITE_BUFFER_SIZE - maxKeySize - 64) >> 3;
var outstandingWriteCount = 0;
var startAddress = 0;
var writeTxn = null;
var committed;
var abortedNonChildTransactionWarn;
var nextTxnCallbacks = [];
var commitPromise;
var flushPromise;
var flushResolvers = [];
var batchFlushResolvers = [];
commitDelay = commitDelay || 0;
// Event-turn batching is on unless explicitly disabled with `false`.
eventTurnBatching = eventTurnBatching !== false;
var enqueuedCommit;
var afterCommitCallbacks = [];
var beforeCommitCallbacks = [];
var enqueuedEventTurnBatch;
var batchDepth = 0;
var lastWritePromise;
var writeBatchStart;
var outstandingBatchCount;
var lastSyncTxnFlush;
var lastFlushTimeout;
var lastFlushCallback;
var hasUnresolvedTxns;
txnStartThreshold = txnStartThreshold || 5;
batchStartThreshold = batchStartThreshold || 1000;
maxFlushDelay = maxFlushDelay || 500;

// Create the first instruction buffer and mark its initial flag word as an
// already committed and flushed transaction boundary.
allocateInstructionBuffer();
dynamicBytes.uint32[2] = TXN_DELIMITER | TXN_COMMITTED | TXN_FLUSHED;

// Resolutions form a singly linked list; every node is created with the same
// property order so they all share one object shape.
var txnResolution;
var nextResolution = {
  uint32: dynamicBytes.uint32,
  flagPosition: 2,
  flag: 0,
  valueBuffer: null,
  next: null,
  meta: null,
};
var uncommittedResolution = {
  uint32: null,
  flagPosition: 2,
  flag: 0,
  valueBuffer: null,
  next: nextResolution,
  meta: null,
};
var unwrittenResolution = nextResolution;
var lastPromisedResolution = uncommittedResolution;
var lastQueuedResolution = uncommittedResolution;
|
|
/**
 * Encodes a single write instruction (flags, dbi, optional key, value,
 * condition and version) into the current instruction buffer, or into
 * `fixedBuffer` when a write transaction is open, in which case it is
 * executed immediately through `write()`.
 *
 * Bits of `flags` interpreted here: 2 = a value is present, 4 = a key is
 * present. Other bits are passed through to the native side.
 *
 * @param {number} flags instruction flags
 * @param store the store being written to (`store.db.dbi`, `store.encoder`,
 *   `store.writeKey`, `store.compression`, `store.cache`, `store.dupSort`
 *   are read)
 * @param key key to write (only used when `flags & 4`)
 * @param value value to write (only used when `flags & 2`)
 * @param {number} [version] version to record
 * @param {number|null} [ifVersion] conditional version; `null` means "only
 *   if the entry does not exist"
 * @returns {Function} a function that publishes the instruction. Inside a
 *   write transaction it returns SYNC_PROMISE_SUCCESS / SYNC_PROMISE_FAIL;
 *   otherwise it accepts an optional node-style callback (or IF_EXISTS) and
 *   returns a promise, or undefined when a callback was supplied.
 * @throws {Error} for unencodable values, oversized dupSort values, and
 *   invalid or oversized keys.
 */
function writeInstructions(flags, store, key, value, version, ifVersion) {
  let writeStatus;
  let targetBytes, position, encoder;
  let valueSize, valueBuffer, valueBufferStart;
  if (flags & 2) {
    // encode first in case we have to write a shared structure
    encoder = store.encoder;
    if (value && value['\x10binary-data\x02'])
      valueBuffer = value['\x10binary-data\x02'];
    else if (encoder) {
      if (encoder.copyBuffers)
        // use this as indicator for support buffer reuse for now
        valueBuffer = encoder.encode(
          value,
          REUSE_BUFFER_MODE | (writeTxn ? RESET_BUFFER_MODE : 0),
        );
      // in addition, if we are writing sync, after using, we can immediately reset the encoder's position to reuse that space, which can improve performance
      else {
        // various other encoders, including JSON.stringify, that might serialize to a string
        valueBuffer = encoder.encode(value);
        if (typeof valueBuffer == 'string')
          valueBuffer = Buffer.from(valueBuffer); // TODO: Would be nice to write strings inline in the instructions
      }
    } else if (typeof value == 'string') {
      valueBuffer = Buffer.from(value); // TODO: Would be nice to write strings inline in the instructions
    } else if (value instanceof Uint8Array) valueBuffer = value;
    else
      throw new Error(
        'Invalid value to put in database ' +
          value +
          ' (' +
          typeof value +
          '), consider using encoder',
      );
    valueBufferStart = valueBuffer.start;
    if (valueBufferStart > -1)
      // if we have buffers with start/end position
      valueSize = valueBuffer.end - valueBufferStart; // size
    else valueSize = valueBuffer.length;
    if (store.dupSort && valueSize > maxKeySize)
      throw new Error(
        'The value is larger than the maximum size (' +
          maxKeySize +
          ') for a value in a dupSort database',
      );
  } else valueSize = 0;
  if (writeTxn) {
    targetBytes = fixedBuffer;
    position = 0;
  } else {
    if (eventTurnBatching && !enqueuedEventTurnBatch && batchDepth == 0) {
      enqueuedEventTurnBatch = queueTask(() => {
        try {
          for (let i = 0, l = beforeCommitCallbacks.length; i < l; i++) {
            try {
              beforeCommitCallbacks[i]();
            } catch (error) {
              console.error('In beforecommit callback', error);
            }
          }
        } catch (error) {
          console.error(error);
        }
        enqueuedEventTurnBatch = null;
        batchDepth--;
        finishBatch();
        if (writeBatchStart) writeBatchStart(); // TODO: When we support delay start of batch, optionally don't delay this
      });
      commitPromise = null; // reset the commit promise, can't know if it is really a new transaction prior to finishWrite being called
      flushPromise = null;
      // The batch-start instruction must be encoded while batchDepth is still
      // 0, so the increment stays after this recursive call.
      writeBatchStart = writeInstructions(1, store);
      outstandingBatchCount = 0;
      batchDepth++;
    }
    targetBytes = dynamicBytes;
    position = targetBytes.position;
  }
  let uint32 = targetBytes.uint32,
    float64 = targetBytes.float64;
  let flagPosition = position << 1; // flagPosition is the 32-bit word starting position

  // don't increment position until we are sure we don't have any key writing errors
  if (!uint32) {
    throw new Error('Internal buffers have been corrupted');
  }
  uint32[flagPosition + 1] = store.db.dbi;
  if (flags & 4) {
    let keyStartPosition = (position << 3) + 12;
    let endPosition;
    try {
      endPosition = store.writeKey(key, targetBytes, keyStartPosition);
      if (!(keyStartPosition < endPosition) && (flags & 0xf) != 12)
        throw new Error(
          'Invalid key or zero length key is not allowed in LMDB ' + key,
        );
    } catch (error) {
      targetBytes.fill(0, keyStartPosition);
      if (error.name == 'RangeError')
        error = new Error(
          'Key size is larger than the maximum key size (' + maxKeySize + ')',
        );
      throw error;
    }
    let keySize = endPosition - keyStartPosition;
    if (keySize > maxKeySize) {
      targetBytes.fill(0, keyStartPosition); // restore zeros
      throw new Error(
        'Key size is larger than the maximum key size (' + maxKeySize + ')',
      );
    }
    uint32[flagPosition + 2] = keySize;
    position = (endPosition + 16) >> 3;
    if (flags & 2) {
      let mustCompress;
      if (valueBufferStart > -1) {
        // if we have buffers with start/end position
        // record pointer to value buffer
        float64[position] =
          (valueBuffer.address ||
            (valueBuffer.address = getAddress(valueBuffer.buffer))) +
          valueBufferStart;
        if (store.compression) {
          let compressionFlagIndex =
            valueBufferStart + (store.compression.startingOffset || 0);
          // this is the compression indicator, so we must compress
          mustCompress =
            compressionFlagIndex < valueBuffer.end &&
            valueBuffer[compressionFlagIndex] >= 250;
        }
      } else {
        let valueArrayBuffer = valueBuffer.buffer;
        // record pointer to value buffer
        let address =
          (valueArrayBuffer.address ||
            (valueBuffer.length === 0
              ? 0 // externally allocated buffers of zero-length with the same non-null-pointer can crash node, #161
              : (valueArrayBuffer.address = getAddress(valueArrayBuffer)))) +
          valueBuffer.byteOffset;
        if (address <= 0 && valueBuffer.length > 0)
          console.error('Supplied buffer had an invalid address', address);
        float64[position] = address;
        if (store.compression) {
          let compressionFlagIndex = store.compression.startingOffset || 0;
          // this is the compression indicator, so we must compress
          mustCompress =
            compressionFlagIndex < valueBuffer.length &&
            valueBuffer[compressionFlagIndex] >= 250;
        }
      }
      uint32[(position++ << 1) - 1] = valueSize;
      if (
        store.compression &&
        (valueSize >= store.compression.threshold || mustCompress)
      ) {
        flags |= 0x100000;
        float64[position] = store.compression.address;
        if (!writeTxn)
          compress(env.address, uint32.address + (position << 3), () => {
            // this is never actually called in NodeJS, just use to pin the buffer in memory until it is finished
            // and is a no-op in Deno
            if (!float64) throw new Error('No float64 available');
          });
        position++;
      }
    }
    if (ifVersion !== undefined) {
      if (ifVersion === null)
        flags |= 0x10; // if it does not exist, MDB_NOOVERWRITE
      else {
        flags |= 0x100;
        float64[position++] = ifVersion;
      }
    }
    if (version !== undefined) {
      flags |= 0x200;
      float64[position++] = version || 0;
    }
  } else position++;
  targetBytes.position = position;
  if (writeTxn) {
    // Synchronous path: execute right away and report the condition result
    // from the flag word.
    uint32[0] = flags;
    write(env.address, uint32.address);
    return () =>
      uint32[0] & FAILED_CONDITION ? SYNC_PROMISE_FAIL : SYNC_PROMISE_SUCCESS;
  }
  // if we ever use buffers that haven't been zero'ed, need to clear out the next slot like this:
  // uint32[position << 1] = 0 // clear out the next slot
  let nextUint32;
  if (position > newBufferThreshold) {
    // make new buffer and make pointer to it
    targetBytes = allocateInstructionBuffer(position);
    position = targetBytes.position;
    nextUint32 = targetBytes.uint32;
  } else nextUint32 = uint32;
  let resolution = nextResolution;
  // create the placeholder next resolution
  nextResolution = resolution.next = {
    // we try keep resolutions exactly the same object type
    uint32: nextUint32,
    flagPosition: position << 1,
    flag: 0, // TODO: eventually eliminate this, as we can probably signify HAS_TXN/NO_RESOLVE/FAILED_CONDITION in upper bits
    valueBuffer: fixedBuffer, // these are all just placeholders so that we have the right hidden class initially allocated
    next: null,
    meta: null,
  };
  lastQueuedResolution = resolution;

  let writtenBatchDepth = batchDepth;

  return (callback) => {
    if (writtenBatchDepth) {
      // If we are in a batch, the transaction can't close, so we do the faster,
      // but non-deterministic updates, knowing that the write thread can
      // just poll for the status change if we miss a status update.
      // That is, if we are on x64 architecture...
      if (arch === 'x64') {
        writeStatus = uint32[flagPosition];
        uint32[flagPosition] = flags;
      } else {
        // However, on ARM processors, apparently more radical memory reordering can occur
        // so we need to use the slower atomic operation to ensure that a memory barrier is set
        // and that the value pointer is actually written before the flag is updated
        writeStatus = Atomics.or(uint32, flagPosition, flags);
      }
      if (writeBatchStart && !writeStatus) {
        outstandingBatchCount += 1 + (valueSize >> 12);
        if (outstandingBatchCount > batchStartThreshold) {
          outstandingBatchCount = 0;
          writeBatchStart();
          writeBatchStart = null;
        }
      }
    } // otherwise the transaction could end at any time and we need to know the
    // deterministically if it is ending, so we can reset the commit promise
    // so we use the slower atomic operation
    else writeStatus = Atomics.or(uint32, flagPosition, flags);

    outstandingWriteCount++;
    if (writeStatus & TXN_DELIMITER) {
      commitPromise = null; // TODO: Don't reset these if this comes from the batch start operation on an event turn batch
      flushPromise = null;
      flushResolvers = [];
      queueCommitResolution(resolution);
      if (!startAddress) {
        startAddress = uint32.address + (flagPosition << 2);
      }
    }
    if (!writtenBatchDepth && batchFlushResolvers.length > 0) {
      flushResolvers.push(...batchFlushResolvers);
      batchFlushResolvers = [];
    }
    if (!flushPromise && overlappingSync) {
      flushPromise = new Promise((resolve) => {
        if (writtenBatchDepth) {
          batchFlushResolvers.push(resolve);
        } else {
          flushResolvers.push(resolve);
        }
      });
    }
    if (writeStatus & WAITING_OPERATION) {
      // write thread is waiting
      write(env.address, 0);
    }
    if (outstandingWriteCount > BACKPRESSURE_THRESHOLD && !writeBatchStart) {
      // Block this thread briefly, in proportion to the backlog.
      if (!backpressureArray)
        backpressureArray = new Int32Array(new SharedArrayBuffer(4), 0, 1);
      Atomics.wait(
        backpressureArray,
        0,
        0,
        Math.round(outstandingWriteCount / BACKPRESSURE_THRESHOLD),
      );
    }
    if (startAddress) {
      if (eventTurnBatching)
        startWriting(); // start writing immediately because this has already been batched/queued
      else if (!enqueuedCommit && txnStartThreshold) {
        enqueuedCommit =
          commitDelay == 0 && typeof setImmediate != 'undefined'
            ? setImmediate(() => startWriting())
            : setTimeout(() => startWriting(), commitDelay);
      } else if (outstandingWriteCount > txnStartThreshold) startWriting();
    }

    if ((outstandingWriteCount & 7) === 0) resolveWrites();

    if (store.cache) {
      resolution.meta = {
        key,
        store,
        // Use the computed value size: for encoder buffers that carry
        // start/end, `valueBuffer.length` is the length of the whole
        // reusable buffer rather than of this value.
        valueSize,
      };
    }
    resolution.valueBuffer = valueBuffer;

    if (callback) {
      if (callback === IF_EXISTS) ifVersion = IF_EXISTS;
      else {
        let meta = resolution.meta || (resolution.meta = {});
        meta.reject = callback;
        meta.resolve = (value) => callback(null, value);
        return;
      }
    }
    // if it is not conditional because of ifVersion or has any flags that can make the write conditional
    if (ifVersion === undefined && !(flags & 0x22030)) {
      if (writtenBatchDepth > 1) {
        if (!resolution.flag && !store.cache) resolution.flag = NO_RESOLVE;
        return PROMISE_SUCCESS; // or return undefined?
      }
      if (commitPromise) {
        if (!resolution.flag) resolution.flag = NO_RESOLVE;
      } else {
        commitPromise = new Promise((resolve, reject) => {
          let meta = resolution.meta || (resolution.meta = {});
          meta.resolve = resolve;
          resolve.unconditional = true;
          meta.reject = reject;
        });
        if (separateFlushed)
          commitPromise.flushed = overlappingSync
            ? flushPromise
            : commitPromise;
      }
      return commitPromise;
    }
    lastWritePromise = new Promise((resolve, reject) => {
      let meta = resolution.meta || (resolution.meta = {});
      meta.resolve = resolve;
      meta.reject = reject;
    });
    if (separateFlushed)
      lastWritePromise.flushed = overlappingSync
        ? flushPromise
        : lastWritePromise;
    return lastWritePromise;
  };
}
|
|
let committedFlushResolvers,
  lastSync = Promise.resolve();
/**
 * Cancels any pending deferred commit and asks the environment to start
 * processing queued instructions from `startAddress`. The callback passed to
 * `env.startWriting` resolves finished writes and then handles the status:
 * 0 runs the flush resolvers captured at call time, 1 does nothing, 2 runs
 * queued transaction callbacks and returns whether any are still unresolved,
 * and any other value is treated as an LMDB error code.
 */
function startWriting() {
  if (enqueuedCommit) {
    // The handle was created with setImmediate only when there is no commit
    // delay and setImmediate exists; otherwise it came from setTimeout and
    // must be cancelled with clearTimeout.
    if (commitDelay == 0 && typeof setImmediate != 'undefined')
      clearImmediate(enqueuedCommit);
    else clearTimeout(enqueuedCommit);
    enqueuedCommit = null;
  }
  let resolvers = flushResolvers;
  env.startWriting(startAddress, (status) => {
    if (dynamicBytes.uint32[dynamicBytes.position << 1] & TXN_DELIMITER)
      queueCommitResolution(nextResolution);

    resolveWrites(true);
    switch (status) {
      case 0:
        for (let resolver of resolvers) {
          resolver();
        }
        break;
      case 1:
        break;
      case 2:
        hasUnresolvedTxns = false;
        // Runs synchronously up to its first await, which is enough to set
        // hasUnresolvedTxns for the return value below.
        executeTxnCallbacks();
        return hasUnresolvedTxns;
      default:
        try {
          lmdbError(status);
        } catch (error) {
          console.error(error);
          if (commitRejectPromise) {
            commitRejectPromise.reject(error);
            commitRejectPromise = null;
          }
        }
    }
  });
  startAddress = 0;
}
|
|
|
|
/**
 * Marks `resolution` as a transaction boundary (HAS_TXN) unless it already
 * carries that bit. It becomes the current `txnResolution` when there is
 * none, otherwise it is stored as `txnResolution.nextTxn`.
 */
function queueCommitResolution(resolution) {
  if (resolution.flag & HAS_TXN) return; // already queued
  resolution.flag = HAS_TXN;
  if (!txnResolution) {
    txnResolution = resolution;
    return;
  }
  txnResolution.nextTxn = resolution;
  //outstandingWriteCount = 0
}
|
|
var TXN_DONE = TXN_COMMITTED | TXN_FAILED;
/**
 * Walks the resolution list from `unwrittenResolution`, consuming every
 * instruction whose flag word has bit 0x1000000 set, then resolves or
 * rejects every transaction whose flag word has a TXN_DONE bit.
 *
 * @param {boolean} [async] forwarded to resolveCommit
 */
function resolveWrites(async) {
  // clean up finished instructions
  for (;;) {
    const finishedStatus =
      unwrittenResolution.uint32[unwrittenResolution.flagPosition];
    if (!(finishedStatus & 0x1000000)) break;

    if (unwrittenResolution.callbacks) {
      nextTxnCallbacks.push(unwrittenResolution.callbacks);
      unwrittenResolution.callbacks = null;
    }
    outstandingWriteCount--;
    if (unwrittenResolution.flag !== HAS_TXN) {
      const removable =
        unwrittenResolution.flag === NO_RESOLVE && !unwrittenResolution.meta;
      if (removable) {
        // in this case we can completely remove from the linked list, clearing more memory
        unwrittenResolution = unwrittenResolution.next;
        lastPromisedResolution.next = unwrittenResolution;
        continue;
      }
      unwrittenResolution.uint32 = null;
    }
    unwrittenResolution.valueBuffer = null;
    unwrittenResolution.flag = finishedStatus;
    lastPromisedResolution = unwrittenResolution;
    unwrittenResolution = unwrittenResolution.next;
  }

  while (txnResolution) {
    const txnStatus =
      txnResolution.uint32[txnResolution.flagPosition] & TXN_DONE;
    if (!txnStatus) break;
    if (txnStatus & TXN_FAILED) rejectCommit();
    else resolveCommit(async);
  }
}
|
|
|
|
/**
 * Handles a committed transaction: notifies after-commit listeners with the
 * word stored just before the transaction's flag word, resets the read txn
 * (immediately when `async`, otherwise in a microtask), and resolves every
 * pending resolution up to, but not including, `txnResolution`.
 * A resolver receives `false` only when its condition failed and it is not
 * marked `unconditional`.
 */
function resolveCommit(async) {
  afterCommit(txnResolution.uint32[txnResolution.flagPosition - 1]);
  if (!async) queueMicrotask(resetReadTxn); // TODO: only do this if there are actually committed writes?
  else resetReadTxn();

  do {
    const resolve = uncommittedResolution.meta?.resolve;
    if (resolve) {
      const conditionFailed =
        uncommittedResolution.flag & FAILED_CONDITION && !resolve.unconditional;
      resolve(conditionFailed ? false : true);
    }
    uncommittedResolution = uncommittedResolution.next;
  } while (uncommittedResolution && uncommittedResolution != txnResolution);

  txnResolution = txnResolution.nextTxn;
}
|
|
var commitRejectPromise;
/**
 * Handles a failed transaction: notifies after-commit listeners (without a
 * txn id), lazily creates `commitRejectPromise` with its own reject function
 * attached as `.reject`, and rejects every pending resolution up to, but not
 * including, `txnResolution`. Each error exposes the promise as `commitError`.
 */
function rejectCommit() {
  afterCommit();
  if (!commitRejectPromise) {
    let rejectDetails;
    commitRejectPromise = new Promise((resolve, reject) => {
      rejectDetails = reject;
    });
    commitRejectPromise.reject = rejectDetails;
  }

  do {
    const meta = uncommittedResolution.meta;
    if (meta && meta.reject) {
      const error = new Error('Commit failed (see commitError for details)');
      error.commitError = commitRejectPromise;
      meta.reject(error);
    }
    uncommittedResolution = uncommittedResolution.next;
  } while (uncommittedResolution && uncommittedResolution != txnResolution);

  txnResolution = txnResolution.nextTxn;
}
|
|
/**
 * Updates a flag word and returns its previous value.
 *
 * Inside a batch the word is simply read and then overwritten with
 * `newStatus`. Outside a batch `Atomics.or` is used; if that throws, the
 * error is logged and undefined is returned.
 *
 * @param {Uint32Array} uint32 array holding the flag word
 * @param {number} flagPosition index of the flag word
 * @param {number} newStatus bits to store
 * @returns {number|undefined} the previous flag value
 */
function atomicStatus(uint32, flagPosition, newStatus) {
  if (!batchDepth) {
    // the transaction could end at any time and we need to know
    // deterministically if it is ending, so we can reset the commit promise;
    // so we use the slower atomic operation
    try {
      return Atomics.or(uint32, flagPosition, newStatus);
    } catch (error) {
      console.error(error);
      return;
    }
  }
  // if we are in a batch, the transaction can't close, so we do the faster,
  // but non-deterministic updates, knowing that the write thread can
  // just poll for the status change if we miss a status update
  const previousStatus = uint32[flagPosition];
  uint32[flagPosition] = newStatus;
  return previousStatus;
}
|
|
/**
 * Invokes every registered after-commit callback with a fresh
 * `{ next, last, txnId }` object. Exceptions thrown by a callback are logged
 * and do not stop the remaining callbacks.
 *
 * @param {number} [txnId] id of the committed transaction
 */
function afterCommit(txnId) {
  // The count is captured up front, so callbacks registered during dispatch
  // are not run in this round.
  const listenerCount = afterCommitCallbacks.length;
  let index = 0;
  while (index < listenerCount) {
    try {
      afterCommitCallbacks[index]({
        next: uncommittedResolution,
        last: txnResolution,
        txnId,
      });
    } catch (error) {
      console.error('In aftercommit callback', error);
    }
    index++;
  }
}
|
|
/**
 * Runs every queued transaction callback inside the current write
 * transaction. Callbacks queued with `asChild` run in their own abortable
 * child transaction, which is aborted when the callback (or the promise it
 * returns) yields ABORT or throws, and committed otherwise.
 *
 * Each callback slot in its array is replaced by the callback's return value,
 * or by CALLBACK_THREW with the error recorded in `txnCallbacks.errors`.
 * When any callback returned a promise, `hasUnresolvedTxns` is set and
 * `env.resumeWriting()` is called once all callbacks have finished.
 */
async function executeTxnCallbacks() {
  env.writeTxn = writeTxn = { write: true };
  nextTxnCallbacks.isExecuting = true;
  for (let i = 0; i < nextTxnCallbacks.length; i++) {
    let txnCallbacks = nextTxnCallbacks[i];
    for (let j = 0, l = txnCallbacks.length; j < l; j++) {
      let userTxnCallback = txnCallbacks[j];
      let asChild = userTxnCallback.asChild;
      if (asChild) {
        env.beginTxn(1); // abortable
        let parentTxn = writeTxn;
        env.writeTxn = writeTxn = { write: true };
        try {
          let result = userTxnCallback.callback();
          let outcome = result;
          if (result && result.then) {
            hasUnresolvedTxns = true;
            // Keep the awaited value so an async callback that resolves to
            // ABORT aborts the child transaction, as a synchronous one does.
            outcome = await result;
          }
          if (outcome === ABORT) env.abortTxn();
          else env.commitTxn();
          clearWriteTxn(parentTxn);
          txnCallbacks[j] = result;
        } catch (error) {
          clearWriteTxn(parentTxn);
          env.abortTxn();
          txnError(error, txnCallbacks, j);
        }
      } else {
        try {
          let result = userTxnCallback();
          txnCallbacks[j] = result;
          if (result && result.then) {
            hasUnresolvedTxns = true;
            await result;
          }
        } catch (error) {
          txnError(error, txnCallbacks, j);
        }
      }
    }
  }
  nextTxnCallbacks = [];
  clearWriteTxn(null);
  if (hasUnresolvedTxns) {
    env.resumeWriting();
  }
  // Records the error for slot `i` and marks the slot as thrown.
  function txnError(error, txnCallbacks, i) {
    (txnCallbacks.errors || (txnCallbacks.errors = []))[i] = error;
    txnCallbacks[i] = CALLBACK_THREW;
  }
}
|
|
/**
 * Writes the end-of-batch instruction (status 2) at the current position of
 * the instruction buffer and advances past it, allocating a new buffer when
 * the next position would exceed `newBufferThreshold`. `nextResolution` is
 * re-pointed at the slot that follows. Wakes the write thread if the
 * previous status shows it was waiting.
 */
function finishBatch() {
  const bytes = dynamicBytes;
  const words = bytes.uint32;
  const endFlagPosition = bytes.position << 1;
  const nextPosition = bytes.position + 1;
  let previousStatus;

  if (nextPosition > newBufferThreshold) {
    // No room for another slot: chain to a fresh buffer first.
    allocateInstructionBuffer(nextPosition);
    nextResolution.flagPosition = dynamicBytes.position << 1;
    nextResolution.uint32 = dynamicBytes.uint32;
    previousStatus = atomicStatus(words, endFlagPosition, 2); // atomically write the end block
  } else {
    words[nextPosition << 1] = 0; // clear out the next slot
    bytes.position = nextPosition;
    previousStatus = atomicStatus(words, endFlagPosition, 2); // atomically write the end block
    nextResolution.flagPosition += 2;
  }

  if (previousStatus & WAITING_OPERATION) {
    write(env.address, 0);
  }
}
|
|
/**
 * Ends the current write-transaction object and restores `parentTxn` (or
 * null) as the active one, both locally and on `env`. A transaction that
 * still has a positive `refCount` is flagged `isDone`.
 */
function clearWriteTxn(parentTxn) {
  // TODO: We might actually want to track cursors in a write txn and manually
  // close them.
  const finishedTxn = writeTxn;
  if (finishedTxn && finishedTxn.refCount > 0) finishedTxn.isDone = true;
  writeTxn = parentTxn || null;
  env.writeTxn = writeTxn;
}
|
|
Object.assign(LMDBStore.prototype, {
|
|
put(key, value, versionOrOptions, ifVersion) {
|
|
let callback,
|
|
flags = 15,
|
|
type = typeof versionOrOptions;
|
|
if (type == 'object' && versionOrOptions) {
|
|
if (versionOrOptions.noOverwrite) flags |= 0x10;
|
|
if (versionOrOptions.noDupData) flags |= 0x20;
|
|
if (versionOrOptions.instructedWrite) flags |= 0x2000;
|
|
if (versionOrOptions.append) flags |= 0x20000;
|
|
if (versionOrOptions.ifVersion != undefined)
|
|
ifVersion = versionOrOptions.ifVersion;
|
|
versionOrOptions = versionOrOptions.version;
|
|
if (typeof ifVersion == 'function') callback = ifVersion;
|
|
} else if (type == 'function') {
|
|
callback = versionOrOptions;
|
|
}
|
|
return writeInstructions(
|
|
flags,
|
|
this,
|
|
key,
|
|
value,
|
|
this.useVersions ? versionOrOptions || 0 : undefined,
|
|
ifVersion,
|
|
)(callback);
|
|
},
|
|
remove(key, ifVersionOrValue, callback) {
|
|
let flags = 13;
|
|
let ifVersion, value;
|
|
if (ifVersionOrValue !== undefined) {
|
|
if (typeof ifVersionOrValue == 'function') callback = ifVersionOrValue;
|
|
else if (ifVersionOrValue === IF_EXISTS && !callback)
|
|
// we have a handler for IF_EXISTS in the callback handler for remove
|
|
callback = ifVersionOrValue;
|
|
else if (this.useVersions) ifVersion = ifVersionOrValue;
|
|
else {
|
|
flags = 14;
|
|
value = ifVersionOrValue;
|
|
}
|
|
}
|
|
return writeInstructions(
|
|
flags,
|
|
this,
|
|
key,
|
|
value,
|
|
undefined,
|
|
ifVersion,
|
|
)(callback);
|
|
},
|
|
del(key, options, callback) {
|
|
return this.remove(key, options, callback);
|
|
},
|
|
ifNoExists(key, callback) {
|
|
return this.ifVersion(key, null, callback);
|
|
},
|
|
ifVersion(key, version, callback, options) {
|
|
if (!callback) {
|
|
return new Batch((operations, callback) => {
|
|
let promise = this.ifVersion(key, version, operations, options);
|
|
if (callback) promise.then(callback);
|
|
return promise;
|
|
});
|
|
}
|
|
if (writeTxn) {
|
|
if (version === undefined || this.doesExist(key, version)) {
|
|
callback();
|
|
return SYNC_PROMISE_SUCCESS;
|
|
}
|
|
return SYNC_PROMISE_FAIL;
|
|
}
|
|
let flags = key === undefined || version === undefined ? 1 : 4;
|
|
if (options?.ifLessThan) flags |= CONDITIONAL_VERSION_LESS_THAN;
|
|
if (options?.allowNotFound) flags |= CONDITIONAL_ALLOW_NOTFOUND;
|
|
let finishStartWrite = writeInstructions(
|
|
flags,
|
|
this,
|
|
key,
|
|
undefined,
|
|
undefined,
|
|
version,
|
|
);
|
|
let promise;
|
|
batchDepth += 2;
|
|
if (batchDepth > 2) promise = finishStartWrite();
|
|
else {
|
|
writeBatchStart = () => {
|
|
promise = finishStartWrite();
|
|
};
|
|
outstandingBatchCount = 0;
|
|
}
|
|
try {
|
|
if (typeof callback === 'function') {
|
|
callback();
|
|
} else {
|
|
for (let i = 0, l = callback.length; i < l; i++) {
|
|
let operation = callback[i];
|
|
this[operation.type](operation.key, operation.value);
|
|
}
|
|
}
|
|
} finally {
|
|
if (!promise) {
|
|
finishBatch();
|
|
batchDepth -= 2;
|
|
promise = finishStartWrite(); // finish write once all the operations have been written (and it hasn't been written prematurely)
|
|
writeBatchStart = null;
|
|
} else {
|
|
batchDepth -= 2;
|
|
finishBatch();
|
|
}
|
|
}
|
|
return promise;
|
|
},
|
|
batch(callbackOrOperations) {
|
|
return this.ifVersion(undefined, undefined, callbackOrOperations);
|
|
},
|
|
drop(callback) {
|
|
return writeInstructions(
|
|
1024 + 12,
|
|
this,
|
|
Buffer.from([]),
|
|
undefined,
|
|
undefined,
|
|
undefined,
|
|
)(callback);
|
|
},
|
|
clearAsync(callback) {
|
|
if (this.encoder) {
|
|
if (this.encoder.clearSharedData) this.encoder.clearSharedData();
|
|
else if (this.encoder.structures) this.encoder.structures = [];
|
|
}
|
|
return writeInstructions(
|
|
12,
|
|
this,
|
|
Buffer.from([]),
|
|
undefined,
|
|
undefined,
|
|
undefined,
|
|
)(callback);
|
|
},
|
|
_triggerError() {
|
|
finishBatch();
|
|
},
|
|
|
|
putSync(key, value, versionOrOptions, ifVersion) {
|
|
if (writeTxn)
|
|
return (
|
|
this.put(key, value, versionOrOptions, ifVersion) ===
|
|
SYNC_PROMISE_SUCCESS
|
|
);
|
|
else
|
|
return this.transactionSync(
|
|
() =>
|
|
this.put(key, value, versionOrOptions, ifVersion) ===
|
|
SYNC_PROMISE_SUCCESS,
|
|
overlappingSync ? 0x10002 : 2,
|
|
); // non-abortable, async flush
|
|
},
|
|
removeSync(key, ifVersionOrValue) {
|
|
if (writeTxn)
|
|
return this.remove(key, ifVersionOrValue) === SYNC_PROMISE_SUCCESS;
|
|
else
|
|
return this.transactionSync(
|
|
() => this.remove(key, ifVersionOrValue) === SYNC_PROMISE_SUCCESS,
|
|
overlappingSync ? 0x10002 : 2,
|
|
); // non-abortable, async flush
|
|
},
|
|
transaction(callback) {
|
|
if (writeTxn && !nextTxnCallbacks.isExecuting) {
|
|
// already nested in a transaction, just execute and return
|
|
return callback();
|
|
}
|
|
return this.transactionAsync(callback);
|
|
},
|
|
childTransaction(callback) {
|
|
if (useWritemap)
|
|
throw new Error(
|
|
'Child transactions are not supported in writemap mode',
|
|
);
|
|
if (writeTxn) {
|
|
let parentTxn = writeTxn;
|
|
let thisTxn = (env.writeTxn = writeTxn = { write: true });
|
|
env.beginTxn(1); // abortable
|
|
let callbackDone, finishTxn;
|
|
try {
|
|
return (writeTxn.childResults = when(
|
|
callback(),
|
|
(finishTxn = (result) => {
|
|
if (writeTxn !== thisTxn)
|
|
// need to wait for child txn to finish asynchronously
|
|
return writeTxn.childResults.then(() => finishTxn(result));
|
|
callbackDone = true;
|
|
if (result === ABORT) env.abortTxn();
|
|
else env.commitTxn();
|
|
clearWriteTxn(parentTxn);
|
|
return result;
|
|
}),
|
|
(error) => {
|
|
env.abortTxn();
|
|
clearWriteTxn(parentTxn);
|
|
throw error;
|
|
},
|
|
));
|
|
} catch (error) {
|
|
if (!callbackDone) env.abortTxn();
|
|
clearWriteTxn(parentTxn);
|
|
throw error;
|
|
}
|
|
}
|
|
return this.transactionAsync(callback, true);
|
|
},
|
|
transactionAsync(callback, asChild) {
|
|
let txnIndex;
|
|
let txnCallbacks;
|
|
if (lastQueuedResolution.callbacks) {
|
|
txnCallbacks = lastQueuedResolution.callbacks;
|
|
txnIndex =
|
|
txnCallbacks.push(asChild ? { callback, asChild } : callback) - 1;
|
|
} else if (nextTxnCallbacks.isExecuting) {
|
|
txnCallbacks = [asChild ? { callback, asChild } : callback];
|
|
txnCallbacks.results = commitPromise;
|
|
nextTxnCallbacks.push(txnCallbacks);
|
|
txnIndex = 0;
|
|
} else {
|
|
if (writeTxn)
|
|
throw new Error('Can not enqueue transaction during write txn');
|
|
let finishWrite = writeInstructions(
|
|
8 | (this.strictAsyncOrder ? 0x100000 : 0),
|
|
this,
|
|
);
|
|
txnCallbacks = [asChild ? { callback, asChild } : callback];
|
|
lastQueuedResolution.callbacks = txnCallbacks;
|
|
lastQueuedResolution.id = Math.random();
|
|
txnCallbacks.results = finishWrite();
|
|
txnIndex = 0;
|
|
}
|
|
return txnCallbacks.results.then((results) => {
|
|
let result = txnCallbacks[txnIndex];
|
|
if (result === CALLBACK_THREW) throw txnCallbacks.errors[txnIndex];
|
|
return result;
|
|
});
|
|
},
|
|
transactionSync(callback, flags) {
|
|
if (writeTxn) {
|
|
if (!useWritemap && (flags == undefined || flags & 1))
|
|
// can't use child transactions in write maps
|
|
// already nested in a transaction, execute as child transaction (if possible) and return
|
|
return this.childTransaction(callback);
|
|
let result = callback(); // else just run in current transaction
|
|
if (result == ABORT && !abortedNonChildTransactionWarn) {
|
|
console.warn(
|
|
'Can not abort a transaction inside another transaction with ' +
|
|
(this.cache ? 'caching enabled' : 'useWritemap enabled'),
|
|
);
|
|
abortedNonChildTransactionWarn = true;
|
|
}
|
|
return result;
|
|
}
|
|
let callbackDone, finishTxn;
|
|
this.transactions++;
|
|
if (!env.address)
|
|
throw new Error(
|
|
'The database has been closed and you can not transact on it',
|
|
);
|
|
env.beginTxn(flags == undefined ? 3 : flags);
|
|
let thisTxn = (writeTxn = env.writeTxn = { write: true });
|
|
try {
|
|
this.emit('begin-transaction');
|
|
return (writeTxn.childResults = when(
|
|
callback(),
|
|
(finishTxn = (result) => {
|
|
if (writeTxn !== thisTxn)
|
|
// need to wait for child txn to finish asynchronously
|
|
return writeTxn.childResults.then(() => finishTxn(result));
|
|
try {
|
|
callbackDone = true;
|
|
if (result === ABORT) env.abortTxn();
|
|
else {
|
|
env.commitTxn();
|
|
resetReadTxn();
|
|
}
|
|
return result;
|
|
} finally {
|
|
clearWriteTxn(null);
|
|
}
|
|
}),
|
|
(error) => {
|
|
try {
|
|
env.abortTxn();
|
|
} catch (e) {}
|
|
clearWriteTxn(null);
|
|
throw error;
|
|
},
|
|
));
|
|
} catch (error) {
|
|
if (!callbackDone)
|
|
try {
|
|
env.abortTxn();
|
|
} catch (e) {}
|
|
clearWriteTxn(null);
|
|
throw error;
|
|
}
|
|
},
|
|
getWriteTxnId() {
|
|
return env.getWriteTxnId();
|
|
},
|
|
transactionSyncStart(callback) {
|
|
return this.transactionSync(callback, 0);
|
|
},
|
|
// make the db a thenable/promise-like for when the last commit is committed
|
|
committed: (committed = {
|
|
then(onfulfilled, onrejected) {
|
|
if (commitPromise) return commitPromise.then(onfulfilled, onrejected);
|
|
if (lastWritePromise)
|
|
// always resolve to true
|
|
return lastWritePromise.then(() => onfulfilled(true), onrejected);
|
|
return SYNC_PROMISE_SUCCESS.then(onfulfilled, onrejected);
|
|
},
|
|
}),
|
|
flushed: {
|
|
// make this a thenable for when the commit is flushed to disk
|
|
then(onfulfilled, onrejected) {
|
|
if (flushPromise) flushPromise.hasCallbacks = true;
|
|
return Promise.all([flushPromise || committed, lastSyncTxnFlush]).then(
|
|
onfulfilled,
|
|
onrejected,
|
|
);
|
|
},
|
|
},
|
|
_endWrites(resolvedPromise, resolvedSyncPromise) {
|
|
this.put =
|
|
this.remove =
|
|
this.del =
|
|
this.batch =
|
|
this.removeSync =
|
|
this.putSync =
|
|
this.transactionAsync =
|
|
this.drop =
|
|
this.clearAsync =
|
|
() => {
|
|
throw new Error('Database is closed');
|
|
};
|
|
// wait for all txns to finish, checking again after the current txn is done
|
|
let finalPromise = flushPromise || commitPromise || lastWritePromise;
|
|
if (flushPromise) flushPromise.hasCallbacks = true;
|
|
let finalSyncPromise = lastSyncTxnFlush;
|
|
if (
|
|
(finalPromise && resolvedPromise != finalPromise) ||
|
|
(finalSyncPromise && resolvedSyncPromise != finalSyncPromise)
|
|
) {
|
|
return Promise.all([finalPromise, finalSyncPromise]).then(
|
|
() => this._endWrites(finalPromise, finalSyncPromise),
|
|
() => this._endWrites(finalPromise, finalSyncPromise),
|
|
);
|
|
}
|
|
Object.defineProperty(env, 'sync', { value: null });
|
|
},
|
|
on(event, callback) {
|
|
if (event == 'beforecommit') {
|
|
eventTurnBatching = true;
|
|
beforeCommitCallbacks.push(callback);
|
|
} else if (event == 'aftercommit') afterCommitCallbacks.push(callback);
|
|
else if (event == 'committed') {
|
|
this.getUserSharedBuffer('__committed__', new ArrayBuffer(0), {
|
|
envKey: true,
|
|
callback,
|
|
});
|
|
} else super.on(event, callback);
|
|
},
|
|
});
|
|
}
|
|
|
|
/**
 * Array-backed queue of `{ type, key, value }` operations. The operations are
 * handed to the callback supplied at construction time when `write` is called.
 */
class Batch extends Array {
  /**
   * Derived arrays (from map/filter/slice/splice/concat) are plain Arrays.
   * Without this, those methods would call `new Batch(length)` and produce a
   * Batch whose `callback` is a number, so a later `write()` would throw.
   */
  static get [Symbol.species]() {
    return Array;
  }

  /**
   * @param {Function} callback - invoked by `write` as `callback(batch, writeCallback)`
   */
  constructor(callback) {
    super();
    this.callback = callback;
  }

  /**
   * Queues a put operation.
   * @returns {Batch} this batch, to allow chaining
   */
  put(key, value) {
    this.push({ type: 'put', key, value });
    return this;
  }

  /**
   * Queues a delete operation.
   * @returns {Batch} this batch, to allow chaining
   */
  del(key) {
    this.push({ type: 'del', key });
    return this;
  }

  /**
   * Discards every queued operation.
   * @returns {Batch} this batch, to allow chaining
   */
  clear() {
    this.length = 0;
    return this;
  }

  /**
   * Hands the queued operations to the constructor-supplied callback.
   * @param {Function} [callback] - forwarded as the second argument
   * @returns {*} whatever the constructor-supplied callback returns
   */
  write(callback) {
    return this.callback(this, callback);
  }
}
|
|
/**
 * Wraps `buffer` in a marker object whose single key tags the value as
 * binary data.
 *
 * @param {*} buffer - the value to wrap (stored as-is, not copied)
 * @returns {Object} an object holding `buffer` under the binary-data marker key
 */
export function asBinary(buffer) {
  const wrapper = {};
  wrapper['\x10binary-data\x02'] = buffer;
  return wrapper;
}
|