Skip to content

Commit

Permalink
DurableTask.AzureStorage: Make max queue polling interval configurable (Azure#257)
Browse files Browse the repository at this point in the history

There is just one setting because control queues matter most, and using a different value for work-item queues is not expected to be important.
  • Loading branch information
cgillum authored Feb 28, 2019
1 parent 880f701 commit bb6f54c
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ namespace DurableTask.AzureStorage
using DurableTask.Core;
using DurableTask.Core.History;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Auth;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Queue;
using Newtonsoft.Json;
Expand All @@ -43,8 +42,6 @@ public sealed class AzureStorageOrchestrationService :
IPartitionObserver<BlobLease>,
IDisposable
{
internal static readonly TimeSpan MaxQueuePollingDelay = TimeSpan.FromSeconds(30);

static readonly HistoryEvent[] EmptyHistoryEventList = new HistoryEvent[0];
static readonly OrchestrationInstance EmptySourceInstance = new OrchestrationInstance
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class AzureStorageOrchestrationServiceSettings
{
internal const int DefaultPartitionCount = 4;

internal static readonly TimeSpan DefaultMaxQueuePollingInterval = TimeSpan.FromSeconds(30);

/// <summary>
/// Gets or sets the number of messages to pull from the control queue at a time. The default is 32.
/// The maximum batch size supported by Azure Storage Queues is 32.
Expand Down Expand Up @@ -135,6 +137,11 @@ public class AzureStorageOrchestrationServiceSettings
/// </summary>
public TimeSpan LeaseInterval { get; set; } = TimeSpan.FromSeconds(30);

/// <summary>
/// Gets or sets the maximum interval for polling control and work-item queues.
/// The default is 30 seconds.
/// </summary>
/// <remarks>
/// A single value applies to both queue types. When a task hub queue is created,
/// a value smaller than its 50 millisecond minimum polling delay is raised to
/// that minimum.
/// </remarks>
public TimeSpan MaxQueuePollingInterval { get; set; } = DefaultMaxQueuePollingInterval;

/// <summary>
/// Gets or sets the Azure Storage Account details
/// If provided, this is used to connect to Azure Storage
Expand Down
2 changes: 1 addition & 1 deletion src/DurableTask.AzureStorage/BackoffPollingHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ public TimeSpan UpdateDelay(bool executionSucceeded)
this.CurrentInterval = this.maximumInterval;
}
}
// else do nothing and keep current interval equal to max

// else do nothing and keep current interval equal to max
return this.CurrentInterval;
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ public TaskHubQueue(
this.messageManager = messageManager;

TimeSpan minPollingDelay = TimeSpan.FromMilliseconds(50);
TimeSpan maxPollingDelay = AzureStorageOrchestrationService.MaxQueuePollingDelay;
TimeSpan maxPollingDelay = settings.MaxQueuePollingInterval;
if (maxPollingDelay < minPollingDelay)
{
maxPollingDelay = minPollingDelay;
}

this.backoffHelper = new BackoffPollingHelper(minPollingDelay, maxPollingDelay);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ public class DisconnectedPerformanceMonitor
internal const int QueueLengthSampleSize = 5;
internal const int MaxMessagesPerWorkerRatio = 100;

static readonly int MaxPollingLatency = (int)AzureStorageOrchestrationService.MaxQueuePollingDelay.TotalMilliseconds;
static readonly int HighLatencyThreshold = Math.Min(MaxPollingLatency, 1000); // milliseconds
static readonly int LowLatencyThreshold = 200; // milliseconds
static readonly Random Random = new Random();

Expand All @@ -39,6 +37,8 @@ public class DisconnectedPerformanceMonitor

readonly CloudStorageAccount storageAccount;
readonly string taskHub;
readonly int maxPollingLatency;
readonly int highLatencyThreshold;

int currentPartitionCount;
int currentWorkItemQueueLength;
Expand All @@ -59,10 +59,18 @@ public DisconnectedPerformanceMonitor(string storageConnectionString, string tas
/// </summary>
/// <param name="storageAccount">The Azure Storage account to monitor.</param>
/// <param name="taskHub">The name of the task hub within the specified storage account.</param>
public DisconnectedPerformanceMonitor(CloudStorageAccount storageAccount, string taskHub)
/// <param name="maxPollingIntervalMilliseconds">The maximum interval in milliseconds for polling control and work-item queues.</param>
public DisconnectedPerformanceMonitor(
CloudStorageAccount storageAccount,
string taskHub,
int? maxPollingIntervalMilliseconds = null)
{
this.storageAccount = storageAccount;
this.taskHub = taskHub;
this.maxPollingLatency =
maxPollingIntervalMilliseconds ??
(int)AzureStorageOrchestrationServiceSettings.DefaultMaxQueuePollingInterval.TotalMilliseconds;
this.highLatencyThreshold = Math.Min(this.maxPollingLatency, 1000);
}

internal virtual int PartitionCount => this.currentPartitionCount;
Expand Down Expand Up @@ -276,12 +284,12 @@ ScaleRecommendation MakeScaleRecommendation(int workerCount)
keepWorkersAlive: false,
reason: "Task hub is idle");
}
else if (IsHighLatency(this.WorkItemQueueLatencies))
else if (this.IsHighLatency(this.WorkItemQueueLatencies))
{
return new ScaleRecommendation(
ScaleAction.AddWorker,
keepWorkersAlive: true,
reason: $"Work-item queue latency: {this.WorkItemQueueLatencies.Latest} > {HighLatencyThreshold}");
reason: $"Work-item queue latency: {this.WorkItemQueueLatencies.Latest} > {this.highLatencyThreshold}");
}
else if (workerCount > this.PartitionCount && IsIdle(this.WorkItemQueueLatencies))
{
Expand All @@ -292,14 +300,14 @@ ScaleRecommendation MakeScaleRecommendation(int workerCount)
}

// Control queues are partitioned; only scale-out if there are more partitions than workers.
if (workerCount < this.ControlQueueLatencies.Count(IsHighLatency))
if (workerCount < this.ControlQueueLatencies.Count(this.IsHighLatency))
{
// Some control queues are busy, so scale out until workerCount == partitionCount.
QueueMetricHistory metric = this.ControlQueueLatencies.First(IsHighLatency);
QueueMetricHistory metric = this.ControlQueueLatencies.First(this.IsHighLatency);
return new ScaleRecommendation(
ScaleAction.AddWorker,
keepWorkersAlive: true,
reason: $"High control queue latency: {metric.Latest} > {HighLatencyThreshold}");
reason: $"High control queue latency: {metric.Latest} > {this.highLatencyThreshold}");
}
else if (workerCount > this.ControlQueueLatencies.Count(h => !IsIdle(h)) && IsIdle(this.WorkItemQueueLatencies))
{
Expand Down Expand Up @@ -333,16 +341,16 @@ ScaleRecommendation MakeScaleRecommendation(int workerCount)
return new ScaleRecommendation(ScaleAction.None, keepWorkersAlive: true, reason: $"Queue latencies are healthy");
}

static bool IsHighLatency(QueueMetricHistory history)
bool IsHighLatency(QueueMetricHistory history)
{
if (history.Previous == 0)
{
// If previous was zero, the queue may have been idle, which means
// backoff polling might have been the reason for the latency.
return history.Latest >= MaxPollingLatency;
return history.Latest >= this.maxPollingLatency;
}

return history.Latest >= HighLatencyThreshold;
return history.Latest >= this.highLatencyThreshold;
}

static bool IsLowLatency(QueueMetricHistory history)
Expand Down

0 comments on commit bb6f54c

Please sign in to comment.