Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create benchmarks for System.Threading.Tasks.Dataflow #951

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/benchmarks/micro/MicroBenchmarks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<PackageReference Include="System.Runtime.Serialization.Json" Version="4.3.0" />
<PackageReference Include="System.Security.Cryptography.Primitives" Version="4.3.0" />
<PackageReference Include="System.Threading.Channels" Version="4.5.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.10.0" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.2" />
<PackageReference Include="System.Xml.XmlSerializer" Version="4.3.0" />
<PackageReference Include="Utf8Json" Version="1.3.7" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

namespace System.Threading.Tasks.Dataflow.Tests
{
public class ActionBlockPerfTests : TargetPerfTests<ActionBlock<int>>
{
public override ActionBlock<int> CreateBlock() => new ActionBlock<int>(i => { });
}

public class ParallelActionBlockPerfTests : TargetPerfTests<ActionBlock<int>>
{
public override ActionBlock<int> CreateBlock() =>
new ActionBlock<int>(
i => { },
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
}
);
}

public class SingleProducerConstrainedActionBlockPerfTests : TargetPerfTests<ITargetBlock<int>>
{
public override ITargetBlock<int> CreateBlock() =>
new ActionBlock<int>(
i => { },
new ExecutionDataflowBlockOptions
{
SingleProducerConstrained = true
}
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

namespace System.Threading.Tasks.Dataflow.Tests
{
public class BatchBlockPerfTests : ReceivablePropagatorPerfTests<BatchBlock<int>, int[]>
{
protected override int ReceiveSize { get; } = 100;
public override BatchBlock<int> CreateBlock() => new BatchBlock<int>(ReceiveSize);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using BenchmarkDotNet.Attributes;
using MicroBenchmarks;

namespace System.Threading.Tasks.Dataflow.Tests
{
[BenchmarkCategory(Categories.CoreFX)]
public class BatchedJoin2BlockPerfTests : MultiTargetReceivableSourceBlockPerfTests<BatchedJoinBlock<int, int>, Tuple<IList<int>, IList<int>>>
{
protected override int ReceiveSize { get; } = 100;

public override BatchedJoinBlock<int, int> CreateBlock() => new BatchedJoinBlock<int, int>(ReceiveSize);

protected override ITargetBlock<int>[] Targets => new[] { block.Target1, block.Target2 };
}

[BenchmarkCategory(Categories.CoreFX)]
public class BatchedJoin3BlockPerfTests : MultiTargetReceivableSourceBlockPerfTests<BatchedJoinBlock<int, int, int>, Tuple<IList<int>, IList<int>, IList<int>>>
{
protected override int ReceiveSize { get; } = 100;

public override BatchedJoinBlock<int, int, int> CreateBlock() => new BatchedJoinBlock<int, int, int>(ReceiveSize);

protected override ITargetBlock<int>[] Targets => new[] { block.Target1, block.Target2, block.Target3 };
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using BenchmarkDotNet.Attributes;
using MicroBenchmarks;

namespace System.Threading.Tasks.Dataflow.Tests
{
[BenchmarkCategory(Categories.CoreFX)]
public class BroadcastBlockPerfTests : ReceivablePropagatorPerfTests<BroadcastBlock<int>, int>
{
public override BroadcastBlock<int> CreateBlock() => new BroadcastBlock<int>(i => i);

[Benchmark(OperationsPerInvoke = MessagesCount)]
public Task PostMultiReceiveParallel() => MultiParallel(() => Post());

[Benchmark(OperationsPerInvoke = MessagesCount)]
public Task SendMultiReceiveAsyncParallel() => MultiParallel(() => SendAsync());

private async Task MultiParallel(Func<Task> doTask)
{
BlockSetup();
var options = new DataflowLinkOptions { PropagateCompletion = true };
var action1 = new ActionBlock<int>(i => { });
var action2 = new ActionBlock<int>(i => { });
block.LinkTo(action1, options);
block.LinkTo(action2, options);

await doTask();
block.Complete();
manandre marked this conversation as resolved.
Show resolved Hide resolved

await Task.WhenAll(action1.Completion, action2.Completion);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

namespace System.Threading.Tasks.Dataflow.Tests
{
public class UnboundedBufferBlockPerfTests : ReceivablePropagatorPerfTests<BufferBlock<int>, int>
{
public override BufferBlock<int> CreateBlock() => new BufferBlock<int>();
}

public class BoundedBufferBlockPerfTests : BoundedReceivablePropagatorPerfTests<BufferBlock<int>, int>
{
public override BufferBlock<int> CreateBlock() =>
new BufferBlock<int>(
new DataflowBlockOptions
{
BoundedCapacity = 100
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Linq;
using BenchmarkDotNet.Attributes;
using MicroBenchmarks;

namespace System.Threading.Tasks.Dataflow.Tests
{
[BenchmarkCategory(Categories.CoreFX)]
public abstract class PerfTests<T> where T : IDataflowBlock
{
protected const int MessagesCount = 100_000;
protected T block;

public abstract T CreateBlock();

[GlobalSetup]
public void BlockSetup()
{
block = CreateBlock();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this GlobalSetup still needed when we're calling BlockSetup on each benchmark invocation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only call BlockSetup from the benchmark code when we are forced to call Complete on the tested block to have a valid test case. This concerns only two benchmarks.


[Benchmark]
public async Task Completion()
{
BlockSetup();
block.Complete();
await block.Completion;
manandre marked this conversation as resolved.
Show resolved Hide resolved
}

protected static Task Post(ITargetBlock<int> target, bool retry = false) => Task.Run(() =>
{
for (int i = 0; i < MessagesCount; i++)
{
while (!target.Post(i) && retry) ;
}
});

protected static Task Receive<U>(ISourceBlock<U> source, int receiveSize = 1) => Task.Run(() =>
{
for (int i = 0; i < MessagesCount / receiveSize; i++)
{
source.Receive();
}
});

protected static Task TryReceive<U>(IReceivableSourceBlock<U> source, int receiveSize = 1, bool retry = false) => Task.Run(() =>
manandre marked this conversation as resolved.
Show resolved Hide resolved
{
for (int i = 0; i < MessagesCount / receiveSize; i++)
{
while (!source.TryReceive(out _) && retry) ;
}
});

protected static Task TryReceiveAll<U>(IReceivableSourceBlock<U> source) => Task.Run(() =>
{
source.TryReceiveAll(out _);
});
manandre marked this conversation as resolved.
Show resolved Hide resolved

protected static async Task SendAsync(ITargetBlock<int> target)
{
for (int i = 0; i < MessagesCount; i++)
{
await target.SendAsync(i);
}
}

protected static async Task ReceiveAsync<U>(ISourceBlock<U> source, int receiveSize = 1)
{
for (int i = 0; i < MessagesCount / receiveSize; i++)
{
if (await source.OutputAvailableAsync())
{
await source.ReceiveAsync();
}
}
}
}

[BenchmarkCategory(Categories.CoreFX)]
public abstract class SourceBlockPerfTests<T, U> : PerfTests<T> where T : ISourceBlock<U>
{
protected virtual int ReceiveSize { get; } = 1;
protected virtual Task Receive() => Receive(block, ReceiveSize);
protected virtual Task ReceiveAsync() => ReceiveAsync(block, ReceiveSize);
}

[BenchmarkCategory(Categories.CoreFX)]
public abstract class ReceivableSourceBlockPerfTests<T, U> : SourceBlockPerfTests<T, U> where T : IReceivableSourceBlock<U>
{
protected virtual Task TryReceive() => TryReceive(block, ReceiveSize);
protected virtual Task TryReceiveAll() => TryReceiveAll(block);
}

[BenchmarkCategory(Categories.CoreFX)]
public abstract class TargetPerfTests<T> : PerfTests<T> where T : ITargetBlock<int>
{
[Benchmark(OperationsPerInvoke = MessagesCount)]
public Task Post() => Post(block);

[Benchmark(OperationsPerInvoke = MessagesCount)]
public Task SendAsync() => SendAsync(block);
}

[BenchmarkCategory(Categories.CoreFX)]
public abstract class PropagatorPerfTests<T, U> : TargetPerfTests<T> where T : IPropagatorBlock<int, U>
{
protected virtual int ReceiveSize { get; } = 1;
protected Task Receive() => Receive(block, ReceiveSize);
protected Task ReceiveAsync() => ReceiveAsync(block, ReceiveSize);

[Benchmark(OperationsPerInvoke = MessagesCount)]
public async Task PostReceiveSequential()
{
await Post();
await Receive();
}

[Benchmark(OperationsPerInvoke = MessagesCount)]
public async Task SendReceiveAsyncSequential()
{
await SendAsync();
await ReceiveAsync();
}

[Benchmark(OperationsPerInvoke = MessagesCount)]
public async Task SendReceiveAsyncParallel()
{
await Task.WhenAll(SendAsync(), ReceiveAsync());
}
}

[BenchmarkCategory(Categories.CoreFX)]
public abstract class ReceivablePropagatorPerfTests<T, U> : PropagatorPerfTests<T, U> where T : IPropagatorBlock<int, U>, IReceivableSourceBlock<U>
{
protected Task TryReceive() => TryReceive(block, ReceiveSize);
protected Task TryReceiveAll() => TryReceiveAll(block);

[Benchmark(OperationsPerInvoke = MessagesCount)]
public async Task PostTryReceiveSequential()
{
await Post();
await TryReceive();
}

[Benchmark(OperationsPerInvoke = MessagesCount)]
public async Task PostTryReceiveParallel()
{
await Task.WhenAll(Post(), TryReceive());
}

[Benchmark(OperationsPerInvoke = MessagesCount)]
public async Task PostTryReceiveAllSequential()
{
await Post();
await TryReceiveAll();
}
}

[BenchmarkCategory(Categories.CoreFX)]
public abstract class BoundedReceivablePropagatorPerfTests<T, U> : PerfTests<T> where T : IPropagatorBlock<int, U>, IReceivableSourceBlock<U>
{
[Benchmark(OperationsPerInvoke = MessagesCount)]
public async Task PostTryReceiveParallel()
{
await Task.WhenAll(Post(block, retry: true), TryReceive(block, retry: true));
}

[Benchmark(OperationsPerInvoke = MessagesCount)]
public async Task SendReceiveAsyncParallel()
{
await Task.WhenAll(SendAsync(block), ReceiveAsync(block));
}
}

[BenchmarkCategory(Categories.CoreFX)]
public abstract class MultiTargetReceivableSourceBlockPerfTests<T, U> : ReceivableSourceBlockPerfTests<T, U> where T : IReceivableSourceBlock<U>
{
protected abstract ITargetBlock<int>[] Targets { get; }

protected Task Post() => Task.WhenAll(Targets.Select(target => Post(target)));
protected Task SendAsync() => Task.WhenAll(Targets.Select(target => SendAsync(target)));

protected override Task Receive() => MultiParallel(() => Receive(block, ReceiveSize));
protected override Task TryReceive() => MultiParallel(() => TryReceive(block, ReceiveSize));
protected override Task ReceiveAsync() => MultiParallel(() => ReceiveAsync(block, ReceiveSize));

private Task MultiParallel(Func<Task> doTask) =>
Task.WhenAll(
Enumerable.Range(
0,
ReceiveSize == 1 ? 1 : Targets.Length
)
.Select(
_ => doTask()
)
);

[Benchmark(OperationsPerInvoke = MessagesCount)]
public Task PostMultiReceiveOnceParallel() => Task.WhenAll(Post(), Receive());

[Benchmark(OperationsPerInvoke = MessagesCount)]
public Task PostMultiTryReceiveOnceParallel() => Task.WhenAll(Post(), TryReceive());

[Benchmark(OperationsPerInvoke = MessagesCount)]
public async Task PostMultiTryReceiveAllOnceParallel()
{
await Post();
await TryReceiveAll();
}

[Benchmark(OperationsPerInvoke = MessagesCount)]
public Task SendAsyncMultiReceiveOnceParallel() => Task.WhenAll(SendAsync(), ReceiveAsync());

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using BenchmarkDotNet.Attributes;
using MicroBenchmarks;

namespace System.Threading.Tasks.Dataflow.Tests
{
[BenchmarkCategory(Categories.CoreFX)]
public class JoinBlock2PerfTests : MultiTargetReceivableSourceBlockPerfTests<JoinBlock<int, int>, Tuple<int, int>>
{
public override JoinBlock<int, int> CreateBlock() => new JoinBlock<int, int>();

protected override ITargetBlock<int>[] Targets => new[] { block.Target1, block.Target2 };
}

[BenchmarkCategory(Categories.CoreFX)]
public class JoinBlock3PerfTests : MultiTargetReceivableSourceBlockPerfTests<JoinBlock<int, int, int>, Tuple<int, int, int>>
{
public override JoinBlock<int, int, int> CreateBlock() => new JoinBlock<int, int, int>();

protected override ITargetBlock<int>[] Targets => new[] { block.Target1, block.Target2, block.Target3 };
}
}
Loading