Skip to content

Commit

Permalink
implement prototype for filesystem crdt
Browse files Browse the repository at this point in the history
  • Loading branch information
dmonad committed Apr 26, 2024
1 parent d6ff95b commit 8b55042
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 76 deletions.
85 changes: 38 additions & 47 deletions src/actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export const createOpsReader = (ystream, startClock, owner, collection) => {
return ystream.db.transact(async tr => {
do {
/**
* @type {Array<dbtypes.OpValue<operations.OpTypes>>}
* @type {Array<dbtypes.OpValue>}
*/
let ops = []
if (owner != null && collection != null) {
Expand Down Expand Up @@ -152,7 +152,7 @@ export const getUnsyncedDocs = (ystream, owner, collection) => ystream.db.transa
})

/**
* @template {operations.OpTypes} OP
* @template {operations.OpTypes|operations.AbstractOp} OP
* @template {{ value: dbtypes.OpValue<OP>, fkey: isodb.AutoKey }} UPDATE
* @param {Ystream} ystream
* @param {Array<UPDATE>} updates
Expand All @@ -166,39 +166,6 @@ const _updateOpClocksHelper = (ystream, updates) => updates.map(update => {
return update
})

/**
* @param {Ystream} ystream
* @param {number} clock
*/
export const getOps = async (ystream, clock) => {
const ops = await ystream.db.transact(tr =>
tr.tables.oplog.getEntries({ start: new isodb.AutoKey(clock) })
)
return utils.mergeOps(ops.map(update => {
update.value.localClock = update.key.v
if (update.value.client === ystream.clientid) {
update.value.clock = update.key.v
}
return update.value
}), clock === 0)
}

/**
* @param {Ystream} ystream
* @param {Uint8Array} owner
* @param {string} collection
* @param {number} clock
*/
export const getCollectionOps = async (ystream, owner, collection, clock) => {
const ops = await ystream.db.transact(tr =>
tr.tables.oplog.indexes.collection.getEntries({
start: new dbtypes.CollectionKey(owner, collection, clock),
end: new dbtypes.CollectionKey(owner, collection, number.HIGHEST_UINT32)
})
)
return utils.mergeOps(_updateOpClocksHelper(ystream, ops).map(entry => entry.value), clock === 0)
}

/**
* @template {operations.OpTypeIds} TYPE
* @param {Ystream} ystream
Expand Down Expand Up @@ -269,27 +236,47 @@ export const getDocOpsLast = async (ystream, owner, collection, doc, type) => {
* @param {TYPE} type
* @return {Promise<dbtypes.OpValue<InstanceType<operations.typeMap[TYPE]>>|null>}
*/
export const getDocOpsMerged = (ystream, owner, collection, doc, type) => getDocOps(ystream, owner, collection, doc, type, 0).then(ops => utils.mergeOps(ops, false)[0])
export const getDocOpsMerged = async (ystream, owner, collection, doc, type) => {
const ops = await getDocOps(ystream, owner, collection, doc, type, 0)
return utils.merge(ops, false)
}

/**
* @template {operations.OpTypeIds} TYPE
* @param {Ystream} ystream
* @param {Uint8Array} owner
* @param {string} collection
* @param {string} parent
* @return {Promise<Array<string>>}
*/
export const getDocChildren = async (ystream, owner, collection, parent) => {
const keys = await ystream.db.transact(tr =>
tr.tables.childDocs.getKeys({
prefix: { owner, collection, parent }
})
)
return keys.map(k => k.child)
}

/**
* @template {operations.OpTypeIds} TYPEID
* @template {InstanceType<operations.typeMap[TYPEID]>} TYPE
* @param {Ystream} ystream
* @param {Uint8Array} owner
* @param {string} collection
* @param {string} doc
* @param {TYPE} type
* @return {Promise<dbtypes.OpValue<InstanceType<operations.typeMap[TYPE]>>|null>}
* @param {TYPEID} type
* @return {Promise<dbtypes.OpValue<TYPE>|null>}
*/
export const mergeDocOps = (ystream, owner, collection, doc, type) =>
ystream.db.transact(async tr => {
const merged = await getDocOpsMerged(ystream, owner, collection, doc, type)
tr.tables.oplog.indexes.doc.removeRange({
start: new dbtypes.DocKey(type, owner, collection, doc, 0),
end: new dbtypes.DocKey(type, owner, collection, doc, number.HIGHEST_UINT32),
endExclusive: true
})
merged && tr.tables.oplog.add(merged)
return merged
const allOps = /** @type {Array<dbtypes.OpValue<TYPE>>} */ (await getDocOps(ystream, owner, collection, doc, type, 0))
const mergedOp = utils.merge(allOps, true)
if (mergedOp === null) return null
const opsToDelete = allOps.filter(op => mergedOp.client !== op.client && mergedOp.clock !== op.clock)
await promise.all(opsToDelete.map(/** @return {Promise<any>} */ op =>
promise.all([op.op.unintegrate(ystream, tr, /** @type {any} */ (op)), tr.tables.oplog.remove(op.localClock)])
))
return mergedOp
})

/**
Expand Down Expand Up @@ -381,6 +368,7 @@ export const addOp = async (ystream, owner, collection, doc, opv) => {
const op = await ystream.db.transact(async tr => {
const op = new dbtypes.OpValue(ystream.clientid, 0, owner, collection, doc, opv)
const key = await tr.tables.oplog.add(op)
opv.integrate(ystream, tr, op)
op.clock = key.v
op.localClock = key.v
tr.tables.clocks.set(new dbtypes.ClocksKey(op.client, owner, collection), new dbtypes.ClientClockValue(op.clock, op.clock))
Expand Down Expand Up @@ -474,6 +462,9 @@ export const applyRemoteOps = async (ystream, ops, user, origin) => {
const colperms = permissions.get(buffer.toBase64(op.owner))?.get(op.collection)
if (colperms?.get('*') || colperms?.get(op.doc)) {
const localClock = await tr.tables.oplog.add(op)
// @todo integrating concurrently might not work well in all cases. (at least not
// efficiently)
await op.op.integrate(ystream, tr, op)
op.localClock = localClock.v
clientClockEntries.set(encodeClocksKey(op.client, op.owner, op.collection), new dbtypes.ClientClockValue(op.clock, op.localClock))
filteredOpsPermsChecked.push(op)
Expand Down
4 changes: 4 additions & 0 deletions src/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ export const def = {
}
}
},
childDocs: {
key: dbtypes.ParentKey,
value: isodb.NoValue
},
clocks: {
key: dbtypes.ClocksKey,
value: dbtypes.ClientClockValue
Expand Down
57 changes: 56 additions & 1 deletion src/dbtypes.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import * as ecdsa from 'lib0/crypto/ecdsa'
/**
* @todo "owner" could be mapped to an integer
* @todo client should actually be a map to a deviceid
* @template {operations.OpTypes} [OP=any]
* @template {operations.OpTypes|operations.AbstractOp} [OP=operations.AbstractOp]
* @implements isodb.IEncodable
*/
export class OpValue {
Expand Down Expand Up @@ -498,3 +498,58 @@ export class NoPermissionIndexKey {
return new NoPermissionIndexKey(owner, collection, doc, clock)
}
}

/**
* @implements isodb.IEncodable
*/
export class ParentKey {
/**
* @param {Uint8Array} owner
* @param {string} collection
* @param {string} parent
* @param {string} child
* @param {number} localClock
*/
constructor (owner, collection, parent, child, localClock) {
this.owner = owner
this.collection = collection
this.parent = parent
this.child = child
this.localClock = localClock
}

/**
* @param {{ owner: Uint8Array, collection: string, parent: string }} prefix
*/
static prefix ({ owner, collection, parent }) {
return encoding.encode(encoder => {
encoding.writeVarUint8Array(encoder, owner)
encoding.writeVarString(encoder, collection)
encoding.writeVarString(encoder, parent)
})
}

/**
* @param {encoding.Encoder} encoder
*/
encode (encoder) {
encoding.writeVarUint8Array(encoder, this.owner)
encoding.writeVarString(encoder, this.collection)
encoding.writeVarString(encoder, this.parent)
encoding.writeTerminatedString(encoder, this.child)
encoding.writeVarUint(encoder, this.localClock)
}

/**
* @param {decoding.Decoder} decoder
* @return {isodb.IEncodable}
*/
static decode (decoder) {
const owner = decoding.readVarUint8Array(decoder)
const collection = decoding.readVarString(decoder)
const doc = decoding.readVarString(decoder)
const child = decoding.readTerminatedString(decoder)
const localClock = decoding.readVarUint(decoder)
return new this(owner, collection, doc, child, localClock)
}
}
Loading

0 comments on commit 8b55042

Please sign in to comment.