Skip to content

Commit

Permalink
Add AmqpObject.AddClosedCallback method.
Browse files Browse the repository at this point in the history
  • Loading branch information
xinchen10 committed Sep 8, 2017
1 parent cc9aab0 commit d421001
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 9 deletions.
46 changes: 38 additions & 8 deletions src/AmqpObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public abstract partial class AmqpObject
ManualResetEvent endEvent;

/// <summary>
/// Gets the event used to notify that the object is closed.
/// Gets the event used to notify that the object is closed. Callbacks
/// may not be invoked if they are registered after the object is closed.
/// It is recommend to call AddClosedCallback method.
/// </summary>
public event ClosedCallback Closed;

Expand Down Expand Up @@ -81,15 +83,39 @@ internal void NotifyClosed(Error error)
temp.Set();
}

if (!this.closedNotified)
ClosedCallback closed;
lock (this)
{
closed = this.Closed;
bool notified = this.closedNotified;
this.closedNotified = true;
ClosedCallback closed = this.Closed;
if (closed != null)
if (notified || closed == null)
{
closed(this, error);
return;
}
}

closed(this, error);
}

/// <summary>
/// Adds a callback to be called when the object is called.
/// This method guarantees that the callback is invoked even if
/// it is registered after the object is closed.
/// </summary>
/// <param name="callback">The callback to be invoked.</param>
public void AddClosedCallback(ClosedCallback callback)
{
lock (this)
{
if (!this.closedCalled)
{
this.Closed += callback;
return;
}
}

callback(this, this.error);
}

/// <summary>
Expand All @@ -115,12 +141,16 @@ public void Close(TimeSpan timeout, Error error = null)

internal void CloseInternal(int waitMilliseconds, Error error = null)
{
if (this.closedCalled)
lock (this)
{
return;
if (this.closedCalled)
{
return;
}

this.closedCalled = true;
}

this.closedCalled = true;
// initialize event first to avoid the race with NotifyClosed
if (waitMilliseconds > 0)
{
Expand Down
12 changes: 11 additions & 1 deletion src/IAmqpObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ namespace Amqp
public partial interface IAmqpObject
{
/// <summary>
/// Gets the event used to notify that the object is closed.
/// Gets the event used to notify that the object is closed. Callbacks
/// may not be invoked if they are registered after the object is closed.
/// It is recommend to call AddClosedCallback method.
/// </summary>
event ClosedCallback Closed;

Expand All @@ -85,6 +87,14 @@ public partial interface IAmqpObject
/// </summary>
bool IsClosed { get; }

/// <summary>
/// Adds a callback to be called when the object is called.
/// This method guarantees that the callback is invoked even if
/// it is called after the object is closed.
/// </summary>
/// <param name="callback">The callback to be invoked.</param>
void AddClosedCallback(ClosedCallback callback);

/// <summary>
/// Closes the AMQP object. It waits until a response is received from the peer,
/// or throws TimeoutException after a default timeout.
Expand Down
35 changes: 35 additions & 0 deletions test/Common/ProtocolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,41 @@ public void ClosedEventOnTransportResetTest()
}).Unwrap().GetAwaiter().GetResult();
}

[TestMethod]
public void ClosedCallbackGuaranteeTest()
{
this.testListener.RegisterTarget(TestPoint.Open, (stream, channel, fields) =>
{
stream.Dispose();
return TestOutcome.Continue;
});

Trace.WriteLine(TraceLevel.Information, "sync test");
{
ManualResetEvent closed = new ManualResetEvent(false);
Connection connection = new Connection(this.address);
connection.AddClosedCallback((o, e) => closed.Set());
Assert.IsTrue(closed.WaitOne(5000), "closed event not fired");
Assert.AreEqual(ErrorCode.ConnectionForced, (string)connection.Error.Condition);
closed.Reset();
connection.AddClosedCallback((o, e) => closed.Set());
Assert.IsTrue(closed.WaitOne(5000), "closed event not fired again");
}

Trace.WriteLine(TraceLevel.Information, "async test");
Task.Factory.StartNew(async () =>
{
ManualResetEvent closed = new ManualResetEvent(false);
Connection connection = await Connection.Factory.CreateAsync(this.address);
connection.AddClosedCallback((o, e) => closed.Set());
Assert.IsTrue(closed.WaitOne(5000), "closed event not fired");
Assert.AreEqual(ErrorCode.ConnectionForced, (string)connection.Error.Condition);
closed.Reset();
connection.AddClosedCallback((o, e) => closed.Set());
Assert.IsTrue(closed.WaitOne(5000), "closed event not fired again");
}).Unwrap().GetAwaiter().GetResult();
}

[TestMethod]
public void SaslInvalidProtocolHeaderTest()
{
Expand Down

0 comments on commit d421001

Please sign in to comment.