Skip to content
This repository was archived by the owner on Apr 8, 2020. It is now read-only.

Commit 2799861

Browse files
Support cancellation of NodeServices invocations
1 parent f358d8e commit 2799861

File tree

7 files changed

+86
-26
lines changed

7 files changed

+86
-26
lines changed

src/Microsoft.AspNetCore.NodeServices/HostingModels/HttpNodeInstance.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Net.Http;
55
using System.Text;
66
using System.Text.RegularExpressions;
7+
using System.Threading;
78
using System.Threading.Tasks;
89
using Microsoft.Extensions.Logging;
910
using Newtonsoft.Json;
@@ -57,15 +58,17 @@ private static string MakeCommandLineOptions(int port)
5758
return $"--port {port}";
5859
}
5960

60-
protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo)
61+
protected override async Task<T> InvokeExportAsync<T>(
62+
NodeInvocationInfo invocationInfo, CancellationToken cancellationToken)
6163
{
6264
var payloadJson = JsonConvert.SerializeObject(invocationInfo, jsonSerializerSettings);
6365
var payload = new StringContent(payloadJson, Encoding.UTF8, "application/json");
64-
var response = await _client.PostAsync("http://localhost:" + _portNumber, payload);
66+
var response = await _client.PostAsync("http://localhost:" + _portNumber, payload, cancellationToken);
6567

6668
if (!response.IsSuccessStatusCode)
6769
{
68-
var responseErrorString = await response.Content.ReadAsStringAsync();
70+
// Unfortunately there's no true way to cancel ReadAsStringAsync calls, hence AbandonIfCancelled
71+
var responseErrorString = await response.Content.ReadAsStringAsync().OrThrowOnCancellation(cancellationToken);
6972
throw new Exception("Call to Node module failed with error: " + responseErrorString);
7073
}
7174

@@ -81,11 +84,11 @@ protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocat
8184
typeof(T).FullName);
8285
}
8386

84-
var responseString = await response.Content.ReadAsStringAsync();
87+
var responseString = await response.Content.ReadAsStringAsync().OrThrowOnCancellation(cancellationToken);
8588
return (T)(object)responseString;
8689

8790
case "application/json":
88-
var responseJson = await response.Content.ReadAsStringAsync();
91+
var responseJson = await response.Content.ReadAsStringAsync().OrThrowOnCancellation(cancellationToken);
8992
return JsonConvert.DeserializeObject<T>(responseJson, jsonSerializerSettings);
9093

9194
case "application/octet-stream":
@@ -97,7 +100,7 @@ protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocat
97100
typeof(T).FullName + ". Instead you must use the generic type System.IO.Stream.");
98101
}
99102

100-
return (T)(object)(await response.Content.ReadAsStreamAsync());
103+
return (T)(object)(await response.Content.ReadAsStreamAsync().OrThrowOnCancellation(cancellationToken));
101104

102105
default:
103106
throw new InvalidOperationException("Unexpected response content type: " + responseContentType.MediaType);
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34

45
namespace Microsoft.AspNetCore.NodeServices.HostingModels
56
{
67
public interface INodeInstance : IDisposable
78
{
8-
Task<T> InvokeExportAsync<T>(string moduleName, string exportNameOrNull, params object[] args);
9+
Task<T> InvokeExportAsync<T>(CancellationToken cancellationToken, string moduleName, string exportNameOrNull, params object[] args);
910
}
1011
}

src/Microsoft.AspNetCore.NodeServices/HostingModels/OutOfProcessNodeInstance.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Diagnostics;
44
using System.IO;
55
using System.Linq;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using Microsoft.Extensions.Logging;
89

@@ -67,7 +68,8 @@ public OutOfProcessNodeInstance(
6768
ConnectToInputOutputStreams();
6869
}
6970

70-
public async Task<T> InvokeExportAsync<T>(string moduleName, string exportNameOrNull, params object[] args)
71+
public async Task<T> InvokeExportAsync<T>(
72+
CancellationToken cancellationToken, string moduleName, string exportNameOrNull, params object[] args)
7173
{
7274
if (_nodeProcess.HasExited || _nodeProcessNeedsRestart)
7375
{
@@ -79,15 +81,17 @@ public async Task<T> InvokeExportAsync<T>(string moduleName, string exportNameOr
7981
throw new NodeInvocationException(message, null, nodeInstanceUnavailable: true);
8082
}
8183

82-
// Wait until the connection is established. This will throw if the connection fails to initialize.
83-
await _connectionIsReadySource.Task;
84+
// Wait until the connection is established. This will throw if the connection fails to initialize,
85+
// or if cancellation is requested first. Note that we can't really cancel the "establishing connection"
86+
// task because that's shared with all callers, but we can stop waiting for it if this call is cancelled.
87+
await _connectionIsReadySource.Task.OrThrowOnCancellation(cancellationToken);
8488

8589
return await InvokeExportAsync<T>(new NodeInvocationInfo
8690
{
8791
ModuleName = moduleName,
8892
ExportedFunctionName = exportNameOrNull,
8993
Args = args
90-
});
94+
}, cancellationToken);
9195
}
9296

9397
public void Dispose()
@@ -96,7 +100,9 @@ public void Dispose()
96100
GC.SuppressFinalize(this);
97101
}
98102

99-
protected abstract Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo);
103+
protected abstract Task<T> InvokeExportAsync<T>(
104+
NodeInvocationInfo invocationInfo,
105+
CancellationToken cancellationToken);
100106

101107
// This method is virtual, as it provides a way to override the NODE_PATH or the path to node.exe
102108
protected virtual ProcessStartInfo PrepareNodeProcessStartInfo(

src/Microsoft.AspNetCore.NodeServices/HostingModels/SocketNodeInstance.cs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public SocketNodeInstance(string projectPath, string[] watchFileExtensions, stri
5757
_socketAddress = socketAddress;
5858
}
5959

60-
protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo)
60+
protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocationInfo, CancellationToken cancellationToken)
6161
{
6262
if (_connectionHasFailed)
6363
{
@@ -70,7 +70,12 @@ protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocat
7070

7171
if (_virtualConnectionClient == null)
7272
{
73-
await EnsureVirtualConnectionClientCreated();
73+
// Although we could pass the cancellationToken into EnsureVirtualConnectionClientCreated and
74+
// have it signal cancellations upstream, that would be a bad thing to do, because all callers
75+
// wait for the same connection task. There's no reason why the first caller should have the
76+
// special ability to cancel the connection process in a way that would affect subsequent
77+
// callers. So, each caller just independently stops awaiting connection if that call is cancelled.
78+
await EnsureVirtualConnectionClientCreated().OrThrowOnCancellation(cancellationToken);
7479
}
7580

7681
// For each invocation, we open a new virtual connection. This gives an API equivalent to opening a new
@@ -83,7 +88,7 @@ protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocat
8388
virtualConnection = _virtualConnectionClient.OpenVirtualConnection();
8489

8590
// Send request
86-
await WriteJsonLineAsync(virtualConnection, invocationInfo);
91+
await WriteJsonLineAsync(virtualConnection, invocationInfo, cancellationToken);
8792

8893
// Determine what kind of response format is expected
8994
if (typeof(T) == typeof(Stream))
@@ -96,7 +101,7 @@ protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocat
96101
else
97102
{
98103
// Parse and return non-streamed JSON response
99-
var response = await ReadJsonAsync<RpcJsonResponse<T>>(virtualConnection);
104+
var response = await ReadJsonAsync<RpcJsonResponse<T>>(virtualConnection, cancellationToken);
100105
if (response.ErrorMessage != null)
101106
{
102107
throw new NodeInvocationException(response.ErrorMessage, response.ErrorDetails);
@@ -163,27 +168,27 @@ protected override void Dispose(bool disposing)
163168
base.Dispose(disposing);
164169
}
165170

166-
private static async Task WriteJsonLineAsync(Stream stream, object serializableObject)
171+
private static async Task WriteJsonLineAsync(Stream stream, object serializableObject, CancellationToken cancellationToken)
167172
{
168173
var json = JsonConvert.SerializeObject(serializableObject, jsonSerializerSettings);
169174
var bytes = Encoding.UTF8.GetBytes(json + '\n');
170-
await stream.WriteAsync(bytes, 0, bytes.Length);
175+
await stream.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
171176
}
172177

173-
private static async Task<T> ReadJsonAsync<T>(Stream stream)
178+
private static async Task<T> ReadJsonAsync<T>(Stream stream, CancellationToken cancellationToken)
174179
{
175-
var json = Encoding.UTF8.GetString(await ReadAllBytesAsync(stream));
180+
var json = Encoding.UTF8.GetString(await ReadAllBytesAsync(stream, cancellationToken));
176181
return JsonConvert.DeserializeObject<T>(json, jsonSerializerSettings);
177182
}
178183

179-
private static async Task<byte[]> ReadAllBytesAsync(Stream input)
184+
private static async Task<byte[]> ReadAllBytesAsync(Stream input, CancellationToken cancellationToken)
180185
{
181186
byte[] buffer = new byte[16 * 1024];
182187

183188
using (var ms = new MemoryStream())
184189
{
185190
int read;
186-
while ((read = await input.ReadAsync(buffer, 0, buffer.Length)) > 0)
191+
while ((read = await input.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
187192
{
188193
ms.Write(buffer, 0, read);
189194
}

src/Microsoft.AspNetCore.NodeServices/INodeServices.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34

45
namespace Microsoft.AspNetCore.NodeServices
56
{
67
public interface INodeServices : IDisposable
78
{
89
Task<T> InvokeAsync<T>(string moduleName, params object[] args);
10+
Task<T> InvokeAsync<T>(CancellationToken cancellationToken, string moduleName, params object[] args);
911

1012
Task<T> InvokeExportAsync<T>(string moduleName, string exportedFunctionName, params object[] args);
13+
Task<T> InvokeExportAsync<T>(CancellationToken cancellationToken, string moduleName, string exportedFunctionName, params object[] args);
14+
1115

1216
[Obsolete("Use InvokeAsync instead")]
1317
Task<T> Invoke<T>(string moduleName, params object[] args);

src/Microsoft.AspNetCore.NodeServices/NodeServicesImpl.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34
using Microsoft.AspNetCore.NodeServices.HostingModels;
45

@@ -34,19 +35,29 @@ public Task<T> InvokeAsync<T>(string moduleName, params object[] args)
3435
return InvokeExportAsync<T>(moduleName, null, args);
3536
}
3637

38+
public Task<T> InvokeAsync<T>(CancellationToken cancellationToken, string moduleName, params object[] args)
39+
{
40+
return InvokeExportAsync<T>(cancellationToken, moduleName, null, args);
41+
}
42+
3743
public Task<T> InvokeExportAsync<T>(string moduleName, string exportedFunctionName, params object[] args)
3844
{
39-
return InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, allowRetry: true);
45+
return InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, /* allowRetry */ true, CancellationToken.None);
46+
}
47+
48+
public Task<T> InvokeExportAsync<T>(CancellationToken cancellationToken, string moduleName, string exportedFunctionName, params object[] args)
49+
{
50+
return InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, /* allowRetry */ true, cancellationToken);
4051
}
4152

42-
public async Task<T> InvokeExportWithPossibleRetryAsync<T>(string moduleName, string exportedFunctionName, object[] args, bool allowRetry)
53+
public async Task<T> InvokeExportWithPossibleRetryAsync<T>(string moduleName, string exportedFunctionName, object[] args, bool allowRetry, CancellationToken cancellationToken)
4354
{
4455
ThrowAnyOutstandingDelayedDisposalException();
4556
var nodeInstance = GetOrCreateCurrentNodeInstance();
4657

4758
try
4859
{
49-
return await nodeInstance.InvokeExportAsync<T>(moduleName, exportedFunctionName, args);
60+
return await nodeInstance.InvokeExportAsync<T>(cancellationToken, moduleName, exportedFunctionName, args);
5061
}
5162
catch (NodeInvocationException ex)
5263
{
@@ -69,7 +80,7 @@ public async Task<T> InvokeExportWithPossibleRetryAsync<T>(string moduleName, st
6980
// One the next call, don't allow retries, because we could get into an infinite retry loop, or a long retry
7081
// loop that masks an underlying problem. A newly-created Node instance should be able to accept invocations,
7182
// or something more serious must be wrong.
72-
return await InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, allowRetry: false);
83+
return await InvokeExportWithPossibleRetryAsync<T>(moduleName, exportedFunctionName, args, /* allowRetry */ false, cancellationToken);
7384
}
7485
else
7586
{
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
4+
namespace Microsoft.AspNetCore.NodeServices
5+
{
6+
internal static class TaskExtensions
7+
{
8+
public static Task OrThrowOnCancellation(this Task task, CancellationToken cancellationToken)
9+
{
10+
return task.IsCompleted
11+
? task // If the task is already completed, no need to wrap it in a further layer of task
12+
: task.ContinueWith(
13+
_ => {}, // If the task completes, allow execution to continue
14+
cancellationToken,
15+
TaskContinuationOptions.ExecuteSynchronously,
16+
TaskScheduler.Default);
17+
}
18+
19+
public static Task<T> OrThrowOnCancellation<T>(this Task<T> task, CancellationToken cancellationToken)
20+
{
21+
return task.IsCompleted
22+
? task // If the task is already completed, no need to wrap it in a further layer of task
23+
: task.ContinueWith(
24+
t => t.Result, // If the task completes, pass through its result
25+
cancellationToken,
26+
TaskContinuationOptions.ExecuteSynchronously,
27+
TaskScheduler.Default);
28+
}
29+
}
30+
}

0 commit comments

Comments
 (0)