diff --git a/cpp/runtime/state/SnapshotStrategyRunner.h b/cpp/runtime/state/SnapshotStrategyRunner.h index 517fee31e03b4975eae317a3216a1f4e7ba62c50..129c1e93d0f964c0e291dcda4e842042911e40d4 100644 --- a/cpp/runtime/state/SnapshotStrategyRunner.h +++ b/cpp/runtime/state/SnapshotStrategyRunner.h @@ -55,6 +55,9 @@ public: LOG("native rocksdb checkpoint has been finished."); } } + if (snapshotResources) { + delete snapshotResources; + } return task; } diff --git a/cpp/streaming/api/operators/OperatorSnapshotFinalizer.h b/cpp/streaming/api/operators/OperatorSnapshotFinalizer.h index b1fbd96583b1e0469e99920a370e1021b7f9cbaf..10fcfefe7b81cf58dd6cf887ad743228083fe916 100644 --- a/cpp/streaming/api/operators/OperatorSnapshotFinalizer.h +++ b/cpp/streaming/api/operators/OperatorSnapshotFinalizer.h @@ -64,6 +64,12 @@ public: *StateObjectCollection::EmptyIfNull(nullptr), *StateObjectCollection::EmptyIfNull(nullptr)); LOG(">>>>>>> end OperatorSnapshotFinalizer") + if (keyedStateManaged) { + delete keyedStateManaged; + } + if (KeyedStateRaw) { + delete KeyedStateRaw; + } }; [[nodiscard]] std::shared_ptr getTaskLocalState() const diff --git a/cpp/streaming/api/operators/StreamOperatorStateHandler.h b/cpp/streaming/api/operators/StreamOperatorStateHandler.h index ee0decd3a36f1d08743b69e1a6dc635e998ed835..96898a81a8c9d5774f94126d3d322b5e9e3e7515 100644 --- a/cpp/streaming/api/operators/StreamOperatorStateHandler.h +++ b/cpp/streaming/api/operators/StreamOperatorStateHandler.h @@ -129,6 +129,10 @@ public: snapshotContext, isUsingCustomRawKeyedState); + if (snapshotContext) { + delete snapshotContext; + } + return snapshotInProgress; } diff --git a/cpp/streaming/runtime/tasks/AsyncCheckpointRunnable.cpp b/cpp/streaming/runtime/tasks/AsyncCheckpointRunnable.cpp index a142e25ba50923398547935a8f110a188625f2b7..15ee71c8f4ca65f8e295fd53ac9578f45b2e2600 100644 --- a/cpp/streaming/runtime/tasks/AsyncCheckpointRunnable.cpp +++ b/cpp/streaming/runtime/tasks/AsyncCheckpointRunnable.cpp @@ -64,7 +64,7 @@ SnapshotsFinalizeResult *AsyncCheckpointRunnable::FinalizeNonFinishedSnapshots() auto operatorID = entry.first; OperatorSnapshotFutures *snapshotInProgress = entry.second; - OperatorSnapshotFinalizer *finalizedSnapshot = new OperatorSnapshotFinalizer(snapshotInProgress); + auto finalizedSnapshot = std::make_shared(snapshotInProgress); jobManagerTaskOperatorSubtaskStates->PutSubtaskStateByOperatorID( operatorID, diff --git a/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.cpp b/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.cpp index 620c1a79c03aed4adde283f8454b061ea6eaf3a1..fb9550a992cb816d55a21a8b06361ee31f244254 100644 --- a/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.cpp +++ b/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.cpp @@ -166,7 +166,8 @@ namespace omnistream::runtime { CheckpointMetricsBuilder *metrics, bool istaskDeployedAsFinished, bool isTaskFinished, - omnistream::Supplier *isRunning) + omnistream::Supplier *isRunning, + CheckpointOptions *options) { LOG(">>>>>> isTaskDeployedAsFinished " << istaskDeployedAsFinished << " isTaskFinished " << isTaskFinished); AsyncCheckpointRunnable *asyncCheckpointRunnable = new AsyncCheckpointRunnable( @@ -187,13 +188,21 @@ namespace omnistream::runtime { isTaskFinished, isRunning); RegisterAsyncCheckpointRunnable(asyncCheckpointRunnable->GetCheckpointId(), asyncCheckpointRunnable); - asyncOperationsThreadPool->Execute([asyncCheckpointRunnable]() { + asyncOperationsThreadPool->Execute([asyncCheckpointRunnable, + operatorSnapshotsInProgress, + metadata, + metrics, + options]() { try { asyncCheckpointRunnable->Run(); } catch (const std::exception &e) { LogError("Exception in async checkpoint: %s", e.what()); } delete asyncCheckpointRunnable; + delete operatorSnapshotsInProgress; + delete metadata; + delete metrics; + delete options; }); LOG(">>>>> Done") } @@ -291,7 +300,8 @@ namespace omnistream::runtime { new std::unordered_map(); try { if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) { - finishAndReportAsync(snapshotFutures, metadata, metrics, operatorChain->IsTaskDeployedAsFinished(), isTaskFinished, isRunning); + finishAndReportAsync(snapshotFutures, metadata, metrics, + operatorChain->IsTaskDeployedAsFinished(), isTaskFinished, isRunning, options); } else { cleanup(snapshotFutures, metadata, metrics, std::runtime_error("Checkpoint declined")); } diff --git a/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.h b/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.h index 11b10a89e345b45c3e78cba789ba2a094e43d5f7..bd2993e6b5c054e7157ab3fbdd89326029a2299f 100644 --- a/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.h +++ b/cpp/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.h @@ -239,7 +239,8 @@ namespace omnistream::runtime { CheckpointMetricsBuilder *metrics, bool istaskDeployedAsFinished, bool isTaskFinished, - omnistream::Supplier *isRunning + omnistream::Supplier *isRunning, + CheckpointOptions *options ); static ChannelStateWriter *openChannelStateWriter(