From d420f1c19ed38323bfeed59b40624340cd570782 Mon Sep 17 00:00:00 2001 From: "javen.liu" Date: Wed, 9 Apr 2025 13:22:06 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E5=BD=93name=E4=B8=BA=E7=A9=BA=E5=88=99?= =?UTF-8?q?=E4=B8=8D=E7=BB=91=E5=AE=9Aroutingkey?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs index 0180629..1fa6977 100644 --- a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs +++ b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs @@ -69,9 +69,12 @@ namespace NebulaBus.Rabbitmq await channel.ExchangeDeclareAsync(_rabbitmqOptions.ExchangeName, ExchangeType.Direct); await channel.QueueDeclareAsync(handler.Name, false, false, false, null); //Bind Group RoutingKey - await channel.QueueBindAsync(handler.Name, _rabbitmqOptions.ExchangeName, handler.Group, null); + if (!string.IsNullOrEmpty(handler.Group)) + await channel.QueueBindAsync(handler.Name, _rabbitmqOptions.ExchangeName, handler.Group, null); + //Bind Name RoutingKey - await channel.QueueBindAsync(handler.Name, _rabbitmqOptions.ExchangeName, handler.Name, null); + if (!string.IsNullOrEmpty(handler.Name)) + await channel.QueueBindAsync(handler.Name, _rabbitmqOptions.ExchangeName, handler.Name, null); _channels.Add(channel); var consumer = new AsyncEventingBasicConsumer(channel); @@ -89,6 +92,7 @@ namespace NebulaBus.Rabbitmq if (item.Value is byte[] bytes) header.Add(item.Key, Encoding.UTF8.GetString(bytes)); } } + await handler.Subscribe(this, _delayMessageScheduler, message, header); await channel.BasicAckAsync(ea.DeliveryTag, false); }; @@ -104,6 +108,7 @@ namespace NebulaBus.Rabbitmq _logger.LogError($"Processor {this.GetType().Name} not started"); return; } + byte[] messageBodyBytes = Encoding.UTF8.GetBytes(message); var props = new BasicProperties() { -- Gitee From 591da556bb46807ec012ef13fc45fd9b6dd34bcf Mon Sep 17 00:00:00 2001 From: "javen.liu" Date: Wed, 9 Apr 2025 14:01:51 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E6=AC=A1=E6=95=B0=E8=AE=A1=E6=95=B0=E9=94=99=E8=AF=AF=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/NebulaBus/NebulaHandler.cs | 7 ++-- src/NebulaBus/ServiceCollectionExtensions.cs | 4 +- src/NebulaBus/Store/Redis/RedisStore.cs | 11 +++-- .../Controllers/WeatherForecastController.cs | 40 +++++++++++-------- .../Handlers/TestHandlerV1.cs | 2 +- 5 files changed, 37 insertions(+), 27 deletions(-) diff --git a/src/NebulaBus/NebulaHandler.cs b/src/NebulaBus/NebulaHandler.cs index 0cb8c33..e2f20a6 100644 --- a/src/NebulaBus/NebulaHandler.cs +++ b/src/NebulaBus/NebulaHandler.cs @@ -47,12 +47,11 @@ namespace NebulaBus header[NebulaHeader.Name] = Name; header[NebulaHeader.Group] = Group; int.TryParse(header[NebulaHeader.RetryCount], out var retryCount); - + try { if (retryCount > MaxRetryCount) return; - header[NebulaHeader.RetryCount] = (retryCount + 1).ToString(); - + //首次执行若发生异常直接重试三次 if (retryCount == 0) { @@ -76,6 +75,8 @@ namespace NebulaBus //no retry if (MaxRetryCount == 0) return; + header[NebulaHeader.RetryCount] = (retryCount + 1).ToString(); + //First Time to retry,if no delay then send directly if (retryCount == 0 && RetryDelay.TotalSeconds <= 0) { diff --git a/src/NebulaBus/ServiceCollectionExtensions.cs b/src/NebulaBus/ServiceCollectionExtensions.cs index 6d06d74..2859fc3 100644 --- a/src/NebulaBus/ServiceCollectionExtensions.cs +++ b/src/NebulaBus/ServiceCollectionExtensions.cs @@ -23,7 +23,6 @@ namespace Microsoft.Extensions.DependencyInjection setupAction(options); services.AddSingleton(options); services.AddSingleton(); - services.AddSingleton(); services.AddSingleton(); //Schedule job @@ -38,7 +37,8 @@ namespace Microsoft.Extensions.DependencyInjection if (!string.IsNullOrEmpty(options.RedisConnectionString)) { var redisClient = new CSRedis.CSRedisClient(options.RedisConnectionString); - services.AddSingleton(redisClient); + services.AddKeyedSingleton("NebulaBusRedis", redisClient); + services.AddSingleton(); } else { diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index 2f1684d..8e317cd 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -1,6 +1,8 @@ -using CSRedis; +using System; +using CSRedis; using System.Collections.Generic; using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; namespace NebulaBus.Store.Redis { @@ -11,14 +13,15 @@ namespace NebulaBus.Store.Redis private readonly CSRedisClient _redisClient; - public RedisStore(CSRedisClient cSRedisClient) + public RedisStore(IServiceProvider provider) { - _redisClient = cSRedisClient; + _redisClient = provider.GetKeyedService("NebulaBusRedis")!; } public async Task Add(DelayStoreMessage delayStoreMessage) { - await _redisClient.HSetAsync(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}", delayStoreMessage); + await _redisClient.HSetAsync(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}", + delayStoreMessage); } public async Task Delete(DelayStoreMessage delayStoreMessage) diff --git a/src/WebApplicationSample/Controllers/WeatherForecastController.cs b/src/WebApplicationSample/Controllers/WeatherForecastController.cs index d522837..4d73e9a 100644 --- a/src/WebApplicationSample/Controllers/WeatherForecastController.cs +++ b/src/WebApplicationSample/Controllers/WeatherForecastController.cs @@ -26,13 +26,18 @@ namespace WebApplicationSample.Controllers public IEnumerable Publish() { _logger.LogInformation($"{DateTime.Now} Start send Message"); - _bus.PublishAsync("NebulaBus.TestHandler", new TestMessage { Message = "Hello World" }); + _bus.PublishAsync("NebulaBus.TestHandler", new TestMessage { Message = "Hello World" }, + new Dictionary() + { + { "customHeader", "123456" }, + { NebulaHeader.RequestId, "8889999" }, + }); return Enumerable.Range(1, 5).Select(index => new WeatherForecast - { - Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)), - TemperatureC = Random.Shared.Next(-20, 55), - Summary = Summaries[Random.Shared.Next(Summaries.Length)] - }) + { + Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)), + TemperatureC = Random.Shared.Next(-20, 55), + Summary = Summaries[Random.Shared.Next(Summaries.Length)] + }) .ToArray(); } @@ -43,11 +48,11 @@ namespace WebApplicationSample.Controllers _bus.PublishAsync(TimeSpan.FromSeconds(5), "NebulaBus.TestHandler", new TestMessage { Message = "Hello World" }); return Enumerable.Range(1, 5).Select(index => new WeatherForecast - { - Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)), - TemperatureC = Random.Shared.Next(-20, 55), - Summary = Summaries[Random.Shared.Next(Summaries.Length)] - }) + { + Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)), + TemperatureC = Random.Shared.Next(-20, 55), + Summary = Summaries[Random.Shared.Next(Summaries.Length)] + }) .ToArray(); } @@ -57,11 +62,11 @@ namespace WebApplicationSample.Controllers _logger.LogInformation($"{DateTime.Now} Start send Message"); _bus.PublishAsync("NebulaBus.TestHandler.V1", new TestMessage { Message = "Hello World" }); return Enumerable.Range(1, 5).Select(index => new WeatherForecast - { - Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)), - TemperatureC = Random.Shared.Next(-20, 55), - Summary = Summaries[Random.Shared.Next(Summaries.Length)] - }) + { + Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)), + TemperatureC = Random.Shared.Next(-20, 55), + Summary = Summaries[Random.Shared.Next(Summaries.Length)] + }) .ToArray(); } @@ -69,7 +74,8 @@ namespace WebApplicationSample.Controllers public IEnumerable DelaySend() { _logger.LogInformation($"{DateTime.Now} Start send Message"); - _bus.PublishAsync(TimeSpan.FromSeconds(5), "NebulaBus.TestHandler.V1", new TestMessage { Message = "Hello World" }); + _bus.PublishAsync(TimeSpan.FromSeconds(5), "NebulaBus.TestHandler.V1", + new TestMessage { Message = "Hello World" }); return Enumerable.Range(1, 5).Select(index => new WeatherForecast { Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)), diff --git a/src/WebApplicationSample/Handlers/TestHandlerV1.cs b/src/WebApplicationSample/Handlers/TestHandlerV1.cs index 73ffc64..fca7b44 100644 --- a/src/WebApplicationSample/Handlers/TestHandlerV1.cs +++ b/src/WebApplicationSample/Handlers/TestHandlerV1.cs @@ -12,7 +12,7 @@ namespace WebApplicationSample.Handlers protected override async Task Handle(TestMessage message, NebulaHeader header) { - Console.WriteLine($"{DateTime.Now} Received Message {Name}:{message.Message}"); + Console.WriteLine($"{DateTime.Now} Received Message {Name}:{message.Message} Header:{header["customHeader"]} RetryCount:{header[NebulaHeader.RetryCount]}"); throw new Exception("Test Exception"); } } -- Gitee From 6f2ddb70f7fb6e3e56cc1e563f610bc4ce8dafdf Mon Sep 17 00:00:00 2001 From: "javen.liu" Date: Wed, 9 Apr 2025 14:10:31 +0800 Subject: [PATCH 3/4] =?UTF-8?q?header=E5=A2=9E=E5=8A=A0=E5=87=A0=E4=B8=AA?= =?UTF-8?q?=E9=87=8D=E8=A6=81=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/NebulaBus/NebulaHandler.cs | 8 ++++---- src/NebulaBus/NebulaHeader.cs | 8 ++++++++ 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/NebulaBus/NebulaHandler.cs b/src/NebulaBus/NebulaHandler.cs index e2f20a6..9186d2a 100644 --- a/src/NebulaBus/NebulaHandler.cs +++ b/src/NebulaBus/NebulaHandler.cs @@ -46,12 +46,12 @@ namespace NebulaBus header[NebulaHeader.Consumer] = Environment.MachineName; header[NebulaHeader.Name] = Name; header[NebulaHeader.Group] = Group; - int.TryParse(header[NebulaHeader.RetryCount], out var retryCount); - + var retryCount = header.GetRetryCount(); + try { if (retryCount > MaxRetryCount) return; - + //首次执行若发生异常直接重试三次 if (retryCount == 0) { @@ -76,7 +76,7 @@ namespace NebulaBus //no retry if (MaxRetryCount == 0) return; header[NebulaHeader.RetryCount] = (retryCount + 1).ToString(); - + //First Time to retry,if no delay then send directly if (retryCount == 0 && RetryDelay.TotalSeconds <= 0) { diff --git a/src/NebulaBus/NebulaHeader.cs b/src/NebulaBus/NebulaHeader.cs index b6de3a0..5a98b40 100644 --- a/src/NebulaBus/NebulaHeader.cs +++ b/src/NebulaBus/NebulaHeader.cs @@ -19,6 +19,14 @@ namespace NebulaBus private readonly Dictionary _dic; + public string GetRequestId() => this[RequestId]; + public string GetMessageId() => this[MessageId]; + public int GetRetryCount() + { + int.TryParse(this[RetryCount], out var result); + return result; + } + public NebulaHeader() { _dic = new Dictionary(); -- Gitee From df541e283af57db31ac97b55f43fcb8e8a29ed25 Mon Sep 17 00:00:00 2001 From: "javen.liu" Date: Wed, 9 Apr 2025 14:50:34 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=B3=9B=E5=9E=8B?= =?UTF-8?q?=E7=BA=A6=E6=9D=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/NebulaBus/INebulaBus.cs | 9 ++++---- src/NebulaBus/NebulaBusService.cs | 6 +++++ src/NebulaBus/ServiceCollectionExtensions.cs | 24 +++++++++++++++----- src/WebApplicationSample/Program.cs | 5 ++-- 4 files changed, 32 insertions(+), 12 deletions(-) diff --git a/src/NebulaBus/INebulaBus.cs b/src/NebulaBus/INebulaBus.cs index 3b772b2..a64dcef 100644 --- a/src/NebulaBus/INebulaBus.cs +++ b/src/NebulaBus/INebulaBus.cs @@ -6,9 +6,10 @@ namespace NebulaBus { public interface INebulaBus { - Task PublishAsync(string name, T message); - Task PublishAsync(string name, T message, IDictionary headers); - Task PublishAsync(TimeSpan delay, string name, T message); - Task PublishAsync(TimeSpan delay, string name, T message, IDictionary headers); + Task PublishAsync(string name, T message) where T : class, new(); + Task PublishAsync(string name, T message, IDictionary headers) where T : class, new(); + Task PublishAsync(TimeSpan delay, string name, T message) where T : class, new(); + Task PublishAsync(TimeSpan delay, string name, T message, IDictionary headers) + where T : class, new(); } } \ No newline at end of file diff --git a/src/NebulaBus/NebulaBusService.cs b/src/NebulaBus/NebulaBusService.cs index 7c3ec06..b8383af 100644 --- a/src/NebulaBus/NebulaBusService.cs +++ b/src/NebulaBus/NebulaBusService.cs @@ -24,6 +24,7 @@ namespace NebulaBus } public async Task PublishAsync(string group, T message) + where T : class, new() { var header = BuildNebulaHeader(group); foreach (var processor in _processors) @@ -33,6 +34,7 @@ namespace NebulaBus } public async Task PublishAsync(string group, T message, IDictionary headers) + where T : class, new() { var header = BuildNebulaHeader(group, headers); foreach (var processor in _processors) @@ -42,6 +44,7 @@ namespace NebulaBus } public async Task PublishAsync(TimeSpan delay, string group, T message) + where T : class, new() { var header = BuildNebulaHeader(group); await _delayMessageScheduler.Schedule(new DelayStoreMessage() @@ -56,6 +59,7 @@ namespace NebulaBus } public async Task PublishAsync(TimeSpan delay, string group, T message, IDictionary headers) + where T : class, new() { var header = BuildNebulaHeader(group, headers); await _delayMessageScheduler.Schedule(new DelayStoreMessage() @@ -70,6 +74,7 @@ namespace NebulaBus } private NebulaHeader BuildNebulaHeader(string group) + where T : class, new() { var newId = _idWorker.NextId().ToString(); var header = new NebulaHeader() @@ -84,6 +89,7 @@ namespace NebulaBus } private NebulaHeader BuildNebulaHeader(string group, IDictionary headers) + where T : class, new() { var newId = _idWorker.NextId().ToString(); var header = new NebulaHeader(headers); diff --git a/src/NebulaBus/ServiceCollectionExtensions.cs b/src/NebulaBus/ServiceCollectionExtensions.cs index 2859fc3..35ce060 100644 --- a/src/NebulaBus/ServiceCollectionExtensions.cs +++ b/src/NebulaBus/ServiceCollectionExtensions.cs @@ -6,6 +6,8 @@ using NebulaBus.Store; using NebulaBus.Store.Redis; using Quartz; using System; +using System.Linq; +using System.Reflection; using NebulaBus.Store.Memory; using Quartz.Simpl; using Quartz.Spi; @@ -48,16 +50,26 @@ namespace Microsoft.Extensions.DependencyInjection services.AddHostedService(); } - public static void AddNebulaBusHandler(this IServiceCollection services) - where H : NebulaHandler + public static void AddNebulaBusHandler(this IServiceCollection services) + where TH : NebulaHandler { - services.TryAddEnumerable(ServiceDescriptor.Singleton()); + services.TryAddEnumerable(ServiceDescriptor.Singleton()); } - public static void AddNebulaBusHandler(this IServiceCollection services) - where H : NebulaHandler + public static void AddNebulaBusHandler(this IServiceCollection services) + where TH : NebulaHandler { - services.TryAddEnumerable(ServiceDescriptor.Singleton()); + services.TryAddEnumerable(ServiceDescriptor.Singleton()); + } + + public static void AddNebulaBusHandler(this IServiceCollection services, Assembly assembly) + { + var types = assembly.GetTypes() + .Where(t => t.IsClass && !t.IsAbstract && typeof(NebulaHandler).IsAssignableFrom(t)); + foreach (var typeItem in types) + { + services.TryAddEnumerable(ServiceDescriptor.Singleton(typeof(NebulaHandler), typeItem)); + } } } } \ No newline at end of file diff --git a/src/WebApplicationSample/Program.cs b/src/WebApplicationSample/Program.cs index c2a8b85..469fccf 100644 --- a/src/WebApplicationSample/Program.cs +++ b/src/WebApplicationSample/Program.cs @@ -29,8 +29,9 @@ builder.Services.AddNebulaBus(options => rabbitmq.VirtualHost = configuration!.GetValue("RabbitMq:VirtualHost"); }); }); -builder.Services.AddNebulaBusHandler(); -builder.Services.AddNebulaBusHandler(); +builder.Services.AddNebulaBusHandler(typeof(TestHandlerV1).Assembly); +// builder.Services.AddNebulaBusHandler(); +// builder.Services.AddNebulaBusHandler(); var app = builder.Build(); -- Gitee