Skip to content

Commit

Permalink
Re-worked UnitofWork pattern so that it is entirely async-friendly. I…
Browse files Browse the repository at this point in the history
…n doing so, stumbled on a few issues related to how transactional events were being processed. Less of a bug, and more a case of bad information getting logged. Base transactional events were made ISync though instead of ISerializable so that they are processed correctly should anyone need to handle them.
  • Loading branch information
jasonmwebb-lv committed May 14, 2024
1 parent ad4c1af commit eaae76e
Show file tree
Hide file tree
Showing 35 changed files with 476 additions and 577 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public class CreateLeaveTypeCommandHandlerTests

public CreateLeaveTypeCommandHandlerTests()
{
var mapperConfig = new MapperConfiguration(c =>

var mapperConfig = new MapperConfiguration(c =>
{
c.AddProfile<MappingProfile>();
});
Expand All @@ -46,13 +46,17 @@ public CreateLeaveTypeCommandHandlerTests()
var validationMock = new Mock<IValidationService>();
mock.Setup(x => x.AddAsync(TestDataActions.CreateLeaveTypeStub(), CancellationToken.None))
.Returns(() => Task.FromResult(new BaseCommandResponse()));
_handler = new CreateLeaveTypeCommandHandler(_mapper, mock.Object, validationMock.Object);


_leaveTypeDto = new CreateLeaveTypeDto
{
DefaultDays = 15,
Name = "Test DTO"
};

validationMock.Setup(x => x.ValidateAsync(_leaveTypeDto, false, CancellationToken.None))
.Returns(() => Task.FromResult(new ValidationOutcome()));
_handler = new CreateLeaveTypeCommandHandler(_mapper, mock.Object, validationMock.Object);
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private static bool IsRequest(Type type)

private static bool IsIRequestHandler(Type type)
{
return type.GetInterfaces().Any(interfaceType => interfaceType.IsGenericType && interfaceType.GetGenericTypeDefinition() == typeof(IAppRequestHandler<,>));
return type.GetInterfaces().Any(interfaceType => interfaceType.IsGenericType && interfaceType.GetGenericTypeDefinition() == typeof(IAppRequestHandler<>));
}

private static bool IsHandlerForRequest(Type handlerType, Type requestType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public override Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, Cance
return base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
}

public override void PersistChanges()
public override async Task PersistChangesAsync()
{
base.PersistChanges();
await base.PersistChangesAsync();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public async Task RouteEventsAsync(IEnumerable<ISerializableEvent> transactional
var asyncEvents = transactionalEvents.Where(x => x is IAsyncEvent);
var eventProducers = _serviceProvider.GetServices<IEventProducer>();

_logger.LogInformation($"{this.GetGenericTypeName()} is routing {transactionalEvents.Count().ToString()} synchronized transactional events.");
_logger.LogInformation($"{this.GetGenericTypeName()} is routing {syncEvents.Count().ToString()} synchronized transactional events.");

// Produce the Synchronized Events first
foreach (var @event in syncEvents)
Expand All @@ -53,7 +53,7 @@ public async Task RouteEventsAsync(IEnumerable<ISerializableEvent> transactional
}
}

_logger.LogInformation($"{this.GetGenericTypeName()} is routing {transactionalEvents.Count().ToString()} asynchronous transactional events.");
_logger.LogInformation($"{this.GetGenericTypeName()} is routing {asyncEvents.Count().ToString()} asynchronous transactional events.");

// Produce the Async Events
foreach (var @event in asyncEvents)
Expand Down
12 changes: 6 additions & 6 deletions Src/RCommon.Dapper/Crud/DapperRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
using Microsoft.Extensions.Options;
using Dommel;
using RCommon.Collections;
using RCommon.Persistence.Transactions;
using RCommon.Persistence.Crud;
using RCommon.Persistence.Transactions;

namespace RCommon.Persistence.Dapper.Crud
{
Expand Down Expand Up @@ -47,7 +47,7 @@ public override async Task AddAsync(TEntity entity, CancellationToken token = de

entity.AddLocalEvent(new EntityCreatedEvent<TEntity>(entity));
EventTracker.AddEntity(entity);
DispatchEvents();
await DispatchEvents();
await db.InsertAsync(entity, cancellationToken: token);

}
Expand Down Expand Up @@ -81,7 +81,7 @@ public override async Task DeleteAsync(TEntity entity, CancellationToken token =

entity.AddLocalEvent(new EntityDeletedEvent<TEntity>(entity));
EventTracker.AddEntity(entity);
DispatchEvents();
await DispatchEvents();
await db.DeleteAsync(entity, cancellationToken: token);
}
catch (ApplicationException exception)
Expand Down Expand Up @@ -116,7 +116,7 @@ public override async Task UpdateAsync(TEntity entity, CancellationToken token =

entity.AddLocalEvent(new EntityUpdatedEvent<TEntity>(entity));
EventTracker.AddEntity(entity);
DispatchEvents();
await DispatchEvents();
await db.UpdateAsync(entity, cancellationToken: token);
}
catch (ApplicationException exception)
Expand Down Expand Up @@ -316,14 +316,14 @@ public override async Task<bool> AnyAsync(ISpecification<TEntity> specification,
return await AnyAsync(specification.Predicate, token);
}

protected void DispatchEvents()
protected async Task DispatchEvents()
{
try
{
if (UnitOfWorkManager.CurrentUnitOfWork == null)
{
Guard.Against<NullReferenceException>(DataStore == null, "DataStore is null");
DataStore.PersistChanges(); // This dispatches the events
await DataStore.PersistChangesAsync(); // This dispatches the events
}
}
catch (ApplicationException exception)
Expand Down
2 changes: 1 addition & 1 deletion Src/RCommon.EfCore/Crud/EFCoreRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using RCommon.Persistence.Transactions;
using RCommon.Persistence.Crud;
using RCommon.Persistence.Transactions;

namespace RCommon.Persistence.EFCore.Crud
{
Expand Down
9 changes: 6 additions & 3 deletions Src/RCommon.EfCore/RCommonDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ public RCommonDbContext(DbContextOptions options, IEntityEventTracker entityEven
public RCommonDbContext(DbContextOptions options)
: base(options)
{

if (options is null)
{
throw new ArgumentNullException(nameof(options));
}
}


Expand All @@ -42,9 +45,9 @@ public DbConnection GetDbConnection()
return base.Database.GetDbConnection();
}

public virtual void PersistChanges()
public virtual async Task PersistChangesAsync()
{
AsyncHelper.RunSync(() => this.SaveChangesAsync(true));
await this.SaveChangesAsync(true);
}


Expand Down
2 changes: 1 addition & 1 deletion Src/RCommon.Entities/EntityEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace RCommon.Entities
{
public class EntityEvent<TEntity> : ISerializableEvent
public class EntityEvent<TEntity> : ISyncEvent
{
public EntityEvent(TEntity entity)
{
Expand Down
3 changes: 2 additions & 1 deletion Src/RCommon.Entities/InMemoryEntityEventTracker.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using RCommon.EventHandling.Producers;
using System;
using System.Collections.Generic;
using System.Data.SqlTypes;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
Expand Down Expand Up @@ -42,7 +43,7 @@ public async Task<bool> EmitTransactionalEventsAsync()
}
}
await _eventRouter.RouteEventsAsync();
return true;
return await Task.FromResult(true);
}
}
}
2 changes: 1 addition & 1 deletion Src/RCommon.Linq2Db/Crud/Linq2DbRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
using LinqToDB.Tools;
using LinqToDB.Data;
using DataExtensions = LinqToDB.Tools.DataExtensions;
using RCommon.Persistence.Transactions;
using RCommon;
using RCommon.Persistence.Crud;
using RCommon.Persistence.Transactions;

namespace RCommon.Persistence.Linq2Db.Crud
{
Expand Down
7 changes: 0 additions & 7 deletions Src/RCommon.Linq2Db/RCommonDataConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ public DbConnection GetDbConnection()
return this.Connection;
}

public void PersistChanges()
{
AsyncHelper.RunSync(() => this.PersistChangesAsync());
// Nothing to do here because persistence is handled in the underlying API. We'll need to handle Unit of work in a transaction.
return;
}

public async Task PersistChangesAsync()
{
await this._eventTracker.EmitTransactionalEventsAsync();
Expand Down
8 changes: 4 additions & 4 deletions Src/RCommon.Mediatr/Behaviors/UnitOfWorkBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TRe

try
{
using (var unitOfWork = this._unitOfWorkScopeFactory.Create(TransactionMode.Default))
await using (var unitOfWork = await this._unitOfWorkScopeFactory.CreateAsync(TransactionMode.Default))
{
_logger.LogInformation("----- Begin transaction {UnitOfWorkTransactionId} for {CommandName} ({@Command})",
this._unitOfWorkManager.CurrentUnitOfWork.TransactionId, typeName, request);
Expand All @@ -40,7 +40,7 @@ public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TRe
_logger.LogInformation("----- Commit transaction {UnitOfWorkTransactionId} for {CommandName}",
this._unitOfWorkManager.CurrentUnitOfWork.TransactionId, typeName);

unitOfWork.Commit();
await unitOfWork.CommitAsync();
}


Expand Down Expand Up @@ -77,7 +77,7 @@ public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TRe

try
{
using (var unitOfWork = this._unitOfWorkScopeFactory.Create(TransactionMode.Default))
await using (var unitOfWork = await this._unitOfWorkScopeFactory.CreateAsync(TransactionMode.Default))
{
_logger.LogInformation("----- Begin transaction {UnitOfWorkTransactionId} for {CommandName} ({@Command})",
this._unitOfWorkManager.CurrentUnitOfWork.TransactionId, typeName, request);
Expand All @@ -87,7 +87,7 @@ public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TRe
_logger.LogInformation("----- Commit transaction {UnitOfWorkTransactionId} for {CommandName}",
this._unitOfWorkManager.CurrentUnitOfWork.TransactionId, typeName);

unitOfWork.Commit();
await unitOfWork.CommitAsync();
}


Expand Down
3 changes: 0 additions & 3 deletions Src/RCommon.Persistence/Crud/GraphRepositoryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ public abstract class GraphRepositoryBase<TEntity> : LinqRepositoryBase<TEntity>
where TEntity : class, IBusinessEntity
{

private string _dataStoreName;
private readonly IDataStoreEnlistmentProvider _dataStoreEnlistmentProvider;

public GraphRepositoryBase(IDataStoreRegistry dataStoreRegistry, IDataStoreEnlistmentProvider dataStoreEnlistmentProvider,
IUnitOfWorkManager unitOfWorkManager, IEntityEventTracker eventTracker, IOptions<DefaultDataStoreOptions> defaultDataStoreOptions)
:base(dataStoreRegistry, dataStoreEnlistmentProvider, unitOfWorkManager, eventTracker, defaultDataStoreOptions)
Expand Down
4 changes: 2 additions & 2 deletions Src/RCommon.Persistence/IDataStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

namespace RCommon.Persistence
{
public interface IDataStore : IDisposable, IAsyncDisposable
public interface IDataStore : IAsyncDisposable
{

void PersistChanges();
Task PersistChangesAsync();
DbConnection GetDbConnection();
}

Expand Down
39 changes: 22 additions & 17 deletions Src/RCommon.Persistence/PersistenceBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.DependencyInjection;
using RCommon.Entities;
using RCommon.EventHandling.Producers;
using RCommon.EventHandling.Subscribers;
using RCommon.Persistence;
using RCommon.Persistence.Transactions;
using System;
Expand All @@ -15,62 +16,66 @@ namespace RCommon
public static class PersistenceBuilderExtensions
{

public static IRCommonBuilder WithPersistence<TObjectAccess, TUnitOfWork>(this IRCommonBuilder config)
public static IRCommonBuilder WithPersistence<TObjectAccess, TUnitOfWork>(this IRCommonBuilder builder)
where TObjectAccess: IPersistenceBuilder
where TUnitOfWork : IUnitOfWorkBuilder
{
return WithPersistence<TObjectAccess, TUnitOfWork>(config, x => { }, x => { });
return WithPersistence<TObjectAccess, TUnitOfWork>(builder, x => { }, x => { });
}

public static IRCommonBuilder WithPersistence<TObjectAccess, TUnitOfWork>(this IRCommonBuilder config,
public static IRCommonBuilder WithPersistence<TObjectAccess, TUnitOfWork>(this IRCommonBuilder builder,
Action<TObjectAccess> objectAccessActions)
where TObjectAccess : IPersistenceBuilder
where TUnitOfWork : IUnitOfWorkBuilder
{

return WithPersistence<TObjectAccess, TUnitOfWork>(config, objectAccessActions, x => { });
return WithPersistence<TObjectAccess, TUnitOfWork>(builder, objectAccessActions, x => { });
}

public static IRCommonBuilder WithPersistence<TObjectAccess, TUnitOfWork>(this IRCommonBuilder config,
public static IRCommonBuilder WithPersistence<TObjectAccess, TUnitOfWork>(this IRCommonBuilder builder,
Action<TUnitOfWork> uniOfWorkActions)
where TObjectAccess : IPersistenceBuilder
where TUnitOfWork : IUnitOfWorkBuilder
{

return WithPersistence<TObjectAccess, TUnitOfWork>(config, x => { }, uniOfWorkActions);
return WithPersistence<TObjectAccess, TUnitOfWork>(builder, x => { }, uniOfWorkActions);
}

public static IRCommonBuilder WithPersistence<TObjectAccess, TUnitOfWork>(this IRCommonBuilder config,
public static IRCommonBuilder WithPersistence<TObjectAccess, TUnitOfWork>(this IRCommonBuilder builder,
Action<TObjectAccess> objectAccessActions, Action<TUnitOfWork> unitOfWorkActions)
where TObjectAccess : IPersistenceBuilder
where TUnitOfWork : IUnitOfWorkBuilder
{
// Data Store Management
StaticDataStore.DataStores = (StaticDataStore.DataStores == null ? new System.Collections.Concurrent.ConcurrentDictionary<string, Type>()
: StaticDataStore.DataStores);
config.Services.AddScoped<IDataStoreRegistry, StaticDataStoreRegistry>();
builder.Services.AddScoped<IDataStoreRegistry, StaticDataStoreRegistry>();

// Object Access and Unit of Work Configurations
var dataConfiguration = (TObjectAccess)Activator.CreateInstance(typeof(TObjectAccess), new object[] { config.Services });
// Wire up the "out of the box" events/event handlers used in persistence. These are not transactional
//builder.Services.AddScoped<ISubscriber<UnitOfWorkCreatedEvent>, UnitOfWorkCreatedHandler>();
//builder.Services.AddScoped<ISubscriber<UnitOfWorkCommittedEvent>, UnitOfWorkCommittedEventHandler>();

var dataConfiguration = (TObjectAccess)Activator.CreateInstance(typeof(TObjectAccess), new object[] { builder.Services });
objectAccessActions(dataConfiguration);
var unitOfWorkConfiguration = (TUnitOfWork)Activator.CreateInstance(typeof(TUnitOfWork), new object[] { config.Services });
var unitOfWorkConfiguration = (TUnitOfWork)Activator.CreateInstance(typeof(TUnitOfWork), new object[] { builder.Services });
unitOfWorkActions(unitOfWorkConfiguration);
config = WithChangeTracking(config);
return config;
builder = WithChangeTracking(builder);
return builder;
}


/// <summary>
/// Right now we are always using change tracking due to requirements for publishing entity events and those events being
/// somewhat tied to Change Tracking.
/// </summary>
/// <param name="config">Instance of <see cref="IRCommonBuilder"/>passed in.</param>
/// <param name="builder">Instance of <see cref="IRCommonBuilder"/>passed in.</param>
/// <returns>Updated instance of <see cref="IRCommonBuilder"/>RCommon Configuration</returns>
private static IRCommonBuilder WithChangeTracking(this IRCommonBuilder config)
private static IRCommonBuilder WithChangeTracking(this IRCommonBuilder builder)
{
config.Services.AddScoped<IEventRouter, InMemoryTransactionalEventRouter>();
config.Services.AddScoped<IEntityEventTracker, InMemoryEntityEventTracker>();
return config;
builder.Services.AddScoped<IEventRouter, InMemoryTransactionalEventRouter>();
builder.Services.AddScoped<IEntityEventTracker, InMemoryEntityEventTracker>();
return builder;
}


Expand Down
7 changes: 0 additions & 7 deletions Src/RCommon.Persistence/Sql/RDbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ public DbConnection GetDbConnection()
return connection;
}

public void PersistChanges()
{
AsyncHelper.RunSync(() => this.PersistChangesAsync());
// Nothing to do here because this is a SQL Connection
return;
}

public async Task PersistChangesAsync()
{
await this._entityEventTracker.EmitTransactionalEventsAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ public DefaultUnitOfWorkBuilder(IServiceCollection services)
services.AddScoped<IDataStoreEnlistmentProvider, ScopedDataStoreEnlistmentProvider>();

// Transaction Management
services.AddScoped<IUnitOfWorkManager, UnitOfWorkScopeManager>();
services.AddScoped<IUnitOfWorkManager, UnitOfWorkManager>();

// Factory for Unit Of Work Scope
services.AddTransient<IUnitOfWork, UnitOfWorkScope>();
services.AddTransient<IUnitOfWork, UnitOfWork>();
services.AddTransient<IUnitOfWorkFactory, UnitOfWorkFactory>();
_services = services;
}
Expand Down
Loading

0 comments on commit eaae76e

Please sign in to comment.