Skip to content

Commit

Permalink
Fix edgehub queue len metric (Azure#4952)
Browse files Browse the repository at this point in the history
This attempts to fix the issue of queue len metric reporting incorrectly due to how MessageStore calculated the current queue length. This change now gives the ability to directly request the Count of items from the store implementation.

There is also a matter of discussion on whether we can get the metric to exist solely in one unified place and whether we require both Checkpointer and MessageStore.
  • Loading branch information
nyanzebra authored May 14, 2021
1 parent e5218d1 commit 065bf32
Show file tree
Hide file tree
Showing 13 changed files with 47 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,24 +298,14 @@ private async Task CleanQueue(bool checkEntireQueueOnCleanup)
Events.CleanupCheckpointState(messageQueueId, checkpointData);
int cleanupEntityStoreCount = 0;

// If checkEntireQueueOnCleanup is set to false, we only peek the head, message counts is tailOffset-headOffset+1
// otherwise count while iterating over the queue.
var headOffset = 0L;
var tailOffset = sequentialStore.GetTailOffset(CancellationToken.None);
var messageCount = 0L;

async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
{
var expiry = messageRef.TimeStamp + messageRef.TimeToLive;
if (offset > checkpointData.Offset && expiry > DateTime.UtcNow)
{
// message is not sent and not expired, increase message counts
messageCount++;
return false;
}

headOffset = Math.Max(headOffset, offset);

var message = await this.TryDecrementRefCountUpdate(messageRef.EdgeMessageId, messageQueueId);

await message.ForEachAsync(async msg =>
Expand Down Expand Up @@ -368,12 +358,10 @@ await message.ForEachAsync(async msg =>
{
cleanupCount++;
}

messageCount = tailOffset - headOffset + 1;
}

// update Metrics for message counts
Checkpointer.Metrics.QueueLength.Set(messageCount, new[] { endpointId, priority.ToString(), bool.TrueString });
Checkpointer.Metrics.SetQueueLength(await sequentialStore.Count(), endpointId, priority.ToString());
totalCleanupCount += cleanupCount;
totalCleanupStoreCount += cleanupEntityStoreCount;
Events.CleanupCompleted(messageQueueId, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public static void CommitStarted(Checkpointer checkpointer, int successfulCount,

public static void CommitFinished(Checkpointer checkpointer)
{
Log.LogInformation((int)EventIds.CommitFinished, "[CheckpointerCommitFinishedo] {context}", GetContextString(checkpointer));
Log.LogInformation((int)EventIds.CommitFinished, "[CheckpointerCommitFinished] {context}", GetContextString(checkpointer));
}

public static void Close(Checkpointer checkpointer)
Expand All @@ -203,7 +203,13 @@ public static class Metrics
"Number of messages pending to be processed for the endpoint",
new List<string> { "endpoint", "priority", MetricsConstants.MsTelemetry });

public static void SetQueueLength(Checkpointer checkpointer) => QueueLength.Set(checkpointer.Proposed - checkpointer.Offset, new[] { checkpointer.EndpointId, checkpointer.Priority, bool.TrueString });
public static void SetQueueLength(Checkpointer checkpointer) => SetQueueLength(CalculateQueueLength(checkpointer), checkpointer.EndpointId, checkpointer.Priority);

public static void SetQueueLength(double length, string endpointId, string priority) => QueueLength.Set(length, new[] { endpointId, priority, bool.TrueString });

private static double CalculateQueueLength(Checkpointer checkpointer) => CalculateQueueLength(checkpointer.Proposed - checkpointer.Offset);

private static double CalculateQueueLength(long length) => Math.Max(length, 0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,20 @@ namespace Microsoft.Azure.Devices.Edge.Storage.RocksDb
class ColumnFamilyDbStore : IDbStore
{
readonly IRocksDb db;
private ulong count;

public ColumnFamilyDbStore(IRocksDb db, ColumnFamilyHandle handle)
{
this.db = Preconditions.CheckNotNull(db, nameof(db));
this.Handle = Preconditions.CheckNotNull(handle, nameof(handle));

var iterator = db.NewIterator(this.Handle);
this.count = 0;
while (iterator.Valid())
{
this.count += 1;
iterator = iterator.Next();
}
}

internal ColumnFamilyHandle Handle { get; }
Expand Down Expand Up @@ -49,20 +58,23 @@ public async Task<Option<byte[]>> Get(byte[] key, CancellationToken cancellation
return returnValue;
}

public Task Put(byte[] key, byte[] value, CancellationToken cancellationToken)
public async Task Put(byte[] key, byte[] value, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(key, nameof(key));
Preconditions.CheckNotNull(value, nameof(value));

Action operation = () => this.db.Put(key, value, this.Handle);
return operation.ExecuteUntilCancelled(cancellationToken);
await operation.ExecuteUntilCancelled(cancellationToken);
this.count += 1;
}

public Task Remove(byte[] key, CancellationToken cancellationToken)
public async Task Remove(byte[] key, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(key, nameof(key));

Action operation = () => this.db.Remove(key, this.Handle);
return operation.ExecuteUntilCancelled(cancellationToken);
await operation.ExecuteUntilCancelled(cancellationToken);
this.count -= 1;
}

public async Task<Option<(byte[] key, byte[] value)>> GetLastEntry(CancellationToken cancellationToken)
Expand Down Expand Up @@ -128,6 +140,8 @@ public Task IterateBatch(int batchSize, Func<byte[], byte[], Task> callback, Can
return this.IterateBatch(iterator => iterator.SeekToFirst(), batchSize, callback, cancellationToken);
}

public Task<ulong> Count() => Task.FromResult(this.count);

public void Dispose()
{
this.Dispose(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,7 @@ public Task Remove(byte[] key, CancellationToken cancellationToken)
{
return this.dbStore.Remove(key, cancellationToken);
}

public Task<ulong> Count() => this.dbStore.Count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ await decryptedValue.ForEachAsync(
cancellationToken);
}

public Task<ulong> Count() => this.entityStore.Count();

public void Dispose()
{
this.Dispose(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ public Task IterateBatch(int batchSize, Func<TK, TV, Task> callback, Cancellatio
public Task<bool> Contains(TK key, CancellationToken cancellationToken)
=> this.dbStore.Contains(key, cancellationToken);

public Task<ulong> Count() => this.dbStore.Count();

public void Dispose()
{
this.Dispose(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,7 @@ public interface IKeyValueStore<TK, TV> : IDisposable
Task IterateBatch(int batchSize, Func<TK, TV, Task> perEntityCallback, CancellationToken cancellationToken);

Task IterateBatch(TK startKey, int batchSize, Func<TK, TV, Task> perEntityCallback, CancellationToken cancellationToken);

Task<ulong> Count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ public interface ISequentialStore<T> : IDisposable
Task<bool> RemoveFirst(Func<long, T, Task<bool>> predicate, CancellationToken cancellationToken);

Task<IEnumerable<(long, T)>> GetBatch(long startingOffset, int batchSize, CancellationToken cancellationToken);

Task<ulong> Count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public async Task Remove(byte[] key, CancellationToken cancellationToken)
}
}

public Task<ulong> Count() => Task.FromResult((ulong)this.keyValues.Count);

public void Dispose()
{
// No-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public Task IterateBatch(TK startKey, int batchSize, Func<TK, TV, Task> callback
public Task IterateBatch(int batchSize, Func<TK, TV, Task> callback, CancellationToken cancellationToken)
=> this.IterateBatch(Option.None<TK>(), batchSize, callback, cancellationToken);

public Task<ulong> Count() => this.underlyingStore.Count();

Task IterateBatch(Option<TK> startKey, int batchSize, Func<TK, TV, Task> callback, CancellationToken cancellationToken)
{
Preconditions.CheckRange(batchSize, 1, nameof(batchSize));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,7 @@ public void Dispose()
public Task IterateBatch(int batchSize, Func<TK, TV, Task> perEntityCallback, CancellationToken cancellationToken) => Task.CompletedTask;

public Task IterateBatch(TK startKey, int batchSize, Func<TK, TV, Task> perEntityCallback, CancellationToken cancellationToken) => Task.CompletedTask;

public Task<ulong> Count() => Task.FromResult(0UL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ await this.entityStore.IterateBatch(
return batch;
}

public Task<ulong> Count() => this.entityStore.Count();

public void Dispose()
{
this.Dispose(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,7 @@ public Task IterateBatch(TK startKey, int batchSize, Func<TK, TV, Task> perEntit
Func<CancellationToken, Task> iterateWithTimeout = cts => this.underlyingKeyValueStore.IterateBatch(startKey, batchSize, perEntityCallback, cts);
return iterateWithTimeout.TimeoutAfter(cancellationToken, this.timeout);
}

public Task<ulong> Count() => this.underlyingKeyValueStore.Count();
}
}

0 comments on commit 065bf32

Please sign in to comment.