diff --git a/src/NebulaBus/Bootstrapper.cs b/src/NebulaBus/Bootstrapper.cs index b4dc8ed7b377773d587704e3abafa6ecfa8d2f85..56ddb27dbf76919b81b7dce2708f4eb968b96be1 100644 --- a/src/NebulaBus/Bootstrapper.cs +++ b/src/NebulaBus/Bootstrapper.cs @@ -49,7 +49,7 @@ namespace NebulaBus try { _cts.Token.ThrowIfCancellationRequested(); - await processor.Start(_cts.Token); + await processor.Start(_cts.Token).ConfigureAwait(false); } catch (Exception ex) { @@ -58,7 +58,7 @@ namespace NebulaBus } //Start Store Scheduler - await _delayMessageScheduler.StartSchedule(_cts.Token); + await _delayMessageScheduler!.StartSchedule(_cts.Token).ConfigureAwait(false); } public override async Task StopAsync(CancellationToken cancellationToken) diff --git a/src/NebulaBus/HandlerInfo.cs b/src/NebulaBus/HandlerInfo.cs new file mode 100644 index 0000000000000000000000000000000000000000..d9ea6c6231af224be884b1c3af0de57e47a1e5e4 --- /dev/null +++ b/src/NebulaBus/HandlerInfo.cs @@ -0,0 +1,12 @@ +using System; + +namespace NebulaBus +{ + internal class HandlerInfo + { + public string Name { get; set; } + public string Group { get; set; } + public Type Type { get; set; } + public byte ExcuteThreadCount { get; set; } + } +} diff --git a/src/NebulaBus/NebulaExecutor.cs b/src/NebulaBus/NebulaExecutor.cs deleted file mode 100644 index 40d9450b2b0b22843765cc7e851bf5e6a358481e..0000000000000000000000000000000000000000 --- a/src/NebulaBus/NebulaExecutor.cs +++ /dev/null @@ -1,66 +0,0 @@ -using NebulaBus.Scheduler; -using System; -using System.Threading; -using System.Threading.Channels; -using System.Threading.Tasks; - -namespace NebulaBus -{ - internal class NebulaExecutor - { - private readonly Channel<(T id, string message, NebulaHeader header)> _channel; - private readonly byte _threadCount; - private readonly NebulaHandler _handler; - private readonly IProcessor _processor; - private readonly IDelayMessageScheduler _delayMessageScheduler; - private readonly SemaphoreSlim _semaphore; - - public NebulaExecutor(IProcessor processor, NebulaHandler nebulaHandler, IDelayMessageScheduler delayMessageScheduler, byte threadCount) - { - _processor = processor; - _delayMessageScheduler = delayMessageScheduler; - _threadCount = nebulaHandler.ExecuteThreadCount > 0 ? nebulaHandler.ExecuteThreadCount.Value : threadCount; - _handler = nebulaHandler; - _channel = Channel.CreateUnbounded<(T, string, NebulaHeader)>(); - _semaphore = new SemaphoreSlim(1, 1); - } - - public async Task Enqueue(T id, string message, NebulaHeader header) - { - await _channel.Writer.WriteAsync((id, message, header)); - } - - public void Start(CancellationToken cancellationToken, Func callback) - { - for (int i = 0; i < _threadCount; i++) - { - Task.Run(async () => - { - while (await _channel.Reader.WaitToReadAsync()) - { - if (cancellationToken.IsCancellationRequested) - break; - if (_channel.Reader.TryRead(out var data)) - { - await _handler.Excute(_processor, _delayMessageScheduler, data.message, data.header); - await CallBack(data.id, callback); - } - } - }); - } - } - - private async Task CallBack(T id, Func callback) - { - await _semaphore.WaitAsync(); - try - { - await callback(id); - } - finally - { - _semaphore.Release(); - } - } - } -} \ No newline at end of file diff --git a/src/NebulaBus/NebulaHandler.cs b/src/NebulaBus/NebulaHandler.cs index 0731b0e14a52b5853c639b19e65bb88d20a23e3c..cd52e2f9716df2cb7d50c07176cbbbdb42992d29 100644 --- a/src/NebulaBus/NebulaHandler.cs +++ b/src/NebulaBus/NebulaHandler.cs @@ -12,7 +12,7 @@ namespace NebulaBus public virtual TimeSpan RetryInterval => TimeSpan.FromSeconds(10); public virtual TimeSpan RetryDelay => TimeSpan.FromSeconds(5); public virtual int MaxRetryCount => 10; - public virtual byte? ExecuteThreadCount => 1; + public virtual byte? ExecuteThreadCount => null; internal abstract Task Excute(IProcessor processor, IDelayMessageScheduler delayMessageScheduler, string message, NebulaHeader header); diff --git a/src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs b/src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs new file mode 100644 index 0000000000000000000000000000000000000000..632e3b6f3d8fb0d37a21c128b85a6c92020547ed --- /dev/null +++ b/src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs @@ -0,0 +1,39 @@ +using RabbitMQ.Client; +using System; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace NebulaBus.Rabbitmq +{ + internal class NebulaRabbitmqConsumer : AsyncDefaultBasicConsumer + { + private readonly Func _excute; + public NebulaRabbitmqConsumer(IChannel channel, int prefetchSize, Func excute) : base(channel) + { + _excute = excute; + } + + public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body, CancellationToken cancellationToken = default) + { + var message = Encoding.UTF8.GetString(body.Span); + var header = new NebulaHeader(); + if (properties.Headers != null) + { + foreach (var item in properties.Headers!) + { + if (item.Value is byte[] bytes) header.Add(item.Key, Encoding.UTF8.GetString(bytes)); + } + } + + await _excute(message, header).ConfigureAwait(false); + await BasicAckAsync(deliveryTag, cancellationToken); + } + + public async Task BasicAckAsync(ulong deliveryTag, CancellationToken cancellationToken = default) + { + if (Channel.IsOpen) + await Channel.BasicAckAsync(deliveryTag, false, cancellationToken); + } + } +} diff --git a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs index 877376049eca1801635b9851f5419990c8a79e6a..ebeb16941a100110afe41503fea6bb7ee10287e9 100644 --- a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs +++ b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs @@ -2,9 +2,9 @@ using Microsoft.Extensions.Logging; using NebulaBus.Scheduler; using RabbitMQ.Client; -using RabbitMQ.Client.Events; using System; using System.Collections.Generic; +using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -55,70 +55,11 @@ namespace NebulaBus.Rabbitmq { try { - ConnectionFactory factory = new ConnectionFactory(); - factory.UserName = _rabbitmqOptions.UserName; - factory.Password = _rabbitmqOptions.Password; - factory.VirtualHost = _rabbitmqOptions.VirtualHost; - factory.HostName = _rabbitmqOptions.HostName; - factory.AutomaticRecoveryEnabled = true; - factory.ClientProvidedName = $"NebulaBus:{Environment.MachineName}"; - - _connection = await factory.CreateConnectionAsync(); - //Sender Channel - _senderChannel = await _connection.CreateChannelAsync(); + _senderChannel = await CreateSenderChannel(); _channels.Add(_senderChannel); - var _nebulaHandlers = _serviceProvider.GetServices(); - if (_nebulaHandlers != null) - { - foreach (var handler in _nebulaHandlers) - { - var channel = await _connection.CreateChannelAsync(); - var getQos = _rabbitmqOptions.GetQos?.Invoke(handler.Name, handler.Group); - var qos = getQos > 0 ? getQos : _rabbitmqOptions.Qos; - if (qos > 0) - await channel.BasicQosAsync(0, qos.Value, false); - - //Create Exchange - await channel.ExchangeDeclareAsync(_rabbitmqOptions.ExchangeName, ExchangeType.Direct, true); - //Create Queue - await channel.QueueDeclareAsync(handler.Name, true, false, false, null); - - //Bind Group RoutingKey - if (!string.IsNullOrEmpty(handler.Group)) - await channel.QueueBindAsync(handler.Name, _rabbitmqOptions.ExchangeName, handler.Group, null); - - //Bind Name RoutingKey - if (!string.IsNullOrEmpty(handler.Name)) - await channel.QueueBindAsync(handler.Name, _rabbitmqOptions.ExchangeName, handler.Name, null); - - _channels.Add(channel); - var consumer = new AsyncEventingBasicConsumer(channel); - await channel.BasicConsumeAsync(handler.Name, false, consumer, cancellationToken); - - var excutor = new NebulaExecutor(this, handler, _delayMessageScheduler,_nebulaOptions.ExecuteThreadCount); - excutor.Start(cancellationToken, async (tag) => - { - await channel.BasicAckAsync(tag, false); - }); - consumer.ReceivedAsync += async (ch, ea) => - { - var body = ea.Body.ToArray(); - var message = Encoding.UTF8.GetString(body); - var header = new NebulaHeader(); - if (ea.BasicProperties.Headers != null) - { - foreach (var item in ea.BasicProperties.Headers!) - { - if (item.Value is byte[] bytes) header.Add(item.Key, Encoding.UTF8.GetString(bytes)); - } - } - - await excutor.Enqueue(ea.DeliveryTag, message, header); - }; - } - } + await RegisteConsumer(cancellationToken); _started = true; } catch (Exception ex) @@ -155,5 +96,102 @@ namespace NebulaBus.Rabbitmq _semaphore.Release(); } } + + private async Task GetConnection() + { + if (_connection == null || !_connection.IsOpen) + { + var connectionFactory = new ConnectionFactory() + { + HostName = _rabbitmqOptions.HostName, + UserName = _rabbitmqOptions.UserName, + Password = _rabbitmqOptions.Password, + VirtualHost = _rabbitmqOptions.VirtualHost, + AutomaticRecoveryEnabled = true, + ClientProvidedName = $"NebulaBus:{Environment.MachineName}" + }; + _connection = await connectionFactory.CreateConnectionAsync(); + } + return _connection; + } + + private async Task CreateSenderChannel() + { + var connection = await GetConnection(); + if (_senderChannel == null || !_senderChannel.IsOpen) + _senderChannel = await connection.CreateChannelAsync(); + return _senderChannel; + } + + private async Task CreateNewChannel() + { + await _semaphore.WaitAsync(); + try + { + var connection = await GetConnection(); + return await connection.CreateChannelAsync(); + } + finally + { + _semaphore.Release(); + } + } + + private async Task RegisteConsumer(CancellationToken cancellationToken) + { + var _nebulaHandlers = _serviceProvider.GetServices(); + if (_nebulaHandlers == null) return; + + var handlerInfos = _nebulaHandlers.Select(x => new HandlerInfo() + { + Name = x.Name, + Group = x.Group, + ExcuteThreadCount = x.ExecuteThreadCount.HasValue ? x.ExecuteThreadCount.Value : _nebulaOptions.ExecuteThreadCount, + Type = x.GetType() + }); + + foreach (var info in handlerInfos) + { + var getQos = _rabbitmqOptions.GetQos?.Invoke(info.Name, info.Group); + var qos = getQos > 0 ? getQos : _rabbitmqOptions.Qos; + + await RegisteConsumerByConfig(info, qos.Value, cancellationToken); + } + } + + private async Task RegisteConsumerByConfig(HandlerInfo handlerInfo, ushort qos, CancellationToken cancellationToken) + { + //每个handler创建一个channel 一个consumer + for (byte i = 0; i < handlerInfo.ExcuteThreadCount; i++) + { + var channel = await CreateNewChannel(); + _channels.Add(channel); + + if (qos > 0) + await channel.BasicQosAsync(0, qos, false); + + //Create Exchange + await channel.ExchangeDeclareAsync(_rabbitmqOptions.ExchangeName, ExchangeType.Direct, true); + //Create Queue + await channel.QueueDeclareAsync(handlerInfo.Name, true, false, false, null); + + //Bind Group RoutingKey + if (!string.IsNullOrEmpty(handlerInfo.Group)) + await channel.QueueBindAsync(handlerInfo.Name, _rabbitmqOptions.ExchangeName, handlerInfo.Group, null); + + //Bind Name RoutingKey + if (!string.IsNullOrEmpty(handlerInfo.Name)) + await channel.QueueBindAsync(handlerInfo.Name, _rabbitmqOptions.ExchangeName, handlerInfo.Name, null); + + //Create Consumer + var consumer = new NebulaRabbitmqConsumer(channel, qos, async (message, header) => + { + var handler = _serviceProvider.GetService(handlerInfo.Type) as NebulaHandler; + if (handler == null) return; + await handler.Excute(this, _delayMessageScheduler!, message, header); + }); + await channel.BasicConsumeAsync(handlerInfo.Name, false, consumer, cancellationToken); + } + } } } \ No newline at end of file diff --git a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs index 2ff11b1f90057fef9bfd1f16e6b1a708d7604c16..9bcfbc10055d058ecdbb6b2ff4a7c447a0d4efa6 100644 --- a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs +++ b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs @@ -52,14 +52,8 @@ namespace NebulaBus.Scheduler _scheduler.JobFactory = _jobFactory; await _scheduler.Start(cts.Token); - while (true) + while (!cts.IsCancellationRequested) { - if (cts.IsCancellationRequested) - { - _logger.LogInformation("Cancelling loop lock scheduler"); - return; - } - //lock var gotLock = _store.Lock(); if (!gotLock) @@ -68,14 +62,8 @@ namespace NebulaBus.Scheduler continue; } - while (true) + while (!cts.IsCancellationRequested) { - if (cts.IsCancellationRequested) - { - _logger.LogInformation("Cancelling loop scheduler"); - return; - } - await ScheduleJobFromStore(cts.Token); await Task.Delay(1000, cts.Token); } @@ -98,7 +86,7 @@ namespace NebulaBus.Scheduler { try { - var delayMessages = await _store.Get(DateTimeOffset.Now.AddMinutes(1).ToUnixTimeSeconds()); + var delayMessages = await _store.Get(DateTimeOffset.Now.AddMinutes(3).ToUnixTimeSeconds()); if (delayMessages == null) return; foreach (var delayMessage in delayMessages) { diff --git a/src/NebulaBus/ServiceCollectionExtensions.cs b/src/NebulaBus/ServiceCollectionExtensions.cs index dd5a625362dc0cc661272560d16eb0ee5b591a15..62e12525708a557b34b7005b7967ca45c727192c 100644 --- a/src/NebulaBus/ServiceCollectionExtensions.cs +++ b/src/NebulaBus/ServiceCollectionExtensions.cs @@ -6,11 +6,11 @@ using NebulaBus.Scheduler; using NebulaBus.Store; using NebulaBus.Store.Memory; using NebulaBus.Store.Redis; +using Newtonsoft.Json; using Quartz; using Quartz.Spi; using System; using System.Linq; -using Newtonsoft.Json; using System.Reflection; namespace Microsoft.Extensions.DependencyInjection @@ -57,23 +57,25 @@ namespace Microsoft.Extensions.DependencyInjection public static void AddNebulaBusHandler(this IServiceCollection services) where TH : NebulaHandler { - services.TryAddEnumerable(ServiceDescriptor.Singleton()); + services.TryAddEnumerable(ServiceDescriptor.Transient()); + services.TryAddTransient(); } public static void AddNebulaBusHandler(this IServiceCollection services) where TH : NebulaHandler where TM : class, new() { - services.TryAddEnumerable(ServiceDescriptor.Singleton()); + services.TryAddEnumerable(ServiceDescriptor.Transient()); + services.TryAddTransient(); } - public static void AddNebulaBusHandler(this IServiceCollection services, Assembly assembly) + public static void AddNebulaBusHandler(this IServiceCollection services, params Assembly[] assemblies) { - var types = assembly.GetTypes() - .Where(t => t.IsClass && !t.IsAbstract && typeof(NebulaHandler).IsAssignableFrom(t)); + var types = assemblies.SelectMany(x => x.GetTypes().Where(t => t.IsClass && !t.IsAbstract && typeof(NebulaHandler).IsAssignableFrom(t))); foreach (var typeItem in types) { - services.TryAddEnumerable(ServiceDescriptor.Singleton(typeof(NebulaHandler), typeItem)); + services.TryAddEnumerable(ServiceDescriptor.Transient(typeof(NebulaHandler), typeItem)); + services.TryAddTransient(typeItem); } } } diff --git a/src/WebApplicationSample/Handlers/TestHandlerV1.cs b/src/WebApplicationSample/Handlers/TestHandlerV1.cs index 98df9ef232771f0a89126b2a30fc61953e869284..7c221ee6a8cd7fb9fa0c1770478950557ec07f46 100644 --- a/src/WebApplicationSample/Handlers/TestHandlerV1.cs +++ b/src/WebApplicationSample/Handlers/TestHandlerV1.cs @@ -10,6 +10,7 @@ namespace WebApplicationSample.Handlers public override TimeSpan RetryDelay => TimeSpan.FromSeconds(10); public override int MaxRetryCount => 5; public override TimeSpan RetryInterval => TimeSpan.FromSeconds(10); + public override byte? ExecuteThreadCount => 2; private readonly INebulaBus _bus; @@ -20,7 +21,7 @@ namespace WebApplicationSample.Handlers protected override async Task Handle(TestMessage message, NebulaHeader header) { - Console.WriteLine($"{DateTime.Now} Received MessageId:{header.GetMessageId()} {Name}:{message.Message} Header:{header["customHeader"]} RetryCount:{header[NebulaHeader.RetryCount]}"); + Console.WriteLine($"{DateTime.Now} [{Name}] [{nameof(TestHandlerV1)}]Received Message :{message.Message} RetryCount {header.GetRetryCount()}"); throw new Exception("Test Exception"); } } diff --git a/src/WebApplicationSample/Handlers/TestHandlerV2.cs b/src/WebApplicationSample/Handlers/TestHandlerV2.cs index 59d869d81470541e94ce2e60ab83444aba43af79..0a2b1f0272fc44b214075d890086df94263af6d8 100644 --- a/src/WebApplicationSample/Handlers/TestHandlerV2.cs +++ b/src/WebApplicationSample/Handlers/TestHandlerV2.cs @@ -7,10 +7,11 @@ namespace WebApplicationSample.Handlers { public override string Name => "NebulaBus.TestHandler.V2"; public override string Group => "NebulaBus.TestHandler"; + public override byte? ExecuteThreadCount => 4; protected override async Task Handle(TestMessage message, NebulaHeader header) { - Console.WriteLine($"{DateTime.Now} Received Message {Name}:{message.Message}"); + Console.WriteLine($"{DateTime.Now} [{Name}] [{nameof(TestHandlerV2)}]Received Message :{message.Message} RetryCount {header.GetRetryCount()}"); } } } \ No newline at end of file diff --git a/src/WebApplicationSample/Handlers/TestHandlerV3.cs b/src/WebApplicationSample/Handlers/TestHandlerV3.cs new file mode 100644 index 0000000000000000000000000000000000000000..b02773d5b836419f810b948649c0e58d21d056a9 --- /dev/null +++ b/src/WebApplicationSample/Handlers/TestHandlerV3.cs @@ -0,0 +1,17 @@ +using NebulaBus; +using WebApplicationSample.Messages; + +namespace WebApplicationSample.Handlers +{ + public class TestHandlerV3 : NebulaHandler + { + public override string Name => "NebulaBus.TestHandler.V1"; + public override string Group => "NebulaBus.TestHandler"; + public override byte? ExecuteThreadCount => 4; + + protected override async Task Handle(TestMessage message, NebulaHeader header) + { + Console.WriteLine($"{DateTime.Now} [{Name}] [{nameof(TestHandlerV3)}]Received Message :{message.Message} RetryCount {header.GetRetryCount()}"); + } + } +} \ No newline at end of file diff --git a/src/WebApplicationSample/Program.cs b/src/WebApplicationSample/Program.cs index 30efae0149dfb51d2a7c91b585880b2bcc656b1b..63691f8453affe400b7312938fcc7f60701cda4b 100644 --- a/src/WebApplicationSample/Program.cs +++ b/src/WebApplicationSample/Program.cs @@ -31,8 +31,8 @@ builder.Services.AddNebulaBus(options => }); }); builder.Services.AddNebulaBusHandler(typeof(TestHandlerV1).Assembly); -// builder.Services.AddNebulaBusHandler(); -// builder.Services.AddNebulaBusHandler(); +//builder.Services.AddNebulaBusHandler(); +//builder.Services.AddNebulaBusHandler(); var app = builder.Build();