Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple Kafka consumers inside of one process [Question] #2056

Open
nightblure opened this issue Jan 29, 2025 · 1 comment · May be fixed by #1779
Open

Multiple Kafka consumers inside of one process [Question] #2056

nightblure opened this issue Jan 29, 2025 · 1 comment · May be fixed by #1779
Milestone

Comments

@nightblure
Copy link

nightblure commented Jan 29, 2025

Description

Hi! I'm trying to use this package for the first time and I have a question

I declared two Kafka consumers (and two partitions), but only one of them is ready to listen (see logs and code below).
Is there a way to fix this so that an arbitrary number of consumers work within one process? (as in my case it is a process running uvicorn+fastapi)

Consumers code:

from functools import partial

from faststream.kafka import KafkaMessage
from pydantic import BaseModel

from deps import kafka_client

# .kafka field - it's object of KafkaBroker type which is taken from FastStream
consumer_group = partial(
    kafka_client.kafka.subscriber,
    "test_topic",
    group_id="gr",
    auto_commit=False,
)

class Message(BaseModel): ...

@consumer_group()
async def consumer(message: Message, msg: KafkaMessage):
    print(f"Got message: {message}")


@consumer_group(batch=True)
async def another_consumer(msgs: list[Message], msg: KafkaMessage):
    print(f"Got batch message")

Logs with this code:

INFO:     Uvicorn running on http://0.0.0.0:8099 (Press CTRL+C to quit)
INFO:     Started reloader process [30176] using StatReload
INFO:     Started server process [30178]
INFO:     Waiting for application startup.
2025-01-29 15:13:06,164 INFO     - test_topic | gr |            - `Consumer` waiting for messages

Case with commented first consumer

If i comment code with first consumer (consumer function) then i get next logs:

INFO:     Uvicorn running on http://0.0.0.0:8099 (Press CTRL+C to quit)
INFO:     Started reloader process [30355] using StatReload
INFO:     Started server process [30360]
INFO:     Waiting for application startup.
2025-01-29 15:16:26,309 INFO     - test_topic | gr |            - `AnotherConsumer` waiting for messages

P.S. code with consumers located in package kafka_.consumer and when the application starts I import everything from this file:

from kafka_.consumer import *
@Lancetnik Lancetnik moved this to Waiting for merge in FastStream Jan 29, 2025
@Lancetnik Lancetnik added this to the 0.6.0 milestone Jan 29, 2025
@Lancetnik
Copy link
Member

Looks like a copy of #1901 , #1308 and #2029

@Lancetnik Lancetnik linked a pull request Jan 29, 2025 that will close this issue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: Waiting for merge
Development

Successfully merging a pull request may close this issue.

2 participants