forked from Azure/durabletask
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Dispatcher middleware for task orchestrations and task activities (Az…
- Loading branch information
1 parent
6391a67
commit fffab58
Showing
8 changed files
with
402 additions
and
52 deletions.
There are no files selected for viewing
165 changes: 165 additions & 0 deletions
165
Test/DurableTask.Core.Tests/DispatcherMiddlewareTests.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
// ---------------------------------------------------------------------------------- | ||
// Copyright Microsoft Corporation | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// ---------------------------------------------------------------------------------- | ||
|
||
namespace DurableTask.Core.Tests | ||
{ | ||
using System; | ||
using System.Diagnostics; | ||
using System.Text; | ||
using System.Threading.Tasks; | ||
using DurableTask.Core.History; | ||
using DurableTask.Emulator; | ||
using DurableTask.Test.Orchestrations; | ||
using Microsoft.VisualStudio.TestTools.UnitTesting; | ||
|
||
[TestClass] | ||
public class DispatcherMiddlewareTests | ||
{ | ||
TaskHubWorker worker; | ||
TaskHubClient client; | ||
|
||
[TestInitialize] | ||
public async Task Initialize() | ||
{ | ||
var service = new LocalOrchestrationService(); | ||
this.worker = new TaskHubWorker(service); | ||
|
||
await this.worker | ||
.AddTaskOrchestrations(typeof(SimplestGreetingsOrchestration)) | ||
.AddTaskActivities(typeof(SimplestGetUserTask), typeof(SimplestSendGreetingTask)) | ||
.StartAsync(); | ||
|
||
this.client = new TaskHubClient(service); | ||
} | ||
|
||
[TestCleanup] | ||
public async Task TestCleanup() | ||
{ | ||
await this.worker.StopAsync(true); | ||
} | ||
|
||
[TestMethod] | ||
public async Task DispatchMiddlewareContextBuiltInProperties() | ||
{ | ||
TaskOrchestration orchestration = null; | ||
OrchestrationRuntimeState state = null; | ||
OrchestrationInstance instance1 = null; | ||
|
||
TaskActivity activity = null; | ||
TaskScheduledEvent taskScheduledEvent = null; | ||
OrchestrationInstance instance2 = null; | ||
|
||
this.worker.AddOrchestrationDispatcherMiddleware((context, next) => | ||
{ | ||
orchestration = context.GetProperty<TaskOrchestration>(); | ||
state = context.GetProperty<OrchestrationRuntimeState>(); | ||
instance1 = context.GetProperty<OrchestrationInstance>(); | ||
|
||
return next(); | ||
}); | ||
|
||
this.worker.AddActivityDispatcherMiddleware((context, next) => | ||
{ | ||
activity = context.GetProperty<TaskActivity>(); | ||
taskScheduledEvent = context.GetProperty<TaskScheduledEvent>(); | ||
instance2 = context.GetProperty<OrchestrationInstance>(); | ||
|
||
return next(); | ||
}); | ||
|
||
var instance = await this.client.CreateOrchestrationInstanceAsync(typeof(SimplestGreetingsOrchestration), null); | ||
|
||
TimeSpan timeout = TimeSpan.FromSeconds(Debugger.IsAttached ? 1000 : 10); | ||
await this.client.WaitForOrchestrationAsync(instance, timeout); | ||
|
||
Assert.IsNotNull(orchestration); | ||
Assert.IsNotNull(state); | ||
Assert.IsNotNull(instance1); | ||
|
||
Assert.IsNotNull(activity); | ||
Assert.IsNotNull(taskScheduledEvent); | ||
Assert.IsNotNull(instance2); | ||
|
||
Assert.AreNotSame(instance1, instance2); | ||
Assert.AreEqual(instance1.InstanceId, instance2.InstanceId); | ||
} | ||
|
||
[TestMethod] | ||
public async Task OrchestrationDispatcherMiddlewareContextFlow() | ||
{ | ||
StringBuilder output = null; | ||
|
||
for (int i = 0; i < 10; i++) | ||
{ | ||
string value = i.ToString(); | ||
this.worker.AddOrchestrationDispatcherMiddleware(async (context, next) => | ||
{ | ||
output = context.GetProperty<StringBuilder>("output"); | ||
if (output == null) | ||
{ | ||
output = new StringBuilder(); | ||
context.SetProperty("output", output); | ||
} | ||
|
||
output.Append(value); | ||
await next(); | ||
output.Append(value); | ||
}); | ||
} | ||
|
||
var instance = await this.client.CreateOrchestrationInstanceAsync(typeof(SimplestGreetingsOrchestration), null); | ||
|
||
TimeSpan timeout = TimeSpan.FromSeconds(Debugger.IsAttached ? 1000 : 10); | ||
await this.client.WaitForOrchestrationAsync(instance, timeout); | ||
|
||
// Each reply gets a new context, so the output should stay the same regardless of how | ||
// many replays an orchestration goes through. | ||
Assert.IsNotNull(output); | ||
Assert.AreEqual("01234567899876543210", output.ToString()); | ||
} | ||
|
||
[TestMethod] | ||
public async Task ActivityDispatcherMiddlewareContextFlow() | ||
{ | ||
StringBuilder output = null; | ||
|
||
for (int i = 0; i < 10; i++) | ||
{ | ||
string value = i.ToString(); | ||
this.worker.AddActivityDispatcherMiddleware(async (context, next) => | ||
{ | ||
output = context.GetProperty<StringBuilder>("output"); | ||
if (output == null) | ||
{ | ||
output = new StringBuilder(); | ||
context.SetProperty("output", output); | ||
} | ||
|
||
output.Append(value); | ||
await next(); | ||
output.Append(value); | ||
}); | ||
} | ||
|
||
var instance = await this.client.CreateOrchestrationInstanceAsync(typeof(SimplestGreetingsOrchestration), null); | ||
|
||
TimeSpan timeout = TimeSpan.FromSeconds(Debugger.IsAttached ? 1000 : 10); | ||
await this.client.WaitForOrchestrationAsync(instance, timeout); | ||
|
||
// Each actiivty gets a new context, so the output should stay the same regardless of how | ||
// many activities an orchestration schedules (as long as there is at least one). | ||
Assert.IsNotNull(output); | ||
Assert.AreEqual("01234567899876543210", output.ToString()); | ||
} | ||
} | ||
} |
75 changes: 75 additions & 0 deletions
75
src/DurableTask.Core/Middleware/DispatchMiddlewareContext.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
// ---------------------------------------------------------------------------------- | ||
// Copyright Microsoft Corporation | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// ---------------------------------------------------------------------------------- | ||
|
||
namespace DurableTask.Core.Middleware | ||
{ | ||
using System; | ||
using System.Collections.Generic; | ||
|
||
/// <summary> | ||
/// Context data that can be used to share data between middleware. | ||
/// </summary> | ||
public class DispatchMiddlewareContext | ||
{ | ||
internal DispatchMiddlewareContext() | ||
{ | ||
} | ||
|
||
/// <summary> | ||
/// Sets a property value to the context using the full name of the type as the key. | ||
/// </summary> | ||
/// <typeparam name="T">The type of the property.</typeparam> | ||
/// <param name="value">The value of the property.</param> | ||
public void SetProperty<T>(T value) | ||
{ | ||
this.SetProperty(typeof(T).FullName, value); | ||
} | ||
|
||
/// <summary> | ||
/// Sets a named property value to the context. | ||
/// </summary> | ||
/// <typeparam name="T">The type of the property.</typeparam> | ||
/// <param name="key">The name of the property.</param> | ||
/// <param name="value">The value of the property.</param> | ||
public void SetProperty<T>(string key, T value) | ||
{ | ||
this.Properties[key] = value; | ||
} | ||
|
||
/// <summary> | ||
/// Gets a property value from the context using the full name of <typeparamref name="T"/>. | ||
/// </summary> | ||
/// <typeparam name="T">The type of the property.</typeparam> | ||
/// <returns>The value of the property or <c>default(T)</c> if the property is not defined.</returns> | ||
public T GetProperty<T>() | ||
{ | ||
return this.GetProperty<T>(typeof(T).FullName); | ||
} | ||
|
||
/// <summary> | ||
/// Gets a named property value from the context. | ||
/// </summary> | ||
/// <typeparam name="T"></typeparam> | ||
/// <param name="key">The name of the property value.</param> | ||
/// <returns>The value of the property or <c>default(T)</c> if the property is not defined.</returns> | ||
public T GetProperty<T>(string key) | ||
{ | ||
return this.Properties.TryGetValue(key, out object value) ? (T)value : default(T); | ||
} | ||
|
||
/// <summary> | ||
/// Gets a key/value collection that can be used to share data between middleware. | ||
/// </summary> | ||
public IDictionary<string, object> Properties { get; } = new Dictionary<string, object>(StringComparer.Ordinal); | ||
} | ||
} |
24 changes: 24 additions & 0 deletions
24
src/DurableTask.Core/Middleware/DispatchMiddlewareDelegate.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
// ---------------------------------------------------------------------------------- | ||
// Copyright Microsoft Corporation | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// ---------------------------------------------------------------------------------- | ||
|
||
namespace DurableTask.Core.Middleware | ||
{ | ||
using System.Threading.Tasks; | ||
|
||
/// <summary> | ||
/// A function that runs in the task execution middleware pipeline. | ||
/// </summary> | ||
/// <param name="context">The <see cref="DispatchMiddlewareContext"/> for the task execution.</param> | ||
/// <returns>A task that represents the completion of the durable task execution.</returns> | ||
public delegate Task DispatchMiddlewareDelegate(DispatchMiddlewareContext context); | ||
} |
49 changes: 49 additions & 0 deletions
49
src/DurableTask.Core/Middleware/DispatchMiddlewarePipeline.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
// ---------------------------------------------------------------------------------- | ||
// Copyright Microsoft Corporation | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// ---------------------------------------------------------------------------------- | ||
|
||
namespace DurableTask.Core.Middleware | ||
{ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Threading.Tasks; | ||
|
||
class DispatchMiddlewarePipeline | ||
{ | ||
readonly IList<Func<DispatchMiddlewareDelegate, DispatchMiddlewareDelegate>> components = | ||
new List<Func<DispatchMiddlewareDelegate, DispatchMiddlewareDelegate>>(); | ||
|
||
public Task RunAsync(DispatchMiddlewareContext context, DispatchMiddlewareDelegate handler) | ||
{ | ||
// Build the delegate chain | ||
foreach (var component in this.components.Reverse()) | ||
{ | ||
handler = component(handler); | ||
} | ||
|
||
return handler(context); | ||
} | ||
|
||
public void Add(Func<DispatchMiddlewareContext, Func<Task>, Task> middleware) | ||
{ | ||
this.components.Add(next => | ||
{ | ||
return context => | ||
{ | ||
Func<Task> simpleNext = () => next(context); | ||
return middleware(context, simpleNext); | ||
}; | ||
}); | ||
} | ||
} | ||
} |
Oops, something went wrong.