Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add Kafka integration for Parseable server #936 . #1047

Open
wants to merge 39 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
4cc28cd
Feat: Add Kafka integration for Parseable server #936 .
hippalus Dec 21, 2024
06364b5
Merge remote-tracking branch 'origin' into kafka-integration
hippalus Dec 21, 2024
f085a79
fix conflicts
hippalus Dec 21, 2024
d32eae5
update cli.rs
hippalus Dec 21, 2024
693b9c9
remove unused fn from metadata.rs
hippalus Dec 21, 2024
3cc6b0e
add Copyright
hippalus Dec 21, 2024
12f0358
fix deepsource issues
hippalus Dec 21, 2024
0f6ca53
fix deepsource issues on shutdown.rs
hippalus Dec 21, 2024
58cc468
Add .idea to .gitignore
hippalus Dec 21, 2024
aff48a2
Add Kafka cluster setup to docker-compose files and refactor Dockerfi…
hippalus Dec 22, 2024
9ce1031
feat(metrics): add KafkaMetricsCollector for Prometheus integration .
hippalus Dec 23, 2024
6719a0e
Merge branch 'main' into kafka-integration
hippalus Dec 23, 2024
d8d0558
fix kafka metrics collector registeration
hippalus Dec 24, 2024
6ad0805
Merge branch 'main' into kafka-integration
hippalus Dec 26, 2024
4d13ee2
Refactor connector configurations to adapt parseable cli options.
hippalus Dec 26, 2024
d26d4de
Refactor metrics.rs to reduce cyclomatic complexity.
hippalus Dec 26, 2024
7604bc5
Refactor chunk size configuration
hippalus Dec 26, 2024
3a0fbb0
use comment instead todo! macro
hippalus Dec 26, 2024
7f94f3a
add license header
hippalus Dec 26, 2024
bfb4071
cargo update
hippalus Dec 26, 2024
73a8659
add resource limits for docker containers
hippalus Dec 26, 2024
9afc8d9
Merge branch 'main' into kafka-integration
hippalus Dec 27, 2024
bb3b5cb
scale down kafka broker to single node since OOM on integration test …
hippalus Dec 27, 2024
2df4727
add Install dependencies step to coverage.yaml
hippalus Dec 27, 2024
839bef8
improve logging and err handling
hippalus Dec 28, 2024
935fc40
change log rate
hippalus Dec 28, 2024
a44582d
comment out kafka-ui in docker-compose
hippalus Dec 28, 2024
8937c4d
refactor py script
hippalus Dec 28, 2024
0064727
refactor py script
hippalus Dec 28, 2024
5355634
update dist-test with LOG_RATE=500 TOTAL_LOGS=50000
hippalus Dec 28, 2024
32c17bd
update dist-test with topic REPLICATION_FACTOR=3
hippalus Dec 28, 2024
cc236d0
Separate kafka and standard dockerfiles. Add conditional compilation …
hippalus Dec 30, 2024
7be0ca8
Merge branch 'main' into kafka-integration
hippalus Dec 31, 2024
aade3a8
fix rust fmt
hippalus Dec 31, 2024
9ba40b5
Use dedicated runtime for KafkaSinkConnector to ensure true parallelism.
hippalus Jan 2, 2025
ecbd655
Merge branch 'main' into kafka-integration
hippalus Jan 4, 2025
5c67134
add schema version when deserialize ParseableEvent
hippalus Jan 4, 2025
ce2fca0
rename Event as ParseableEvent
hippalus Jan 4, 2025
835e9b9
-v flag to clean up volumes when stopping containers. Remove the erro…
hippalus Jan 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add Kafka cluster setup to docker-compose files and refactor Dockerfi…
…le for rdkafka dependencies. Implement retrying for consumer.rcv() fn to handle temporary Kafka unavailability.
  • Loading branch information
hippalus committed Dec 22, 2024
commit aff48a202dd2c161bfc1abfcddebb0802baf2ba7
162 changes: 103 additions & 59 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mime = "0.3.17"
rdkafka = { version = "0.37", features = ["cmake-build", "tracing", "sasl", "ssl", "libz-static"] }
testcontainers = "0.23"
testcontainers-modules = { version = "0.11", features = ["kafka"] }
backon = "1.3"

### other dependencies
anyhow = { version = "1.0", features = ["backtrace"] }
Expand Down Expand Up @@ -71,7 +72,7 @@ num_cpus = "1.15"
once_cell = "1.17.1"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8.5"
regex = "1.7.3"
regex = "1.11.1"
relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
"rustls-tls",
Expand All @@ -83,8 +84,8 @@ semver = "1.0"
serde = { version = "1.0", features = ["rc", "derive"] }
serde_json = "1.0"
static-files = "0.2"
sysinfo = "0.31.4"
thiserror = "1.0.64"
sysinfo = "0.33.0"
thiserror = "2.0.9"
thread-priority = "1.0.0"
tokio = { version = "1.42", default-features = false, features = [
"sync",
Expand Down
20 changes: 19 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ LABEL maintainer="Parseable Team <[email protected]>"
LABEL org.opencontainers.image.vendor="Parseable Inc"
LABEL org.opencontainers.image.licenses="AGPL-3.0"

RUN apt-get update && \
apt-get install --no-install-recommends -y \
cmake \
librdkafka-dev \
ca-certificates \
libsasl2-dev \
libssl-dev && \
rm -rf /var/lib/apt/lists/*
hippalus marked this conversation as resolved.
Show resolved Hide resolved

WORKDIR /parseable
COPY . .
RUN cargo build --release
Expand All @@ -30,7 +39,16 @@ FROM gcr.io/distroless/cc-debian12:latest

WORKDIR /parseable

# Copy the static shell into base image.
# Copy the Parseable binary from builder
COPY --from=builder /parseable/target/release/parseable /usr/bin/parseable

# Copy only the libraries that binary needs since kafka is statically linked
COPY --from=builder /usr/lib/x86_64-linux-gnu/libsasl2.so.2 /usr/lib/x86_64-linux-gnu/
COPY --from=builder /usr/lib/x86_64-linux-gnu/libssl.so.3 /usr/lib/x86_64-linux-gnu/
COPY --from=builder /usr/lib/x86_64-linux-gnu/libcrypto.so.3 /usr/lib/x86_64-linux-gnu/

# Copy CA certificates
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/


CMD ["/usr/bin/parseable"]
161 changes: 154 additions & 7 deletions docker-compose-distributed-test.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
version: "3.7"
networks:
parseable-internal:

services:
# minio
minio:
Expand All @@ -18,7 +18,7 @@ services:
ports:
- 9000:9000
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ]
interval: 15s
timeout: 20s
retries: 5
Expand All @@ -29,9 +29,10 @@ services:
build:
context: .
dockerfile: Dockerfile
command: ["parseable", "s3-store"]
platform: linux/amd64
command: [ "parseable", "s3-store" ]
ports:
- 8000:8000
- "8000:8000"
environment:
- P_S3_URL=http://minio:9000
- P_S3_ACCESS_KEY=parseable
Expand All @@ -47,7 +48,7 @@ services:
networks:
- parseable-internal
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness"]
test: [ "CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness" ]
interval: 15s
timeout: 20s
retries: 5
Expand All @@ -63,7 +64,8 @@ services:
build:
context: .
dockerfile: Dockerfile
command: ["parseable", "s3-store"]
platform: linux/amd64
command: [ "parseable", "s3-store" ]
ports:
- 8000
environment:
Expand All @@ -79,16 +81,23 @@ services:
- P_PARQUET_COMPRESSION_ALGO=snappy
- P_MODE=ingest
- P_INGESTOR_ENDPOINT=parseable-ingest-one:8000
- P_KAFKA_TOPICS=dist-test-logs-stream
- P_KAFKA_BOOTSTRAP_SERVERS=kafka-0:9092,kafka-1:9092,kafka-2:9092
- P_KAFKA_GROUP_ID=parseable-kafka-sink-connector
# additional settings like security, tuning, etc.
networks:
- parseable-internal
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness"]
test: [ "CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness" ]
interval: 15s
timeout: 20s
retries: 5
depends_on:
- parseable-query
- minio
- kafka-0
- kafka-1
- kafka-2
deploy:
restart_policy:
condition: on-failure
Expand Down Expand Up @@ -126,3 +135,141 @@ services:
condition: on-failure
delay: 20s
max_attempts: 3

kafka-0:
image: docker.io/bitnami/kafka:3.9
ports:
- "9092"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
# Clustering
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
volumes:
- kafka_0_data:/bitnami/kafka
networks:
- parseable-internal
healthcheck:
test: [ "CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1" ]
interval: 10s
timeout: 5s
retries: 5

kafka-1:
image: docker.io/bitnami/kafka:3.9
ports:
- "9092"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=1
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
# Clustering
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
volumes:
- kafka_1_data:/bitnami/kafka
networks:
- parseable-internal
healthcheck:
test: [ "CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1" ]
interval: 10s
timeout: 5s
retries: 5

kafka-2:
image: docker.io/bitnami/kafka:3.9
ports:
- "9092"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=2
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
# Clustering
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
volumes:
- kafka_2_data:/bitnami/kafka
networks:
- parseable-internal
healthcheck:
test: [ "CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1" ]
interval: 10s
timeout: 5s
retries: 5

kafka-ui:
platform: linux/amd64
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
depends_on:
- kafka-0
- kafka-1
- kafka-2
environment:
KAFKA_CLUSTERS_0_NAME: dist-test
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9092,kafka-1:9092,kafka-2:9092
KAFKA_CLUSTERS_0_METRICS_PORT: 9101
DYNAMIC_CONFIG_ENABLED: "true"
networks:
- parseable-internal
deploy:
restart_policy:
condition: on-failure
delay: 20s
max_attempts: 3

kafka-log-generator:
build:
context: ./scripts
dockerfile: Dockerfile
environment:
- KAFKA_BROKERS=kafka-0:9092,kafka-1:9092,kafka-2:9092
- KAFKA_TOPIC=dist-test-logs-stream
- LOG_RATE=500
- TOTAL_LOGS=100000
depends_on:
- kafka-0
- kafka-1
- kafka-2
networks:
- parseable-internal
restart: "no"

volumes:
kafka_0_data:
driver: local
kafka_1_data:
driver: local
kafka_2_data:
driver: local

41 changes: 41 additions & 0 deletions docker-compose-local.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
services:
kafka:
image: docker.io/bitnami/kafka:3.9
ports:
- "9092:9092"
- "29092:29092"
volumes:
- "kafka_data:/bitnami"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners for internal and external communication
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:29092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT_INTERNAL

kafka-ui:
platform: linux/amd64
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
depends_on:
- kafka
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_METRICS_PORT: 9101
DYNAMIC_CONFIG_ENABLED: "true"
deploy:
restart_policy:
condition: on-failure
delay: 20s
max_attempts: 3

volumes:
kafka_data:
driver: local
Loading
Loading