Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
koonopek committed Feb 18, 2024
1 parent 1d5800e commit bda69f4
Show file tree
Hide file tree
Showing 13 changed files with 398 additions and 363 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Golem pipeline

Golem pipelines is library which allows to spawn golem tasks in declarative form, this approach unlocks some sexy features.

## Features
- **declarative** Declarative code should be easier to read/maintain/extend especially for highly parallelized tasks
- **composability** Cause of pipeline/stage/executable abstraction it is easy to build reusable tasks. This could intresting direction of development especially in web.
- **pipelining** ![alt text](image.png) It means that `task_3` will start as soon as `task_0` finishes. We don't have to wait for `task_1`. The dependency is defined by `output <> input` variables between two or more tasks.
- **checkpointing** Every task result after successful execution is persisted. Thus we don't loose progress on interrupted pipelines. It also allows for iterative building new stages, without re-running already working stages over and over again.
- `FsCheckPointer` - persists checkpoints to file system
- `S3CheckPointer` - (TBD) interface can be easily implemented for S3 which would be good solution for browser execution
- **Input** Input abstraction allows to choose from different input resources
- `FileInput` - upload files to Provider, and pass path as args to command line
- `ArgInput` - command line arguments
- `S3Input` - (TBD) interface can be easily implemented for S3 which would be good solution for browser execution
- **Executable** Different modes of execution tasks, reduce boilerplate code
- `ExecutableToStdout` - executes program and treats `stdout` as a result
- `ExecutableToFiles`- allows producing multiple results (files) from single input


## Future improvements
- **visualization** - Such organized pipelines could be easly visualized using [graphs](https://www.npmjs.com/package/react-json-graph). Also bills etc. could be visualized in real time, by implementing event bus. I didn't make it in time :(
- **s3 interfaces** - I didn't make it in time :(

## Feedback for Golem team
- Really great experience. Most of the hhings were working out of the box. Interface (js-sdk) is very intuitive. Generally it was pleasure to this library. Good job!
- What could be better
- More `stream API` both for uploading data to provider and for fetching. It would avoid copying data, and allow processing bigger files. (I had problem with `ctx.spawn` api I didnt managed to make it work)
- I was getting error when tried to upload/download file concurrently
- Global event bus for better observability

Checkout out example of using this library: [example](./example/README.md)
9 changes: 9 additions & 0 deletions example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Example

1. In this example we have two files which contains William Shakespeare texts.
2. We first partition the sentences in this texts by given list of 8 words [ "I", "you", "We", "god", "devil", "mother", "father"]. We use for this [partition_by.js](../golem_tasks/partition_by.js). (2 parallel tasks)
3. As the result we got 2x8 outputs. For every output we create task with [go program](../golem_tasks//go//main.go) which in parallel score sentiment of every sentence. Score 1 => positive 0=> negative
4. The last stage is responsible for aggregating sentiments and generating report concluding that "the most positive word" in William Shakespeare poetry is "father" and the least is "god".


I understand that Golem is better suited for tasks that demand significant CPU and RAM resources rather than those involving extensive data transmission. Unfortunately, due to time constraints, I was unable to develop a more suitable example. The primary purpose of this example is to showcase the library's capabilities
52 changes: 12 additions & 40 deletions main.ts → example/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
FsCheckpointer,
Pipeline,
Stage,
} from ".";
} from "../src";

async function main() {
const executorJs = await TaskExecutor.create({
Expand Down Expand Up @@ -51,7 +51,7 @@ async function main() {
new ArgInput([wordsToAnalyze.join(",")]),
new FileInput("inputs/t8.shakespeare-1.txt"),
],
"node",
"node"
),
new ExecutableToFiles(
"letter_2",
Expand All @@ -60,7 +60,7 @@ async function main() {
new ArgInput([wordsToAnalyze.join(",")]),
new FileInput("inputs/t8.shakespeare-2.txt"),
],
"node",
"node"
),
]);

Expand All @@ -77,23 +77,23 @@ async function main() {
[
new FileInput(
partitionByTitle.outputs[outputIndex].then(
(o) => o[wordIndex],
),
(o) => o[wordIndex]
)
),
],
),
),
),
]
)
)
)
);

const reduce = new Stage("reduce", executorJs, [
new ExecutableToStdout(
"stats",
new FileInput("golem_tasks/stats.js"),
range(Object.values(sentiment.outputs).length).flatMap(
(i) => new FileInput(sentiment.outputs[i].then((o) => o[0])),
(i) => new FileInput(sentiment.outputs[i].then((o) => o[0]))
),
"node",
"node"
),
]);

Expand All @@ -108,7 +108,7 @@ async function main() {
reduce.outputs[0].then((result) => {
console.log("RESULT RAPORT");
console.log(
JSON.stringify(JSON.parse(readFileSync(result[0]).toString()), null, 4),
JSON.stringify(JSON.parse(readFileSync(result[0]).toString()), null, 4)
);
});
} catch (err) {
Expand All @@ -120,31 +120,3 @@ async function main() {
}

main();
//
// I you all
// analyze sentiment analyze sentiment sentiment
// sum sum
// stats

// budowanie reuzywlnych klockow na wyzszmy poziome
// latwo przeniesc do przegladarki i udostepnic innym i pozwolic budowac z insitejacych klockow
// deklaratywne podejscie
// latwiej analizowac kod
//An error occurred: GolemWorkError: Unable to execute task. Error: Failed to upload outputs/test-2/sentiment::sentiment_01.data: Local service error: State error: Busy: StatePair(Ready, Some(Ready))

// issues streams doesnt work
// beforeEach doesnt work so redundant downloads

// url: http://127.0.0.1:7465/ya-client/#/
// what is app-key
// type Input = "file";
// type Output = "file" | "byteStream";

// new Pipeline([
// new Stage([fileChunk1, fileChunk2, fileChunk3]),
// [partitionBy, partitionBy, partitionBy],
// [checkPoint],
// new Stage([Chunk]),
// [hash, hash, hash],
// [checkPoint],
// ]);
1 change: 0 additions & 1 deletion golem_tasks/stats.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const { readFileSync } = require('node:fs');
const path = require('node:path');

function main() {
const raport = {};
Expand Down
Binary file added image.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit bda69f4

Please sign in to comment.