add new ingest server
ajuvercr committed Oct 23, 2024
1 parent d669b9e commit 57aec1b
Showing 14 changed files with 470 additions and 258 deletions.
244 changes: 160 additions & 84 deletions bin/server.ts
@@ -1,17 +1,30 @@
import * as http from "http";
import LinkHeader from "http-link-header";
import lmdb, { open } from "lmdb";
import commandLineArgs from "command-line-args";
import { Lock } from "async-await-mutex-lock";
import { readFile } from "fs/promises";
import { createReadStream } from "fs";

import { Kafka, Producer } from "kafkajs";
import { writeFile } from "fs/promises";
import { PROV } from "@treecg/types";

const dbDefinitions: commandLineArgs.OptionDefinition[] = [
{ name: "database", alias: "d", type: String, defaultValue: "my-db" },
{
name: "kafka",
alias: "k",
type: String,
defaultValue: "127.0.0.1:9092",
multiple: true,
},
{ name: "topic", alias: "t", type: String, defaultValue: "mumo" },
{ name: "group", alias: "g", type: String, defaultValue: "reading-group" },
{ name: "clientId", alias: "c", type: String, defaultValue: "my-client" },
{ name: "help", alias: "h", type: Boolean, defaultValue: false },
{ name: "command", type: String, defaultOption: true },
];

const serverDefinitions: commandLineArgs.OptionDefinition[] = [
{ name: "file", alias: "f", type: String },
{ name: "port", alias: "p", type: String, defaultValue: 3000 },
{ name: "key", alias: "k", type: String, defaultValue: "mumo-is-cool" },
{ name: "host", alias: "h", type: String, defaultValue: "127.0.0.1" },
@@ -27,112 +40,144 @@ ${process.argv[0]} ${process.argv[1]} [OPTION]
server [SERVER_OPTIONS] start the server
batch [BATCH_OPTIONS] seed the server from data.bin
delete clear the database
log log messages in kafka topic
[OPTION]
--database, -d location of the database folder (default my-db)
--kafka, -k kafka endpoint (default 127.0.0.1:9092)
--clientId, -c kafka client id (default my-client)
--group, -g kafka group (default reading-group)
--topic, -t kafka topic (default mumo)
--help, -h print this help message
[BATCH_OPTIONS]
--file, -f location of the data file
--start, -s starting index (default 0)
[SERVER_OPTIONS]
--file, -f location of the data file to append to
--host, -h host to bind the server to (default 127.0.0.1)
--key, -k secret key to use (default mumo-is-cool)
--port, -p port to use (3000)
--port, -p port to use (default 3000)
`;

class Instance {
database: lmdb.Database;
kafka: Kafka;
producer: Producer;
topic: string;
host: string = "127.0.0.1";
port: number = 3000;
group: string;
lock: Lock<string>;

constructor(options: { database: string }) {
console.log("connecting with", options.database);
this.database = open({
path: options.database,
// any options go here, we can turn on compression like this:
compression: true,
constructor(options: {
kafka: string[];
topic: string;
clientId: string;
group: string;
}) {
console.log(options);
this.kafka = new Kafka({
clientId: options.clientId,
brokers: options.kafka,
});
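// allowAutoTopicCreation lets the first send create the topic if the broker permits it.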
this.producer = this.kafka.producer({ allowAutoTopicCreation: true });
this.topic = options.topic;
this.group = options.group;
this.lock = new Lock();
}

async init() {
await this.producer.connect();
}

async log_messages() {
// await this.reset();
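// A random suffix gives this consumer a fresh group id, so it has no committed offsets and replays the topic from the beginning.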
const consumer = this.kafka.consumer({
groupId: this.group + Math.random(),
});
// Consume and log every message on the topic.
await consumer.connect();
await consumer.subscribe({ topic: this.topic, fromBeginning: true });
const group = await consumer.describeGroup();
console.log(group);

await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: message.value!.toString(),
});
},
});

// await consumer.disconnect();
}

async ingest(str: string[]) {
// lets first clear it
console.log("ingesting", str.length);
let count = this.database.getCount();
await this.database.batch(() => {
for (const st of str) {
this.database.put(count, st);
count += 1;
}
});
let i = 0;
const step = 100;
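// Send the payloads to Kafka in batches of step messages instead of one produce request per message.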

while (i < str.length) {
console.log("at", i, str.length);
await this.producer.send({
topic: this.topic,
messages: str.slice(i, i + step).map((value) => ({ value })),
});

i = i + step;
}
}

async close() {
await this.database.close();
await this.producer.disconnect();
}

async reset() {
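// Rewind the consumer group's committed offsets to the earliest available offset.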
const admin = this.kafka.admin();
await admin.resetOffsets({
groupId: this.group,
topic: this.topic,
earliest: true,
});
await admin.disconnect();
}

async drop() {
console.log("dropping database");
await this.database.drop();
await this.database.clearAsync();
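// Delete the Kafka topic as well, so a later ingest starts from an empty log.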
const admin = this.kafka.admin();
const topics = await admin.listTopics();
if (topics.find((x) => x === this.topic)) {
console.log("deleting topic", this.topic);
await admin.deleteTopics({ topics: [this.topic] });
} else {
console.log("topic not found");
}
await admin.disconnect();
}

async handlePost(request: http.IncomingMessage) {
async handlePost(request: http.IncomingMessage, file: string) {
const content = await getBody(request);
const count = this.database.getCount();
await this.database.put(count, content);
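// Append the raw body to the data file, then forward it to Kafka.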
await writeFile(file, content, { encoding: "utf8", flag: "a+" });
await this.producer.send({
topic: this.topic,
messages: [{ value: content }],
});
}

handleGet(
request: http.IncomingMessage,
query: URLSearchParams,
): {
handleGet(): {
body: Buffer;
links: LinkHeader;
} {
const parts = (request.url ?? "").split("?");
const p1 = parts[0];
const p2 = new URLSearchParams(parts[1]);

const indexKey = query.get("index");
if (!indexKey) {
throw "No index found";
}
const index = parseInt(indexKey);
const content: Buffer | undefined = this.database.get(index);
if (!content) {
throw "No content found";
}

const createUri = (index: number) => {
p2.set("index", index + "");
return p1 + "?" + p2.toString();
};

const count = this.database.getCount();

const links = new LinkHeader();
links.set({ uri: createUri(0), rel: "first" });
links.set({ uri: createUri(count - 1), rel: "last" });
if (index > 0) {
links.set({ uri: createUri(index - 1), rel: "prev" });
}
if (index < count - 1) {
links.set({ uri: createUri(index + 1), rel: "next" });
}

return { body: content, links };
throw "we don't do get";
}

server(options: { key: string; host: string; port: number }) {
server(options: { key: string; host: string; port: number; file: string }) {
this.host = options.host;
this.port = options.port;
const server = http.createServer(async (req, res) => {
const query = new URLSearchParams((req.url ?? "").split("?")[1]);


if (query.get("key") !== options.key) {
res.writeHead(401, { "Content-Type": "text/plain" });
res.end("Unauthorized");
@@ -142,7 +187,7 @@ class Instance {
if (req.method === "POST") {
await this.lock.acquire();
try {
await this.handlePost(req);
await this.handlePost(req, options.file);
} catch (ex: unknown) {
console.log(ex);
res.writeHead(500, { "Content-Type": "text/plain" });
@@ -163,7 +208,7 @@ class Instance {
// getting the body and links should be done while holding the lock
await this.lock.acquire();
try {
const get = this.handleGet(req, query);
const get = this.handleGet();
body = get.body;
links = get.links;
} catch (ex: unknown) {
@@ -226,11 +271,16 @@ async function main() {
return;
}

const instance = new Instance(<{ database: string }>options);
const instance = new Instance(
<ConstructorParameters<typeof Instance>[0]>options,
);
await instance.init();

if (options.command === "delete") {
await instance.drop();
console.log("dropped");
await instance.close();
console.log("closed");
return;
}

@@ -254,25 +304,25 @@ async function main() {
if (options.command === "batch") {
const argv = options._unknown || [];
const batchOptions = commandLineArgs(batchDefinitions, { argv });

const data = await readFile(batchOptions.file, { encoding: "utf8" });
const slices = data
.split("}{")
.map((s, i, xs) =>
i == 0 ? s + "}" : i == xs.length - 1 ? "{" + s : "{" + s + "}",
)
.filter((x, i) => {
try {
JSON.parse(x);
return true;
} catch (ex) {
console.log(ex);
console.log("index", i, x);
return false;
}
});
const data = await readFileChunks(batchOptions.file);
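// Keep only the slices that parse as valid JSON; log and skip the rest.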
const slices = data.filter((x, i) => {
try {
JSON.parse(x);
return true;
} catch (ex) {
console.log("error at index", i, x);
return false;
}
});

await instance.ingest(slices.slice(batchOptions.start));
await instance.close();
return;
}

if (options.command === "log") {
await instance.log_messages();
await instance.close();
return;
}

@@ -281,4 +331,30 @@ async function main() {
process.exit(1);
}

// Split a file of concatenated JSON objects ("{...}{...}") into one string per object.
function readFileChunks(location: string): Promise<string[]> {
return new Promise((res, rej) => {
const out: string[] = [];
let last = "";
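// "last" carries a partial object across chunk boundaries.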
const stream = createReadStream(location, { encoding: "utf8" });
stream.on("data", (incoming: string) => {
const s = last + incoming;
let pos = 0;
let idx = s.indexOf("}{", pos);
while (idx >= 0) {
out.push(s.slice(pos, idx + 1));
pos = idx + 1;
idx = s.indexOf("}{", pos);
}
last = s.slice(pos);
});
stream.on("error", rej);
stream.on("end", () => {
if (last) {
out.push(last);
}
res(out);
});
});
}

main();