Opinionated and Simplified Minimal APIs for Consuming Messages from RabbitMQ, Ensuring No Crucial Configurations Are Hidden.
Oragon.RabbitMQ is a Minimal API implementation to consume RabbitMQ queues. It provides everything you need to create resilient RabbitMQ consumers without the need to study numerous books and articles or introduce unknown risks to your environment.
All things about consuming queues is configurable in a friendly fluent and consistent way.
Add the principal package, Oragon.RabbitMQ for enabling consuming queues like Minimal API's.
dotnet add package Oragon.RabbitMQ
Pick one serializer: SystemTextJson or NewtonsoftJson
For System.Text.Json use Oragon.RabbitMQ.Serializer.SystemTextJson nuget package. It's ensuring latest performance and security issues resolved by Microsoft .NET Team.
dotnet add package Oragon.RabbitMQ.Serializer.SystemTextJson
If you have special needs that only JSON .NET solve, use the nuget package Oragon.RabbitMQ.Serializer.NewtonsoftJson.
dotnet add package Oragon.RabbitMQ.Serializer.NewtonsoftJson
var builder = WebApplication.CreateBuilder(args); //or Host.CreateApplicationBuilder(args);
// ...existing code...
builder.AddRabbitMQConsumer();
/*Pick only one*/
builder.Services.AddAmqpSerializer(options: JsonSerializerOptions.Default); // For Oragon.RabbitMQ.Serializer.SystemTextJson
//or
builder.Services.AddAmqpSerializer(options: new JsonSerializerSettings{...}); // For Oragon.RabbitMQ.Serializer.NewtonsoftJson
// ...existing code...
The consumer will use dependency injection to get a valid instance of RabbitMQ.Client.IConnection. If you do not provide one, you can create a connection configuration as shown below.
// ...existing code...
builder.Services.AddSingleton<IConnectionFactory>(sp => new ConnectionFactory()
{
Uri = new Uri("amqp://rabbitmq:5672"),
DispatchConsumersAsync = true
});
builder.Services.AddSingleton(sp => sp.GetRequiredService<IConnectionFactory>().CreateConnectionAsync().GetAwaiter().GetResult());
// ...existing code...
If you are using .NET Aspire, replace Aspire.RabbitMQ.Client
with the Oragon.RabbitMQ.AspireClient
package.
Today, Oragon.RabbitMQ.AspireClient
supports RabbitMQ.Client 7.x, while Aspire.RabbitMQ.Client
supports 6.x.
After Aspire.RabbitMQ.Client
gain supports RabbitMQ.Client 7.x, the Oragon.RabbitMQ.AspireClient
package will be not necessary and will be marked as deprecated.
// ...existing code...
builder.AddRabbitMQClient("rabbitmq");
// ...existing code...
To map your queue using this package, follow these steps:
-
Build the application: First, build your application using the builder pattern. This initializes the application and prepares it for further configuration.
var app = builder.Build();
-
Map the queue: Next, map your queue to a specific service and command/event. This step involves configuring how the service will handle incoming messages from the queue.
public class BusinessService { public bool CanDoSomething(BusinessCommandOrEvent command) { return /* Check if can process this message*/; } public void DoSomething(BusinessCommandOrEvent command) { ... } public Task DoSomethingAsync(BusinessCommandOrEvent command) { return Task.CompletedTask; } }
In this example, the success of execution causes an implicit AckResult to process an ack for message broker after processing the entire message.
//Service Method Signature: void DoSomething(BusinessCommandOrEvent command) app.MapQueue("queueName", ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => svc.DoSomething(msg));
The same mechanism supports async/await code.
//Service Method Signature: Task DoSomethingAsync(BusinessCommandOrEvent command) app.MapQueue("queueName", async ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => await svc.DoSomethingAsync(msg).ConfigureAwait(false));
You can take control by returning an instance of IAmqpResult implementation.
We provide some built-in implementations like: AckResult, NackResult, RejectResult, ComposableResult and ReplyResult.
//Service Method Signature: // bool CanDoSomething(BusinessCommandOrEvent command) // Task DoSomethingAsync(BusinessCommandOrEvent command) app.MapQueue("queueName", async ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => { IAmqpResult returnValue; if (svc.CanDoSomething(msg)) { await svc.DoSomethingAsync(msg); returnValue = AmqpResults.Ack(); } else { returnValue = AmqpResults.Reject(requeue: true); } return returnValue; });
Or changing the behavior of exception handling by handling yourself and returning a IAmqpResult valid implementation.
//Service Method Signature: // Task DoSomethingAsync(BusinessCommandOrEvent command) app.MapQueue("queueName", async ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => { IAmqpResult returnValue; try { await svc.DoSomethingAsync(msg); returnValue = AmqpResults.Ack(); } catch(Exception ex) { // Log this exception returnValue = AmqpResults.Nack(true); } return returnValue; });
-
Run the application: Finally, run the application to start processing messages from the queue.
app.Run();
var builder = WebApplication.CreateBuilder(args);
builder.AddRabbitMQConsumer();
builder.Services.AddSingleton<BusinessService>();
builder.Services.AddAmqpSerializer(options: JsonSerializerOptions.Default);
builder.Services.AddSingleton<IConnectionFactory>(sp => new ConnectionFactory(){ Uri = new Uri("amqp://rabbitmq:5672"), DispatchConsumersAsync = true });
builder.Services.AddSingleton(sp => sp.GetRequiredService<IConnectionFactory>().CreateConnectionAsync().GetAwaiter().GetResult());
var app = builder.Build();
app.MapQueue("queueName", ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) =>
svc.DoSomethingAsync(msg));
app.Run();
This approach is designed to decouple RabbitMQ consumers from business logic, ensuring that business code remains unaware of the queue consumption context. The result is incredibly simple, decoupled, agnostic, more reusable, and highly testable code.
This consumer is focused on creating a resilient consumer using manual acknowledgments.
- The automatic flow produces a
BasicReject
without requeue when serialization failures (e.g., incorrectly formatted messages), you must use dead-lettering to ensure that your message will not be lost. - The automatic flow produces a
BasicNack
without requeue for processing failures. You must use dead-lettering to ensure that your message will not be lost. - The automatic flow produces a
BasicAck
for success. If you need more control, return an instance ofIAmqpResult
to control this behavior. - Minimal API design style made with minimum and cached reflection
- Extensible with support for custom serializers and encoders
Autoflow uses Ack, Nack, and Reject automatically, but you can control the flow.
Inside the Oragon.RabbitMQ.Consumer.Actions
namespace, you can find some results:
- AckResult (
AmqpResults.Ack();
) - NackResult (
AmqpResults.Nack(requeue: bool);
) - RejectResult (
AmqpResults.Reject(requeue: bool);
) - ReplyResult (
AmqpResults.Reply<T>(T objectToReply);
)⚠️ EXPERIMENTAL⚠️ - ComposableResult (
AmqpResults.Compose(params IAmqpResult[] results);
)
Example:
app.MapQueue("queueName", ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => {
IAmqpResult returnValue;
if (svc.CanXpto(msg))
{
svc.DoXpto(msg);
returnValue = AmqpResults.Ack();
}
else
{
returnValue = AmqpResults.Nack(true);
}
return returnValue;
})
.WithPrefetch(2000)
.WithDispatchConcurrency(4);
app.MapQueue("queueName", async ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => {
if (await svc.CanXpto(msg))
{
await svc.DoXpto(msg);
return AmqpResults.Ack();
}
else
{
return AmqpResults.Nack(true);
}
})
.WithPrefetch(2000)
.WithDispatchConcurrency(4);
For these types, the model binder will set the correct current instance without needing a special attribute.
- RabbitMQ.Client.IConnection
- RabbitMQ.Client.IChannel
- RabbitMQ.Client.Events.BasicDeliverEventArgs
- RabbitMQ.Client.DeliveryModes
- RabbitMQ.Client.IReadOnlyBasicProperties
- System.IServiceProvider (scoped)
Some string parameters are considered special, and the model binder will use a name to set the correct current string from the consumer.
The model binder will set the name of the queue that the consumer is consuming.
- queue
- queueName
The model binder will set a routing key from the Amqp message.
- routing
- routingKey
The model binder will set an exchange name from the Amqp message.
- exchange
- exchangeName
The model binder will set a consumer tag from the actual consumer.
- consumer
- consumerTag
For version 1.0.0, I've removed all implementations of automatic telemetry and OpenTelemetry. It will be available as soon as possible.
- Migrate Demo to Library Project
- Core: Queue Consumer
- Core: Rpc Queue Consumer
- Core: Support Keyed Services
- Core: Support of new design of RabbitMQ.Client
- Create Samples
- Review All SuppressMessageAttribute
- Create Docs
- Benchmarks
- Automate Badges
- Add SonarCloud
- Code Coverage > 80%
- Add CI/CD
- Add Unit Tests
- Add Integrated Tests with TestContainers
- Test CI/CD Flow: MyGet Alpha Packages with Symbols
- Test CI/CD Flow: MyGet Packages without Symbols
- Test CI/CD Flow: Nuget Packages without Symbols
- Change original behavior based on lambda expressions to dynamic delegate.