Skip to content

Commit

Permalink
Merged PR 818438: Device SDK update with DeviceClient, ModuleClient s…
Browse files Browse the repository at this point in the history
…plit

Updated Device SDK and use DeviceClient or ModuleClient
  • Loading branch information
ancaantochi committed May 8, 2018
1 parent ab91ff2 commit bf3954f
Show file tree
Hide file tree
Showing 42 changed files with 553 additions and 325 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public struct AgentEventIds
public const int DockerEnvironment = EventIdStart + 900;
public const int ModuleLifecycleCommandFactory = EventIdStart + 1000;
public const int EdgeAgentConnection = EventIdStart + 1100;
public const int DeviceClient = EventIdStart + 1200;
public const int ModuleClient = EventIdStart + 1200;
public const int RetryingServiceClient = EventIdStart + 1300;
public const int OrderedRetryPlanRunner = EventIdStart + 1400;
public const int ModuleManagementHttpClient = EventIdStart + 1500;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class EdgeAgentConnection : IEdgeAgentConnection
readonly Task initTask;
readonly RetryStrategy retryStrategy;

Option<IDeviceClient> deviceClient;
Option<IModuleClient> deviceClient;
TwinCollection desiredProperties;
Option<TwinCollection> reportedProperties;
Option<DeploymentConfigInfo> deploymentConfigInfo;
Expand All @@ -34,28 +34,28 @@ public class EdgeAgentConnection : IEdgeAgentConnection
static readonly RetryStrategy TransientRetryStrategy =
new Util.TransientFaultHandling.ExponentialBackoff(5, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(4));

public EdgeAgentConnection(IDeviceClientProvider deviceClientProvider,
public EdgeAgentConnection(IModuleClientProvider moduleClientProvider,
ISerde<DeploymentConfig> desiredPropertiesSerDe)
: this(deviceClientProvider, desiredPropertiesSerDe, TransientRetryStrategy)
: this(moduleClientProvider, desiredPropertiesSerDe, TransientRetryStrategy)
{ }

internal EdgeAgentConnection(IDeviceClientProvider deviceClientProvider,
internal EdgeAgentConnection(IModuleClientProvider moduleClientProvider,
ISerde<DeploymentConfig> desiredPropertiesSerDe,
RetryStrategy retryStrategy)
{
this.desiredPropertiesSerDe = Preconditions.CheckNotNull(desiredPropertiesSerDe, nameof(desiredPropertiesSerDe));
this.deploymentConfigInfo = Option.None<DeploymentConfigInfo>();
this.reportedProperties = Option.None<TwinCollection>();
this.deviceClient = Option.None<IDeviceClient>();
this.deviceClient = Option.None<IModuleClient>();
this.retryStrategy = Preconditions.CheckNotNull(retryStrategy, nameof(retryStrategy));
this.initTask = this.CreateAndInitDeviceClient(Preconditions.CheckNotNull(deviceClientProvider, nameof(deviceClientProvider)));
this.initTask = this.CreateAndInitDeviceClient(Preconditions.CheckNotNull(moduleClientProvider, nameof(moduleClientProvider)));
}

async Task CreateAndInitDeviceClient(IDeviceClientProvider deviceClientProvider)
async Task CreateAndInitDeviceClient(IModuleClientProvider moduleClientProvider)
{
using (await this.twinLock.LockAsync())
{
IDeviceClient dc = await deviceClientProvider.Create(
IModuleClient dc = await moduleClientProvider.Create(
this.OnConnectionStatusChanged,
async d =>
{
Expand Down Expand Up @@ -116,7 +116,7 @@ async Task RefreshTwinAsync()
// recover from this situation
var retryPolicy = new RetryPolicy(AllButFatalErrorDetectionStrategy, this.retryStrategy);
retryPolicy.Retrying += (_, args) => Events.RetryingGetTwin(args);
IDeviceClient dc = this.deviceClient.Expect(() => new InvalidOperationException("DeviceClient not yet initialized"));
IModuleClient dc = this.deviceClient.Expect(() => new InvalidOperationException("DeviceClient not yet initialized"));
Twin twin = await retryPolicy.ExecuteAsync(() => dc.GetTwinAsync());

this.desiredProperties = twin.Properties.Desired;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub
using Microsoft.Azure.Devices.Edge.Agent.Core;
using Microsoft.Azure.Devices.Edge.Util;

public class EnvironmentDeviceClientProvider : IDeviceClientProvider
public class EnvironmentModuleClientProvider : IModuleClientProvider
{
readonly Option<UpstreamProtocol> upstreamProtocol;

public EnvironmentDeviceClientProvider(Option<UpstreamProtocol> upstreamProtocol)
public EnvironmentModuleClientProvider(Option<UpstreamProtocol> upstreamProtocol)
{
this.upstreamProtocol = upstreamProtocol;
}

public Task<IDeviceClient> Create(
public Task<IModuleClient> Create(
ConnectionStatusChangesHandler statusChangedHandler,
Func<IDeviceClient, Task> initialize) =>
DeviceClient.Create(this.upstreamProtocol, statusChangedHandler, initialize);
Func<IModuleClient, Task> initialize) =>
ModuleClient.Create(this.upstreamProtocol, statusChangedHandler, initialize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Shared;

public interface IDeviceClient : IDisposable
public interface IModuleClient : IDisposable
{
Task SetDesiredPropertyUpdateCallbackAsync(DesiredPropertyUpdateCallback onDesiredPropertyChanged);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;

public interface IDeviceClientProvider
public interface IModuleClientProvider
{
Task<IDeviceClient> Create(ConnectionStatusChangesHandler statusChangedHandler, Func<IDeviceClient, Task> initialize);
Task<IModuleClient> Create(ConnectionStatusChangesHandler statusChangedHandler, Func<IModuleClient, Task> initialize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Devices" Version="1.16.0-preview-003" />
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.17.0-preview-006" />
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.17.0-preview-007" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub
using Microsoft.Azure.Devices.Shared;
using Microsoft.Extensions.Logging;

public class DeviceClient : IDeviceClient
public class ModuleClient : IModuleClient
{
static readonly ITransientErrorDetectionStrategy TransientErrorDetectionStrategy = new ErrorDetectionStrategy();
static readonly RetryStrategy TransientRetryStrategy =
Expand All @@ -28,38 +28,38 @@ public class DeviceClient : IDeviceClient
[UpstreamProtocol.MqttWs] = TransportType.Mqtt_WebSocket_Only
};

readonly Client.DeviceClient deviceClient;
readonly Client.ModuleClient deviceClient;

DeviceClient(Client.DeviceClient deviceClient)
ModuleClient(Client.ModuleClient deviceClient)
{
this.deviceClient = deviceClient;
}

public static Task<IDeviceClient> Create(Option<UpstreamProtocol> upstreamProtocol,
public static Task<IModuleClient> Create(Option<UpstreamProtocol> upstreamProtocol,
ConnectionStatusChangesHandler statusChangedHandler,
Func<DeviceClient, Task> initialize)
Func<ModuleClient, Task> initialize)
{
return Create(upstreamProtocol, initialize, t => CreateAndOpenDeviceClient(t, statusChangedHandler));
}

public static Task<IDeviceClient> Create(string connectionString,
public static Task<IModuleClient> Create(string connectionString,
Option<UpstreamProtocol> upstreamProtocol,
ConnectionStatusChangesHandler statusChangedHandler,
Func<DeviceClient, Task> initialize)
Func<ModuleClient, Task> initialize)
{
return Create(upstreamProtocol, initialize, t => CreateAndOpenDeviceClient(t, connectionString, statusChangedHandler));
}

static async Task<IDeviceClient> Create(Option<UpstreamProtocol> upstreamProtocol, Func<DeviceClient, Task> initialize, Func<TransportType, Task<Client.DeviceClient>> deviceClientCreator)
static async Task<IModuleClient> Create(Option<UpstreamProtocol> upstreamProtocol, Func<ModuleClient, Task> initialize, Func<TransportType, Task<Client.ModuleClient>> deviceClientCreator)
{
try
{
return await ExecuteWithRetry(
async () =>
{
Client.DeviceClient dc = await CreateDeviceClientForUpstreamProtocol(upstreamProtocol, deviceClientCreator);
Client.ModuleClient dc = await CreateDeviceClientForUpstreamProtocol(upstreamProtocol, deviceClientCreator);
Events.DeviceClientCreated();
var deviceClient = new DeviceClient(dc);
var deviceClient = new ModuleClient(dc);
if (initialize != null)
{
await initialize(deviceClient);
Expand All @@ -77,17 +77,17 @@ static async Task<IDeviceClient> Create(Option<UpstreamProtocol> upstreamProtoco
}
}

internal static Task<Client.DeviceClient> CreateDeviceClientForUpstreamProtocol(
internal static Task<Client.ModuleClient> CreateDeviceClientForUpstreamProtocol(
Option<UpstreamProtocol> upstreamProtocol,
Func<TransportType, Task<Client.DeviceClient>> deviceClientCreator)
Func<TransportType, Task<Client.ModuleClient>> deviceClientCreator)
=> upstreamProtocol
.Map(u => deviceClientCreator(UpstreamProtocolTransportTypeMap[u]))
.GetOrElse(
async () =>
{
// The device SDK doesn't appear to be falling back to WebSocket from TCP,
// so we'll do it explicitly until we can get the SDK sorted out.
Try<Client.DeviceClient> result = await Fallback.ExecuteAsync(
Try<Client.ModuleClient> result = await Fallback.ExecuteAsync(
() => deviceClientCreator(TransportType.Amqp_Tcp_Only),
() => deviceClientCreator(TransportType.Amqp_WebSocket_Only));
if (!result.Success)
Expand All @@ -98,25 +98,25 @@ static async Task<IDeviceClient> Create(Option<UpstreamProtocol> upstreamProtoco
return result.Value;
});

static async Task<Client.DeviceClient> CreateAndOpenDeviceClient(TransportType transport, ConnectionStatusChangesHandler statusChangedHandler)
static async Task<Client.ModuleClient> CreateAndOpenDeviceClient(TransportType transport, ConnectionStatusChangesHandler statusChangedHandler)
{
Events.AttemptingConnectionWithTransport(transport);
Client.DeviceClient deviceClient = new DeviceClientFactory(transport).Create();
Client.ModuleClient deviceClient = Client.ModuleClient.CreateFromEnvironment(transport);
await OpenAsync(statusChangedHandler, deviceClient);
Events.ConnectedWithTransport(transport);
return deviceClient;
}

static async Task<Client.DeviceClient> CreateAndOpenDeviceClient(TransportType transport, string connectionString, ConnectionStatusChangesHandler statusChangedHandler)
static async Task<Client.ModuleClient> CreateAndOpenDeviceClient(TransportType transport, string connectionString, ConnectionStatusChangesHandler statusChangedHandler)
{
Events.AttemptingConnectionWithTransport(transport);
Client.DeviceClient deviceClient = Client.DeviceClient.CreateFromConnectionString(connectionString, transport);
Client.ModuleClient deviceClient = Client.ModuleClient.CreateFromConnectionString(connectionString, transport);
await OpenAsync(statusChangedHandler, deviceClient);
Events.ConnectedWithTransport(transport);
return deviceClient;
}

static async Task OpenAsync(ConnectionStatusChangesHandler statusChangedHandler, Client.DeviceClient deviceClient)
static async Task OpenAsync(ConnectionStatusChangesHandler statusChangedHandler, Client.ModuleClient deviceClient)
{
// note: it's important to set the status-changed handler and
// timeout value *before* we open a connection to the hub
Expand Down Expand Up @@ -157,8 +157,8 @@ public Task SetMethodHandlerAsync(string methodName, MethodCallback callback) =>

static class Events
{
static readonly ILogger Log = Logger.Factory.CreateLogger<DeviceClient>();
const int IdStart = AgentEventIds.DeviceClient;
static readonly ILogger Log = Logger.Factory.CreateLogger<ModuleClient>();
const int IdStart = AgentEventIds.ModuleClient;

enum EventIds
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub
using Microsoft.Azure.Devices.Edge.Agent.Core;
using Microsoft.Azure.Devices.Edge.Util;

public class DeviceClientProvider : IDeviceClientProvider
public class ModuleClientProvider : IModuleClientProvider
{
readonly string edgeAgentConnectionString;
readonly Option<UpstreamProtocol> upstreamProtocol;

public DeviceClientProvider(string edgeAgentConnectionString, Option<UpstreamProtocol> upstreamProtocol)
public ModuleClientProvider(string edgeAgentConnectionString, Option<UpstreamProtocol> upstreamProtocol)
{
this.edgeAgentConnectionString = Preconditions.CheckNonWhiteSpace(edgeAgentConnectionString, nameof(edgeAgentConnectionString));
this.upstreamProtocol = upstreamProtocol;
}

public Task<IDeviceClient> Create(
public Task<IModuleClient> Create(
ConnectionStatusChangesHandler statusChangedHandler,
Func<IDeviceClient, Task> initialize) =>
DeviceClient.Create(this.edgeAgentConnectionString, this.upstreamProtocol, statusChangedHandler, initialize);
Func<IModuleClient, Task> initialize) =>
ModuleClient.Create(this.edgeAgentConnectionString, this.upstreamProtocol, statusChangedHandler, initialize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ protected override void Load(ContainerBuilder builder)
{
// IDeviceClientProvider
string edgeAgentConnectionString = $"{this.edgeDeviceConnectionString};{Constants.ModuleIdKey}={Constants.EdgeAgentModuleIdentityName}";
builder.Register(c => new DeviceClientProvider(edgeAgentConnectionString, this.upstreamProtocol))
.As<IDeviceClientProvider>()
builder.Register(c => new ModuleClientProvider(edgeAgentConnectionString, this.upstreamProtocol))
.As<IModuleClientProvider>()
.SingleInstance();

// IServiceClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public EdgeletModule(string iotHubHostname, string gatewayHostName, string devic
protected override void Load(ContainerBuilder builder)
{
// IDeviceClientProvider
builder.Register(c => new EnvironmentDeviceClientProvider(this.upstreamProtocol))
.As<IDeviceClientProvider>()
builder.Register(c => new EnvironmentModuleClientProvider(this.upstreamProtocol))
.As<IModuleClientProvider>()
.SingleInstance();

// IModuleManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected override void Load(ContainerBuilder builder)
c =>
{
var serde = c.Resolve<ISerde<DeploymentConfig>>();
var deviceClientprovider = c.Resolve<IDeviceClientProvider>();
var deviceClientprovider = c.Resolve<IModuleClientProvider>();
IEdgeAgentConnection edgeAgentConnection = new EdgeAgentConnection(deviceClientprovider, serde);
return edgeAgentConnection;
})
Expand Down
Loading

0 comments on commit bf3954f

Please sign in to comment.