Skip to content

Commit

Permalink
Use ConnectionFactory, if available, to connect for connections creat…
Browse files Browse the repository at this point in the history
…ed by constructors. [Azure#264]
  • Loading branch information
xinchen10 committed Jan 27, 2022
1 parent f26b175 commit d643b00
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 95 deletions.
4 changes: 4 additions & 0 deletions docs/articles/building_application.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ undesired behavior, e.g. UI freeze if it is called from a UI thread. In these ca
should use the `ConnectionFactory` class to perform asynchronous non-blocking creation of
a connection.

On platforms where `ConnectionFactory` is supported, the default factory (`Connection.Factory`) is
used for all connections created by the constructors. Therefore, any change made to the settings of
the default factory is applied to connections created afterwards.

### ConnectionFactory

`ConnectionFactory` provides asynchronous connection creation, and it also gives more control
Expand Down
173 changes: 97 additions & 76 deletions src/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public partial class Connection : AmqpObject

internal const uint DefaultMaxFrameSize = 256 * 1024;
internal const ushort DefaultMaxSessions = 256;
internal const int DefaultMaxLinksPerSession = 64;
internal const int DefaultMaxLinksPerSession = 1024;
internal static int HeartBeatCloseTimeout = 20 * 1000;
readonly Address address;
readonly OnOpened onOpened;
Expand All @@ -120,7 +120,6 @@ public partial class Connection : AmqpObject
uint maxFrameSize;
uint remoteMaxFrameSize;
ITransport writer;
Pump reader;
HeartBeat heartBeat;

Connection(Address address, ushort channelMax, uint maxFrameSize)
Expand All @@ -142,7 +141,7 @@ public partial class Connection : AmqpObject
/// The connection initialization includes establishing the underlying transport,
/// which typically has blocking network I/O. Depending on the current synchronization
/// context, it may cause deadlock or UI freeze. Please use the ConnectionFactory.CreateAsync
/// method instead.
/// method instead if it is available for the platform.
/// </remarks>
public Connection(Address address)
: this(address, null)
Expand All @@ -154,6 +153,12 @@ public Connection(Address address)
/// </summary>
/// <param name="address">The address.</param>
/// <param name="handler">The protocol handler.</param>
/// <remarks>
/// The connection initialization includes establishing the underlying transport,
/// which typically has blocking network I/O. Depending on the current synchronization
/// context, it may cause deadlock or UI freeze. Please use the ConnectionFactory.CreateAsync
/// method instead if it is available for the platform.
/// </remarks>
public Connection(Address address, IHandler handler)
: this(address, DefaultMaxSessions, DefaultMaxFrameSize)
{
Expand All @@ -174,7 +179,7 @@ public Connection(Address address, IHandler handler)
/// The connection initialization includes establishing the underlying transport,
/// which typically has blocking network I/O. Depending on the current synchronization
/// context, it may cause deadlock or UI freeze. Please use the ConnectionFactory.CreateAsync
/// method instead.
/// method instead if it is available for the platform.
/// </remarks>
public Connection(Address address, SaslProfile saslProfile, Open open, OnOpened onOpened)
: this(address, DefaultMaxSessions, DefaultMaxFrameSize)
Expand Down Expand Up @@ -209,14 +214,21 @@ static uint GetMaxFrameSize(AmqpSettings amqpSettings, Open open)

internal Connection(IBufferManager bufferManager, AmqpSettings amqpSettings, Address address,
IAsyncTransport transport, Open open, OnOpened onOpened, IHandler handler)
: this(address, GetChannelMax(amqpSettings, open), GetMaxFrameSize(amqpSettings, open))
: this(address, DefaultMaxSessions, DefaultMaxFrameSize)
{
this.onOpened = onOpened;
this.handler = handler;
this.Init(bufferManager, amqpSettings, transport, open);
}

internal void Init(IBufferManager bufferManager, AmqpSettings amqpSettings, IAsyncTransport transport, Open open)
{
transport.SetConnection(this);

this.handler = handler;
this.BufferManager = bufferManager;
this.channelMax = GetChannelMax(amqpSettings, open);
this.maxFrameSize = GetMaxFrameSize(amqpSettings, open);
this.MaxLinksPerSession = amqpSettings.MaxLinksPerSession;
this.onOpened = onOpened;
this.writer = new TransportWriter(transport, this.OnIoException);

// after getting the transport, move state to open pipe before starting the pump
Expand All @@ -242,12 +254,19 @@ internal Connection(IBufferManager bufferManager, AmqpSettings amqpSettings, Add
this.state = ConnectionState.OpenPipe;
}

static ConnectionFactory connectionFactory;

/// <summary>
/// Gets a factory with default settings.
/// Gets a factory with default settings. Any changes to the settings are
/// applied to the connections created from this factory instance.
/// </summary>
/// <remarks>
/// On platforms where <seealso cref="ConnectionFactory"/> is supported, this
/// is also used for connections initialized by the constructors.
/// </remarks>
public static ConnectionFactory Factory
{
get { return new ConnectionFactory(); }
get { return connectionFactory ?? (connectionFactory = new ConnectionFactory()); }
}

/// <summary>
Expand Down Expand Up @@ -280,8 +299,14 @@ ByteBuffer WrapBuffer(ByteBuffer buffer, int offset, int length)
{
return new WrappedByteBuffer(buffer, offset, length);
}

void Connect(SaslProfile saslProfile, Open open)
{
Factory.ConnectAsync(this.address, saslProfile, open, this).ConfigureAwait(false).GetAwaiter().GetResult();
}
#else
internal int MaxLinksPerSession = DefaultMaxLinksPerSession;
Pump reader;

ByteBuffer AllocateBuffer(int size)
{
Expand All @@ -292,8 +317,71 @@ ByteBuffer WrapBuffer(ByteBuffer buffer, int offset, int length)
{
return new ByteBuffer(buffer.Buffer, offset, length, length);
}

void Connect(SaslProfile saslProfile, Open open)
{
if (open != null)
{
this.maxFrameSize = open.MaxFrameSize;
this.channelMax = open.ChannelMax;
}
else
{
open = new Open()
{
ContainerId = MakeAmqpContainerId(),
HostName = this.address.Host,
MaxFrameSize = this.maxFrameSize,
ChannelMax = this.channelMax
};
}

if (open.IdleTimeOut > 0)
{
this.heartBeat = new HeartBeat(this, open.IdleTimeOut * 2);
}

ITransport transport;
{
TcpTransport tcpTransport = new TcpTransport();
tcpTransport.Connect(this, this.address, DisableServerCertValidation);
transport = tcpTransport;
}

try
{
if (saslProfile != null)
{
transport = saslProfile.Open(this.address.Host, transport);
}
else if (this.address.User != null)
{
transport = new SaslPlainProfile(this.address.User, this.address.Password).Open(this.address.Host, transport);
}
}
catch
{
transport.Close();
throw;
}

this.writer = new Writer(transport);

// after getting the transport, move state to open pipe before starting the pump
this.SendHeader();
this.SendOpen(open);
this.state = ConnectionState.OpenPipe;

this.reader = new Pump(this, transport);
this.reader.Start();
}
#endif

internal static string MakeAmqpContainerId()
{
return "AMQPNetLite-" + Guid.NewGuid().ToString().Substring(0, 8);
}

internal ushort AddSession(Session session)
{
this.ThrowIfClosed("AddSession");
Expand Down Expand Up @@ -423,73 +511,6 @@ protected override bool OnClose(Error error)
}
}

void Connect(SaslProfile saslProfile, Open open)
{
if (open != null)
{
this.maxFrameSize = open.MaxFrameSize;
this.channelMax = open.ChannelMax;
}
else
{
open = new Open()
{
ContainerId = Guid.NewGuid().ToString(),
HostName = this.address.Host,
MaxFrameSize = this.maxFrameSize,
ChannelMax = this.channelMax
};
}

if (open.IdleTimeOut > 0)
{
this.heartBeat = new HeartBeat(this, open.IdleTimeOut * 2);
}

ITransport transport;
#if NETFX
if (WebSocketTransport.MatchScheme(address.Scheme))
{
WebSocketTransport wsTransport = new WebSocketTransport();
wsTransport.ConnectAsync(address, null).ConfigureAwait(false).GetAwaiter().GetResult();
transport = wsTransport;
}
else
#endif
{
TcpTransport tcpTransport = new TcpTransport();
tcpTransport.Connect(this, this.address, DisableServerCertValidation);
transport = tcpTransport;
}

try
{
if (saslProfile != null)
{
transport = saslProfile.Open(this.address.Host, transport);
}
else if (this.address.User != null)
{
transport = new SaslPlainProfile(this.address.User, this.address.Password).Open(this.address.Host, transport);
}
}
catch
{
transport.Close();
throw;
}

this.writer = new Writer(transport);

// after getting the transport, move state to open pipe before starting the pump
this.SendHeader();
this.SendOpen(open);
this.state = ConnectionState.OpenPipe;

this.reader = new Pump(this, transport);
this.reader.Start();
}

void ThrowIfClosed(string operation)
{
if (this.state >= ConnectionState.CloseSent)
Expand Down
56 changes: 44 additions & 12 deletions src/Net/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,28 @@ public Task<Connection> CreateAsync(Address address, Open open = null, OnOpened
return this.CreateAsync(address, open, onOpened, null);
}

async Task<Connection> CreateAsync(Address address, Open open, OnOpened onOpened, IHandler handler)
internal async Task ConnectAsync(Address address, SaslProfile saslProfile, Open open, Connection connection)
{
if (saslProfile == null)
{
if (address.User != null)
{
saslProfile = new SaslPlainProfile(address.User, address.Password);
}
else if (this.saslSettings != null && this.saslSettings.Profile != null)
{
saslProfile = this.saslSettings.Profile;
}
}

IAsyncTransport transport = await this.CreateTransportAsync(address, saslProfile, connection.Handler).ConfigureAwait(false);
connection.Init(this.BufferManager, this.AMQP, transport, open);

AsyncPump pump = new AsyncPump(this.BufferManager, transport);
pump.Start(connection);
}

async Task<IAsyncTransport> CreateTransportAsync(Address address, SaslProfile saslProfile, IHandler handler)
{
IAsyncTransport transport;
TransportProvider provider;
Expand All @@ -146,27 +167,38 @@ async Task<Connection> CreateAsync(Address address, Open open, OnOpened onOpened
throw new NotSupportedException(address.Scheme);
}

try

if (saslProfile != null)
{
if (address.User != null)
try
{
SaslPlainProfile profile = new SaslPlainProfile(address.User, address.Password);
transport = await profile.OpenAsync(address.Host, this.BufferManager, transport, null).ConfigureAwait(false);
transport = await saslProfile.OpenAsync(address.Host, this.BufferManager, transport, null).ConfigureAwait(false);
}
else if (this.saslSettings != null && this.saslSettings.Profile != null)
catch
{
transport = await this.saslSettings.Profile.OpenAsync(address.Host, this.BufferManager, transport, null).ConfigureAwait(false);
transport.Close();
throw;
}
}
catch

return transport;
}

async Task<Connection> CreateAsync(Address address, Open open, OnOpened onOpened, IHandler handler)
{
SaslProfile saslProfile = null;
if (address.User != null)
{
saslProfile = new SaslPlainProfile(address.User, address.Password);
}
else if (this.saslSettings != null && this.saslSettings.Profile != null)
{
transport.Close();
throw;
saslProfile = this.saslSettings.Profile;
}

AsyncPump pump = new AsyncPump(this.BufferManager, transport);
IAsyncTransport transport = await this.CreateTransportAsync(address, saslProfile, handler).ConfigureAwait(false);
Connection connection = new Connection(this.BufferManager, this.AMQP, address, transport, open, onOpened, handler);

AsyncPump pump = new AsyncPump(this.BufferManager, transport);
pump.Start(connection);

return connection;
Expand Down
2 changes: 1 addition & 1 deletion src/Net/ConnectionFactoryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ protected ConnectionFactoryBase()
this.amqpSettings = new AmqpSettings()
{
MaxFrameSize = (int)Connection.DefaultMaxFrameSize,
ContainerId = "AMQPNetLite-" + Guid.NewGuid().ToString("N").Substring(0, 8),
ContainerId = Connection.MakeAmqpContainerId(),
IdleTimeout = int.MaxValue,
MaxSessionsPerConnection = Connection.DefaultMaxSessions,
MaxLinksPerSession = Connection.DefaultMaxLinksPerSession
Expand Down
2 changes: 1 addition & 1 deletion src/Net/TcpTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public async Task ConnectAsync(Address address, ConnectionFactory factory, IHand
IAsyncTransport transport;
if (address.UseSsl)
{
RemoteCertificateValidationCallback remoteCertificateValidationCallback = null;
RemoteCertificateValidationCallback remoteCertificateValidationCallback = Connection.DisableServerCertValidation ? noneCertValidator : null;
LocalCertificateSelectionCallback localCertificateSelectionCallback = null;
var ssl = factory.SslInternal;
if (ssl != null)
Expand Down
Loading

0 comments on commit d643b00

Please sign in to comment.