Skip to content

Commit

Permalink
Merged PR 740397: EdgeHub fixes
Browse files Browse the repository at this point in the history
This PR contains a number of fixes in the EdgeHub -
- Add checks to make sure if CloudProxy.cloudReceiver is not initialized, then it logs error and handles the issue.
- We need to have only one CloudEndpoint per edgehub. So fixed EndpointFactory to return the same instance.
- The code to handle re-enabling of subscriptions needs to be re-visited. There are some complications there with the work the DeviceClient does v/s what EdgeHub does. This also ties in to what offline capabilities we want to support for GA. So will do it in a separate PR. Removing that capability for now.
  • Loading branch information
varunpuranik committed Apr 4, 2018
1 parent 82a4c5f commit ba537d2
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,35 @@ public Task SendFeedbackMessageAsync(string messageId, FeedbackStatus feedbackSt
}
}

public Task SetupCallMethodAsync() => this.cloudReceiver.SetupCallMethodAsync;
public Task SetupCallMethodAsync() =>
this.EnsureCloudReceiver(nameof(this.SetupCallMethodAsync)) ? this.cloudReceiver.SetupCallMethodAsync() : Task.CompletedTask;

public Task RemoveCallMethodAsync() => this.cloudReceiver.RemoveCallMethodAsync;
public Task RemoveCallMethodAsync() =>
this.EnsureCloudReceiver(nameof(this.RemoveCallMethodAsync)) ? this.cloudReceiver.RemoveCallMethodAsync() : Task.CompletedTask;

public Task SetupDesiredPropertyUpdatesAsync() => this.cloudReceiver.SetupDesiredPropertyUpdatesAsync();
public Task SetupDesiredPropertyUpdatesAsync() =>
this.EnsureCloudReceiver(nameof(this.SetupDesiredPropertyUpdatesAsync)) ? this.cloudReceiver.SetupDesiredPropertyUpdatesAsync() : Task.CompletedTask;

public Task RemoveDesiredPropertyUpdatesAsync() => this.cloudReceiver.RemoveDesiredPropertyUpdatesAsync();
public Task RemoveDesiredPropertyUpdatesAsync() =>
this.EnsureCloudReceiver(nameof(this.RemoveDesiredPropertyUpdatesAsync)) ? this.cloudReceiver.RemoveDesiredPropertyUpdatesAsync() : Task.CompletedTask;

public void StartListening() => this.cloudReceiver.StartListening();
public void StartListening()
{
if (this.EnsureCloudReceiver(nameof(this.RemoveDesiredPropertyUpdatesAsync)))
{
this.cloudReceiver.StartListening();
}
}

bool EnsureCloudReceiver(string operation)
{
if (this.cloudReceiver == null)
{
Events.CloudReceiverNull(this.clientId, operation);
return false;
}
return true;
}

Task HandleException(Exception ex)
{
Expand Down Expand Up @@ -243,9 +263,9 @@ public Task CloseAsync()
return this.receiveMessageTask.GetOrElse(Task.CompletedTask);
}

public Task SetupCallMethodAsync => this.cloudProxy.deviceClient.SetMethodDefaultHandlerAsync(this.MethodCallHandler, null);
public Task SetupCallMethodAsync() => this.cloudProxy.deviceClient.SetMethodDefaultHandlerAsync(this.MethodCallHandler, null);

public Task RemoveCallMethodAsync => this.cloudProxy.deviceClient.SetMethodDefaultHandlerAsync(null, null);
public Task RemoveCallMethodAsync() => this.cloudProxy.deviceClient.SetMethodDefaultHandlerAsync(null, null);

internal async Task<MethodResponse> MethodCallHandler(MethodRequest methodrequest, object usercontext)
{
Expand Down Expand Up @@ -314,7 +334,8 @@ enum EventIds
ReceiveError,
ReceiverStopped,
MethodReceived,
StartListening
StartListening,
CloudReceiverNull
}

public static void Closed(CloudProxy cloudProxy)
Expand Down Expand Up @@ -401,6 +422,11 @@ internal static void TerminatingErrorReceivingMessage(string clientId, Exception
{
Log.LogInformation((int)EventIds.ReceiveError, e, Invariant($"Error receiving C2D messages for device {clientId}. Closing receive loop."));
}

internal static void CloudReceiverNull(string clientId, string operation)
{
Log.LogWarning((int)EventIds.CloudReceiverNull, Invariant($"Cannot complete operation {operation} for device {clientId} because cloud receiver is null"));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public async Task SendMessageAsync(IMessage message, string input)
var taskCompletionSource = new TaskCompletionSource<bool>();
this.messageTaskCompletionSources.TryAdd(lockToken, taskCompletionSource);

Events.SendingMessage(this.Identity, lockToken);
await this.underlyingProxy.SendMessageAsync(message, input);

Task completedTask = await Task.WhenAny(taskCompletionSource.Task, Task.Delay(TimeSpan.FromSeconds(60)));
Expand Down Expand Up @@ -244,7 +245,8 @@ enum EventIds
InvalidMethodResponse,
MessageFeedbackTimedout,
MessageFeedbackReceived,
MessageFeedbackWithNoMessageId
MessageFeedbackWithNoMessageId,
MessageSentToClient
}

public static void BindDeviceProxy(IIdentity identity)
Expand Down Expand Up @@ -296,6 +298,11 @@ public static void MethodCallSentToClient(IIdentity identity, string id, string
{
Log.LogDebug((int)EventIds.MethodSentToClient, Invariant($"Sent method invoke call from device/module {identity.Id} for {id} with correlation ID {correlationId}"));
}

public static void SendingMessage(IIdentity identity, string lockToken)
{
Log.LogDebug((int)EventIds.MessageSentToClient, Invariant($"Sent message with correlation ID {lockToken} to {identity.Id}"));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ public async Task<ISinkResult> ProcessAsync(IRoutingMessage routingMessage, Canc
SendFailureDetails sendFailureDetails = null;

IMessage message = this.cloudEndpoint.messageConverter.ToMessage(routingMessage);

Util.Option<ICloudProxy> cloudProxy = this.GetCloudProxy(routingMessage);

if (!cloudProxy.HasValue)
{
sendFailureDetails = new SendFailureDetails(FailureKind.None, new EdgeHubConnectionException("IoT Hub is not connected"));
Expand Down Expand Up @@ -120,6 +119,7 @@ public async Task<ISinkResult> ProcessAsync(ICollection<IRoutingMessage> routing
Devices.Routing.Core.Util.Option<SendFailureDetails> sendFailureDetails =
Devices.Routing.Core.Util.Option.None<SendFailureDetails>();

Events.ProcessingMessages(routingMessages);
foreach (IRoutingMessage routingMessage in routingMessages)
{
if (token.IsCancellationRequested)
Expand Down Expand Up @@ -181,7 +181,8 @@ enum EventIds
DeviceIdNotFound = IdStart,
IoTHubNotConnected,
RetryingMessages,
InvalidMessage
InvalidMessage,
ProcessingMessages
}

public static void DeviceIdNotFound(IRoutingMessage routingMessage)
Expand Down Expand Up @@ -219,6 +220,11 @@ internal static void InvalidMessage(Exception ex)
// TODO - Add more info to this log message
Log.LogWarning((int)EventIds.InvalidMessage, ex, Invariant($"Non retryable exception occurred while sending message."));
}

public static void ProcessingMessages(ICollection<IRoutingMessage> routingMessages)
{
Log.LogDebug((int)EventIds.ProcessingMessages, Invariant($"Sending {routingMessages.Count} message(s) upstream."));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.Core.Routing
{
using System;
Expand All @@ -8,10 +8,10 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Routing

public class EndpointFactory : IEndpointFactory
{
const string CloudEndpoint = "$upstream";
const string CloudEndpointName = "$upstream";
const string FunctionEndpoint = "BrokeredEndpoint";
static readonly char[] BrokeredEndpointSplitChars = { '/' };

readonly CloudEndpoint cloudEndpoint;
readonly IConnectionManager connectionManager;
readonly Core.IMessageConverter<IRoutingMessage> messageConverter;
readonly string edgeDeviceId;
Expand All @@ -23,13 +23,14 @@ public EndpointFactory(IConnectionManager connectionManager,
this.connectionManager = Preconditions.CheckNotNull(connectionManager, nameof(connectionManager));
this.messageConverter = Preconditions.CheckNotNull(messageConverter, nameof(messageConverter));
this.edgeDeviceId = Preconditions.CheckNonWhiteSpace(edgeDeviceId, nameof(edgeDeviceId));
this.cloudEndpoint = new CloudEndpoint("iothub", (id) => this.connectionManager.GetCloudConnection(id), this.messageConverter);
}

public Endpoint CreateSystemEndpoint(string endpoint)
{
if (CloudEndpoint.Equals(endpoint, StringComparison.OrdinalIgnoreCase))
if (CloudEndpointName.Equals(endpoint, StringComparison.OrdinalIgnoreCase))
{
return new CloudEndpoint("iothub", (id) => this.connectionManager.GetCloudConnection(id), this.messageConverter);
return this.cloudEndpoint;
}
else
{
Expand Down Expand Up @@ -72,4 +73,4 @@ public Endpoint CreateFunctionEndpoint(string function, string parameterString)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public async Task<ISinkResult> ProcessAsync(ICollection<IRoutingMessage> routing
var invalid = new List<InvalidDetails<IRoutingMessage>>();
SendFailureDetails sendFailureDetails = null;

Events.ProcessingMessages(this.moduleEndpoint, routingMessages);
Util.Option<IDeviceProxy> deviceProxy = this.GetDeviceProxy();

if (!deviceProxy.HasValue)
{
failed.AddRange(routingMessages);
Expand Down Expand Up @@ -159,7 +159,8 @@ enum EventIds
NoDeviceProxy = IdStart,
ErrorSendingMessages,
RetryingMessages,
InvalidMessage
InvalidMessage,
ProcessingMessages
}

public static void NoDeviceProxy(ModuleEndpoint moduleEndpoint)
Expand All @@ -183,6 +184,11 @@ internal static void InvalidMessage(Exception ex)
// TODO - Add more info to this log message
Log.LogWarning((int)EventIds.InvalidMessage, ex, Invariant($"Non retryable exception occurred while sending message."));
}

public static void ProcessingMessages(ModuleEndpoint moduleEndpoint, ICollection<IRoutingMessage> routingMessages)
{
Log.LogDebug((int)EventIds.ProcessingMessages, Invariant($"Sending {routingMessages.Count} message(s) to module {moduleEndpoint.moduleId}."));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,6 @@ public SessionStateStoragePersistenceProvider(IConnectionManager connectionManag
: base(connectionManager)
{
this.sessionStore = Preconditions.CheckNotNull(sessionStore, nameof(sessionStore));
connectionManager.CloudConnectionEstablished += this.OnCloudConnectionEstablished;
}

async void OnCloudConnectionEstablished(object sender, IIdentity identity)
{
try
{
Preconditions.CheckNotNull(identity, nameof(identity));
Events.SetSubscriptionsStarted(identity);
Option<SessionState> sessionState = await this.sessionStore.Get(identity.Id);
if (!sessionState.HasValue)
{
Events.NoSessionStateFoundInStore(identity);
}
else
{
await sessionState.ForEachAsync(
async s =>
{
await this.ProcessSessionSubscriptions(identity.Id, s);
Events.SetSubscriptionsSuccess(identity);
});
}
}
catch (Exception ex)
{
Events.ClientReconnectError(ex, identity);
}
}

public override async Task<ISessionState> GetAsync(IDeviceIdentity identity) => (await this.sessionStore.Get(identity.Id)).GetOrElse((SessionState)null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,23 @@ public async Task CanListenForDesiredPropertyUpdates()
Assert.Equal(expected.SystemProperties.Keys, actual.SystemProperties.Keys);
}

[Fact]
[TestPriority(407)]
public async Task CloudProxyNullReceiverTest()
{
// Arrange
string deviceConnectionStringKey = "device2ConnStrKey";
ICloudProxy cloudProxy = await this.GetCloudProxyWithConnectionStringKey(deviceConnectionStringKey);

// Act/assert
// Without setting up the cloudlistener, the following methods should not throw.
await cloudProxy.SetupCallMethodAsync();
await cloudProxy.RemoveCallMethodAsync();
await cloudProxy.SetupDesiredPropertyUpdatesAsync();
await cloudProxy.RemoveDesiredPropertyUpdatesAsync();
cloudProxy.StartListening();
}

async Task<ICloudProxy> GetCloudProxyWithConnectionStringKey(string connectionStringConfigKey)
{
const int ConnectionPoolSize = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public void TestCreateSystemEndpoint()
Endpoint endpoint = this.endpointFactory.CreateSystemEndpoint("$upstream");
Assert.NotNull(endpoint);
Assert.IsType<CloudEndpoint>(endpoint);

Endpoint endpoint2 = this.endpointFactory.CreateSystemEndpoint("$upstream");
Assert.NotNull(endpoint2);
Assert.IsType<CloudEndpoint>(endpoint2);
Assert.Equal(endpoint, endpoint2);
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,82 +275,6 @@ public async Task TestPersistence()
cloudProxy.Verify(x => x.SetupCallMethodAsync(), Times.Exactly(2));
cloudProxy.Verify(x => x.StartListening(), Times.Exactly(2));
cloudProxy.Verify(x => x.RemoveDesiredPropertyUpdatesAsync(), Times.Exactly(2));
}

[Fact]
[Unit]
public async Task TestConnectionEstablishedReenableSubscriptions()
{
string deviceId = "deviceId";

var cloudProxyMock = new Mock<ICloudProxy>();
cloudProxyMock.SetupGet(cp => cp.IsActive).Returns(true);

var cloudConnectionMock = new Mock<ICloudConnection>();
cloudConnectionMock.SetupGet(dp => dp.IsActive).Returns(true);
cloudConnectionMock.SetupGet(dp => dp.CloudProxy).Returns(Option.Some(cloudProxyMock.Object));
cloudConnectionMock.Setup(c => c.CreateOrUpdateAsync(It.IsAny<IClientCredentials>()))
.ReturnsAsync(cloudProxyMock.Object);

Action<string, CloudConnectionStatus> connectionChangeCallback = (_, __) => { };
var cloudConnectionProvider = new Mock<ICloudConnectionProvider>();
cloudConnectionProvider.Setup(c => c.Connect(It.IsAny<IClientCredentials>(), It.IsAny<Action<string, CloudConnectionStatus>>()))
.Callback<IClientCredentials, Action<string, CloudConnectionStatus>>((id, cb) => connectionChangeCallback = cb)
.ReturnsAsync(() => Try.Success(cloudConnectionMock.Object));

var protocolGatewayIdentity = Mock.Of<IProtocolgatewayDeviceIdentity>(i => i.Id == deviceId);
var edgeCredentials = Mock.Of<IClientCredentials>(c => c.Identity == Mock.Of<IIdentity>(i => i.Id == deviceId));

var connectionManager = new ConnectionManager(cloudConnectionProvider.Object);
await connectionManager.CreateCloudConnectionAsync(edgeCredentials);

var sessionProvider = new SessionStateStoragePersistenceProvider(connectionManager, this.entityStore);
ISessionState sessionState = await sessionProvider.GetAsync(protocolGatewayIdentity);
Assert.Null(sessionState);

sessionState = new SessionState(false);
sessionState.AddOrUpdateSubscription(SessionStatePersistenceProvider.C2DSubscriptionTopicPrefix, QualityOfService.AtLeastOnce);
sessionState.AddOrUpdateSubscription(SessionStatePersistenceProvider.MethodSubscriptionTopicPrefix, QualityOfService.AtMostOnce);
sessionState.RemoveSubscription(SessionStatePersistenceProvider.TwinSubscriptionTopicPrefix);
await sessionProvider.SetAsync(protocolGatewayIdentity, sessionState);

cloudProxyMock.Verify(x => x.SetupCallMethodAsync(), Times.Once);
cloudProxyMock.Verify(x => x.StartListening(), Times.Once);
cloudProxyMock.Verify(x => x.RemoveDesiredPropertyUpdatesAsync(), Times.Once);

ISessionState storedSession = await sessionProvider.GetAsync(protocolGatewayIdentity);
Assert.NotNull(storedSession);

var retrievedSessionState = storedSession as SessionState;
Assert.NotNull(retrievedSessionState);
Assert.Equal(2, retrievedSessionState.Subscriptions.Count);
Assert.Equal(3, retrievedSessionState.SubscriptionRegistrations.Count);
Assert.True(retrievedSessionState.SubscriptionRegistrations.ContainsKey(SessionStatePersistenceProvider.C2DSubscriptionTopicPrefix));
Assert.True(retrievedSessionState.SubscriptionRegistrations[SessionStatePersistenceProvider.C2DSubscriptionTopicPrefix]);
Assert.True(retrievedSessionState.SubscriptionRegistrations.ContainsKey(SessionStatePersistenceProvider.MethodSubscriptionTopicPrefix));
Assert.True(retrievedSessionState.SubscriptionRegistrations[SessionStatePersistenceProvider.MethodSubscriptionTopicPrefix]);
Assert.True(retrievedSessionState.SubscriptionRegistrations.ContainsKey(SessionStatePersistenceProvider.TwinSubscriptionTopicPrefix));
Assert.False(retrievedSessionState.SubscriptionRegistrations[SessionStatePersistenceProvider.TwinSubscriptionTopicPrefix]);
ISubscription c2DSubscription = retrievedSessionState.Subscriptions.FirstOrDefault(s => s.TopicFilter == SessionStatePersistenceProvider.C2DSubscriptionTopicPrefix);
Assert.NotNull(c2DSubscription);
Assert.Equal(QualityOfService.AtLeastOnce, c2DSubscription.QualityOfService);
Assert.True(DateTime.UtcNow - c2DSubscription.CreationTime < TimeSpan.FromMinutes(2));
ISubscription methodSubscription = retrievedSessionState.Subscriptions.FirstOrDefault(s => s.TopicFilter == SessionStatePersistenceProvider.MethodSubscriptionTopicPrefix);
Assert.NotNull(methodSubscription);
Assert.Equal(QualityOfService.AtMostOnce, methodSubscription.QualityOfService);
Assert.True(DateTime.UtcNow - methodSubscription.CreationTime < TimeSpan.FromMinutes(2));

await sessionProvider.SetAsync(protocolGatewayIdentity, sessionState);

cloudProxyMock.Verify(x => x.SetupCallMethodAsync(), Times.Exactly(2));
cloudProxyMock.Verify(x => x.StartListening(), Times.Exactly(2));
cloudProxyMock.Verify(x => x.RemoveDesiredPropertyUpdatesAsync(), Times.Exactly(2));

connectionChangeCallback.Invoke(deviceId, CloudConnectionStatus.ConnectionEstablished);

cloudProxyMock.Verify(x => x.SetupCallMethodAsync(), Times.Exactly(3));
cloudProxyMock.Verify(x => x.StartListening(), Times.Exactly(3));
cloudProxyMock.Verify(x => x.RemoveDesiredPropertyUpdatesAsync(), Times.Exactly(3));
}
}
}
}
Loading

0 comments on commit ba537d2

Please sign in to comment.