Skip to content

Commit

Permalink
Merge pull request abpframework#5637 from abpframework/liangshiwei/rebus
Browse files Browse the repository at this point in the history
Rebus Integration for Distributed Event Bus
  • Loading branch information
maliming authored Oct 13, 2020
2 parents 7e26426 + 78b1adb commit 80f61d8
Show file tree
Hide file tree
Showing 9 changed files with 355 additions and 0 deletions.
7 changes: 7 additions & 0 deletions framework/Volo.Abp.sln
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.Autofac.WebAssembl
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Volo.Abp.AspNetCore.Authentication.OpenIdConnect", "src\Volo.Abp.AspNetCore.Authentication.OpenIdConnect\Volo.Abp.AspNetCore.Authentication.OpenIdConnect.csproj", "{DEFE3DB2-EA4F-4F90-87FC-B25D64427BC5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.EventBus.Rebus", "src\Volo.Abp.EventBus.Rebus\Volo.Abp.EventBus.Rebus.csproj", "{F689967F-1EF1-4D75-8BA4-2F2F3506B1F3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.ExceptionHandling", "src\Volo.Abp.ExceptionHandling\Volo.Abp.ExceptionHandling.csproj", "{B9D1ADCB-D552-4626-A1F1-78FF72C1E822}"
EndProject
Global
Expand Down Expand Up @@ -1035,6 +1037,10 @@ Global
{DEFE3DB2-EA4F-4F90-87FC-B25D64427BC5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DEFE3DB2-EA4F-4F90-87FC-B25D64427BC5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DEFE3DB2-EA4F-4F90-87FC-B25D64427BC5}.Release|Any CPU.Build.0 = Release|Any CPU
{F689967F-1EF1-4D75-8BA4-2F2F3506B1F3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F689967F-1EF1-4D75-8BA4-2F2F3506B1F3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F689967F-1EF1-4D75-8BA4-2F2F3506B1F3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F689967F-1EF1-4D75-8BA4-2F2F3506B1F3}.Release|Any CPU.Build.0 = Release|Any CPU
{B9D1ADCB-D552-4626-A1F1-78FF72C1E822}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B9D1ADCB-D552-4626-A1F1-78FF72C1E822}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B9D1ADCB-D552-4626-A1F1-78FF72C1E822}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -1214,6 +1220,7 @@ Global
{29CA7471-4E3E-4E75-8B33-001DDF682F01} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{37F89B0B-1C6B-426F-A5EE-676D1956D9E9} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{DEFE3DB2-EA4F-4F90-87FC-B25D64427BC5} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{F689967F-1EF1-4D75-8BA4-2F2F3506B1F3} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{B9D1ADCB-D552-4626-A1F1-78FF72C1E822} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
Expand Down
3 changes: 3 additions & 0 deletions framework/src/Volo.Abp.EventBus.Rebus/FodyWeavers.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
<ConfigureAwait ContinueOnCapturedContext="false" />
</Weavers>
30 changes: 30 additions & 0 deletions framework/src/Volo.Abp.EventBus.Rebus/FodyWeavers.xsd
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. -->
<xs:element name="Weavers">
<xs:complexType>
<xs:all>
<xs:element name="ConfigureAwait" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:attribute name="ContinueOnCapturedContext" type="xs:boolean" />
</xs:complexType>
</xs:element>
</xs:all>
<xs:attribute name="VerifyAssembly" type="xs:boolean">
<xs:annotation>
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="VerifyIgnoreCodes" type="xs:string">
<xs:annotation>
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="GenerateXsd" type="xs:boolean">
<xs:annotation>
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="..\..\..\configureawait.props" />
<Import Project="..\..\..\common.props" />

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>Volo.Abp.EventBus.Rebus</AssemblyName>
<PackageId>Volo.Abp.EventBus.Rebus</PackageId>
<AssetTargetFallback>$(AssetTargetFallback);portable-net45+win8+wp8+wpa81;</AssetTargetFallback>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<RootNamespace />
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Rebus" Version="6.4.1" />
<PackageReference Include="Rebus.ServiceProvider" Version="5.0.6" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using Microsoft.Extensions.DependencyInjection;
using Rebus.Handlers;
using Rebus.ServiceProvider;
using Volo.Abp.Modularity;

namespace Volo.Abp.EventBus.Rebus
{
[DependsOn(
typeof(AbpEventBusModule))]
public class AbpEventBusRebusModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var options = context.Services.ExecutePreConfiguredActions<AbpEventBusRebusOptions>();

context.Services.AddTransient(typeof(IHandleMessages<>), typeof(RebusDistributedEventHandlerAdapter<>));

Configure<AbpEventBusRebusOptions>(rebusOptions =>
{
rebusOptions.Configurer = options.Configurer;
rebusOptions.Publish = options.Publish;
rebusOptions.InputQueueName = options.InputQueueName;
});

context.Services.AddRebus(configurer =>
{
options.Configurer?.Invoke(configurer);
return configurer;
});

}

public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
context.ServiceProvider.UseRebus();

context
.ServiceProvider
.GetRequiredService<RebusDistributedEventBus>()
.Initialize();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Rebus.Bus;
using Rebus.Config;
using Rebus.Persistence.InMem;
using Rebus.Transport.InMem;

namespace Volo.Abp.EventBus.Rebus
{
public class AbpEventBusRebusOptions
{
[NotNull]
public string InputQueueName { get; set; }

[NotNull]
public Action<RebusConfigurer> Configurer
{
get => _configurer;
set => _configurer = Check.NotNull(value, nameof(value));
}
private Action<RebusConfigurer> _configurer;

[NotNull]
public Func<IBus, Type, object, Task> Publish
{
get => _publish;
set => _publish = Check.NotNull(value, nameof(value));
}
private Func<IBus, Type, object, Task> _publish;

public AbpEventBusRebusOptions()
{
_publish = DefaultPublish;
_configurer = DefaultConfigurer;
}

private async Task DefaultPublish(IBus bus, Type eventType, object eventData)
{
await bus.Advanced.Routing.Send(InputQueueName, eventData);
}

private void DefaultConfigurer(RebusConfigurer configurer)
{
configurer.Subscriptions(s => s.StoreInMemory());
configurer.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), InputQueueName));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Rebus.Bus;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;

namespace Volo.Abp.EventBus.Rebus
{
[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(RebusDistributedEventBus))]
public class RebusDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency
{
protected IBus Rebus { get; }

//TODO: Accessing to the List<IEventHandlerFactory> may not be thread-safe!
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }
protected AbpEventBusRebusOptions AbpEventBusRebusOptions { get; }

public RebusDistributedEventBus(
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IBus rebus,
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IOptions<AbpEventBusRebusOptions> abpEventBusRebusOptions) :
base(serviceScopeFactory, currentTenant)
{
Rebus = rebus;
AbpEventBusRebusOptions = abpEventBusRebusOptions.Value;
AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value;

HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
}

public void Initialize()
{
SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
}

public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);

if (factory.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}

handlerFactories.Add(factory);

if (handlerFactories.Count == 1) //TODO: Multi-threading!
{
Rebus.Subscribe(eventType);
}

return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}

public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
{
Check.NotNull(action, nameof(action));

GetOrCreateHandlerFactories(typeof(TEvent))
.Locking(factories =>
{
factories.RemoveAll(
factory =>
{
if (!(factory is SingleInstanceHandlerFactory singleInstanceFactory))
{
return false;
}

if (!(singleInstanceFactory.HandlerInstance is ActionEventHandler<TEvent> actionHandler))
{
return false;
}

return actionHandler.Action == action;
});
});

Rebus.Unsubscribe(typeof(TEvent));
}

public override void Unsubscribe(Type eventType, IEventHandler handler)
{
GetOrCreateHandlerFactories(eventType)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory handlerFactory &&
handlerFactory.HandlerInstance == handler
);
});

Rebus.Unsubscribe(eventType);
}

public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
{
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory));
Rebus.Unsubscribe(eventType);
}

public override void UnsubscribeAll(Type eventType)
{
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
Rebus.Unsubscribe(eventType);
}

public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class
{
return Subscribe(typeof(TEvent), handler);
}

public override async Task PublishAsync(Type eventType, object eventData)
{
await AbpEventBusRebusOptions.Publish(Rebus, eventType, eventData);
}

private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return HandlerFactories.GetOrAdd(
eventType,
type =>
{
var eventName = EventNameAttribute.GetNameOrDefault(type);
EventTypes[eventName] = type;
return new List<IEventHandlerFactory>();
}
);
}

protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();

foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))
)
{
handlerFactoryList.Add(
new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}

return handlerFactoryList.ToArray();
}

private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
//Should trigger same type
if (handlerEventType == targetEventType)
{
return true;
}

//Should trigger for inherited types
if (handlerEventType.IsAssignableFrom(targetEventType))
{
return true;
}

return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Threading.Tasks;
using Rebus.Handlers;

namespace Volo.Abp.EventBus.Rebus
{
public class RebusDistributedEventHandlerAdapter<TEventData> : IHandleMessages<TEventData>
{
protected RebusDistributedEventBus RebusDistributedEventBus { get; }

public RebusDistributedEventHandlerAdapter(RebusDistributedEventBus rebusDistributedEventBus)
{
RebusDistributedEventBus = rebusDistributedEventBus;
}

public async Task Handle(TEventData message)
{
await RebusDistributedEventBus.TriggerHandlersAsync(typeof(TEventData), message);
}
}
}
1 change: 1 addition & 0 deletions nupkg/common.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ $projects = (
"framework/src/Volo.Abp.EventBus",
"framework/src/Volo.Abp.EventBus.RabbitMQ",
"framework/src/Volo.Abp.EventBus.Kafka",
"framework/src/Volo.Abp.EventBus.Rebus",
"framework/src/Volo.Abp.ExceptionHandling",
"framework/src/Volo.Abp.Features",
"framework/src/Volo.Abp.FluentValidation",
Expand Down

0 comments on commit 80f61d8

Please sign in to comment.