Skip to content

Commit

Permalink
Introduced AddLocalEvent and AddDistributedEvent methods on IUnitOfWork
Browse files Browse the repository at this point in the history
  • Loading branch information
hikalkan committed Aug 25, 2021
1 parent a09ccde commit 8217bf5
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 13 deletions.
10 changes: 10 additions & 0 deletions framework/src/Volo.Abp.Uow/Volo/Abp/Uow/ChildUnitOfWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ public void OnCompleted(Func<Task> handler)
_parent.OnCompleted(handler);
}

public void AddLocalEvent(object eventData)
{
_parent.AddLocalEvent(eventData);
}

public void AddDistributedEvent(object eventData)
{
_parent.AddDistributedEvent(eventData);
}

public IDatabaseApi FindDatabaseApi(string key)
{
return _parent.FindDatabaseApi(key);
Expand Down
4 changes: 4 additions & 0 deletions framework/src/Volo.Abp.Uow/Volo/Abp/Uow/IUnitOfWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,9 @@ public interface IUnitOfWork : IDatabaseApiContainer, ITransactionApiContainer,
Task RollbackAsync(CancellationToken cancellationToken = default);

void OnCompleted(Func<Task> handler);

void AddLocalEvent(object eventData);

void AddDistributedEvent(object eventData);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Volo.Abp.Uow
{
public interface IUnitOfWorkEventPublisher
{
Task PublishLocalEventsAsync(IEnumerable<object> localEvents);
Task PublishDistributedEventsAsync(IEnumerable<object> distributedEvents);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;

namespace Volo.Abp.Uow
{
public class NullUnitOfWorkEventPublisher : IUnitOfWorkEventPublisher, ISingletonDependency
{
public Task PublishLocalEventsAsync(IEnumerable<object> localEvents)
{
return Task.CompletedTask;
}

public Task PublishDistributedEventsAsync(IEnumerable<object> distributedEvents)
{
return Task.CompletedTask;
}
}
}
63 changes: 53 additions & 10 deletions framework/src/Volo.Abp.Uow/Volo/Abp/Uow/UnitOfWork.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
Expand Down Expand Up @@ -33,11 +35,14 @@ public class UnitOfWork : IUnitOfWork, ITransientDependency
public string ReservationName { get; set; }

protected List<Func<Task>> CompletedHandlers { get; } = new List<Func<Task>>();
protected ICollection<object> DistributedEvents { get; } = new Collection<object>();
protected ICollection<object> LocalEvents { get; } = new Collection<object>();

public event EventHandler<UnitOfWorkFailedEventArgs> Failed;
public event EventHandler<UnitOfWorkEventArgs> Disposed;

public IServiceProvider ServiceProvider { get; }
protected IUnitOfWorkEventPublisher UnitOfWorkEventPublisher { get; }

[NotNull]
public Dictionary<string, object> Items { get; }
Expand All @@ -50,9 +55,13 @@ public class UnitOfWork : IUnitOfWork, ITransientDependency
private bool _isCompleting;
private bool _isRolledback;

public UnitOfWork(IServiceProvider serviceProvider, IOptions<AbpUnitOfWorkDefaultOptions> options)
public UnitOfWork(
IServiceProvider serviceProvider,
IUnitOfWorkEventPublisher unitOfWorkEventPublisher,
IOptions<AbpUnitOfWorkDefaultOptions> options)
{
ServiceProvider = serviceProvider;
UnitOfWorkEventPublisher = unitOfWorkEventPublisher;
_defaultOptions = options.Value;

_databaseApis = new Dictionary<string, IDatabaseApi>();
Expand Down Expand Up @@ -103,12 +112,12 @@ public virtual async Task SaveChangesAsync(CancellationToken cancellationToken =
}
}

public IReadOnlyList<IDatabaseApi> GetAllActiveDatabaseApis()
public virtual IReadOnlyList<IDatabaseApi> GetAllActiveDatabaseApis()
{
return _databaseApis.Values.ToImmutableList();
}

public IReadOnlyList<ITransactionApi> GetAllActiveTransactionApis()
public virtual IReadOnlyList<ITransactionApi> GetAllActiveTransactionApis()
{
return _transactionApis.Values.ToImmutableList();
}
Expand All @@ -126,6 +135,30 @@ public virtual async Task CompleteAsync(CancellationToken cancellationToken = de
{
_isCompleting = true;
await SaveChangesAsync(cancellationToken);

while (LocalEvents.Any() || DistributedEvents.Any())
{
if (LocalEvents.Any())
{
var localEventsToBePublished = LocalEvents.ToArray();
LocalEvents.Clear();
await UnitOfWorkEventPublisher.PublishLocalEventsAsync(
localEventsToBePublished
);
}

if (DistributedEvents.Any())
{
var distributedEventsToBePublished = DistributedEvents.ToArray();
DistributedEvents.Clear();
await UnitOfWorkEventPublisher.PublishDistributedEventsAsync(
distributedEventsToBePublished
);
}

await SaveChangesAsync(cancellationToken);
}

await CommitTransactionsAsync();
IsCompleted = true;
await OnCompletedAsync();
Expand All @@ -149,12 +182,12 @@ public virtual async Task RollbackAsync(CancellationToken cancellationToken = de
await RollbackAllAsync(cancellationToken);
}

public IDatabaseApi FindDatabaseApi(string key)
public virtual IDatabaseApi FindDatabaseApi(string key)
{
return _databaseApis.GetOrDefault(key);
}

public void AddDatabaseApi(string key, IDatabaseApi api)
public virtual void AddDatabaseApi(string key, IDatabaseApi api)
{
Check.NotNull(key, nameof(key));
Check.NotNull(api, nameof(api));
Expand All @@ -167,22 +200,22 @@ public void AddDatabaseApi(string key, IDatabaseApi api)
_databaseApis.Add(key, api);
}

public IDatabaseApi GetOrAddDatabaseApi(string key, Func<IDatabaseApi> factory)
public virtual IDatabaseApi GetOrAddDatabaseApi(string key, Func<IDatabaseApi> factory)
{
Check.NotNull(key, nameof(key));
Check.NotNull(factory, nameof(factory));

return _databaseApis.GetOrAdd(key, factory);
}

public ITransactionApi FindTransactionApi(string key)
public virtual ITransactionApi FindTransactionApi(string key)
{
Check.NotNull(key, nameof(key));

return _transactionApis.GetOrDefault(key);
}

public void AddTransactionApi(string key, ITransactionApi api)
public virtual void AddTransactionApi(string key, ITransactionApi api)
{
Check.NotNull(key, nameof(key));
Check.NotNull(api, nameof(api));
Expand All @@ -195,19 +228,29 @@ public void AddTransactionApi(string key, ITransactionApi api)
_transactionApis.Add(key, api);
}

public ITransactionApi GetOrAddTransactionApi(string key, Func<ITransactionApi> factory)
public virtual ITransactionApi GetOrAddTransactionApi(string key, Func<ITransactionApi> factory)
{
Check.NotNull(key, nameof(key));
Check.NotNull(factory, nameof(factory));

return _transactionApis.GetOrAdd(key, factory);
}

public void OnCompleted(Func<Task> handler)
public virtual void OnCompleted(Func<Task> handler)
{
CompletedHandlers.Add(handler);
}

public virtual void AddLocalEvent(object eventData)
{
LocalEvents.Add(eventData);
}

public virtual void AddDistributedEvent(object eventData)
{
DistributedEvents.Add(eventData);
}

protected virtual async Task OnCompletedAsync()
{
foreach (var handler in CompletedHandlers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ public class TestUnitOfWork : UnitOfWork
{
private readonly TestUnitOfWorkConfig _config;

public TestUnitOfWork(IServiceProvider serviceProvider, IOptions<AbpUnitOfWorkDefaultOptions> options, TestUnitOfWorkConfig config)
: base(serviceProvider, options)
public TestUnitOfWork(
IServiceProvider serviceProvider,
IUnitOfWorkEventPublisher unitOfWorkEventPublisher,
IOptions<AbpUnitOfWorkDefaultOptions> options, TestUnitOfWorkConfig config)
: base(
serviceProvider,
unitOfWorkEventPublisher,
options)
{
_config = config;
}
Expand Down
4 changes: 3 additions & 1 deletion test/DistEvents/DistDemoApp/TodoEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ public virtual async Task HandleEventAsync(EntityCreatedEto<TodoItemEto> eventDa
todoSummary.Increase();
await _todoSummaryRepository.UpdateAsync(todoSummary);
}

Console.WriteLine("Increased total count: " + todoSummary);

throw new ApplicationException("Thrown to rollback the UOW!");
}

public async Task HandleEventAsync(EntityDeletedEto<TodoItemEto> eventData)
Expand Down

0 comments on commit 8217bf5

Please sign in to comment.