Skip to content

Commit

Permalink
Added SetDeliveryAcknowledgementTimeout on the receive endpoint confi…
Browse files Browse the repository at this point in the history
…gurator for RabbitMQ to set the x-consumer-timeout queue argument.
  • Loading branch information
phatboyg committed Nov 11, 2024
1 parent 665b462 commit c332e29
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,25 @@ void Bind<T>(Action<IRabbitMqExchangeBindingConfigurator> callback = null)
/// <param name="consumerTag">Overrides the default consumer tag with the specified name</param>
/// <param name="callback"></param>
void Stream(string consumerTag, Action<IRabbitMqStreamConfigurator> callback = null);

/// <summary>
/// Configure the RabbitMQ delivery acknowledgement timeout for this queue explicitly. This is entirely optional,
/// and generally not necessary.
/// <see href="https://www.rabbitmq.com/docs/consumers#acknowledgement-timeout"/>
/// </summary>
/// <param name="timeSpan"></param>
void SetDeliveryAcknowledgementTimeout(TimeSpan timeSpan);

/// <summary>
/// Configure the RabbitMQ delivery acknowledgement timeout for this queue explicitly. This is entirely optional,
/// and generally not necessary.
/// <see href="https://www.rabbitmq.com/docs/consumers#acknowledgement-timeout"/>
/// </summary>
/// <param name="d">days</param>
/// <param name="h">hours</param>
/// <param name="m">minutes</param>
/// <param name="s">seconds</param>
/// <param name="ms">milliseconds</param>
void SetDeliveryAcknowledgementTimeout(int? d = null, int? h = null, int? m = null, int? s = null, int? ms = null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,17 @@ public TimeSpan? QueueExpiration
{
get
{
if (QueueArguments.TryGetValue("x-expires", out var value) && value is long milliseconds)
if (QueueArguments.TryGetValue(Headers.XExpires, out var value) && value is long milliseconds)
return TimeSpan.FromMilliseconds(milliseconds);

return null;
}
set
{
if (value.HasValue && value.Value > TimeSpan.Zero)
QueueArguments["x-expires"] = (long)value.Value.TotalMilliseconds;
QueueArguments[Headers.XExpires] = (long)value.Value.TotalMilliseconds;
else
QueueArguments.Remove("x-expires");
QueueArguments.Remove(Headers.XExpires);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace MassTransit.RabbitMqTransport.Configuration
using System.Collections.Generic;
using MassTransit.Configuration;
using Middleware;
using RabbitMQ.Client;
using Topology;
using Transports;

Expand Down Expand Up @@ -77,7 +78,7 @@ BrokerTopology BuildTopology(ReceiveSettings settings)
var queueAutoDelete = settings.AutoDelete;
if (settings.QueueExpiration.HasValue)
{
queueArguments["x-expires"] = (long)settings.QueueExpiration.Value.TotalMilliseconds;
queueArguments[Headers.XExpires] = (long)settings.QueueExpiration.Value.TotalMilliseconds;
queueAutoDelete = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,21 @@ public void SetQuorumQueue(int? replicationFactor = default)
_settings.SetQuorumQueue(replicationFactor);
}

public void SetDeliveryAcknowledgementTimeout(TimeSpan timeSpan)
{
if (timeSpan <= TimeSpan.Zero)
throw new ArgumentException("The RabbitMQ consumer timeout must be > 0");

SetQueueArgument("x-consumer-timeout", (long)timeSpan.TotalMilliseconds);
}

public void SetDeliveryAcknowledgementTimeout(int? d = null, int? h = null, int? m = null, int? s = null, int? ms = null)
{
var value = new TimeSpan(d ?? 0, h ?? 0, m ?? 0, s ?? 0, ms ?? 0);

SetDeliveryAcknowledgementTimeout(value);
}

public void Bind(string exchangeName, Action<IRabbitMqExchangeToExchangeBindingConfigurator> callback)
{
if (exchangeName == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
namespace MassTransit.RabbitMqTransport.Configuration;

using System;
using RabbitMQ.Client;


public class RabbitMqStreamConfigurator :
Expand All @@ -16,7 +17,7 @@ public RabbitMqStreamConfigurator(RabbitMqReceiveSettings settings)

public long MaxLength
{
set => _settings.QueueArguments["x-max-length-bytes"] = value;
set => _settings.QueueArguments[Headers.XMaxLengthInBytes] = value;
}

public TimeSpan MaxAge
Expand All @@ -33,13 +34,13 @@ public TimeSpan MaxAge
else if (value.TotalSeconds >= 1)
text = $"{value.TotalSeconds:F0}s";

_settings.QueueArguments["x-max-age"] = text;
_settings.QueueArguments[Headers.XMaxAge] = text;
}
}

public long MaxSegmentSize
{
set => _settings.QueueArguments["x-stream-max-segment-size-bytes"] = value;
set => _settings.QueueArguments[Headers.XStreamMaxSegmentSizeInBytes] = value;
}

public string Filter
Expand All @@ -49,24 +50,24 @@ public string Filter

public void FromOffset(long offset)
{
_settings.ConsumeArguments["x-stream-offset"] = offset;
_settings.ConsumeArguments[Headers.XStreamOffset] = offset;
}

public void FromTimestamp(DateTime timestamp)
{
if (timestamp.Kind == DateTimeKind.Local)
timestamp = timestamp.ToUniversalTime();

_settings.ConsumeArguments.SetAmqpTimestamp("x-stream-offset", timestamp);
_settings.ConsumeArguments.SetAmqpTimestamp(Headers.XStreamOffset, timestamp);
}

public void FromFirst()
{
_settings.ConsumeArguments["x-stream-offset"] = "first";
_settings.ConsumeArguments[Headers.XStreamOffset] = "first";
}

public void FromLast()
{
_settings.ConsumeArguments["x-stream-offset"] = "last";
_settings.ConsumeArguments[Headers.XStreamOffset] = "last";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public async Task Should_properly_handle_message_redelivery()
.Endpoint(e => e.AddConfigureEndpointCallback(cfg =>
{
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
rmq.SetQueueArgument("x-consumer-timeout", 10000);
rmq.SetDeliveryAcknowledgementTimeout(ms: 10000);
}));

x.UsingRabbitMq((context, cfg) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace MassTransit.RabbitMqTransport.Tests
{
using System;
using NUnit.Framework;
using RabbitMQ.Client;


[TestFixture]
Expand Down Expand Up @@ -447,7 +448,7 @@ public class GivenATimeToLive
[Test]
public void HighAvailabilityQueue()
{
Assert.That(_receiveSettings.QueueArguments["x-message-ttl"], Is.EqualTo("30000"));
Assert.That(_receiveSettings.QueueArguments[Headers.XMessageTTL], Is.EqualTo("30000"));
}

[Test]
Expand Down

0 comments on commit c332e29

Please sign in to comment.