diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi.java index c3060d28f55920e05edfc6a703fe722660b9b931..230db8e7cce49a2de41bc2582a92ddde9fd48561 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi.java @@ -216,5 +216,9 @@ public @interface Hudi { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi10.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi10.java index 0a77355976fe8d7f39feefbb39c8a31fadde222b..07c4647a64c742ebc1f23993f422c40bb05ef9d4 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi10.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi10.java @@ -216,5 +216,8 @@ public @interface Hudi10 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi11.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi11.java index fe4fa01bd897cef75504159d662bd61cc0082414..29b5faedb23bfe5c71199e73ea6fa3e39616a1e2 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi11.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi11.java @@ -216,5 +216,8 @@ public @interface Hudi11 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi2.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi2.java index d3b7b7ebfc462389dc1fdfef39b29cdae94784f3..2d66e09f69c13dde8143b735cd3364529dc10fdb 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi2.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi2.java @@ -217,5 +217,8 @@ public @interface Hudi2 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi3.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi3.java index 765a834c0c597c03552d799dbf2d0a318b8bfd56..051baa87a2a6e29cb5ae01311ab2fa377c57de8c 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi3.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi3.java @@ -217,4 +217,8 @@ public @interface Hudi3 { */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi4.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi4.java index 80e17ff09497761b7743f3c26c328b0bd2030e85..08a52490cfe06b5de8de8729fa33702a42a2d6be 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi4.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi4.java @@ -216,5 +216,8 @@ public @interface Hudi4 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi5.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi5.java index 1ab64ce781e69261fa5ceee3c05c17ae258180dc..20b904e82aab30f267f2742ed0904ba9e74818c4 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi5.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi5.java @@ -216,5 +216,8 @@ public @interface Hudi5 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi6.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi6.java index 0cde5a379f8fc6db8c3225e90e12606d008bf4bd..3bffe00b006cc99c0f19e471402ffde4769a5221 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi6.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi6.java @@ -216,5 +216,8 @@ public @interface Hudi6 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi7.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi7.java index d5b07629b709bda8f3dc6c85581220ffa387d7f4..f9b83ad909f664831e1354be99f774603d6d45fd 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi7.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi7.java @@ -216,5 +216,8 @@ public @interface Hudi7 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi8.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi8.java index 1b678f2c9db106fad60bfcf29d819b23a155b211..24e6696c4b0f5443a7ed2a51cc4c575f2fc4ddb6 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi8.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi8.java @@ -216,5 +216,8 @@ public @interface Hudi8 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi9.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi9.java index c95a9b1c6ebedbb5888c14ac7c3d813f4771e1f5..c09a147517b0517baef42ad68807bb8de9941b69 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi9.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi9.java @@ -216,5 +216,8 @@ public @interface Hudi9 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/scala/com/zto/fire/core/conf/AnnoManager.scala b/fire-core/src/main/scala/com/zto/fire/core/conf/AnnoManager.scala index 99a920525ce71c4e76cbf09e273fe132e69336c3..d2246340de53dfdcbdcc63aa186bb3352f62e506 100644 --- a/fire-core/src/main/scala/com/zto/fire/core/conf/AnnoManager.scala +++ b/fire-core/src/main/scala/com/zto/fire/core/conf/AnnoManager.scala @@ -306,7 +306,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHBaseIndexConf(hudi.useHBaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._1) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._1) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._1) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._1) @@ -334,7 +334,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHBaseIndexConf(hudi.useHBaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._2) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._2) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._2) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._2) @@ -362,7 +362,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHBaseIndexConf(hudi.useHBaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._3) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._3) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._3) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._3) @@ -390,7 +390,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHBaseIndexConf(hudi.useHBaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._4) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._4) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._4) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._4) @@ -418,7 +418,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHBaseIndexConf(hudi.useHBaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._5) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._5) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._5) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._5) @@ -446,7 +446,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHBaseIndexConf(hudi.useHBaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._6) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._6) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._6) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._6) @@ -474,7 +474,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHBaseIndexConf(hudi.useHBaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._7) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._7) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._7) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._7) @@ -502,7 +502,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHBaseIndexConf(hudi.useHBaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._8) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._8) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._8) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._8) @@ -530,7 +530,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHBaseIndexConf(hudi.useHBaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._9) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._9) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._9) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._9) @@ -558,7 +558,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHBaseIndexConf(hudi.useHBaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._10) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._10) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._10) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._10) @@ -586,7 +586,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHBaseIndexConf(hudi.useHBaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._11) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._11) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._11) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._11) @@ -669,8 +669,9 @@ private[fire] trait AnnoManager extends Logging { , hbaseUpdatePartitionPath: Boolean, hbaseGetBatchSize: Long , hbasePutBatchSize: Long, hbasePutBatchSizeAutoCompute: Boolean , hbaseMaxQpsPerRegionServer: Long, hbaseQpsFraction: Float - , hbaseQpsAllocatorClass: String, keyNum: Int) { + , hbaseQpsAllocatorClass: String, hbaseIndexUser:String, keyNum: Int) { if (useHBaseIndex) { + requireNonEmpty(hbaseZkQuorum,hbaseTable){"Hudi Hbase Index需要指定zk集群地址和对应的hbase表名"} this.toHudiConf(("hoodie.index.type", "HBASE"), keyNum) this.toHudiConf(("hoodie.index.hbase.zkport", hbasePort.toString), keyNum) // 获取zk地址 @@ -695,6 +696,7 @@ private[fire] trait AnnoManager extends Logging { this.toHudiConf(("hoodie.index.hbase.qps.fraction", hbaseQpsFraction.toString), keyNum) this.toHudiConf(("hoodie.index.hbase.max.qps.per.region.server", hbaseMaxQpsPerRegionServer.toString), keyNum) this.toHudiConf(("hoodie.index.hbase.qps.allocator.class", hbaseQpsAllocatorClass), keyNum) + this.toHudiConf(("hoodie.index.hbase.user", hbaseIndexUser), keyNum) } }