From 1a3ca70f582e070305b7e5187e6c1af5aae93c06 Mon Sep 17 00:00:00 2001 From: liyou <303181885@qq.com> Date: Sat, 6 Dec 2025 17:42:01 +0800 Subject: [PATCH 1/2] fix checkpoint oom issue --- cpp/runtime/state/SnapshotStrategyRunner.h | 3 +++ .../api/operators/OperatorSnapshotFinalizer.h | 6 ++++++ .../api/operators/StreamOperatorStateHandler.h | 4 ++++ .../runtime/tasks/AsyncCheckpointRunnable.cpp | 2 +- .../tasks/SubtaskCheckpointCoordinatorImpl.cpp | 16 +++++++++++++--- .../tasks/SubtaskCheckpointCoordinatorImpl.h | 3 ++- 6 files changed, 29 insertions(+), 5 deletions(-) diff --git a/cpp/runtime/state/SnapshotStrategyRunner.h b/cpp/runtime/state/SnapshotStrategyRunner.h index 517fee3..129c1e9 100644 --- a/cpp/runtime/state/SnapshotStrategyRunner.h +++ b/cpp/runtime/state/SnapshotStrategyRunner.h @@ -55,6 +55,9 @@ public: LOG("native rocksdb checkpoint has been finished."); } } + if (snapshotResources) { + delete snapshotResources; + } return task; } diff --git a/cpp/streaming/api/operators/OperatorSnapshotFinalizer.h b/cpp/streaming/api/operators/OperatorSnapshotFinalizer.h index b1fbd96..10fcfef 100644 --- a/cpp/streaming/api/operators/OperatorSnapshotFinalizer.h +++ b/cpp/streaming/api/operators/OperatorSnapshotFinalizer.h @@ -64,6 +64,12 @@ public: *StateObjectCollection::EmptyIfNull(nullptr), *StateObjectCollection::EmptyIfNull(nullptr)); LOG(">>>>>>> end OperatorSnapshotFinalizer") + if (keyedStateManaged) { + delete keyedStateManaged; + } + if (KeyedStateRaw) { + delete KeyedStateRaw; + } }; [[nodiscard]] std::shared_ptr getTaskLocalState() const diff --git a/cpp/streaming/api/operators/StreamOperatorStateHandler.h b/cpp/streaming/api/operators/StreamOperatorStateHandler.h index ee0decd..c9a92c3 100644 --- a/cpp/streaming/api/operators/StreamOperatorStateHandler.h +++ b/cpp/streaming/api/operators/StreamOperatorStateHandler.h @@ -129,6 +129,10 @@ public: snapshotContext, isUsingCustomRawKeyedState); + if (snapshotContext) { + delete snapshotContext + } + return snapshotInProgress; } diff --git a/cpp/streaming/runtime/tasks/AsyncCheckpointRunnable.cpp b/cpp/streaming/runtime/tasks/AsyncCheckpointRunnable.cpp index a142e25..15ee71c 100644 --- a/cpp/streaming/runtime/tasks/AsyncCheckpointRunnable.cpp +++ b/cpp/streaming/runtime/tasks/AsyncCheckpointRunnable.cpp @@ -64,7 +64,7 @@ SnapshotsFinalizeResult *AsyncCheckpointRunnable::FinalizeNonFinishedSnapshots() auto operatorID = entry.first; OperatorSnapshotFutures *snapshotInProgress = entry.second; - OperatorSnapshotFinalizer *finalizedSnapshot = new OperatorSnapshotFinalizer(snapshotInProgress); + auto finalizedSnapshot = std::make_shared(snapshotInProgress); jobManagerTaskOperatorSubtaskStates->PutSubtaskStateByOperatorID( operatorID, diff --git a/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.cpp b/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.cpp index 620c1a7..fb9550a 100644 --- a/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.cpp +++ b/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.cpp @@ -166,7 +166,8 @@ namespace omnistream::runtime { CheckpointMetricsBuilder *metrics, bool istaskDeployedAsFinished, bool isTaskFinished, - omnistream::Supplier *isRunning) + omnistream::Supplier *isRunning, + CheckpointOptions *options) { LOG(">>>>>> isTaskDeployedAsFinished " << istaskDeployedAsFinished << " isTaskFinished " << isTaskFinished); AsyncCheckpointRunnable *asyncCheckpointRunnable = new AsyncCheckpointRunnable( @@ -187,13 +188,21 @@ namespace omnistream::runtime { isTaskFinished, isRunning); RegisterAsyncCheckpointRunnable(asyncCheckpointRunnable->GetCheckpointId(), asyncCheckpointRunnable); - asyncOperationsThreadPool->Execute([asyncCheckpointRunnable]() { + asyncOperationsThreadPool->Execute([asyncCheckpointRunnable, + operatorSnapshotsInProgress, + metadata, + metrics, + options]() { try { asyncCheckpointRunnable->Run(); } catch (const std::exception &e) { LogError("Exception in async checkpoint: %s", e.what()); } delete asyncCheckpointRunnable; + delete operatorSnapshotsInProgress; + delete metadata; + delete metrics; + delete options; }); LOG(">>>>> Done") } @@ -291,7 +300,8 @@ namespace omnistream::runtime { new std::unordered_map(); try { if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) { - finishAndReportAsync(snapshotFutures, metadata, metrics, operatorChain->IsTaskDeployedAsFinished(), isTaskFinished, isRunning); + finishAndReportAsync(snapshotFutures, metadata, metrics, + operatorChain->IsTaskDeployedAsFinished(), isTaskFinished, isRunning, options); } else { cleanup(snapshotFutures, metadata, metrics, std::runtime_error("Checkpoint declined")); } diff --git a/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.h b/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.h index 11b10a8..bd2993e 100644 --- a/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.h +++ b/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.h @@ -239,7 +239,8 @@ namespace omnistream::runtime { CheckpointMetricsBuilder *metrics, bool istaskDeployedAsFinished, bool isTaskFinished, - omnistream::Supplier *isRunning + omnistream::Supplier *isRunning, + CheckpointOptions *options ); static ChannelStateWriter *openChannelStateWriter( -- Gitee From 0689e9d50163fbc67fd5f711e264b9c6d7bd317d Mon Sep 17 00:00:00 2001 From: liyou <303181885@qq.com> Date: Sat, 6 Dec 2025 17:56:52 +0800 Subject: [PATCH 2/2] fix checkpoint oom issue --- cpp/streaming/api/operators/StreamOperatorStateHandler.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/streaming/api/operators/StreamOperatorStateHandler.h b/cpp/streaming/api/operators/StreamOperatorStateHandler.h index c9a92c3..96898a8 100644 --- a/cpp/streaming/api/operators/StreamOperatorStateHandler.h +++ b/cpp/streaming/api/operators/StreamOperatorStateHandler.h @@ -130,7 +130,7 @@ public: isUsingCustomRawKeyedState); if (snapshotContext) { - delete snapshotContext + delete snapshotContext; } return snapshotInProgress; -- Gitee