Skip to content

Commit

Permalink
Analyzer module (Azure#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
ancaantochi authored Jul 31, 2018
1 parent ed80868 commit f5a730c
Show file tree
Hide file tree
Showing 20 changed files with 672 additions and 20 deletions.
15 changes: 11 additions & 4 deletions Microsoft.Azure.Devices.Edge.sln
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.Devices.Edg
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.Devices.Edge.Agent.Edgelet.Docker", "edge-agent\src\Microsoft.Azure.Devices.Edge.Agent.Edgelet.Docker\Microsoft.Azure.Devices.Edge.Agent.Edgelet.Docker.csproj", "{F8253E40-6EE0-4FFA-BF39-D133A504175E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.Devices.Edge.Agent.Edgelet.Docker.Test", "edge-agent\test\Microsoft.Azure.Devices.Edge.Agent.Edgelet.Docker.Test\Microsoft.Azure.Devices.Edge.Agent.Edgelet.Docker.Test.csproj", "{79E573DB-A6A8-4F6F-B5A0-DA2258EB82E6}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.Devices.Edge.Agent.Edgelet.Docker.Test", "edge-agent\test\Microsoft.Azure.Devices.Edge.Agent.Edgelet.Docker.Test\Microsoft.Azure.Devices.Edge.Agent.Edgelet.Docker.Test.csproj", "{79E573DB-A6A8-4F6F-B5A0-DA2258EB82E6}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "edge-modules", "edge-modules", "{DCF9EDA5-E906-4B87-B9E0-140075CA8FD1}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "load-gen", "edge-modules\load-gen\load-gen.csproj", "{54771470-860C-4853-9318-6DB4EA76B595}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "load-gen", "edge-modules\load-gen\load-gen.csproj", "{54771470-860C-4853-9318-6DB4EA76B595}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MessagesAnalyzer", "edge-modules\MessagesAnalyzer\MessagesAnalyzer.csproj", "{047DC795-A159-4BFF-AC0F-4DCE51A79C2C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -426,6 +426,12 @@ Global
{54771470-860C-4853-9318-6DB4EA76B595}.Debug|Any CPU.Build.0 = Debug|Any CPU
{54771470-860C-4853-9318-6DB4EA76B595}.Release|Any CPU.ActiveCfg = Release|Any CPU
{54771470-860C-4853-9318-6DB4EA76B595}.Release|Any CPU.Build.0 = Release|Any CPU
{047DC795-A159-4BFF-AC0F-4DCE51A79C2C}.CodeCoverage|Any CPU.ActiveCfg = Debug|Any CPU
{047DC795-A159-4BFF-AC0F-4DCE51A79C2C}.CodeCoverage|Any CPU.Build.0 = Debug|Any CPU
{047DC795-A159-4BFF-AC0F-4DCE51A79C2C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{047DC795-A159-4BFF-AC0F-4DCE51A79C2C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{047DC795-A159-4BFF-AC0F-4DCE51A79C2C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{047DC795-A159-4BFF-AC0F-4DCE51A79C2C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -488,7 +494,8 @@ Global
{77CF0F04-0DE0-4D81-93DE-56006D7BE213} = {F5E37327-3AA9-4CC2-9FE3-B28271ADB5E3}
{F8253E40-6EE0-4FFA-BF39-D133A504175E} = {54351E51-19CB-4DE3-8302-99846AB216CF}
{79E573DB-A6A8-4F6F-B5A0-DA2258EB82E6} = {F5E37327-3AA9-4CC2-9FE3-B28271ADB5E3}
{54771470-860C-4853-9318-6DB4EA76B595} = {DCF9EDA5-E906-4B87-B9E0-140075CA8FD1}
{54771470-860C-4853-9318-6DB4EA76B595} = {578D5330-2F72-44C6-9DB5-C93B3F42C473}
{047DC795-A159-4BFF-AC0F-4DCE51A79C2C} = {578D5330-2F72-44C6-9DB5-C93B3F42C473}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D71830F5-3AF5-46B4-8A9E-1DCE4F2253AC}
Expand Down
18 changes: 18 additions & 0 deletions edge-modules/MessagesAnalyzer/Controllers/ReportController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft. All rights reserved.

namespace MessagesAnalyzer.Controllers
{
using Microsoft.AspNetCore.Mvc;

[Route("api/[controller]")]
[ApiController]
public class ReportController : Controller
{
// GET api/report
[HttpGet]
public ActionResult<string> Get()
{
return Reporter.GetReceivedMessagesReport(Settings.Current.ToleranceInMilliseconds).ToString();
}
}
}
24 changes: 24 additions & 0 deletions edge-modules/MessagesAnalyzer/DeviceReport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) Microsoft. All rights reserved.

namespace MessagesAnalyzer
{
using System.Collections.Generic;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;

[JsonObject(NamingStrategyType = typeof(CamelCaseNamingStrategy))]
class DeviceReport
{
public DeviceReport(IList<ModuleReport> report)
{
this.Report = report;
}

IList<ModuleReport> Report { get; }

public override string ToString()
{
return JsonConvert.SerializeObject(this.Report, Formatting.Indented);
}
}
}
19 changes: 19 additions & 0 deletions edge-modules/MessagesAnalyzer/MessageDetails.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.

namespace MessagesAnalyzer
{
using System;

class MessageDetails
{
public long SequenceNumber { get; }

public DateTime EnquedDateTime { get; }

public MessageDetails(long seqNumber, DateTime enquedDateTime)
{
this.SequenceNumber = seqNumber;
this.EnquedDateTime = enquedDateTime;
}
}
}
39 changes: 39 additions & 0 deletions edge-modules/MessagesAnalyzer/MessagesAnalyzer.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>

<ItemGroup>
<Content Include="docker*/**/*.*" CopyToPublishDirectory="Always" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore" Version="2.1.1" />
<PackageReference Include="Microsoft.AspNetCore.Hosting" Version="2.1.1" />
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.1.1" />
<PackageReference Include="Microsoft.AspNetCore.Http.Extensions" Version="2.1.1" />
<PackageReference Include="Microsoft.AspNetCore.Http.Features" Version="2.1.1" />
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="2.1.1" />
<PackageReference Include="Microsoft.Azure.Devices" Version="1.16.0" />
<PackageReference Include="Microsoft.Azure.EventHubs" Version="2.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.1.1" />
<PackageReference Include="Serilog" Version="2.7.1" />
<PackageReference Include="Serilog.Extensions.Logging" Version="2.0.2" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
</ItemGroup>

<ItemGroup>
<None Update="config\settings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>

</Project>
92 changes: 92 additions & 0 deletions edge-modules/MessagesAnalyzer/MessagesCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) Microsoft. All rights reserved.

namespace MessagesAnalyzer
{
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

class MessagesCache
{
// maps batchId with moduleId, there can be multiple batches for a module
readonly ConcurrentDictionary<string, string> batches = new ConcurrentDictionary<string, string>();
// maps batchId with messages
readonly ConcurrentDictionary<string, IList<MessageDetails>> messages = new ConcurrentDictionary<string, IList<MessageDetails>>();
readonly IComparer<MessageDetails> comparer = new EventDataComparer();

MessagesCache() { }

public static MessagesCache Instance { get; } = new MessagesCache();

public void AddMessage(string moduleId, string batchId, MessageDetails messageDetails)
{
this.batches.TryAdd(batchId, moduleId);

IList<MessageDetails> batchMessages = this.messages.GetOrAdd(batchId, key => new List<MessageDetails>());
this.AddMessageDetails(batchMessages, messageDetails);
}

public IDictionary<string, IList<SortedSet<MessageDetails>>> GetMessagesSnapshot()
{
IDictionary<string, IList<SortedSet<MessageDetails>>> snapshotResult = new Dictionary<string, IList<SortedSet<MessageDetails>>>();

IDictionary<string, string> batchesSnapshot = this.batches.ToArray().ToDictionary(p => p.Key, p => p.Value);
IDictionary<string, IList<MessageDetails>> messagesSnapshot = this.messages.ToArray().ToDictionary(p => p.Key, p => p.Value);

foreach (KeyValuePair<string, IList<MessageDetails>> batchMessages in messagesSnapshot)
{
IList<MessageDetails> detailsSnapshot = this.GetMessageDetailsSnapshot(batchMessages.Value);
string moduleId = batchesSnapshot[batchMessages.Key];

if (snapshotResult.TryGetValue(moduleId, out IList<SortedSet<MessageDetails>> msg))
{
msg.Add(new SortedSet<MessageDetails>(detailsSnapshot, this.comparer));
}
else
{
var batchSortedMessages = new List<SortedSet<MessageDetails>>
{
new SortedSet<MessageDetails>(detailsSnapshot, this.comparer)
};
snapshotResult.Add(moduleId, batchSortedMessages);
}
}

return snapshotResult;
}

void AddMessageDetails(IList<MessageDetails> batchMessages, MessageDetails messageDetails)
{
lock (batchMessages)
{
batchMessages.Add(messageDetails);
}
}

IList<MessageDetails> GetMessageDetailsSnapshot(IList<MessageDetails> batchMessages)
{
MessageDetails[] details;
lock (batchMessages)
{
details = new MessageDetails[batchMessages.Count];
batchMessages.CopyTo(details, 0);
}

return details;
}

class EventDataComparer : IComparer<MessageDetails>
{
public int Compare(MessageDetails msg1, MessageDetails msg2)
{
if (msg1 == null)
return -1;

if (msg2 == null)
return -1;

return msg1.SequenceNumber.CompareTo(msg2.SequenceNumber);
}
}
}
}
25 changes: 25 additions & 0 deletions edge-modules/MessagesAnalyzer/MissedMessagesDetails.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) Microsoft. All rights reserved.

namespace MessagesAnalyzer
{
using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;

[JsonObject(NamingStrategyType = typeof(CamelCaseNamingStrategy))]
class MissedMessagesDetails
{
public long MissedMessagesCount { get; }

public DateTime StartDateTime { get; }

public DateTime EndDateTime { get; }

public MissedMessagesDetails(long missedMessagesCount, DateTime startDateTime, DateTime endDateTime)
{
this.MissedMessagesCount = missedMessagesCount;
this.StartDateTime = startDateTime;
this.EndDateTime = endDateTime;
}
}
}
51 changes: 51 additions & 0 deletions edge-modules/MessagesAnalyzer/ModuleReport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) Microsoft. All rights reserved.

namespace MessagesAnalyzer
{
using System;
using System.Collections.Generic;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;

[JsonObject(NamingStrategyType = typeof(CamelCaseNamingStrategy))]
class ModuleReport
{
public ModuleReport(string moduleId, StatusCode statusCode, long receivedMessagesCount, string statusMessage) : this(moduleId, statusCode, receivedMessagesCount, statusMessage, DateTime.MinValue, new List<MissedMessagesDetails>())
{
}

public ModuleReport(string moduleId, StatusCode statusCode, long receivedMessagesCount, string statusMessage, DateTime lastMessageReceivedAt) : this(moduleId, statusCode, receivedMessagesCount, statusMessage, lastMessageReceivedAt, new List<MissedMessagesDetails>())
{
}

public ModuleReport(string moduleId, StatusCode statusCode, long receivedMessagesCount, string statusMessage, DateTime lastMessageReceivedAt, IList<MissedMessagesDetails> missedMessages)
{
this.ModuleId = moduleId;
this.StatusCode = statusCode;
this.ReceivedMessagesCount = receivedMessagesCount;
this.StatusMessage = statusMessage;
this.MissedMessages = missedMessages;
this.LastMessageReceivedAt = lastMessageReceivedAt;
}

public string ModuleId { get; }

public StatusCode StatusCode { get; }

public string StatusMessage { get; }

public long ReceivedMessagesCount { get; }

public DateTime LastMessageReceivedAt { get; }

public IList<MissedMessagesDetails> MissedMessages
{
get;
}

public override string ToString()
{
return JsonConvert.SerializeObject(this, Formatting.Indented);
}
}
}
75 changes: 75 additions & 0 deletions edge-modules/MessagesAnalyzer/PartitionReceiverHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) Microsoft. All rights reserved.

namespace MessagesAnalyzer
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Serilog;

class PartitionReceiveHandler : IPartitionReceiveHandler
{
const string DeviceIdPropertyName = "iothub-connection-device-id";
const string ModuleIdPropertyName = "iothub-connection-module-id";
const string SequenceNumberPropertyName = "sequenceNumber";
const string EnqueuedTimePropertyName = "iothub-enqueuedtime";
const string BatchIdPropertyName = "batchId";
readonly string deviceId;
readonly IList<string> excludedModulesIds;

public PartitionReceiveHandler(string deviceId, IList<string> excludedModulesIds)
{
this.deviceId = deviceId;
this.excludedModulesIds = excludedModulesIds;
}

public Task ProcessEventsAsync(IEnumerable<EventData> events)
{
if (events != null)
{
foreach (EventData eventData in events)
{
eventData.SystemProperties.TryGetValue(DeviceIdPropertyName, out object devId);
eventData.SystemProperties.TryGetValue(ModuleIdPropertyName, out object modId);

if (devId != null && devId.ToString() == this.deviceId &&
modId != null && !this.excludedModulesIds.Contains(modId.ToString()))
{
eventData.Properties.TryGetValue(SequenceNumberPropertyName, out object sequence);
eventData.Properties.TryGetValue(BatchIdPropertyName, out object batchId);

if (sequence != null && batchId != null)
{
DateTime enqueuedtime = DateTime.MinValue;
if (eventData.SystemProperties.TryGetValue(EnqueuedTimePropertyName, out object enqueued))
{
DateTime.TryParse(enqueued.ToString(), out enqueuedtime);
}

if (long.TryParse(sequence.ToString(), out long sequenceNumber))
{
MessagesCache.Instance.AddMessage(modId.ToString(), batchId.ToString(), new MessageDetails(sequenceNumber, enqueuedtime));
}
}
else
{
Log.Debug($"Message for moduleId: {modId} doesn't contain required properties");

}
}
}
}

return Task.CompletedTask;
}

public Task ProcessErrorAsync(Exception error)
{
Log.Error(error.StackTrace);
return Task.CompletedTask;
}

public int MaxBatchSize { get; set; }
}
}
Loading

0 comments on commit f5a730c

Please sign in to comment.