From 45a45c6208fc2c30eba5559bc6009948d79068f5 Mon Sep 17 00:00:00 2001 From: liuhai Date: Sat, 12 Apr 2025 00:11:30 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=A4=9A=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=89=A7=E8=A1=8Chandler?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/NebulaBus/INebulaExecutor.cs | 66 +++++++++++++++++++ src/NebulaBus/NebulaHandler.cs | 10 +-- src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs | 46 ++++++++----- src/NebulaBus/Store/Redis/RedisStore.cs | 2 +- .../Handlers/TestHandlerV1.cs | 2 +- 5 files changed, 103 insertions(+), 23 deletions(-) create mode 100644 src/NebulaBus/INebulaExecutor.cs diff --git a/src/NebulaBus/INebulaExecutor.cs b/src/NebulaBus/INebulaExecutor.cs new file mode 100644 index 0000000..64665ba --- /dev/null +++ b/src/NebulaBus/INebulaExecutor.cs @@ -0,0 +1,66 @@ +using NebulaBus.Scheduler; +using System; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace NebulaBus +{ + internal class NebulaExecutor + { + private readonly Channel<(T id, string message, NebulaHeader header)> _channel; + private readonly byte _threadCount; + private readonly NebulaHandler _handler; + private readonly IProcessor _processor; + private readonly IDelayMessageScheduler _delayMessageScheduler; + private readonly SemaphoreSlim _semaphore; + + public NebulaExecutor(IProcessor processor, NebulaHandler nebulaHandler, IDelayMessageScheduler delayMessageScheduler) + { + _processor = processor; + _delayMessageScheduler = delayMessageScheduler; + _threadCount = nebulaHandler.ExecuteThreadCount; + _handler = nebulaHandler; + _channel = Channel.CreateUnbounded<(T, string, NebulaHeader)>(); + _semaphore = new SemaphoreSlim(1, 1); + } + + public async Task Enqueue(T id, string message, NebulaHeader header) + { + await _channel.Writer.WriteAsync((id, message, header)); + } + + public void Start(CancellationToken cancellationToken, Func callback) + { + for (int i = 0; i < _threadCount; i++) + { + Task.Run(async () => + { + while (await _channel.Reader.WaitToReadAsync()) + { + if (cancellationToken.IsCancellationRequested) + break; + if (_channel.Reader.TryRead(out var data)) + { + await _handler.Excute(_processor, _delayMessageScheduler, data.message, data.header); + await CallBack(data.id, callback); + } + } + }); + } + } + + private async Task CallBack(T id, Func callback) + { + await _semaphore.WaitAsync(); + try + { + await callback(id); + } + finally + { + _semaphore.Release(); + } + } + } +} \ No newline at end of file diff --git a/src/NebulaBus/NebulaHandler.cs b/src/NebulaBus/NebulaHandler.cs index e8e1b26..3577478 100644 --- a/src/NebulaBus/NebulaHandler.cs +++ b/src/NebulaBus/NebulaHandler.cs @@ -2,7 +2,6 @@ using Newtonsoft.Json; using System; using System.Threading.Tasks; -using Microsoft.Extensions.Logging; namespace NebulaBus { @@ -13,13 +12,14 @@ namespace NebulaBus public virtual TimeSpan RetryInterval => TimeSpan.FromSeconds(10); public virtual TimeSpan RetryDelay => TimeSpan.FromSeconds(5); public virtual int MaxRetryCount => 10; + public virtual byte ExecuteThreadCount => 1; - internal abstract Task Subscribe(IProcessor processor, IDelayMessageScheduler delayMessageScheduler, + internal abstract Task Excute(IProcessor processor, IDelayMessageScheduler delayMessageScheduler, string message, NebulaHeader header); internal abstract Task FallBackSubscribe(string message, NebulaHeader header, Exception ex); - protected async Task Execute(Func operation) + protected async Task DirectRetryExecute(Func operation) { for (int attempt = 1; attempt <= 4; attempt++) { @@ -40,7 +40,7 @@ namespace NebulaBus public abstract class NebulaHandler : NebulaHandler where T : class, new() { - internal override async Task Subscribe(IProcessor processor, IDelayMessageScheduler delayMessageScheduler, + internal override async Task Excute(IProcessor processor, IDelayMessageScheduler delayMessageScheduler, string message, NebulaHeader header) { if (string.IsNullOrEmpty(message)) @@ -63,7 +63,7 @@ namespace NebulaBus //首次执行若发生异常直接重试三次 if (retryCount == 0) { - await Execute(async () => + await DirectRetryExecute(async () => { data = JsonConvert.DeserializeObject(message); if (data == null) diff --git a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs index 1f50b79..11562aa 100644 --- a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs +++ b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs @@ -21,6 +21,7 @@ namespace NebulaBus.Rabbitmq private readonly ILogger _logger; private readonly IServiceProvider _serviceProvider; private bool _started; + private readonly SemaphoreSlim _semaphore; public RabbitmqProcessor( IServiceProvider serviceProvider, @@ -33,6 +34,7 @@ namespace NebulaBus.Rabbitmq _channels = new List(); _delayMessageScheduler = delayMessageScheduler; _logger = logger; + _semaphore = new SemaphoreSlim(1, 1); } public void Dispose() @@ -93,6 +95,11 @@ namespace NebulaBus.Rabbitmq var consumer = new AsyncEventingBasicConsumer(channel); await channel.BasicConsumeAsync(handler.Name, false, consumer, cancellationToken); + var excutor = new NebulaExecutor(this, handler, _delayMessageScheduler); + excutor.Start(cancellationToken, async (tag) => + { + await channel.BasicAckAsync(tag, false); + }); consumer.ReceivedAsync += async (ch, ea) => { var body = ea.Body.ToArray(); @@ -106,8 +113,7 @@ namespace NebulaBus.Rabbitmq } } - await handler.Subscribe(this, _delayMessageScheduler, message, header); - await channel.BasicAckAsync(ea.DeliveryTag, false); + await excutor.Enqueue(ea.DeliveryTag, message, header); }; } } @@ -121,23 +127,31 @@ namespace NebulaBus.Rabbitmq public async Task Publish(string routingKey, string message, NebulaHeader header) { - if (!_started) + await _semaphore.WaitAsync(); + try { - _logger.LogError($"Processor {this.GetType().Name} not started"); - return; - } + if (!_started) + { + _logger.LogError($"Processor {this.GetType().Name} not started"); + return; + } - byte[] messageBodyBytes = Encoding.UTF8.GetBytes(message); - var props = new BasicProperties() + byte[] messageBodyBytes = Encoding.UTF8.GetBytes(message); + var props = new BasicProperties() + { + Headers = new Dictionary() + }; + props.Persistent = true; + foreach (var item in header) + props.Headers.Add(item.Key, item.Value); + + await _senderChannel.BasicPublishAsync(_rabbitmqOptions.ExchangeName, routingKey, false, props, + messageBodyBytes); + } + finally { - Headers = new Dictionary() - }; - props.Persistent = true; - foreach (var item in header) - props.Headers.Add(item.Key, item.Value); - - await _senderChannel.BasicPublishAsync(_rabbitmqOptions.ExchangeName, routingKey, false, props, - messageBodyBytes); + _semaphore.Release(); + } } } } \ No newline at end of file diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index ed2419c..c3a7937 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -37,7 +37,7 @@ namespace NebulaBus.Store.Redis public async Task GetAllByKeys(long beforeTimestamp) { var keys = await _redisClient.ZRangeByScoreAsync(IndexRedisKey, 0, beforeTimestamp); - if (keys == null) return null; + if (keys == null || keys.Length == 0) return null; var result = await _redisClient.HMGetAsync(RedisKey, keys!); //排除为空的值并删除 for (var i = 0; i < keys.Length; i++) diff --git a/src/WebApplicationSample/Handlers/TestHandlerV1.cs b/src/WebApplicationSample/Handlers/TestHandlerV1.cs index a065fab..98df9ef 100644 --- a/src/WebApplicationSample/Handlers/TestHandlerV1.cs +++ b/src/WebApplicationSample/Handlers/TestHandlerV1.cs @@ -20,7 +20,7 @@ namespace WebApplicationSample.Handlers protected override async Task Handle(TestMessage message, NebulaHeader header) { - Console.WriteLine($"{DateTime.Now} Received Message {Name}:{message.Message} Header:{header["customHeader"]} RetryCount:{header[NebulaHeader.RetryCount]}"); + Console.WriteLine($"{DateTime.Now} Received MessageId:{header.GetMessageId()} {Name}:{message.Message} Header:{header["customHeader"]} RetryCount:{header[NebulaHeader.RetryCount]}"); throw new Exception("Test Exception"); } } -- Gitee From 56f5c1a29f0bd1799f671cc13aef43e16e3aa853 Mon Sep 17 00:00:00 2001 From: liuhai Date: Sat, 12 Apr 2025 00:17:57 +0800 Subject: [PATCH 2/4] rename store --- src/NebulaBus/Scheduler/DelayMessageScheduler.cs | 2 +- src/NebulaBus/Store/IStore.cs | 2 +- src/NebulaBus/Store/Memory/MemoryStore.cs | 2 +- src/NebulaBus/Store/Redis/RedisStore.cs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs index 659d08c..2ff11b1 100644 --- a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs +++ b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs @@ -98,7 +98,7 @@ namespace NebulaBus.Scheduler { try { - var delayMessages = await _store.GetAllByKeys(DateTimeOffset.Now.AddMinutes(1).ToUnixTimeSeconds()); + var delayMessages = await _store.Get(DateTimeOffset.Now.AddMinutes(1).ToUnixTimeSeconds()); if (delayMessages == null) return; foreach (var delayMessage in delayMessages) { diff --git a/src/NebulaBus/Store/IStore.cs b/src/NebulaBus/Store/IStore.cs index 519ce6a..a4ba39c 100644 --- a/src/NebulaBus/Store/IStore.cs +++ b/src/NebulaBus/Store/IStore.cs @@ -7,7 +7,7 @@ namespace NebulaBus.Store { Task Add(DelayStoreMessage delayStoreMessage); Task Delete(DelayStoreMessage delayStoreMessage); - Task GetAllByKeys(long beforeTimestamp); + Task Get(long beforeTimestamp); bool Lock(); } } \ No newline at end of file diff --git a/src/NebulaBus/Store/Memory/MemoryStore.cs b/src/NebulaBus/Store/Memory/MemoryStore.cs index 6ddc752..ddadd1b 100644 --- a/src/NebulaBus/Store/Memory/MemoryStore.cs +++ b/src/NebulaBus/Store/Memory/MemoryStore.cs @@ -26,7 +26,7 @@ namespace NebulaBus.Store.Memory await Task.CompletedTask; } - public async Task GetAllByKeys(long beforeTimestamp) + public async Task Get(long beforeTimestamp) { var result = _storeMessages.Values.ToArray(); return await Task.FromResult(result); diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index c3a7937..7105791 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -34,7 +34,7 @@ namespace NebulaBus.Store.Redis await _redisClient.HDelAsync(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}"); } - public async Task GetAllByKeys(long beforeTimestamp) + public async Task Get(long beforeTimestamp) { var keys = await _redisClient.ZRangeByScoreAsync(IndexRedisKey, 0, beforeTimestamp); if (keys == null || keys.Length == 0) return null; -- Gitee From efea05b96f84c542e90056cedc221b3904fc4ee1 Mon Sep 17 00:00:00 2001 From: liuhai Date: Sat, 12 Apr 2025 01:05:30 +0800 Subject: [PATCH 3/4] add freeredis --- src/NebulaBus/NebulaBus.csproj | 4 +- src/NebulaBus/ServiceCollectionExtensions.cs | 7 +-- src/NebulaBus/Store/Redis/RedisStore.cs | 45 ++++++++++++++------ 3 files changed, 40 insertions(+), 16 deletions(-) diff --git a/src/NebulaBus/NebulaBus.csproj b/src/NebulaBus/NebulaBus.csproj index 870058c..458e31d 100644 --- a/src/NebulaBus/NebulaBus.csproj +++ b/src/NebulaBus/NebulaBus.csproj @@ -15,12 +15,14 @@ + + - + diff --git a/src/NebulaBus/ServiceCollectionExtensions.cs b/src/NebulaBus/ServiceCollectionExtensions.cs index 56b9e5b..d5266f9 100644 --- a/src/NebulaBus/ServiceCollectionExtensions.cs +++ b/src/NebulaBus/ServiceCollectionExtensions.cs @@ -1,4 +1,5 @@ -using Microsoft.Extensions.DependencyInjection.Extensions; +using FreeRedis; +using Microsoft.Extensions.DependencyInjection.Extensions; using NebulaBus; using NebulaBus.Rabbitmq; using NebulaBus.Scheduler; @@ -38,8 +39,8 @@ namespace Microsoft.Extensions.DependencyInjection //Delay Message Store if (!string.IsNullOrEmpty(options.RedisConnectionString)) { - var redisClient = new CSRedis.CSRedisClient(options.RedisConnectionString); - services.AddKeyedSingleton("NebulaBusRedis", redisClient); + var freeRedisClient = new RedisClient(options.RedisConnectionString); + services.AddKeyedSingleton("NebulaBusRedis", freeRedisClient); services.AddSingleton(); } else diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index 7105791..626622c 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -1,5 +1,6 @@ -using CSRedis; +using FreeRedis; using Microsoft.Extensions.DependencyInjection; +using Newtonsoft.Json; using System; using System.Linq; using System.Threading.Tasks; @@ -11,46 +12,66 @@ namespace NebulaBus.Store.Redis private string RedisKey => $"NebulaBus:{_nebulaOptions.ClusterName}.Store"; private string IndexRedisKey => $"NebulaBus:{_nebulaOptions.ClusterName}.StoreIndex"; - private readonly CSRedisClient _redisClient; + private readonly RedisClient _redisClient; private readonly NebulaOptions _nebulaOptions; - private CSRedisClientLock _redisClientLock; + private RedisClient.LockController _redisClientLock; public RedisStore(IServiceProvider provider, NebulaOptions nebulaOptions) { - _redisClient = provider.GetKeyedService("NebulaBusRedis")!; + _redisClient = provider.GetKeyedService("NebulaBusRedis")!; _nebulaOptions = nebulaOptions; } public async Task Add(DelayStoreMessage delayStoreMessage) { - await _redisClient.ZAddAsync(IndexRedisKey, (delayStoreMessage.TriggerTime, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}")); - await _redisClient.HSetAsync(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}", - delayStoreMessage); + //var redis = new CSRedis.CSRedisClient(""); + using (var tran = _redisClient.Multi()) + { + tran.ZAdd(IndexRedisKey, delayStoreMessage.TriggerTime, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}"); + tran.HSet(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}", JsonConvert.SerializeObject(delayStoreMessage)); + tran.Exec(); + } + await Task.CompletedTask; } public async Task Delete(DelayStoreMessage delayStoreMessage) { - await _redisClient.ZRemAsync(IndexRedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}"); - await _redisClient.HDelAsync(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}"); + using var tran = _redisClient.Multi(); + await tran.ZRemAsync(IndexRedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}"); + await tran.HDelAsync(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}"); + tran.Exec(); } public async Task Get(long beforeTimestamp) { - var keys = await _redisClient.ZRangeByScoreAsync(IndexRedisKey, 0, beforeTimestamp); + var keys = _redisClient.ZRangeByScore(IndexRedisKey, 0, beforeTimestamp); if (keys == null || keys.Length == 0) return null; + var result1 = await _redisClient.HMGetAsync(RedisKey, keys!); + return null; var result = await _redisClient.HMGetAsync(RedisKey, keys!); //排除为空的值并删除 for (var i = 0; i < keys.Length; i++) { if (result[i] == null) { - await _redisClient.ZRemAsync(IndexRedisKey, keys[i]); - await _redisClient.HDelAsync(RedisKey, keys[i]); + using var tran = _redisClient.Multi(); + await RemoveKey(keys[i]); + await tran.ZRemAsync(IndexRedisKey, keys[i]); + await tran.HDelAsync(RedisKey, keys[i]); + tran.Exec(); } } return result.Where(x => x != null).ToArray(); } + private async Task RemoveKey(string key) + { + using var tran = _redisClient.Multi(); + await tran.ZRemAsync(IndexRedisKey, key); + await tran.HDelAsync(RedisKey, key); + tran.Exec(); + } + public bool Lock() { _redisClientLock = _redisClient.Lock($"NebulaBus:{_nebulaOptions.ClusterName}.Lock", 3, true); -- Gitee From 05c7a06399d5cb49768cb9f7e30ec714c1f650a6 Mon Sep 17 00:00:00 2001 From: liuhai Date: Sat, 12 Apr 2025 08:50:04 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=A4=9A=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=89=A7=E8=A1=8Chandler?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/NebulaBus/NebulaBus.csproj | 6 ++++-- .../{INebulaExecutor.cs => NebulaExecutor.cs} | 4 ++-- src/NebulaBus/NebulaHandler.cs | 2 +- src/NebulaBus/NebulaOptions.cs | 1 + src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs | 4 +++- src/NebulaBus/ServiceCollectionExtensions.cs | 3 +++ src/NebulaBus/Store/Redis/RedisStore.cs | 19 ++++++------------- 7 files changed, 20 insertions(+), 19 deletions(-) rename src/NebulaBus/{INebulaExecutor.cs => NebulaExecutor.cs} (91%) diff --git a/src/NebulaBus/NebulaBus.csproj b/src/NebulaBus/NebulaBus.csproj index 458e31d..c66110b 100644 --- a/src/NebulaBus/NebulaBus.csproj +++ b/src/NebulaBus/NebulaBus.csproj @@ -3,18 +3,20 @@ javen liu JiewitTech https://github.com/JiewitTech/NebulaBus.git + https://github.com/JiewitTech/NebulaBus + NebulaBus - Future oriented NET distributed event bus framework, allowing developers to focus on development + NebulaBus enventbus MIT README.md $(PackageVersion) - + netstandard2.1 enable - diff --git a/src/NebulaBus/INebulaExecutor.cs b/src/NebulaBus/NebulaExecutor.cs similarity index 91% rename from src/NebulaBus/INebulaExecutor.cs rename to src/NebulaBus/NebulaExecutor.cs index 64665ba..40d9450 100644 --- a/src/NebulaBus/INebulaExecutor.cs +++ b/src/NebulaBus/NebulaExecutor.cs @@ -15,11 +15,11 @@ namespace NebulaBus private readonly IDelayMessageScheduler _delayMessageScheduler; private readonly SemaphoreSlim _semaphore; - public NebulaExecutor(IProcessor processor, NebulaHandler nebulaHandler, IDelayMessageScheduler delayMessageScheduler) + public NebulaExecutor(IProcessor processor, NebulaHandler nebulaHandler, IDelayMessageScheduler delayMessageScheduler, byte threadCount) { _processor = processor; _delayMessageScheduler = delayMessageScheduler; - _threadCount = nebulaHandler.ExecuteThreadCount; + _threadCount = nebulaHandler.ExecuteThreadCount > 0 ? nebulaHandler.ExecuteThreadCount.Value : threadCount; _handler = nebulaHandler; _channel = Channel.CreateUnbounded<(T, string, NebulaHeader)>(); _semaphore = new SemaphoreSlim(1, 1); diff --git a/src/NebulaBus/NebulaHandler.cs b/src/NebulaBus/NebulaHandler.cs index 3577478..0731b0e 100644 --- a/src/NebulaBus/NebulaHandler.cs +++ b/src/NebulaBus/NebulaHandler.cs @@ -12,7 +12,7 @@ namespace NebulaBus public virtual TimeSpan RetryInterval => TimeSpan.FromSeconds(10); public virtual TimeSpan RetryDelay => TimeSpan.FromSeconds(5); public virtual int MaxRetryCount => 10; - public virtual byte ExecuteThreadCount => 1; + public virtual byte? ExecuteThreadCount => 1; internal abstract Task Excute(IProcessor processor, IDelayMessageScheduler delayMessageScheduler, string message, NebulaHeader header); diff --git a/src/NebulaBus/NebulaOptions.cs b/src/NebulaBus/NebulaOptions.cs index c51a76c..a5f3f0a 100644 --- a/src/NebulaBus/NebulaOptions.cs +++ b/src/NebulaBus/NebulaOptions.cs @@ -9,6 +9,7 @@ namespace NebulaBus internal RabbitmqOptions RabbitmqOptions { get; } internal string RedisConnectionString { get; set; } public string ClusterName { get; set; } = $"{Assembly.GetEntryAssembly().GetName().Name}"; + public byte ExecuteThreadCount { get; set; } = 1; public NebulaOptions() { diff --git a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs index 11562aa..8773760 100644 --- a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs +++ b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs @@ -22,6 +22,7 @@ namespace NebulaBus.Rabbitmq private readonly IServiceProvider _serviceProvider; private bool _started; private readonly SemaphoreSlim _semaphore; + private readonly NebulaOptions _nebulaOptions; public RabbitmqProcessor( IServiceProvider serviceProvider, @@ -30,6 +31,7 @@ namespace NebulaBus.Rabbitmq ILogger logger) { _serviceProvider = serviceProvider; + _nebulaOptions = nebulaOptions; _rabbitmqOptions = nebulaOptions.RabbitmqOptions; _channels = new List(); _delayMessageScheduler = delayMessageScheduler; @@ -95,7 +97,7 @@ namespace NebulaBus.Rabbitmq var consumer = new AsyncEventingBasicConsumer(channel); await channel.BasicConsumeAsync(handler.Name, false, consumer, cancellationToken); - var excutor = new NebulaExecutor(this, handler, _delayMessageScheduler); + var excutor = new NebulaExecutor(this, handler, _delayMessageScheduler,_nebulaOptions.ExecuteThreadCount); excutor.Start(cancellationToken, async (tag) => { await channel.BasicAckAsync(tag, false); diff --git a/src/NebulaBus/ServiceCollectionExtensions.cs b/src/NebulaBus/ServiceCollectionExtensions.cs index d5266f9..dd5a625 100644 --- a/src/NebulaBus/ServiceCollectionExtensions.cs +++ b/src/NebulaBus/ServiceCollectionExtensions.cs @@ -10,6 +10,7 @@ using Quartz; using Quartz.Spi; using System; using System.Linq; +using Newtonsoft.Json; using System.Reflection; namespace Microsoft.Extensions.DependencyInjection @@ -40,6 +41,8 @@ namespace Microsoft.Extensions.DependencyInjection if (!string.IsNullOrEmpty(options.RedisConnectionString)) { var freeRedisClient = new RedisClient(options.RedisConnectionString); + freeRedisClient.Serialize = obj => JsonConvert.SerializeObject(obj); + freeRedisClient.Deserialize = (json, type) => JsonConvert.DeserializeObject(json, type); services.AddKeyedSingleton("NebulaBusRedis", freeRedisClient); services.AddSingleton(); } diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index 626622c..7b50777 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -1,6 +1,5 @@ using FreeRedis; using Microsoft.Extensions.DependencyInjection; -using Newtonsoft.Json; using System; using System.Linq; using System.Threading.Tasks; @@ -24,11 +23,10 @@ namespace NebulaBus.Store.Redis public async Task Add(DelayStoreMessage delayStoreMessage) { - //var redis = new CSRedis.CSRedisClient(""); using (var tran = _redisClient.Multi()) { tran.ZAdd(IndexRedisKey, delayStoreMessage.TriggerTime, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}"); - tran.HSet(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}", JsonConvert.SerializeObject(delayStoreMessage)); + tran.HSet(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}", delayStoreMessage); tran.Exec(); } await Task.CompletedTask; @@ -37,28 +35,23 @@ namespace NebulaBus.Store.Redis public async Task Delete(DelayStoreMessage delayStoreMessage) { using var tran = _redisClient.Multi(); - await tran.ZRemAsync(IndexRedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}"); - await tran.HDelAsync(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}"); + tran.ZRem(IndexRedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}"); + tran.HDel(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}"); tran.Exec(); + await Task.CompletedTask; } public async Task Get(long beforeTimestamp) { var keys = _redisClient.ZRangeByScore(IndexRedisKey, 0, beforeTimestamp); if (keys == null || keys.Length == 0) return null; - var result1 = await _redisClient.HMGetAsync(RedisKey, keys!); - return null; var result = await _redisClient.HMGetAsync(RedisKey, keys!); //排除为空的值并删除 for (var i = 0; i < keys.Length; i++) { if (result[i] == null) { - using var tran = _redisClient.Multi(); await RemoveKey(keys[i]); - await tran.ZRemAsync(IndexRedisKey, keys[i]); - await tran.HDelAsync(RedisKey, keys[i]); - tran.Exec(); } } return result.Where(x => x != null).ToArray(); @@ -67,8 +60,8 @@ namespace NebulaBus.Store.Redis private async Task RemoveKey(string key) { using var tran = _redisClient.Multi(); - await tran.ZRemAsync(IndexRedisKey, key); - await tran.HDelAsync(RedisKey, key); + tran.ZRem(IndexRedisKey, key); + tran.HDel(RedisKey, key); tran.Exec(); } -- Gitee