From 65d310b2064b278ddefd3201daba8079aeac3845 Mon Sep 17 00:00:00 2001 From: liuhai Date: Mon, 14 Apr 2025 15:28:06 +0800 Subject: [PATCH 1/3] fix code --- src/NebulaBus/NebulaBusService.cs | 34 +++++++++++-------- .../Controllers/TestController.cs | 21 +++++++++--- .../Handlers/TestHandlerV1.cs | 2 +- 3 files changed, 38 insertions(+), 19 deletions(-) diff --git a/src/NebulaBus/NebulaBusService.cs b/src/NebulaBus/NebulaBusService.cs index d841768..892d58a 100644 --- a/src/NebulaBus/NebulaBusService.cs +++ b/src/NebulaBus/NebulaBusService.cs @@ -48,14 +48,17 @@ namespace NebulaBus where T : class, new() { var header = BuildNebulaHeader(nameOrGroup); - _delayMessageScheduler.Schedule(new DelayStoreMessage() + await Task.Run(() => { - MessageId = header[NebulaHeader.MessageId]!, - Group = nameOrGroup, - Header = header, - Message = message, - Name = nameOrGroup, - TriggerTime = DateTimeOffset.Now.AddSeconds(delay.TotalSeconds).ToUnixTimeSeconds() + _delayMessageScheduler.Schedule(new DelayStoreMessage() + { + MessageId = header[NebulaHeader.MessageId]!, + Group = nameOrGroup, + Header = header, + Message = message, + Name = nameOrGroup, + TriggerTime = DateTimeOffset.Now.AddSeconds(delay.TotalSeconds).ToUnixTimeSeconds() + }); }); } @@ -63,14 +66,17 @@ namespace NebulaBus where T : class, new() { var header = BuildNebulaHeader(nameOrGroup, headers); - _delayMessageScheduler.Schedule(new DelayStoreMessage() + await Task.Run(() => { - MessageId = header[NebulaHeader.MessageId]!, - Group = nameOrGroup, - Header = header, - Message = message, - Name = nameOrGroup, - TriggerTime = DateTimeOffset.Now.AddSeconds(delay.TotalSeconds).ToUnixTimeSeconds() + _delayMessageScheduler.Schedule(new DelayStoreMessage() + { + MessageId = header[NebulaHeader.MessageId]!, + Group = nameOrGroup, + Header = header, + Message = message, + Name = nameOrGroup, + TriggerTime = DateTimeOffset.Now.AddSeconds(delay.TotalSeconds).ToUnixTimeSeconds() + }); }); } diff --git a/src/WebApplicationSample/Controllers/TestController.cs b/src/WebApplicationSample/Controllers/TestController.cs index c27ceeb..4026bba 100644 --- a/src/WebApplicationSample/Controllers/TestController.cs +++ b/src/WebApplicationSample/Controllers/TestController.cs @@ -85,15 +85,28 @@ namespace WebApplicationSample.Controllers }).ToArray(); } - [HttpGet("StressTest")] - public async Task StressTest() + [HttpGet("StressTestSend")] + public async Task StressTestSend() { - _logger.LogInformation($"{DateTime.Now} Start send StressTest Message"); + _logger.LogInformation($"{DateTime.Now} Start send StressTestSend Message"); var tasks = new List(); for (int i = 0; i < 2000; i++) { - tasks.Add(_bus.PublishAsync("NebulaBus.TestHandler.V4", new TestMessage { Message = $"StressTest Message{i}" })); + tasks.Add(_bus.PublishAsync("NebulaBus.TestHandler.V4", new TestMessage { Message = $"StressTestSend Message{i}" })); + } + await Task.WhenAll(tasks); + } + + [HttpGet("StressTestPublish")] + public async Task StressTestPublish() + { + _logger.LogInformation($"{DateTime.Now} Start send StressTestPublish Message"); + + var tasks = new List(); + for (int i = 0; i < 2000; i++) + { + tasks.Add(_bus.PublishAsync("NebulaBus.TestHandler", new TestMessage { Message = $"StressTestPublish Message{i}" })); } await Task.WhenAll(tasks); } diff --git a/src/WebApplicationSample/Handlers/TestHandlerV1.cs b/src/WebApplicationSample/Handlers/TestHandlerV1.cs index 82549a9..43effcc 100644 --- a/src/WebApplicationSample/Handlers/TestHandlerV1.cs +++ b/src/WebApplicationSample/Handlers/TestHandlerV1.cs @@ -22,7 +22,7 @@ namespace WebApplicationSample.Handlers protected override async Task Handle(TestMessage message, NebulaHeader header) { Console.WriteLine($"{DateTime.Now} [{Name}] [{nameof(TestHandlerV1)}]Received Message :{message.Message} RetryCount {header.GetRetryCount()}"); - throw new Exception("Test Exception"); + } } } \ No newline at end of file -- Gitee From 14fb3e37dc32b9227b2fa500d4b9d3bd4f219655 Mon Sep 17 00:00:00 2001 From: liuhai Date: Mon, 14 Apr 2025 16:45:08 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E6=AF=8F=E6=9D=A1=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E5=8D=95=E7=8B=AC=E7=9A=84scope=E6=B6=88?= =?UTF-8?q?=E8=B4=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs | 15 ++++++++------- src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs | 6 ++++-- .../Handlers/TestHandlerV1.cs | 3 ++- .../Handlers/TestHandlerV3.cs | 2 +- 4 files changed, 15 insertions(+), 11 deletions(-) diff --git a/src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs b/src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs index 5995407..79cbf99 100644 --- a/src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs +++ b/src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs @@ -1,4 +1,5 @@ -using RabbitMQ.Client; +using Microsoft.Extensions.DependencyInjection; +using RabbitMQ.Client; using System; using System.Text; using System.Threading; @@ -8,12 +9,12 @@ namespace NebulaBus.Rabbitmq { internal class NebulaRabbitmqConsumer : AsyncDefaultBasicConsumer { - private readonly IServiceProvider _serviceProvider; + private readonly IServiceScopeFactory _serviceScopeFactory; private readonly Type _handlerType; - public NebulaRabbitmqConsumer(IChannel channel, IServiceProvider serviceProvider, Type handlerType) : base(channel) + public NebulaRabbitmqConsumer(IChannel channel, IServiceScopeFactory serviceScopeFactory, Type handlerType) : base(channel) { - _serviceProvider = serviceProvider; _handlerType = handlerType; + _serviceScopeFactory = serviceScopeFactory; } public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body, CancellationToken cancellationToken = default) @@ -26,11 +27,11 @@ namespace NebulaBus.Rabbitmq if (item.Value is byte[] bytes) header.Add(item.Key, Encoding.UTF8.GetString(bytes)); } } - - var handler = _serviceProvider.GetService(_handlerType) as NebulaHandler; + using var scope = _serviceScopeFactory.CreateScope(); + var handler = scope.ServiceProvider.GetService(_handlerType) as NebulaHandler; if (handler != null) { - await handler.Excute(_serviceProvider, body, header); + await handler.Excute(scope.ServiceProvider, body, header); } await BasicAckAsync(deliveryTag, cancellationToken); } diff --git a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs index 28f5efb..5c26a23 100644 --- a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs +++ b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs @@ -1,6 +1,5 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; -using NebulaBus.Scheduler; using RabbitMQ.Client; using System; using System.Collections.Generic; @@ -20,14 +19,17 @@ namespace NebulaBus.Rabbitmq private bool _started; private readonly NebulaOptions _nebulaOptions; private readonly IRabbitmqChannelPool _channelPool; + private readonly IServiceScopeFactory _serviceFactory; public RabbitmqProcessor( IServiceProvider serviceProvider, + IServiceScopeFactory serviceScopeFactory, IRabbitmqChannelPool rabbitmqChannelPool, NebulaOptions nebulaOptions, ILogger logger) { _serviceProvider = serviceProvider; + _serviceFactory = serviceScopeFactory; _nebulaOptions = nebulaOptions; _rabbitmqOptions = nebulaOptions.RabbitmqOptions; _channels = new List(); @@ -135,7 +137,7 @@ namespace NebulaBus.Rabbitmq await channel.QueueBindAsync(handlerInfo.Name, _rabbitmqOptions.ExchangeName, handlerInfo.Name, null); //Create Consumer - var consumer = new NebulaRabbitmqConsumer(channel, _serviceProvider, handlerInfo.Type); + var consumer = new NebulaRabbitmqConsumer(channel, _serviceFactory, handlerInfo.Type); await channel.BasicConsumeAsync(handlerInfo.Name, false, consumer, cancellationToken); } } diff --git a/src/WebApplicationSample/Handlers/TestHandlerV1.cs b/src/WebApplicationSample/Handlers/TestHandlerV1.cs index 43effcc..3c7bd4c 100644 --- a/src/WebApplicationSample/Handlers/TestHandlerV1.cs +++ b/src/WebApplicationSample/Handlers/TestHandlerV1.cs @@ -13,6 +13,7 @@ namespace WebApplicationSample.Handlers public override byte? ExecuteThreadCount => 2; private readonly INebulaBus _bus; + public int TestValue = 2; public TestHandlerV1(INebulaBus nebulaBus) { @@ -22,7 +23,7 @@ namespace WebApplicationSample.Handlers protected override async Task Handle(TestMessage message, NebulaHeader header) { Console.WriteLine($"{DateTime.Now} [{Name}] [{nameof(TestHandlerV1)}]Received Message :{message.Message} RetryCount {header.GetRetryCount()}"); - + TestValue = 3; } } } \ No newline at end of file diff --git a/src/WebApplicationSample/Handlers/TestHandlerV3.cs b/src/WebApplicationSample/Handlers/TestHandlerV3.cs index 3378937..245e109 100644 --- a/src/WebApplicationSample/Handlers/TestHandlerV3.cs +++ b/src/WebApplicationSample/Handlers/TestHandlerV3.cs @@ -5,7 +5,7 @@ namespace WebApplicationSample.Handlers { public class TestHandlerV3 : NebulaHandler { - public override string Name => "NebulaBus.TestHandler.V1"; + public override string Name => "NebulaBus.TestHandler.V3"; public override string Group => "NebulaBus.TestHandler"; public override byte? ExecuteThreadCount => 4; -- Gitee From 237244314e2965226398c6c76b1ef8920e014596 Mon Sep 17 00:00:00 2001 From: liuhai Date: Mon, 14 Apr 2025 17:11:35 +0800 Subject: [PATCH 3/3] update --- src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs | 8 ++++---- src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs | 5 +---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs b/src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs index 79cbf99..0373224 100644 --- a/src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs +++ b/src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs @@ -9,12 +9,12 @@ namespace NebulaBus.Rabbitmq { internal class NebulaRabbitmqConsumer : AsyncDefaultBasicConsumer { - private readonly IServiceScopeFactory _serviceScopeFactory; + private readonly IServiceProvider _serviceProvider; private readonly Type _handlerType; - public NebulaRabbitmqConsumer(IChannel channel, IServiceScopeFactory serviceScopeFactory, Type handlerType) : base(channel) + public NebulaRabbitmqConsumer(IChannel channel, IServiceProvider serviceProvider, Type handlerType) : base(channel) { _handlerType = handlerType; - _serviceScopeFactory = serviceScopeFactory; + _serviceProvider = serviceProvider; } public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body, CancellationToken cancellationToken = default) @@ -27,7 +27,7 @@ namespace NebulaBus.Rabbitmq if (item.Value is byte[] bytes) header.Add(item.Key, Encoding.UTF8.GetString(bytes)); } } - using var scope = _serviceScopeFactory.CreateScope(); + using var scope = _serviceProvider.CreateScope(); var handler = scope.ServiceProvider.GetService(_handlerType) as NebulaHandler; if (handler != null) { diff --git a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs index 5c26a23..feac0b8 100644 --- a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs +++ b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs @@ -19,17 +19,14 @@ namespace NebulaBus.Rabbitmq private bool _started; private readonly NebulaOptions _nebulaOptions; private readonly IRabbitmqChannelPool _channelPool; - private readonly IServiceScopeFactory _serviceFactory; public RabbitmqProcessor( IServiceProvider serviceProvider, - IServiceScopeFactory serviceScopeFactory, IRabbitmqChannelPool rabbitmqChannelPool, NebulaOptions nebulaOptions, ILogger logger) { _serviceProvider = serviceProvider; - _serviceFactory = serviceScopeFactory; _nebulaOptions = nebulaOptions; _rabbitmqOptions = nebulaOptions.RabbitmqOptions; _channels = new List(); @@ -137,7 +134,7 @@ namespace NebulaBus.Rabbitmq await channel.QueueBindAsync(handlerInfo.Name, _rabbitmqOptions.ExchangeName, handlerInfo.Name, null); //Create Consumer - var consumer = new NebulaRabbitmqConsumer(channel, _serviceFactory, handlerInfo.Type); + var consumer = new NebulaRabbitmqConsumer(channel, _serviceProvider, handlerInfo.Type); await channel.BasicConsumeAsync(handlerInfo.Name, false, consumer, cancellationToken); } } -- Gitee