Skip to content
This repository has been archived by the owner on Jan 10, 2025. It is now read-only.

Commit

Permalink
Changes on transport layer
Browse files Browse the repository at this point in the history
- Removed dependency over ReactiveSockets
- Added TcpChannel as an adapter of TcpClient to replace ReactiveSocket
- Applied Disposable pattern over all the Channel implementations
  • Loading branch information
mauroa committed Jan 8, 2015
1 parent c3fd9b1 commit 56cb80b
Show file tree
Hide file tree
Showing 28 changed files with 410 additions and 356 deletions.
6 changes: 3 additions & 3 deletions src/Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ public class Client : IClient
readonly IRepository<PacketIdentifier> packetIdentifierRepository;
readonly ProtocolConfiguration configuration;

public Client(IBufferedChannel<byte> socket,
public Client(IChannel<byte[]> binaryChannel,
IPacketChannelFactory channelFactory,
IPacketChannelAdapter channelAdapter,
IProtocolFlowProvider flowProvider,
IRepositoryProvider repositoryProvider,
ProtocolConfiguration configuration)
{
var channel = channelFactory.CreateChannel (socket);
var channel = channelFactory.Create (binaryChannel);

this.protocolChannel = channelAdapter.Adapt (channel);
this.flowProvider = flowProvider;
Expand Down Expand Up @@ -196,7 +196,7 @@ private void CloseChannel ()
{
this.IsConnected = false;
this.Id = null;
this.protocolChannel.Close ();
this.protocolChannel.Dispose ();
}
}
}
4 changes: 2 additions & 2 deletions src/Client/Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
<Compile Include="..\Server\ProtocolChannel.cs">
<Link>ProtocolChannel.cs</Link>
</Compile>
<Compile Include="..\Server\ReactiveSocketChannel.cs">
<Link>ReactiveSocketChannel.cs</Link>
<Compile Include="..\Server\TcpChannel.cs">
<Link>TcpChannel.cs</Link>
</Compile>
<Compile Include="ApplicationMessage.cs" />
<Compile Include="Client.cs" />
Expand Down
9 changes: 9 additions & 0 deletions src/Client/Properties/Resources.Designer.cs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/Client/Properties/Resources.resx
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,7 @@
<data name="Client_ConnectionTimeout" xml:space="preserve">
<value>A timeout occured while waiting for the client connection confirmation</value>
</data>
<data name="TcpChannel_ClientMustBeConnected" xml:space="preserve">
<value>Client must be connected</value>
</data>
</root>
121 changes: 0 additions & 121 deletions src/Portable/BinaryChannel.cs

This file was deleted.

16 changes: 0 additions & 16 deletions src/Portable/IBufferedChannel.cs

This file was deleted.

4 changes: 1 addition & 3 deletions src/Portable/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Hermes
{
public interface IChannel<T>
public interface IChannel<T> : IDisposable
{
bool IsConnected { get; }

Expand All @@ -12,7 +12,5 @@ public interface IChannel<T>
IObservable<T> Sender { get; }

Task SendAsync(T message);

void Close();
}
}
7 changes: 7 additions & 0 deletions src/Portable/IPacketBuffer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Hermes
{
public interface IPacketBuffer
{
bool TryGetPacket (byte[] sequence, out byte[] packet);
}
}
2 changes: 1 addition & 1 deletion src/Portable/IPacketChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ namespace Hermes
{
public interface IPacketChannelFactory
{
IChannel<IPacket> CreateChannel (IBufferedChannel<byte> socket);
IChannel<IPacket> Create (IChannel<byte[]> binaryChannel);
}
}
99 changes: 99 additions & 0 deletions src/Portable/PacketBuffer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using System.Collections.Generic;
using System.Linq;

namespace Hermes
{
public class PacketBuffer : IPacketBuffer
{
bool readStarted;
bool remainingLengthRead;
int remainingLength = 0;
bool packetReady;

readonly IList<byte> mainBuffer;
readonly IList<byte> pendingBuffer;

public PacketBuffer ()
{
this.mainBuffer = new List<byte> ();
this.pendingBuffer = new List<byte> ();
}

public bool TryGetPacket (byte[] sequence, out byte[] packet)
{
this.Buffer (sequence);

if (this.packetReady) {
packet = this.mainBuffer.ToArray ();
this.Reset ();
} else {
packet = default (byte[]);
}

return packet != default(byte[]);
}

private void Buffer(byte[] sequence)
{
foreach(var @byte in sequence) {
this.Buffer (@byte);
}
}

private void Buffer (byte @byte)
{
if (this.packetReady) {
this.pendingBuffer.Add (@byte);
return;
}

this.mainBuffer.Add(@byte);

if (!this.readStarted)
{
this.readStarted = true;
return;
}

if (!this.remainingLengthRead)
{
if ((@byte & 128) == 0) {
var bytesLenght = default (int);

this.remainingLength = Protocol.Encoding.DecodeRemainingLength(mainBuffer.ToArray(), out bytesLenght);
this.remainingLengthRead = true;

if (remainingLength == 0)
this.packetReady = true;
}

return;
}

if (remainingLength == 1)
{
this.packetReady = true;
}
else
{
remainingLength--;
}
}

private void Reset()
{
this.mainBuffer.Clear();
this.readStarted = false;
this.remainingLengthRead = false;
this.remainingLength = 0;
this.packetReady = false;

if (this.pendingBuffer.Any ()) {
var pendingSequence = this.pendingBuffer.ToArray ();

this.pendingBuffer.Clear ();
this.Buffer (pendingSequence);
}
}
}
}
26 changes: 21 additions & 5 deletions src/Portable/PacketChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace Hermes
{
public class PacketChannel : IChannel<IPacket>
{
bool disposed;

readonly IChannel<byte[]> innerChannel;
readonly IPacketManager manager;
readonly Subject<IPacket> receiver;
Expand Down Expand Up @@ -39,6 +41,9 @@ public PacketChannel (IChannel<byte[]> innerChannel, IPacketManager manager)

public async Task SendAsync (IPacket packet)
{
if (this.disposed)
throw new ObjectDisposedException (this.GetType ().FullName);

try {
var bytes = await this.manager.GetBytesAsync (packet);

Expand All @@ -50,12 +55,23 @@ public async Task SendAsync (IPacket packet)
}
}

public void Close ()
public void Dispose ()
{
this.subscription.Dispose ();
this.innerChannel.Close ();
this.receiver.OnCompleted ();
this.sender.OnCompleted ();
this.Dispose (true);
GC.SuppressFinalize (this);
}

protected virtual void Dispose (bool disposing)
{
if (this.disposed) return;

if (disposing) {
this.subscription.Dispose ();
this.innerChannel.Dispose ();
this.receiver.OnCompleted ();
this.sender.OnCompleted ();
this.disposed = true;
}
}
}
}
4 changes: 1 addition & 3 deletions src/Portable/PacketChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ public PacketChannelFactory (ITopicEvaluator topicEvaluator)
this.topicEvaluator = topicEvaluator;
}

public IChannel<IPacket> CreateChannel (IBufferedChannel<byte> socket)
public IChannel<IPacket> Create (IChannel<byte[]> binaryChannel)
{
var binaryChannel = new BinaryChannel (socket);

var formatters = this.GetFormatters();
var manager = new PacketManager (formatters);

Expand Down
Loading

0 comments on commit 56cb80b

Please sign in to comment.