Skip to content

Commit

Permalink
Merge branch 'release1.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
DamianEdwards committed Jun 1, 2013
2 parents c6b9962 + aa285dd commit 06a850b
Show file tree
Hide file tree
Showing 23 changed files with 343 additions and 93 deletions.
2 changes: 1 addition & 1 deletion build/Microsoft.AspNet.SignalR.versions.targets
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>1</MinorVersion>
<PatchVersion>1</PatchVersion>
<PatchVersion>2</PatchVersion>

<!-- Change this to set the build quality of the project. Use values like "alpha", "beta", "rc1", "rtm", etc. -->
<!-- These values are used in SemVer, so make sure to always increase these alphabetically. -->
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*!
* ASP.NET SignalR JavaScript Library v1.1.1
* ASP.NET SignalR JavaScript Library v1.1.2
* http://signalr.net/
*
* Copyright Microsoft Open Technologies, Inc. All rights reserved.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* jquery.signalR.core.js */
/*global window:false */
/*!
* ASP.NET SignalR JavaScript Library v1.1.1
* ASP.NET SignalR JavaScript Library v1.1.2
* http://signalr.net/
*
* Copyright Microsoft Open Technologies, Inc. All rights reserved.
Expand Down Expand Up @@ -2119,5 +2119,5 @@
/*global window:false */
/// <reference path="jquery.signalR.core.js" />
(function ($) {
$.signalR.version = "1.1.1";
$.signalR.version = "1.1.2";
}(window.jQuery));

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* jquery.signalR.core.js */
/*global window:false */
/*!
* ASP.NET SignalR JavaScript Library v1.1.1
* ASP.NET SignalR JavaScript Library v1.1.2
* http://signalr.net/
*
* Copyright Microsoft Open Technologies, Inc. All rights reserved.
Expand Down Expand Up @@ -2119,5 +2119,5 @@
/*global window:false */
/// <reference path="jquery.signalR.core.js" />
(function ($) {
$.signalR.version = "1.1.1";
$.signalR.version = "1.1.2";
}(window.jQuery));

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/Common/CommonVersionInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

[assembly: AssemblyVersion("1.1.0")]
[assembly: AssemblyFileVersion("1.1.0.0")]
[assembly: AssemblyInformationalVersion("1.1.1")]
[assembly: AssemblyInformationalVersion("1.1.2")]
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*global window:false */
/*!
* ASP.NET SignalR JavaScript Library v1.1.1
* ASP.NET SignalR JavaScript Library v1.1.2
* http://signalr.net/
*
* Copyright Microsoft Open Technologies, Inc. All rights reserved.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System.Threading;

namespace Microsoft.AspNet.SignalR.Infrastructure
{

public static class InterlockedHelper
{
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1045:DoNotPassTypesByReference", MessageId = "0#", Justification="This is an interlocked helper...")]
public static bool CompareExchangeOr(ref int location, int value, int comparandA, int comparandB)
{
return Interlocked.CompareExchange(ref location, value, comparandA) == comparandA ||
Interlocked.CompareExchange(ref location, value, comparandB) == comparandB;
}
}

}
17 changes: 14 additions & 3 deletions src/Microsoft.AspNet.SignalR.Core/Messaging/LocalEventKeyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,24 @@ namespace Microsoft.AspNet.SignalR.Messaging
{
public class LocalEventKeyInfo
{
public LocalEventKeyInfo(ulong id, MessageStore<Message> store)
private readonly WeakReference _storeReference;

public LocalEventKeyInfo(string key, ulong id, MessageStore<Message> store)
{
// Don't hold onto MessageStores that would otherwise be GC'd
_storeReference = new WeakReference(store);
Key = key;
Id = id;
MessageStore = store;
}

public string Key { get; private set; }
public ulong Id { get; private set; }
public MessageStore<Message> MessageStore { get; private set; }
public MessageStore<Message> MessageStore
{
get
{
return _storeReference.Target as MessageStore<Message>;
}
}
}
}
103 changes: 76 additions & 27 deletions src/Microsoft.AspNet.SignalR.Core/Messaging/MessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,11 @@ protected ulong Save(Message message)
throw new ArgumentNullException("message");
}

// Don't mark topics as active when publishing
// GetTopic will return a topic for the given key. If topic exists and is Dying,
// it will revive it and mark it as NoSubscriptions
Topic topic = GetTopic(message.Key);
// Mark the topic as used so it doesn't immediately expire (if it was in that state before).
topic.MarkUsed();

return topic.Store.Add(message);
}
Expand Down Expand Up @@ -229,7 +232,8 @@ public virtual IDisposable Subscribe(ISubscriber subscriber, string cursor, Func

foreach (var key in subscriber.EventKeys)
{
Topic topic = GetTopic(key);
// Create or retrieve topic and set it as HasSubscriptions
Topic topic = SubscribeTopic(key);

// Set the subscription for this topic
subscription.SetEventTopic(key, topic);
Expand Down Expand Up @@ -389,10 +393,8 @@ internal void GarbageCollectTopics()
for (int i = 0; i < overflow && i < candidates.Count; i++)
{
var pair = candidates[i];

// Mark it as dead
if (Interlocked.CompareExchange(ref pair.Value.State, TopicState.Dead, TopicState.NoSubscriptions)
== TopicState.NoSubscriptions)
// We only want to kill the topic if it's in the NoSubscriptions or Dying state.
if (InterlockedHelper.CompareExchangeOr(ref pair.Value.State, TopicState.Dead, TopicState.NoSubscriptions, TopicState.Dying))
{
// Kill it
DestroyTopicCore(pair.Key, pair.Value);
Expand All @@ -405,17 +407,20 @@ internal void GarbageCollectTopics()

private void DestroyTopic(string key, Topic topic)
{
var state = Interlocked.Exchange(ref topic.State, TopicState.Dead);

switch (state)
{
case TopicState.NoSubscriptions:
// The goal of this function is to destroy topics after 2 garbage collect cycles
// This first if statement will transition a topic into the dying state on the first GC cycle
// but it will prevent the code path from hitting the second if statement
if (Interlocked.CompareExchange(ref topic.State, TopicState.Dying, TopicState.NoSubscriptions) == TopicState.Dying)
{
// If we've hit this if statement we're on the second GC cycle with this soon to be
// destroyed topic. At this point we move the Topic State into the Dead state as
// long as it has not been revived from the dying state. We check if the state is
// still dying again to ensure that the topic has not been transitioned into a new
// state since we've decided to destroy it.
if (Interlocked.CompareExchange(ref topic.State, TopicState.Dead, TopicState.Dying) == TopicState.Dying)
{
DestroyTopicCore(key, topic);
break;
default:
// Restore the old state
Interlocked.Exchange(ref topic.State, state);
break;
}
}
}

Expand All @@ -436,42 +441,86 @@ private void DestroyTopicCore(string key, Topic topic)

internal Topic GetTopic(string key)
{
while (true)
Topic topic;
int oldState;

do
{
if (BeforeTopicCreated != null)
{
BeforeTopicCreated(key);
}

Topic topic = Topics.GetOrAdd(key, _createTopic);
topic = Topics.GetOrAdd(key, _createTopic);

if (BeforeTopicMarked != null)
{
BeforeTopicMarked(key, topic);
}

var oldState = Interlocked.Exchange(ref topic.State, TopicState.HasSubscriptions);
// If the topic was dying revive it to the NoSubscriptions state. This is used to ensure
// that in the scaleout case that even if we're publishing to a topic with no subscriptions
// that we keep it around in case a user hops nodes.
oldState = Interlocked.CompareExchange(ref topic.State, TopicState.NoSubscriptions, TopicState.Dying);

if (AfterTopicMarked != null)
{
AfterTopicMarked(key, topic, oldState);
AfterTopicMarked(key, topic, topic.State);
}

if (oldState != TopicState.Dead)
// If the topic is currently dead then we're racing with the DestroyTopicCore function, therefore
// loop around until we're able to create a new topic
} while (oldState == TopicState.Dead);

if (AfterTopicMarkedSuccessfully != null)
{
AfterTopicMarkedSuccessfully(key, topic);
}

return topic;
}

internal Topic SubscribeTopic(string key)
{
Topic topic;

do
{
if (BeforeTopicCreated != null)
{
if (AfterTopicMarkedSuccessfully != null)
{
AfterTopicMarkedSuccessfully(key, topic);
}
BeforeTopicCreated(key);
}

topic = Topics.GetOrAdd(key, _createTopic);

return topic;
if (BeforeTopicMarked != null)
{
BeforeTopicMarked(key, topic);
}

// Transition into the HasSubscriptions state as long as the topic is not dead
InterlockedHelper.CompareExchangeOr(ref topic.State, TopicState.HasSubscriptions, TopicState.NoSubscriptions, TopicState.Dying);

if (AfterTopicMarked != null)
{
AfterTopicMarked(key, topic, topic.State);
}

// If we were unable to transition into the HasSubscription state that means we're in the Dead state.
// Loop around until we're able to create the topic new
} while (topic.State != TopicState.HasSubscriptions);

if (AfterTopicMarkedSuccessfully != null)
{
AfterTopicMarkedSuccessfully(key, topic);
}

return topic;
}

private void AddEvent(ISubscriber subscriber, string eventKey)
{
Topic topic = GetTopic(eventKey);
Topic topic = SubscribeTopic(eventKey);

// Add or update the cursor (in case it already exists)
if (subscriber.Subscription.AddEvent(eventKey, topic))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@

using System;
using System.Collections.Generic;
using Microsoft.AspNet.SignalR.Infrastructure;

namespace Microsoft.AspNet.SignalR.Messaging
{
public class ScaleoutMapping
{
public ScaleoutMapping(ulong id, ScaleoutMessage message)
: this(id, message, new Dictionary<string, IList<LocalEventKeyInfo>>())
: this(id, message, ListHelper<LocalEventKeyInfo>.Empty)
{
}

public ScaleoutMapping(ulong id, ScaleoutMessage message, IDictionary<string, IList<LocalEventKeyInfo>> localKeyInfo)
public ScaleoutMapping(ulong id, ScaleoutMessage message, IList<LocalEventKeyInfo> localKeyInfo)
{
if (message == null)
{
Expand All @@ -30,7 +31,7 @@ public ScaleoutMapping(ulong id, ScaleoutMessage message, IDictionary<string, IL
}

public ulong Id { get; private set; }
public IDictionary<string, IList<LocalEventKeyInfo>> LocalKeyInfo { get; private set; }
public IList<LocalEventKeyInfo> LocalKeyInfo { get; private set; }
public DateTime ServerCreationTime { get; private set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public ScaleoutMappingStore()
_store = new ScaleoutStore(MaxMessages);
}

public void Add(ulong id, ScaleoutMessage message, IDictionary<string, IList<LocalEventKeyInfo>> localKeyInfo)
public void Add(ulong id, ScaleoutMessage message, IList<LocalEventKeyInfo> localKeyInfo)
{
if (MaxMapping != null && id < MaxMapping.Id)
{
Expand Down
15 changes: 5 additions & 10 deletions src/Microsoft.AspNet.SignalR.Core/Messaging/ScaleoutMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ private void OnReceivedCore(int streamIndex, ulong id, ScaleoutMessage scaleoutM

_trace.TraceInformation("OnReceived({0}, {1}, {2})", streamIndex, id, scaleoutMessage.Messages.Count);

var localMapping = new Dictionary<string, IList<LocalEventKeyInfo>>(StringComparer.OrdinalIgnoreCase);
var localMapping = new LocalEventKeyInfo[scaleoutMessage.Messages.Count];
var keys = new HashSet<string>();

for (var i = 0; i < scaleoutMessage.Messages.Count; ++i)
{
Expand All @@ -183,17 +184,11 @@ private void OnReceivedCore(int streamIndex, ulong id, ScaleoutMessage scaleoutM
message.MappingId = id;
message.StreamIndex = streamIndex;

IList<LocalEventKeyInfo> keyInfo;
if (!localMapping.TryGetValue(message.Key, out keyInfo))
{
keyInfo = new List<LocalEventKeyInfo>();
localMapping.Add(message.Key, keyInfo);
}

keys.Add(message.Key);
ulong localId = Save(message);
MessageStore<Message> messageStore = Topics[message.Key].Store;

keyInfo.Add(new LocalEventKeyInfo(localId, messageStore));
localMapping[i] = new LocalEventKeyInfo(message.Key, localId, messageStore);
}

// Get the stream for this payload
Expand All @@ -203,7 +198,7 @@ private void OnReceivedCore(int streamIndex, ulong id, ScaleoutMessage scaleoutM
store.Add(id, scaleoutMessage, localMapping);

// Schedule after we're done
foreach (var eventKey in localMapping.Keys)
foreach (var eventKey in keys)
{
ScheduleEvent(eventKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,14 @@ private ulong ExtractMessages(int streamIndex, ScaleoutMapping mapping, IList<Ar
{
for (var i = 0; i < EventKeys.Count; ++i)
{
IList<LocalEventKeyInfo> infos;
if (mapping.LocalKeyInfo.TryGetValue(EventKeys[i], out infos))
string eventKey = EventKeys[i];

for (int j = 0; j < mapping.LocalKeyInfo.Count; j++)
{
for (int j = 0; j < infos.Count; j++)
{
LocalEventKeyInfo info = infos[j];
LocalEventKeyInfo info = mapping.LocalKeyInfo[j];

if (info.MessageStore != null && info.Key.Equals(eventKey, StringComparison.OrdinalIgnoreCase))
{
MessageStoreResult<Message> storeResult = info.MessageStore.GetMessages(info.Id, 1);

if (storeResult.Messages.Count > 0)
Expand Down
Loading

0 comments on commit 06a850b

Please sign in to comment.