diff --git a/src/NebulaBus/Rabbitmq/RabbitmqChannelPool.cs b/src/NebulaBus/Rabbitmq/RabbitmqChannelPool.cs index c87a8f63b3f1eac542bcc5f0dc80e4a5762627e0..77ea530bcbbe9ed69554fb0651417a11113a97ec 100644 --- a/src/NebulaBus/Rabbitmq/RabbitmqChannelPool.cs +++ b/src/NebulaBus/Rabbitmq/RabbitmqChannelPool.cs @@ -31,7 +31,8 @@ namespace NebulaBus.Rabbitmq VirtualHost = _rabbitmqOptions.VirtualHost, AutomaticRecoveryEnabled = true, Port = _rabbitmqOptions.Port, - ClientProvidedName = $"NebulaBus:{Environment.MachineName}.{Assembly.GetEntryAssembly().GetName().Name}" + ClientProvidedName = $"NebulaBus:{Environment.MachineName}.{Assembly.GetEntryAssembly().GetName().Name}", + TopologyRecoveryEnabled = true, }; if (_rabbitmqOptions.SslOption != null) _connectionFactory.Ssl = _rabbitmqOptions.SslOption; @@ -50,6 +51,9 @@ namespace NebulaBus.Rabbitmq _connection = await _connectionFactory.CreateConnectionAsync(AmqpTcpEndpoint.ParseMultiple(_rabbitmqOptions.HostName), cancellationToken); else _connection = await _connectionFactory.CreateConnectionAsync(cancellationToken); } + _connection.ConnectionShutdownAsync+= async (sender, args) => + { + }; return await _connection.CreateChannelAsync(cancellationToken: cancellationToken); } catch (Exception ex) diff --git a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs index feac0b89b78a51c6305ade2c767b9dfe52c75b46..c6210306e6e7d105c187154a74dc6215bdb5a5a4 100644 --- a/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs +++ b/src/NebulaBus/Rabbitmq/RabbitmqProcessor.cs @@ -19,6 +19,7 @@ namespace NebulaBus.Rabbitmq private bool _started; private readonly NebulaOptions _nebulaOptions; private readonly IRabbitmqChannelPool _channelPool; + private CancellationToken _originalCancellationToken; public RabbitmqProcessor( IServiceProvider serviceProvider, @@ -50,6 +51,7 @@ namespace NebulaBus.Rabbitmq public async Task Start(CancellationToken cancellationToken) { + _originalCancellationToken = cancellationToken; try { await RegisteConsumer(cancellationToken); @@ -105,38 +107,37 @@ namespace NebulaBus.Rabbitmq var getQos = _rabbitmqOptions.GetQos?.Invoke(info.Name, info.Group); var qos = getQos > 0 ? getQos : _rabbitmqOptions.Qos; - await RegisteConsumerByConfig(info, qos.Value, cancellationToken); + for (byte i = 0; i < info.ExcuteThreadCount; i++) + { + await RegisteConsumerByConfig(info.Name, info.Group, qos.Value, info.Type, cancellationToken); + } } } - private async Task RegisteConsumerByConfig(HandlerInfo handlerInfo, ushort qos, CancellationToken cancellationToken) + private async Task RegisteConsumerByConfig(string name, string group, ushort qos, Type handlerType, CancellationToken cancellationToken) { - //每个handler创建一个channel 一个consumer - for (byte i = 0; i < handlerInfo.ExcuteThreadCount; i++) - { - var channel = await _channelPool.GetChannelAsync(cancellationToken); - _channels.Add(channel); + var channel = await _channelPool.GetChannelAsync(cancellationToken); + _channels.Add(channel); - if (qos > 0) - await channel.BasicQosAsync(0, qos, false); + if (qos > 0) + await channel.BasicQosAsync(0, qos, false); - //Create Exchange - await channel.ExchangeDeclareAsync(_rabbitmqOptions.ExchangeName, ExchangeType.Direct, true); - //Create Queue - await channel.QueueDeclareAsync(handlerInfo.Name, true, false, false, null); + //Create Exchange + await channel.ExchangeDeclareAsync(_rabbitmqOptions.ExchangeName, ExchangeType.Direct, true); + //Create Queue + await channel.QueueDeclareAsync(name, true, false, false, null); - //Bind Group RoutingKey - if (!string.IsNullOrEmpty(handlerInfo.Group)) - await channel.QueueBindAsync(handlerInfo.Name, _rabbitmqOptions.ExchangeName, handlerInfo.Group, null); + //Bind Group RoutingKey + if (!string.IsNullOrEmpty(group)) + await channel.QueueBindAsync(name, _rabbitmqOptions.ExchangeName, group, null); - //Bind Name RoutingKey - if (!string.IsNullOrEmpty(handlerInfo.Name)) - await channel.QueueBindAsync(handlerInfo.Name, _rabbitmqOptions.ExchangeName, handlerInfo.Name, null); + //Bind Name RoutingKey + if (!string.IsNullOrEmpty(name)) + await channel.QueueBindAsync(name, _rabbitmqOptions.ExchangeName, name, null); - //Create Consumer - var consumer = new NebulaRabbitmqConsumer(channel, _serviceProvider, handlerInfo.Type); - await channel.BasicConsumeAsync(handlerInfo.Name, false, consumer, cancellationToken); - } + //Create Consumer + var consumer = new NebulaRabbitmqConsumer(channel, _serviceProvider, handlerType); + await channel.BasicConsumeAsync(name, false, $"{Guid.NewGuid()}", consumer, cancellationToken); } } } \ No newline at end of file diff --git a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs index f31f01bfcb9e62c7718563bb92f0d0f12fe028ae..ac25e1a73145ad6e7508994176df62a6adb53857 100644 --- a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs +++ b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs @@ -53,17 +53,25 @@ namespace NebulaBus.Scheduler while (!cts.IsCancellationRequested) { - //lock - var gotLock = _store.Lock(); - if (!gotLock) + try { - await Task.Delay(1000, cancellationToken); - continue; - } + //lock + var gotLock = _store.Lock(); + if (!gotLock) + { + await Task.Delay(1000, cancellationToken); + continue; + } - while (!cts.IsCancellationRequested) + while (!cts.IsCancellationRequested) + { + await ScheduleJobFromStore(cts.Token); + await Task.Delay(1000, cts.Token); + } + } + catch (Exception ex) { - await ScheduleJobFromStore(cts.Token); + _logger.LogError(ex, "Schedule Failed"); await Task.Delay(1000, cts.Token); } } @@ -83,39 +91,32 @@ namespace NebulaBus.Scheduler private async Task ScheduleJobFromStore(CancellationToken cancellationToken) { - try + var delayMessages = await _store.Get(DateTimeOffset.Now.AddMinutes(3).ToUnixTimeSeconds()); + if (delayMessages == null) return; + foreach (var delayMessage in delayMessages) { - var delayMessages = await _store.Get(DateTimeOffset.Now.AddMinutes(3).ToUnixTimeSeconds()); - if (delayMessages == null) return; - foreach (var delayMessage in delayMessages) - { - if (delayMessage == null) continue; - if (cancellationToken.IsCancellationRequested) - return; - var job = BuildJobDetail(delayMessage); - if (await _scheduler.CheckExists(job.Key, cancellationToken)) - continue; - - if (delayMessage.TriggerTime < DateTimeOffset.Now.ToUnixTimeSeconds()) - { - var rightNowTrigger = TriggerBuilder.Create() - .WithIdentity($"NebulaBusTrigger:{delayMessage.MessageId}.{delayMessage.Name}") - .StartNow() - .Build(); - await _scheduler.ScheduleJob(job, rightNowTrigger, cancellationToken); - continue; - } + if (delayMessage == null) continue; + if (cancellationToken.IsCancellationRequested) + return; + var job = BuildJobDetail(delayMessage); + if (await _scheduler.CheckExists(job.Key, cancellationToken)) + continue; - var trigger = TriggerBuilder.Create() + if (delayMessage.TriggerTime < DateTimeOffset.Now.ToUnixTimeSeconds()) + { + var rightNowTrigger = TriggerBuilder.Create() .WithIdentity($"NebulaBusTrigger:{delayMessage.MessageId}.{delayMessage.Name}") - .StartAt(DateTimeOffset.FromUnixTimeSeconds(delayMessage.TriggerTime)) + .StartNow() .Build(); - await _scheduler.ScheduleJob(job, trigger, cancellationToken); + await _scheduler.ScheduleJob(job, rightNowTrigger, cancellationToken); + continue; } - } - catch (Exception ex) - { - _logger.LogError(ex, "ScheduleJobFromStore"); + + var trigger = TriggerBuilder.Create() + .WithIdentity($"NebulaBusTrigger:{delayMessage.MessageId}.{delayMessage.Name}") + .StartAt(DateTimeOffset.FromUnixTimeSeconds(delayMessage.TriggerTime)) + .Build(); + await _scheduler.ScheduleJob(job, trigger, cancellationToken); } } } diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index e64385cf365de567815ca06e1e945152720286a0..ce8676851fb46c51d11bad84887993d7f4498853 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -1,5 +1,6 @@ using FreeRedis; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; using System; using System.Linq; using System.Threading.Tasks; @@ -15,11 +16,13 @@ namespace NebulaBus.Store.Redis private readonly RedisClient _redisClient; private readonly NebulaOptions _nebulaOptions; private RedisClient.LockController _redisClientLock; + private readonly ILogger _logger; - public RedisStore(IServiceProvider provider, NebulaOptions nebulaOptions) + public RedisStore(IServiceProvider provider, NebulaOptions nebulaOptions, ILogger logger) { _redisClient = provider.GetKeyedService("NebulaBusRedis")!; _nebulaOptions = nebulaOptions; + _logger = logger; } public void Add(DelayStoreMessage delayStoreMessage)