Skip to content

Commit

Permalink
Merged PR 276703: Support for Invoking methods from modules on other …
Browse files Browse the repository at this point in the history
…modules/devices

Overview - the intent of these changes is to allow modules to call direct methods on other modules/devices via the EdgeHub.

Note: This is an early preview. I am still working on refining the changes, testing and adding tests.

Changes involved -
- Added an HTTP Head because the invoke call is made on HTTP. Why? Because the existing Method invoke call from cloud is made on Http, and we wanted to maintain parity. We are using AspNet Core for the Http server.
- Identity moves to Core, as it is shared between MQTT and HTTP.
- Logic to send method call and receive response, and correlate the two moves from CloudReceiver to EdgeHub. This is because module to module calls don't go via CloudReceiver.
- Authentication in HTTP works pretty much similar to the existing MQTT authentication (with caching).
- Initialization code had to be moved around to fit the MQTT bootstrap code with aspnet core startup code.

Testing -
- I have added basic tests for the major new components. I am still working on adding more tests. I will send out iterations with more tests soon.
- I have tested the basic scenario with modules calling methods on devices, which seems to be working fine. I am also doing more manual testing with simulated modules/devices.

Related work items: #1262687, #1257296, #1257297
  • Loading branch information
varunpuranik committed Jun 15, 2017
1 parent 14fdd18 commit 6435dee
Show file tree
Hide file tree
Showing 71 changed files with 1,651 additions and 375 deletions.
18 changes: 18 additions & 0 deletions Microsoft.Azure.Devices.Edge.sln
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SimulatedTemperatureSensor"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "service", "service", "{024364A6-77BA-4812-A061-5D95016EDCD7}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.Devices.Edge.Hub.Http", "edge-hub\src\Microsoft.Azure.Devices.Edge.Hub.Http\Microsoft.Azure.Devices.Edge.Hub.Http.csproj", "{16C92EBE-CB81-4980-8927-590CA5D5A97E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.Devices.Edge.Hub.Http.Test", "edge-hub\test\Microsoft.Azure.Devices.Edge.Hub.Http.Test\Microsoft.Azure.Devices.Edge.Hub.Http.Test.csproj", "{E4239B7F-1FDB-4034-8FD7-E9486CA091A4}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{E0809CFF-5FBD-4B0A-B778-8B32C3B470BE}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.Devices.Edge.Service", "edge-service\src\Microsoft.Azure.Devices.Edge.Service\Microsoft.Azure.Devices.Edge.Service.csproj", "{BCB8B740-A654-46A9-A016-0A91C847778F}"
Expand Down Expand Up @@ -204,6 +208,18 @@ Global
{BCB8B740-A654-46A9-A016-0A91C847778F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BCB8B740-A654-46A9-A016-0A91C847778F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BCB8B740-A654-46A9-A016-0A91C847778F}.Release|Any CPU.Build.0 = Release|Any CPU
{16C92EBE-CB81-4980-8927-590CA5D5A97E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{16C92EBE-CB81-4980-8927-590CA5D5A97E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{16C92EBE-CB81-4980-8927-590CA5D5A97E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{16C92EBE-CB81-4980-8927-590CA5D5A97E}.Release|Any CPU.Build.0 = Release|Any CPU
{E4239B7F-1FDB-4034-8FD7-E9486CA091A4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E4239B7F-1FDB-4034-8FD7-E9486CA091A4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E4239B7F-1FDB-4034-8FD7-E9486CA091A4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E4239B7F-1FDB-4034-8FD7-E9486CA091A4}.Release|Any CPU.Build.0 = Release|Any CPU
{E4239B7F-1FDB-4034-8FD7-E9486CA091A4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E4239B7F-1FDB-4034-8FD7-E9486CA091A4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E4239B7F-1FDB-4034-8FD7-E9486CA091A4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E4239B7F-1FDB-4034-8FD7-E9486CA091A4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -241,5 +257,7 @@ Global
{592F8A5A-B1C7-4011-9879-B2AEE118DD3F} = {578D5330-2F72-44C6-9DB5-C93B3F42C473}
{E0809CFF-5FBD-4B0A-B778-8B32C3B470BE} = {024364A6-77BA-4812-A061-5D95016EDCD7}
{BCB8B740-A654-46A9-A016-0A91C847778F} = {E0809CFF-5FBD-4B0A-B778-8B32C3B470BE}
{16C92EBE-CB81-4980-8927-590CA5D5A97E} = {AB4285D8-CF1D-4B20-95F6-CB80892C8321}
{E4239B7F-1FDB-4034-8FD7-E9486CA091A4} = {63969606-14B2-4D9D-AB72-A5D60D22037C}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
</ItemGroup>

<ItemGroup>
<None Update="appsettings.json">
<None Update="appsettings_agent.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="config.json">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Service

public class Program
{
const string ConfigFileName = "appsettings.json";
const string ConfigFileName = "appsettings_agent.json";

public static int Main()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,6 @@ public Task RemoveCallMethodAsync()
return this.cloudReceiver.RemoveCallMethodAsync();
}

public Task SendMethodResponseAsync(DirectMethodResponse response)
{
return this.cloudReceiver.SendMethodResponseAsync(response);
}

public Task SetupDesiredPropertyUpdatesAsync()
{
return this.cloudReceiver.SetupDesiredPropertyUpdatesAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public CloudReceiver(DeviceClient deviceClient, IMessageConverterProvider messag
this.deviceClient = Preconditions.CheckNotNull(deviceClient, nameof(deviceClient));
this.cloudListener = Preconditions.CheckNotNull(cloudListener, nameof(cloudListener));
this.messageConverterProvider = Preconditions.CheckNotNull(messageConverterProvider, nameof(messageConverterProvider));
this.identity = Preconditions.CheckNotNull(identity);
this.identity = Preconditions.CheckNotNull(identity, nameof(identity));
IMessageConverter<TwinCollection> converter = this.messageConverterProvider.Get<TwinCollection>();
this.desiredUpdateHandler = new DesiredPropertyUpdateHandler(cloudListener, converter);
this.methodCalls = new ConcurrentDictionary<string, TaskCompletionSource<MethodResponse>>();
Expand Down Expand Up @@ -100,49 +100,18 @@ public Task SetupCallMethodAsync()
public Task RemoveCallMethodAsync()
{
return this.deviceClient.SetMethodDefaultHandlerAsync(null, null);
}

public Task SendMethodResponseAsync(DirectMethodResponse response)
{
Preconditions.CheckNotNull(response, nameof(response));

var deviceClientResponse = new MethodResponse(response.Data, response.Status);

if (this.methodCalls.TryRemove(response.RequestId.ToLowerInvariant(), out TaskCompletionSource<MethodResponse> taskCompletion))
{
Events.MethodResponseReceived(this, response.RequestId);
taskCompletion.SetResult(deviceClientResponse);
}
else
{
Events.MethodResponseNotMapped(this, response.RequestId);
}

return TaskEx.Done;
}
}

internal async Task<MethodResponse> MethodCallHandler(MethodRequest methodrequest, object usercontext)
{
Preconditions.CheckNotNull(methodrequest, nameof(methodrequest));

Events.MethodCallReceived(this);

string id = this.correlationId.Increment().ToString();
var taskCompletion = new TaskCompletionSource<MethodResponse>();

this.methodCalls.TryAdd(id.ToLowerInvariant(), taskCompletion);
await this.cloudListener.CallMethodAsync(new DirectMethodRequest(id, methodrequest.Name, methodrequest.Data));
Events.MethodCallSentToClient(this, id);

Task completedTask = await Task.WhenAny(taskCompletion.Task, Task.Delay(DeviceMethodMaxResponseTimeout));
if (completedTask != taskCompletion.Task)
{
Events.MethodResponseTimedout(this, id);
taskCompletion.TrySetResult(new MethodResponse(GatewayTimeoutErrorCode));
this.methodCalls.TryRemove(id.ToLowerInvariant(), out taskCompletion);
}

return await taskCompletion.Task;
var direceMethodRequest = new DirectMethodRequest(this.identity.Id, methodrequest.Name, methodrequest.Data, DeviceMethodMaxResponseTimeout);
DirectMethodResponse directMethodResponse = await this.cloudListener.CallMethodAsync(direceMethodRequest);
var methodResponse = new MethodResponse(directMethodResponse.Data, directMethodResponse.Status);
return methodResponse;
}

public Task SetupDesiredPropertyUpdatesAsync()
Expand Down Expand Up @@ -175,7 +144,6 @@ enum EventIds
ReceiveError,
ReceiverStopped,
MethodReceived,
MethodSentToClient,
MethodResponseReceived,
MethodRequestIdNotMatched,
MethodResponseTimedout
Expand Down Expand Up @@ -206,11 +174,6 @@ public static void MethodCallReceived(CloudReceiver cloudReceiver)
Log.LogDebug((int)EventIds.MethodReceived, Invariant($"Received call method from cloud for device {cloudReceiver.identity.Id}"));
}

public static void MethodCallSentToClient(CloudReceiver cloudReceiver, string requestId)
{
Log.LogDebug((int)EventIds.MethodSentToClient, Invariant($"Sent call method from cloud for device {cloudReceiver.identity.Id} with requestId {requestId}"));
}

public static void MethodResponseReceived(CloudReceiver cloudReceiver, string requestId)
{
Log.LogDebug((int)EventIds.MethodResponseReceived, Invariant($"Received response for call with requestId {requestId} method from device {cloudReceiver.identity.Id}"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ public async Task<bool> AuthenticateAsync(IIdentity identity)
// Authentication here happens against the cloud, i.e., Azure IoT Hub. We consider a device/module
// as having authenticated successfully if we are able to acquire a valid ICloudProxy object from
// the connection manager.

// Initially we will have many modules connecting with same device ID, so this is a GetOrCreate.
// When we have module identity implemented, this should be CreateCloudConnectionAsync.
Try<ICloudProxy> cloudProxyTry = await this.connectionManager.GetOrCreateCloudConnectionAsync(Preconditions.CheckNotNull(identity, nameof(identity)));
Try<ICloudProxy> cloudProxyTry = await this.connectionManager.CreateCloudConnectionAsync(Preconditions.CheckNotNull(identity, nameof(identity)));
Events.AuthResult(cloudProxyTry, identity.Id);
return cloudProxyTry.Success && cloudProxyTry.Value.IsActive;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public Option<IDeviceProxy> GetDeviceConnection(string deviceId)

public Option<ICloudProxy> GetCloudConnection(string deviceId)
{
// TODO: This line is a temporary workaround to use the underlying DeviceIdentity for cloud connections for modules
deviceId = GetDeviceId(deviceId);

return this.devices.TryGetValue(Preconditions.CheckNonWhiteSpace(deviceId, nameof(deviceId)), out ConnectedDevice device)
? device.CloudProxy.Filter(cp => cp.IsActive)
: Option.None<ICloudProxy>();
Expand Down Expand Up @@ -80,64 +83,61 @@ public async Task<Try<ICloudProxy>> CreateCloudConnectionAsync(IIdentity identit
{
Preconditions.CheckNotNull(identity, nameof(identity));

// TODO: This line is a temporary workaround to use the underlying DeviceIdentity for cloud connections for modules
IIdentity deviceIdentity = GetDeviceIdentity(identity);

// Open a connection to Azure IoT Hub for this device/module.
Try<ICloudProxy> cloudProxy = await this.cloudProxyProvider.Connect(identity);
Try<ICloudProxy> cloudProxy = await this.cloudProxyProvider.Connect(deviceIdentity);
if (cloudProxy.Success)
{
// Update the cloud proxy stored in this.devices with this new cloud proxy
// instance.
ConnectedDevice device = this.GetOrCreateConnectedDevice(identity);
ConnectedDevice device = this.GetOrCreateConnectedDevice(deviceIdentity);
Option<ICloudProxy> currentCloudProxy = device.UpdateCloudProxy(cloudProxy.Value);

// If the existing cloud proxy had an active connection then close it since we
// now have a new connected cloud proxy.
await currentCloudProxy.Filter(cp => cp.IsActive)
.Map(cp => cp.CloseAsync())
.GetOrElse(Task.FromResult(true));
Events.NewCloudConnection(identity);
Events.NewCloudConnection(deviceIdentity);
}
return cloudProxy;
}

public Task<Try<ICloudProxy>> GetOrCreateCloudConnectionAsync(IIdentity identity)
{
Preconditions.CheckNotNull(identity, nameof(identity));

// TODO: This line is a temporary workaround to use the underlying DeviceIdentity for cloud connections for modules
IIdentity deviceIdentity = GetDeviceIdentity(identity);

// Get an existing ConnectedDevice from this.devices or add a new non-connected
// instance to this.devices and return that.
ConnectedDevice device = this.GetOrCreateConnectedDevice(identity);
ConnectedDevice device = this.GetOrCreateConnectedDevice(deviceIdentity);

// Read this code as: if the cloud proxy instance has a value and it is connected
// already (evidenced by the fact that its 'IsActive' property is 'true') then return
// that instance as is. Otherwise, open a new connection.
return device.CloudProxy.Filter(cp => cp.IsActive)
.Match(cp => Task.FromResult(Try.Success(cp)),
() =>
{
// TODO - Temporary code to allow multiple modules to use same connection to the IoTHub.
// Once IoTHub supports modules connecting to it, we should remove this.
Option<ICloudProxy> existingConnection = this.CheckExistingCloudConnection(identity);
return existingConnection.Match(
c =>
{
device.UpdateCloudProxy(c);
return Task.FromResult(Try.Success(c));
},
() => this.CreateCloudConnectionAsync(identity));
}
);
.Match(cp => Task.FromResult(Try.Success(cp)), () => this.CreateCloudConnectionAsync(deviceIdentity));
}

Option<ICloudProxy> CheckExistingCloudConnection(IIdentity identity)
/// <summary>
/// If the identity is a moduleIdentity, it creates an identity for the underlying device.
/// TODO: This is a temporary workaround to use the underlying DeviceIdentity for cloud connections for modules
/// </summary>
static IIdentity GetDeviceIdentity(IIdentity identity)
{
KeyValuePair<string, ConnectedDevice> deviceWithActiveCloudConnection = this.devices.FirstOrDefault(
d => IsSameDevice(identity, d.Value.Identity) && d.Value.CloudProxy.Exists(cp => cp.IsActive));
return deviceWithActiveCloudConnection.Equals(default(KeyValuePair<string, ConnectedDevice>))
? Option.None<ICloudProxy>()
: deviceWithActiveCloudConnection.Value.CloudProxy;
var moduleIdentity = identity as ModuleIdentity;
return moduleIdentity != null ? new DeviceIdentity(moduleIdentity, moduleIdentity.DeviceId) : identity;
}

static bool IsSameDevice(IIdentity id1, IIdentity id2)
/// <summary>
/// If the id is deviceId/moduleId, then it gets the deviceId from it
/// TODO: This is a temporary workaround to use the underlying DeviceIdentity for cloud connections for modules
/// </summary>
static string GetDeviceId(string id)
{
return GetDeviceId(id1).Equals(GetDeviceId(id2), StringComparison.OrdinalIgnoreCase);
int seperatorIndex = id.IndexOf('/');
return seperatorIndex > 0 ? id.Substring(0, seperatorIndex) : id;
}

static string GetDeviceId(IIdentity identity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,34 @@

namespace Microsoft.Azure.Devices.Edge.Hub.Core
{
using System;

public class DirectMethodRequest
{
public DirectMethodRequest(string id, string name, byte[] data)
public DirectMethodRequest(string id, string name, byte[] data, TimeSpan responseTimeout)
: this(id, name, data, responseTimeout, TimeSpan.Zero)
{ }

public DirectMethodRequest(string id, string name, byte[] data, TimeSpan responseTimeout, TimeSpan connectTimeout)
{
this.Id = id;
this.Name = name;
this.Data = data;
this.Data = data;
this.ConnectTimeout = connectTimeout;
this.ResponseTimeout = responseTimeout;
this.CorrelationId = Guid.NewGuid().ToString();
}

public string Id { get; }

public string CorrelationId { get; }

public string Name { get; }

public byte[] Data { get; }

public TimeSpan ConnectTimeout { get; }

public TimeSpan ResponseTimeout { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ public class DirectMethodResponse
{
public DirectMethodResponse(string rid, byte[] data, int statusCode)
{
this.RequestId = rid;
this.CorrelationId = rid;
this.Data = data;
this.Status = statusCode;
}
Expand All @@ -15,6 +15,6 @@ public DirectMethodResponse(string rid, byte[] data, int statusCode)

public int Status { get; }

public string RequestId { get; }
public string CorrelationId { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ public static class HubCoreEventIds
public const int CloudEndpoint = EventIdStart + 200;
public const int ModuleEndpoint = EventIdStart + 300;
public const int Authenticator = EventIdStart + 400;
public const int RoutingEdgeHub = EventIdStart + 500;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@ public interface IEdgeHub
Task ProcessDeviceMessage(IIdentity identity, IMessage message);

Task ProcessDeviceMessageBatch(IIdentity identity, IEnumerable<IMessage> message);

Task<DirectMethodResponse> InvokeMethodAsync(IIdentity identity, DirectMethodRequest methodRequest);

Task SendMethodResponseAsync(DirectMethodResponse response);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.Core
{
public interface IValidator<in T>
{
void Validate(T value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
<None Remove="Listeners\**" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.Devices.ModuleClient" Version="1.2.11" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\edge-util\src\Microsoft.Azure.Devices.Edge.Util\Microsoft.Azure.Devices.Edge.Util.csproj" />
<ProjectReference Include="..\Microsoft.Azure.Devices.Routing.Core\Microsoft.Azure.Devices.Routing.Core.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Cloud
{
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Hub.Core.Device;
using Microsoft.Azure.Devices.Edge.Util;

class CloudListener : ICloudListener
{
readonly IDeviceProxy deviceProxy;
readonly IEdgeHub edgeHub;
readonly IIdentity identity;

public CloudListener(IDeviceProxy deviceProxy)
public CloudListener(IDeviceProxy deviceProxy, IEdgeHub edgeHub, IIdentity identity)
{
this.deviceProxy = deviceProxy;
this.deviceProxy = Preconditions.CheckNotNull(deviceProxy, nameof(deviceProxy));
this.edgeHub = Preconditions.CheckNotNull(edgeHub, nameof(edgeHub));
this.identity = Preconditions.CheckNotNull(identity, nameof(identity));
}

public Task CallMethodAsync(DirectMethodRequest request) => this.deviceProxy.CallMethodAsync(request);
public Task<DirectMethodResponse> CallMethodAsync(DirectMethodRequest request) => this.edgeHub.InvokeMethodAsync(this.identity, request);

public Task OnDesiredPropertyUpdates(IMessage desiredProperties) => this.deviceProxy.OnDesiredPropertyUpdates(desiredProperties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ public interface ICloudListener

Task OnDesiredPropertyUpdates(IMessage desiredProperties);

Task CallMethodAsync(DirectMethodRequest request);
Task<DirectMethodResponse> CallMethodAsync(DirectMethodRequest request);
}
}
Loading

0 comments on commit 6435dee

Please sign in to comment.