Skip to content

Commit

Permalink
Remove timing from PriorityQueue E2E tests (Azure#2762)
Browse files Browse the repository at this point in the history
* Making PriorityQueue tests TimingProof.

* Now, load-gen and Relayer will both notify the test when they are complete. In that way, we don't have to wait with delays that would need to be configured for each platform.

* We do this by using a Direct Method on both modules. We poll every five seconds after the module starts up to see if it has completed yet.
  • Loading branch information
dylanbronson authored Apr 7, 2020
1 parent d2bb21d commit 46c2ef0
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 11 deletions.
41 changes: 32 additions & 9 deletions test/Microsoft.Azure.Devices.Edge.Test/PriorityQueues.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ namespace Microsoft.Azure.Devices.Edge.Test
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.ModuleUtil;
using Microsoft.Azure.Devices.Edge.ModuleUtil.TestResults;
using Microsoft.Azure.Devices.Edge.Test.Common;
using Microsoft.Azure.Devices.Edge.Test.Common.Config;
using Microsoft.Azure.Devices.Edge.Test.Helpers;
using Microsoft.Azure.Devices.Edge.Util.Test.Common.NUnit;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NUnit.Framework;
using Serilog;

[EndToEnd]
public class PriorityQueues : SasManualProvisioningFixture
Expand All @@ -35,6 +39,7 @@ public async Task PriorityQueueModuleToModuleMessages()
string routeTemplate = $"FROM /messages/modules/{loadGenModuleName}/outputs/pri{0} INTO BrokeredEndpoint('/modules/{relayerModuleName}/inputs/input1')";

string trackingId = Guid.NewGuid().ToString();
string priorityString = this.BuildPriorityString(5);

Action<EdgeConfigBuilder> addInitialConfig = new Action<EdgeConfigBuilder>(
builder =>
Expand Down Expand Up @@ -75,7 +80,6 @@ public async Task PriorityQueueModuleToModuleMessages()
}
});

string priorityString = this.BuildPriorityString(5);
builder.AddModule(loadGenModuleName, loadGenImage)
.WithEnvironment(new[]
{
Expand All @@ -92,26 +96,31 @@ public async Task PriorityQueueModuleToModuleMessages()
});

EdgeDeployment deployment = await this.runtime.DeployConfigurationAsync(addInitialConfig, token);

// Wait for loadGen to send some messages
await Task.Delay(TimeSpan.Parse(loadGenTestDuration).Add(TimeSpan.FromSeconds(10)));
PriorityQueueTestStatus loadGenTestStatus = await this.PollUntilFinishedAsync(loadGenModuleName, token);

Action<EdgeConfigBuilder> addRelayerConfig = new Action<EdgeConfigBuilder>(
builder =>
{
builder.AddModule(relayerModuleName, relayerImage)
.WithEnvironment(new[] { ("receiveOnly", "true") });
.WithEnvironment(new[]
{
("receiveOnly", "true"),
("uniqueResultsExpected", loadGenTestStatus.ResultCount.ToString())
});
});

deployment = await this.runtime.DeployConfigurationAsync(addInitialConfig + addRelayerConfig, token);

// Wait for relayer to spin up, receive messages, and pass along results to TRC
await Task.Delay(TimeSpan.FromSeconds(30));
deployment = await this.runtime.DeployConfigurationAsync(addInitialConfig + addRelayerConfig, token, false);
await this.PollUntilFinishedAsync(relayerModuleName, token);

HttpClient client = new HttpClient();
HttpResponseMessage response = await client.GetAsync("http://localhost:5001/api/report");
var jsonstring = await response.Content.ReadAsStringAsync();
bool isPassed = (bool)JArray.Parse(jsonstring)[0]["IsPassed"];
if (!isPassed)
{
Log.Verbose("Test Result Coordinator response: {Response}", jsonstring);
}

Assert.IsTrue(isPassed);
}

Expand Down Expand Up @@ -164,5 +173,19 @@ private string BuildPriorityString(int numberOfPriorities)

return priorityString + TestConstants.PriorityQueues.Default;
}

private async Task<PriorityQueueTestStatus> PollUntilFinishedAsync(string moduleName, CancellationToken token)
{
PriorityQueueTestStatus testStatus;
do
{
await Task.Delay(TimeSpan.FromSeconds(5));
var result = await this.iotHub.InvokeMethodAsync(Context.Current.DeviceId, moduleName, new CloudToDeviceMethod("IsFinished", TimeSpan.FromSeconds(300), TimeSpan.FromSeconds(300)), token);
Assert.AreEqual(result.Status, (int)HttpStatusCode.OK);
testStatus = JsonConvert.DeserializeObject<PriorityQueueTestStatus>(result.GetPayloadAsJson());
}
while (!testStatus.IsFinished);
return testStatus;
}
}
}
18 changes: 18 additions & 0 deletions test/modules/ModuleLib/TestResults/PriorityQueueTestStatus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.ModuleUtil.TestResults
{
using Microsoft.Azure.Devices.Edge.Util;

public class PriorityQueueTestStatus
{
public PriorityQueueTestStatus(bool isFinished, int resultCount)
{
this.IsFinished = isFinished;
this.ResultCount = resultCount;
}

public bool IsFinished { get; }

public int ResultCount { get; }
}
}
37 changes: 37 additions & 0 deletions test/modules/Relayer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,29 @@
namespace Relayer
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.ModuleUtil;
using Microsoft.Azure.Devices.Edge.ModuleUtil.TestResults;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

/*
* Module for relaying messages. It receives a message and passes it on.
*/
class Program
{
static readonly ILogger Logger = ModuleUtil.CreateLogger("Relayer");
static volatile bool isFinished = false;
static ConcurrentBag<string> resultsReceived = new ConcurrentBag<string>();

static async Task Main(string[] args)
{
Expand All @@ -33,6 +41,8 @@ static async Task Main(string[] args)

(CancellationTokenSource cts, ManualResetEventSlim completed, Option<object> handler) = ShutdownHandler.Init(TimeSpan.FromSeconds(5), Logger);

await SetIsFinishedDirectMethodAsync(moduleClient);

// Receive a message and call ProcessAndSendMessageAsync to send it on its way
await moduleClient.SetInputMessageHandlerAsync(Settings.Current.InputName, ProcessAndSendMessageAsync, moduleClient);

Expand Down Expand Up @@ -121,6 +131,19 @@ static async Task<MessageResponse> ProcessAndSendMessageAsync(Message message, o
await ModuleUtil.ReportTestResultAsync(testResultReportingClient, Logger, testResultSent);
Logger.LogInformation($"Successfully sent message: trackingid={trackingId}, batchId={batchId}, sequenceNumber={sequenceNumber}");
}
else
{
int uniqueResultsExpected = Settings.Current.UniqueResultsExpected.Expect<ArgumentException>(() => throw new ArgumentException("Must supply this value if in ReceiveOnly mode"));
if (!resultsReceived.Contains(sequenceNumber))
{
resultsReceived.Add(sequenceNumber);
}

if (resultsReceived.Count == uniqueResultsExpected)
{
isFinished = true;
}
}
}
catch (Exception ex)
{
Expand All @@ -129,5 +152,19 @@ static async Task<MessageResponse> ProcessAndSendMessageAsync(Message message, o

return MessageResponse.Completed;
}

private static async Task SetIsFinishedDirectMethodAsync(ModuleClient client)
{
await client.SetMethodHandlerAsync(
"IsFinished",
(MethodRequest methodRequest, object _) => Task.FromResult(IsFinished()),
null);
}

private static MethodResponse IsFinished()
{
string response = JsonConvert.SerializeObject(new PriorityQueueTestStatus(isFinished, resultsReceived.Count));
return new MethodResponse(Encoding.UTF8.GetBytes(response), (int)HttpStatusCode.OK);
}
}
}
13 changes: 11 additions & 2 deletions test/modules/Relayer/Settings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ class Settings
string outputName,
Uri testResultCoordinatorUrl,
string moduleId,
bool receiveOnly)
bool receiveOnly,
Option<int> uniqueResultsExpected)
{
this.InputName = Preconditions.CheckNonWhiteSpace(inputName, nameof(inputName));
this.OutputName = Preconditions.CheckNonWhiteSpace(outputName, nameof(outputName));
this.TransportType = transportType;
this.TestResultCoordinatorUrl = Preconditions.CheckNotNull(testResultCoordinatorUrl, nameof(testResultCoordinatorUrl));
this.ModuleId = Preconditions.CheckNonWhiteSpace(moduleId, nameof(moduleId));
this.ReceiveOnly = receiveOnly;
this.UniqueResultsExpected = uniqueResultsExpected;
}

static Settings Create()
Expand All @@ -37,13 +39,17 @@ static Settings Create()
.AddEnvironmentVariables()
.Build();

int uniqueResultsNum = configuration.GetValue<int>("uniqueResultsExpected", -1);
Option<int> uniqueResultsExpected = uniqueResultsNum > 0 ? Option.Some(uniqueResultsNum) : Option.None<int>();

return new Settings(
configuration.GetValue("transportType", TransportType.Amqp_Tcp_Only),
configuration.GetValue("inputName", "input1"),
configuration.GetValue("outputName", "output1"),
configuration.GetValue<Uri>("testResultCoordinatorUrl", new Uri("http://testresultcoordinator:5001")),
configuration.GetValue<string>("IOTEDGE_MODULEID"),
configuration.GetValue<bool>("receiveOnly", false));
configuration.GetValue<bool>("receiveOnly", false),
uniqueResultsExpected);
}

public TransportType TransportType { get; }
Expand All @@ -58,6 +64,8 @@ static Settings Create()

public bool ReceiveOnly { get; }

public Option<int> UniqueResultsExpected { get; }

public override string ToString()
{
// serializing in this pattern so that secrets don't accidentally get added anywhere in the future
Expand All @@ -68,6 +76,7 @@ public override string ToString()
{ nameof(this.ModuleId), this.ModuleId },
{ nameof(this.TransportType), Enum.GetName(typeof(TransportType), this.TransportType) },
{ nameof(this.TestResultCoordinatorUrl), this.TestResultCoordinatorUrl.ToString() },
{ nameof(this.ReceiveOnly), this.ReceiveOnly.ToString() }
};

return $"Settings:{Environment.NewLine}{string.Join(Environment.NewLine, fields.Select(f => $"{f.Key}={f.Value}"))}";
Expand Down
29 changes: 29 additions & 0 deletions test/modules/load-gen/PriorityMessageSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@ namespace LoadGen
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.ModuleUtil;
using Microsoft.Azure.Devices.Edge.ModuleUtil.TestResults;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

class PriorityMessageSender : LoadGenSenderBase
{
readonly Random rng = new Random();
private bool isFinished;
private int resultsSent;

public PriorityMessageSender(
ILogger logger,
Expand All @@ -23,6 +29,8 @@ public PriorityMessageSender(
{
this.PriorityString = Settings.Current.Priorities.Expect(() =>
new ArgumentException("PriorityMessageSender must have 'priorities' environment variable set to a valid list of string delimited by ';'"));
this.isFinished = false;
this.resultsSent = 0;
}

public string PriorityString { get; }
Expand All @@ -34,6 +42,9 @@ public async override Task RunAsync(CancellationTokenSource cts, DateTime testSt
bool firstMessageWhileOffline = true;
var priorityAndSequence = new SortedDictionary<int, List<long>>();
long messageIdCounter = 1;

await this.SetIsFinishedDirectMethodAsync();

while (!cts.IsCancellationRequested &&
(Settings.Current.TestDuration == TimeSpan.Zero || DateTime.UtcNow - testStartAt < Settings.Current.TestDuration))
{
Expand Down Expand Up @@ -96,6 +107,8 @@ public async override Task RunAsync(CancellationTokenSource cts, DateTime testSt
.SelectMany(t => t.Value)
.ToList();

this.resultsSent = expectedSequenceNumberList.Count;

// See explanation above why we need to send sequence number 1 first
await this.ReportResult(1);

Expand All @@ -105,6 +118,22 @@ public async override Task RunAsync(CancellationTokenSource cts, DateTime testSt
this.Logger.LogInformation($"Sending sequence number {sequenceNumber} to TRC");
await this.ReportResult(sequenceNumber);
}

this.isFinished = true;
}

private async Task SetIsFinishedDirectMethodAsync()
{
await this.Client.SetMethodHandlerAsync(
"IsFinished",
async (MethodRequest methodRequest, object _) => await Task.FromResult(this.IsFinished()),
null);
}

private MethodResponse IsFinished()
{
string response = JsonConvert.SerializeObject(new PriorityQueueTestStatus(this.isFinished, this.resultsSent));
return new MethodResponse(Encoding.UTF8.GetBytes(response), (int)HttpStatusCode.OK);
}
}
}

0 comments on commit 46c2ef0

Please sign in to comment.