The operator API is a framework for you to implement. The implemented operator will be managed by dora
. This framework enable us to make optimisation and provide advanced features. It is the recommended way of using dora
.
An operator requires to be registered and implement the DoraOperator
trait. It is composed of an on_input
method that defines the behaviour of the operator when there is an input.
use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus};
register_operator!(ExampleOperator);
#[derive(Debug, Default)]
struct ExampleOperator {
time: Option<String>,
}
impl DoraOperator for ExampleOperator {
fn on_input(
&mut self,
id: &str,
data: &[u8],
output_sender: &mut DoraOutputSender,
) -> Result<DoraStatus, ()> {
- Generate a new Rust library
cargo new rust-dataflow-example-operator --lib
Cargo.toml
{{#include ../../examples/rust-dataflow/operator/Cargo.toml}}
src/lib.rs
{{#include ../../examples/rust-dataflow/operator/src/lib.rs}}
- Build it:
cargo build --release
- Link it in your graph as:
{{#include ../../examples/rust-dataflow/dataflow.yml:13:21}}
This example can be found in examples
.
The custom node API allow you to integrate dora
into your application. It allows you to retrieve input and send output in any fashion you want.
DoraNode::init_from_env()
initiate a node from environment variables set by dora-coordinator
let node = DoraNode::init_from_env().await?;
.inputs()
gives you a stream of input that you can access using next()
on the input stream.
let mut inputs = node.inputs().await?;
send_output
send data from the node.
node.send_output(&data_id, data.as_bytes()).await?;
- Generate a new Rust binary (application):
cargo new rust-dataflow-example-node
{{#include ../../examples/rust-dataflow/node/Cargo.toml}}
src/main.rs
{{#include ../../examples/rust-dataflow/node/src/main.rs}}
- Link it in your graph as:
{{#include ../../examples/rust-dataflow/dataflow.yml:6:12}}