From 995bf9915ccafcc2334a76cfb13462efa6985912 Mon Sep 17 00:00:00 2001 From: liyou <303181885@qq.com> Date: Tue, 16 Dec 2025 22:50:43 +0800 Subject: [PATCH 1/2] fix wordcount oom issue --- cpp/core/typeutils/TupleSerializer.cpp | 8 ++++++-- cpp/streaming/api/operators/StreamGroupedReduceOperator.h | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/cpp/core/typeutils/TupleSerializer.cpp b/cpp/core/typeutils/TupleSerializer.cpp index edfd4c1..b1fbcb3 100644 --- a/cpp/core/typeutils/TupleSerializer.cpp +++ b/cpp/core/typeutils/TupleSerializer.cpp @@ -73,10 +73,14 @@ void Tuple2Serializer::createSerializer(nlohmann::json &type) LOG("Tuple type Name: " + typeName) if (typeName == "Long") { LOG("TupleSerializer: Long") - fieldSerializers.push_back(new LongSerializer()); + auto longSerializer = new LongSerializer(); + fieldSerializers.push_back(longSerializer); + longSerializer->setSelfBufferReusable(true); } else if (typeName == "String") { LOG("TupleSerializer: String") - fieldSerializers.push_back(new StringSerializer()); + auto stringSerializer = new StringSerializer(); + fieldSerializers.push_back(stringSerializer); + stringSerializer->setSelfBufferReusable(true); } else { // throw OmniException("not support serializer type"); } diff --git a/cpp/streaming/api/operators/StreamGroupedReduceOperator.h b/cpp/streaming/api/operators/StreamGroupedReduceOperator.h index 91b018b..b8543fb 100644 --- a/cpp/streaming/api/operators/StreamGroupedReduceOperator.h +++ b/cpp/streaming/api/operators/StreamGroupedReduceOperator.h @@ -93,7 +93,7 @@ namespace omnistream::datastream { } else { auto backend = input->clone(); values->update(backend); - + backend->putRefCount(); this->output->collect(record); } LOG("-----StreamGroupedReduceOperator processElement end-----"); -- Gitee From 4ad7d47dec61488372a07d7c63f9c5a4bdd800f3 Mon Sep 17 00:00:00 2001 From: liyou <303181885@qq.com> Date: Wed, 17 Dec 2025 09:23:17 +0800 Subject: [PATCH 2/2] fix wordcount oom issue --- cpp/streaming/runtime/tasks/omni/OmniAsyncDataOutputToOutput.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/streaming/runtime/tasks/omni/OmniAsyncDataOutputToOutput.cpp b/cpp/streaming/runtime/tasks/omni/OmniAsyncDataOutputToOutput.cpp index f96793d..9069f90 100644 --- a/cpp/streaming/runtime/tasks/omni/OmniAsyncDataOutputToOutput.cpp +++ b/cpp/streaming/runtime/tasks/omni/OmniAsyncDataOutputToOutput.cpp @@ -23,6 +23,7 @@ namespace omnistream { } else { auto newRecord = new StreamRecord(streamRecord->getValue(), streamRecord->getTimestamp()); output->collect(newRecord); + delete newRecord; } } -- Gitee