南通长城建设集团有限公司网站,培训机构网站,济南建网站哪家好,青岛网站美工EventBus#xff08;事件总线#xff09;是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。 它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中#xff0c;我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件…EventBus事件总线是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。 它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。
首先我们有两个基本的约束接口IEvent和IAsyncEventHandlerTEvent。
IEvent是一个空接口用于约束事件的类型。IAsyncEventHandlerTEvent是一个泛型接口用于约束事件处理程序的类型。它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。接下来我们有一个IEventBus接口它定义了一些操作方法用于发布和订阅事件。
其中PublishTEvent和PublishAsyncTEvent方法用于发布事件而OnSubscribeTEvent方法用于订阅事件。然后我们看到一个实现了本地事件总线的类LocalEventBusManagerTEvent。它实现了ILocalEventBusManagerTEvent接口用于在单一管道内处理本地事件。它使用了一个ChannelTEvent来存储事件并提供了发布事件的方法Publish和PublishAsync。此外它还提供了一个自动处理事件的方法AutoHandle。
总的来说Event Bus提供了一种方便的方式来实现组件之间的松耦合通信。
通过发布和订阅事件组件可以独立地进行操作而不需要直接依赖于彼此的实现细节。
这种机制可以提高代码的可维护性和可扩展性。
Github仓库地址https://github.com/DonPangPang/soda-event-bus
实现一些基本约束
先实现一些约束实现IEvent约束事件实现IAsyncEvnetHandlerTEvent where TEvent:IEvent来约束事件的处理程序。
public interface IEvent
{}public interface IAsyncEventHandlerin TEvent where TEvent : IEvent
{Task HandleAsync(IEvent event);void HandleException(IEvent event, Exception ex);
}接下来规定一下咱们的IEventBus会有哪些操作方法。基本就是发布和订阅。
public interface IEventBus
{void PublishTEvent(TEvent event) where TEvent : IEvent;Task PublishAsyncTEvent(TEvent event) where TEvent : IEvent;void OnSubscribeTEvent() where TEvent : IEvent;
}实现一个本地事件总线
本地事件处理
本地事件的处理我打算采用两种方式实现一种是LocalEventBusManager即本地事件管理第二种是LocalEventBusPool池化本地事件。
LocalEvnetBusManager
LocalEventBusManager主要在单一管道内进行处理集中进行消费。
public interface ILocalEventBusManagerin TEventwhere TEvent : IEvent
{void Publish(TEvent event);Task PublishAsync(TEvent event) ;void AutoHandle();
}public class LocalEventBusManagerTEvent(IServiceProvider serviceProvider):ILocalEventBusManagerTEventwhere TEvent: IEvent
{readonly IServiceProvider _servicesProvider serviceProvider;private readonly ChannelTEvent _eventChannel Channel.CreateUnboundedTEvent();public void Publish(TEvent event){Debug.Assert(_eventChannel ! null, nameof(_eventChannel) ! null);_eventChannel.Writer.WriteAsync(event);}private CancellationTokenSource Cts { get; } new();public void Cancel(){Cts.Cancel();}public async Task PublishAsync(TEvent event){await _eventChannel.Writer.WriteAsync(event);}public void AutoHandle(){// 确保只启动一次if (!Cts.IsCancellationRequested) return;Task.Run(async () {while (!Cts.IsCancellationRequested){var reader await _eventChannel.Reader.ReadAsync();await HandleAsync(reader);}}, Cts.Token);}async Task HandleAsync(TEvent event){var handler _servicesProvider.GetServiceIAsyncEventHandlerTEvent();if (handler is null){throw new NullReferenceException($No handler for event {event.GetType().Name});}try{await handler.HandleAsync(event);}catch (Exception ex){handler.HandleException( event, ex);}}
}LocalEventBusPool
LocalEventBusPool即所有的Event都会有一个单独的管道处理单独消费处理并行能力更好一些。
public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
{private readonly IServiceProvider _serviceProvider serviceProvider;private class ChannelKey{public required string Key { get; init; }public int Subscribers { get; set; }public override bool Equals(object? obj){if (obj is ChannelKey key){return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);}return false;}public override int GetHashCode(){return 0;}}private ChannelIEvent Rent(string channel){_channels.TryGetValue(new ChannelKey() { Key channel }, out var value);if (value ! null) return value;value Channel.CreateUnboundedIEvent();_channels.TryAdd(new ChannelKey() { Key channel }, value);return value;}private ChannelIEvent Rent(ChannelKey channelKey){_channels.TryGetValue(channelKey, out var value);if (value ! null) return value;value Channel.CreateUnboundedIEvent();_channels.TryAdd(channelKey, value);return value;}private readonly ConcurrentDictionaryChannelKey, ChannelIEvent _channels new();private CancellationTokenSource Cts { get; } new();public void Cancel(){Cts.Cancel();_channels.Clear();Cts.TryReset();}public async Task PublishAsyncTEvent(TEvent event) where TEvent : IEvent{await Rent(typeof(TEvent).Name).Writer.WriteAsync(event);}public void PublishTEvent(TEvent event) where TEvent : IEvent{Rent(typeof(TEvent).Name).Writer.TryWrite(event);}public void OnSubscribeTEvent() where TEvent : IEvent{var channelKey _channels.FirstOrDefault(x x.Key.Key typeof(TEvent).Name).Key ??new ChannelKey() { Key typeof(TEvent).Name };channelKey.Subscribers;Task.Run(async () {try{while (!Cts.IsCancellationRequested){var event await ReadAsync(channelKey);var handler _serviceProvider.GetServiceIAsyncEventHandlerTEvent();if (handler null) throw new NullReferenceException($No handler for Event {typeof(TEvent).Name});try{await handler.HandleAsync((TEvent)event);}catch (Exception ex){handler.HandleException((TEvent)event, ex);}}}catch (Exception e){throw new InvalidOperationException(Error on onSubscribe handler, e);}}, Cts.Token);}private async TaskIEvent ReadAsync(string channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}private async TaskIEvent ReadAsync(ChannelKey channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}
}LocalEventBus
实现LocalEventBus继承自IEventBus即可如果有需要扩展的方法自行添加池化和管理器的情况单独处理。
public interface ILocalEventBus: IEventBus
{}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{private LocalEventBusPool? EventBusPool serviceProvider.GetServiceLocalEventBusPool();public void PublishTEvent(TEvent event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool ! null, nameof(EventBusPool) ! null);EventBusPool.Publish(event);}else{var manager serviceProvider.GetServiceLocalEventBusManagerTEvent();if (manager is null) throw new NullReferenceException($No manager for event {typeof(TEvent).Name}, please add singleton service it.);manager.Publish(event);}}public async Task PublishAsyncTEvent(TEvent event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool ! null, nameof(EventBusPool) ! null);await EventBusPool.PublishAsync(event);}else{var manager serviceProvider.GetServiceLocalEventBusManagerTEvent();if (manager is null) throw new NullReferenceException($No manager for event {typeof(TEvent).Name}, please add singleton service it.);await manager.PublishAsync(event);}}public void OnSubscribeTEvent() where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool ! null, nameof(EventBusPool) ! null);EventBusPool.OnSubscribeTEvent();}else{var manager serviceProvider.GetServiceLocalEventBusManagerTEvent();if (manager is null) throw new NullReferenceException($No manager for event {typeof(TEvent).Name}, please add singleton service it.);manager.AutoHandle();}}
}分布式事件总线
根据需要扩展即可基本逻辑相同但可能需要增加确认机制等。