diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index ae6b4f4d5bd1dd7a2b09f814af2766c58a8a8c05..818c20e924e926b1545938568953020152e579a7 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -9,7 +9,7 @@ set(CMAKE_CXX_STANDARD 17) if(DEFINED ENV{OMNI_HOME}) set(CMAKE_INSTALL_PREFIX $ENV{OMNI_HOME} CACHE PATH "Install path prefix" FORCE) endif() -option(WITH_OMNISTATESTORE "Enable to build with OmniStateStore" OFF) +option(WITH_OMNISTATESTORE "Enable to build with OmniStateStore" ON) add_library(project_config INTERFACE) if(WITH_OMNISTATESTORE) target_compile_definitions(project_config INTERFACE WITH_OMNISTATESTORE) @@ -149,6 +149,7 @@ endif() set(CMAKE_SKIP_RPATH TRUE) set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS} -lrt -fno-var-tracking-assignments -pipe -g -Wall -fPIC -fno-omit-frame-pointer -fno-common -fno-stack-protector") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS} -lrt -fno-var-tracking-assignments -Wno-terminate -Wno-delete-incomplete -pipe -Wall -Wno-strict-aliasing -Wtrampolines -D_FORTIFY_SOURCE=2 -fPIC -finline-functions -fstack-protector-strong -s -Wl,-z,noexecstack -Wl,-z,relro,-z,now -finline-limit=6000 --param inline-unit-growth=300") +add_compile_options("-march=armv8-a+sve") # Default including directory include_directories("${CMAKE_CURRENT_SOURCE_DIR}/core/include/" "${CMAKE_CURRENT_SOURCE_DIR}" diff --git a/cpp/streaming/runtime/partitioner/V2/StreamPartitionerV2.h b/cpp/streaming/runtime/partitioner/V2/StreamPartitionerV2.h index b00befeeabe428189ef9df411538ba54dbb0cf12..d5cbae5d189adc7fa34bf0e2326c03ac286622d1 100644 --- a/cpp/streaming/runtime/partitioner/V2/StreamPartitionerV2.h +++ b/cpp/streaming/runtime/partitioner/V2/StreamPartitionerV2.h @@ -18,6 +18,7 @@ #include "table/data/vectorbatch/VectorBatch.h" #include "vector/vector_helper.h" #include "table/data/Row.h" +#include namespace omnistream { template @@ -91,6 +92,31 @@ public: [[nodiscard]] virtual std::string toString() const = 0; virtual std::unique_ptr getUpstreamSubtaskStateMapper() = 0; + void setRowKind_sve(int i, int size, uint8_t* src, int32_t* offsets, uint8_t* dst) { + auto pg = svwhilelt_b32(i, size); + svint32_t offsetRaw = svld1_s32(pg, offsets); + svuint32_t rawData = svld1ub_gather_offset_u32(pg, src, offsetRaw); + svuint8_t u8_vec = svreinterpret_u8_u32(rawData); + svuint8_t indices = svindex_u8(0, sizeof(uint32_t)); + svuint8_t packed = svtbl(u8_vec, indices); + auto pg2 = svwhilelt_b8(i, size); + svst1_u8(pg2, dst, packed); + } + + void setTimestamp_sve(int i, int size, int64_t* src, int32_t* offsets, int64_t* dst) { + auto pg = svwhilelt_b32(i, size); + svint32_t offsetRaw = svld1_s32(pg, offsets); + svint64_t offset1 = svunpklo(offsetRaw); + svint64_t offset2 = svunpkhi(offsetRaw); + auto pg2 = svwhilelt_b64(i, size); + svint64_t rawData = svld1_gather_index(pg2, src, offset1); + svst1_s64(pg2, dst, rawData); + int jump = svcntd(); + auto pg3 = svwhilelt_b64(i + jump, size); + svint64_t rawData2 = svld1_gather_index(pg3, src, offset2); + svst1_s64(pg3, dst + jump, rawData2); + } + StreamRecord* buildNewStreamRecordBasedOnOffsets(std::vector& offsets, StreamRecord* originStreamRecord, long timestamp) { @@ -110,10 +136,15 @@ public: offsets.data(), 0, offsets.size())); } } - for (size_t i = 0; i < offsets.size(); i++) { + int processElement = svcntw(); + int size = offsets.size(); + for (size_t i = 0; i < offsets.size(); i += processElement) { int position = offsets[i]; - copyedVectorBatch->setRowKind(i, vectorBatch->getRowKind(position)); - copyedVectorBatch->setTimestamp(i, vectorBatch->getTimestamp(position)); + + setRowKind_sve(i, size, reinterpret_cast(vectorBatch->getRowKinds()), offsets.data() + i, + reinterpret_cast(copyedVectorBatch->getRowKinds()) + i); + setTimestamp_sve(i, size, vectorBatch->getTimestamps(), offsets.data() + i, + copyedVectorBatch->getTimestamps() + i); } return new StreamRecord(copyedVectorBatch, timestamp); } diff --git a/cpp/table/data/util/VectorBatchUtil.h b/cpp/table/data/util/VectorBatchUtil.h index efeef0a92b051e24ad92b269717ba79e7a381de8..ff966d247975f91baea32de272acc35227144c64 100644 --- a/cpp/table/data/util/VectorBatchUtil.h +++ b/cpp/table/data/util/VectorBatchUtil.h @@ -13,6 +13,7 @@ #include "table/data/vectorbatch/VectorBatch.h" #include "OmniOperatorJIT/core/src/vector/unsafe_vector.h" +#include class VectorBatchUtil { public: @@ -35,6 +36,36 @@ public: return static_cast(ubatchId); } + static void deComboIDSVE(uint64_t* src, uint32_t* batchIDdst, uint32_t* rowIDdst, int num) + { + int processNum = svcntw(); + int half = svcntd(); + for (int i = 0; i < num; i+=processNum) { + svbool_t pg = svwhilelt_b64(i, num); + svbool_t pg2 = svwhilelt_b64(i + half, num); + svbool_t pg3 = svwhilelt_b32(i, num); + svuint64_t comboID = svld1(pg, src + i); + svuint64_t comboID2 = svld1(pg2, src + i + half); + + svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + + svst1_u32(pg3, rowIDdst + i, rowID); + svst1_u32(pg3, batchIDdst + i, batchID); + } + } + + static void getComboId_sve(int batchId, int rowCount, int64_t* result) { + int processNum = svcntd(); + svint64_t batchData = svlsl_n_s64_x(svptrue_b64(), svdup_n_s64(batchId), 32); + for (int i = 0; i < rowCount; i+= processNum) { + svbool_t pg = svwhilelt_b64(i, rowCount); + svint64_t rowData = svindex_s64(i, 1); + svint64_t comboIDs = svorr_z(pg, batchData, rowData); + svst1_s64(pg, result + i, comboIDs); + } + } + // To print VectorBatch, use VectorHelper::PrintVecBatch from OmniOperatorJIT/core/src/vector/vector_helper.h template diff --git a/cpp/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.cpp b/cpp/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.cpp index d78c103ee7a60b54c4977c57be468cf2a45eb5bb..521ca42a95e8a60f593863afe3a18f2b9f0d5bd0 100644 --- a/cpp/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.cpp +++ b/cpp/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.cpp @@ -45,8 +45,10 @@ unordered_map RowTimeDeduplicateFunction::getUpdateState( { unordered_map tmpState; int curBatchId = getCurrentBatchId() - 1; + long* comboIDs = new long[rowCount]; + VectorBatchUtil::getComboId_sve(curBatchId, rowCount, comboIDs); for (int i = 0; i < rowCount; i++) { - long comboId = VectorBatchUtil::getComboId(curBatchId, i); + long comboId = comboIDs[i]; // 建立key RowData *key = groupByKeySelector->getKey(inputVB, i); @@ -63,11 +65,12 @@ unordered_map RowTimeDeduplicateFunction::getUpdateState( } else if (itTmp != tmpState.end()) { long curComboId = itTmp->second; if (isDuplicate(curComboId, comboId)) { - itTmp->second = comboId; + tmpState[key] = comboId; } delete key; } } + delete[] comboIDs; return tmpState; } diff --git a/cpp/table/runtime/operators/join/AbstractStreamingJoinOperator.h b/cpp/table/runtime/operators/join/AbstractStreamingJoinOperator.h index 8d50d13a6d21e1c36b966cdab166f1b24594b602..659b43923ba6e836beff0accb1ccaac656e14df5 100644 --- a/cpp/table/runtime/operators/join/AbstractStreamingJoinOperator.h +++ b/cpp/table/runtime/operators/join/AbstractStreamingJoinOperator.h @@ -30,6 +30,7 @@ #include "streaming/api/operators/TimestampedCollector.h" #include "JoinRecordStateView.h" +#include // joinCondition includes 2 steps: // (1) check if key is null @@ -127,7 +128,7 @@ protected: // Null-padded entries that need to be inserted/deleted std::vector deleteRecords; // Kinds for those null-padded entries based on accumulate(0) or retract(1) - std::vector deleteKinds; + std::vector deleteKinds; FilterFuncPtr generatedFilter = nullptr; JoinedRowFilterFunc joinCondition; @@ -259,12 +260,12 @@ void AbstractStreamingJoinOperator::of( if (RowDataUtil::isAccumulateMsg(input->getRowKind(i))) { if (std::get<1>(it->second) == 0) { deleteRecords.push_back(std::get<2>(it->second)); - deleteKinds.push_back(0); + deleteKinds.push_back(static_cast(0)); } } else { if (std::get<1>(it->second) == 1) { deleteRecords.push_back(std::get<2>(it->second)); - deleteKinds.push_back(1); + deleteKinds.push_back(static_cast(1)); } } int32_t newNumAssociate = RowDataUtil::isAccumulateMsg(input->getRowKind(i))? std::get<1>(it->second) + 1 : std::get<1>(it->second) - 1; @@ -396,11 +397,30 @@ std::unique_ptr> AbstractStreamingJoinOperator::filterRe joinCondition[col](vector, inputRowId, col, vals.data(), reinterpret_cast(nulls.data())); } } + int num = (*matchedRecords).size(); + uint32_t* batchIDdst = new uint32_t[num]; + uint32_t* rowIDdst = new uint32_t[num]; + + int processNum = svcntw(); + int half = svcntd(); + for (int i = 0; i < num; i+=processNum) { + svbool_t pg = svwhilelt_b64(i, num); + svbool_t pg2 = svwhilelt_b64(i + half, num); + svbool_t pg3 = svwhilelt_b32(i, num); + svuint64_t comboID = svld1(pg, reinterpret_cast((*matchedRecords).data()) + i); + svuint64_t comboID2 = svld1(pg2, reinterpret_cast((*matchedRecords).data()) + i + half); + + svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + + svst1_u32(pg3, rowIDdst + i, rowID); + svst1_u32(pg3, batchIDdst + i, batchID); + } // for the otherSide - for (const int64_t &comboId : *matchedRecords) { - int32_t othersideRowId = VectorBatchUtil::getRowId(comboId); - int32_t othersideBatchId = VectorBatchUtil::getBatchId(comboId); + for (int i = 0; i < num; i++) { + int32_t othersideRowId = rowIDdst[i]; + int32_t othersideBatchId = batchIDdst[i]; for (auto col : colRefsForNonEquiCondition) { bool isLeftColumn = col < leftArity; @@ -415,10 +435,11 @@ std::unique_ptr> AbstractStreamingJoinOperator::filterRe auto result = generatedFilter( vals.data(), reinterpret_cast(nulls.data()), nullptr, &resultBool, nullptr, (int64_t)(&context)); if (result) { - filteredRecords->push_back(comboId); + filteredRecords->push_back((*matchedRecords)[i]); } } - + delete[] rowIDdst; + delete[] batchIDdst; return filteredRecords; } diff --git a/cpp/table/runtime/operators/join/JoinRecordStateView.h b/cpp/table/runtime/operators/join/JoinRecordStateView.h index b1b58d51898ab806f3e3ff30d0d4c6f422991feb..4c2f08f37a216ab136a76e2c8459479d4804c0e6 100644 --- a/cpp/table/runtime/operators/join/JoinRecordStateView.h +++ b/cpp/table/runtime/operators/join/JoinRecordStateView.h @@ -177,6 +177,8 @@ void InputSideHasNoUniqueKey::addOrRectractRecord(omnistream::VectorBatch *in } std::vector xxh128Hashes = input->getXXH128s(); + long* comboIDs = new long[input->GetRowCount()]; + VectorBatchUtil::getComboId_sve(batchId, input->GetRowCount(), comboIDs); for (int i = 0; i < input->GetRowCount(); i++) { if (filterNulls && keySelector->isAnyKeyNull(input, i)) { continue; @@ -188,7 +190,7 @@ void InputSideHasNoUniqueKey::addOrRectractRecord(omnistream::VectorBatch *in recordStateVB->updateOrCreate( ukey, /* default value used only if key is missing and delta is positive */ - UV {1, VectorBatchUtil::getComboId(batchId, i)}, + UV {1, comboIDs[i]}, [delta, &numAssociates, i](UV& val) -> std::optional { int newCount = std::get<0>(val) + delta; if (newCount != 0) { @@ -204,6 +206,7 @@ void InputSideHasNoUniqueKey::addOrRectractRecord(omnistream::VectorBatch *in delete key; } } + delete[] comboIDs; } template diff --git a/cpp/table/runtime/operators/join/StreamingJoinOperator.cpp b/cpp/table/runtime/operators/join/StreamingJoinOperator.cpp index 612de1d9eedd606f49b315dd7b4552c6d8a70e43..695751359b541275b04451047c61ee1830a6a1ef 100644 --- a/cpp/table/runtime/operators/join/StreamingJoinOperator.cpp +++ b/cpp/table/runtime/operators/join/StreamingJoinOperator.cpp @@ -213,11 +213,30 @@ omniruntime::vec::BaseVector *StreamingJoinOperator::buildOtherSideColumn(omn outputCol->SetNull(rowIndex++); } } + int num = this->deleteRecords.size(); + uint32_t* batchIDdst = new uint32_t[num]; + uint32_t* rowIDdst = new uint32_t[num]; + + int processNum = svcntw(); + int half = svcntd(); + for (int i = 0; i < num; i+=processNum) { + svbool_t pg = svwhilelt_b64(i, num); + svbool_t pg2 = svwhilelt_b64(i + half, num); + svbool_t pg3 = svwhilelt_b32(i, num); + svuint64_t comboID = svld1(pg, reinterpret_cast(this->deleteRecords.data()) + i); + svuint64_t comboID2 = svld1(pg2, reinterpret_cast(this->deleteRecords.data()) + i + half); + + svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + + svst1_u32(pg3, rowIDdst + i, rowID); + svst1_u32(pg3, batchIDdst + i, batchID); + } // Loop wont run for inner join as deletedRecords can have elements only if other is Outer - for (auto id : this->deleteRecords) { - auto batchId = VectorBatchUtil::getBatchId(id); - auto rowId = VectorBatchUtil::getRowId(id); + for (int i = 0; i < num; i++) { + auto batchId = batchIDdst[i]; + auto rowId = rowIDdst[i]; if (curbatchId != batchId) { if (otherSideStateView->getVectorBatch(batchId) == nullptr) { throw std::runtime_error("get batch is nullptr in buildOtherSideColumn"); @@ -229,7 +248,8 @@ omniruntime::vec::BaseVector *StreamingJoinOperator::buildOtherSideColumn(omn auto val = inputCol->GetValue(rowId); outputCol->SetValue(rowIndex++, val); } - + delete[] batchIDdst; + delete[] rowIDdst; return outputCol; } @@ -274,17 +294,35 @@ omniruntime::vec::BaseVector *StreamingJoinOperator::buildOtherSideColumnVarc outputCol->SetNull(rowIndex++); } } + int num = this->deleteRecords.size(); + uint32_t* batchIDdst = new uint32_t[num]; + uint32_t* rowIDdst = new uint32_t[num]; + + int processNum = svcntw(); + int half = svcntd(); + for (int i = 0; i < num; i+=processNum) { + svbool_t pg = svwhilelt_b64(i, num); + svbool_t pg2 = svwhilelt_b64(i + half, num); + svbool_t pg3 = svwhilelt_b32(i, num); + svuint64_t comboID = svld1(pg, reinterpret_cast(this->deleteRecords.data()) + i); + svuint64_t comboID2 = svld1(pg2, reinterpret_cast(this->deleteRecords.data()) + i + half); + + svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + + svst1_u32(pg3, rowIDdst + i, rowID); + svst1_u32(pg3, batchIDdst + i, batchID); + } // Loop wont run for inner join as deletedRecords can have elements only if other is Outer - for (auto id : this->deleteRecords) { - auto batchId = VectorBatchUtil::getBatchId(id); - auto rowId = VectorBatchUtil::getRowId(id); - auto vectorBatch = otherSideStateView->getVectorBatch(batchId); - if (vectorBatch == nullptr) { + for (int i = 0; i < num; i++) { + auto batchId = batchIDdst[i]; + auto rowId = rowIDdst[i]; + auto inputCol = otherSideStateView->getVectorBatch(batchId)->Get(icol); + if (otherSideStateView->getVectorBatch(batchId) == nullptr) { LOG("string from vectorBatch is nullptr") throw std::runtime_error("string from vectorBatch is nullptr"); } - auto inputCol = vectorBatch->Get(icol); if (inputCol->GetEncoding() == OMNI_FLAT) { auto castedCol = reinterpret_cast(inputCol); auto sv = castedCol->GetValue(rowId); @@ -295,7 +333,8 @@ omniruntime::vec::BaseVector *StreamingJoinOperator::buildOtherSideColumnVarc outputCol->SetValue(rowIndex++, sv); } } - + delete[] batchIDdst; + delete[] rowIDdst; return outputCol; } @@ -428,6 +467,23 @@ void StreamingJoinOperator::setOutPutValueOther(omnistream::VectorBatch *inpu } } +template +void StreamingJoinOperator::setRowKind_sve(int i, int size, uint8_t* dst, int8_t* condition) { + auto pg = svwhilelt_b8(i, size); + svint8_t conditionData = svld1_s8(pg, condition); + svbool_t mask = svcmpeq_n_s8(pg, conditionData, 0); + svuint8_t data = svsel_u8(mask, svdup_n_u8(3), svdup_n_u8(0)); + svst1_u8(pg, dst, data); +} + +template +void StreamingJoinOperator::setTimestamp_raw(int start, int size, const int64_t* src, int64_t* dst, int rowIndex) { + int processElement = svcntb(); + for (int i = 0; i < processElement && start + i < size; i++) { + dst[i + rowIndex] = src[i + start]; + } +} + template RowKind StreamingJoinOperator::getOutputVBRowKind(omnistream::VectorBatch *input, bool inputIsOuter, bool otherIsOuter, int index) @@ -466,12 +522,11 @@ void StreamingJoinOperator::setOutPutMetaData(omnistream::VectorBatch *input, } } + int size = this->deleteRecords.size(); + int processElement = svcntb(); for (size_t i = 0; i < this->deleteRecords.size(); i++) { - if (this->deleteKinds[i] == 0) { // For accumulate when numAssociates is 0 - outputVB->setRowKind(rowIndex, RowKind::DELETE); - } else { // For retract when numAssociates is 1 - outputVB->setRowKind(rowIndex, RowKind::INSERT); - } - outputVB->setTimestamp(rowIndex++, input->getTimestamp(i)); + setRowKind_sve(i, size, reinterpret_cast(outputVB->getRowKinds()) + rowIndex, this->deleteKinds.data() + i); + setTimestamp_raw(i, size, input->getTimestamps(), outputVB->getTimestamps(), rowIndex); + rowIndex += processElement; } } \ No newline at end of file diff --git a/cpp/table/runtime/operators/join/StreamingJoinOperator.h b/cpp/table/runtime/operators/join/StreamingJoinOperator.h index 13ff02fccd33d2dccb0d341f9f34c09d225b9255..b0bca57f47dd118e96313e1f8e4086230f9e0092 100644 --- a/cpp/table/runtime/operators/join/StreamingJoinOperator.h +++ b/cpp/table/runtime/operators/join/StreamingJoinOperator.h @@ -18,6 +18,7 @@ #include "table/data/util/RowDataUtil.h" #include "table/data/vectorbatch/VectorBatch.h" #include "OmniOperatorJIT/core/src/vector/large_string_container.h" +#include template class StreamingJoinOperator : public AbstractStreamingJoinOperator { @@ -179,6 +180,10 @@ private: RowKind getOutputVBRowKind(omnistream::VectorBatch *input, bool inputIsOuter, bool otherIsOuter, int index); + void setRowKind_sve(int i, int size, uint8_t* dst, int8_t* condition); + + void setTimestamp_raw(int start, int size, const int64_t* src, int64_t* dst, int rowIndex); + void AssembleFisrtTime(omnistream::VectorBatch* input, omnistream::VectorBatch* outputVB, bool inputIsLeft); void AssembleSecondTime(omnistream::VectorBatch* input, omnistream::VectorBatch* outputVB, diff --git a/cpp/table/runtime/operators/join/window/WindowJoinOperator.h b/cpp/table/runtime/operators/join/window/WindowJoinOperator.h index 4b9913dcf2dc201a89eacf79c1c7b3283618959a..f8411f3332b9c9dff0958ce10bff0af36ff70c7e 100644 --- a/cpp/table/runtime/operators/join/window/WindowJoinOperator.h +++ b/cpp/table/runtime/operators/join/window/WindowJoinOperator.h @@ -28,6 +28,7 @@ #include "OmniOperatorJIT/core/src/codegen/simple_filter_codegen.h" #include "OmniOperatorJIT/core/src/vector/unsafe_vector.h" #include "OmniOperatorJIT/core/src/operator/execution_context.h" +#include using VectorBatchId = uint64_t; @@ -508,21 +509,40 @@ template inline void WindowJoinOperator::insertLeft(int colIdx, std::vector *leftElements, std::vector *rightElements, omnistream::VectorBatch *outputBatch, bool isInner) { + int num = (*leftElements).size(); + uint32_t* batchIDdst = new uint32_t[num]; + uint32_t* rowIDdst = new uint32_t[num]; + + int processNum = svcntw(); + int half = svcntd(); + for (int i = 0; i < num; i+=processNum) { + svbool_t pg = svwhilelt_b64(i, num); + svbool_t pg2 = svwhilelt_b64(i + half, num); + svbool_t pg3 = svwhilelt_b32(i, num); + svuint64_t comboID = svld1(pg, (*leftElements).data() + i); + svuint64_t comboID2 = svld1(pg2, (*leftElements).data() + i + half); + + svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + + svst1_u32(pg3, rowIDdst + i, rowID); + svst1_u32(pg3, batchIDdst + i, batchID); + } auto col = reinterpret_cast *>(outputBatch->Get(colIdx)); if (isNonEquiCondition || !isInner) { int rowIdx = 0; - for (auto element : *leftElements) { - int batchId = VectorBatchUtil::getBatchId(element); - int rowId = VectorBatchUtil::getRowId(element); + for (int j = 0; j < num; j++) { + int batchId = batchIDdst[j]; + int rowId = rowIDdst[j]; col->SetValue( rowIdx, leftWindowState->getVectorBatch(batchId)->template GetValueAt(colIdx, rowId)); rowIdx++; } } else { int rowIdx = 0; - for (auto element : *leftElements) { - int batchId = VectorBatchUtil::getBatchId(element); - int rowId = VectorBatchUtil::getRowId(element); + for (int j = 0; j < num; j++) { + int batchId = batchIDdst[j]; + int rowId = rowIDdst[j]; auto value = leftWindowState->getVectorBatch(batchId)->template GetValueAt(colIdx, rowId); for (size_t i = 0; i < rightElements->size(); i++) { col->SetValue(i + rowIdx, value); @@ -530,6 +550,8 @@ inline void WindowJoinOperator::insertLeft(int colIdx, std::vectorsize(); } } + delete batchIDdst; + delete rowIDdst; } template @@ -550,9 +572,25 @@ void WindowJoinOperator::insertLeftVarchar(int colIdx, std::vector( leftWindowState->getVectorBatch(batchId)->Get(colIdx))->GetValue(rowId); for (size_t i = 0; i < rightElements->size(); i++) { @@ -560,6 +598,8 @@ void WindowJoinOperator::insertLeftVarchar(int colIdx, std::vectorsize(); } + delete batchIDdst; + delete rowIDdst; } } @@ -568,12 +608,30 @@ template inline void WindowJoinOperator::insertRight(int colIdx, std::vector *leftElements, std::vector *rightElements, omnistream::VectorBatch *outputBatch, bool isInner) { + int num = (*rightElements).size(); + uint32_t* batchIDdst = new uint32_t[num]; + uint32_t* rowIDdst = new uint32_t[num]; + + int processNum = svcntw(); + int half = svcntd(); + for (int i = 0; i < num; i+=processNum) { + svbool_t pg = svwhilelt_b64(i, num); + svbool_t pg2 = svwhilelt_b64(i + half, num); + svbool_t pg3 = svwhilelt_b32(i, num); + svuint64_t comboID = svld1(pg, (*rightElements).data() + i); + svuint64_t comboID2 = svld1(pg2, (*rightElements).data() + i + half); + svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2)); + svst1_u32(pg3, rowIDdst + i, rowID); + svst1_u32(pg3, batchIDdst + i, batchID); + } + auto col = reinterpret_cast *>(outputBatch->Get(colIdx)); if (isNonEquiCondition || !isInner) { int rowIdx = 0; - for (auto element : *rightElements) { - int batchId = VectorBatchUtil::getBatchId(element); - int rowId = VectorBatchUtil::getRowId(element); + for (int i = 0; i < num; i++) { + int batchId = batchIDdst[i]; + int rowId = rowIDdst[i]; col->SetValue(rowIdx, rightWindowState->getVectorBatch(batchId)->template GetValueAt( colIdx - leftTypes.size(), rowId)); @@ -582,8 +640,8 @@ inline void WindowJoinOperator::insertRight(int colIdx, std::vectorsize(); i++) { auto element = rightElements->at(i); - int batchId = VectorBatchUtil::getBatchId(element); - int rowId = VectorBatchUtil::getRowId(element); + int batchId = batchIDdst[i]; + int rowId = rowIDdst[i]; auto value = rightWindowState->getVectorBatch(batchId)->template GetValueAt( colIdx - leftTypes.size(), rowId); for (size_t j = 0; j < leftElements->size(); j++) { @@ -592,6 +650,8 @@ inline void WindowJoinOperator::insertRight(int colIdx, std::vector @@ -611,10 +671,28 @@ void WindowJoinOperator::insertRightVarchar(int colIdx, std::vectorsize(); i++) { auto element = rightElements->at(i); - int batchId = VectorBatchUtil::getBatchId(element); - int rowId = VectorBatchUtil::getRowId(element); + int batchId = batchIDdst[i]; + int rowId = rowIDdst[i]; auto value = reinterpret_cast( rightWindowState->getVectorBatch(batchId)->Get(colIdx - leftTypes.size()))->GetValue(rowId); for (size_t j = 0; j < leftElements->size(); j++) { @@ -622,6 +700,8 @@ void WindowJoinOperator::insertRightVarchar(int colIdx, std::vectorSetValue(valIdx, value); } } + delete[] batchIDdst; + delete[] rowIDdst; } } template