diff --git a/src/NebulaBus/NebulaBusService.cs b/src/NebulaBus/NebulaBusService.cs index 68240ba27f51d78af93f34a035773adcdda3604a..d841768bc66c7fc00a79e863469fc95ba8a95ef6 100644 --- a/src/NebulaBus/NebulaBusService.cs +++ b/src/NebulaBus/NebulaBusService.cs @@ -48,7 +48,7 @@ namespace NebulaBus where T : class, new() { var header = BuildNebulaHeader(nameOrGroup); - await _delayMessageScheduler.Schedule(new DelayStoreMessage() + _delayMessageScheduler.Schedule(new DelayStoreMessage() { MessageId = header[NebulaHeader.MessageId]!, Group = nameOrGroup, @@ -63,7 +63,7 @@ namespace NebulaBus where T : class, new() { var header = BuildNebulaHeader(nameOrGroup, headers); - await _delayMessageScheduler.Schedule(new DelayStoreMessage() + _delayMessageScheduler.Schedule(new DelayStoreMessage() { MessageId = header[NebulaHeader.MessageId]!, Group = nameOrGroup, diff --git a/src/NebulaBus/NebulaHandler.cs b/src/NebulaBus/NebulaHandler.cs index 93f4f7d93c606f49f40944579062f71695b7d6a7..90645857105df072453d8a160f9df89399d4f024 100644 --- a/src/NebulaBus/NebulaHandler.cs +++ b/src/NebulaBus/NebulaHandler.cs @@ -1,4 +1,5 @@ -using NebulaBus.Scheduler; +using Microsoft.Extensions.DependencyInjection; +using NebulaBus.Scheduler; using System; using System.Reflection; using System.Text.Json; @@ -6,7 +7,17 @@ using System.Threading.Tasks; namespace NebulaBus { - public abstract class NebulaHandler + internal interface INebulaHandler + { + public string Name { get; } + public string Group { get; } + public TimeSpan RetryInterval { get; } + public TimeSpan RetryDelay { get; } + public int MaxRetryCount { get; } + public byte? ExecuteThreadCount { get; } + } + + public abstract class NebulaHandler : INebulaHandler { public abstract string Name { get; } public abstract string Group { get; } @@ -16,11 +27,9 @@ namespace NebulaBus public virtual byte? ExecuteThreadCount => null; internal abstract Task Excute( - IDelayMessageScheduler delayMessageScheduler, + IServiceProvider serviceProvider, ReadOnlyMemory message, - NebulaHeader header, - JsonSerializerOptions jsonSerializerOptions, - Func, NebulaHeader, Task> repulish); + NebulaHeader header); protected async Task DirectRetryExecute(Func operation) { @@ -44,12 +53,13 @@ namespace NebulaBus where T : class, new() { internal override async Task Excute( - IDelayMessageScheduler delayMessageScheduler, + IServiceProvider serviceProvider, ReadOnlyMemory message, - NebulaHeader header, - JsonSerializerOptions jsonSerializerOptions, - Func, NebulaHeader, Task> repulish) + NebulaHeader header) { + var delayMessageScheduler = serviceProvider.GetRequiredService(); + var jsonSerializerOptions = serviceProvider.GetRequiredService().JsonSerializerOptions; + (bool success, T? data, Exception? exception) = await DeSerializer(message, header, jsonSerializerOptions); if (!success || data == null) { @@ -90,23 +100,16 @@ namespace NebulaBus //no retry if (MaxRetryCount == 0) { - await FallBackHandler(data, header, new Exception($"can not deserialize from:{message}")); + await FallBackHandler(data, header, ex); return; } header[NebulaHeader.RetryCount] = (retryCount + 1).ToString(); - //First Time to retry,if no delay then send directly - if (retryCount == 0 && RetryDelay.TotalSeconds <= 0) - { - await repulish(Name, message, header); - return; - } - - //First Time to retry,if have delay then send after delay time - if (retryCount == 0 && RetryDelay.TotalSeconds > 0) + //First Time to retry,use retry delay + if (retryCount == 0) { - await delayMessageScheduler.Schedule(new Store.DelayStoreMessage() + delayMessageScheduler.Schedule(new Store.DelayStoreMessage() { Group = Group, Name = Name, @@ -121,12 +124,12 @@ namespace NebulaBus //out of retry count if (retryCount >= MaxRetryCount) { - await FallBackHandler(data, header, new Exception($"Exceeded the maximum retry count of {MaxRetryCount} times")); + await FallBackHandler(data, header, ex); return; } //Interval Retry - await delayMessageScheduler.Schedule(new Store.DelayStoreMessage() + delayMessageScheduler.Schedule(new Store.DelayStoreMessage() { Group = Group, Name = Name, diff --git a/src/NebulaBus/Rabbitmq/IRabbitmqChannelPool.cs b/src/NebulaBus/Rabbitmq/IRabbitmqChannelPool.cs new file mode 100644 index 0000000000000000000000000000000000000000..121b47fd7d56a3c438109c2ce641f750a45af782 --- /dev/null +++ b/src/NebulaBus/Rabbitmq/IRabbitmqChannelPool.cs @@ -0,0 +1,11 @@ +using RabbitMQ.Client; +using System.Threading.Tasks; + +namespace NebulaBus.Rabbitmq +{ + internal interface IRabbitmqChannelPool + { + Task GetChannelAsync(); + Task ReturnChannel(IChannel channel); + } +} diff --git a/src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs b/src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs index 2f04276da245b9f7ea6af8d9e2f12675ca679e0f..5995407f232d9afd7f5551e6468f931299793601 100644 --- a/src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs +++ b/src/NebulaBus/Rabbitmq/NebulaRabbitmqConsumer.cs @@ -8,10 +8,12 @@ namespace NebulaBus.Rabbitmq { internal class NebulaRabbitmqConsumer : AsyncDefaultBasicConsumer { - private readonly Func, NebulaHeader, Task> _excute; - public NebulaRabbitmqConsumer(IChannel channel, int prefetchSize, Func, NebulaHeader, Task> excute) : base(channel) + private readonly IServiceProvider _serviceProvider; + private readonly Type _handlerType; + public NebulaRabbitmqConsumer(IChannel channel, IServiceProvider serviceProvider, Type handlerType) : base(channel) { - _excute = excute; + _serviceProvider = serviceProvider; + _handlerType = handlerType; } public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body, CancellationToken cancellationToken = default) @@ -25,7 +27,11 @@ namespace NebulaBus.Rabbitmq } } - await _excute(body, header).ConfigureAwait(false); + var handler = _serviceProvider.GetService(_handlerType) as NebulaHandler; + if (handler != null) + { + await handler.Excute(_serviceProvider, body, header); + } await BasicAckAsync(deliveryTag, cancellationToken); } diff --git a/src/NebulaBus/Rabbitmq/RabbitmqChannelPool.cs b/src/NebulaBus/Rabbitmq/RabbitmqChannelPool.cs new file mode 100644 index 0000000000000000000000000000000000000000..0bbfbeaa099eab98f3f90748873199691de9df6a --- /dev/null +++ b/src/NebulaBus/Rabbitmq/RabbitmqChannelPool.cs @@ -0,0 +1,84 @@ +using Microsoft.Extensions.Logging; +using RabbitMQ.Client; +using System; +using System.Collections.Concurrent; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks; + +namespace NebulaBus.Rabbitmq +{ + internal class RabbitmqChannelPool : IRabbitmqChannelPool, IDisposable + { + private readonly ConcurrentQueue _channelPool; + private readonly SemaphoreSlim _semaphore; + private readonly RabbitmqOptions _rabbitmqOptions; + private readonly ILogger _logger; + private readonly int _maxPoolSize = 10; + private readonly ConnectionFactory _connectionFactory; + private IConnection _connection; + public RabbitmqChannelPool(NebulaOptions nebulaOptions, ILogger logger) + { + _channelPool = new ConcurrentQueue(); + _semaphore = new SemaphoreSlim(1, 1); + _rabbitmqOptions = nebulaOptions.RabbitmqOptions; + _logger = logger; + _connectionFactory = new ConnectionFactory() + { + HostName = _rabbitmqOptions.HostName, + UserName = _rabbitmqOptions.UserName, + Password = _rabbitmqOptions.Password, + VirtualHost = _rabbitmqOptions.VirtualHost, + AutomaticRecoveryEnabled = true, + ClientProvidedName = $"NebulaBus:{Environment.MachineName}.{Assembly.GetEntryAssembly().GetName().Name}" + }; + } + + private async Task CreateNewChannel() + { + await _semaphore.WaitAsync(); + try + { + if (_connection == null || !_connection.IsOpen) + _connection = await _connectionFactory.CreateConnectionAsync(); + return await _connection.CreateChannelAsync(); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to create new channel."); + throw; + } + finally + { + _semaphore.Release(); + } + } + + public async Task GetChannelAsync() + { + if (_channelPool.TryDequeue(out var channel) && channel.IsOpen) + return channel; + return await CreateNewChannel(); + } + + public async Task ReturnChannel(IChannel channel) + { + if (channel == null) return; + if (channel.IsOpen && _channelPool.Count < _maxPoolSize) + { + _channelPool.Enqueue(channel); + return; + } + await channel.CloseAsync(); + await channel.DisposeAsync(); + } + + public void Dispose() + { + while (_channelPool.TryDequeue(out var channel)) + { + channel?.Dispose(); + } + } + } +} diff --git a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs index dfe87fcdd23da56d30ba4df7613853e9015da5af..4546dee789e409960cb4f070aaefaa0341373585 100644 --- a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs +++ b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs @@ -14,29 +14,25 @@ namespace NebulaBus.Rabbitmq internal class RabbitmqProcessor : IProcessor { private readonly RabbitmqOptions _rabbitmqOptions; - private IConnection? _connection; private readonly List _channels; - private IChannel _senderChannel; - private readonly IDelayMessageScheduler _delayMessageScheduler; private readonly ILogger _logger; private readonly IServiceProvider _serviceProvider; private bool _started; - private readonly SemaphoreSlim _semaphore; private readonly NebulaOptions _nebulaOptions; + private readonly IRabbitmqChannelPool _channelPool; public RabbitmqProcessor( IServiceProvider serviceProvider, + IRabbitmqChannelPool rabbitmqChannelPool, NebulaOptions nebulaOptions, - IDelayMessageScheduler delayMessageScheduler, ILogger logger) { _serviceProvider = serviceProvider; _nebulaOptions = nebulaOptions; _rabbitmqOptions = nebulaOptions.RabbitmqOptions; _channels = new List(); - _delayMessageScheduler = delayMessageScheduler; _logger = logger; - _semaphore = new SemaphoreSlim(1, 1); + _channelPool = rabbitmqChannelPool; } public void Dispose() @@ -46,81 +42,50 @@ namespace NebulaBus.Rabbitmq channel.CloseAsync().Wait(); channel.Dispose(); } - - _connection?.CloseAsync().Wait(); - _connection?.Dispose(); } public async Task Start(CancellationToken cancellationToken) { try { - //Sender Channel - _senderChannel = await CreateNewChannel(); - _channels.Add(_senderChannel); - await RegisteConsumer(cancellationToken); _started = true; } catch (Exception ex) { - _logger.LogError(ex, $"Processor {this.GetType().Name} start failed"); + _logger.LogError(ex, $"Processor RabbitmqProcessor start failed"); } } public async Task Publish(string routingKey, object message, NebulaHeader header) { - await _semaphore.WaitAsync(); + var channel = await _channelPool.GetChannelAsync(); try { - if (!_started) + var props = new BasicProperties() { - _logger.LogError($"Processor {this.GetType().Name} not started"); - return; - } + Headers = header.ToDictionary(x => x.Key, (x) => (object?)x.Value), + Persistent = true, + ContentType = "application/json" + }; var jsonBytes = JsonSerializer.SerializeToUtf8Bytes(message, _nebulaOptions.JsonSerializerOptions); - await PublishByChannel(_senderChannel, routingKey, jsonBytes, header); + await channel.BasicPublishAsync(_rabbitmqOptions.ExchangeName, routingKey, false, props, jsonBytes); } - finally - { - _semaphore.Release(); - } - } - - private async Task GetConnection() - { - if (_connection == null || !_connection.IsOpen) - { - var connectionFactory = new ConnectionFactory() - { - HostName = _rabbitmqOptions.HostName, - UserName = _rabbitmqOptions.UserName, - Password = _rabbitmqOptions.Password, - VirtualHost = _rabbitmqOptions.VirtualHost, - AutomaticRecoveryEnabled = true, - ClientProvidedName = $"NebulaBus:{Environment.MachineName}" - }; - _connection = await connectionFactory.CreateConnectionAsync(); - } - return _connection; - } - - private async Task CreateNewChannel() - { - try + catch (Exception ex) { - var connection = await GetConnection(); - return await connection.CreateChannelAsync(); + _logger.LogError(ex, $"Processor RabbitmqProcessor publish message to {routingKey} failed"); + throw ex; } finally { + await _channelPool.ReturnChannel(channel); } } private async Task RegisteConsumer(CancellationToken cancellationToken) { - var _nebulaHandlers = _serviceProvider.GetServices(); + var _nebulaHandlers = _serviceProvider.GetServices(); if (_nebulaHandlers == null) return; var handlerInfos = _nebulaHandlers.Select(x => new HandlerInfo() @@ -145,7 +110,7 @@ namespace NebulaBus.Rabbitmq //每个handler创建一个channel 一个consumer for (byte i = 0; i < handlerInfo.ExcuteThreadCount; i++) { - var channel = await CreateNewChannel(); + var channel = await _channelPool.GetChannelAsync(); _channels.Add(channel); if (qos > 0) @@ -165,41 +130,9 @@ namespace NebulaBus.Rabbitmq await channel.QueueBindAsync(handlerInfo.Name, _rabbitmqOptions.ExchangeName, handlerInfo.Name, null); //Create Consumer - var consumer = new NebulaRabbitmqConsumer(channel, qos, async (message, header) => - { - var handler = _serviceProvider.GetService(handlerInfo.Type) as NebulaHandler; - if (handler == null) return; - await handler.Excute(_delayMessageScheduler!, message, header, _nebulaOptions.JsonSerializerOptions, (routingKey, message, header) => - { - return PublishByChannel(channel, routingKey, message, header); - }); - }); + var consumer = new NebulaRabbitmqConsumer(channel, _serviceProvider, handlerInfo.Type); await channel.BasicConsumeAsync(handlerInfo.Name, false, consumer, cancellationToken); } } - - public async Task PublishByChannel(IChannel channel, string routingKey, ReadOnlyMemory message, NebulaHeader header) - { - try - { - if (!_started) - { - _logger.LogError($"Processor {this.GetType().Name} not started"); - return; - } - - var props = new BasicProperties() - { - Headers = header.ToDictionary(x => x.Key, (x) => (object?)x.Value), - Persistent = true, - ContentType = "application/json" - }; - - await channel.BasicPublishAsync(_rabbitmqOptions.ExchangeName, routingKey, false, props, message); - } - finally - { - } - } } } \ No newline at end of file diff --git a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs index 92603fe5e4d7b0dfd59f0d9d614bf81f19200bb2..8385f10e0290dd681143b50fd76542f857a3e895 100644 --- a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs +++ b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs @@ -27,27 +27,17 @@ namespace NebulaBus.Scheduler _serializerOptions = serviceProvider.GetRequiredService().JsonSerializerOptions; } - public async Task Schedule(DelayStoreMessage delayMessage) + public void Schedule(DelayStoreMessage delayMessage) { if (string.IsNullOrEmpty(delayMessage.MessageId)) return; - await _store.Add(delayMessage); + _store.Add(delayMessage); } public async Task StartSchedule(CancellationToken cancellationToken) { var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - cts.Token.Register(() => - { - try - { - _store?.Dispose(); - } - catch - { - } - }); StdSchedulerFactory factory = new StdSchedulerFactory(); _scheduler = await factory.GetScheduler(cts.Token); diff --git a/src/NebulaBus/Scheduler/DelayMessageSendJob.cs b/src/NebulaBus/Scheduler/DelayMessageSendJob.cs index 08de237288d11cb086f4321e1552846fb9a8f6ea..df821b7f8b86eb67b44a9ea15b7ecc289c2ec78d 100644 --- a/src/NebulaBus/Scheduler/DelayMessageSendJob.cs +++ b/src/NebulaBus/Scheduler/DelayMessageSendJob.cs @@ -41,7 +41,7 @@ namespace NebulaBus.Scheduler await processor.Publish(messageData.Name, messageData.Message, messageData.Header); } - await _store.Delete(messageData); + _store.Delete(messageData); } catch (Exception e) { diff --git a/src/NebulaBus/Scheduler/IDelayMessageScheduler.cs b/src/NebulaBus/Scheduler/IDelayMessageScheduler.cs index b368ddfa9923ea2a087a010e620f0c3de512c939..800796040c43f9863826f41621977826797813fd 100644 --- a/src/NebulaBus/Scheduler/IDelayMessageScheduler.cs +++ b/src/NebulaBus/Scheduler/IDelayMessageScheduler.cs @@ -7,6 +7,6 @@ namespace NebulaBus.Scheduler internal interface IDelayMessageScheduler { Task StartSchedule(CancellationToken cancellationToken); - Task Schedule(DelayStoreMessage delayStoreMessage); + void Schedule(DelayStoreMessage delayStoreMessage); } } \ No newline at end of file diff --git a/src/NebulaBus/ServiceCollectionExtensions.cs b/src/NebulaBus/ServiceCollectionExtensions.cs index f9e059292951bcac8d2e9ceb23edcb832457556b..2cf3204ad13614fb8ea7e79e86d1c133862c7901 100644 --- a/src/NebulaBus/ServiceCollectionExtensions.cs +++ b/src/NebulaBus/ServiceCollectionExtensions.cs @@ -30,6 +30,9 @@ namespace Microsoft.Extensions.DependencyInjection //Processor services.TryAddEnumerable(ServiceDescriptor.Singleton()); + //Rabbitmq + services.AddSingleton(); + //Schedule job services.AddSingleton(); services.AddKeyedSingleton("NebulaBusJobFactory"); @@ -57,7 +60,7 @@ namespace Microsoft.Extensions.DependencyInjection public static void AddNebulaBusHandler(this IServiceCollection services) where TH : NebulaHandler { - services.TryAddEnumerable(ServiceDescriptor.Transient()); + services.TryAddEnumerable(ServiceDescriptor.Transient()); services.TryAddTransient(); } @@ -65,16 +68,16 @@ namespace Microsoft.Extensions.DependencyInjection where TH : NebulaHandler where TM : class, new() { - services.TryAddEnumerable(ServiceDescriptor.Transient()); + services.TryAddEnumerable(ServiceDescriptor.Transient()); services.TryAddTransient(); } public static void AddNebulaBusHandler(this IServiceCollection services, params Assembly[] assemblies) { - var types = assemblies.SelectMany(x => x.GetTypes().Where(t => t.IsClass && !t.IsAbstract && typeof(NebulaHandler).IsAssignableFrom(t))); + var types = assemblies.SelectMany(x => x.GetTypes().Where(t => t.IsClass && !t.IsAbstract && typeof(INebulaHandler).IsAssignableFrom(t))); foreach (var typeItem in types) { - services.TryAddEnumerable(ServiceDescriptor.Transient(typeof(NebulaHandler), typeItem)); + services.TryAddEnumerable(ServiceDescriptor.Transient(typeof(INebulaHandler), typeItem)); services.TryAddTransient(typeItem); } } diff --git a/src/NebulaBus/Store/IStore.cs b/src/NebulaBus/Store/IStore.cs index a4ba39c04b8aff5bc48b1ea8a602ca0b303ed304..0cc93b3f102517a5eceaf0b99d42d51fe0afddd3 100644 --- a/src/NebulaBus/Store/IStore.cs +++ b/src/NebulaBus/Store/IStore.cs @@ -3,10 +3,10 @@ using System.Threading.Tasks; namespace NebulaBus.Store { - internal interface IStore : IDisposable + internal interface IStore { - Task Add(DelayStoreMessage delayStoreMessage); - Task Delete(DelayStoreMessage delayStoreMessage); + void Add(DelayStoreMessage delayStoreMessage); + void Delete(DelayStoreMessage delayStoreMessage); Task Get(long beforeTimestamp); bool Lock(); } diff --git a/src/NebulaBus/Store/Memory/MemoryStore.cs b/src/NebulaBus/Store/Memory/MemoryStore.cs index ddadd1b7a53199561159dc60d0f03572ada9e60b..87b7781a5c51b1765406e903e9315deb276ed369 100644 --- a/src/NebulaBus/Store/Memory/MemoryStore.cs +++ b/src/NebulaBus/Store/Memory/MemoryStore.cs @@ -13,17 +13,15 @@ namespace NebulaBus.Store.Memory _storeMessages = new ConcurrentDictionary(); } - public async Task Add(DelayStoreMessage delayStoreMessage) + public void Add(DelayStoreMessage delayStoreMessage) { _storeMessages.AddOrUpdate($"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}", c => delayStoreMessage, (c, o) => delayStoreMessage); - await Task.CompletedTask; } - public async Task Delete(DelayStoreMessage delayStoreMessage) + public void Delete(DelayStoreMessage delayStoreMessage) { _storeMessages.TryRemove($"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}", out _); - await Task.CompletedTask; } public async Task Get(long beforeTimestamp) @@ -36,10 +34,5 @@ namespace NebulaBus.Store.Memory { return true; } - - public void Dispose() - { - _storeMessages.Clear(); - } } } \ No newline at end of file diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index 7b507779f8600d384e0aae775770016aa2a95239..cf8cff24349f85d749db6809bfd2e146f79acf8a 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -21,24 +21,20 @@ namespace NebulaBus.Store.Redis _nebulaOptions = nebulaOptions; } - public async Task Add(DelayStoreMessage delayStoreMessage) + public void Add(DelayStoreMessage delayStoreMessage) { - using (var tran = _redisClient.Multi()) - { - tran.ZAdd(IndexRedisKey, delayStoreMessage.TriggerTime, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}"); - tran.HSet(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}", delayStoreMessage); - tran.Exec(); - } - await Task.CompletedTask; + using var tran = _redisClient.Multi(); + tran.ZAdd(IndexRedisKey, delayStoreMessage.TriggerTime, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}"); + tran.HSet(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}", delayStoreMessage); + tran.Exec(); } - public async Task Delete(DelayStoreMessage delayStoreMessage) + public void Delete(DelayStoreMessage delayStoreMessage) { using var tran = _redisClient.Multi(); tran.ZRem(IndexRedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}"); tran.HDel(RedisKey, $"{delayStoreMessage.MessageId}.{delayStoreMessage.Name}"); tran.Exec(); - await Task.CompletedTask; } public async Task Get(long beforeTimestamp) @@ -51,13 +47,13 @@ namespace NebulaBus.Store.Redis { if (result[i] == null) { - await RemoveKey(keys[i]); + RemoveKey(keys[i]); } } return result.Where(x => x != null).ToArray(); } - private async Task RemoveKey(string key) + private void RemoveKey(string key) { using var tran = _redisClient.Multi(); tran.ZRem(IndexRedisKey, key); @@ -70,11 +66,5 @@ namespace NebulaBus.Store.Redis _redisClientLock = _redisClient.Lock($"NebulaBus:{_nebulaOptions.ClusterName}.Lock", 3, true); return _redisClientLock != null; } - - public void Dispose() - { - _redisClientLock?.Dispose(); - _redisClient?.Dispose(); - } } } \ No newline at end of file