Skip to content

Commit

Permalink
implemented disposal logic for calls
Browse files Browse the repository at this point in the history
  • Loading branch information
jtattermusch committed May 21, 2015
1 parent 9f550a3 commit 2d2652d
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 15 deletions.
7 changes: 4 additions & 3 deletions src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ public sealed class AsyncClientStreamingCall<TRequest, TResponse> : IDisposable
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly Task<TResponse> result;
readonly Action disposeAction;

public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result)
public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result, Action disposeAction)
{
this.requestStream = requestStream;
this.result = result;
this.disposeAction = disposeAction;
}

/// <summary>
Expand Down Expand Up @@ -90,8 +92,7 @@ public TaskAwaiter<TResponse> GetAwaiter()
/// </summary>
public void Dispose()
{
// TODO(jtattermusch): implement
throw new NotImplementedException();
disposeAction.Invoke();
}
}
}
7 changes: 4 additions & 3 deletions src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ public sealed class AsyncDuplexStreamingCall<TRequest, TResponse> : IDisposable
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Action disposeAction;

public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream)
public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Action disposeAction)
{
this.requestStream = requestStream;
this.responseStream = responseStream;
this.disposeAction = disposeAction;
}

/// <summary>
Expand Down Expand Up @@ -81,8 +83,7 @@ public IClientStreamWriter<TRequest> RequestStream
/// </summary>
public void Dispose()
{
// TODO(jtattermusch): implement
throw new NotImplementedException();
disposeAction.Invoke();
}
}
}
7 changes: 4 additions & 3 deletions src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ namespace Grpc.Core
public sealed class AsyncServerStreamingCall<TResponse> : IDisposable
{
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Action disposeAction;

public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream)
public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Action disposeAction)
{
this.responseStream = responseStream;
this.disposeAction = disposeAction;
}

/// <summary>
Expand All @@ -68,8 +70,7 @@ public IAsyncStreamReader<TResponse> ResponseStream
/// </summary>
public void Dispose()
{
// TODO(jtattermusch): implement
throw new NotImplementedException();
disposeAction.Invoke();
}
}
}
6 changes: 3 additions & 3 deletions src/csharp/Grpc.Core/Calls.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequ
asyncCall.StartServerStreamingCall(req, call.Headers);
RegisterCancellationCallback(asyncCall, token);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncServerStreamingCall<TResponse>(responseStream);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.Cancel);
}

public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
Expand All @@ -85,7 +85,7 @@ public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreaming
var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers);
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.Cancel);
}

public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
Expand All @@ -98,7 +98,7 @@ public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreaming
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.Cancel);
}

private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token)
Expand Down
16 changes: 13 additions & 3 deletions src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,19 @@ public void Div2()
Assert.AreEqual(0, response.Remainder);
}

// TODO(jtattermusch): test division by zero
[Test]
public void DivByZero()
{
try
{
DivReply response = client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 0 }.Build());
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
}
}

[Test]
public void DivAsync()
Expand All @@ -119,7 +131,6 @@ public void Fib()
var responses = await call.ResponseStream.ToList();
CollectionAssert.AreEqual(new List<long> { 1, 1, 2, 3, 5, 8 },
responses.ConvertAll((n) => n.Num_));

}
}).Wait();
}
Expand Down Expand Up @@ -154,7 +165,6 @@ public void DivMany()
new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build()
};


using (var call = client.DivMany())
{
await call.RequestStream.WriteAll(divArgsList);
Expand Down

0 comments on commit 2d2652d

Please sign in to comment.