diff --git a/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/ChildUnitOfWork.cs b/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/ChildUnitOfWork.cs index 882c8d6a700..d1157c3fd87 100644 --- a/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/ChildUnitOfWork.cs +++ b/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/ChildUnitOfWork.cs @@ -76,6 +76,16 @@ public void OnCompleted(Func handler) _parent.OnCompleted(handler); } + public void AddLocalEvent(object eventData) + { + _parent.AddLocalEvent(eventData); + } + + public void AddDistributedEvent(object eventData) + { + _parent.AddDistributedEvent(eventData); + } + public IDatabaseApi FindDatabaseApi(string key) { return _parent.FindDatabaseApi(key); diff --git a/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/IUnitOfWork.cs b/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/IUnitOfWork.cs index ee5baa58bc5..ed3e185b72c 100644 --- a/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/IUnitOfWork.cs +++ b/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/IUnitOfWork.cs @@ -42,5 +42,9 @@ public interface IUnitOfWork : IDatabaseApiContainer, ITransactionApiContainer, Task RollbackAsync(CancellationToken cancellationToken = default); void OnCompleted(Func handler); + + void AddLocalEvent(object eventData); + + void AddDistributedEvent(object eventData); } } diff --git a/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/IUnitOfWorkEventPublisher.cs b/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/IUnitOfWorkEventPublisher.cs new file mode 100644 index 00000000000..1f4c65165fd --- /dev/null +++ b/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/IUnitOfWorkEventPublisher.cs @@ -0,0 +1,11 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Volo.Abp.Uow +{ + public interface IUnitOfWorkEventPublisher + { + Task PublishLocalEventsAsync(IEnumerable localEvents); + Task PublishDistributedEventsAsync(IEnumerable distributedEvents); + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/NullUnitOfWorkEventPublisher.cs b/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/NullUnitOfWorkEventPublisher.cs new file mode 100644 index 00000000000..d7ec5436fe3 --- /dev/null +++ b/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/NullUnitOfWorkEventPublisher.cs @@ -0,0 +1,19 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.Uow +{ + public class NullUnitOfWorkEventPublisher : IUnitOfWorkEventPublisher, ISingletonDependency + { + public Task PublishLocalEventsAsync(IEnumerable localEvents) + { + return Task.CompletedTask; + } + + public Task PublishDistributedEventsAsync(IEnumerable distributedEvents) + { + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/UnitOfWork.cs b/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/UnitOfWork.cs index 7b4f8fda334..8634075d94f 100644 --- a/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/UnitOfWork.cs +++ b/framework/src/Volo.Abp.Uow/Volo/Abp/Uow/UnitOfWork.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; using System.Collections.Immutable; +using System.Collections.ObjectModel; +using System.Linq; using System.Threading; using System.Threading.Tasks; using JetBrains.Annotations; @@ -33,11 +35,14 @@ public class UnitOfWork : IUnitOfWork, ITransientDependency public string ReservationName { get; set; } protected List> CompletedHandlers { get; } = new List>(); + protected ICollection DistributedEvents { get; } = new Collection(); + protected ICollection LocalEvents { get; } = new Collection(); public event EventHandler Failed; public event EventHandler Disposed; public IServiceProvider ServiceProvider { get; } + protected IUnitOfWorkEventPublisher UnitOfWorkEventPublisher { get; } [NotNull] public Dictionary Items { get; } @@ -50,9 +55,13 @@ public class UnitOfWork : IUnitOfWork, ITransientDependency private bool _isCompleting; private bool _isRolledback; - public UnitOfWork(IServiceProvider serviceProvider, IOptions options) + public UnitOfWork( + IServiceProvider serviceProvider, + IUnitOfWorkEventPublisher unitOfWorkEventPublisher, + IOptions options) { ServiceProvider = serviceProvider; + UnitOfWorkEventPublisher = unitOfWorkEventPublisher; _defaultOptions = options.Value; _databaseApis = new Dictionary(); @@ -103,12 +112,12 @@ public virtual async Task SaveChangesAsync(CancellationToken cancellationToken = } } - public IReadOnlyList GetAllActiveDatabaseApis() + public virtual IReadOnlyList GetAllActiveDatabaseApis() { return _databaseApis.Values.ToImmutableList(); } - public IReadOnlyList GetAllActiveTransactionApis() + public virtual IReadOnlyList GetAllActiveTransactionApis() { return _transactionApis.Values.ToImmutableList(); } @@ -126,6 +135,30 @@ public virtual async Task CompleteAsync(CancellationToken cancellationToken = de { _isCompleting = true; await SaveChangesAsync(cancellationToken); + + while (LocalEvents.Any() || DistributedEvents.Any()) + { + if (LocalEvents.Any()) + { + var localEventsToBePublished = LocalEvents.ToArray(); + LocalEvents.Clear(); + await UnitOfWorkEventPublisher.PublishLocalEventsAsync( + localEventsToBePublished + ); + } + + if (DistributedEvents.Any()) + { + var distributedEventsToBePublished = DistributedEvents.ToArray(); + DistributedEvents.Clear(); + await UnitOfWorkEventPublisher.PublishDistributedEventsAsync( + distributedEventsToBePublished + ); + } + + await SaveChangesAsync(cancellationToken); + } + await CommitTransactionsAsync(); IsCompleted = true; await OnCompletedAsync(); @@ -149,12 +182,12 @@ public virtual async Task RollbackAsync(CancellationToken cancellationToken = de await RollbackAllAsync(cancellationToken); } - public IDatabaseApi FindDatabaseApi(string key) + public virtual IDatabaseApi FindDatabaseApi(string key) { return _databaseApis.GetOrDefault(key); } - public void AddDatabaseApi(string key, IDatabaseApi api) + public virtual void AddDatabaseApi(string key, IDatabaseApi api) { Check.NotNull(key, nameof(key)); Check.NotNull(api, nameof(api)); @@ -167,7 +200,7 @@ public void AddDatabaseApi(string key, IDatabaseApi api) _databaseApis.Add(key, api); } - public IDatabaseApi GetOrAddDatabaseApi(string key, Func factory) + public virtual IDatabaseApi GetOrAddDatabaseApi(string key, Func factory) { Check.NotNull(key, nameof(key)); Check.NotNull(factory, nameof(factory)); @@ -175,14 +208,14 @@ public IDatabaseApi GetOrAddDatabaseApi(string key, Func factory) return _databaseApis.GetOrAdd(key, factory); } - public ITransactionApi FindTransactionApi(string key) + public virtual ITransactionApi FindTransactionApi(string key) { Check.NotNull(key, nameof(key)); return _transactionApis.GetOrDefault(key); } - public void AddTransactionApi(string key, ITransactionApi api) + public virtual void AddTransactionApi(string key, ITransactionApi api) { Check.NotNull(key, nameof(key)); Check.NotNull(api, nameof(api)); @@ -195,7 +228,7 @@ public void AddTransactionApi(string key, ITransactionApi api) _transactionApis.Add(key, api); } - public ITransactionApi GetOrAddTransactionApi(string key, Func factory) + public virtual ITransactionApi GetOrAddTransactionApi(string key, Func factory) { Check.NotNull(key, nameof(key)); Check.NotNull(factory, nameof(factory)); @@ -203,11 +236,21 @@ public ITransactionApi GetOrAddTransactionApi(string key, Func return _transactionApis.GetOrAdd(key, factory); } - public void OnCompleted(Func handler) + public virtual void OnCompleted(Func handler) { CompletedHandlers.Add(handler); } + public virtual void AddLocalEvent(object eventData) + { + LocalEvents.Add(eventData); + } + + public virtual void AddDistributedEvent(object eventData) + { + DistributedEvents.Add(eventData); + } + protected virtual async Task OnCompletedAsync() { foreach (var handler in CompletedHandlers) diff --git a/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/Mvc/Uow/TestUnitOfWork.cs b/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/Mvc/Uow/TestUnitOfWork.cs index 3b9ec55e75d..bf0d0c87816 100644 --- a/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/Mvc/Uow/TestUnitOfWork.cs +++ b/framework/test/Volo.Abp.AspNetCore.Mvc.Tests/Volo/Abp/AspNetCore/Mvc/Uow/TestUnitOfWork.cs @@ -13,8 +13,14 @@ public class TestUnitOfWork : UnitOfWork { private readonly TestUnitOfWorkConfig _config; - public TestUnitOfWork(IServiceProvider serviceProvider, IOptions options, TestUnitOfWorkConfig config) - : base(serviceProvider, options) + public TestUnitOfWork( + IServiceProvider serviceProvider, + IUnitOfWorkEventPublisher unitOfWorkEventPublisher, + IOptions options, TestUnitOfWorkConfig config) + : base( + serviceProvider, + unitOfWorkEventPublisher, + options) { _config = config; } diff --git a/test/DistEvents/DistDemoApp/TodoEventHandler.cs b/test/DistEvents/DistDemoApp/TodoEventHandler.cs index e54278c459f..73c70407327 100644 --- a/test/DistEvents/DistDemoApp/TodoEventHandler.cs +++ b/test/DistEvents/DistDemoApp/TodoEventHandler.cs @@ -39,8 +39,10 @@ public virtual async Task HandleEventAsync(EntityCreatedEto eventDa todoSummary.Increase(); await _todoSummaryRepository.UpdateAsync(todoSummary); } - + Console.WriteLine("Increased total count: " + todoSummary); + + throw new ApplicationException("Thrown to rollback the UOW!"); } public async Task HandleEventAsync(EntityDeletedEto eventData)