forked from decodingml/llm-twin-course
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbytewax_pipeline.py
30 lines (29 loc) · 1010 Bytes
/
bytewax_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from data_flow.stream_input import RabbitMQSource
from data_flow.stream_output import QdrantOutput
from data_logic.dispatchers import (
ChunkingDispatcher,
CleaningDispatcher,
EmbeddingDispatcher,
RawDispatcher,
)
from db.qdrant import connection
# Streaming ingestion dataflow, built at import time.
#
#   RabbitMQSource -> raw dispatch -> clean dispatch -+-> Qdrant (sink_type="clean")
#                                                     |
#                                                     +-> chunk dispatch (flat_map)
#                                                         -> embedded chunk dispatch
#                                                         -> Qdrant (sink_type="vector")
#
# `flow` must stay a module-level name. NOTE(review): it is presumably launched
# with `python -m bytewax.run <this module>:flow`; confirm against the run scripts.
flow = Dataflow("Streaming ingestion pipeline")

# Ingest: pull messages from RabbitMQSource and hand each one to
# RawDispatcher.handle_mq_message.
incoming = op.input("input", flow, RabbitMQSource())
raw_items = op.map("raw dispatch", incoming, RawDispatcher.handle_mq_message)

# Clean: route every raw item through CleaningDispatcher.dispatch_cleaner.
cleaned_items = op.map(
    "clean dispatch", raw_items, CleaningDispatcher.dispatch_cleaner
)

# The cleaned stream branches here. One copy is written to Qdrant as-is...
clean_sink = QdrantOutput(connection=connection, sink_type="clean")
op.output("cleaned data insert to qdrant", cleaned_items, clean_sink)

# ...and the same stream continues into chunking. flat_map emits each element
# of the iterable returned by dispatch_chunker as a separate stream item.
chunks = op.flat_map(
    "chunk dispatch", cleaned_items, ChunkingDispatcher.dispatch_chunker
)

# Embed every chunk, then persist the results to Qdrant as vectors.
embedded_chunks = op.map(
    "embedded chunk dispatch", chunks, EmbeddingDispatcher.dispatch_embedder
)
vector_sink = QdrantOutput(connection=connection, sink_type="vector")
op.output("embedded data insert to qdrant", embedded_chunks, vector_sink)