Skip to content

Commit

Permalink
Plumb endpoint ID and priority to checkpointer for metrics (Azure#2903)
Browse files Browse the repository at this point in the history
  • Loading branch information
richma-ms authored May 1, 2020
1 parent 5fd01be commit b473bf4
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ namespace Microsoft.Azure.Devices.Routing.Core
public interface ICheckpointerFactory
{
Task<ICheckpointer> CreateAsync(string id);
Task<ICheckpointer> CreateAsync(string id, string endpointId, uint priority);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class Checkpointer : ICheckpointer
readonly AtomicBoolean closed;
readonly ICheckpointStore store;

Checkpointer(string id, ICheckpointStore store, CheckpointData checkpointData)
Checkpointer(string id, ICheckpointStore store, CheckpointData checkpointData, string endpointId, uint priority)
{
this.Id = Preconditions.CheckNotNull(id);
this.store = Preconditions.CheckNotNull(store);
Expand All @@ -32,32 +32,8 @@ public class Checkpointer : ICheckpointer
this.UnhealthySince = checkpointData.UnhealthySince;
this.Proposed = checkpointData.Offset;
this.closed = new AtomicBoolean(false);

// The endpoint ID and priority is encoded into the checkpointer ID
// using the following format:
// {endpointId}_Pri{priority}
// We use "_Pri" as a delimiter to parse the endpoint ID and priority
// back out for metrics reporting
string[] tokens = id.Split(new string[] { "_Pri" }, System.StringSplitOptions.RemoveEmptyEntries);

switch (tokens.Length)
{
case 1:
// There's no priority value, which means this is
// the checkpointer for the default priority
this.EndpointId = tokens[0];
this.Priority = RouteFactory.DefaultPriority.ToString();
break;
case 2:
this.EndpointId = tokens[0];
this.Priority = tokens[1];
break;
default:
// Bad format (maybe due to testcase or some other such)
this.EndpointId = string.Empty;
this.Priority = string.Empty;
break;
}
this.EndpointId = endpointId;
this.Priority = priority.ToString();
}

public string Id { get; }
Expand All @@ -76,15 +52,17 @@ public class Checkpointer : ICheckpointer

public bool HasOutstanding => this.Offset < this.Proposed;

public static async Task<Checkpointer> CreateAsync(string id, ICheckpointStore store)
public static Task<Checkpointer> CreateAsync(string id, ICheckpointStore store) => CreateAsync(id, store, string.Empty, RouteFactory.DefaultPriority);

public static async Task<Checkpointer> CreateAsync(string id, ICheckpointStore store, string endpointId, uint priority)
{
Preconditions.CheckNotNull(id);
Preconditions.CheckNotNull(store);

Events.CreateStart(id);
CheckpointData checkpointData = await store.GetCheckpointDataAsync(id, CancellationToken.None);

var checkpointer = new Checkpointer(id, store, checkpointData);
var checkpointer = new Checkpointer(id, store, checkpointData, endpointId, priority);

Events.CreateFinished(checkpointer);
return checkpointer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ public static async Task<MasterCheckpointer> CreateAsync(string id, ICheckpointS
return masterCheckpointer;
}

public async Task<ICheckpointer> CreateAsync(string id)
public Task<ICheckpointer> CreateAsync(string id) => this.CreateAsync(id, string.Empty, RouteFactory.DefaultPriority);

public async Task<ICheckpointer> CreateAsync(string id, string endpointId, uint priority)
{
Events.CreateChildStart(this, id);

Checkpointer checkpointer = await Checkpointer.CreateAsync(id, this.store);
Checkpointer checkpointer = await Checkpointer.CreateAsync(id, this.store, endpointId, priority);
var child = new ChildCheckpointer(this, checkpointer);
await this.AddChild(child);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ namespace Microsoft.Azure.Devices.Routing.Core.Checkpointers
public class NullCheckpointerFactory : ICheckpointerFactory
{
public Task<ICheckpointer> CreateAsync(string id) => Task.FromResult(NullCheckpointer.Instance);
public Task<ICheckpointer> CreateAsync(string id, string _, uint __) => Task.FromResult(NullCheckpointer.Instance);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public async Task UpdatePriorities(IList<uint> priorities, Option<Endpoint> newE
await this.messageStore.AddEndpoint(id);

// Create a checkpointer and a FSM for every message queue
ICheckpointer checkpointer = await this.checkpointerFactory.CreateAsync(id);
ICheckpointer checkpointer = await this.checkpointerFactory.CreateAsync(id, this.Endpoint.Id, priority);
EndpointExecutorFsm fsm = new EndpointExecutorFsm(this.Endpoint, checkpointer, this.config);

// Add it to our dictionary
Expand Down

0 comments on commit b473bf4

Please sign in to comment.