diff --git a/src/NebulaBus/INebulaBus.cs b/src/NebulaBus/INebulaBus.cs index 3b772b2fafe0ccee53891825016dbfca15c594cb..a64dcef1e25677e0f9a80b97245fce54c5246bde 100644 --- a/src/NebulaBus/INebulaBus.cs +++ b/src/NebulaBus/INebulaBus.cs @@ -6,9 +6,10 @@ namespace NebulaBus { public interface INebulaBus { - Task PublishAsync(string name, T message); - Task PublishAsync(string name, T message, IDictionary headers); - Task PublishAsync(TimeSpan delay, string name, T message); - Task PublishAsync(TimeSpan delay, string name, T message, IDictionary headers); + Task PublishAsync(string name, T message) where T : class, new(); + Task PublishAsync(string name, T message, IDictionary headers) where T : class, new(); + Task PublishAsync(TimeSpan delay, string name, T message) where T : class, new(); + Task PublishAsync(TimeSpan delay, string name, T message, IDictionary headers) + where T : class, new(); } } \ No newline at end of file diff --git a/src/NebulaBus/NebulaBusService.cs b/src/NebulaBus/NebulaBusService.cs index 7c3ec0602178ea19f4dc60f8a825a2ea2faea40d..b8383af12c12bd0d6a984e75bfcbf44b94c036af 100644 --- a/src/NebulaBus/NebulaBusService.cs +++ b/src/NebulaBus/NebulaBusService.cs @@ -24,6 +24,7 @@ namespace NebulaBus } public async Task PublishAsync(string group, T message) + where T : class, new() { var header = BuildNebulaHeader(group); foreach (var processor in _processors) @@ -33,6 +34,7 @@ namespace NebulaBus } public async Task PublishAsync(string group, T message, IDictionary headers) + where T : class, new() { var header = BuildNebulaHeader(group, headers); foreach (var processor in _processors) @@ -42,6 +44,7 @@ namespace NebulaBus } public async Task PublishAsync(TimeSpan delay, string group, T message) + where T : class, new() { var header = BuildNebulaHeader(group); await _delayMessageScheduler.Schedule(new DelayStoreMessage() @@ -56,6 +59,7 @@ namespace NebulaBus } public async Task PublishAsync(TimeSpan delay, string group, T message, IDictionary headers) + where T : class, new() { var header = BuildNebulaHeader(group, headers); await _delayMessageScheduler.Schedule(new DelayStoreMessage() @@ -70,6 +74,7 @@ namespace NebulaBus } private NebulaHeader BuildNebulaHeader(string group) + where T : class, new() { var newId = _idWorker.NextId().ToString(); var header = new NebulaHeader() @@ -84,6 +89,7 @@ namespace NebulaBus } private NebulaHeader BuildNebulaHeader(string group, IDictionary headers) + where T : class, new() { var newId = _idWorker.NextId().ToString(); var header = new NebulaHeader(headers); diff --git a/src/NebulaBus/NebulaHandler.cs b/src/NebulaBus/NebulaHandler.cs index 0cb8c3399217e897b71798513216c5cd2383b067..9186d2ac4d343116e158982d8020a1231c0cee41 100644 --- a/src/NebulaBus/NebulaHandler.cs +++ b/src/NebulaBus/NebulaHandler.cs @@ -46,12 +46,11 @@ namespace NebulaBus header[NebulaHeader.Consumer] = Environment.MachineName; header[NebulaHeader.Name] = Name; header[NebulaHeader.Group] = Group; - int.TryParse(header[NebulaHeader.RetryCount], out var retryCount); + var retryCount = header.GetRetryCount(); try { if (retryCount > MaxRetryCount) return; - header[NebulaHeader.RetryCount] = (retryCount + 1).ToString(); //首次执行若发生异常直接重试三次 if (retryCount == 0) @@ -76,6 +75,8 @@ namespace NebulaBus //no retry if (MaxRetryCount == 0) return; + header[NebulaHeader.RetryCount] = (retryCount + 1).ToString(); + //First Time to retry,if no delay then send directly if (retryCount == 0 && RetryDelay.TotalSeconds <= 0) { diff --git a/src/NebulaBus/NebulaHeader.cs b/src/NebulaBus/NebulaHeader.cs index b6de3a0647f2fbef6ed7d464272b34ee271d34ca..5a98b4016cbcc089cc6cb25f2558fee88d796498 100644 --- a/src/NebulaBus/NebulaHeader.cs +++ b/src/NebulaBus/NebulaHeader.cs @@ -19,6 +19,14 @@ namespace NebulaBus private readonly Dictionary _dic; + public string GetRequestId() => this[RequestId]; + public string GetMessageId() => this[MessageId]; + public int GetRetryCount() + { + int.TryParse(this[RetryCount], out var result); + return result; + } + public NebulaHeader() { _dic = new Dictionary(); diff --git a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs index 0180629c29f9ea5c0c04794e7efe65bc85d0d3a3..1fa6977c015be49e4ae187203f1e43efacd93710 100644 --- a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs +++ b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs @@ -69,9 +69,12 @@ namespace NebulaBus.Rabbitmq await channel.ExchangeDeclareAsync(_rabbitmqOptions.ExchangeName, ExchangeType.Direct); await channel.QueueDeclareAsync(handler.Name, false, false, false, null); //Bind Group RoutingKey - await channel.QueueBindAsync(handler.Name, _rabbitmqOptions.ExchangeName, handler.Group, null); + if (!string.IsNullOrEmpty(handler.Group)) + await channel.QueueBindAsync(handler.Name, _rabbitmqOptions.ExchangeName, handler.Group, null); + //Bind Name RoutingKey - await channel.QueueBindAsync(handler.Name, _rabbitmqOptions.ExchangeName, handler.Name, null); + if (!string.IsNullOrEmpty(handler.Name)) + await channel.QueueBindAsync(handler.Name, _rabbitmqOptions.ExchangeName, handler.Name, null); _channels.Add(channel); var consumer = new AsyncEventingBasicConsumer(channel); @@ -89,6 +92,7 @@ namespace NebulaBus.Rabbitmq if (item.Value is byte[] bytes) header.Add(item.Key, Encoding.UTF8.GetString(bytes)); } } + await handler.Subscribe(this, _delayMessageScheduler, message, header); await channel.BasicAckAsync(ea.DeliveryTag, false); }; @@ -104,6 +108,7 @@ namespace NebulaBus.Rabbitmq _logger.LogError($"Processor {this.GetType().Name} not started"); return; } + byte[] messageBodyBytes = Encoding.UTF8.GetBytes(message); var props = new BasicProperties() { diff --git a/src/NebulaBus/ServiceCollectionExtensions.cs b/src/NebulaBus/ServiceCollectionExtensions.cs index 6d06d74aa84c0566d1c901630c6bcacd0970cd17..35ce06010e5b830ba3084d377616aca8b0f4322e 100644 --- a/src/NebulaBus/ServiceCollectionExtensions.cs +++ b/src/NebulaBus/ServiceCollectionExtensions.cs @@ -6,6 +6,8 @@ using NebulaBus.Store; using NebulaBus.Store.Redis; using Quartz; using System; +using System.Linq; +using System.Reflection; using NebulaBus.Store.Memory; using Quartz.Simpl; using Quartz.Spi; @@ -23,7 +25,6 @@ namespace Microsoft.Extensions.DependencyInjection setupAction(options); services.AddSingleton(options); services.AddSingleton(); - services.AddSingleton(); services.AddSingleton(); //Schedule job @@ -38,7 +39,8 @@ namespace Microsoft.Extensions.DependencyInjection if (!string.IsNullOrEmpty(options.RedisConnectionString)) { var redisClient = new CSRedis.CSRedisClient(options.RedisConnectionString); - services.AddSingleton(redisClient); + services.AddKeyedSingleton("NebulaBusRedis", redisClient); + services.AddSingleton(); } else { @@ -48,16 +50,26 @@ namespace Microsoft.Extensions.DependencyInjection services.AddHostedService(); } - public static void AddNebulaBusHandler(this IServiceCollection services) - where H : NebulaHandler + public static void AddNebulaBusHandler(this IServiceCollection services) + where TH : NebulaHandler { - services.TryAddEnumerable(ServiceDescriptor.Singleton()); + services.TryAddEnumerable(ServiceDescriptor.Singleton()); } - public static void AddNebulaBusHandler(this IServiceCollection services) - where H : NebulaHandler + public static void AddNebulaBusHandler(this IServiceCollection services) + where TH : NebulaHandler { - services.TryAddEnumerable(ServiceDescriptor.Singleton()); + services.TryAddEnumerable(ServiceDescriptor.Singleton()); + } + + public static void AddNebulaBusHandler(this IServiceCollection services, Assembly assembly) + { + var types = assembly.GetTypes() + .Where(t => t.IsClass && !t.IsAbstract && typeof(NebulaHandler).IsAssignableFrom(t)); + foreach (var typeItem in types) + { + services.TryAddEnumerable(ServiceDescriptor.Singleton(typeof(NebulaHandler), typeItem)); + } } } } \ No newline at end of file diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index 2f1684dc1393a50187948e5b577e96ec67320c68..8e317cd9ab8a00dd5000f0c74ac2765e1623e5a3 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -1,6 +1,8 @@ -using CSRedis; +using System; +using CSRedis; using System.Collections.Generic; using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; namespace NebulaBus.Store.Redis { @@ -11,14 +13,15 @@ namespace NebulaBus.Store.Redis private readonly CSRedisClient _redisClient; - public RedisStore(CSRedisClient cSRedisClient) + public RedisStore(IServiceProvider provider) { - _redisClient = cSRedisClient; + _redisClient = provider.GetKeyedService("NebulaBusRedis")!; } public async Task Add(DelayStoreMessage delayStoreMessage) { - await _redisClient.HSetAsync(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}", delayStoreMessage); + await _redisClient.HSetAsync(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}", + delayStoreMessage); } public async Task Delete(DelayStoreMessage delayStoreMessage) diff --git a/src/WebApplicationSample/Controllers/WeatherForecastController.cs b/src/WebApplicationSample/Controllers/WeatherForecastController.cs index d52283761332fc6e8387546fcb17ace61ee597de..4d73e9ac210d80792b93af747d8ec1ed3f26452f 100644 --- a/src/WebApplicationSample/Controllers/WeatherForecastController.cs +++ b/src/WebApplicationSample/Controllers/WeatherForecastController.cs @@ -26,13 +26,18 @@ namespace WebApplicationSample.Controllers public IEnumerable Publish() { _logger.LogInformation($"{DateTime.Now} Start send Message"); - _bus.PublishAsync("NebulaBus.TestHandler", new TestMessage { Message = "Hello World" }); + _bus.PublishAsync("NebulaBus.TestHandler", new TestMessage { Message = "Hello World" }, + new Dictionary() + { + { "customHeader", "123456" }, + { NebulaHeader.RequestId, "8889999" }, + }); return Enumerable.Range(1, 5).Select(index => new WeatherForecast - { - Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)), - TemperatureC = Random.Shared.Next(-20, 55), - Summary = Summaries[Random.Shared.Next(Summaries.Length)] - }) + { + Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)), + TemperatureC = Random.Shared.Next(-20, 55), + Summary = Summaries[Random.Shared.Next(Summaries.Length)] + }) .ToArray(); } @@ -43,11 +48,11 @@ namespace WebApplicationSample.Controllers _bus.PublishAsync(TimeSpan.FromSeconds(5), "NebulaBus.TestHandler", new TestMessage { Message = "Hello World" }); return Enumerable.Range(1, 5).Select(index => new WeatherForecast - { - Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)), - TemperatureC = Random.Shared.Next(-20, 55), - Summary = Summaries[Random.Shared.Next(Summaries.Length)] - }) + { + Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)), + TemperatureC = Random.Shared.Next(-20, 55), + Summary = Summaries[Random.Shared.Next(Summaries.Length)] + }) .ToArray(); } @@ -57,11 +62,11 @@ namespace WebApplicationSample.Controllers _logger.LogInformation($"{DateTime.Now} Start send Message"); _bus.PublishAsync("NebulaBus.TestHandler.V1", new TestMessage { Message = "Hello World" }); return Enumerable.Range(1, 5).Select(index => new WeatherForecast - { - Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)), - TemperatureC = Random.Shared.Next(-20, 55), - Summary = Summaries[Random.Shared.Next(Summaries.Length)] - }) + { + Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)), + TemperatureC = Random.Shared.Next(-20, 55), + Summary = Summaries[Random.Shared.Next(Summaries.Length)] + }) .ToArray(); } @@ -69,7 +74,8 @@ namespace WebApplicationSample.Controllers public IEnumerable DelaySend() { _logger.LogInformation($"{DateTime.Now} Start send Message"); - _bus.PublishAsync(TimeSpan.FromSeconds(5), "NebulaBus.TestHandler.V1", new TestMessage { Message = "Hello World" }); + _bus.PublishAsync(TimeSpan.FromSeconds(5), "NebulaBus.TestHandler.V1", + new TestMessage { Message = "Hello World" }); return Enumerable.Range(1, 5).Select(index => new WeatherForecast { Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)), diff --git a/src/WebApplicationSample/Handlers/TestHandlerV1.cs b/src/WebApplicationSample/Handlers/TestHandlerV1.cs index 73ffc64619afe8e3bf36c6afca3006b737b42173..fca7b449145e001c23c3537796f8529f17a2fed3 100644 --- a/src/WebApplicationSample/Handlers/TestHandlerV1.cs +++ b/src/WebApplicationSample/Handlers/TestHandlerV1.cs @@ -12,7 +12,7 @@ namespace WebApplicationSample.Handlers protected override async Task Handle(TestMessage message, NebulaHeader header) { - Console.WriteLine($"{DateTime.Now} Received Message {Name}:{message.Message}"); + Console.WriteLine($"{DateTime.Now} Received Message {Name}:{message.Message} Header:{header["customHeader"]} RetryCount:{header[NebulaHeader.RetryCount]}"); throw new Exception("Test Exception"); } } diff --git a/src/WebApplicationSample/Program.cs b/src/WebApplicationSample/Program.cs index c2a8b85bd9ea6bc1e64d50f3af8238506916aa70..469fccf0e089cdc759f5b2234878ccebf2c109a5 100644 --- a/src/WebApplicationSample/Program.cs +++ b/src/WebApplicationSample/Program.cs @@ -29,8 +29,9 @@ builder.Services.AddNebulaBus(options => rabbitmq.VirtualHost = configuration!.GetValue("RabbitMq:VirtualHost"); }); }); -builder.Services.AddNebulaBusHandler(); -builder.Services.AddNebulaBusHandler(); +builder.Services.AddNebulaBusHandler(typeof(TestHandlerV1).Assembly); +// builder.Services.AddNebulaBusHandler(); +// builder.Services.AddNebulaBusHandler(); var app = builder.Build();