forked from dotnet/docs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample03.cs
96 lines (76 loc) · 3.02 KB
/
example03.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
//<snippet03>
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Demonstrates a bounded producer-consumer pattern: two producer tasks add
/// stopwatch tick values to a shared <see cref="BlockingCollection{T}"/> while
/// a single consumer task drains it through <c>GetConsumingEnumerable</c>.
/// </summary>
class Example
{
    // Bounded capacity of the collection: it holds at most upperLimit items
    // at any given time.
    const int upperLimit = 1000;

    // Number of items EACH of the two producers adds. Adjust this number to see
    // how it impacts the producing-consuming pattern; values above upperLimit / 2
    // make it possible to reach the capacity limit if the consumer falls behind.
    const int itemsToProduce = 100;

    static readonly BlockingCollection<long> collection = new BlockingCollection<long>(upperLimit);

    // Variables for diagnostic output only.
    static readonly Stopwatch sw = new Stopwatch();
    static int totalAdditions = 0;

    /// <summary>
    /// Starts two producers and one consumer, marks the collection as complete
    /// once both producers have finished, waits for the consumer to drain it,
    /// and then waits for a key press.
    /// </summary>
    static void Main()
    {
        // Start the stopwatch.
        sw.Start();

        // Queue the producer tasks. Store them in an array
        // for use with ContinueWhenAll.
        Task[] producers = new Task[2];
        producers[0] = Task.Run(() => RunProducer("A", 0));
        producers[1] = Task.Run(() => RunProducer("B", itemsToProduce));

        // Create a cleanup task that will call CompleteAdding after
        // all producers are done adding items. Without it the consumer's
        // enumeration would never end.
        Task cleanup = Task.Factory.ContinueWhenAll(producers, (p) => collection.CompleteAdding());

        // Queue the consumer task right away so that it begins consuming
        // as soon as the producers add items.
        Task consumer = Task.Run(() => RunConsumer());

        // Wait for the work to finish so that the summary is always printed and
        // any exception thrown by the cleanup or consumer task is surfaced
        // instead of being silently dropped.
        Task.WaitAll(cleanup, consumer);

        // Keep the console window open until the user presses a key.
        Console.ReadKey(true);
    }

    /// <summary>
    /// Adds <c>itemsToProduce</c> stopwatch tick values to the shared collection
    /// and then adds the number of items actually enqueued to the shared total.
    /// </summary>
    /// <param name="ID">Label identifying this producer in the console output.</param>
    /// <param name="start">First item number reported by this producer.</param>
    static void RunProducer(string ID, int start)
    {
        int additions = 0;
        for (int i = start; i < start + itemsToProduce; i++)
        {
            // The data that is added to the collection.
            long ticks = sw.ElapsedTicks;

            // Display the addition.
            Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i);

            try
            {
                collection.Add(ticks);
            }
            catch (InvalidOperationException)
            {
                // The collection was marked as complete for adding, so nothing
                // more can be enqueued. Checking IsAddingCompleted first would
                // be racy; handling the exception is the reliable signal.
                break;
            }

            // Count only the items that were actually enqueued.
            additions++;

            // Slows down the producer; comment this line out
            // to let the producers run at full speed.
            Thread.SpinWait(100000);
        }

        Interlocked.Add(ref totalAdditions, additions);
        Console.WriteLine("{0} is done adding: {1} items", ID, additions);
    }

    /// <summary>
    /// Consumes items until the collection is marked complete and empty,
    /// then prints the totals and stops the stopwatch.
    /// </summary>
    static void RunConsumer()
    {
        int subtractions = 0;

        // GetConsumingEnumerable removes each item as it is enumerated and
        // ends once CompleteAdding has been called and the collection is empty.
        foreach (var item in collection.GetConsumingEnumerable())
        {
            Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
                item.ToString("D18"), subtractions++, collection.Count);
        }

        Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
            totalAdditions, subtractions, collection.Count);
        sw.Stop();

        Console.WriteLine("Press any key to exit");
    }
}
//</snippet03>