From eaadef00bb51e561bcc2c41ac862e6faafe1c8cf Mon Sep 17 00:00:00 2001 From: liuhai Date: Wed, 16 Apr 2025 19:10:08 +0800 Subject: [PATCH 1/2] =?UTF-8?q?rabbitmq=20=E6=94=AF=E6=8C=81=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E9=87=8D=E8=BF=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/NebulaBus/Rabbitmq/RabbitmqChannelPool.cs | 6 ++- src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs | 47 ++++++++++--------- src/NebulaBus/Store/Redis/RedisStore.cs | 17 +++++-- 3 files changed, 43 insertions(+), 27 deletions(-) diff --git a/src/NebulaBus/Rabbitmq/RabbitmqChannelPool.cs b/src/NebulaBus/Rabbitmq/RabbitmqChannelPool.cs index c87a8f6..77ea530 100644 --- a/src/NebulaBus/Rabbitmq/RabbitmqChannelPool.cs +++ b/src/NebulaBus/Rabbitmq/RabbitmqChannelPool.cs @@ -31,7 +31,8 @@ namespace NebulaBus.Rabbitmq VirtualHost = _rabbitmqOptions.VirtualHost, AutomaticRecoveryEnabled = true, Port = _rabbitmqOptions.Port, - ClientProvidedName = $"NebulaBus:{Environment.MachineName}.{Assembly.GetEntryAssembly().GetName().Name}" + ClientProvidedName = $"NebulaBus:{Environment.MachineName}.{Assembly.GetEntryAssembly().GetName().Name}", + TopologyRecoveryEnabled = true, }; if (_rabbitmqOptions.SslOption != null) _connectionFactory.Ssl = _rabbitmqOptions.SslOption; @@ -50,6 +51,9 @@ namespace NebulaBus.Rabbitmq _connection = await _connectionFactory.CreateConnectionAsync(AmqpTcpEndpoint.ParseMultiple(_rabbitmqOptions.HostName), cancellationToken); else _connection = await _connectionFactory.CreateConnectionAsync(cancellationToken); } + _connection.ConnectionShutdownAsync+= async (sender, args) => + { + }; return await _connection.CreateChannelAsync(cancellationToken: cancellationToken); } catch (Exception ex) diff --git a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs index feac0b8..c621030 100644 --- a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs +++ b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs @@ -19,6 +19,7 @@ namespace NebulaBus.Rabbitmq private bool _started; private readonly NebulaOptions _nebulaOptions; private readonly IRabbitmqChannelPool _channelPool; + private CancellationToken _originalCancellationToken; public RabbitmqProcessor( IServiceProvider serviceProvider, @@ -50,6 +51,7 @@ namespace NebulaBus.Rabbitmq public async Task Start(CancellationToken cancellationToken) { + _originalCancellationToken = cancellationToken; try { await RegisteConsumer(cancellationToken); @@ -105,38 +107,37 @@ namespace NebulaBus.Rabbitmq var getQos = _rabbitmqOptions.GetQos?.Invoke(info.Name, info.Group); var qos = getQos > 0 ? getQos : _rabbitmqOptions.Qos; - await RegisteConsumerByConfig(info, qos.Value, cancellationToken); + for (byte i = 0; i < info.ExcuteThreadCount; i++) + { + await RegisteConsumerByConfig(info.Name, info.Group, qos.Value, info.Type, cancellationToken); + } } } - private async Task RegisteConsumerByConfig(HandlerInfo handlerInfo, ushort qos, CancellationToken cancellationToken) + private async Task RegisteConsumerByConfig(string name, string group, ushort qos, Type handlerType, CancellationToken cancellationToken) { - //每个handler创建一个channel 一个consumer - for (byte i = 0; i < handlerInfo.ExcuteThreadCount; i++) - { - var channel = await _channelPool.GetChannelAsync(cancellationToken); - _channels.Add(channel); + var channel = await _channelPool.GetChannelAsync(cancellationToken); + _channels.Add(channel); - if (qos > 0) - await channel.BasicQosAsync(0, qos, false); + if (qos > 0) + await channel.BasicQosAsync(0, qos, false); - //Create Exchange - await channel.ExchangeDeclareAsync(_rabbitmqOptions.ExchangeName, ExchangeType.Direct, true); - //Create Queue - await channel.QueueDeclareAsync(handlerInfo.Name, true, false, false, null); + //Create Exchange + await channel.ExchangeDeclareAsync(_rabbitmqOptions.ExchangeName, ExchangeType.Direct, true); + //Create Queue + await channel.QueueDeclareAsync(name, true, false, false, null); - //Bind Group RoutingKey - if (!string.IsNullOrEmpty(handlerInfo.Group)) - await channel.QueueBindAsync(handlerInfo.Name, _rabbitmqOptions.ExchangeName, handlerInfo.Group, null); + //Bind Group RoutingKey + if (!string.IsNullOrEmpty(group)) + await channel.QueueBindAsync(name, _rabbitmqOptions.ExchangeName, group, null); - //Bind Name RoutingKey - if (!string.IsNullOrEmpty(handlerInfo.Name)) - await channel.QueueBindAsync(handlerInfo.Name, _rabbitmqOptions.ExchangeName, handlerInfo.Name, null); + //Bind Name RoutingKey + if (!string.IsNullOrEmpty(name)) + await channel.QueueBindAsync(name, _rabbitmqOptions.ExchangeName, name, null); - //Create Consumer - var consumer = new NebulaRabbitmqConsumer(channel, _serviceProvider, handlerInfo.Type); - await channel.BasicConsumeAsync(handlerInfo.Name, false, consumer, cancellationToken); - } + //Create Consumer + var consumer = new NebulaRabbitmqConsumer(channel, _serviceProvider, handlerType); + await channel.BasicConsumeAsync(name, false, $"{Guid.NewGuid()}", consumer, cancellationToken); } } } \ No newline at end of file diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index e64385c..df97572 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -1,5 +1,6 @@ using FreeRedis; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; using System; using System.Linq; using System.Threading.Tasks; @@ -15,11 +16,13 @@ namespace NebulaBus.Store.Redis private readonly RedisClient _redisClient; private readonly NebulaOptions _nebulaOptions; private RedisClient.LockController _redisClientLock; + private readonly ILogger _logger; - public RedisStore(IServiceProvider provider, NebulaOptions nebulaOptions) + public RedisStore(IServiceProvider provider, NebulaOptions nebulaOptions, ILogger logger) { _redisClient = provider.GetKeyedService("NebulaBusRedis")!; _nebulaOptions = nebulaOptions; + _logger = logger; } public void Add(DelayStoreMessage delayStoreMessage) @@ -64,8 +67,16 @@ namespace NebulaBus.Store.Redis public bool Lock() { - _redisClientLock = _redisClient.Lock(LockKey, 3, true); - return _redisClientLock != null; + try + { + _redisClientLock = _redisClient.Lock(LockKey, 3, true); + return _redisClientLock != null; + } + catch (Exception ex) + { + _logger.LogError(ex, "RedisStore Lock failed"); + return false; + } } public void Dispose() -- Gitee From c2de50c94c4f86b32c7bce6d250c788a05999e9c Mon Sep 17 00:00:00 2001 From: liuhai Date: Wed, 16 Apr 2025 19:20:46 +0800 Subject: [PATCH 2/2] =?UTF-8?q?redis=E9=87=8D=E8=BF=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Scheduler/DelayMessageScheduler.cs | 73 ++++++++++--------- src/NebulaBus/Store/Redis/RedisStore.cs | 12 +-- 2 files changed, 39 insertions(+), 46 deletions(-) diff --git a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs index f31f01b..ac25e1a 100644 --- a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs +++ b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs @@ -53,17 +53,25 @@ namespace NebulaBus.Scheduler while (!cts.IsCancellationRequested) { - //lock - var gotLock = _store.Lock(); - if (!gotLock) + try { - await Task.Delay(1000, cancellationToken); - continue; - } + //lock + var gotLock = _store.Lock(); + if (!gotLock) + { + await Task.Delay(1000, cancellationToken); + continue; + } - while (!cts.IsCancellationRequested) + while (!cts.IsCancellationRequested) + { + await ScheduleJobFromStore(cts.Token); + await Task.Delay(1000, cts.Token); + } + } + catch (Exception ex) { - await ScheduleJobFromStore(cts.Token); + _logger.LogError(ex, "Schedule Failed"); await Task.Delay(1000, cts.Token); } } @@ -83,39 +91,32 @@ namespace NebulaBus.Scheduler private async Task ScheduleJobFromStore(CancellationToken cancellationToken) { - try + var delayMessages = await _store.Get(DateTimeOffset.Now.AddMinutes(3).ToUnixTimeSeconds()); + if (delayMessages == null) return; + foreach (var delayMessage in delayMessages) { - var delayMessages = await _store.Get(DateTimeOffset.Now.AddMinutes(3).ToUnixTimeSeconds()); - if (delayMessages == null) return; - foreach (var delayMessage in delayMessages) - { - if (delayMessage == null) continue; - if (cancellationToken.IsCancellationRequested) - return; - var job = BuildJobDetail(delayMessage); - if (await _scheduler.CheckExists(job.Key, cancellationToken)) - continue; - - if (delayMessage.TriggerTime < DateTimeOffset.Now.ToUnixTimeSeconds()) - { - var rightNowTrigger = TriggerBuilder.Create() - .WithIdentity($"NebulaBusTrigger:{delayMessage.MessageId}.{delayMessage.Name}") - .StartNow() - .Build(); - await _scheduler.ScheduleJob(job, rightNowTrigger, cancellationToken); - continue; - } + if (delayMessage == null) continue; + if (cancellationToken.IsCancellationRequested) + return; + var job = BuildJobDetail(delayMessage); + if (await _scheduler.CheckExists(job.Key, cancellationToken)) + continue; - var trigger = TriggerBuilder.Create() + if (delayMessage.TriggerTime < DateTimeOffset.Now.ToUnixTimeSeconds()) + { + var rightNowTrigger = TriggerBuilder.Create() .WithIdentity($"NebulaBusTrigger:{delayMessage.MessageId}.{delayMessage.Name}") - .StartAt(DateTimeOffset.FromUnixTimeSeconds(delayMessage.TriggerTime)) + .StartNow() .Build(); - await _scheduler.ScheduleJob(job, trigger, cancellationToken); + await _scheduler.ScheduleJob(job, rightNowTrigger, cancellationToken); + continue; } - } - catch (Exception ex) - { - _logger.LogError(ex, "ScheduleJobFromStore"); + + var trigger = TriggerBuilder.Create() + .WithIdentity($"NebulaBusTrigger:{delayMessage.MessageId}.{delayMessage.Name}") + .StartAt(DateTimeOffset.FromUnixTimeSeconds(delayMessage.TriggerTime)) + .Build(); + await _scheduler.ScheduleJob(job, trigger, cancellationToken); } } } diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index df97572..ce86768 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -67,16 +67,8 @@ namespace NebulaBus.Store.Redis public bool Lock() { - try - { - _redisClientLock = _redisClient.Lock(LockKey, 3, true); - return _redisClientLock != null; - } - catch (Exception ex) - { - _logger.LogError(ex, "RedisStore Lock failed"); - return false; - } + _redisClientLock = _redisClient.Lock(LockKey, 3, true); + return _redisClientLock != null; } public void Dispose() -- Gitee