Skip to content

Commit

Permalink
fix avg notification per message for Pubsub (#656)
Browse files Browse the repository at this point in the history
* fix avg notification per message for Pubsub

Updated the calculation of average Notifications/Message to handle Pub/Sub correctly.

* Implement Review comment and adopt for non batch mode

* changed type from int to ulong for counter of notifications
* adopted calculation also for
  * Uadp in batch mode
  * Json in sample mode
  * Uadp in batch mode

Co-authored-by: Alexander Köpke <[email protected]>
  • Loading branch information
koepalex and Alexander Köpke authored Aug 7, 2020
1 parent 000f22b commit ef1fe72
Showing 1 changed file with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private IEnumerable<NetworkMessageModel> EncodeBatchAsJson(
var messageSize = 2; // array brackets
maxMessageSize -= 2048; // reserve 2k for header
var chunk = new Collection<NetworkMessage>();
ulong notificationsPerMessage = 0;
while (processing) {
var notification = current.Current;
var messageCompleted = false;
Expand All @@ -115,6 +116,7 @@ private IEnumerable<NetworkMessageModel> EncodeBatchAsJson(
if (!messageCompleted) {
NotificationsProcessedCount++;
chunk.Add(notification);
notificationsPerMessage += (ulong)notification.Messages.Sum(m => m.Payload.Count);
processing = current.MoveNext();
messageSize += notificationSize + (processing ? 1 : 0);
}
Expand Down Expand Up @@ -142,10 +144,11 @@ private IEnumerable<NetworkMessageModel> EncodeBatchAsJson(
AvgMessageSize = (AvgMessageSize * MessagesProcessedCount + encoded.Body.Length) /
(MessagesProcessedCount + 1);
AvgNotificationsPerMessage = (AvgNotificationsPerMessage * MessagesProcessedCount +
chunk.Count) / (MessagesProcessedCount + 1);
notificationsPerMessage) / (MessagesProcessedCount + 1);
MessagesProcessedCount++;
chunk.Clear();
messageSize = 2;
notificationsPerMessage = 0;
yield return encoded;
}
}
Expand Down Expand Up @@ -173,6 +176,7 @@ private IEnumerable<NetworkMessageModel> EncodeBatchAsUadp(
var messageSize = 4; // array length size
maxMessageSize -= 2048; // reserve 2k for header
var chunk = new Collection<NetworkMessage>();
ulong notificationsPerMessage = 0;
while (processing) {
var notification = current.Current;
var messageCompleted = false;
Expand All @@ -191,6 +195,7 @@ private IEnumerable<NetworkMessageModel> EncodeBatchAsUadp(

if (!messageCompleted) {
chunk.Add(notification);
notificationsPerMessage += (ulong)notification.Messages.Sum(m => m.Payload.Count);
NotificationsProcessedCount++;
processing = current.MoveNext();
messageSize += notificationSize;
Expand All @@ -210,10 +215,11 @@ private IEnumerable<NetworkMessageModel> EncodeBatchAsUadp(
AvgMessageSize = (AvgMessageSize * MessagesProcessedCount + encoded.Body.Length) /
(MessagesProcessedCount + 1);
AvgNotificationsPerMessage = (AvgNotificationsPerMessage * MessagesProcessedCount +
chunk.Count) / (MessagesProcessedCount + 1);
notificationsPerMessage) / (MessagesProcessedCount + 1);
MessagesProcessedCount++;
chunk.Clear();
messageSize = 4;
notificationsPerMessage = 0;
yield return encoded;
}
}
Expand All @@ -237,6 +243,7 @@ private IEnumerable<NetworkMessageModel> EncodeAsJson(
yield break;
}
foreach (var networkMessage in notifications) {
ulong notificationsPerMessage = (ulong)networkMessage.Messages.Sum(m => m.Payload.Count);
var writer = new StringWriter();
var encoder = new JsonEncoderEx(writer, encodingContext) {
UseAdvancedEncoding = true,
Expand All @@ -261,7 +268,7 @@ private IEnumerable<NetworkMessageModel> EncodeAsJson(
NotificationsProcessedCount++;
AvgMessageSize = (AvgMessageSize * MessagesProcessedCount + encoded.Body.Length) /
(MessagesProcessedCount + 1);
AvgNotificationsPerMessage = (AvgNotificationsPerMessage * MessagesProcessedCount + 1) /
AvgNotificationsPerMessage = (AvgNotificationsPerMessage * MessagesProcessedCount + notificationsPerMessage) /
(MessagesProcessedCount + 1);
MessagesProcessedCount++;
yield return encoded;
Expand All @@ -285,7 +292,9 @@ private IEnumerable<NetworkMessageModel> EncodeAsUadp(
if (notifications.Count() == 0) {
yield break;
}

foreach (var networkMessage in notifications) {
ulong notificationsPerMessage = (ulong)networkMessage.Messages.Sum(m => m.Payload.Count);
var encoder = new BinaryEncoder(encodingContext);
encoder.WriteBoolean(null, false); // is not Batch
encoder.WriteEncodeable(null, networkMessage);
Expand All @@ -305,7 +314,7 @@ private IEnumerable<NetworkMessageModel> EncodeAsUadp(
NotificationsProcessedCount++;
AvgMessageSize = (AvgMessageSize * MessagesProcessedCount + encoded.Body.Length) /
(MessagesProcessedCount + 1);
AvgNotificationsPerMessage = (AvgNotificationsPerMessage * MessagesProcessedCount + 1) /
AvgNotificationsPerMessage = (AvgNotificationsPerMessage * MessagesProcessedCount + notificationsPerMessage) /
(MessagesProcessedCount + 1);
MessagesProcessedCount++;
yield return encoded;
Expand Down

0 comments on commit ef1fe72

Please sign in to comment.