Skip to content

Commit

Permalink
sync some text files as ytext
Browse files Browse the repository at this point in the history
  • Loading branch information
dmonad committed May 27, 2024
1 parent 6f8133f commit d50c7f1
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 14 deletions.
32 changes: 31 additions & 1 deletion src/api/actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import * as authorization from '../api/authorization.js'
import * as protocol from '../protocol.js'
import * as isodb from 'isodb'
import * as wsUtils from '../comms/websocket-utils.js'
import * as error from 'lib0/error'

/**
* @typedef {import('../ystream.js').Ystream} Ystream
Expand Down Expand Up @@ -391,6 +390,37 @@ export const getLww = async (tr, ystream, owner, collection, key) => {
return lww === null ? undefined : lww.op.val
}

/**
* @param {import('@y/stream').YTransaction} tr
* @param {Ystream} ystream
* @param {Uint8Array} owner
* @param {string} collection
* @param {string} docid
* @return {Promise<Array<Uint8Array>|null>}
*/
export const getYDocUpdates = async (tr, ystream, owner, collection, docid) => {
const [
updates,
isDeleted
] = await promise.all([
getDocOps(tr, ystream, owner, collection, docid, operations.OpYjsUpdateType, 0),
isDocDeleted(tr, ystream, owner, collection, docid)
])
return isDeleted ? null : updates.map(update => update.op.update)
}

/**
* @param {import('@y/stream').YTransaction} tr
* @param {Ystream} ystream
* @param {Uint8Array} owner
* @param {string} collection
* @param {string} docid
* @param {Uint8Array} update
*/
export const addYDocUpdate = async (tr, ystream, owner, collection, docid, update) => {
await addOp(tr, ystream, owner, collection, docid, new operations.OpYjsUpdate(update))
}

/**
* @param {import('@y/stream').YTransaction} tr
* @param {Ystream} ystream
Expand Down
120 changes: 110 additions & 10 deletions src/extensions/fs.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import chokidar from 'chokidar'
import fs from 'node:fs'
import path from 'node:path'
import * as Y from 'yjs'

import * as random from 'lib0/random'
import * as array from 'lib0/array'
Expand All @@ -9,6 +10,25 @@ import * as error from 'lib0/error'
import * as Ystream from '@y/stream' // eslint-disable-line
import * as actions from '@y/stream/api/actions' // eslint-disable-line
import * as logging from 'lib0/logging'
import * as diff from 'lib0/diff'

const textFileExtensions = new Set([
'txt', 'md', 'js', 'ts', 'tsx', 'jsx'
])

const _fileExtensionRegex = /.+\.(\w+)$/
/**
* @param {string} fname
*/
const getFileExtension = fname => _fileExtensionRegex.exec(fname)?.[1] ?? null

/**
* @param {string} fname
*/
const isTextFile = (fname) => {
const ext = getFileExtension(fname)
return ext != null ? textFileExtensions.has(ext) : false
}

/**
* @extends {observable.ObservableV2<{}>}
Expand Down Expand Up @@ -52,6 +72,8 @@ export default class Yfs extends observable.ObservableV2 {
}
// @todo start consuming starting at last checkpoint
this.ystream.on('ops', this._opsObserver)
this._destroyObserver = this.destroy.bind(this)
this.ystream.on('destroy', this._destroyObserver)

this.chokidarWatch = chokidar.watch(observePath, { ignoreInitial: false, ignored: /\.ystream/ /*, awaitWriteFinish: true */ })
.on('all', (type, cwdPath) => {
Expand All @@ -60,6 +82,13 @@ export default class Yfs extends observable.ObservableV2 {
let content = null
if (type === 'add' || type === 'change') {
content = fs.readFileSync(path.join(observePath, observeRelPath))
try {
if (isTextFile(observeRelPath)) {
content = content.toString('utf8')
}
} catch (e) {
console.warn('error parsing text file', e)
}
}
_eventsToCompute.push({ type, path: observeRelPath, content })
if (_eventsToCompute.length === 1) _computeEvents(this)
Expand All @@ -77,7 +106,7 @@ export default class Yfs extends observable.ObservableV2 {
* @param {import('@y/stream').YTransaction} tr
* @param {Yfs} yfs
* @param {string} docid
* @return {Promise<{ type: 'binaryFile', content: Buffer }|{ type: 'dir' }|{ type: 'skip' }|null>}
* @return {Promise<{ type: 'binaryFile', content: Buffer }|{ type: 'dir' }|{ type: 'skip' }|{ type: 'text', content: Y.Text }|null>}
*/
const getFileContent = async (tr, yfs, docid) => {
const fi = await yfs.ycollection.getFileInfo(tr, docid)
Expand All @@ -92,6 +121,17 @@ const getFileContent = async (tr, yfs, docid) => {
}
return { type: 'binaryFile', content }
}
if (fi.ftype === 'text') {
const ydoc = new Y.Doc()
const yupdates = await yfs.ycollection.getYdocUpdates(tr, docid)
if (yupdates == null) return null
ydoc.transact(tr => {
yupdates.forEach(update => {
Y.applyUpdateV2(ydoc, update)
})
})
return { type: 'text', content: ydoc.getText() }
}
if (fi.ftype === 'dir') {
return { type: 'dir' }
}
Expand Down Expand Up @@ -147,6 +187,19 @@ const _renderFiles = async (yfs) => {
fs.writeFileSync(strPath, ycontent.content)
// console.log('file written!', { strPath })
}
} else if (ycontent.type === 'text') {
const ycontentStr = ycontent.content.toString()
// console.log('trying to read file', { strPath, docPath, docid })
const fileContent = fs.existsSync(strPath) ? fs.readFileSync(strPath) : null
let fileContentStr = null
if (fileContent != null) {
try {
fileContentStr = fileContent.toString('utf8')
} catch (e) { /* nop */ }
}
if (fileContentStr !== ycontentStr) {
fs.writeFileSync(strPath, ycontentStr)
}
} else {
// console.log('checking if folder exists', { strPath })
if (!fs.existsSync(strPath)) {
Expand Down Expand Up @@ -228,19 +281,66 @@ const _computeEvents = async yfs => {
// filePath,
// ids: await ycollection.getDocIdsFromPath(tr, null, filePath)
// })
const { docid, isNew } = await mkPath(tr, ycollection.ystream, ycollection.ownerBin, ycollection.collection, null, arrPath, 'binary')
//
const isTextContent = typeof event.content === 'string'
const { docid, isNew } = await mkPath(tr, ycollection.ystream, ycollection.ownerBin, ycollection.collection, null, arrPath, isTextContent ? 'text' : 'binary')
if (isNew) {
// console.log('created file', { filePath, eventContent: event.content?.toString().slice(0, 50) })
await ycollection.setLww(tr, docid, event.content)
} else {
const currContent = await ycollection.getLww(tr, docid)
// console.log('updating file', { filePath, currContent: Buffer.from(currContent).toString().slice(0, 50), eventContent: event.content?.toString().slice(0, 50) })
if (Buffer.isBuffer(event.content) && currContent instanceof Uint8Array && array.equalFlat(currContent, event.content)) {
// console.log('nop...')
// nop
if (isTextContent) {
const ydoc = new Y.Doc()
ydoc.getText().insert(0, /** @type {string} */ (event.content))
await actions.addYDocUpdate(tr, ycollection.ystream, ycollection.ownerBin, ycollection.collection, docid, Y.encodeStateAsUpdateV2(ydoc))
} else {
await ycollection.setLww(tr, docid, event.content)
}
} else {
if (isTextContent) {
const currDocUpdates = await ycollection.getYdocUpdates(tr, docid)
const currDoc = new Y.Doc()
if (currDocUpdates != null) {
currDoc.transact(() => {
currDocUpdates.forEach(update => {
Y.applyUpdateV2(currDoc, update)
})
})
}
const textContent = /** @type {string} */ (event.content)
const d = diff.simpleDiffString(currDoc.getText().toString(), textContent)
// apply diff and catch the updates
/**
* @type {Array<Uint8Array>}
*/
const updates = []
currDoc.on('updateV2', update => updates.push(update))
console.log('delta', d)
/**
* @type {Array<any>}
*/
const qdelta = [{ retain: d.index }]
if (d.remove > 0) {
qdelta.push({ delete: d.remove })
}
if (d.remove > 0) {
qdelta.push({ delete: d.remove })
}
if (d.insert.length > 0) {
qdelta.push({ insert: d.insert })
}
if (qdelta.length > 1) {
currDoc.getText().applyDelta(qdelta)
}
for (let i = 0; i < updates.length; i++) {
actions.addYDocUpdate(tr, ycollection.ystream, ycollection.ownerBin, ycollection.collection, docid, updates[i])
}
} else {
const currContent = await ycollection.getLww(tr, docid)
// console.log('updating file', { filePath, currContent: Buffer.from(currContent).toString().slice(0, 50), eventContent: event.content?.toString().slice(0, 50) })
if (Buffer.isBuffer(event.content) && currContent instanceof Uint8Array && array.equalFlat(currContent, event.content)) {
// console.log('nop...')
// nop
} else {
await ycollection.setLww(tr, docid, event.content)
}
}
}
break
}
Expand Down
21 changes: 19 additions & 2 deletions src/ystream.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export class YTransaction {
}

/**
* @extends ObservableV2<{ ops:function(Array<dbtypes.OpValue>,any,boolean):void, authenticate:function():void, "collection-opened":(collection:Collection)=>void }>
* @extends ObservableV2<{ ops:function(Array<dbtypes.OpValue>,any,boolean):void, authenticate:function():void, "collection-opened":(collection:Collection)=>void, "destroy": (ystream: Ystream)=>void }>
*/
export class Ystream extends ObservableV2 {
/**
Expand Down Expand Up @@ -210,6 +210,7 @@ export class Ystream extends ObservableV2 {
})
this.commHandlers.forEach(handler => handler.destroy())
bc.unsubscribe('@y/stream#' + this.dbname, this._esub)
this.emit('destroy', [this])
return this._db.destroy()
}
}
Expand Down Expand Up @@ -256,6 +257,23 @@ export class Collection extends ObservableV2 {
return ydoc
}

/**
* @param {import('@y/stream').YTransaction} tr
* @param {string} docid
*/
getYdocUpdates (tr, docid) {
return actions.getYDocUpdates(tr, this.ystream, this.ownerBin, this.collection, docid)
}

/**
* @param {import('@y/stream').YTransaction} tr
* @param {string} docid
* @param {Uint8Array} update
*/
addYdocUpdate (tr, docid, update) {
return actions.addYDocUpdate(tr, this.ystream, this.ownerBin, this.collection, docid, update)
}

/**
* @param {import('@y/stream').YTransaction} tr
* @param {string} docid
Expand Down Expand Up @@ -288,7 +306,6 @@ export class Collection extends ObservableV2 {
* @param {import('@y/stream').YTransaction} tr
* @param {string} docid
* @param {number} [endLocalClock]
* @return {Promise<Array<{ docid: string, docname: string, ftype: 'binary'|'dir'|'text' }>>}
*/
getDocPath (tr, docid, endLocalClock) {
return actions.getDocPath(tr, this.ystream, this.ownerBin, this.collection, docid, endLocalClock)
Expand Down
2 changes: 1 addition & 1 deletion tests/yfs.tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export const testYfsBasics = async tc => {
cp.execSync('echo newcontent > ./tmp/clone/index.js')
await promise.wait(300)
await waitFilesSynced()
t.compare(fs.readFileSync('./tmp/init/index.js').toString(), 'newcontent\n')
t.compare(fs.readFileSync('./tmp/init/index.js').toString('utf8'), 'newcontent')
})
await t.measureTimeAsync('copy node_modules', async () => {
cp.execSync('cp -rf ./node_modules ./tmp/init/')
Expand Down

0 comments on commit d50c7f1

Please sign in to comment.