Skip to content

Commit

Permalink
Edge Agent: Support feature flag ModuleUpdateMode (Azure#6508)
Browse files Browse the repository at this point in the history
  • Loading branch information
and-rewsmith authored Jul 21, 2022
1 parent bf1e589 commit 303b3fd
Show file tree
Hide file tree
Showing 33 changed files with 763 additions and 237 deletions.
3 changes: 2 additions & 1 deletion doc/BuiltInMetrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ instance_number | A Guid representing the current runtime. On restart, all metri
| `edgeAgent_total_time_expected_running_seconds` | `module_name` | The amount of time the module was specified in the deployment | Gauge |
| `edgeAgent_module_start_total` | `module_name`, `module_version` | Number of times edgeAgent asked docker to start the module. | Counter |
| `edgeAgent_module_stop_total` | `module_name`, `module_version` | Number of times edgeAgent asked docker to stop the module. | Counter |
| `edgeAgent_command_latency_seconds` | `command` | How long it took docker to execute the given command. Possible commands are: create, update, remove, start, stop, restart | Gauge |
| `edgeAgent_module_prepare_update_total` | `module_name`, `module_version` | Number of times edgeAgent asked docker to pull an image for the module. | Counter |
| `edgeAgent_command_latency_seconds` | `command` | How long it took docker to execute the given command. Possible commands are: create, prepareUpdate, update, remove, start, stop, restart | Gauge |
| `edgeAgent_iothub_syncs_total` | | The amount of times edgeAgent attempted to sync its twin with iotHub, both successful and unsuccessful. This includes both agent requesting a twin and hub notifying of a twin update | Counter |
| `edgeAgent_unsuccessful_iothub_syncs_total` | | The amount of times edgeAgent failed to sync its twin with iotHub. | Counter |
| `edgeAgent_deployment_time_seconds` | | The amount of time it took to complete a new deployment after receiving a change. | Counter |
Expand Down
71 changes: 36 additions & 35 deletions doc/EnvironmentVariables.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ public static class Constants

public const string EdgeModuleHubServerCertificateFileKey = "EdgeModuleHubServerCertificateFile";

public const string CheckImagePullBeforeModuleCreate = "CheckImagePullBeforeModuleCreate";

public const string Unknown = "Unknown";

public const string UpstreamProtocolKey = "UpstreamProtocol";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public interface ICommandFactory

Task<ICommand> CreateAsync(IModuleWithIdentity module, IRuntimeInfo runtimeInfo);

/// <summary>
/// Builds the command that prepares <paramref name="module"/> for a create or update.
/// NOTE(review): presumably this is the image pull step (the planner collects these
/// commands as "upfront pull" commands) - confirm against the concrete factories.
/// </summary>
/// <param name="module">The module to prepare.</param>
/// <param name="runtimeInfo">Runtime information passed through to the factory.</param>
/// <returns>A task that yields the prepared command.</returns>
Task<ICommand> PrepareUpdateAsync(IModule module, IRuntimeInfo runtimeInfo);

Task<ICommand> UpdateAsync(IModule current, IModuleWithIdentity next, IRuntimeInfo runtimeInfo);

Task<ICommand> RemoveAsync(IModule module);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public LoggingCommandFactory(ICommandFactory underlying, ILoggerFactory loggerFa

public async Task<ICommand> CreateAsync(IModuleWithIdentity module, IRuntimeInfo runtimeInfo) => new LoggingCommand(await this.underlying.CreateAsync(module, runtimeInfo), "create", this.logger);

/// <summary>
/// Asks the underlying factory for the prepare-update command and wraps it in a
/// <see cref="LoggingCommand"/> labelled "prepareUpdate".
/// </summary>
public async Task<ICommand> PrepareUpdateAsync(IModule module, IRuntimeInfo runtimeInfo)
{
    ICommand innerCommand = await this.underlying.PrepareUpdateAsync(module, runtimeInfo);
    return new LoggingCommand(innerCommand, "prepareUpdate", this.logger);
}

public async Task<ICommand> UpdateAsync(IModule current, IModuleWithIdentity next, IRuntimeInfo runtimeInfo) => new LoggingCommand(await this.underlying.UpdateAsync(current, next, runtimeInfo), "update", this.logger);

public async Task<ICommand> UpdateEdgeAgentAsync(IModuleWithIdentity module, IRuntimeInfo runtimeInfo) => new LoggingCommand(await this.underlying.UpdateEdgeAgentAsync(module, runtimeInfo), "update Edge Agent", this.logger);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core
{
/// <summary>
/// Values of the ModuleUpdateMode feature flag. According to the planner's comments,
/// the flag decides whether image pulls are separated out as upfront commands or
/// grouped together with the create/update commands.
/// </summary>
public enum ModuleUpdateMode
{
/// <summary>
/// NOTE(review): presumably pulls stay grouped with each module's create/update
/// command, so one module's pull does not hold up the others - confirm against the
/// factory that reads this flag.
/// </summary>
NonBlocking,
/// <summary>
/// NOTE(review): presumably every image pull must complete before any create/update
/// command runs - confirm against the factory that reads this flag.
/// </summary>
WaitForAllPulls
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public class NullCommandFactory : ICommandFactory

public Task<ICommand> CreateAsync(IModuleWithIdentity module, IRuntimeInfo runtimeInfo) => Task.FromResult<ICommand>(NullCommand.Instance);

/// <summary>
/// Returns the shared <see cref="NullCommand"/> instance; no preparation work is produced.
/// </summary>
public Task<ICommand> PrepareUpdateAsync(IModule module, IRuntimeInfo runtimeInfo)
{
    ICommand noOpCommand = NullCommand.Instance;
    return Task.FromResult(noOpCommand);
}

public Task<ICommand> UpdateAsync(IModule current, IModuleWithIdentity next, IRuntimeInfo runtimeInfo) => Task.FromResult<ICommand>(NullCommand.Instance);

public Task<ICommand> UpdateEdgeAgentAsync(IModuleWithIdentity module, IRuntimeInfo runtimeInfo) => Task.FromResult<ICommand>(NullCommand.Instance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ public async Task<ICommand> CreateAsync(IModuleWithIdentity module, IRuntimeInfo
}
}

/// <summary>
/// Counts a PrepareUpdate command for <paramref name="module"/>, then obtains the command
/// from the underlying factory while timing that call under the "prepareUpdate" label.
/// </summary>
public async Task<ICommand> PrepareUpdateAsync(IModule module, IRuntimeInfo runtimeInfo)
{
    // Count the request before delegating, so the counter is bumped even if the factory throws.
    this.factoryMetrics.AddMessage(module, FactoryMetrics.ModuleCommandMetric.PrepareUpdate);

    using (this.factoryMetrics.MeasureTime("prepareUpdate"))
    {
        ICommand prepareUpdateCommand = await this.underlying.PrepareUpdateAsync(module, runtimeInfo);
        return prepareUpdateCommand;
    }
}

public async Task<ICommand> UpdateAsync(IModule current, IModuleWithIdentity next, IRuntimeInfo runtimeInfo)
{
this.factoryMetrics.AddMessage(current, FactoryMetrics.ModuleCommandMetric.Start);
Expand Down Expand Up @@ -105,7 +114,8 @@ public class FactoryMetrics
public enum ModuleCommandMetric
{
Start,
Stop
Stop,
PrepareUpdate
}

readonly Dictionary<ModuleCommandMetric, IMetricsCounter> commandCounters;
Expand All @@ -116,6 +126,11 @@ public FactoryMetrics(IMetricsProvider metricsProvider)
this.commandCounters = Enum.GetValues(typeof(ModuleCommandMetric)).Cast<ModuleCommandMetric>().ToDictionary(c => c, command =>
{
string commandName = Enum.GetName(typeof(ModuleCommandMetric), command).ToLower();
if (commandName == ModuleCommandMetric.PrepareUpdate.ToString().ToLower())
{
commandName = "prepare_update";
}

return metricsProvider.CreateCounter(
$"module_{commandName}",
"Command sent to module",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Planners
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Planner
{
using System;
using System.Collections.Generic;
Expand Down Expand Up @@ -62,7 +62,7 @@ public async Task<Plan> CreateShutdownPlanAsync(ModuleSet current)
ICommand[] stopCommands = await Task.WhenAll(stopTasks);
ICommand parallelCommand = new ParallelGroupCommand(stopCommands);
Events.ShutdownPlanCreated(stopCommands);
return new Plan(new[] { parallelCommand });
return new Plan(ImmutableList.Create(parallelCommand));
}

public async Task<Plan> PlanAsync(
Expand All @@ -74,7 +74,8 @@ public async Task<Plan> PlanAsync(
Events.LogDesired(desired);
Events.LogCurrent(current);

List<ICommand> commands = new List<ICommand>();
List<ICommand> plan = new List<ICommand>();
List<ICommand> upfrontImagePullPlan = new List<ICommand>();

// Create a grouping of desired and current modules based on their priority.
// We want to process all the modules in the deployment (desired modules) and also include the modules
Expand Down Expand Up @@ -103,14 +104,20 @@ public async Task<Plan> PlanAsync(
.ToArray());
processedDesiredMatchingCurrentModules.UnionWith(desiredMatchingCurrentModules.Select(x => x.Key));

commands.AddRange(await this.ProcessDesiredAndCurrentSets(priorityBasedDesiredSet, priorityBasedCurrentSet, runtimeInfo, moduleIdentities));
(IEnumerable<ICommand> upfrontImagePullCommands, IEnumerable<ICommand> commands) = await this.ProcessDesiredAndCurrentSets(priorityBasedDesiredSet, priorityBasedCurrentSet, runtimeInfo, moduleIdentities);
upfrontImagePullPlan.AddRange(upfrontImagePullCommands);
plan.AddRange(commands);
}

Events.PlanCreated(commands);
return new Plan(commands);
// If we need to preempt the constructed plan with image pull
// commands, do so here.
plan = upfrontImagePullPlan.Concat(plan).ToList();

Events.PlanCreated(plan);
return new Plan(plan.ToImmutableList());
}

async Task<IEnumerable<ICommand>> ProcessDesiredAndCurrentSets(
async Task<(IEnumerable<ICommand> upfrontImagePullCommands, IEnumerable<ICommand> commands)> ProcessDesiredAndCurrentSets(
ModuleSet desired,
ModuleSet current,
IRuntimeInfo runtimeInfo,
Expand All @@ -119,7 +126,7 @@ async Task<IEnumerable<ICommand>> ProcessDesiredAndCurrentSets(
// extract list of modules that need attention
(IList<IModule> added, IList<IModule> updateDeployed, IList<IModule> desiredStatusChanged, IList<IRuntimeModule> updateStateChanged, IList<IRuntimeModule> removed, IList<IRuntimeModule> deadModules, IList<IRuntimeModule> runningGreat) = this.ProcessDiff(desired, current);

List<ICommand> updateRuntimeCommands = await this.GetUpdateRuntimeCommands(updateDeployed, moduleIdentities, runtimeInfo);
IEnumerable<ICommand> updateRuntimeCommands = await this.GetUpdateRuntimeCommands(updateDeployed, moduleIdentities, runtimeInfo);

// create "stop" commands for modules that have been removed
IEnumerable<Task<ICommand>> stopTasks = removed
Expand All @@ -135,21 +142,20 @@ async Task<IEnumerable<ICommand>> ProcessDesiredAndCurrentSets(
IEnumerable<ICommand> dead = await Task.WhenAll(deadTasks);

// create pull, create, update and start commands for added/updated modules
IEnumerable<ICommand> addedCommands = await this.ProcessAddedUpdatedModules(
(IEnumerable<ICommand> upfrontPullCommandsForAdded, IEnumerable<ICommand> addedCommands) = await this.ProcessAddedUpdatedModules(
added,
moduleIdentities,
runtimeInfo,
m => this.commandFactory.CreateAsync(m, runtimeInfo));

IEnumerable<ICommand> updatedCommands = await this.ProcessAddedUpdatedModules(
(IEnumerable<ICommand> upfrontPullCommandsForUpdated, IEnumerable<ICommand> updatedCommands) = await this.ProcessAddedUpdatedModules(
updateDeployed,
moduleIdentities,
runtimeInfo,
m =>
{
current.TryGetModule(m.Module.Name, out IModule currentModule);
return this.commandFactory.UpdateAsync(
currentModule,
m,
runtimeInfo);
return this.commandFactory.UpdateAsync(currentModule, m, runtimeInfo);
});

// Get commands to start / stop modules whose desired status has changed.
Expand All @@ -172,7 +178,8 @@ async Task<IEnumerable<ICommand>> ProcessDesiredAndCurrentSets(
// for more than "IntensiveCareTime" & still have an entry for them in the store
IEnumerable<ICommand> resetHealthStatus = await this.ResetStatsForHealthyModulesAsync(runningGreat);

return updateRuntimeCommands
IEnumerable<ICommand> upfrontPullCommands = upfrontPullCommandsForAdded.Concat(upfrontPullCommandsForUpdated);
updateRuntimeCommands = updateRuntimeCommands
.Concat(stop)
.Concat(remove)
.Concat(dead)
Expand All @@ -182,6 +189,8 @@ async Task<IEnumerable<ICommand>> ProcessDesiredAndCurrentSets(
.Concat(stateChangedCommands)
.Concat(desiredStatedChangedCommands.Select(d => d.command))
.Concat(resetHealthStatus);

return (upfrontPullCommands, updateRuntimeCommands);
}

async Task<IList<(ICommand command, string module)>> ProcessDesiredStatusChangedModules(IList<IModule> desiredStatusChanged, ModuleSet current)
Expand Down Expand Up @@ -265,40 +274,54 @@ await this.commandFactory.StartAsync(module),
return restart;
}

async Task<IEnumerable<ICommand>> ProcessAddedUpdatedModules(
async Task<(IEnumerable<ICommand> upfrontPullCommands, IEnumerable<ICommand> otherCommands)> ProcessAddedUpdatedModules(
IList<IModule> modules,
IImmutableDictionary<string, IModuleIdentity> moduleIdentities,
IRuntimeInfo runtimeInfo,
Func<IModuleWithIdentity, Task<ICommand>> createUpdateCommandMaker)
{
// new modules become a command group containing:
// create followed by a start command if the desired
// status is "running"
var addedTasks = new List<Task<ICommand[]>>();
var upfrontPullTasks = new List<Task<ICommand>>();
var nonPullGroupTasks = new List<Task<ICommand[]>>();
foreach (IModule module in modules)
{
if (moduleIdentities.TryGetValue(module.Name, out IModuleIdentity moduleIdentity))
{
var tasks = new List<Task<ICommand>>();
var nonPullTasks = new List<Task<ICommand>>();
var moduleWithIdentity = new ModuleWithIdentity(module, moduleIdentity);
tasks.Add(createUpdateCommandMaker(moduleWithIdentity));

Task<ICommand> prepareForUpdateCommand = this.commandFactory.PrepareUpdateAsync(module, runtimeInfo);
Task<ICommand> createOrUpdateCommand = createUpdateCommandMaker(moduleWithIdentity);

upfrontPullTasks.Add(prepareForUpdateCommand);
nonPullTasks.Add(createOrUpdateCommand);

if (module.DesiredStatus == ModuleStatus.Running)
{
tasks.Add(this.commandFactory.StartAsync(module));
nonPullTasks.Add(this.commandFactory.StartAsync(module));
}

addedTasks.Add(Task.WhenAll(tasks));
nonPullGroupTasks.Add(Task.WhenAll(nonPullTasks));
}
else
{
Events.UnableToProcessModule(module);
}
}

// Filter out any commands that won't take some action. Necessary
// because depending on the `ModuleUpdateMode` feature flag, these
// pulls may not be separated out and will instead be grouped with
// create/update commands. In that case they will be `NullCommand`
// instances here.
IEnumerable<ICommand> upfrontPullCommands = (await Task.WhenAll(upfrontPullTasks)).Where(t =>
{
return !(t is NullCommand);
});

// build GroupCommands from each command set
IEnumerable<Task<ICommand>> commands = (await Task.WhenAll(addedTasks))
IEnumerable<Task<ICommand>> otherCommands = (await Task.WhenAll(nonPullGroupTasks))
.Select(cmds => this.commandFactory.WrapAsync(new GroupCommand(cmds)));

return await Task.WhenAll(commands);
return (upfrontPullCommands, await Task.WhenAll(otherCommands));
}

async Task<IEnumerable<ICommand>> ResetStatsForHealthyModulesAsync(IEnumerable<IRuntimeModule> modules)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.PlanRunner
{
using System;

/// <summary>
/// Exception type that marks a command failure as a failed execution prerequisite.
/// The plan-runner code in this namespace checks for this type: when a command fails
/// with it, the runner logs the failure and stops processing the remaining commands
/// of the plan.
/// </summary>
public class ExcecutionPrerequisiteException : Exception
{
/// <summary>
/// Creates the exception with no message.
/// </summary>
public ExcecutionPrerequisiteException()
{
}

/// <summary>
/// Creates the exception with the given message.
/// </summary>
/// <param name="message">Description of the failed prerequisite.</param>
public ExcecutionPrerequisiteException(string message)
: base(message)
{
}

/// <summary>
/// Creates the exception with the given message and the exception that caused it.
/// </summary>
/// <param name="message">Description of the failed prerequisite.</param>
/// <param name="inner">The underlying exception.</param>
public ExcecutionPrerequisiteException(string message, Exception inner)
: base(message, inner)
{
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.PlanRunners
namespace Microsoft.Azure.Devices.Edge.Agent.Core.PlanRunner
{
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Util;
Expand Down Expand Up @@ -70,11 +71,18 @@ public async Task<bool> ExecuteAsync(long deploymentId, Plan plan, CancellationT
foreach (ICommand command in plan.Commands)
{
(bool shouldRun, int runCount, TimeSpan coolOffPeriod, TimeSpan elapsedTime) = this.ShouldRunCommand(command);
if (this.ShouldSkipRemaining(shouldRun, command))
{
Events.SkipRemainingCommands(deploymentId, command);
break;
}

try
{
if (token.IsCancellationRequested)
{
Events.PlanExecCancelled(deploymentId);
skippedModules = true;
break;
}

Expand Down Expand Up @@ -108,6 +116,12 @@ public async Task<bool> ExecuteAsync(long deploymentId, Plan plan, CancellationT
// since this command failed, record its status
int newRunCount = this.commandRunStatus.ContainsKey(command.Id) ? this.commandRunStatus[command.Id].RunCount : 0;
this.commandRunStatus[command.Id] = new CommandRunStats(newRunCount + 1, this.systemTime.UtcNow, ex);

if (ex is ExcecutionPrerequisiteException)
{
Events.StopProcessingCommands(deploymentId, command);
break;
}
}
}

Expand All @@ -117,6 +131,21 @@ public async Task<bool> ExecuteAsync(long deploymentId, Plan plan, CancellationT
}
}

/// <summary>
/// Tells the caller whether the rest of the plan should be skipped: true only when
/// <paramref name="command"/> is not due to run and its recorded failure was an
/// <see cref="ExcecutionPrerequisiteException"/>.
/// </summary>
/// <param name="shouldRun">Whether the command is due to run on this pass.</param>
/// <param name="command">The command about to be considered.</param>
/// <returns>True when the remaining commands should be skipped.</returns>
bool ShouldSkipRemaining(bool shouldRun, ICommand command)
{
    // The run-status lookup is done first, regardless of shouldRun.
    bool failedOnPrerequisite = false;
    if (this.commandRunStatus.ContainsKey(command.Id))
    {
        failedOnPrerequisite = this.commandRunStatus[command.Id].Exception.Match(
            e => e is ExcecutionPrerequisiteException,
            () => false);
    }

    return failedOnPrerequisite && !shouldRun;
}

(bool shouldRun, int runCount, TimeSpan coolOffPeriod, TimeSpan elapsedTime) ShouldRunCommand(ICommand command)
{
// the command should be run if there's no entry for it in our status dictionary
Expand Down Expand Up @@ -220,6 +249,20 @@ public static void PlanExecEnded(long deploymentId)
{
Log.LogInformation((int)EventIds.PlanExecEnded, $"Plan execution ended for deployment {deploymentId}");
}

/// <summary>
/// Logs, at error level under the PlanExecStepFailed event id, that a command failed
/// and that the remaining commands of the deployment are being skipped.
/// </summary>
public static void StopProcessingCommands(long deploymentId, ICommand command)
{
    string message = $"Step failed in deployment {deploymentId}. Failure when running command {command.Show()}. Skipping remaining commands in deployment.";
    Log.LogError((int)EventIds.PlanExecStepFailed, message);
}

/// <summary>
/// Logs, at error level under the PlanExecStepFailed event id, that a command is not
/// being run because it failed on a prior attempt, and that the remaining commands of
/// the deployment are being skipped.
/// </summary>
public static void SkipRemainingCommands(long deploymentId, ICommand command)
{
    string message = $"Step previously failed in deployment {deploymentId} on prior attempt. Not running command {command.Show()}. Skipping remaining commands in deployment.";
    Log.LogError((int)EventIds.PlanExecStepFailed, message);
}
}
}
}
Loading

0 comments on commit 303b3fd

Please sign in to comment.