Skip to content

Commit

Permalink
Fix: edgeAgent creates rogue ModuleClients in case of error (Azure#5332)
Browse files Browse the repository at this point in the history
causing multiple ModuleClients trying to connect with same identity.

When edgeAgent creates a new instance of module client, it sets up subscriptions. If that failed (e.g in case of network problem), then the newly created ModuleClient was left there without referencing it and nothing closed it afterwards. Later edgeAgent needed to create a new module client again (as the previous attempt failed). Thus at least two active ModuleClient was created with the same connection string, both trying to connect iothub. When this happened, they started to fight over the connection - iothub always disconnected the previously connected ModuleClient.
Because edgeAgent pulls a twin at every connection, the connection fight mentioned above caused pulling several twins in every second.
  • Loading branch information
vipeller authored Aug 11, 2021
1 parent 89917f1 commit e3892eb
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ async Task<IModuleClient> InitModuleClient()
async () =>
{
IModuleClient mc = await this.moduleClientProvider.Create(this.connectionStatusChangesHandler);
mc.Closed += this.OnModuleClientClosed;

if (this.enableSubscriptions)
{
await mc.SetDefaultMethodHandlerAsync(this.MethodCallback);
await mc.SetDesiredPropertyUpdateCallbackAsync(this.desiredPropertyUpdateCallback);
await this.EnableSubscriptions(mc);
}

mc.Closed += this.OnModuleClientClosed;
this.moduleClient = Option.Some(mc);
Events.InitializedNewModuleClient(this.enableSubscriptions);
return mc;
Expand All @@ -86,6 +86,30 @@ async Task<IModuleClient> InitModuleClient()
}
}

async Task EnableSubscriptions(IModuleClient moduleClient)
{
try
{
await moduleClient.SetDefaultMethodHandlerAsync(this.MethodCallback);
await moduleClient.SetDesiredPropertyUpdateCallbackAsync(this.desiredPropertyUpdateCallback);
}
catch (Exception ex)
{
Events.ErrorSettingUpNewModule(ex);

try
{
await moduleClient.CloseAsync();
}
catch
{
// swallowing intentionally
}

throw;
}
}

async void OnModuleClientClosed(object sender, EventArgs e)
{
try
Expand Down Expand Up @@ -134,14 +158,20 @@ enum EventIds
ReceivedMethodCallback,
DisposingModuleConnection,
DisposedModuleConnection,
ErrorDisposingModuleConnection
ErrorDisposingModuleConnection,
ErrorSettingUpNewModule
}

public static void ErrorHandlingModuleClosedEvent(Exception ex)
{
Log.LogWarning((int)EventIds.ErrorHandlingModuleClosedEvent, ex, "Error handling module client closed event");
}

public static void ErrorSettingUpNewModule(Exception ex)
{
Log.LogWarning((int)EventIds.ErrorSettingUpNewModule, ex, "Error setting up new module client");
}

public static void ModuleClientClosed(bool enableSubscriptions)
{
string message = enableSubscriptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub.Test
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Agent.Core.Requests;
Expand Down Expand Up @@ -206,5 +207,32 @@ public async Task SubscriptionsDisabledTest()
// Assert
Assert.True(optionResultModuleClient.HasValue);
}

[Fact]
public async Task FailingInitClosesModuleClient()
{
// Arrange
ConnectionStatusChangesHandler connectionStatusChangesHandler = (status, reason) => { };
DesiredPropertyUpdateCallback desiredPropertyUpdateCallback = (properties, context) => Task.CompletedTask;

var milestone = new SemaphoreSlim(0, 1);

var moduleClient = new Mock<IModuleClient>();
moduleClient.Setup(m => m.SetDefaultMethodHandlerAsync(It.IsAny<MethodCallback>())).Callback(() => milestone.Release()).Throws<TimeoutException>();

var moduleClientProvider = new Mock<IModuleClientProvider>();
moduleClientProvider.Setup(m => m.Create(connectionStatusChangesHandler)).ReturnsAsync(moduleClient.Object);

var requestManager = new Mock<IRequestManager>();
bool enableSubscriptions = true;

// Act
var moduleConnection = new ModuleConnection(moduleClientProvider.Object, requestManager.Object, connectionStatusChangesHandler, desiredPropertyUpdateCallback, enableSubscriptions);
await milestone.WaitAsync(TimeSpan.FromSeconds(10));
await Task.Delay(TimeSpan.FromSeconds(0.5)); // the milestone is hit a bit earlier than the exception, so wait a tiny bit

// Assert
moduleClient.Verify(m => m.CloseAsync(), Times.Once);
}
}
}

0 comments on commit e3892eb

Please sign in to comment.