Skip to content

Commit

Permalink
Merged PR 912267: Change functions binding client to amqp
Browse files Browse the repository at this point in the history
Removed TransportType as an input and only use AMQP because it recovers when the connection is lost
Changed ModuleClientCache to have only one ModuleClient instance, multiple client connections can't be configured because it creates it uses CreateFromEnvironmentAsync.
  • Loading branch information
ancaantochi committed Jun 20, 2018
1 parent 6f21f5f commit f3e94ba
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ namespace Microsoft.Azure.Devices.Edge.Functions.Binding
/// </summary>
public class EdgeHubAsyncCollector : IAsyncCollector<Message>
{
readonly TransportType transportType;
readonly EdgeHubAttribute attribute;
readonly int batchSize;

Expand All @@ -35,11 +34,9 @@ public class EdgeHubAsyncCollector : IAsyncCollector<Message>
/// <summary>
/// Create a sender around the given client.
/// </summary>
/// <param name="transportType">Device Client transport type. </param>
/// <param name="attribute">Attributes used by EdgeHub when receiving a message from function.</param>
public EdgeHubAsyncCollector(TransportType transportType, EdgeHubAttribute attribute)
public EdgeHubAsyncCollector(EdgeHubAttribute attribute)
{
this.transportType = transportType;
this.attribute = attribute;
this.batchSize = attribute.BatchSize > 0 ? (attribute.BatchSize > MaxBatchSize ? MaxBatchSize : attribute.BatchSize) : DefaultBatchSize;
}
Expand Down Expand Up @@ -104,7 +101,7 @@ protected virtual async Task SendBatchAsync(IList<Message> batch)
return;
}

ModuleClient client = await ModuleClientCache.Instance.GetOrCreateAsync(this.transportType).ConfigureAwait(false);
ModuleClient client = await ModuleClientCache.Instance.GetOrCreateAsync().ConfigureAwait(false);

if (string.IsNullOrEmpty(this.attribute.OutputName))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,8 @@ namespace Microsoft.Azure.Devices.Edge.Functions.Binding
class EdgeHubTriggerBindingProvider : ITriggerBindingProvider
{
readonly ConcurrentDictionary<string, IList<EdgeHubMessageProcessor>> receivers = new ConcurrentDictionary<string, IList<EdgeHubMessageProcessor>>();
readonly TransportType transportType;
ModuleClient moduleClient;

public EdgeHubTriggerBindingProvider(TransportType transportType)
{
this.transportType = transportType;
}

public async Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
{
if (context == null)
Expand All @@ -46,7 +40,7 @@ public async Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext
throw new InvalidOperationException($"Can't bind EdgeHubTriggerAttribute to type '{parameter.ParameterType}'.");
}

await TrySetEventDefaultHandlerAsync().ConfigureAwait(false);
await this.TrySetEventDefaultHandlerAsync().ConfigureAwait(false);

var messageProcessor = new EdgeHubMessageProcessor();
var triggerBinding = new EdgeHubTriggerBinding(context.Parameter, messageProcessor);
Expand Down Expand Up @@ -77,7 +71,7 @@ async Task TrySetEventDefaultHandlerAsync()
return;
}

this.moduleClient = await ModuleClientCache.Instance.GetOrCreateAsync(transportType).ConfigureAwait(false);
this.moduleClient = await ModuleClientCache.Instance.GetOrCreateAsync().ConfigureAwait(false);
await this.moduleClient.SetMessageHandlerAsync(this.FunctionsMessageHandler, null).ConfigureAwait(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
namespace Microsoft.Azure.Devices.Edge.Functions.Binding
{
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Client.Transport.Mqtt;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Concurrency;
using Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling;
using ExponentialBackoff = Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling.ExponentialBackoff;
using ExponentialBackoff = Util.TransientFaultHandling.ExponentialBackoff;

class ModuleClientCache
{
const int RetryCount = 5;
static readonly ITransientErrorDetectionStrategy TimeoutErrorDetectionStrategy = new DelegateErrorDetectionStrategy(ex => ex.HasTimeoutException());
static readonly RetryStrategy TransientRetryStrategy =
new ExponentialBackoff(RetryCount, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(4));
readonly ConcurrentDictionary<string, Task<ModuleClient>> clients = new ConcurrentDictionary<string, Task<ModuleClient>>();
readonly AsyncLock asyncLock = new AsyncLock();
ModuleClient client;

// Private constructor to ensure single instance
ModuleClientCache()
Expand All @@ -26,10 +26,11 @@ class ModuleClientCache

public static ModuleClientCache Instance { get; } = new ModuleClientCache();

public Task<ModuleClient> GetOrCreateAsync(TransportType transportType) =>
this.clients.GetOrAdd(
transportType.ToString(),
client =>
public async Task<ModuleClient> GetOrCreateAsync()
{
using (await this.asyncLock.LockAsync())
{
if (this.client == null)
{
var retryPolicy = new RetryPolicy(TimeoutErrorDetectionStrategy, TransientRetryStrategy);
retryPolicy.Retrying += (_, args) =>
Expand All @@ -40,16 +41,16 @@ public Task<ModuleClient> GetOrCreateAsync(TransportType transportType) =>
Console.WriteLine("Retrying...");
}
};
return retryPolicy.ExecuteAsync(() => this.CreateModuleClient(transportType));
});
this.client = await retryPolicy.ExecuteAsync(() => this.CreateModuleClient());
}

return this.client;
}
}

async Task<ModuleClient> CreateModuleClient(TransportType transportType)
async Task<ModuleClient> CreateModuleClient()
{
var mqttSetting = new MqttTransportSettings(transportType);

ITransportSettings[] settings = { mqttSetting };
ModuleClient moduleClient = await ModuleClient.CreateFromEnvironmentAsync(settings).ConfigureAwait(false);
ModuleClient moduleClient = await ModuleClient.CreateFromEnvironmentAsync().ConfigureAwait(false);

moduleClient.ProductInfo = "Microsoft.Azure.Devices.Edge.Functions.Binding";
return moduleClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
namespace Microsoft.Azure.Devices.Edge.Functions.Binding
{
using System.Collections.Generic;
using System.ComponentModel;
using Microsoft.Azure.Devices.Client;

static class Utils
Expand All @@ -19,22 +18,5 @@ public static Message GetMessageCopy(byte[] payload, Message message)

return copy;
}

public static TransportType ToTransportType(string transportName, TransportType defaultTransport)
{
TransportType transportType;

if (transportName == null)
{
transportType = defaultTransport;
}
else
{
TypeConverter converter = TypeDescriptor.GetConverter(typeof(TransportType));
transportType = (TransportType)converter.ConvertFromInvariantString(transportName);
}

return transportType;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,9 @@ namespace Microsoft.Azure.Devices.Edge.Functions.Binding.Bindings

class EdgeHubCollectorBuilder : IConverter<EdgeHubAttribute, IAsyncCollector<Message>>
{
readonly TransportType transportType;

public EdgeHubCollectorBuilder(TransportType transportType)
{
this.transportType = transportType;
}

public IAsyncCollector<Message> Convert(EdgeHubAttribute attribute)
{
return new EdgeHubAsyncCollector(this.transportType, attribute);
return new EdgeHubAsyncCollector(attribute);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace Microsoft.Azure.Devices.Edge.Functions.Binding.Config
using System;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Functions.Binding.Bindings;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Azure.WebJobs.Host.Triggers;
Expand All @@ -18,26 +17,20 @@ public class EdgeHubExtensionConfigProvider : IExtensionConfigProvider
{
public void Initialize(ExtensionConfigContext context)
{
const string ClientTransportType = "ClientTransportType";

if (context == null)
{
throw new ArgumentNullException(nameof(context));
}

var extensions = context.Config.GetService<IExtensionRegistry>();
var nameResolver = context.Config.GetService<INameResolver>();

TransportType transportType = Utils.ToTransportType(
nameResolver.Resolve(ClientTransportType), TransportType.Mqtt_Tcp_Only);

// register trigger binding provider
var triggerBindingProvider = new EdgeHubTriggerBindingProvider(transportType);
var triggerBindingProvider = new EdgeHubTriggerBindingProvider();
extensions.RegisterExtension<ITriggerBindingProvider>(triggerBindingProvider);

extensions.RegisterBindingRules<EdgeHubAttribute>();
FluentBindingRule<EdgeHubAttribute> rule = context.AddBindingRule<EdgeHubAttribute>();
rule.BindToCollector<Message>(typeof(EdgeHubCollectorBuilder), transportType);
rule.BindToCollector<Message>(typeof(EdgeHubCollectorBuilder));

context.AddConverter<Message, string>(this.MessageConverter);
context.AddConverter<string, Message>(this.ConvertToMessage);
Expand Down

0 comments on commit f3e94ba

Please sign in to comment.