Skip to content

Commit

Permalink
Implemented initial inbox processing logic
Browse files Browse the repository at this point in the history
  • Loading branch information
hikalkan committed Sep 9, 2021
1 parent 9593c87 commit 8af7ccd
Show file tree
Hide file tree
Showing 19 changed files with 470 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;
using Volo.Abp.Uow;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
Expand All @@ -11,11 +13,14 @@ public class DbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
protected IDbContextProvider<TDbContext> DbContextProvider { get; }
protected IClock Clock { get; }

public DbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider)
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock)
{
DbContextProvider = dbContextProvider;
Clock = clock;
}

[UnitOfWork]
Expand All @@ -35,6 +40,7 @@ public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int max
var outgoingEventRecords = await dbContext
.IncomingEvents
.AsNoTracking()
.Where(x => !x.Processed)
.OrderBy(x => x.CreationTime)
.Take(maxCount)
.ToListAsync();
Expand All @@ -43,5 +49,16 @@ public virtual async Task<List<IncomingEventInfo>> GetWaitingEventsAsync(int max
.Select(x => x.ToIncomingEventInfo())
.ToList();
}

public async Task MarkAsProcessedAsync(Guid id)
{
//TODO: Optimize?
var dbContext = (IHasEventInbox) await DbContextProvider.GetDbContextAsync();
var incomingEvent = await dbContext.IncomingEvents.FindAsync(id);
if (incomingEvent != null)
{
incomingEvent.MarkAsProcessed(Clock.Now);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public virtual async Task<List<OutgoingEventInfo>> GetWaitingEventsAsync(int max
[UnitOfWork]
public virtual async Task DeleteAsync(Guid id)
{
//TODO: Optimize?
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var outgoingEvent = await dbContext.OutgoingEvents.FindAsync(id);
if (outgoingEvent != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public class IncomingEventRecord :
public byte[] EventData { get; private set; }

public DateTime CreationTime { get; private set; }

public bool Processed { get; set; }

public DateTime? ProcessedTime { get; set; }

protected IncomingEventRecord()
{
Expand Down Expand Up @@ -48,5 +52,11 @@ public IncomingEventInfo ToIncomingEventInfo()
CreationTime
);
}

public void MarkAsProcessed(DateTime processedTime)
{
Processed = true;
ProcessedTime = processedTime;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class AbpEventBusBoxesModule : AbpModule
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
context.AddBackgroundWorker<OutboxSenderManager>();
context.AddBackgroundWorker<InboxProcessManager>();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.EventBus.Distributed;

namespace Volo.Abp.EventBus.Boxes
{
public interface IInboxProcessor
{
Task StartAsync(InboxConfig inboxConfig, CancellationToken cancellationToken = default);

Task StopAsync(CancellationToken cancellationToken = default);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Volo.Abp.EventBus.Boxes
public interface IOutboxSender
{
Task StartAsync(OutboxConfig outboxConfig, CancellationToken cancellationToken = default);

Task StopAsync(CancellationToken cancellationToken = default);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.EventBus.Distributed;

namespace Volo.Abp.EventBus.Boxes
{
public class InboxProcessManager : IBackgroundWorker
{
protected AbpDistributedEventBusOptions Options { get; }
protected IServiceProvider ServiceProvider { get; }
protected List<IInboxProcessor> Processors { get; }

public InboxProcessManager(
IOptions<AbpDistributedEventBusOptions> options,
IServiceProvider serviceProvider)
{
ServiceProvider = serviceProvider;
Options = options.Value;
Processors = new List<IInboxProcessor>();
}

public async Task StartAsync(CancellationToken cancellationToken = default)
{
foreach (var inboxConfig in Options.Inboxes.Values)
{
if (inboxConfig.IsProcessingEnabled)
{
var processor = ServiceProvider.GetRequiredService<IInboxProcessor>();
await processor.StartAsync(inboxConfig, cancellationToken);
Processors.Add(processor);
}
}
}

public async Task StopAsync(CancellationToken cancellationToken = default)
{
foreach (var processor in Processors)
{
await processor.StopAsync(cancellationToken);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Medallion.Threading;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Threading;
using Volo.Abp.Uow;

namespace Volo.Abp.EventBus.Boxes
{
public class InboxProcessor : IInboxProcessor, ITransientDependency
{
protected IServiceProvider ServiceProvider { get; }
protected AbpTimer Timer { get; }
protected IDistributedEventBus DistributedEventBus { get; }
protected IDistributedLockProvider DistributedLockProvider { get; }
protected IUnitOfWorkManager UnitOfWorkManager { get; }
protected IEventInbox Inbox { get; private set; }
protected InboxConfig InboxConfig { get; private set; }

protected string DistributedLockName => "Inbox_" + InboxConfig.Name;
public ILogger<InboxProcessor> Logger { get; set; }

public InboxProcessor(
IServiceProvider serviceProvider,
AbpTimer timer,
IDistributedEventBus distributedEventBus,
IDistributedLockProvider distributedLockProvider,
IUnitOfWorkManager unitOfWorkManager)
{
ServiceProvider = serviceProvider;
Timer = timer;
DistributedEventBus = distributedEventBus;
DistributedLockProvider = distributedLockProvider;
UnitOfWorkManager = unitOfWorkManager;
Timer.Period = 2000; //TODO: Config?
Timer.Elapsed += TimerOnElapsed;
Logger = NullLogger<InboxProcessor>.Instance;
}

private void TimerOnElapsed(object sender, EventArgs e)
{
AsyncHelper.RunSync(RunAsync);
}

public Task StartAsync(InboxConfig inboxConfig, CancellationToken cancellationToken = default)
{
InboxConfig = inboxConfig;
Inbox = (IEventInbox)ServiceProvider.GetRequiredService(inboxConfig.ImplementationType);
Timer.Start(cancellationToken);
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken = default)
{
Timer.Stop(cancellationToken);
return Task.CompletedTask;
}

protected virtual async Task RunAsync()
{
await using (var handle = await DistributedLockProvider.TryAcquireLockAsync(DistributedLockName))
{
if (handle != null)
{
Logger.LogDebug("Obtained the distributed lock: " + DistributedLockName);

while (true)
{
var waitingEvents = await Inbox.GetWaitingEventsAsync(1000); //TODO: Config?
if (waitingEvents.Count <= 0)
{
break;
}

Logger.LogInformation($"Found {waitingEvents.Count} events in the inbox.");

foreach (var waitingEvent in waitingEvents)
{
using (var uow = UnitOfWorkManager.Begin(isTransactional: true, requiresNew: true))
{
await DistributedEventBus
.AsRawEventPublisher()
.ProcessRawAsync(waitingEvent.EventName, waitingEvent.EventData);

/*
await DistributedEventBus
.AsRawEventPublisher()
.PublishRawAsync(waitingEvent.Id, waitingEvent.EventName, waitingEvent.EventData);
*/
await Inbox.MarkAsProcessedAsync(waitingEvent.Id);
await uow.CompleteAsync();
}

Logger.LogInformation($"Processed the incoming event with id = {waitingEvent.Id:N}");
}
}
}
else
{
Logger.LogDebug("Could not obtain the distributed lock: " + DistributedLockName);
await Task.Delay(7000); //TODO: Can we pass a cancellation token to cancel on shutdown? (Config?)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default)
if (outboxConfig.IsSendingEnabled)
{
var sender = ServiceProvider.GetRequiredService<IOutboxSender>();
await sender.StartAsync(outboxConfig);
await sender.StartAsync(outboxConfig, cancellationToken);
Senders.Add(sender);
}
}
Expand All @@ -41,7 +41,7 @@ public async Task StopAsync(CancellationToken cancellationToken = default)
{
foreach (var sender in Senders)
{
await sender.StopAsync();
await sender.StopAsync(cancellationToken);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,23 @@ public override Task PublishRawAsync(
);
}

public override async Task ProcessRawAsync(string eventName, byte[] eventDataBytes)
{
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return;
}

var eventData = Serializer.Deserialize(eventDataBytes, eventType);
var exceptions = new List<Exception>();
await TriggerHandlersAsync(eventType, eventData, exceptions);
if (exceptions.Any())
{
ThrowOriginalExceptions(eventType, exceptions);
}
}

protected override byte[] Serialize(object eventData)
{
return Serializer.Serialize(eventData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ await TriggerHandlersAsync(eventType, eventData, errorContext =>
retryAttempt = (int)ea.BasicProperties.Headers[EventErrorHandlerBase.RetryAttemptKey];
}

errorContext.EventData = Serializer.Deserialize(ea.Body.ToArray(), eventType);
errorContext.EventData = Serializer.Deserialize(eventBytes, eventType);
errorContext.SetProperty(EventErrorHandlerBase.HeadersKey, ea.BasicProperties);
errorContext.SetProperty(EventErrorHandlerBase.RetryAttemptKey, retryAttempt);
});
Expand Down Expand Up @@ -217,6 +217,25 @@ public override Task PublishRawAsync(Guid eventId, string eventName, byte[] even
return PublishAsync(eventName, eventData, null, eventId: eventId);
}

public override async Task ProcessRawAsync(string eventName, byte[] eventDataBytes)
{
//TODO: We have a duplication in logic and also with the kafka side!

var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return;
}

var eventData = Serializer.Deserialize(eventDataBytes, eventType);
var exceptions = new List<Exception>();
await TriggerHandlersAsync(eventType, eventData, exceptions);
if (exceptions.Any())
{
ThrowOriginalExceptions(eventType, exceptions);
}
}

protected override byte[] Serialize(object eventData)
{
return Serializer.Serialize(eventData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ public override Task PublishRawAsync(Guid eventId, string eventName, byte[] even
throw new NotImplementedException();
}

public override Task ProcessRawAsync(string eventName, byte[] eventDataBytes)
{
/* TODO: IMPLEMENT! */
throw new NotImplementedException();
}

protected override byte[] Serialize(object eventData)
{
return Serializer.Serialize(eventData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public async Task PublishAsync(
}

public abstract Task PublishRawAsync(Guid eventId, string eventName, byte[] eventData);
public abstract Task ProcessRawAsync(string eventName, byte[] eventDataBytes);

private async Task<bool> AddToOutboxAsync(Type eventType, object eventData)
{
Expand Down Expand Up @@ -128,6 +129,7 @@ protected async Task<bool> AddToInboxAsync(
if (inboxConfig.EventSelector == null || inboxConfig.EventSelector(eventType))
{
var eventInbox = (IEventInbox) scope.ServiceProvider.GetRequiredService(inboxConfig.ImplementationType);
//TODO: Check if event was received before!!
await eventInbox.EnqueueAsync(
new IncomingEventInfo(
GuidGenerator.Create(),
Expand Down
Loading

0 comments on commit 8af7ccd

Please sign in to comment.