Skip to content

Commit

Permalink
save to outbox
Browse files Browse the repository at this point in the history
  • Loading branch information
hikalkan committed Sep 8, 2021
1 parent 5f50d33 commit d1ea473
Show file tree
Hide file tree
Showing 26 changed files with 371 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ private void PublishEntityEvents(EntityEventReport changeReport)
foreach (var distributedEvent in changeReport.DistributedEvents)
{
UnitOfWorkManager.Current?.AddOrReplaceDistributedEvent(
new UnitOfWorkEventRecord(distributedEvent.EventData.GetType(), distributedEvent.EventData, distributedEvent.EventOrder, useOutbox: true)
new UnitOfWorkEventRecord(distributedEvent.EventData.GetType(), distributedEvent.EventData, distributedEvent.EventOrder)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Microsoft.Extensions.DependencyInjection.Extensions;
using Volo.Abp.Domain;
using Volo.Abp.EntityFrameworkCore.DependencyInjection;
using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.Modularity;
using Volo.Abp.Uow.EntityFrameworkCore;

Expand All @@ -26,6 +27,7 @@ public override void ConfigureServices(ServiceConfigurationContext context)
});

context.Services.TryAddTransient(typeof(IDbContextProvider<>), typeof(UnitOfWorkDbContextProvider<>));
context.Services.AddTransient(typeof(DbContextEventOutbox<>));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System.Threading.Tasks;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Guids;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class DbContextEventOutbox<TDbContext> : IEventOutbox
where TDbContext : IHasEventOutbox
{
protected IDbContextProvider<TDbContext> DbContextProvider { get; }
protected IGuidGenerator GuidGenerator { get; }

public DbContextEventOutbox(
IDbContextProvider<TDbContext> dbContextProvider,
IGuidGenerator guidGenerator)
{
DbContextProvider = dbContextProvider;
GuidGenerator = guidGenerator;
}

public async Task EnqueueAsync(string eventName, byte[] eventData)
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
dbContext.OutgoingEventRecords.Add(
new OutgoingEventRecord(GuidGenerator.Create(), eventName, eventData)
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IHasEventOutbox
public interface IHasEventOutbox : IEfCoreDbContext
{
DbSet<OutgoingEventRecord> OutgoingEventRecords { get; set; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ protected OutgoingEventRecord()
this.SetDefaultsForExtraProperties();
}

public OutgoingEventRecord(Guid id)
public OutgoingEventRecord(Guid id, string eventName, byte[] eventData)
: base(id)
{
EventName = eventName;
EventData = eventData;

ExtraProperties = new ExtraPropertyDictionary();
this.SetDefaultsForExtraProperties();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventR
{
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}

protected override byte[] Serialize(object eventData)
{
return Serializer.Serialize(eventData);
}

public virtual async Task PublishAsync(Type eventType, object eventData, Headers headers, Dictionary<string, object> headersArguments)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ protected override void AddToUnitOfWork(IUnitOfWork unitOfWork, UnitOfWorkEventR
{
unitOfWork.AddOrReplaceDistributedEvent(eventRecord);
}

protected override byte[] Serialize(object eventData)
{
return Serializer.Serialize(eventData);
}

public Task PublishAsync(Type eventType, object eventData, IBasicProperties properties, Dictionary<string, object> headersArguments = null)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;

namespace Volo.Abp.EventBus.Rebus
{
public interface IRebusSerializer
{
byte[] Serialize(object obj);

object Deserialize(byte[] value, Type type);

T Deserialize<T>(byte[] value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Volo.Abp.EventBus.Rebus
public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDependency
{
protected IBus Rebus { get; }
protected IRebusSerializer Serializer { get; }

//TODO: Accessing to the List<IEventHandlerFactory> may not be thread-safe!
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
Expand All @@ -32,7 +33,8 @@ public RebusDistributedEventBus(
IBus rebus,
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IOptions<AbpRebusEventBusOptions> abpEventBusRebusOptions,
IEventErrorHandler errorHandler) :
IEventErrorHandler errorHandler,
IRebusSerializer serializer) :
base(
serviceScopeFactory,
currentTenant,
Expand All @@ -41,6 +43,7 @@ public RebusDistributedEventBus(
abpDistributedEventBusOptions)
{
Rebus = rebus;
Serializer = serializer;
AbpRebusEventBusOptions = abpEventBusRebusOptions.Value;

HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
Expand Down Expand Up @@ -178,5 +181,10 @@ private static bool ShouldTriggerEventForHandler(Type targetEventType, Type hand

return false;
}

protected override byte[] Serialize(object eventData)
{
return Serializer.Serialize(eventData);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using System.Text;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Json;

namespace Volo.Abp.EventBus.Rebus
{
public class Utf8JsonRebusSerializer : IRebusSerializer, ITransientDependency
{
private readonly IJsonSerializer _jsonSerializer;

public Utf8JsonRebusSerializer(IJsonSerializer jsonSerializer)
{
_jsonSerializer = jsonSerializer;
}

public byte[] Serialize(object obj)
{
return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj));
}

public object Deserialize(byte[] value, Type type)
{
return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value));
}

public T Deserialize<T>(byte[] value)
{
return _jsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(value));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Collections.Generic;
using Volo.Abp.Collections;

namespace Volo.Abp.EventBus.Distributed
Expand All @@ -7,16 +6,12 @@ public class AbpDistributedEventBusOptions
{
public ITypeList<IEventHandler> Handlers { get; }

public List<OutboxConfig> Outboxes { get; }
public OutboxConfigList Outboxes { get; }

public AbpDistributedEventBusOptions()
{
Handlers = new TypeList<IEventHandler>();
Outboxes = new OutboxConfigList();
}
}

public class OutboxConfig
{

}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -31,6 +32,11 @@ public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) w
return Subscribe(typeof(TEvent), handler);
}

public override Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true)
{
return PublishAsync(eventType, eventData, onUnitOfWorkComplete, useOutbox: true);
}

public Task PublishAsync<TEvent>(
TEvent eventData,
bool onUnitOfWorkComplete = true,
Expand Down Expand Up @@ -68,7 +74,29 @@ public async Task PublishAsync(

private async Task<bool> AddToOutboxAsync(Type eventType, object eventData)
{
var unitOfWork = UnitOfWorkManager.Current;
if (unitOfWork == null)
{
return false;
}

foreach (var outboxConfig in AbpDistributedEventBusOptions.Outboxes)
{
if (outboxConfig.Selector == null || outboxConfig.Selector(eventType))
{
var eventOutbox = (IEventOutbox)unitOfWork.ServiceProvider.GetRequiredService(outboxConfig.ImplementationType);
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
await eventOutbox.EnqueueAsync(
eventName,
Serialize(eventData)
);
return true;
}
}

return false;
}

protected abstract byte[] Serialize(object eventData);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System.Threading.Tasks;

namespace Volo.Abp.EventBus.Distributed
{
public interface IEventOutbox
{
Task EnqueueAsync(string eventName, byte[] eventData);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;

namespace Volo.Abp.EventBus.Distributed
{
public class OutboxConfig
{
public string Name { get; }

public Type ImplementationType { get; set; }
public Func<Type, bool> Selector { get; set; }

public OutboxConfig(string name, Type implementationType, Func<Type, bool> selector = null)
{
Name = name;
ImplementationType = implementationType;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using System.Collections.Generic;

namespace Volo.Abp.EventBus.Distributed
{
public class OutboxConfigList : List<OutboxConfig>
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = t
}

/// <inheritdoc/>
public async Task PublishAsync(
public virtual async Task PublishAsync(
Type eventType,
object eventData,
bool onUnitOfWorkComplete = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,8 +674,7 @@ protected virtual void TriggerDomainEvents(object entity)
new UnitOfWorkEventRecord(
distributedEvent.EventData.GetType(),
distributedEvent.EventData,
distributedEvent.EventOrder,
useOutbox: true
distributedEvent.EventOrder
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public UnitOfWorkEventRecord(
Type eventType,
object eventData,
long eventOrder,
bool useOutbox = false)
bool useOutbox = true)
{
EventType = eventType;
EventData = eventData;
Expand Down
1 change: 1 addition & 0 deletions test/DistEvents/DistDemoApp/DistDemoApp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<ItemGroup>
<ProjectReference Include="..\..\..\framework\src\Volo.Abp.EntityFrameworkCore.SqlServer\Volo.Abp.EntityFrameworkCore.SqlServer.csproj" />
<ProjectReference Include="..\..\..\framework\src\Volo.Abp.Autofac\Volo.Abp.Autofac.csproj" />
<ProjectReference Include="..\..\..\framework\src\Volo.Abp.EventBus.RabbitMQ\Volo.Abp.EventBus.RabbitMQ.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
13 changes: 12 additions & 1 deletion test/DistEvents/DistDemoApp/DistDemoAppModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@
using Volo.Abp.Autofac;
using Volo.Abp.Domain.Entities.Events.Distributed;
using Volo.Abp.EntityFrameworkCore;
using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.EntityFrameworkCore.SqlServer;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.EventBus.RabbitMq;
using Volo.Abp.Modularity;

namespace DistDemoApp
{
[DependsOn(
typeof(AbpEntityFrameworkCoreSqlServerModule),
typeof(AbpAutofacModule)
typeof(AbpAutofacModule),
typeof(AbpEventBusRabbitMqModule)
)]
public class DistDemoAppModule : AbpModule
{
Expand All @@ -32,6 +36,13 @@ public override void ConfigureServices(ServiceConfigurationContext context)
options.EtoMappings.Add<TodoItem, TodoItemEto>();
options.AutoEventSelectors.Add<TodoItem>();
});

Configure<AbpDistributedEventBusOptions>(options =>
{
options.Outboxes.Add(
new OutboxConfig("Default", typeof(DbContextEventOutbox<TodoDbContext>))
);
});
}
}
}
Loading

0 comments on commit d1ea473

Please sign in to comment.