diff --git a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs index ac25e1a73145ad6e7508994176df62a6adb53857..473928645bb647798e5ee319b658d310097efe40 100644 --- a/src/NebulaBus/Scheduler/DelayMessageScheduler.cs +++ b/src/NebulaBus/Scheduler/DelayMessageScheduler.cs @@ -44,10 +44,17 @@ namespace NebulaBus.Scheduler _scheduler.JobFactory = _jobFactory; await _scheduler.Start(cts.Token); + var timer = new System.Timers.Timer(1000); + timer.Elapsed += Timer_Elapsed; + var lockValue = Guid.NewGuid().ToString(); + cts.Token.Register(() => { _scheduler.Clear(); _scheduler.Shutdown(); + timer?.Stop(); + timer?.Dispose(); + _store?.UnLock(lockValue); _store?.Dispose(); }); @@ -56,27 +63,33 @@ namespace NebulaBus.Scheduler try { //lock - var gotLock = _store.Lock(); + var gotLock = _store.Lock(lockValue); if (!gotLock) { await Task.Delay(1000, cancellationToken); continue; } - + timer.Start(); while (!cts.IsCancellationRequested) { await ScheduleJobFromStore(cts.Token); - await Task.Delay(1000, cts.Token); + await Task.Delay(1000); } } catch (Exception ex) { + timer.Stop(); _logger.LogError(ex, "Schedule Failed"); - await Task.Delay(1000, cts.Token); } + await Task.Delay(1000); } } + private void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) + { + _store.RefreshLock(); + } + private IJobDetail BuildJobDetail(DelayStoreMessage delayMessage) { var job = JobBuilder.Create() diff --git a/src/NebulaBus/Store/IStore.cs b/src/NebulaBus/Store/IStore.cs index c3f30b74f3a768995aba18df7a889503384b0f7d..e240023350dad5aa5ac11a2ec4554ec96f7ad0e4 100644 --- a/src/NebulaBus/Store/IStore.cs +++ b/src/NebulaBus/Store/IStore.cs @@ -8,6 +8,8 @@ namespace NebulaBus.Store void Add(DelayStoreMessage delayStoreMessage); void Delete(DelayStoreMessage delayStoreMessage); Task Get(long beforeTimestamp); - bool Lock(); + void RefreshLock(); + bool Lock(string value); + void UnLock(string value); } } \ No newline at end of file diff --git a/src/NebulaBus/Store/Memory/MemoryStore.cs b/src/NebulaBus/Store/Memory/MemoryStore.cs index 8283f8a30297aa28ef984e53d656d28b1da4e4b9..44a242e864dae66ffae356dab839c5c4bd1fea01 100644 --- a/src/NebulaBus/Store/Memory/MemoryStore.cs +++ b/src/NebulaBus/Store/Memory/MemoryStore.cs @@ -30,12 +30,20 @@ namespace NebulaBus.Store.Memory return await Task.FromResult(result); } - public bool Lock() + public void Dispose() + { + } + + public void RefreshLock() + { + } + + public bool Lock(string value) { return true; } - public void Dispose() + public void UnLock(string value) { } } diff --git a/src/NebulaBus/Store/Redis/RedisStore.cs b/src/NebulaBus/Store/Redis/RedisStore.cs index ce8676851fb46c51d11bad84887993d7f4498853..48dc37db5b48325c4e0e4a3b7949d82ce181b5c7 100644 --- a/src/NebulaBus/Store/Redis/RedisStore.cs +++ b/src/NebulaBus/Store/Redis/RedisStore.cs @@ -65,17 +65,29 @@ namespace NebulaBus.Store.Redis tran.Exec(); } - public bool Lock() + public bool Lock(string value) { - _redisClientLock = _redisClient.Lock(LockKey, 3, true); - return _redisClientLock != null; + return _redisClient.SetNx(LockKey, value, 3); + } + + public void RefreshLock() + { + _redisClient.Expire(LockKey, 3); + } + + public void UnLock(string value) + { + var val = _redisClient.Get(LockKey); + if (val == value) + _redisClient.Del(LockKey); } public void Dispose() { try { - _redisClientLock.Dispose(); + _redisClient?.Dispose(); + _redisClientLock?.Dispose(); } catch { }