Skip to content

Commit

Permalink
[Threading] ThreadPool refactor, reduce allocation, fix wrong argument
Browse files Browse the repository at this point in the history
  • Loading branch information
Eideren authored and xen2 committed Aug 29, 2019
1 parent 40d7280 commit b6a4833
Showing 1 changed file with 78 additions and 45 deletions.
123 changes: 78 additions & 45 deletions sources/core/Xenko.Core/Threading/ThreadPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,31 @@ namespace Xenko.Core.Threading
/// </remarks>
internal class ThreadPool
{
private const long MaxIdleTimeInTicks = 5 * TimeSpan.TicksPerSecond;
private const int MaxIdleTimeInMS = 5000;
private readonly long MaxIdleTimeTS = (long)((double)Stopwatch.Frequency / 1000 * MaxIdleTimeInMS);

public static readonly ThreadPool Instance = new ThreadPool();

private readonly Action<object> cachedTaskLoop;

private readonly int maxThreadCount = Environment.ProcessorCount + 2;
private readonly Queue<Action> workItems = new Queue<Action>();
private readonly ManualResetEvent workAvailable = new ManualResetEvent(false);

private SpinLock spinLock = new SpinLock();
private int workingCount;
/// <summary> Usage only within <see cref="spinLock"/> </summary>
private int busyCount;
private int aliveCount;

public ThreadPool()
{
// Cache delegate to avoid pointless allocation
cachedTaskLoop = (o) => ProcessWorkItems();
}

public void QueueWorkItem([NotNull] [Pooled] Action workItem)
{
var lockTaken = false;
bool lockTaken = false;
bool startNewTask = false;
try
{
spinLock.Enter(ref lockTaken);
Expand All @@ -41,69 +50,93 @@ public void QueueWorkItem([NotNull] [Pooled] Action workItem)
workItems.Enqueue(workItem);
workAvailable.Set();

var curWorkingCount = Interlocked.CompareExchange(ref workingCount, 0, 0);
if (curWorkingCount + 1 >= aliveCount && aliveCount < maxThreadCount)
// We're only locking when potentially increasing aliveCount as we
// don't want to go above our maximum amount of threads.
int curBusyCount = Interlocked.CompareExchange(ref busyCount, 0, 0);
int curAliveCount = Interlocked.CompareExchange(ref aliveCount, 0, 0);
if (curBusyCount + 1 >= curAliveCount && curAliveCount < maxThreadCount)
{
aliveCount++;
Task.Factory.StartNew(ProcessWorkItems, TaskCreationOptions.LongRunning);
// Start threads as busy otherwise only one thread will be created
// when calling this function multiple times in a row
Interlocked.Increment(ref busyCount);
Interlocked.Increment(ref aliveCount);
startNewTask = true;
}
}
finally
{
if (lockTaken)
{
spinLock.Exit(true);
}
}
// No point in wasting spins on the lock while creating the task
if (startNewTask)
{
Task.Factory.StartNew(cachedTaskLoop, null, TaskCreationOptions.LongRunning);
}
}

private void ProcessWorkItems(object state)
private void ProcessWorkItems()
{
long lastWork = Stopwatch.GetTimestamp();
TimeSpan maxIdleTime = TimeSpan.FromTicks(MaxIdleTimeInTicks);
while(true)
Interlocked.Decrement(ref busyCount);
try
{
Action workItem = null;
var lockTaken = false;
bool idleForTooLong = Utilities.ConvertRawToTimestamp(Stopwatch.GetTimestamp() - lastWork) < maxIdleTime;
try
long lastWorkTS = Stopwatch.GetTimestamp();
while (true)
{
spinLock.Enter(ref lockTaken);

if (workItems.Count > 0)
Action workItem = null;
bool lockTaken = false;
try
{
workItem = workItems.Dequeue();
if (workItems.Count == 0)
workAvailable.Reset();
spinLock.Enter(ref lockTaken);
if (workItems.Count > 0)
{
workItem = workItems.Dequeue();
if (workItems.Count == 0)
{
workAvailable.Reset();
}
}
}
else if (idleForTooLong)
finally
{
aliveCount--;
return;
if (lockTaken)
{
spinLock.Exit(true);
}
}
}
finally
{
if (lockTaken)
spinLock.Exit(true);
}

if (workItem != null)
{
Interlocked.Increment(ref workingCount);
try

if (workItem == null)
{
workItem.Invoke();
bool idleForTooLong = Stopwatch.GetTimestamp() - lastWorkTS > MaxIdleTimeTS;
// Wait for another work item to be (potentially) available
if (idleForTooLong || workAvailable.WaitOne(MaxIdleTimeInMS) == false)
{
// No work given in the last MaxIdleTimeTS, close this task
return;
}
}
catch (Exception)
else
{
// Ignoring Exception
Interlocked.Increment(ref busyCount);
try
{
workItem();
}
// Let exceptions fall into unhandled as we don't have any
// good mechanisms to pass it elegantly over to user-land yet
finally
{
Interlocked.Decrement(ref busyCount);
}
lastWorkTS = Stopwatch.GetTimestamp();
}
Interlocked.Decrement(ref workingCount);
PooledDelegateHelper.Release(workItem);
lastWork = Stopwatch.GetTimestamp();
}

// Wait for another work item to be (potentially) available
workAvailable.WaitOne(maxIdleTime);
}
finally
{
Interlocked.Decrement(ref aliveCount);
}
}
}
Expand Down

0 comments on commit b6a4833

Please sign in to comment.