diff --git a/src/sender.js b/src/sender.js index 8834f3a..71fd106 100644 --- a/src/sender.js +++ b/src/sender.js @@ -8,6 +8,21 @@ const crypto = require('crypto'); const DEFAULT_BUFFER_SIZE = 8192; +const SPACE_REPLACE_REGEX = / /g; +const SPACE_REPLACE_VALUE = '\\ '; +const EQUALS_REPLACE_REGEX = /=/g; +const EQUALS_REPLACE_VALUE = '\\='; +const DOUBLE_QOUTE_REPLACE_REGEX = /"/g; +const DOUBLE_QOUTE_REPLACE_VALUE = '\\"'; +const COMMA_REPLACE_REGEX = /,/g; +const COMMA_REPLACE_VALUE = '\\,'; +const NEWLINE_REPLACE_REGEX = /\n/g; +const NEWLINE_REPLACE_VALUE = '\\\n'; +const RETURN_REPLACE_REGEX = /\r/g; +const RETURN_REPLACE_VALUE = '\\\r'; +const BACKSLASH_REPLACE_REGEX = /\\/g; +const BACKSLASH_REPLACE_VALUE = '\\\\'; + /** @classdesc * The QuestDB client's API provides methods to connect to the database, ingest data and close the connection. *
@@ -202,7 +217,7 @@ class Sender {
}
validateTableName(table);
checkCapacity(this, [table]);
- writeEscaped(this, table);
+ writeName(this, table);
this.hasTable = true;
return this;
}
@@ -225,9 +240,9 @@ class Sender {
checkCapacity(this, [name, valueStr], 2 + name.length + valueStr.length);
write(this, ',');
validateColumnName(name);
- writeEscaped(this, name);
+ writeName(this, name);
write(this, '=');
- writeEscaped(this, valueStr);
+ writeNonQuotedValue(this, valueStr);
this.hasSymbols = true;
return this;
}
@@ -243,7 +258,7 @@ class Sender {
writeColumn(this, name, value, () => {
checkCapacity(this, [value], 2 + value.length);
write(this, '"');
- writeEscaped(this, value, true);
+ writeQuotedValue(this, value);
write(this, '"');
}, "string");
return this;
@@ -417,7 +432,7 @@ function writeColumn(sender, name, value, writeValue, valueType) {
checkCapacity(sender, [name], 2 + name.length);
write(sender, sender.hasColumns ? ',' : ' ');
validateColumnName(name);
- writeEscaped(sender, name);
+ writeName(sender, name);
write(sender, '=');
writeValue();
sender.hasColumns = true;
@@ -430,41 +445,25 @@ function write(sender, data) {
}
}
-function writeEscaped(sender, data, quoted = false) {
- for (const ch of data) {
- if (ch > '\\') {
- write(sender, ch);
- continue;
- }
+function writeNonQuotedValue(sender, value) {
+ write(sender, value.replace(BACKSLASH_REPLACE_REGEX, BACKSLASH_REPLACE_VALUE)
+ .replace(SPACE_REPLACE_REGEX, SPACE_REPLACE_VALUE)
+ .replace(EQUALS_REPLACE_REGEX, EQUALS_REPLACE_VALUE)
+ .replace(COMMA_REPLACE_REGEX, COMMA_REPLACE_VALUE)
+ .replace(NEWLINE_REPLACE_REGEX, NEWLINE_REPLACE_VALUE)
+ .replace(RETURN_REPLACE_REGEX, RETURN_REPLACE_VALUE));
+}
- switch (ch) {
- case ' ':
- case ',':
- case '=':
- if (!quoted) {
- write(sender, '\\');
- }
- write(sender, ch);
- break;
- case '\n':
- case '\r':
- write(sender, '\\');
- write(sender, ch);
- break;
- case '"':
- if (quoted) {
- write(sender, '\\');
- }
- write(sender, ch);
- break;
- case '\\':
- write(sender, '\\\\');
- break;
- default:
- write(sender, ch);
- break;
- }
- }
+function writeQuotedValue(sender, value) {
+ write(sender, value.replace(BACKSLASH_REPLACE_REGEX, BACKSLASH_REPLACE_VALUE)
+ .replace(DOUBLE_QOUTE_REPLACE_REGEX, DOUBLE_QOUTE_REPLACE_VALUE)
+ .replace(NEWLINE_REPLACE_REGEX, NEWLINE_REPLACE_VALUE)
+ .replace(RETURN_REPLACE_REGEX, RETURN_REPLACE_VALUE));
+}
+
+function writeName(sender, name) {
+ write(sender, name.replace(SPACE_REPLACE_REGEX, SPACE_REPLACE_VALUE)
+ .replace(EQUALS_REPLACE_REGEX, EQUALS_REPLACE_VALUE));
}
exports.Sender = Sender;
diff --git a/src/validation.js b/src/validation.js
index 29b84a9..39cab70 100644
--- a/src/validation.js
+++ b/src/validation.js
@@ -2,6 +2,15 @@
const QuestDBMaxFileNameLength = 127;
+// eslint-disable-next-line no-control-regex
+const INVALID_COLUMN_REGEX = /[?.,'"\\/:()+\-*%~\r\n\u{0000}\u{0001}\u{0002}\u{0003}\u{0004}\u{0005}\u{0006}\u{0007}\u{0008}\u{0009}\u{000B}\u{000C}\u{000E}\u{000F}\u{007F}\u{FEFF}]/u;
+// eslint-disable-next-line no-control-regex
+const INVALID_TABLE_REGEX = /[?,'"\\/:()+*%~\r\n\u{0000}\u{0001}\u{0002}\u{0003}\u{0004}\u{0005}\u{0006}\u{0007}\u{0008}\u{0009}\u{000B}\u{000C}\u{000E}\u{000F}\u{007F}\u{FEFF}]/u;
+const INVALID_TABLE_START_DOT_REGEX = /^\./;
+const INVALID_TABLE_END_DOT_REGEX = /\.$/;
+const INVALID_TABLE_MORE_DOTS_REGEX = /\.\./;
+const INVALID_DESIGNATED_REGEX = /\D/;
+
/**
* Validates a table name.
* Throws an error if table name is invalid.
@@ -16,52 +25,17 @@ function validateTableName(name) {
if (len === 0) {
throw new Error("Empty string is not allowed as table name");
}
- for (let i = 0; i < len; i++) {
- let ch = name[i];
- switch (ch) {
- case '.':
- if (i === 0 || i === len - 1 || name[i - 1] === '.')
- // single dot is allowed in the middle only
- // starting with a dot hides directory in Linux
- // ending with a dot can be trimmed by some Windows versions / file systems
- // double or triple dot looks suspicious
- // single dot allowed as compatibility,
- // when someone uploads 'file_name.csv' the file name used as the table name
- throw new Error("Table name cannot start or end with a dot and only a single dot allowed");
- break;
- case '?':
- case ',':
- case '\'':
- case '"':
- case '\\':
- case '/':
- case ':':
- case ')':
- case '(':
- case '+':
- case '*':
- case '%':
- case '~':
- case '\u0000':
- case '\u0001':
- case '\u0002':
- case '\u0003':
- case '\u0004':
- case '\u0005':
- case '\u0006':
- case '\u0007':
- case '\u0008':
- case '\u0009': // control characters, except \n.
- case '\u000B': // new line allowed for compatibility, there are tests to make sure it works
- case '\u000c':
- case '\r':
- case '\n':
- case '\u000e':
- case '\u000f':
- case '\u007f':
- case '\ufeff': // UTF-8 BOM (Byte Order Mark) can appear at the beginning of a character stream
- throw new Error(`Invalid character in table name: ${ch}`);
- }
+ if (INVALID_TABLE_REGEX.test(name)) {
+ throw new Error(`Invalid character in table name: ${name}`);
+ }
+ if (INVALID_TABLE_START_DOT_REGEX.test(name)) {
+ throw new Error(`Table name cannot start with a dot: ${name}`);
+ }
+ if (INVALID_TABLE_END_DOT_REGEX.test(name)) {
+ throw new Error(`Table name cannot end with a dot: ${name}`);
+ }
+ if (INVALID_TABLE_MORE_DOTS_REGEX.test(name)) {
+ throw new Error(`Only single dots allowed in table name: ${name}`);
}
}
@@ -79,43 +53,8 @@ function validateColumnName(name) {
if (len === 0) {
throw new Error("Empty string is not allowed as column name");
}
- for (const ch of name) {
- switch (ch) {
- case '?':
- case '.':
- case ',':
- case '\'':
- case '"':
- case '\\':
- case '/':
- case ':':
- case ')':
- case '(':
- case '+':
- case '-':
- case '*':
- case '%':
- case '~':
- case '\u0000':
- case '\u0001':
- case '\u0002':
- case '\u0003':
- case '\u0004':
- case '\u0005':
- case '\u0006':
- case '\u0007':
- case '\u0008':
- case '\u0009': // control characters, except \n
- case '\u000B':
- case '\u000c':
- case '\r':
- case '\n':
- case '\u000e':
- case '\u000f':
- case '\u007f':
- case '\ufeff': // UTF-8 BOM (Byte Order Mark) can appear at the beginning of a character stream
- throw new Error(`Invalid character in column name: ${ch}`);
- }
+ if (INVALID_COLUMN_REGEX.test(name)) {
+ throw new Error(`Invalid character in column name: ${name}`);
}
}
@@ -130,11 +69,8 @@ function validateDesignatedTimestamp(timestamp) {
if (len === 0) {
throw new Error("Empty string is not allowed as designated timestamp");
}
- for (let i = 0; i < len; i++) {
- let ch = timestamp[i];
- if (ch < '0' || ch > '9') {
- throw new Error(`Invalid character in designated timestamp: ${ch}`);
- }
+ if (INVALID_DESIGNATED_REGEX.test(timestamp)) {
+ throw new Error(`Invalid character in designated timestamp: ${timestamp}`);
}
}
diff --git a/test/flametest.js b/test/flametest.js
new file mode 100644
index 0000000..9fee502
--- /dev/null
+++ b/test/flametest.js
@@ -0,0 +1,51 @@
+'use strict';
+
+const { Sender } = require("../index");
+const { readFileSync } = require('fs');
+
+const PORT = 9009;
+const HOST = "127.0.0.1";
+
+const PRIVATE_KEY = "9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8";
+const PUBLIC_KEY = {
+ x: "aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc",
+ y: "__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg"
+};
+const JWK = {
+ ...PUBLIC_KEY,
+ kid: "testapp",
+ kty: "EC",
+ d: PRIVATE_KEY,
+ crv: "P-256",
+};
+
+const senderTLS = {
+ host: HOST,
+ port: PORT,
+ ca: readFileSync('certs/ca/ca.crt') // necessary only if the server uses self-signed certificate
+};
+
+async function run() {
+ const tableName = "test";
+
+ const sender = new Sender({bufferSize: 131072, jwk: JWK});
+ await sender.connect(senderTLS);
+ const numOfRows = 300000;
+ for (let i = 0; i < numOfRows; i++) {
+ sender.table(tableName)
+ .symbol("location", `emea${i}`).symbol("city", `budapest${i}`)
+ .stringColumn("hoppa", `hello${i}`).stringColumn("hippi", `hel${i}`).stringColumn("hippo", `haho${i}`)
+ .floatColumn("temperature", 12.1).intColumn("intcol", i)
+ .atNow();
+ if (i % 1000 === 0) {
+ await sender.flush();
+ }
+ }
+ await sender.flush();
+ await sender.close();
+
+ return 0;
+}
+
+run().then(value => console.log(value))
+ .catch(err => console.log(err));
diff --git a/test/perftest.js b/test/perftest.js
new file mode 100644
index 0000000..40bfa00
--- /dev/null
+++ b/test/perftest.js
@@ -0,0 +1,118 @@
+'use strict';
+
+const { Sender } = require("../index");
+const { readFileSync } = require('fs');
+const http = require("http");
+
+const HTTP_OK = 200;
+
+const PORT = 9009;
+const HOST = "127.0.0.1";
+
+const PRIVATE_KEY = "9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8";
+const PUBLIC_KEY = {
+ x: "aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc",
+ y: "__ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg"
+};
+const JWK = {
+ ...PUBLIC_KEY,
+ kid: "testapp",
+ kty: "EC",
+ d: PRIVATE_KEY,
+ crv: "P-256",
+};
+
+const senderTLS = {
+ host: HOST,
+ port: PORT,
+ ca: readFileSync('certs/ca/ca.crt') // necessary only if the server uses self-signed certificate
+};
+
+async function run() {
+ const tableName = "test";
+ await query(`drop table "${tableName}"`);
+
+ const sender = new Sender({bufferSize: 131072, jwk: JWK});
+ await sender.connect(senderTLS);
+ const numOfRows = 1000000;
+ for (let i = 0; i < numOfRows; i++) {
+ sender.table(tableName)
+ .symbol("location", `emea${i}`).symbol("city", `budapest${i}`)
+ .stringColumn("hoppa", `hello${i}`).stringColumn("hippi", `hel${i}`).stringColumn("hippo", `haho${i}`)
+ // .symbol("location", "emea").symbol("city", "budapest")
+ // .stringColumn("hoppa", "hello").stringColumn("hippi", "hel").stringColumn("hippo", "haho")
+ .floatColumn("temperature", 12.1).intColumn("intcol", i)
+ .atNow();
+ if (i % 1000 === 0) {
+ await sender.flush();
+ }
+ }
+ await sender.flush();
+ await sender.close();
+ console.info(`mem usage: ${getMemory()}`);
+
+ await waitForData(tableName, numOfRows);
+
+ const selectMin = await query(`select * from "${tableName}" where intcol=0`);
+ const dateMin = new Date(selectMin.dataset[0][7]);
+ const selectMax = await query(`select * from "${tableName}" where intcol=${numOfRows - 1}`);
+ const dateMax = new Date(selectMax.dataset[0][7]);
+ const elapsed = (dateMax.getTime() - dateMin.getTime()) / 1000;
+ console.info(`took: ${elapsed}, rate: ${numOfRows / elapsed} rows/s`);
+
+ return 0;
+}
+
+async function query(query) {
+ const options = {
+ hostname: "127.0.0.1",
+ port: 9000,
+ path: `/exec?query=${encodeURIComponent(query)}`,
+ method: 'GET',
+ };
+
+ return new Promise((resolve, reject) => {
+ const req = http.request(options, response => {
+ if (response.statusCode === HTTP_OK) {
+ response.on('data', data => {
+ resolve(JSON.parse(data.toString()));
+ });
+ } else {
+ reject(new Error(`HTTP request failed, statusCode=${response.statusCode}, query=${query}`));
+ }
+ });
+
+ req.on('error', error => {
+ reject(error);
+ });
+
+ req.end();
+ });
+}
+
+async function waitForData(table, expectedRowCount, timeout = 90000) {
+ const interval = 500;
+ const num = timeout / interval;
+ let selectResult;
+ for (let i = 0; i < num; i++) {
+ selectResult = await query(`select count(*) from "${table}"`);
+ if (selectResult && selectResult.dataset[0][0] === expectedRowCount) {
+ return selectResult;
+ }
+ await sleep(interval);
+ }
+ throw new Error(`Timed out while waiting for ${expectedRowCount} rows, table='${table}'`);
+}
+
+async function sleep(ms) {
+ return new Promise(resolve => setTimeout(resolve, ms));
+}
+
+function getMemory() {
+ return Object.entries(process.memoryUsage()).reduce((carry, [key, value]) => {
+ return `${carry}${key}:${Math.round(value / 1024 / 1024 * 100) / 100}MB;`;
+ }, "");
+}
+
+run().then(value => console.log(value))
+ .catch(err => console.log(err));
diff --git a/test/sender.test.js b/test/sender.test.js
index 6bf41b6..37688e2 100644
--- a/test/sender.test.js
+++ b/test/sender.test.js
@@ -391,7 +391,7 @@ describe('Sender message builder test suite (anything not covered in client inte
() => sender.table("tableName")
.symbol("name", "value")
.at("343434.5656")
- ).toThrow("Invalid character in designated timestamp: .");
+ ).toThrow("Invalid character in designated timestamp: 343434.5656");
});
it('throws exception if designated timestamp is set without any fields added', function () {