From 0fb6ade2b66e9b6a8f2936355000af3e27d11584 Mon Sep 17 00:00:00 2001 From: liuhai Date: Thu, 17 Apr 2025 07:56:29 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/NebulaBus/Scheduler/DelayMessageScheduler.cs | 3 ++- src/NebulaBus/Store/IStore.cs | 3 ++- src/NebulaBus/Store/Redis/RedisStore.cs | 10 +++++++--- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs index ac25e1a..95a2a13 100644 --- a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs +++ b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs @@ -51,12 +51,13 @@ namespace NebulaBus.Scheduler _store?.Dispose(); }); + var lockValue = Guid.NewGuid().ToString(); while (!cts.IsCancellationRequested) { try { //lock - var gotLock = _store.Lock(); + var gotLock = _store.Lock(lockValue); if (!gotLock) { await Task.Delay(1000, cancellationToken); diff --git a/src/NebulaBus/Store/IStore.cs b/src/NebulaBus/Store/IStore.cs index c3f30b7..6424911 100644 --- a/src/NebulaBus/Store/IStore.cs +++ b/src/NebulaBus/Store/IStore.cs @@ -8,6 +8,7 @@ namespace NebulaBus.Store void Add(DelayStoreMessage delayStoreMessage); void Delete(DelayStoreMessage delayStoreMessage); Task Get(long beforeTimestamp); - bool Lock(); + void RefreshLock(); + bool Lock(string value); } } \ No newline at end of file diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index ce86768..a1b0f3a 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -65,10 +65,14 @@ namespace NebulaBus.Store.Redis tran.Exec(); } - public bool Lock() + public bool Lock(string value) { - _redisClientLock = _redisClient.Lock(LockKey, 3, true); - return _redisClientLock != null; + return _redisClient.SetNx(LockKey, value, 3); + } + + public void RefreshLock() + { + _redisClient.Expire(LockKey, 3); } public void Dispose() -- Gitee From 091c31d223166e47c29e4b18b427e506aa43b8d8 Mon Sep 17 00:00:00 2001 From: liuhai Date: Thu, 17 Apr 2025 08:18:09 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/NebulaBus/Scheduler/DelayMessageScheduler.cs | 15 +++++++++++++-- src/NebulaBus/Store/IStore.cs | 1 + src/NebulaBus/Store/Memory/MemoryStore.cs | 12 ++++++++++-- src/NebulaBus/Store/Redis/RedisStore.cs | 9 ++++++++- 4 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs index 95a2a13..1ebba33 100644 --- a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs +++ b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs @@ -44,14 +44,20 @@ namespace NebulaBus.Scheduler _scheduler.JobFactory = _jobFactory; await _scheduler.Start(cts.Token); + var timer = new System.Timers.Timer(1000); + timer.Elapsed += Timer_Elapsed; + var lockValue = Guid.NewGuid().ToString(); + cts.Token.Register(() => { _scheduler.Clear(); _scheduler.Shutdown(); + timer?.Stop(); + timer?.Dispose(); + _store?.UnLock(lockValue); _store?.Dispose(); }); - var lockValue = Guid.NewGuid().ToString(); while (!cts.IsCancellationRequested) { try @@ -63,7 +69,7 @@ namespace NebulaBus.Scheduler await Task.Delay(1000, cancellationToken); continue; } - + timer.Start(); while (!cts.IsCancellationRequested) { await ScheduleJobFromStore(cts.Token); @@ -78,6 +84,11 @@ namespace NebulaBus.Scheduler } } + private void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) + { + _store.RefreshLock(); + } + private IJobDetail BuildJobDetail(DelayStoreMessage delayMessage) { var job = JobBuilder.Create() diff --git a/src/NebulaBus/Store/IStore.cs b/src/NebulaBus/Store/IStore.cs index 6424911..e240023 100644 --- a/src/NebulaBus/Store/IStore.cs +++ b/src/NebulaBus/Store/IStore.cs @@ -10,5 +10,6 @@ namespace NebulaBus.Store Task Get(long beforeTimestamp); void RefreshLock(); bool Lock(string value); + void UnLock(string value); } } \ No newline at end of file diff --git a/src/NebulaBus/Store/Memory/MemoryStore.cs b/src/NebulaBus/Store/Memory/MemoryStore.cs index 8283f8a..44a242e 100644 --- a/src/NebulaBus/Store/Memory/MemoryStore.cs +++ b/src/NebulaBus/Store/Memory/MemoryStore.cs @@ -30,12 +30,20 @@ namespace NebulaBus.Store.Memory return await Task.FromResult(result); } - public bool Lock() + public void Dispose() + { + } + + public void RefreshLock() + { + } + + public bool Lock(string value) { return true; } - public void Dispose() + public void UnLock(string value) { } } diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index a1b0f3a..2dec7e3 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -75,11 +75,18 @@ namespace NebulaBus.Store.Redis _redisClient.Expire(LockKey, 3); } + public void UnLock(string value) + { + var val = _redisClient.Get(LockKey); + if (val == value) + _redisClient.Del(LockKey); + } + public void Dispose() { try { - _redisClientLock.Dispose(); + _redisClientLock?.Dispose(); } catch { } -- Gitee From 0ed9786d308af38378fe0ceffbdcf1d63d579cc3 Mon Sep 17 00:00:00 2001 From: liuhai Date: Thu, 17 Apr 2025 08:55:34 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/NebulaBus/Scheduler/DelayMessageScheduler.cs | 5 +++-- src/NebulaBus/Store/Redis/RedisStore.cs | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs index 1ebba33..4739286 100644 --- a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs +++ b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs @@ -73,14 +73,15 @@ namespace NebulaBus.Scheduler while (!cts.IsCancellationRequested) { await ScheduleJobFromStore(cts.Token); - await Task.Delay(1000, cts.Token); + await Task.Delay(1000); } } catch (Exception ex) { + timer.Stop(); _logger.LogError(ex, "Schedule Failed"); - await Task.Delay(1000, cts.Token); } + await Task.Delay(1000); } } diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index 2dec7e3..48dc37d 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -86,6 +86,7 @@ namespace NebulaBus.Store.Redis { try { + _redisClient?.Dispose(); _redisClientLock?.Dispose(); } catch -- Gitee