Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publishing from a consumer #2

Open
rob-blackbourn opened this issue Sep 14, 2020 · 2 comments
Open

Publishing from a consumer #2

rob-blackbourn opened this issue Sep 14, 2020 · 2 comments

Comments

@rob-blackbourn
Copy link

Hi,

When I publish from a consume loop I get the error below. Would you expect this to work, or should I be using a separate connection for consuming and publishing?

error publishing: Invalid state transition from 'Message Published' to 'Sent message acknowledgement'

The code looks like this:

async for msg in client.consume(queue):
    await client.publish(
        exchange,
        routing_key,
        msg.body,
        content_type=msg.content_type
    )
@gmr
Copy link
Owner

gmr commented Sep 14, 2020

In the current state model, you would need to Ack or Nack the message before taking on another operation. IE it wants to finish what was started before starting a new operation.

async for msg in client.consume(queue):
    await client.basic_ack(msg.delivery_tag)
    await client.publish(
        exchange,
        routing_key,
        msg.body,
        content_type=msg.content_type)

Alternatively, as it exists today, you could create a 2nd client connection for publishing, as the idea of acking before a confirmed republish might be problematic.

What could be cool here is instead of client being a single connection, having it be a managed pool that automatically understands the context of its usage in scenarios like the one you presented.

@rob-blackbourn
Copy link
Author

I'm really liking the state machine approach BTW.

The pattern I have is a chain of micro-services: producer, consumer-producer, ..., consumer. It seems natural to want to publish the message before acknowledging, but I'm not sure I'm right.

At the moment I'm using the 2nd client connection solution.

With regard to the managed pool:

I can see the client is in STATE_MESSAGE_ASSEMBLED, which is a chain starting with STATE_BASIC_DELIVER_RECEIVED which is in the _IDLE_STATE set.

The simplest solution would be to used a fresh client if the current state was not in the _IDLE_STATE set, but I don't know if there are state chains which would conflict?

A more complex approach might be to use the initial state as a "context", and have a set of "legal" contexts for which new clients are allowed to be created.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants