
Commit

refactor py script
hippalus committed Dec 28, 2024
1 parent a44582d commit 8937c4d
Showing 3 changed files with 147 additions and 60 deletions.
131 changes: 103 additions & 28 deletions docker-compose-distributed-test.yaml
@@ -82,7 +82,7 @@ services:
- P_MODE=ingest
- P_INGESTOR_ENDPOINT=parseable-ingest-one:8000
- P_KAFKA_CONSUMER_TOPICS=dist-test-logs-stream
- P_KAFKA_BOOTSTRAP_SERVERS=kafka-0:9092
- P_KAFKA_BOOTSTRAP_SERVERS=kafka-0:9092,kafka-1:9092,kafka-2:9092
# additional settings like security, tuning, etc.
networks:
- parseable-internal
@@ -95,6 +95,8 @@ services:
- parseable-query
- minio
- kafka-0
- kafka-1
- kafka-2
deploy:
restart_policy:
condition: on-failure
@@ -141,7 +143,7 @@ services:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
@@ -150,8 +152,8 @@ services:
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
# Clustering
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
volumes:
- kafka_0_data:/bitnami/kafka
@@ -163,43 +165,116 @@ services:
timeout: 5s
retries: 5

#kafka-ui:
# platform: linux/amd64
# image: provectuslabs/kafka-ui:latest
# environment:
# KAFKA_CLUSTERS_0_NAME: dist-test
# KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9092
# KAFKA_CLUSTERS_0_METRICS_PORT: 9101
# DYNAMIC_CONFIG_ENABLED: "true"
# JAVA_OPTS: -Xms256m -Xmx512m -XX:+UseG1GC
# networks:
# - parseable-internal
# depends_on:
# - kafka-0
# ports:
# - "8080:8080"
# deploy:
# restart_policy:
# condition: on-failure
# delay: 20s
# max_attempts: 3
kafka-1:
image: docker.io/bitnami/kafka:3.9
ports:
- "9092"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=1
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
# Clustering
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
volumes:
- kafka_1_data:/bitnami/kafka
networks:
- parseable-internal
healthcheck:
test: [ "CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1" ]
interval: 10s
timeout: 5s
retries: 5

kafka-2:
image: docker.io/bitnami/kafka:3.9
ports:
- "9092"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=2
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
# Clustering
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
volumes:
- kafka_2_data:/bitnami/kafka
networks:
- parseable-internal
healthcheck:
test: [ "CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1" ]
interval: 10s
timeout: 5s
retries: 5

kafka-ui:
platform: linux/amd64
image: provectuslabs/kafka-ui:latest
environment:
KAFKA_CLUSTERS_0_NAME: dist-test
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9092,kafka-1:9092,kafka-2:9092
KAFKA_CLUSTERS_0_METRICS_PORT: 9101
DYNAMIC_CONFIG_ENABLED: "true"
JAVA_OPTS: -Xms256m -Xmx512m -XX:+UseG1GC
networks:
- parseable-internal
depends_on:
- kafka-0
- kafka-1
- kafka-2
ports:
- "8080:8080"
deploy:
restart_policy:
condition: on-failure
delay: 20s
max_attempts: 3

kafka-log-generator:
build:
context: ./scripts
dockerfile: Dockerfile
environment:
- KAFKA_BROKERS=kafka-0:9092
- KAFKA_BROKERS=kafka-0:9092,kafka-1:9092,kafka-2:9092
- KAFKA_TOPIC=dist-test-logs-stream
- LOG_RATE=500
- TOTAL_LOGS=20000
- LOG_RATE=1000
- TOTAL_LOGS=100000
- REPLICATION_FACTOR=3
depends_on:
- kafka-0
- kafka-1
- kafka-2
networks:
- parseable-internal
restart: "no"
deploy:
restart_policy:
condition: on-failure
delay: 20s
max_attempts: 3

volumes:
kafka_0_data:
driver: local
kafka_1_data:
driver: local
kafka_2_data:
driver: local
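
Note: the generator's Kafka connection setup is not part of this hunk. As a rough sketch, a confluent-kafka producer wired to the three-broker bootstrap list configured above might look like the following (names and defaults here are illustrative assumptions, not code from scripts/kafka_log_stream_generator.py):

import json
import os

from confluent_kafka import Producer

brokers = os.getenv("KAFKA_BROKERS", "kafka-0:9092,kafka-1:9092,kafka-2:9092")
producer = Producer({
    "bootstrap.servers": brokers,
    # "acks=all" only adds durability once topics are replicated across the three brokers
    "acks": "all",
})

def delivery_report(err, msg):
    # Invoked from poll()/flush() for every produced message
    if err is not None:
        print(f"Delivery failed for {msg.topic()}: {err}")

producer.produce(
    "dist-test-logs-stream",
    json.dumps({"message": "hello"}).encode("utf-8"),
    callback=delivery_report,
)
producer.flush()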

50 changes: 27 additions & 23 deletions docker-compose-test.yaml
@@ -118,26 +118,26 @@ services:
timeout: 5s
retries: 5

#kafka-ui:
# platform: linux/amd64
# image: provectuslabs/kafka-ui:latest
# ports:
# - "8080:8080"
# depends_on:
# - kafka-0
# environment:
# KAFKA_CLUSTERS_0_NAME: test
# KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9092
# KAFKA_CLUSTERS_0_METRICS_PORT: 9101
# DYNAMIC_CONFIG_ENABLED: "true"
# JAVA_OPTS: -Xms256m -Xmx512m -XX:+UseG1GC
# networks:
# - parseable-internal
# deploy:
# restart_policy:
# condition: on-failure
# delay: 20s
# max_attempts: 3
kafka-ui:
platform: linux/amd64
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
depends_on:
- kafka-0
environment:
KAFKA_CLUSTERS_0_NAME: test
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9092
KAFKA_CLUSTERS_0_METRICS_PORT: 9101
DYNAMIC_CONFIG_ENABLED: "true"
JAVA_OPTS: -Xms256m -Xmx512m -XX:+UseG1GC
networks:
- parseable-internal
deploy:
restart_policy:
condition: on-failure
delay: 20s
max_attempts: 3

kafka-log-generator:
build:
@@ -146,13 +146,17 @@ services:
environment:
- KAFKA_BROKERS=kafka-0:9092
- KAFKA_TOPIC=test-logs-stream
- LOG_RATE=1000
- TOTAL_LOGS=10000
- LOG_RATE=500
- TOTAL_LOGS=50000
depends_on:
- kafka-0
networks:
- parseable-internal
restart: "no"
deploy:
restart_policy:
condition: on-failure
delay: 20s
max_attempts: 3

volumes:
kafka_0_data:
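Both compose files drive the generator entirely through environment variables (KAFKA_BROKERS, KAFKA_TOPIC, LOG_RATE, TOTAL_LOGS, and for the distributed setup REPLICATION_FACTOR). The script's configuration block is outside this diff; a plausible sketch of how those values are read, with purely illustrative defaults, would be:

import os

KAFKA_BROKERS = os.getenv("KAFKA_BROKERS", "localhost:9092")
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "test-logs-stream")
LOG_RATE = int(os.getenv("LOG_RATE", "500"))                    # target messages per second
TOTAL_LOGS = int(os.getenv("TOTAL_LOGS", "50000"))              # not used by the new while-True loop
NUM_PARTITIONS = int(os.getenv("NUM_PARTITIONS", "3"))          # hypothetical; not set in the compose files
REPLICATION_FACTOR = int(os.getenv("REPLICATION_FACTOR", "1"))
REPORT_EVERY = int(os.getenv("REPORT_EVERY", "5000"))           # hypothetical default
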
26 changes: 17 additions & 9 deletions scripts/kafka_log_stream_generator.py
@@ -150,14 +150,16 @@ def generate_log():


def main():
logger.info("Starting rate-limited log producer...")
logger.info("Starting continuous log producer...")
create_topic(KAFKA_TOPIC, NUM_PARTITIONS, REPLICATION_FACTOR)
logger.info(f"Broker: {KAFKA_BROKERS}, Topic: {KAFKA_TOPIC}, Rate: {LOG_RATE} logs/sec, Total Logs: {TOTAL_LOGS}")
logger.info(f"Broker: {KAFKA_BROKERS}, Topic: {KAFKA_TOPIC}, Rate: {LOG_RATE} logs/sec")

message_count = 0
start_time = time.time()
batch_start_time = time.time()

try:
for i in range(TOTAL_LOGS):
while True:
log_data = generate_log()
log_str = json.dumps(log_data)

@@ -168,9 +170,19 @@ def main():
callback=delivery_report
)

if (i + 1) % REPORT_EVERY == 0:
logger.info(f"{i + 1} messages produced. Flushing producer...")
message_count += 1

if message_count % REPORT_EVERY == 0:
current_time = time.time()
batch_elapsed = current_time - batch_start_time
total_elapsed = current_time - start_time

logger.info(f"Batch of {REPORT_EVERY} messages produced in {batch_elapsed:.2f}s")
logger.info(f"Total messages: {message_count}, Running time: {total_elapsed:.2f}s")
logger.info(f"Current rate: ~{REPORT_EVERY / batch_elapsed:,.0f} logs/sec")

producer.flush()
batch_start_time = current_time

# Sleep to maintain the logs/second rate
time.sleep(1 / LOG_RATE)
@@ -186,10 +198,6 @@ def main():
logger.info("Flushing producer...")
producer.flush()

elapsed = time.time() - start_time
logger.info(f"DONE! Produced {TOTAL_LOGS} log messages in {elapsed:.2f} seconds.")
logger.info(f"Effective rate: ~{TOTAL_LOGS / elapsed:,.0f} logs/sec")


if __name__ == "__main__":
main()
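
main() calls create_topic(KAFKA_TOPIC, NUM_PARTITIONS, REPLICATION_FACTOR), whose definition is outside this diff. A minimal sketch of such a helper, assuming the script uses confluent-kafka's AdminClient (the actual implementation may differ), is:

from confluent_kafka.admin import AdminClient, NewTopic

def create_topic(topic: str, num_partitions: int, replication_factor: int) -> None:
    # Create the topic up front so the producer does not rely on broker auto-creation;
    # KAFKA_BROKERS and logger are assumed to come from the script's existing setup.
    admin = AdminClient({"bootstrap.servers": KAFKA_BROKERS})
    futures = admin.create_topics(
        [NewTopic(topic, num_partitions=num_partitions, replication_factor=replication_factor)]
    )
    for name, future in futures.items():
        try:
            future.result()  # raises if creation failed
            logger.info(f"Created topic {name}")
        except Exception as e:
            logger.warning(f"Topic {name} not created: {e}")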
