Skip to content
This repository has been archived by the owner on May 18, 2023. It is now read-only.

Commit

Permalink
add temporary snap file in dtable when flushing, move into regular sn…
Browse files Browse the repository at this point in the history
…ap after successful write, tests coming
  • Loading branch information
kevinwilson541 committed Oct 3, 2017
1 parent 5bada45 commit d58a8ac
Showing 1 changed file with 67 additions and 39 deletions.
106 changes: 67 additions & 39 deletions lib/dtable.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,53 @@ const utils = require("./utils"),

const defaults = consts.dtableOpts;

const ops = {
set: (key, val, table) => {
table.set(key, val);
return val;
},
sset: (key, val, table) => {
const out = table.get(key) || new Set();
if (!(out instanceof Set)) {
throw DTable.invalidTypeError("sset", key, typeof out);
}
out.add(val);
table.set(key, out);
return out;
},
hset: (key, hkey, val, table) => {
const out = table.get(key) || new Map();
if (!(out instanceof Map)) {
throw DTable.invalidTypeError("hset", key, typeof out);
}
out.set(hkey, val);
table.set(key, out);
return out;
},
del: (key, table) => {
table.delete(key);
},
sdel: (key, val, table) => {
const out = table.get(key) || new Set();
if (!(out instanceof Set)) {
throw DTable.invalidTypeError("sdel", key, typeof out);
}
out.delete(val);
if (out.size === 0) table.delete(key);
},
hdel: (key, hkey, table) => {
const out = table.get(key) || new Map();
if (!(out instanceof Map)) {
throw DTable.invalidTypeError("hdel", key, typeof out);
}
out.delete(hkey);
if (out.size === 0) table.delete(key);
},
clear: (table) => {
table.clear();
}
};

class DTable extends EventEmitter {
/**
*
Expand Down Expand Up @@ -76,6 +123,7 @@ class DTable extends EventEmitter {
start(name) {
this._name = name;
this._path = path.join(this._prefix, this._name + "_DATA.SNAP");
this._tmpDumpPath = path.join(this._prefix, this._name + "_DATA_PREV.SNAP");
this._aofPath = path.join(this._prefix, this._name + "_LATEST.LOG");
this._tmpAOFPath = path.join(this._prefix, this._name + "_PREV.LOG");
this._id = shortid.generate();
Expand Down Expand Up @@ -121,7 +169,8 @@ class DTable extends EventEmitter {
*/
load(cb) {
async.series([
_.partial(this._loadState).bind(this),
_.partial(this._loadState, this._path).bind(this),
_.partial(this._loadState, this._tmpDumpPath).bind(this),
_.partial(this._loadAOF, this._tmpAOFPath).bind(this),
_.partial(this._loadAOF, this._aofPath).bind(this)
], cb);
Expand Down Expand Up @@ -238,7 +287,7 @@ class DTable extends EventEmitter {
*
*/
set(key, val) {
this._table.set(key, val);
ops.set(key, val, this._table);
this._writeToLog("set", key, val);
this._updateWriteCount();
return val;
Expand All @@ -259,12 +308,7 @@ class DTable extends EventEmitter {
*
*/
sset(key, val) {
const out = this._table.get(key) || new Set();
if (!(out instanceof Set)) {
throw DTable.invalidTypeError("sset", key, typeof out);
}
out.add(val);
this._table.set(key, out);
const out = ops.sset(key, val, this._table);
this._writeToLog("sset", key, val);
this._updateWriteCount();
return out;
Expand All @@ -286,12 +330,7 @@ class DTable extends EventEmitter {
*
*/
hset(key, hkey, val) {
const out = this._table.get(key) || new Map();
if (!(out instanceof Map)) {
throw DTable.invalidTypeError("hset", key, typeof out);
}
out.set(hkey, val);
this._table.set(key, out);
const out = ops.hset(key, hkey, val, this._table);
this._writeToLog("hset", key, hkey, val);
this._updateWriteCount();
return out;
Expand All @@ -311,7 +350,7 @@ class DTable extends EventEmitter {
*
*/
del(key) {
this._table.delete(key);
ops.del(key, this._table);
this._writeToLog("del", key);
this._updateWriteCount();
return this;
Expand All @@ -332,12 +371,7 @@ class DTable extends EventEmitter {
*
*/
sdel(key, val) {
const out = this._table.get(key) || new Set();
if (!(out instanceof Set)) {
throw DTable.invalidTypeError("sdel", key, typeof out);
}
out.delete(val);
if (out.size === 0) this._table.delete(key);
ops.sdel(key, val, this._table);
this._writeToLog("sdel", key, val);
this._updateWriteCount();
return this;
Expand All @@ -358,12 +392,7 @@ class DTable extends EventEmitter {
*
*/
hdel(key, hkey) {
const out = this._table.get(key) || new Map();
if (!(out instanceof Map)) {
throw DTable.invalidTypeError("hdel", key, typeof out);
}
out.delete(hkey);
if (out.size === 0) this._table.delete(key);
ops.hdel(key, hkey, this._table);
this._writeToLog("hdel", key, hkey);
this._updateWriteCount();
return this;
Expand All @@ -381,7 +410,7 @@ class DTable extends EventEmitter {
*
*/
clear() {
this._table.clear();
ops.clear(this._table);
this._writeToLog("clear");
this._updateWriteCount();
return this;
Expand Down Expand Up @@ -613,12 +642,12 @@ class DTable extends EventEmitter {
if (called) return;
called = true;
wstream.end();
return cb(err);
fs.unlink(this._tmpDumpPath, _.partial(cb, err));
});
fstream.on("close", () => {
if (called) return;
called = true;
return cb();
fs.rename(this._tmpDumpPath, this._path, cb);
});
async.eachLimit(Object.keys(obj), 4, (key, next) => {
if (called) return next("Closed prematurely.");
Expand Down Expand Up @@ -664,7 +693,7 @@ class DTable extends EventEmitter {
}

_createFlushStreams() {
const fstream = fs.createWriteStream(this._path);
const fstream = fs.createWriteStream(this._tmpDumpPath);
let wstream;
if (this._compress) {
wstream = zlib.createGzip();
Expand All @@ -685,11 +714,11 @@ class DTable extends EventEmitter {
* @param {Function} cb
*
*/
_loadState(cb) {
fs.stat(this._path, (err) => {
_loadState(location, cb) {
fs.stat(location, (err) => {
if (err && err.code !== "ENOENT") return cb(err);
else if (err && err.code === "ENOENT") return cb();
const {fstream, rstream} = this._createLoadStreams();
const {fstream, rstream} = this._createLoadStreams(location);

const rline = readline.createInterface({
input: rstream
Expand Down Expand Up @@ -738,15 +767,13 @@ class DTable extends EventEmitter {
rline.once("close", () => {
if (called) return;
called = true;
this._queue.flush();
return cb();
});
rstream.on("error", (err) => {
if (called) return;
called = true;
rstream.resume();
rline.close();
this._queue.flush();
return cb(err);
});
rline.on("line", (line) => {
Expand All @@ -756,16 +783,17 @@ class DTable extends EventEmitter {
});

try {
this[lineObj.op].apply(this, lineObj.args);
lineObj.args.push(this._table);
ops[lineObj.op].apply(null, lineObj.args);
} catch (e) {
debug("failed to complete operation %s with args " + JSON.stringify(lineObj.args) + " from %s", lineObj.op, path);
}
});
});
}

_createLoadStreams() {
const fstream = fs.createReadStream(this._path);
_createLoadStreams(location) {
const fstream = fs.createReadStream(location);
let rstream;
if (this._compress) {
rstream = zlib.createGunzip();
Expand Down

0 comments on commit d58a8ac

Please sign in to comment.