diff --git a/src/NebulaBus/NebulaBus.csproj b/src/NebulaBus/NebulaBus.csproj index e8d18345334ab38af98b0ea985416347164a1d8c..870058cc469a16fd96bac96fc3c71247f96b4dec 100644 --- a/src/NebulaBus/NebulaBus.csproj +++ b/src/NebulaBus/NebulaBus.csproj @@ -1,12 +1,16 @@  - + + javen liu + JiewitTech + https://github.com/JiewitTech/NebulaBus.git + MIT + README.md + $(PackageVersion) + + netstandard2.1 enable - $(GitVersion_MajorMinorPatch) - $(GitVersion_PreReleaseTag) - $(GitVersion_SemVer.Replace('+', '-')) - README.md diff --git a/src/NebulaBus/NebulaHandler.cs b/src/NebulaBus/NebulaHandler.cs index 086fae7e00a425749c6d4672449c724efb49675e..3f7f5e3ca11a7a671fb0e26d7cf05fced2390447 100644 --- a/src/NebulaBus/NebulaHandler.cs +++ b/src/NebulaBus/NebulaHandler.cs @@ -2,6 +2,7 @@ using Newtonsoft.Json; using System; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; namespace NebulaBus { @@ -16,6 +17,8 @@ namespace NebulaBus internal abstract Task Subscribe(IProcessor processor, IDelayMessageScheduler delayMessageScheduler, string message, NebulaHeader header); + internal abstract Task FallBackSubscribe(string message, NebulaHeader header, Exception ex); + protected async Task Execute(Func operation) { for (int attempt = 1; attempt <= 4; attempt++) @@ -35,34 +38,52 @@ namespace NebulaBus } public abstract class NebulaHandler : NebulaHandler + where T : class, new() { internal override async Task Subscribe(IProcessor processor, IDelayMessageScheduler delayMessageScheduler, string message, NebulaHeader header) { - if (string.IsNullOrEmpty(message)) return; + if (string.IsNullOrEmpty(message)) + { + await FallBackHandler(null, header, new Exception($"message is null or empty")); + return; + } header[NebulaHeader.Consumer] = Environment.MachineName; header[NebulaHeader.Name] = Name; header[NebulaHeader.Group] = Group; var retryCount = header.GetRetryCount(); - + T data = null; try { - if (retryCount > MaxRetryCount) return; + if (retryCount > MaxRetryCount) + { + return; + } //首次执行若发生异常直接重试三次 if (retryCount == 0) { await Execute(async () => { - var data = JsonConvert.DeserializeObject(message); - if (data == null) return; + data = JsonConvert.DeserializeObject(message); + if (data == null) + { + await FallBackHandler(data, header, new Exception($"can not deserialize from:{message}")); + return; + } + await Handle(data, header); }); } else { - var data = JsonConvert.DeserializeObject(message); - if (data == null) return; + data = JsonConvert.DeserializeObject(message); + if (data == null) + { + await FallBackHandler(data, header, new Exception($"can not deserialize from:{message}")); + return; + } + await Handle(data, header); } } @@ -71,7 +92,12 @@ namespace NebulaBus header[NebulaHeader.Exception] = ex.ToString(); //no retry - if (MaxRetryCount == 0) return; + if (MaxRetryCount == 0) + { + await FallBackHandler(data, header, new Exception($"can not deserialize from:{message}")); + return; + } + header[NebulaHeader.RetryCount] = (retryCount + 1).ToString(); //First Time to retry,if no delay then send directly @@ -96,8 +122,12 @@ namespace NebulaBus return; } + //out of retry count if (retryCount >= MaxRetryCount) + { + await FallBackHandler(data, header, new Exception($"can not deserialize from:{message}")); return; + } //Interval Retry await delayMessageScheduler.Schedule(new Store.DelayStoreMessage() @@ -112,6 +142,24 @@ namespace NebulaBus } } + internal override async Task FallBackSubscribe(string message, NebulaHeader header, Exception ex) + { + try + { + var data = JsonConvert.DeserializeObject(message); + if (data == null) return; + await FallBackHandler(data, header, ex); + } + catch + { + } + } + protected abstract Task Handle(T message, NebulaHeader header); + + protected virtual async Task FallBackHandler(T handler, NebulaHeader header, Exception exception) + { + await Task.CompletedTask; + } } } \ No newline at end of file diff --git a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs index 2d3c859eea9c9e61d89e838d0ad23efc7244dd52..75f3f39bbefd8cc63aea0f7d1d473c3e53d01e2c 100644 --- a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs +++ b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs @@ -72,7 +72,8 @@ namespace NebulaBus.Rabbitmq //Create Exchange await channel.ExchangeDeclareAsync(_rabbitmqOptions.ExchangeName, ExchangeType.Direct); - await channel.QueueDeclareAsync(handler.Name, false, false, false, null); + //Create Queue + await channel.QueueDeclareAsync(handler.Name, true, false, false, null); //Bind Group RoutingKey if (!string.IsNullOrEmpty(handler.Group)) @@ -121,6 +122,7 @@ namespace NebulaBus.Rabbitmq { Headers = new Dictionary() }; + props.Persistent = true; foreach (var item in header) props.Headers.Add(item.Key, item.Value); diff --git a/src/NebulaBus/ServiceCollectionExtensions.cs b/src/NebulaBus/ServiceCollectionExtensions.cs index c77562753a2ab3e34e3c3b27622de22ecd855401..56b9e5ba19e3a544b613845fd492053347c53bf4 100644 --- a/src/NebulaBus/ServiceCollectionExtensions.cs +++ b/src/NebulaBus/ServiceCollectionExtensions.cs @@ -58,6 +58,7 @@ namespace Microsoft.Extensions.DependencyInjection public static void AddNebulaBusHandler(this IServiceCollection services) where TH : NebulaHandler + where TM : class, new() { services.TryAddEnumerable(ServiceDescriptor.Singleton()); }