From 6f0de13aa293bf0a63c58b5c85436013aab145cb Mon Sep 17 00:00:00 2001 From: tongxiaotian Date: Tue, 19 Mar 2024 10:51:06 +0800 Subject: [PATCH] =?UTF-8?q?[fire-1110]Hudi=E6=B3=A8=E8=A7=A3=E6=94=AF?= =?UTF-8?q?=E6=8C=81HBase=E7=B4=A2=E5=BC=95=E4=BE=BF=E6=8D=B7=E5=8C=96?= =?UTF-8?q?=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../zto/fire/core/anno/connector/Hudi.java | 94 ++++++++++++++++++ .../zto/fire/core/anno/connector/Hudi10.java | 94 ++++++++++++++++++ .../zto/fire/core/anno/connector/Hudi11.java | 94 ++++++++++++++++++ .../zto/fire/core/anno/connector/Hudi2.java | 95 +++++++++++++++++++ .../zto/fire/core/anno/connector/Hudi3.java | 94 ++++++++++++++++++ .../zto/fire/core/anno/connector/Hudi4.java | 94 ++++++++++++++++++ .../zto/fire/core/anno/connector/Hudi5.java | 94 ++++++++++++++++++ .../zto/fire/core/anno/connector/Hudi6.java | 94 ++++++++++++++++++ .../zto/fire/core/anno/connector/Hudi7.java | 94 ++++++++++++++++++ .../zto/fire/core/anno/connector/Hudi8.java | 94 ++++++++++++++++++ .../zto/fire/core/anno/connector/Hudi9.java | 94 ++++++++++++++++++ .../com/zto/fire/core/conf/AnnoManager.scala | 92 +++++++++++++++++- .../zto/fire/spark/BaseHudiStreaming.scala | 28 +++--- 13 files changed, 1141 insertions(+), 14 deletions(-) diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi.java index b50a7476..f07107b1 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi.java @@ -123,4 +123,98 @@ public @interface Hudi { */ boolean useRecordIndex() default false; + /** + * 是否使用hbase index + * HbaseIndex 相关参数 + * + * hoodie.index.type=HBASE + * hoodie.index.hbase.zkport=2181 + * hoodie.index.hbase.zkquorum=hzpl004138023-hadoop-zh.ztosys.com,hzpl004138041-hadoop-zh.ztosys.com,hzpl004138024-hadoop-zh.ztosys.com,hzpl004138042-hadoop-zh.ztosys.com,hzpl004138051-hadoop-zh.ztosys.com + * hoodie.index.hbase.zknode.path=/hbase + * hoodie.index.hbase.table=hudi:xxxxx + * hoodie.index.hbase.get.batch.size=2000 + * hoodie.index.hbase.put.batch.size=2000 + * hoodie.index.hbase.put.batch.size.autocompute=true + * hoodie.index.hbase.qps.fraction=0.5 + * hoodie.index.hbase.max.qps.per.region.server=50000 + * hoodie.index.hbase.qps.allocator.class=org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator + * hoodie.index.hbase.rollback.sync=true + * hoodie.hbase.index.update.partition.path=true + * + */ + boolean useHbaseIndex() default false; + + /** + * hbase默认连接端口 + * @return + */ + int hbasePort() default 2181; + + /** + * hbase 连接地址 + * @return + */ + String hbaseZkQuorum() default ""; + + /** + * hbasezkNode路径 + * @return + */ + String hbaseZkNodePath() default "/hbase"; + + /** + * hbase 中tableName + * @return + */ + String hbaseTable() default ""; + + /** + * get操作的批次大小 + * @return + */ + long hbaseGetBatchSize() default 1000; + + /** + * put操作的批次大小 + * @return + */ + long hbasePutBatchSize() default 1000; + + /** + * + * @return + */ + boolean hbasePutBatchSizeAutoCompute() default false; + + /** + * Property to set maximum QPS allowed per Region Server + * should be same across various jobs + * @return + */ + long hbaseMaxQpsPerRegionServer() default 50000; + + /** + * set the fraction of the global share of QPS that should be allocated to this job + * @return + */ + float hbaseQpsFraction() default 0.2f; + + /** + * rollback method will delete the last failed task index + * @return + */ + boolean hbaseRollbackSync() default true; + + /** + * delete old record in old paritition insert in new partition + * @return + */ + boolean hbaseUpdatePartitionPath() default true; + + /** + * 默认hbaseQpsAllocatorClass + * @return + */ + String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; + } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi10.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi10.java index 320ebd20..10cf7335 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi10.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi10.java @@ -123,4 +123,98 @@ public @interface Hudi10 { */ boolean useRecordIndex() default false; + /** + * 是否使用hbase index + * HbaseIndex 相关参数 + * + * hoodie.index.type=HBASE + * hoodie.index.hbase.zkport=2181 + * hoodie.index.hbase.zkquorum=hzpl004138023-hadoop-zh.ztosys.com,hzpl004138041-hadoop-zh.ztosys.com,hzpl004138024-hadoop-zh.ztosys.com,hzpl004138042-hadoop-zh.ztosys.com,hzpl004138051-hadoop-zh.ztosys.com + * hoodie.index.hbase.zknode.path=/hbase + * hoodie.index.hbase.table=hudi:xxxxx + * hoodie.index.hbase.get.batch.size=2000 + * hoodie.index.hbase.put.batch.size=2000 + * hoodie.index.hbase.put.batch.size.autocompute=true + * hoodie.index.hbase.qps.fraction=0.5 + * hoodie.index.hbase.max.qps.per.region.server=50000 + * hoodie.index.hbase.qps.allocator.class=org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator + * hoodie.index.hbase.rollback.sync=true + * hoodie.hbase.index.update.partition.path=true + * + */ + boolean useHbaseIndex() default false; + + /** + * hbase默认连接端口 + * @return + */ + int hbasePort() default 2181; + + /** + * hbase 连接地址 + * @return + */ + String hbaseZkQuorum() default ""; + + /** + * hbasezkNode路径 + * @return + */ + String hbaseZkNodePath() default "/hbase"; + + /** + * hbase 中tableName + * @return + */ + String hbaseTable() default ""; + + /** + * get操作的批次大小 + * @return + */ + long hbaseGetBatchSize() default 1000; + + /** + * put操作的批次大小 + * @return + */ + long hbasePutBatchSize() default 1000; + + /** + * + * @return + */ + boolean hbasePutBatchSizeAutoCompute() default false; + + /** + * Property to set maximum QPS allowed per Region Server + * should be same across various jobs + * @return + */ + long hbaseMaxQpsPerRegionServer() default 50000; + + /** + * set the fraction of the global share of QPS that should be allocated to this job + * @return + */ + float hbaseQpsFraction() default 0.2f; + + /** + * rollback method will delete the last failed task index + * @return + */ + boolean hbaseRollbackSync() default true; + + /** + * delete old record in old paritition insert in new partition + * @return + */ + boolean hbaseUpdatePartitionPath() default true; + + /** + * 默认hbaseQpsAllocatorClass + * @return + */ + String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; + } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi11.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi11.java index 762c36c8..98b3f050 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi11.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi11.java @@ -123,4 +123,98 @@ public @interface Hudi11 { */ boolean useRecordIndex() default false; + /** + * 是否使用hbase index + * HbaseIndex 相关参数 + * + * hoodie.index.type=HBASE + * hoodie.index.hbase.zkport=2181 + * hoodie.index.hbase.zkquorum=hzpl004138023-hadoop-zh.ztosys.com,hzpl004138041-hadoop-zh.ztosys.com,hzpl004138024-hadoop-zh.ztosys.com,hzpl004138042-hadoop-zh.ztosys.com,hzpl004138051-hadoop-zh.ztosys.com + * hoodie.index.hbase.zknode.path=/hbase + * hoodie.index.hbase.table=hudi:xxxxx + * hoodie.index.hbase.get.batch.size=2000 + * hoodie.index.hbase.put.batch.size=2000 + * hoodie.index.hbase.put.batch.size.autocompute=true + * hoodie.index.hbase.qps.fraction=0.5 + * hoodie.index.hbase.max.qps.per.region.server=50000 + * hoodie.index.hbase.qps.allocator.class=org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator + * hoodie.index.hbase.rollback.sync=true + * hoodie.hbase.index.update.partition.path=true + * + */ + boolean useHbaseIndex() default false; + + /** + * hbase默认连接端口 + * @return + */ + int hbasePort() default 2181; + + /** + * hbase 连接地址 + * @return + */ + String hbaseZkQuorum() default ""; + + /** + * hbasezkNode路径 + * @return + */ + String hbaseZkNodePath() default "/hbase"; + + /** + * hbase 中tableName + * @return + */ + String hbaseTable() default ""; + + /** + * get操作的批次大小 + * @return + */ + long hbaseGetBatchSize() default 1000; + + /** + * put操作的批次大小 + * @return + */ + long hbasePutBatchSize() default 1000; + + /** + * + * @return + */ + boolean hbasePutBatchSizeAutoCompute() default false; + + /** + * Property to set maximum QPS allowed per Region Server + * should be same across various jobs + * @return + */ + long hbaseMaxQpsPerRegionServer() default 50000; + + /** + * set the fraction of the global share of QPS that should be allocated to this job + * @return + */ + float hbaseQpsFraction() default 0.2f; + + /** + * rollback method will delete the last failed task index + * @return + */ + boolean hbaseRollbackSync() default true; + + /** + * delete old record in old paritition insert in new partition + * @return + */ + boolean hbaseUpdatePartitionPath() default true; + + /** + * 默认hbaseQpsAllocatorClass + * @return + */ + String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; + } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi2.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi2.java index fdfe2523..23efe7c5 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi2.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi2.java @@ -123,4 +123,99 @@ public @interface Hudi2 { */ boolean useRecordIndex() default false; + + /** + * 是否使用hbase index + * HbaseIndex 相关参数 + * + * hoodie.index.type=HBASE + * hoodie.index.hbase.zkport=2181 + * hoodie.index.hbase.zkquorum=hzpl004138023-hadoop-zh.ztosys.com,hzpl004138041-hadoop-zh.ztosys.com,hzpl004138024-hadoop-zh.ztosys.com,hzpl004138042-hadoop-zh.ztosys.com,hzpl004138051-hadoop-zh.ztosys.com + * hoodie.index.hbase.zknode.path=/hbase + * hoodie.index.hbase.table=hudi:xxxxx + * hoodie.index.hbase.get.batch.size=2000 + * hoodie.index.hbase.put.batch.size=2000 + * hoodie.index.hbase.put.batch.size.autocompute=true + * hoodie.index.hbase.qps.fraction=0.5 + * hoodie.index.hbase.max.qps.per.region.server=50000 + * hoodie.index.hbase.qps.allocator.class=org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator + * hoodie.index.hbase.rollback.sync=true + * hoodie.hbase.index.update.partition.path=true + * + */ + boolean useHbaseIndex() default false; + + /** + * hbase默认连接端口 + * @return + */ + int hbasePort() default 2181; + + /** + * hbase 连接地址 + * @return + */ + String hbaseZkQuorum() default ""; + + /** + * hbasezkNode路径 + * @return + */ + String hbaseZkNodePath() default "/hbase"; + + /** + * hbase 中tableName + * @return + */ + String hbaseTable() default ""; + + /** + * get操作的批次大小 + * @return + */ + long hbaseGetBatchSize() default 1000; + + /** + * put操作的批次大小 + * @return + */ + long hbasePutBatchSize() default 1000; + + /** + * + * @return + */ + boolean hbasePutBatchSizeAutoCompute() default false; + + /** + * Property to set maximum QPS allowed per Region Server + * should be same across various jobs + * @return + */ + long hbaseMaxQpsPerRegionServer() default 50000; + + /** + * set the fraction of the global share of QPS that should be allocated to this job + * @return + */ + float hbaseQpsFraction() default 0.2f; + + /** + * rollback method will delete the last failed task index + * @return + */ + boolean hbaseRollbackSync() default true; + + /** + * delete old record in old paritition insert in new partition + * @return + */ + boolean hbaseUpdatePartitionPath() default true; + + /** + * 默认hbaseQpsAllocatorClass + * @return + */ + String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; + } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi3.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi3.java index 597ccf0b..94060164 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi3.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi3.java @@ -123,4 +123,98 @@ public @interface Hudi3 { */ boolean useRecordIndex() default false; + /** + * 是否使用hbase index + * HbaseIndex 相关参数 + * + * hoodie.index.type=HBASE + * hoodie.index.hbase.zkport=2181 + * hoodie.index.hbase.zkquorum=hzpl004138023-hadoop-zh.ztosys.com,hzpl004138041-hadoop-zh.ztosys.com,hzpl004138024-hadoop-zh.ztosys.com,hzpl004138042-hadoop-zh.ztosys.com,hzpl004138051-hadoop-zh.ztosys.com + * hoodie.index.hbase.zknode.path=/hbase + * hoodie.index.hbase.table=hudi:xxxxx + * hoodie.index.hbase.get.batch.size=2000 + * hoodie.index.hbase.put.batch.size=2000 + * hoodie.index.hbase.put.batch.size.autocompute=true + * hoodie.index.hbase.qps.fraction=0.5 + * hoodie.index.hbase.max.qps.per.region.server=50000 + * hoodie.index.hbase.qps.allocator.class=org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator + * hoodie.index.hbase.rollback.sync=true + * hoodie.hbase.index.update.partition.path=true + * + */ + boolean useHbaseIndex() default false; + + /** + * hbase默认连接端口 + * @return + */ + int hbasePort() default 2181; + + /** + * hbase 连接地址 + * @return + */ + String hbaseZkQuorum() default ""; + + /** + * hbasezkNode路径 + * @return + */ + String hbaseZkNodePath() default "/hbase"; + + /** + * hbase 中tableName + * @return + */ + String hbaseTable() default ""; + + /** + * get操作的批次大小 + * @return + */ + long hbaseGetBatchSize() default 1000; + + /** + * put操作的批次大小 + * @return + */ + long hbasePutBatchSize() default 1000; + + /** + * + * @return + */ + boolean hbasePutBatchSizeAutoCompute() default false; + + /** + * Property to set maximum QPS allowed per Region Server + * should be same across various jobs + * @return + */ + long hbaseMaxQpsPerRegionServer() default 50000; + + /** + * set the fraction of the global share of QPS that should be allocated to this job + * @return + */ + float hbaseQpsFraction() default 0.2f; + + /** + * rollback method will delete the last failed task index + * @return + */ + boolean hbaseRollbackSync() default true; + + /** + * delete old record in old paritition insert in new partition + * @return + */ + boolean hbaseUpdatePartitionPath() default true; + + /** + * 默认hbaseQpsAllocatorClass + * @return + */ + String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; + } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi4.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi4.java index 7c0126ef..6f8e0f1b 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi4.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi4.java @@ -123,4 +123,98 @@ public @interface Hudi4 { */ boolean useRecordIndex() default false; + /** + * 是否使用hbase index + * HbaseIndex 相关参数 + * + * hoodie.index.type=HBASE + * hoodie.index.hbase.zkport=2181 + * hoodie.index.hbase.zkquorum=hzpl004138023-hadoop-zh.ztosys.com,hzpl004138041-hadoop-zh.ztosys.com,hzpl004138024-hadoop-zh.ztosys.com,hzpl004138042-hadoop-zh.ztosys.com,hzpl004138051-hadoop-zh.ztosys.com + * hoodie.index.hbase.zknode.path=/hbase + * hoodie.index.hbase.table=hudi:xxxxx + * hoodie.index.hbase.get.batch.size=2000 + * hoodie.index.hbase.put.batch.size=2000 + * hoodie.index.hbase.put.batch.size.autocompute=true + * hoodie.index.hbase.qps.fraction=0.5 + * hoodie.index.hbase.max.qps.per.region.server=50000 + * hoodie.index.hbase.qps.allocator.class=org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator + * hoodie.index.hbase.rollback.sync=true + * hoodie.hbase.index.update.partition.path=true + * + */ + boolean useHbaseIndex() default false; + + /** + * hbase默认连接端口 + * @return + */ + int hbasePort() default 2181; + + /** + * hbase 连接地址 + * @return + */ + String hbaseZkQuorum() default ""; + + /** + * hbasezkNode路径 + * @return + */ + String hbaseZkNodePath() default "/hbase"; + + /** + * hbase 中tableName + * @return + */ + String hbaseTable() default ""; + + /** + * get操作的批次大小 + * @return + */ + long hbaseGetBatchSize() default 1000; + + /** + * put操作的批次大小 + * @return + */ + long hbasePutBatchSize() default 1000; + + /** + * + * @return + */ + boolean hbasePutBatchSizeAutoCompute() default false; + + /** + * Property to set maximum QPS allowed per Region Server + * should be same across various jobs + * @return + */ + long hbaseMaxQpsPerRegionServer() default 50000; + + /** + * set the fraction of the global share of QPS that should be allocated to this job + * @return + */ + float hbaseQpsFraction() default 0.2f; + + /** + * rollback method will delete the last failed task index + * @return + */ + boolean hbaseRollbackSync() default true; + + /** + * delete old record in old paritition insert in new partition + * @return + */ + boolean hbaseUpdatePartitionPath() default true; + + /** + * 默认hbaseQpsAllocatorClass + * @return + */ + String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; + } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi5.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi5.java index 234d1479..bc1cb35a 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi5.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi5.java @@ -123,4 +123,98 @@ public @interface Hudi5 { */ boolean useRecordIndex() default false; + /** + * 是否使用hbase index + * HbaseIndex 相关参数 + * + * hoodie.index.type=HBASE + * hoodie.index.hbase.zkport=2181 + * hoodie.index.hbase.zkquorum=hzpl004138023-hadoop-zh.ztosys.com,hzpl004138041-hadoop-zh.ztosys.com,hzpl004138024-hadoop-zh.ztosys.com,hzpl004138042-hadoop-zh.ztosys.com,hzpl004138051-hadoop-zh.ztosys.com + * hoodie.index.hbase.zknode.path=/hbase + * hoodie.index.hbase.table=hudi:xxxxx + * hoodie.index.hbase.get.batch.size=2000 + * hoodie.index.hbase.put.batch.size=2000 + * hoodie.index.hbase.put.batch.size.autocompute=true + * hoodie.index.hbase.qps.fraction=0.5 + * hoodie.index.hbase.max.qps.per.region.server=50000 + * hoodie.index.hbase.qps.allocator.class=org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator + * hoodie.index.hbase.rollback.sync=true + * hoodie.hbase.index.update.partition.path=true + * + */ + boolean useHbaseIndex() default false; + + /** + * hbase默认连接端口 + * @return + */ + int hbasePort() default 2181; + + /** + * hbase 连接地址 + * @return + */ + String hbaseZkQuorum() default ""; + + /** + * hbasezkNode路径 + * @return + */ + String hbaseZkNodePath() default "/hbase"; + + /** + * hbase 中tableName + * @return + */ + String hbaseTable() default ""; + + /** + * get操作的批次大小 + * @return + */ + long hbaseGetBatchSize() default 1000; + + /** + * put操作的批次大小 + * @return + */ + long hbasePutBatchSize() default 1000; + + /** + * + * @return + */ + boolean hbasePutBatchSizeAutoCompute() default false; + + /** + * Property to set maximum QPS allowed per Region Server + * should be same across various jobs + * @return + */ + long hbaseMaxQpsPerRegionServer() default 50000; + + /** + * set the fraction of the global share of QPS that should be allocated to this job + * @return + */ + float hbaseQpsFraction() default 0.2f; + + /** + * rollback method will delete the last failed task index + * @return + */ + boolean hbaseRollbackSync() default true; + + /** + * delete old record in old paritition insert in new partition + * @return + */ + boolean hbaseUpdatePartitionPath() default true; + + /** + * 默认hbaseQpsAllocatorClass + * @return + */ + String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; + } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi6.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi6.java index e31ad945..eecff2b3 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi6.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi6.java @@ -123,4 +123,98 @@ public @interface Hudi6 { */ boolean useRecordIndex() default false; + /** + * 是否使用hbase index + * HbaseIndex 相关参数 + * + * hoodie.index.type=HBASE + * hoodie.index.hbase.zkport=2181 + * hoodie.index.hbase.zkquorum=hzpl004138023-hadoop-zh.ztosys.com,hzpl004138041-hadoop-zh.ztosys.com,hzpl004138024-hadoop-zh.ztosys.com,hzpl004138042-hadoop-zh.ztosys.com,hzpl004138051-hadoop-zh.ztosys.com + * hoodie.index.hbase.zknode.path=/hbase + * hoodie.index.hbase.table=hudi:xxxxx + * hoodie.index.hbase.get.batch.size=2000 + * hoodie.index.hbase.put.batch.size=2000 + * hoodie.index.hbase.put.batch.size.autocompute=true + * hoodie.index.hbase.qps.fraction=0.5 + * hoodie.index.hbase.max.qps.per.region.server=50000 + * hoodie.index.hbase.qps.allocator.class=org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator + * hoodie.index.hbase.rollback.sync=true + * hoodie.hbase.index.update.partition.path=true + * + */ + boolean useHbaseIndex() default false; + + /** + * hbase默认连接端口 + * @return + */ + int hbasePort() default 2181; + + /** + * hbase 连接地址 + * @return + */ + String hbaseZkQuorum() default ""; + + /** + * hbasezkNode路径 + * @return + */ + String hbaseZkNodePath() default "/hbase"; + + /** + * hbase 中tableName + * @return + */ + String hbaseTable() default ""; + + /** + * get操作的批次大小 + * @return + */ + long hbaseGetBatchSize() default 1000; + + /** + * put操作的批次大小 + * @return + */ + long hbasePutBatchSize() default 1000; + + /** + * + * @return + */ + boolean hbasePutBatchSizeAutoCompute() default false; + + /** + * Property to set maximum QPS allowed per Region Server + * should be same across various jobs + * @return + */ + long hbaseMaxQpsPerRegionServer() default 50000; + + /** + * set the fraction of the global share of QPS that should be allocated to this job + * @return + */ + float hbaseQpsFraction() default 0.2f; + + /** + * rollback method will delete the last failed task index + * @return + */ + boolean hbaseRollbackSync() default true; + + /** + * delete old record in old paritition insert in new partition + * @return + */ + boolean hbaseUpdatePartitionPath() default true; + + /** + * 默认hbaseQpsAllocatorClass + * @return + */ + String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; + } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi7.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi7.java index 2ac16c8e..0e30544e 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi7.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi7.java @@ -123,4 +123,98 @@ public @interface Hudi7 { */ boolean useRecordIndex() default false; + /** + * 是否使用hbase index + * HbaseIndex 相关参数 + * + * hoodie.index.type=HBASE + * hoodie.index.hbase.zkport=2181 + * hoodie.index.hbase.zkquorum=hzpl004138023-hadoop-zh.ztosys.com,hzpl004138041-hadoop-zh.ztosys.com,hzpl004138024-hadoop-zh.ztosys.com,hzpl004138042-hadoop-zh.ztosys.com,hzpl004138051-hadoop-zh.ztosys.com + * hoodie.index.hbase.zknode.path=/hbase + * hoodie.index.hbase.table=hudi:xxxxx + * hoodie.index.hbase.get.batch.size=2000 + * hoodie.index.hbase.put.batch.size=2000 + * hoodie.index.hbase.put.batch.size.autocompute=true + * hoodie.index.hbase.qps.fraction=0.5 + * hoodie.index.hbase.max.qps.per.region.server=50000 + * hoodie.index.hbase.qps.allocator.class=org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator + * hoodie.index.hbase.rollback.sync=true + * hoodie.hbase.index.update.partition.path=true + * + */ + boolean useHbaseIndex() default false; + + /** + * hbase默认连接端口 + * @return + */ + int hbasePort() default 2181; + + /** + * hbase 连接地址 + * @return + */ + String hbaseZkQuorum() default ""; + + /** + * hbasezkNode路径 + * @return + */ + String hbaseZkNodePath() default "/hbase"; + + /** + * hbase 中tableName + * @return + */ + String hbaseTable() default ""; + + /** + * get操作的批次大小 + * @return + */ + long hbaseGetBatchSize() default 1000; + + /** + * put操作的批次大小 + * @return + */ + long hbasePutBatchSize() default 1000; + + /** + * + * @return + */ + boolean hbasePutBatchSizeAutoCompute() default false; + + /** + * Property to set maximum QPS allowed per Region Server + * should be same across various jobs + * @return + */ + long hbaseMaxQpsPerRegionServer() default 50000; + + /** + * set the fraction of the global share of QPS that should be allocated to this job + * @return + */ + float hbaseQpsFraction() default 0.2f; + + /** + * rollback method will delete the last failed task index + * @return + */ + boolean hbaseRollbackSync() default true; + + /** + * delete old record in old paritition insert in new partition + * @return + */ + boolean hbaseUpdatePartitionPath() default true; + + /** + * 默认hbaseQpsAllocatorClass + * @return + */ + String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; + } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi8.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi8.java index bf382e9f..7c865163 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi8.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi8.java @@ -123,4 +123,98 @@ public @interface Hudi8 { */ boolean useRecordIndex() default false; + /** + * 是否使用hbase index + * HbaseIndex 相关参数 + * + * hoodie.index.type=HBASE + * hoodie.index.hbase.zkport=2181 + * hoodie.index.hbase.zkquorum=hzpl004138023-hadoop-zh.ztosys.com,hzpl004138041-hadoop-zh.ztosys.com,hzpl004138024-hadoop-zh.ztosys.com,hzpl004138042-hadoop-zh.ztosys.com,hzpl004138051-hadoop-zh.ztosys.com + * hoodie.index.hbase.zknode.path=/hbase + * hoodie.index.hbase.table=hudi:xxxxx + * hoodie.index.hbase.get.batch.size=2000 + * hoodie.index.hbase.put.batch.size=2000 + * hoodie.index.hbase.put.batch.size.autocompute=true + * hoodie.index.hbase.qps.fraction=0.5 + * hoodie.index.hbase.max.qps.per.region.server=50000 + * hoodie.index.hbase.qps.allocator.class=org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator + * hoodie.index.hbase.rollback.sync=true + * hoodie.hbase.index.update.partition.path=true + * + */ + boolean useHbaseIndex() default false; + + /** + * hbase默认连接端口 + * @return + */ + int hbasePort() default 2181; + + /** + * hbase 连接地址 + * @return + */ + String hbaseZkQuorum() default ""; + + /** + * hbasezkNode路径 + * @return + */ + String hbaseZkNodePath() default "/hbase"; + + /** + * hbase 中tableName + * @return + */ + String hbaseTable() default ""; + + /** + * get操作的批次大小 + * @return + */ + long hbaseGetBatchSize() default 1000; + + /** + * put操作的批次大小 + * @return + */ + long hbasePutBatchSize() default 1000; + + /** + * + * @return + */ + boolean hbasePutBatchSizeAutoCompute() default false; + + /** + * Property to set maximum QPS allowed per Region Server + * should be same across various jobs + * @return + */ + long hbaseMaxQpsPerRegionServer() default 50000; + + /** + * set the fraction of the global share of QPS that should be allocated to this job + * @return + */ + float hbaseQpsFraction() default 0.2f; + + /** + * rollback method will delete the last failed task index + * @return + */ + boolean hbaseRollbackSync() default true; + + /** + * delete old record in old paritition insert in new partition + * @return + */ + boolean hbaseUpdatePartitionPath() default true; + + /** + * 默认hbaseQpsAllocatorClass + * @return + */ + String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; + } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi9.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi9.java index ed255bc6..6e378b15 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi9.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi9.java @@ -123,4 +123,98 @@ public @interface Hudi9 { */ boolean useRecordIndex() default false; + /** + * 是否使用hbase index + * HbaseIndex 相关参数 + * + * hoodie.index.type=HBASE + * hoodie.index.hbase.zkport=2181 + * hoodie.index.hbase.zkquorum=hzpl004138023-hadoop-zh.ztosys.com,hzpl004138041-hadoop-zh.ztosys.com,hzpl004138024-hadoop-zh.ztosys.com,hzpl004138042-hadoop-zh.ztosys.com,hzpl004138051-hadoop-zh.ztosys.com + * hoodie.index.hbase.zknode.path=/hbase + * hoodie.index.hbase.table=hudi:xxxxx + * hoodie.index.hbase.get.batch.size=2000 + * hoodie.index.hbase.put.batch.size=2000 + * hoodie.index.hbase.put.batch.size.autocompute=true + * hoodie.index.hbase.qps.fraction=0.5 + * hoodie.index.hbase.max.qps.per.region.server=50000 + * hoodie.index.hbase.qps.allocator.class=org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator + * hoodie.index.hbase.rollback.sync=true + * hoodie.hbase.index.update.partition.path=true + * + */ + boolean useHbaseIndex() default false; + + /** + * hbase默认连接端口 + * @return + */ + int hbasePort() default 2181; + + /** + * hbase 连接地址 + * @return + */ + String hbaseZkQuorum() default ""; + + /** + * hbasezkNode路径 + * @return + */ + String hbaseZkNodePath() default "/hbase"; + + /** + * hbase 中tableName + * @return + */ + String hbaseTable() default ""; + + /** + * get操作的批次大小 + * @return + */ + long hbaseGetBatchSize() default 1000; + + /** + * put操作的批次大小 + * @return + */ + long hbasePutBatchSize() default 1000; + + /** + * + * @return + */ + boolean hbasePutBatchSizeAutoCompute() default false; + + /** + * Property to set maximum QPS allowed per Region Server + * should be same across various jobs + * @return + */ + long hbaseMaxQpsPerRegionServer() default 50000; + + /** + * set the fraction of the global share of QPS that should be allocated to this job + * @return + */ + float hbaseQpsFraction() default 0.2f; + + /** + * rollback method will delete the last failed task index + * @return + */ + boolean hbaseRollbackSync() default true; + + /** + * delete old record in old paritition insert in new partition + * @return + */ + boolean hbaseUpdatePartitionPath() default true; + + /** + * 默认hbaseQpsAllocatorClass + * @return + */ + String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; + } \ No newline at end of file diff --git a/fire-core/src/main/scala/com/zto/fire/core/conf/AnnoManager.scala b/fire-core/src/main/scala/com/zto/fire/core/conf/AnnoManager.scala index 56713be4..f4715c2e 100644 --- a/fire-core/src/main/scala/com/zto/fire/core/conf/AnnoManager.scala +++ b/fire-core/src/main/scala/com/zto/fire/core/conf/AnnoManager.scala @@ -303,6 +303,10 @@ private[fire] trait AnnoManager extends Logging { this.hudiBloomIndexConf(hudi.bloomIndexParallelism(), hudi.useBloomIndexBucketized(), hudi.bloomkeysPerBucket, KeyNum._1) // 记录级索引相关设置 this.hudiRecordIndexConf(hudi.useRecordIndex(), KeyNum._1) + // hbase索引相关设置 + this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() + , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._1) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._1) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._1) @@ -327,6 +331,10 @@ private[fire] trait AnnoManager extends Logging { this.hudiBloomIndexConf(hudi.bloomIndexParallelism(), hudi.useBloomIndexBucketized(), hudi.bloomkeysPerBucket, KeyNum._2) // 记录级索引相关设置 this.hudiRecordIndexConf(hudi.useRecordIndex(), KeyNum._2) + // hbase索引相关设置 + this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() + , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._2) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._2) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._2) @@ -351,6 +359,10 @@ private[fire] trait AnnoManager extends Logging { this.hudiBloomIndexConf(hudi.bloomIndexParallelism(), hudi.useBloomIndexBucketized(), hudi.bloomkeysPerBucket, KeyNum._3) // 记录级索引相关设置 this.hudiRecordIndexConf(hudi.useRecordIndex(), KeyNum._3) + // hbase索引相关设置 + this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() + , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._3) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._3) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._3) @@ -375,6 +387,10 @@ private[fire] trait AnnoManager extends Logging { this.hudiBloomIndexConf(hudi.bloomIndexParallelism(), hudi.useBloomIndexBucketized(), hudi.bloomkeysPerBucket, KeyNum._4) // 记录级索引相关设置 this.hudiRecordIndexConf(hudi.useRecordIndex(), KeyNum._4) + // hbase索引相关设置 + this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() + , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._4) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._4) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._4) @@ -399,6 +415,10 @@ private[fire] trait AnnoManager extends Logging { this.hudiBloomIndexConf(hudi.bloomIndexParallelism(), hudi.useBloomIndexBucketized(), hudi.bloomkeysPerBucket, KeyNum._5) // 记录级索引相关设置 this.hudiRecordIndexConf(hudi.useRecordIndex(), KeyNum._5) + // hbase索引相关设置 + this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() + , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._5) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._5) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._5) @@ -423,6 +443,10 @@ private[fire] trait AnnoManager extends Logging { this.hudiBloomIndexConf(hudi.bloomIndexParallelism(), hudi.useBloomIndexBucketized(), hudi.bloomkeysPerBucket, KeyNum._6) // 记录级索引相关设置 this.hudiRecordIndexConf(hudi.useRecordIndex(), KeyNum._6) + // hbase索引相关设置 + this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() + , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._6) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._6) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._6) @@ -447,6 +471,10 @@ private[fire] trait AnnoManager extends Logging { this.hudiBloomIndexConf(hudi.bloomIndexParallelism(), hudi.useBloomIndexBucketized(), hudi.bloomkeysPerBucket, KeyNum._7) // 记录级索引相关设置 this.hudiRecordIndexConf(hudi.useRecordIndex(), KeyNum._7) + // hbase索引相关设置 + this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() + , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._7) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._7) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._7) @@ -471,6 +499,10 @@ private[fire] trait AnnoManager extends Logging { this.hudiBloomIndexConf(hudi.bloomIndexParallelism(), hudi.useBloomIndexBucketized(), hudi.bloomkeysPerBucket, KeyNum._8) // 记录级索引相关设置 this.hudiRecordIndexConf(hudi.useRecordIndex(), KeyNum._8) + // hbase索引相关设置 + this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() + , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._8) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._8) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._8) @@ -495,6 +527,10 @@ private[fire] trait AnnoManager extends Logging { this.hudiBloomIndexConf(hudi.bloomIndexParallelism(), hudi.useBloomIndexBucketized(), hudi.bloomkeysPerBucket, KeyNum._9) // 记录级索引相关设置 this.hudiRecordIndexConf(hudi.useRecordIndex(), KeyNum._9) + // hbase索引相关设置 + this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() + , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._9) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._9) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._9) @@ -519,6 +555,10 @@ private[fire] trait AnnoManager extends Logging { this.hudiBloomIndexConf(hudi.bloomIndexParallelism(), hudi.useBloomIndexBucketized(), hudi.bloomkeysPerBucket, KeyNum._10) // 记录级索引相关设置 this.hudiRecordIndexConf(hudi.useRecordIndex(), KeyNum._10) + // hbase索引相关设置 + this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() + , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._10) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._10) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._10) @@ -543,6 +583,10 @@ private[fire] trait AnnoManager extends Logging { this.hudiBloomIndexConf(hudi.bloomIndexParallelism(), hudi.useBloomIndexBucketized(), hudi.bloomkeysPerBucket, KeyNum._11) // 记录级索引相关设置 this.hudiRecordIndexConf(hudi.useRecordIndex(), KeyNum._11) + // hbase索引相关设置 + this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() + , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._11) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._11) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._11) @@ -573,6 +617,7 @@ private[fire] trait AnnoManager extends Logging { /** * 布隆过滤器参数调优 + * * @param parallelism * 并行度 * @param useBloomIndexBucketized @@ -600,6 +645,49 @@ private[fire] trait AnnoManager extends Logging { } } + /** + * 是否开启hbase Index + * + * @param useHbaseIndex + * @param hbaseZkQuorum + * @param hbasePort + * @param hbaseTable + * @param hbaseZkNodePath + * @param hbaseRollbackSync + * @param hbaseUpdatePartitionPath + * @param hbaseGetBatchSize + * @param hbasePutBatchSize + * @param hbasePutBatchSizeAutoCompute + * @param hbaseMaxQpsPerRegionServer + * @param hbaseQpsFraction + * @param hbaseQpsAllocatorClass + * @param keyNum + */ + @Internal + private[this] def hudiHbaseIndexConf(useHbaseIndex: Boolean, hbaseZkQuorum: String, hbasePort: Int + , hbaseTable: String , hbaseZkNodePath: String, hbaseRollbackSync: Boolean + , hbaseUpdatePartitionPath: Boolean, hbaseGetBatchSize: Long + , hbasePutBatchSize: Long, hbasePutBatchSizeAutoCompute: Boolean + , hbaseMaxQpsPerRegionServer: Long, hbaseQpsFraction: Float + , hbaseQpsAllocatorClass: String, keyNum: Int) { + if (useHbaseIndex) { + this.toHudiConf(("hoodie.index.type", "HBASE"), keyNum) + this.toHudiConf(("hoodie.index.hbase.zkport", hbasePort.toString), keyNum) + this.toHudiConf(("hoodie.index.hbase.zkquorum", hbaseZkQuorum), keyNum) + this.toHudiConf(("hoodie.index.hbase.zknode.path", hbaseZkNodePath), keyNum) + this.toHudiConf(("hoodie.index.hbase.table", hbaseTable), keyNum) + this.toHudiConf(("hoodie.index.hbase.put.batch.size.autocompute", hbasePutBatchSizeAutoCompute.toString), keyNum) + this.toHudiConf(("hoodie.index.hbase.put.batch.size", hbasePutBatchSize.toString), keyNum) + this.toHudiConf(("hoodie.index.hbase.get.batch.size", hbaseGetBatchSize.toString), keyNum) + this.toHudiConf(("hoodie.hbase.index.update.partition.path", hbaseUpdatePartitionPath.toString), keyNum) + this.toHudiConf(("hoodie.index.hbase.rollback.sync", hbaseRollbackSync.toString), keyNum) + this.toHudiConf(("hoodie.index.hbase.qps.fraction", hbaseQpsFraction.toString), keyNum) + this.toHudiConf(("hoodie.index.hbase.max.qps.per.region.server", hbaseMaxQpsPerRegionServer.toString), keyNum) + this.toHudiConf(("hoodie.index.hbase.qps.allocator.class", hbaseQpsAllocatorClass), keyNum) + } + } + + /** * hudi clean相关配置 * @@ -788,6 +876,7 @@ private[fire] trait AnnoManager extends Logging { /** * 将@Jdbc中配置的信息映射为键值对形式 + * * @param Jdbc10 */ @Internal @@ -979,6 +1068,7 @@ private[fire] trait AnnoManager extends Logging { /** * 将@Kafka中配置的信息映射为键值对形式 + * * @param Kafka10 */ @Internal @@ -1197,7 +1287,7 @@ object AnnoManager extends Logging { classOf[RocketMQ], classOf[RocketMQ2], classOf[RocketMQ3], classOf[RocketMQ4], classOf[RocketMQ5], classOf[RocketMQ6], classOf[RocketMQ7], classOf[RocketMQ8], classOf[RocketMQ9], classOf[RocketMQ10], classOf[RocketMQ11], classOf[Hudi], classOf[Hudi2], classOf[Hudi3], - classOf[Hudi4], classOf[Hudi5], classOf[Hudi6], classOf[Hudi7], classOf[Hudi8], classOf[Hudi9],classOf[Hudi10], classOf[Hudi11] + classOf[Hudi4], classOf[Hudi5], classOf[Hudi6], classOf[Hudi7], classOf[Hudi8], classOf[Hudi9], classOf[Hudi10], classOf[Hudi11] ) // 用于注册所有的生命周期注解 diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseHudiStreaming.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseHudiStreaming.scala index 0a7b347f..f4ee6c97 100644 --- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseHudiStreaming.scala +++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseHudiStreaming.scala @@ -18,10 +18,6 @@ package com.zto.fire.spark import com.zto.fire._ -import com.zto.fire.common.anno.Scheduled -import com.zto.fire.common.util.DateFormatUtils -import com.zto.fire.spark.sql.SparkSqlUtils -import org.apache.spark.rdd.RDD /** * 通用的Spark Streaming实时入湖父类 @@ -39,6 +35,7 @@ trait BaseHudiStreaming extends SparkStreaming { protected lazy val sink = this.conf.get[Boolean]("hudi.sink", Some(true)) protected lazy val tmpView = this.conf.get[String]("hudi.tmpView", Some("msg_view")) protected lazy val retryOnFailure = this.conf.get[Int]("hudi.retry.onFailure", Some(1)) + protected lazy val retainDays = this.conf.getInt("hudi.table.retain.days", 32) override def before(args: Array[JString]): Unit = { this.conf.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") @@ -55,15 +52,15 @@ trait BaseHudiStreaming extends SparkStreaming { requireNonEmpty(sqlUpsert(this.tmpView))("解析临时表的SQL语句不能为空,请通过hudi.sql指定该配置") logInfo( s""" - |-----------------hudi参数----------------- - |hudi表名(hudi.tableName) :$tableName - |hudi表主键(hudi.primaryKey)   :$primaryKey - |hudi表预聚合字段(hudi.precombineKey) :$precombineKey - |hudi表分区字段(hudi.partitionFieldName) :$partitionFieldName - |消息视图名称(hudi.tmpView) :$tmpView - |解析kafka的sql(hudi.sql) :\n ${sqlUpsert(tmpView)} - |----------------------------------------- - |""".stripMargin) + |-----------------hudi参数----------------- + |hudi表名(hudi.tableName) :$tableName + |hudi表主键(hudi.primaryKey)   :$primaryKey + |hudi表预聚合字段(hudi.precombineKey) :$precombineKey + |hudi表分区字段(hudi.partitionFieldName) :$partitionFieldName + |消息视图名称(hudi.tmpView) :$tmpView + |解析kafka的sql(hudi.sql) :\n ${sqlUpsert(tmpView)} + |----------------------------------------- + |""".stripMargin) } /** @@ -87,4 +84,9 @@ trait BaseHudiStreaming extends SparkStreaming { * 临时表名 */ protected def sqlAfter(tmpView: String): String = "" + + /** + * TODO 删除历史分区,存储在实时集群的数据是外部表,只会删除分区,并不会删除数据,待解决 + */ + } -- Gitee