
SQLFlow: DuckDB for Streaming Data.

SQLFlow enables SQL-based stream processing, powered by DuckDB. SQLFlow embeds DuckDB and supports Kafka stream-processing logic written in pure SQL.

SQLFlow executes SQL against streaming data, such as Kafka topics or webhooks. Think of SQLFlow as a way to run SQL against a continuous stream of data, with the results shipped to sinks such as Kafka.
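For example, the SQL attached to a pipeline might clean and reshape each batch of incoming messages before the results are published to a sink. A minimal sketch, where the batch relation name and columns are illustrative rather than the exact schema used by the example configs:

SELECT
  lower(city) AS city,
  CAST(event_timestamp AS TIMESTAMP) AS event_time
FROM batch
WHERE city IS NOT NULL;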

SQLFlow Use-Cases

  • Streaming Data Transformations: Clean data, convert types, and publish the new data (example config).
  • Stream Enrichment: Join additional data onto an input stream and publish the enriched data (example config).
  • Data aggregation: Aggregate input data batches to decrease data volume (example config).
  • Tumbling Window Aggregation: Bucket data into arbitrary time windows (such as "hour" or "10 minutes") (example config).
  • Running SQL against the Bluesky Firehose: Execute SQL against any websocket source, such as the Bluesky firehose (example config).
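As a concrete sketch of the tumbling-window use case, the pipeline SQL buckets each batch by a time window and aggregates within it. The relation and column names below are illustrative assumptions, not the exact schema of the example configs:

SELECT
  time_bucket(INTERVAL '10 minutes', event_time) AS bucket_start,
  city,
  count(*) AS city_count
FROM batch
GROUP BY bucket_start, city;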

SQLFlow Features & Roadmap

  • Sources
    • Kafka Consumer using consumer groups
    • Websocket input (for consuming the Bluesky firehose)
    • HTTP (for webhooks)
  • Sinks
    • Kafka Producer
    • Stdout
    • Local Disk
    • Postgres
    • S3
  • Serialization
    • JSON Input
    • JSON Output
    • Parquet Output
    • Iceberg Output
  • Handlers
    • Memory Persistence
    • Pipeline-scoped SQL, such as defining views or attaching to databases (see the sketch after this list).
    • User Defined Functions (UDF)
    • Disk Persistence
  • Table Managers
    • Tumbling Window Aggregations
  • Operations
    • Observability Metrics
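The pipeline-scoped SQL handler listed above runs statements before the streaming SQL executes, such as view definitions or database attachments. A minimal sketch of such a statement, using a hypothetical lookup file for enrichment (the path and view name are illustrative, not from the example configs):

CREATE VIEW cities AS
SELECT * FROM read_csv_auto('/tmp/cities.csv');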

Getting Started

Docker

Docker is the easiest way to get started.

  • Pull the sql-flow Docker image
docker pull turbolytics/sql-flow:latest
  • Validate the config by invoking it on test data
docker run -v $(pwd)/dev:/tmp/conf -v /tmp/sqlflow:/tmp/sqlflow turbolytics/sql-flow:latest dev invoke /tmp/conf/config/examples/basic.agg.yml /tmp/conf/fixtures/simple.json

['{"city":"New York","city_count":28672}', '{"city":"Baltimore","city_count":28672}']
  • Start Kafka locally using Docker
docker-compose -f dev/kafka-single.yml up -d
  • Publish test messages to Kafka
python3 cmd/publish-test-data.py --num-messages=10000 --topic="input-simple-agg-mem"
  • Start a Kafka consumer from inside the docker-compose container
docker exec -it kafka1 kafka-console-consumer --bootstrap-server=kafka1:9092 --topic=output-simple-agg-mem
  • Start SQLFlow in Docker
docker run -v $(pwd)/dev:/tmp/conf -v /tmp/sqlflow:/tmp/sqlflow -e SQLFLOW_KAFKA_BROKERS=host.docker.internal:29092 turbolytics/sql-flow:latest run /tmp/conf/config/examples/basic.agg.mem.yml --max-msgs-to-process=10000
  • Verify the output in the Kafka consumer
...
...
{"city":"San Francisco504","city_count":1}
{"city":"San Francisco735","city_count":1}
{"city":"San Francisco533","city_count":1}
{"city":"San Francisco556","city_count":1}

The dev invoke command tests a SQLFlow pipeline configuration against a batch of test data. This enables a fast local development feedback loop before launching a SQLFlow consumer that reads from Kafka.
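The invoke output above also hints at the shape of the SQL that basic.agg.yml runs. A minimal equivalent, assuming each batch of JSON messages is exposed to DuckDB as a relation (named batch here purely for illustration):

SELECT
  city,
  count(*) AS city_count
FROM batch
GROUP BY city;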

Consuming Bluesky Firehose

SQLFlow supports running DuckDB SQL over websocket sources. Running SQL against the Bluesky firehose only requires a simple configuration file:

bluesky firehose config

The following command starts a Bluesky consumer and prints every post to stdout:

docker run -v $(pwd)/dev/config/examples:/examples turbolytics/sql-flow:latest run /examples/bluesky/bluesky.raw.stdout.yml

Check out the configuration files here.
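The pipeline SQL itself is ordinary DuckDB SQL over each batch of websocket messages. For example, counting firehose events by type might look roughly like the sketch below; the kind column is an assumption about the firehose payload, so check the example configs for the real schema:

SELECT
  kind,
  count(*) AS event_count
FROM batch
GROUP BY kind;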

Streaming to Iceberg

SQLFlow supports writing to Iceberg tables using pyiceberg.

The following configuration writes to an Iceberg table using a local SQLite catalog:

  • Initialize the SQLite Iceberg catalog and test tables
python3 cmd/setup-iceberg-local.py setup
created default.city_events
created default.bluesky_post_events
Catalog setup complete.
  • Start Kafka Locally
docker-compose -f dev/kafka-single.yml up -d
  • Publish Test Messages to Kafka
python3 cmd/publish-test-data.py --num-messages=5000 --topic="input-kafka-mem-iceberg"
  • Run SQLFlow, which will read from Kafka and write to the Iceberg table locally
docker run \
  -e SQLFLOW_KAFKA_BROKERS=host.docker.internal:29092 \
  -e PYICEBERG_HOME=/tmp/iceberg/ \
  -v $(pwd)/dev/config/iceberg/.pyiceberg.yaml:/tmp/iceberg/.pyiceberg.yaml \
  -v /tmp/sqlflow/warehouse:/tmp/sqlflow/warehouse \
  -v $(pwd)/dev/config/examples:/examples \
  turbolytics/sql-flow:latest run /examples/kafka.mem.iceberg.yml --max-msgs-to-process=5000
  • Verify the Iceberg data was written by querying it with DuckDB
% duckdb
v1.1.3 19864453f7
Enter ".help" for usage hints.
Connected to a transient in-memory database.
Use ".open FILENAME" to reopen on a persistent database.
D select count(*) from '/tmp/sqlflow/warehouse/default.db/city_events/data/*.parquet';
┌──────────────┐
│ count_star() │
│    int64     │
├──────────────┤
│         5000 │
└──────────────┘
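The same DuckDB shell can also inspect the written rows directly, which is a quick way to eyeball the output schema without assuming any column names:

D select * from '/tmp/sqlflow/warehouse/default.db/city_events/data/*.parquet' limit 5;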

Recipes

Coming soon. Until then, check out the example configurations in dev/config/examples.

Development

  • Install Python dependencies
pip install -r requirements.txt
pip install -r requirements.dev.txt

On macOS with Homebrew-installed librdkafka, confluent-kafka may need explicit include and library paths:

C_INCLUDE_PATH=/opt/homebrew/Cellar/librdkafka/2.3.0/include LIBRARY_PATH=/opt/homebrew/Cellar/librdkafka/2.3.0/lib pip install confluent-kafka
  • Run tests
make test-unit

The following table shows the throughput and memory usage of different benchmark scenarios:

Name                      | Throughput      | Max RSS Memory | Peak Memory Usage
Simple Aggregation Memory | 45,000 msgs/sec | 230 MiB        | 130 MiB
Simple Aggregation Disk   | 36,000 msgs/sec | 256 MiB        | 102 MiB
Enrichment                | 13,000 msgs/sec | 368 MiB        | 124 MiB
CSV Disk Join             | 11,500 msgs/sec | 312 MiB        | 152 MiB
CSV Memory Join           | 33,200 msgs/sec | 300 MiB        | 107 MiB
In Memory Tumbling Window | 44,000 msgs/sec | 198 MiB        | 96 MiB

More information about benchmarks is available in the wiki.

Contact Us

Like SQLFlow? Use SQLFlow? Feature Requests? Please let us know! [email protected]