Skip to content

Commit

Permalink
Hotfix/session reconnect (#531)
Browse files Browse the repository at this point in the history
* refactored session handling in publisher
* session manager is now responsible for maintaining the sessions and subscriptions registered
* delay opc configuration generation after module start-up
* aligned the names for dataset writer group with session name
* aligned dataset writer id with subscription's name
* added waitfordebugger argument in publisher for consistency with the other modules
* cleaned up the logs
* added back diagnostics for not connected sessions
* regression fix

Co-authored-by: Luis Cantero <[email protected]>
Co-authored-by: Marc Schier <[email protected]>
  • Loading branch information
3 people authored Jun 5, 2020
1 parent 77c8424 commit 9f57f9e
Show file tree
Hide file tree
Showing 16 changed files with 876 additions and 490 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private async Task EnsureWorkersAsync() {
}
await Task.WhenAll(workerStartTasks);
// the configuration might have been changed by workers execution
var newWorkers = _agentConfigProvider.Config?.MaxWorkers;
var newWorkers = _agentConfigProvider.Config?.MaxWorkers ?? kDefaultWorkers;
if (workers >= newWorkers) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace Microsoft.Azure.IIoT.OpcUa.Edge.Publisher.Engine {
using System.Threading.Tasks.Dataflow;
using Prometheus;
using System.Text;
using Serilog.Events;

/// <summary>
/// Dataflow engine
Expand Down Expand Up @@ -63,6 +62,7 @@ public DataFlowProcessingEngine(IMessageTrigger messageTrigger, IMessageEncoder

/// <inheritdoc/>
public void Dispose() {
_logger.Debug("Disposing {name}", Name);
_messageTrigger.OnMessage -= MessageTriggerMessageReceived;
_diagnosticsOutputTimer?.Dispose();
_batchTriggerIntervalTimer?.Dispose();
Expand All @@ -83,13 +83,13 @@ public async Task RunAsync(ProcessMode processMode, CancellationToken cancellati
return;
}
IsRunning = true;
if (_config.DiagnosticsInterval.HasValue && _config.DiagnosticsInterval > TimeSpan.Zero){
if (_config.DiagnosticsInterval.HasValue && _config.DiagnosticsInterval > TimeSpan.Zero) {
_diagnosticsOutputTimer = new Timer(DiagnosticsOutputTimer_Elapsed, null,
_config.DiagnosticsInterval.Value,
_config.DiagnosticsInterval.Value);
}

if (_config.BatchTriggerInterval.HasValue && _config.BatchTriggerInterval > TimeSpan.Zero){
if (_config.BatchTriggerInterval.HasValue && _config.BatchTriggerInterval > TimeSpan.Zero) {
_batchTriggerIntervalTimer = new Timer(BatchTriggerIntervalTimer_Elapsed, null,
_config.BatchTriggerInterval.Value,
_config.BatchTriggerInterval.Value);
Expand Down Expand Up @@ -151,52 +151,47 @@ public Task SwitchProcessMode(ProcessMode processMode, DateTime? timestamp) {
/// </summary>
/// <param name="state"></param>
private void DiagnosticsOutputTimer_Elapsed(object state) {
var totalDuration = (DateTime.UtcNow - _diagnosticStart).TotalSeconds;
var totalDuration = _diagnosticStart != DateTime.MinValue ? (DateTime.UtcNow - _diagnosticStart).TotalSeconds : 0;
_logger.Debug("Identity {deviceId}; {moduleId}", _identity.DeviceId, _identity.ModuleId);

if (_messageTrigger.DataChangesCount > 0 || _messageTrigger.ValueChangesCount > 0 || _messageSink.SentMessagesCount > 0) {
var diagInfo = new StringBuilder();
diagInfo.Append("\n DIAGNOSTICS INFORMATION for : {host}\n");
diagInfo.Append(" # Ingestion duration : {duration,14:dd\\:hh\\:mm\\:ss} (dd:hh:mm:ss)\n");
string dataChangesAverage = _messageTrigger.DataChangesCount > 0 && totalDuration > 0 ? $" ({_messageTrigger.DataChangesCount / totalDuration:0.##}/s)" : "";
diagInfo.Append(" # Ingress DataChanges (from OPC) : {dataChangesCount,14:0}{dataChangesAverage}\n");
string valueChangesAverage = _messageTrigger.ValueChangesCount > 0 && totalDuration > 0 ? $" ({_messageTrigger.ValueChangesCount / totalDuration:0.##}/s)" : "";
diagInfo.Append(" # Ingress ValueChanges (from OPC) : {valueChangesCount,14:0}{valueChangesAverage}\n");

diagInfo.Append(" # Ingress BatchBlock buffer size : {batchDataSetMessageBlockOutputCount,14:0}\n");
diagInfo.Append(" # Encoding Block input/output size : {encodingBlockInputCount,14:0} | {encodingBlockOutputCount:0}\n");
diagInfo.Append(" # Encoder Notifications processed : {notificationsProcessedCount,14:0}\n");
diagInfo.Append(" # Encoder Notifications dropped : {notificationsDroppedCount,14:0}\n");
diagInfo.Append(" # Encoder IoT Messages processed : {messagesProcessedCount,14:0}\n");
diagInfo.Append(" # Encoder avg Notifications/Message: {notificationsPerMessage,14:0}\n");
diagInfo.Append(" # Encoder avg IoT Message body size: {messageSizeAverage,14:0}\n");
diagInfo.Append(" # Outgress Batch Block buffer size : {batchNetworkMessageBlockOutputCount,14:0}\n");
diagInfo.Append(" # Outgress input buffer count : {sinkBlockInputCount,14:0}\n");

string sentMessagesAverage = _messageSink.SentMessagesCount > 0 && totalDuration > 0 ? $" ({_messageSink.SentMessagesCount / totalDuration:0.##}/s)" : "";
diagInfo.Append(" # Outgress IoT message count : {messageSinkSentMessagesCount,14:0}{sentMessagesAverage}\n");

if (_messageTrigger.NumberOfConnectionRetries > 0) {
diagInfo.Append(" # Connection retries : {connectionRetries,14:0}\n");
}

_logger.Information(diagInfo.ToString(),
Utils.LoggingHelper.ExtractHost(Name),
TimeSpan.FromSeconds(totalDuration),
_messageTrigger.DataChangesCount, dataChangesAverage,
_messageTrigger.ValueChangesCount, valueChangesAverage,
_batchDataSetMessageBlock.OutputCount,
_encodingBlock.InputCount, _encodingBlock.OutputCount,
_messageEncoder.NotificationsProcessedCount,
_messageEncoder.NotificationsDroppedCount,
_messageEncoder.MessagesProcessedCount,
_messageEncoder.AvgNotificationsPerMessage,
_messageEncoder.AvgMessageSize,
_batchNetworkMessageBlock.OutputCount,
_sinkBlock.InputCount,
_messageSink.SentMessagesCount, sentMessagesAverage,
_messageTrigger.NumberOfConnectionRetries);
}
var diagInfo = new StringBuilder();
diagInfo.Append("\n DIAGNOSTICS INFORMATION for : {host}\n");
diagInfo.Append(" # Ingestion duration : {duration,14:dd\\:hh\\:mm\\:ss} (dd:hh:mm:ss)\n");
string dataChangesAverage = _messageTrigger.DataChangesCount > 0 && totalDuration > 0 ? $" ({_messageTrigger.DataChangesCount / totalDuration:0.##}/s)" : "";
diagInfo.Append(" # Ingress DataChanges (from OPC) : {dataChangesCount,14:0}{dataChangesAverage}\n");
string valueChangesAverage = _messageTrigger.ValueChangesCount > 0 && totalDuration > 0 ? $" ({_messageTrigger.ValueChangesCount / totalDuration:0.##}/s)" : "";
diagInfo.Append(" # Ingress ValueChanges (from OPC) : {valueChangesCount,14:0}{valueChangesAverage}\n");

diagInfo.Append(" # Ingress BatchBlock buffer size : {batchDataSetMessageBlockOutputCount,14:0}\n");
diagInfo.Append(" # Encoding Block input/output size : {encodingBlockInputCount,14:0} | {encodingBlockOutputCount:0}\n");
diagInfo.Append(" # Encoder Notifications processed : {notificationsProcessedCount,14:0}\n");
diagInfo.Append(" # Encoder Notifications dropped : {notificationsDroppedCount,14:0}\n");
diagInfo.Append(" # Encoder IoT Messages processed : {messagesProcessedCount,14:0}\n");
diagInfo.Append(" # Encoder avg Notifications/Message: {notificationsPerMessage,14:0}\n");
diagInfo.Append(" # Encoder avg IoT Message body size: {messageSizeAverage,14:0}\n");
diagInfo.Append(" # Outgress Batch Block buffer size : {batchNetworkMessageBlockOutputCount,14:0}\n");
diagInfo.Append(" # Outgress input buffer count : {sinkBlockInputCount,14:0}\n");

string sentMessagesAverage = _messageSink.SentMessagesCount > 0 && totalDuration > 0 ? $" ({_messageSink.SentMessagesCount / totalDuration:0.##}/s)" : "";
diagInfo.Append(" # Outgress IoT message count : {messageSinkSentMessagesCount,14:0}{sentMessagesAverage}\n");
diagInfo.Append(" # Connection retries : {connectionRetries,14:0}\n");

_logger.Information(diagInfo.ToString(),
Name,
TimeSpan.FromSeconds(totalDuration),
_messageTrigger.DataChangesCount, dataChangesAverage,
_messageTrigger.ValueChangesCount, valueChangesAverage,
_batchDataSetMessageBlock.OutputCount,
_encodingBlock.InputCount, _encodingBlock.OutputCount,
_messageEncoder.NotificationsProcessedCount,
_messageEncoder.NotificationsDroppedCount,
_messageEncoder.MessagesProcessedCount,
_messageEncoder.AvgNotificationsPerMessage,
_messageEncoder.AvgMessageSize,
_batchNetworkMessageBlock.OutputCount,
_sinkBlock.InputCount,
_messageSink.SentMessagesCount, sentMessagesAverage,
_messageTrigger.NumberOfConnectionRetries);

kDataChangesCount.WithLabels(_identity.DeviceId ?? "",
_identity.ModuleId ?? "", Name).Set(_messageTrigger.DataChangesCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ public class WriterGroupMessageTrigger : IMessageTrigger, IDisposable {
public string Id => _writerGroup.WriterGroupId;

/// <inheritdoc/>
public long NumberOfConnectionRetries => _subscriptions.Where(sc => sc.Subscription != null)
.Select(sc => sc.Subscription).Sum(sc => sc.NumberOfConnectionRetries);
public int NumberOfConnectionRetries => null != _subscriptions.FirstOrDefault()?.
Subscription?.NumberOfConnectionRetries ? _subscriptions.FirstOrDefault().
Subscription.NumberOfConnectionRetries : 0;

/// <inheritdoc/>
public long ValueChangesCount { get; private set; } = 0;
public int ValueChangesCount { get; private set; } = 0;

/// <inheritdoc/>
public long DataChangesCount { get; private set; } = 0;
public int DataChangesCount { get; private set; } = 0;

/// <inheritdoc/>
public event EventHandler<DataSetMessageModel> OnMessage;
Expand Down Expand Up @@ -62,17 +63,18 @@ public WriterGroupMessageTrigger(IWriterGroupConfig writerGroupConfig,
/// <inheritdoc/>
public async Task RunAsync(CancellationToken ct) {

_subscriptions.ForEach(sc => sc.OpenAsync().ConfigureAwait(false));
_subscriptions.ForEach(sc => sc.ActivateAsync(ct).ConfigureAwait(false));
await Task.Delay(-1, ct); // TODO - add managemnt of subscriptions, etc.
_subscriptions.ForEach(sc => sc.DeactivateAsync().ConfigureAwait(false));
_subscriptions.ForEach(sc => sc.OpenAsync().Wait());
_subscriptions.ForEach(sc => sc.ActivateAsync(ct).Wait());
try {
await Task.Delay(-1, ct);
}
finally {
_subscriptions.ForEach(sc => sc.DeactivateAsync().Wait());
}
}

/// <summary>
///
/// </summary>
/// <inheritdoc/>
public void Dispose() {
_subscriptions.ForEach(sc => sc.DeactivateAsync().ConfigureAwait(false));
_subscriptions.ForEach(sc => sc.Dispose());
_subscriptions.Clear();
}
Expand Down Expand Up @@ -134,8 +136,8 @@ public async Task OpenAsync() {
var sc = await _outer._subscriptionManager.GetOrCreateSubscriptionAsync(
_subscriptionInfo);
sc.OnSubscriptionChange += OnSubscriptionChangedAsync;
await sc.ApplyAsync(_subscriptionInfo.MonitoredItems,
_subscriptionInfo.Configuration, false);
await sc.ApplyAsync(_subscriptionInfo.MonitoredItems,
_subscriptionInfo.Configuration);
Subscription = sc;
}

Expand All @@ -149,9 +151,11 @@ public async Task ActivateAsync(CancellationToken ct) {
_outer._logger.Warning("Subscription not registered");
return;
}

await Subscription.ApplyAsync(_subscriptionInfo.MonitoredItems,
_subscriptionInfo.Configuration, true);
// only try to activate if already enabled. Otherwise the activation
// will be handled by the session's keep alive mechanism
if (Subscription.Enabled) {
await Subscription.ActivateAsync(null);
}

if (_keyframeTimer != null) {
ct.Register(() => _keyframeTimer.Stop());
Expand All @@ -175,8 +179,7 @@ public async Task DeactivateAsync() {
return;
}

await Subscription.ApplyAsync(_subscriptionInfo.MonitoredItems,
_subscriptionInfo.Configuration, false);
await Subscription.CloseAsync();

if (_keyframeTimer != null) {
_keyframeTimer.Stop();
Expand All @@ -192,7 +195,6 @@ await Subscription.ApplyAsync(_subscriptionInfo.MonitoredItems,
public void Dispose() {
if (Subscription != null) {
Subscription.OnSubscriptionChange -= OnSubscriptionChangedAsync;
Subscription.ApplyAsync(null,_subscriptionInfo.Configuration, false);
Subscription.Dispose();
}
_keyframeTimer?.Dispose();
Expand Down Expand Up @@ -311,6 +313,6 @@ private void CallMessageReceiverDelegates(object sender, uint sequenceNumber,
private readonly List<DataSetWriterSubscription> _subscriptions;
private readonly WriterGroupModel _writerGroup;
private readonly ISubscriptionManager _subscriptionManager;
private const long kNumberOfInvokedMessagesResetThreshold = long.MaxValue - 10000;
private const int kNumberOfInvokedMessagesResetThreshold = int.MaxValue - 10000;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ public interface IMessageTrigger : IDisposable {
/// <summary>
/// Number of retries
/// </summary>
long NumberOfConnectionRetries { get; }
int NumberOfConnectionRetries { get; }

/// <summary>
/// The number of all monitored items value changes
/// that have been invoked by this message source.
/// </summary>
long ValueChangesCount { get; }
int ValueChangesCount { get; }

/// <summary>
/// The number of all dataChange Notifications
/// that have been invoked by this message source.
/// </summary>
long DataChangesCount { get; }
int DataChangesCount { get; }

/// <summary>
/// Writer events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
namespace Microsoft.Azure.IIoT.OpcUa.Edge.Publisher.Models {
using Microsoft.Azure.IIoT.OpcUa.Publisher.Models;
using Microsoft.Azure.IIoT.OpcUa.Publisher;
using Microsoft.Azure.IIoT.OpcUa.Protocol.Models;
using Microsoft.Azure.IIoT.OpcUa.Core.Models;
using Microsoft.Azure.IIoT.Crypto;
using Microsoft.Azure.IIoT.Serializers;
Expand Down Expand Up @@ -135,7 +136,7 @@ private IEnumerable<WriterGroupJobModel> ToWriterGroupJobs(
WriterGroup = new WriterGroupModel {
MessageType = legacyCliModel.MessageEncoding,
WriterGroupId = $"{dataSetSourceBatches.First().Connection.Endpoint.Url}_" +
$"{dataSetSourceBatches.First().GetHashSafe()}",
$"{new ConnectionIdentifier(dataSetSourceBatches.First().Connection)}",
DataSetWriters = dataSetSourceBatches.Select(dataSetSource => new DataSetWriterModel {
DataSetWriterId = $"{dataSetSource.Connection.Endpoint.Url}_" +
$"{dataSetSource.GetHashSafe()}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ namespace Microsoft.Azure.IIoT.OpcUa.Protocol {
/// </summary>
public interface IClientHost {

/// <summary>
/// initializes the client configuration
/// </summary>
/// <returns></returns>
Task InitializeAzync();

/// <summary>
/// Add certificate to trust list
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

namespace Microsoft.Azure.IIoT.OpcUa.Protocol {
using Microsoft.Azure.IIoT.OpcUa.Core.Models;
using Microsoft.Azure.IIoT.OpcUa.Protocol.Models;
using System.Threading.Tasks;
using Opc.Ua;
using Opc.Ua.Client;

/// <summary>
Expand All @@ -19,23 +19,41 @@ public interface ISessionManager {
/// </summary>
int SessionCount { get; }

/// <summary>
/// gets the number of retiries for a speciffic session
/// </summary>
/// <param name="connection"></param>
/// <returns></returns>
int GetNumberOfConnectionRetries(ConnectionModel connection);

/// <summary>
/// Get or create session for subscription
/// </summary>
/// <param name="connection"></param>
/// <param name="createIfNotExists"></param>
/// <param name="statusCode"></param>
/// <returns></returns>
Task<Session> GetOrCreateSessionAsync(ConnectionModel connection,
bool createIfNotExists, uint statusCode = StatusCodes.Good);
Session GetOrCreateSession(ConnectionModel connection, bool createIfNotExists);

/// <summary>
/// Remove session if empty
/// </summary>
/// <param name="connection"></param>
/// <param name="onlyIfEmpty"></param>
/// <returns></returns>
Task RemoveSessionAsync(ConnectionModel connection,
bool onlyIfEmpty = true);
Task RemoveSessionAsync(ConnectionModel connection, bool onlyIfEmpty = true);

/// <summary>
/// Get or create a subscription
/// </summary>
/// <param name="subscription"></param>
/// <returns></returns>
void RegisterSubscription(ISubscription subscription);

/// <summary>
/// Removes a subscription
/// </summary>
/// <param name="subscription"></param>
/// <returns></returns>
void UnregisterSubscription(ISubscription subscription);
}
}
Loading

0 comments on commit 9f57f9e

Please sign in to comment.