Skip to content

Commit

Permalink
DurableTask.AzureStorage updates for v1.4.1
Browse files Browse the repository at this point in the history
  • Loading branch information
cgillum authored Jan 11, 2019
2 parents e5a713d + 62eec8e commit 8414f6a
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 57 deletions.
49 changes: 49 additions & 0 deletions Test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1546,6 +1546,27 @@ public async Task ExtendedSessions_SessionTimeout()
}
}

/// <summary>
/// Tests an orchestration that does two consecutive fan-out, fan-ins.
/// This is a regression test for https://github.com/Azure/durabletask/issues/241.
/// </summary>
[DataTestMethod]
[DataRow(true)]
[DataRow(false)]
public async Task DoubleFanOut(bool enableExtendedSessions)
{
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions))
{
await host.StartAsync();

var client = await host.StartOrchestrationAsync(typeof(Orchestrations.DoubleFanOut), null);
var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(30));

Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);
await host.StopAsync();
}
}

private static async Task ValidateBlobUrlAsync(string taskHubName, string instanceId, string value, int originalPayloadSize = 0)
{
CloudStorageAccount account = CloudStorageAccount.Parse(TestHelpers.GetTestStorageAccountConnectionString());
Expand Down Expand Up @@ -2136,6 +2157,34 @@ public override Task<int> RunTask(OrchestrationContext context, int input)
return context.CreateSubOrchestrationInstance<int>(typeof(Factorial), input);
}
}

[KnownType(typeof(Activities.Hello))]
internal class DoubleFanOut : TaskOrchestration<string, string>
{
public async override Task<string> RunTask(OrchestrationContext context, string input)
{
Random r = new Random();
var tasks = new Task<string>[5];
for (int i = 0; i < 5; i++)
{
int x = r.Next(10000);
tasks[i] = context.ScheduleTask<string>(typeof(Activities.Hello), i.ToString());
}

await Task.WhenAll(tasks);

var tasks2 = new Task<string>[5];
for (int i = 0; i < 5; i++)
{
int x = r.Next(10000);
tasks2[i] = context.ScheduleTask<string>(typeof(Activities.Hello), (i + 10).ToString());
}

await Task.WhenAll(tasks2);

return "OK";
}
}
}

static class Activities
Expand Down
51 changes: 41 additions & 10 deletions src/DurableTask.AzureStorage/AnalyticsEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ private static void EnsureLogicalTraceActivityId()
#endif
}

[Event(101, Level = EventLevel.Informational, Opcode = EventOpcode.Send, Task = Tasks.Enqueue, Version = 4)]
[Event(101, Level = EventLevel.Informational, Opcode = EventOpcode.Send, Task = Tasks.Enqueue, Version = 5)]
public void SendingMessage(
Guid relatedActivityId,
string Account,
string TaskHub,
string EventType,
int TaskEventId,
string InstanceId,
string ExecutionId,
long SizeInBytes,
Expand All @@ -98,6 +99,7 @@ public void SendingMessage(
Account,
TaskHub,
EventType,
TaskEventId,
InstanceId ?? string.Empty,
ExecutionId ?? string.Empty,
SizeInBytes,
Expand All @@ -109,12 +111,13 @@ public void SendingMessage(
ExtensionVersion);
}

[Event(102, Level = EventLevel.Informational, Opcode = EventOpcode.Receive, Task = Tasks.Dequeue, Version = 4)]
[Event(102, Level = EventLevel.Informational, Opcode = EventOpcode.Receive, Task = Tasks.Dequeue, Version = 5)]
public void ReceivedMessage(
Guid relatedActivityId,
string Account,
string TaskHub,
string EventType,
int TaskEventId,
string InstanceId,
string ExecutionId,
string MessageId,
Expand All @@ -134,6 +137,7 @@ public void ReceivedMessage(
Account,
TaskHub,
EventType,
TaskEventId,
InstanceId,
ExecutionId ?? string.Empty,
MessageId,
Expand All @@ -147,11 +151,12 @@ public void ReceivedMessage(
ExtensionVersion);
}

[Event(103, Level = EventLevel.Informational, Version = 3)]
[Event(103, Level = EventLevel.Informational, Version = 4)]
public void DeletingMessage(
string Account,
string TaskHub,
string EventType,
int TaskEventId,
string MessageId,
string InstanceId,
string ExecutionId,
Expand All @@ -165,6 +170,7 @@ public void DeletingMessage(
Account,
TaskHub,
EventType,
TaskEventId,
MessageId,
InstanceId,
ExecutionId ?? string.Empty,
Expand All @@ -173,11 +179,12 @@ public void DeletingMessage(
ExtensionVersion);
}

[Event(104, Level = EventLevel.Warning, Version = 4)]
[Event(104, Level = EventLevel.Warning, Version = 5)]
public void AbandoningMessage(
string Account,
string TaskHub,
string EventType,
int TaskEventId,
string MessageId,
string InstanceId,
string ExecutionId,
Expand All @@ -192,6 +199,7 @@ public void AbandoningMessage(
Account,
TaskHub,
EventType,
TaskEventId,
MessageId,
InstanceId,
ExecutionId ?? string.Empty,
Expand All @@ -212,14 +220,16 @@ public void AssertFailure(
this.WriteEvent(105, Account, TaskHub, Details, ExtensionVersion);
}

[Event(106, Level = EventLevel.Warning, Version = 2)]
[Event(106, Level = EventLevel.Warning, Version = 3)]
public void MessageGone(
string Account,
string TaskHub,
string MessageId,
string InstanceId,
string ExecutionId,
string PartitionId,
string EventType,
int TaskEventId,
string Details,
string ExtensionVersion)
{
Expand All @@ -232,6 +242,8 @@ public void MessageGone(
InstanceId,
ExecutionId ?? string.Empty,
PartitionId,
EventType,
TaskEventId,
Details,
ExtensionVersion);
}
Expand Down Expand Up @@ -384,14 +396,15 @@ public void OrchestrationServiceStats(
ExtensionVersion);
}

[Event(113, Level = EventLevel.Informational)]
[Event(113, Level = EventLevel.Informational, Version = 2)]
public void RenewingMessage(
string Account,
string TaskHub,
string InstanceId,
string ExecutionId,
string PartitionId,
string EventType,
int TaskEventId,
string MessageId,
int VisibilityTimeoutSeconds,
string ExtensionVersion)
Expand All @@ -405,19 +418,22 @@ public void RenewingMessage(
ExecutionId ?? string.Empty,
PartitionId,
EventType,
TaskEventId,
MessageId,
VisibilityTimeoutSeconds,
ExtensionVersion);
}

[Event(114, Level = EventLevel.Error)]
[Event(114, Level = EventLevel.Error, Version = 2)]
public void MessageFailure(
string Account,
string TaskHub,
string MessageId,
string InstanceId,
string ExecutionId,
string PartitionId,
string EventType,
int TaskEventId,
string Details,
string ExtensionVersion)
{
Expand All @@ -426,10 +442,12 @@ public void MessageFailure(
114,
Account,
TaskHub,
MessageId,
InstanceId,
ExecutionId ?? string.Empty,
PartitionId,
EventType,
TaskEventId,
Details,
ExtensionVersion);
}
Expand Down Expand Up @@ -486,14 +504,15 @@ public void WaitingForMoreMessages(
ExtensionVersion);
}

[Event(118, Level = EventLevel.Warning)]
[Event(118, Level = EventLevel.Warning, Version = 2)]
public void ReceivedOutOfOrderMessage(
string Account,
string TaskHub,
string InstanceId,
string ExecutionId,
string PartitionId,
string EventType,
int TaskEventId,
string MessageId,
int Episode,
string ExtensionVersion)
Expand All @@ -507,6 +526,7 @@ public void ReceivedOutOfOrderMessage(
ExecutionId ?? string.Empty,
PartitionId,
EventType,
TaskEventId,
MessageId,
Episode,
ExtensionVersion);
Expand Down Expand Up @@ -789,7 +809,16 @@ public void InstanceStatusUpdate(
string ExtensionVersion)
{
EnsureLogicalTraceActivityId();
this.WriteEvent(135, Account, TaskHub, InstanceId, ExecutionId ?? string.Empty, EventType, Episode, LatencyMs, ExtensionVersion);
this.WriteEvent(
135,
Account,
TaskHub,
InstanceId,
ExecutionId ?? string.Empty,
EventType,
Episode,
LatencyMs,
ExtensionVersion);
}

[Event(136, Level = EventLevel.Informational)]
Expand Down Expand Up @@ -866,12 +895,13 @@ public void DiscardingWorkItem(
ExtensionVersion);
}

[Event(140, Level = EventLevel.Informational, Task = Tasks.Processing, Opcode = EventOpcode.Receive, Version = 3)]
[Event(140, Level = EventLevel.Informational, Task = Tasks.Processing, Opcode = EventOpcode.Receive, Version = 4)]
public void ProcessingMessage(
Guid relatedActivityId,
string Account,
string TaskHub,
string EventType,
int TaskEventId,
string InstanceId,
string ExecutionId,
string MessageId,
Expand All @@ -888,6 +918,7 @@ public void ProcessingMessage(
Account,
TaskHub,
EventType,
TaskEventId,
InstanceId,
ExecutionId ?? string.Empty,
MessageId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAs
session.Instance.ExecutionId,
session.ControlQueue.Name,
message.TaskMessage.Event.EventType.ToString(),
Utils.GetTaskEventId(message.TaskMessage.Event),
message.OriginalQueueMessage.Id,
message.Episode,
Utils.ExtensionVersion);
Expand Down Expand Up @@ -731,6 +732,7 @@ internal static void TraceMessageReceived(MessageData data, string storageAccoun
storageAccountName,
taskHubName,
taskMessage.Event.EventType.ToString(),
Utils.GetTaskEventId(taskMessage.Event),
taskMessage.OrchestrationInstance.InstanceId,
taskMessage.OrchestrationInstance.ExecutionId,
queueMessage.Id,
Expand Down
11 changes: 10 additions & 1 deletion src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,23 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net451</TargetFrameworks>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
<FileVersion>1.4.0</FileVersion>
<FileVersion>1.4.1</FileVersion>
<AssemblyVersion>$(FileVersion)</AssemblyVersion>
<Version>$(FileVersion)</Version>
<IncludeSymbols>true</IncludeSymbols>
<Description>Azure Storage provider extension for the Durable Task Framework.</Description>
<PackageTags>Azure Task Durable Orchestration Workflow Activity Reliable AzureStorage</PackageTags>
<PackageId>Microsoft.Azure.DurableTask.AzureStorage</PackageId>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<DebugSymbols>true</DebugSymbols>
<DebugType>embedded</DebugType>
<IncludeSymbols>false</IncludeSymbols>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-*" PrivateAssets="All" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net451'">
<PackageReference Include="Newtonsoft.Json" Version="7.0.1" />
Expand Down
8 changes: 5 additions & 3 deletions src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,12 @@ await batch.ParallelForEachAsync(async delegate (CloudQueueMessage queueMessage)
AnalyticsEventSource.Log.MessageFailure(
this.storageAccountName,
this.settings.TaskHubName,
string.Empty,
string.Empty,
string.Empty /* MessageId */,
string.Empty /* InstanceId */,
string.Empty /* ExecutionId */,
this.storageQueue.Name,
string.Empty,
string.Empty /* EventType */,
0 /* TaskEventId */,
e.ToString(),
Utils.ExtensionVersion);

Expand Down
Loading

0 comments on commit 8414f6a

Please sign in to comment.