From 8e5bbf95e09265c22c6495d9fd7fab33c333d3ab Mon Sep 17 00:00:00 2001 From: "javen.liu" Date: Tue, 8 Apr 2025 16:13:26 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=AE=9E=E7=8E=B0redis=E5=88=86=E5=B8=83?= =?UTF-8?q?=E5=BC=8F=E9=94=81=E7=9A=84=E7=AB=9E=E4=BA=89=E6=9C=BA=E5=88=B6?= =?UTF-8?q?=EF=BC=8Cschedule=E8=8A=82=E7=82=B9=E5=AE=95=E6=9C=BA=E5=8F=AF?= =?UTF-8?q?=E8=A2=AB=E5=85=B6=E4=BB=96=E8=8A=82=E7=82=B9=E5=8F=96=E4=BB=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/NebulaBus/Bootstrapper.cs | 2 +- .../Scheduler/DelayMessageScheduler.cs | 45 +++++++++++++------ .../Scheduler/IDelayMessageScheduler.cs | 5 ++- src/NebulaBus/Store/IStore.cs | 1 + src/NebulaBus/Store/Memory/MemoryStore.cs | 5 +++ src/NebulaBus/Store/Redis/RedisStore.cs | 9 ++-- src/WebApplicationSample/Program.cs | 1 + 7 files changed, 48 insertions(+), 20 deletions(-) diff --git a/src/NebulaBus/Bootstrapper.cs b/src/NebulaBus/Bootstrapper.cs index 0e38d78..9daf1fc 100644 --- a/src/NebulaBus/Bootstrapper.cs +++ b/src/NebulaBus/Bootstrapper.cs @@ -54,7 +54,7 @@ namespace NebulaBus } //Start Store Scheduler - await _delayMessageScheduler.StartStoreSchedule(); + await _delayMessageScheduler.StartStoreSchedule(_cts.Token); } public override async Task StopAsync(CancellationToken cancellationToken) diff --git a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs index a2b2d2b..42ea5cd 100644 --- a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs +++ b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs @@ -2,6 +2,7 @@ using Quartz; using Quartz.Impl; using System; +using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Newtonsoft.Json; @@ -40,33 +41,49 @@ namespace NebulaBus.Scheduler await _senderScheduler.ScheduleJob(job, trigger); } - public async Task StartStoreSchedule() + public async Task StartStoreSchedule(CancellationToken cancellationToken) { StdSchedulerFactory factory = new StdSchedulerFactory(); _scheduler = await factory.GetScheduler(); _scheduler.JobFactory = _jobFactory; await _scheduler.Start(); - var delayMessages = await _store.GetAll(); - foreach (var delayMessage in delayMessages) + while (true) { - var job = BuildJobDetail(delayMessage.Value); + if (cancellationToken.IsCancellationRequested) + return; + + //lock + var gotLock = _store.Lock(); + if (!gotLock) + { + await Task.Delay(1000); + continue; + } - if (delayMessage.Value.TriggerTime < DateTimeOffset.Now) + var delayMessages = await _store.GetAll(); + foreach (var delayMessage in delayMessages) { - var rightNowTrigger = TriggerBuilder.Create() + var job = BuildJobDetail(delayMessage.Value); + + if (delayMessage.Value.TriggerTime < DateTimeOffset.Now) + { + var rightNowTrigger = TriggerBuilder.Create() + .WithIdentity($"Delay:{delayMessage.Key}") + .StartNow() + .Build(); + await _scheduler.ScheduleJob(job, rightNowTrigger); + continue; + } + + var trigger = TriggerBuilder.Create() .WithIdentity($"Delay:{delayMessage.Key}") - .StartNow() + .StartAt(delayMessage.Value.TriggerTime) .Build(); - await _scheduler.ScheduleJob(job, rightNowTrigger); - continue; + await _scheduler.ScheduleJob(job, trigger); } - var trigger = TriggerBuilder.Create() - .WithIdentity($"Delay:{delayMessage.Key}") - .StartAt(delayMessage.Value.TriggerTime) - .Build(); - await _scheduler.ScheduleJob(job, trigger); + await Task.Delay(1000); } } diff --git a/src/NebulaBus/Scheduler/IDelayMessageScheduler.cs b/src/NebulaBus/Scheduler/IDelayMessageScheduler.cs index 7c13bac..13d54eb 100644 --- a/src/NebulaBus/Scheduler/IDelayMessageScheduler.cs +++ b/src/NebulaBus/Scheduler/IDelayMessageScheduler.cs @@ -1,11 +1,12 @@ -using NebulaBus.Store; +using System.Threading; +using NebulaBus.Store; using System.Threading.Tasks; namespace NebulaBus.Scheduler { internal interface IDelayMessageScheduler { - Task StartStoreSchedule(); + Task StartStoreSchedule(CancellationToken cancellationToken); Task StartSenderScheduler(); Task Schedule(DelayStoreMessage delayStoreMessage); } diff --git a/src/NebulaBus/Store/IStore.cs b/src/NebulaBus/Store/IStore.cs index ebe562e..ebcf87d 100644 --- a/src/NebulaBus/Store/IStore.cs +++ b/src/NebulaBus/Store/IStore.cs @@ -8,5 +8,6 @@ namespace NebulaBus.Store Task Add(DelayStoreMessage delayStoreMessage); Task Delete(string messageId); Task> GetAll(); + bool Lock(); } } \ No newline at end of file diff --git a/src/NebulaBus/Store/Memory/MemoryStore.cs b/src/NebulaBus/Store/Memory/MemoryStore.cs index 505f832..2485948 100644 --- a/src/NebulaBus/Store/Memory/MemoryStore.cs +++ b/src/NebulaBus/Store/Memory/MemoryStore.cs @@ -31,5 +31,10 @@ namespace NebulaBus.Store.Memory { return await Task.FromResult(_storeMessages.ToDictionary(x => x.Key, x => x.Value)); } + + public bool Lock() + { + return true; + } } } \ No newline at end of file diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index 4b7e78b..26d257a 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -35,11 +35,14 @@ namespace NebulaBus.Store.Redis public async Task> GetAll() { - using var redisLock = _redisClient.Lock($"{RedisLockKey}.Delete", 10); - if (redisLock == null) - throw new Exception("got redis lock failed"); var result = await _redisClient.HGetAllAsync(RedisKey); return result; } + + public bool Lock() + { + var redisLock= _redisClient.Lock($"{RedisLockKey}.Lock", 1, true); + return redisLock != null; + } } } \ No newline at end of file diff --git a/src/WebApplicationSample/Program.cs b/src/WebApplicationSample/Program.cs index edefec9..04a0b82 100644 --- a/src/WebApplicationSample/Program.cs +++ b/src/WebApplicationSample/Program.cs @@ -3,6 +3,7 @@ using WebApplicationSample.Handlers; using WebApplicationSample.Messages; var builder = WebApplication.CreateBuilder(args); +builder.WebHost.UseUrls("http://*:0"); var environment = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT"); builder.Configuration.SetBasePath(Directory.GetCurrentDirectory()) -- Gitee From 5c8236d2e4463f08120d6e1ff6c2eead4e6e6225 Mon Sep 17 00:00:00 2001 From: "javen.liu" Date: Tue, 8 Apr 2025 17:12:17 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=A2=9E=E5=BC=BA=EF=BC=9A=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=A4=9A=E8=8A=82=E7=82=B9=E5=88=86=E5=B8=83=E5=BC=8F?= =?UTF-8?q?schedule?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/NebulaBus/Bootstrapper.cs | 5 +- .../Scheduler/DelayMessageScheduler.cs | 79 ++++++++----------- .../Scheduler/IDelayMessageScheduler.cs | 3 +- src/NebulaBus/Store/Redis/RedisStore.cs | 6 -- .../Properties/launchSettings.json | 10 +-- 5 files changed, 38 insertions(+), 65 deletions(-) diff --git a/src/NebulaBus/Bootstrapper.cs b/src/NebulaBus/Bootstrapper.cs index 9daf1fc..59b4df5 100644 --- a/src/NebulaBus/Bootstrapper.cs +++ b/src/NebulaBus/Bootstrapper.cs @@ -38,9 +38,6 @@ namespace NebulaBus { } }); - //Start Sender Scheduler - await _delayMessageScheduler.StartSenderScheduler(); - //Start all processors _disposed = false; foreach (var processor in _processors) @@ -54,7 +51,7 @@ namespace NebulaBus } //Start Store Scheduler - await _delayMessageScheduler.StartStoreSchedule(_cts.Token); + await _delayMessageScheduler.StartSchedule(_cts.Token); } public override async Task StopAsync(CancellationToken cancellationToken) diff --git a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs index 42ea5cd..1fd8dd0 100644 --- a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs +++ b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs @@ -13,7 +13,6 @@ namespace NebulaBus.Scheduler internal class DelayMessageScheduler : IDelayMessageScheduler { private readonly IStore _store; - private IScheduler _senderScheduler; private IScheduler _scheduler; private readonly IJobFactory _jobFactory; @@ -26,22 +25,12 @@ namespace NebulaBus.Scheduler public async Task Schedule(DelayStoreMessage delayMessage) { if (string.IsNullOrEmpty(delayMessage.MessageId)) - { return; - } - - var job = BuildJobDetail(delayMessage); - - var trigger = TriggerBuilder.Create() - .WithIdentity($"Delay:{delayMessage.MessageId}") - .StartAt(delayMessage.TriggerTime) - .Build(); await _store.Add(delayMessage); - await _senderScheduler.ScheduleJob(job, trigger); } - public async Task StartStoreSchedule(CancellationToken cancellationToken) + public async Task StartSchedule(CancellationToken cancellationToken) { StdSchedulerFactory factory = new StdSchedulerFactory(); _scheduler = await factory.GetScheduler(); @@ -52,53 +41,29 @@ namespace NebulaBus.Scheduler { if (cancellationToken.IsCancellationRequested) return; - + //lock var gotLock = _store.Lock(); if (!gotLock) { - await Task.Delay(1000); + await Task.Delay(1000, cancellationToken); continue; } - var delayMessages = await _store.GetAll(); - foreach (var delayMessage in delayMessages) + while (true) { - var job = BuildJobDetail(delayMessage.Value); - - if (delayMessage.Value.TriggerTime < DateTimeOffset.Now) - { - var rightNowTrigger = TriggerBuilder.Create() - .WithIdentity($"Delay:{delayMessage.Key}") - .StartNow() - .Build(); - await _scheduler.ScheduleJob(job, rightNowTrigger); - continue; - } - - var trigger = TriggerBuilder.Create() - .WithIdentity($"Delay:{delayMessage.Key}") - .StartAt(delayMessage.Value.TriggerTime) - .Build(); - await _scheduler.ScheduleJob(job, trigger); + if (cancellationToken.IsCancellationRequested) + return; + await ScheduleJobFromStore(cancellationToken); + await Task.Delay(1000, cancellationToken); } - - await Task.Delay(1000); } } - public async Task StartSenderScheduler() - { - StdSchedulerFactory factory = new StdSchedulerFactory(); - _senderScheduler = await factory.GetScheduler(); - _senderScheduler.JobFactory = _jobFactory; - await _senderScheduler.Start(); - } - private static IJobDetail BuildJobDetail(DelayStoreMessage delayMessage) { var job = JobBuilder.Create() - .WithIdentity($"Schedule:{delayMessage.MessageId}") + .WithIdentity($"NebulaBusJob:{delayMessage.MessageId}") .UsingJobData("data", JsonConvert.SerializeObject(delayMessage)) .UsingJobData("messageId", delayMessage.MessageId) .UsingJobData("name", delayMessage.Name) @@ -106,5 +71,31 @@ namespace NebulaBus.Scheduler .Build(); return job; } + + private async Task ScheduleJobFromStore(CancellationToken cancellationToken) + { + var delayMessages = await _store.GetAll(); + foreach (var delayMessage in delayMessages) + { + var job = BuildJobDetail(delayMessage.Value); + if (await _scheduler.CheckExists(job.Key, cancellationToken)) + continue; + if (delayMessage.Value.TriggerTime < DateTimeOffset.Now) + { + var rightNowTrigger = TriggerBuilder.Create() + .WithIdentity($"NebulaBusTrigger:{delayMessage.Key}") + .StartNow() + .Build(); + await _scheduler.ScheduleJob(job, rightNowTrigger); + continue; + } + + var trigger = TriggerBuilder.Create() + .WithIdentity($"NebulaBusTrigger:{delayMessage.Key}") + .StartAt(delayMessage.Value.TriggerTime) + .Build(); + await _scheduler.ScheduleJob(job, trigger); + } + } } } \ No newline at end of file diff --git a/src/NebulaBus/Scheduler/IDelayMessageScheduler.cs b/src/NebulaBus/Scheduler/IDelayMessageScheduler.cs index 13d54eb..b368ddf 100644 --- a/src/NebulaBus/Scheduler/IDelayMessageScheduler.cs +++ b/src/NebulaBus/Scheduler/IDelayMessageScheduler.cs @@ -6,8 +6,7 @@ namespace NebulaBus.Scheduler { internal interface IDelayMessageScheduler { - Task StartStoreSchedule(CancellationToken cancellationToken); - Task StartSenderScheduler(); + Task StartSchedule(CancellationToken cancellationToken); Task Schedule(DelayStoreMessage delayStoreMessage); } } \ No newline at end of file diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index 26d257a..615dce5 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -19,17 +19,11 @@ namespace NebulaBus.Store.Redis public async Task Add(DelayStoreMessage delayStoreMessage) { - using var redisLock = _redisClient.Lock($"{RedisLockKey}.Add", 2); - if (redisLock == null) - throw new Exception("got redis lock failed"); await _redisClient.HSetAsync(RedisKey, $"{delayStoreMessage.MessageId}", delayStoreMessage); } public async Task Delete(string messageId) { - using var redisLock = _redisClient.Lock($"{RedisLockKey}.Delete", 2); - if (redisLock == null) - throw new Exception("got redis lock failed"); await _redisClient.HDelAsync(RedisKey, $"{messageId}"); } diff --git a/src/WebApplicationSample/Properties/launchSettings.json b/src/WebApplicationSample/Properties/launchSettings.json index 942b1aa..f1601df 100644 --- a/src/WebApplicationSample/Properties/launchSettings.json +++ b/src/WebApplicationSample/Properties/launchSettings.json @@ -1,19 +1,11 @@ { "$schema": "http://json.schemastore.org/launchsettings.json", - "iisSettings": { - "windowsAuthentication": false, - "anonymousAuthentication": true, - "iisExpress": { - "applicationUrl": "http://localhost:7719", - "sslPort": 0 - } - }, "profiles": { "Local": { "commandName": "Project", "dotnetRunMessages": true, "launchBrowser": true, - "applicationUrl": "http://localhost:5203", + "launchUrl": "/swagger", "environmentVariables": { "ASPNETCORE_ENVIRONMENT": "Local" } -- Gitee