From e39ac5764bb81078bd5367b29bac6857f5dfb7c8 Mon Sep 17 00:00:00 2001
From: zhanggougou <15651908511@163.com>
Date: Tue, 23 Dec 2025 15:26:34 +0800
Subject: [PATCH 1/3] [fire-1265] Add the fire-mq module with support for
 custom serializers, including the ZTO internal serializer
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
fire-bundle/fire-bundle-flink/pom.xml | 2 +-
fire-bundle/fire-bundle-spark/pom.xml | 2 +-
fire-bundle/pom.xml | 2 +-
fire-common/pom.xml | 2 +-
.../com/zto/fire/common/bean/MQRecord.scala | 68 +++++++++++++++++-
.../com/zto/fire/common/util/MQProducer.scala | 44 ++++++++----
.../base-connectors/fire-hbase/pom.xml | 2 +-
.../base-connectors/fire-jdbc/pom.xml | 2 +-
.../base-connectors/fire-mq/pom.xml | 70 +++++++++++++++++++
.../zto/fire/mq/conf/ZtoSerializeConf.scala | 52 ++++++++++++++
.../deserializer/BaseDeserializeEntity.java | 10 +++
...ZtoKafkaMessageDeserializationSchema.scala | 46 ++++++++++++
...toRocketMessageDeserializationSchema.scala | 46 ++++++++++++
.../zto/fire/mq/utils/DeserializeUtil.java | 52 ++++++++++++++
fire-connectors/base-connectors/pom.xml | 3 +-
.../flink-connectors/flink-clickhouse/pom.xml | 2 +-
.../flink-connectors/flink-format/pom.xml | 2 +-
.../flink-connectors/flink-paimon/pom.xml | 2 +-
.../flink-connectors/flink-rocketmq/pom.xml | 2 +-
fire-connectors/flink-connectors/pom.xml | 2 +-
fire-connectors/pom.xml | 2 +-
fire-connectors/spark-connectors/pom.xml | 2 +-
.../spark-connectors/spark-hbase/pom.xml | 2 +-
.../spark-connectors/spark-hudi/pom.xml | 2 +-
.../spark-connectors/spark-paimon/pom.xml | 2 +-
.../spark-connectors/spark-rocketmq/pom.xml | 2 +-
fire-core/pom.xml | 2 +-
fire-engines/fire-flink/pom.xml | 7 +-
.../com/zto/fire/flink/sink/KafkaSink.scala | 3 +-
.../zto/fire/flink/sink/RocketMQSink.scala | 4 +-
.../fire/flink/ext/stream/DataStreamExt.scala | 66 ++++++++++++++---
.../ext/stream/StreamExecutionEnvExt.scala | 43 +++++++++++-
fire-engines/fire-spark/pom.xml | 2 +-
fire-engines/pom.xml | 2 +-
fire-enhance/apache-arthas/pom.xml | 4 +-
fire-enhance/apache-flink/pom.xml | 4 +-
fire-enhance/apache-spark/pom.xml | 4 +-
fire-enhance/pom.xml | 2 +-
fire-examples/flink-examples/pom.xml | 2 +-
.../flink/serialize/KafkaSinkTest.scala | 54 ++++++++++++++
.../flink/serialize/KafkaSourceTest.scala | 36 ++++++++++
.../examples/flink/serialize/OrderInfo.java | 65 +++++++++++++++++
.../flink/serialize/RocketSinkTest.scala | 50 +++++++++++++
.../flink/serialize/RocketSourceTest.scala | 34 +++++++++
fire-examples/pom.xml | 4 +-
fire-examples/spark-examples/pom.xml | 2 +-
fire-shell/flink-shell/pom.xml | 2 +-
fire-shell/pom.xml | 2 +-
fire-shell/spark-shell/pom.xml | 2 +-
pom.xml | 4 +-
50 files changed, 760 insertions(+), 65 deletions(-)
create mode 100644 fire-connectors/base-connectors/fire-mq/pom.xml
create mode 100644 fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/conf/ZtoSerializeConf.scala
create mode 100644 fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/BaseDeserializeEntity.java
create mode 100644 fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoKafkaMessageDeserializationSchema.scala
create mode 100644 fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoRocketMessageDeserializationSchema.scala
create mode 100644 fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/utils/DeserializeUtil.java
create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSinkTest.scala
create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSourceTest.scala
create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/OrderInfo.java
create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSinkTest.scala
create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSourceTest.scala
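
Reviewer note: a minimal end-to-end sketch of the API this patch introduces,
assembled from the bundled fire-examples (OrderInfo, the FURY header value,
and the sink/source calls below are taken from those examples; treat this as
illustrative, not normative):

    // producer side: serialize a payload with the ZTO serializer and sink raw bytes
    val outputData = ZtoSerializeUtil.serializerManager.outputData(ZtoSerializeUtil.classInfoMetadata.newInstance)
    val bytes = ZtoSerializeUtil.serializerManager.serialize(outputData)
    val headers = new RecordHeaders()
    headers.add(new RecordHeader(ZtoSerializeConf.SERIALIZER_HEAD_KEY, SerializeType.FURY.toString.getBytes()))
    stream.map(_ => MQRecord(null, null, headers, bytes)).sinkKafkaByte()

    // consumer side: deserialize each message straight into a typed entity
    val orders = this.fire.createDirectStreamWithZtoDeserialize[OrderInfo]()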
diff --git a/fire-bundle/fire-bundle-flink/pom.xml b/fire-bundle/fire-bundle-flink/pom.xml
index cbae5041..c2797473 100644
--- a/fire-bundle/fire-bundle-flink/pom.xml
+++ b/fire-bundle/fire-bundle-flink/pom.xml
@@ -8,7 +8,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-bundle</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-bundle/fire-bundle-spark/pom.xml b/fire-bundle/fire-bundle-spark/pom.xml
index 91ed93e9..b5f57577 100644
--- a/fire-bundle/fire-bundle-spark/pom.xml
+++ b/fire-bundle/fire-bundle-spark/pom.xml
@@ -8,7 +8,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-bundle</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-bundle/pom.xml b/fire-bundle/pom.xml
index e8fba457..c8545b8f 100644
--- a/fire-bundle/pom.xml
+++ b/fire-bundle/pom.xml
@@ -15,7 +15,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-parent</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-common/pom.xml b/fire-common/pom.xml
index e8adbe9f..cc5c25d4 100644
--- a/fire-common/pom.xml
+++ b/fire-common/pom.xml
@@ -26,7 +26,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-parent</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-common/src/main/scala/com/zto/fire/common/bean/MQRecord.scala b/fire-common/src/main/scala/com/zto/fire/common/bean/MQRecord.scala
index 3415bfd9..5a9e6534 100644
--- a/fire-common/src/main/scala/com/zto/fire/common/bean/MQRecord.scala
+++ b/fire-common/src/main/scala/com/zto/fire/common/bean/MQRecord.scala
@@ -19,6 +19,7 @@ package com.zto.fire.common.bean
import com.zto.fire.predef._
import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.rocketmq.common.message.Message
import org.apache.rocketmq.remoting.common.RemotingHelper
@@ -34,6 +35,33 @@ class MQRecord(var topic: String, var msg: String) {
var tag: String = null
var flag: Int = 0
var waitStoreMsgOK: Boolean = true
+ var kafkaHeaders: RecordHeaders = null
+ // custom MQ headers, carried as the message's property attributes
+ var mqHeaders: Map[String, String] = Map()
+ // for special cases where the value must be sent as raw bytes
+ var msgBytes: Array[Byte] = null
+
+ def this(topic: String, msg: String, partition: JInt, key: String, tag: String, flag: Int, waitStoreMsgOK: Boolean, mqHeaders: Map[String, String], msgBytes: Array[Byte]) = {
+ this(topic, msg)
+ this.partition = partition
+ this.key = key
+ this.tag = tag
+ this.flag = flag
+ this.waitStoreMsgOK = waitStoreMsgOK
+ this.mqHeaders = mqHeaders
+ this.msgBytes = msgBytes
+ }
+
+ def this(topic: String, msg: String, partition: JInt, key: String, tag: String, flag: Int, waitStoreMsgOK: Boolean, kafkaHeaders: RecordHeaders, msgBytes: Array[Byte]) = {
+ this(topic, msg)
+ this.partition = partition
+ this.key = key
+ this.tag = tag
+ this.flag = flag
+ this.waitStoreMsgOK = waitStoreMsgOK
+ this.kafkaHeaders = kafkaHeaders
+ this.msgBytes = msgBytes
+ }
def this(topic: String, msg: String, partition: JInt, key: String, tag: String, flag: Int, waitStoreMsgOK: Boolean) = {
this(topic, msg)
@@ -79,12 +107,44 @@ class MQRecord(var topic: String, var msg: String) {
new Message(topic, if (isEmpty(tag)) "*" else tag, key, flag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET), waitStoreMsgOK)
}
+ /**
+ * Convert to a native RocketMQ message with a byte payload
+ */
+ def toRocketMQBytes: Message = {
+ new Message(topic, if (isEmpty(tag)) "*" else tag, key, flag, msgBytes, waitStoreMsgOK)
+ }
+
+ def toRocketMQByFlag(sendBytes: Boolean): Message = {
+ if (sendBytes) {
+ this.toRocketMQBytes
+ } else {
+ this.toRocketMQ
+ }
+ }
+
+
/**
* Convert to a Kafka message
*/
def toKafka: ProducerRecord[String, String] = {
- new ProducerRecord[String, String](topic, partition, key, msg)
+ new ProducerRecord[String, String](topic, partition, key, msg, kafkaHeaders)
+ }
+
+ /**
+ * Convert to a Kafka message with a byte payload
+ */
+ def toKafkaBytes: ProducerRecord[String, Array[Byte]] = {
+ new ProducerRecord[String, Array[Byte]](topic, partition, key, msgBytes, kafkaHeaders)
+ }
+
+ def toKafkaByFlag(sendBytes: Boolean): ProducerRecord[_, _] = {
+ if (sendBytes) {
+ this.toKafkaBytes
+ } else {
+ this.toKafka
+ }
}
+
}
object MQRecord {
@@ -94,6 +154,12 @@ object MQRecord {
def apply(msg: String, partition: JInt, key: String, tags: JString, flag: Int): MQRecord = new MQRecord(null, msg, partition, key, tags, flag, true)
+ def apply(key: String, tags: JString, headers: RecordHeaders, msgBytes: Array[Byte]): MQRecord = new MQRecord(null, null, null, key, tags, 0, true, headers, msgBytes)
+
+ def apply(key: String, tags: JString, mqHeaders: Map[String, String], msgBytes: Array[Byte]): MQRecord = new MQRecord(null, null, null, key, tags, 0, true, mqHeaders, msgBytes)
+
+ def apply(msg: String, partition: JInt, key: String, tags: JString, headers: RecordHeaders, msgBytes: Array[Byte]): MQRecord = new MQRecord(null, msg, partition, key, tags, 0, true, headers, msgBytes)
+
def apply(msg: String, partition: JInt, key: String, tags: JString): MQRecord = new MQRecord(null, msg, partition, key, tags, 0, true)
def apply(msg: String, partition: JInt, key: String): MQRecord = new MQRecord(null, msg, partition, key, null, 0, true)
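
Reviewer note: a short sketch of the new byte-payload overloads added above
(the literal header key and "FURY" value mirror the examples later in this
patch and are illustrative only):

    // Kafka: attach record headers plus a raw byte payload
    val headers = new RecordHeaders()
    headers.add(new RecordHeader("serializer_type", "FURY".getBytes()))
    val kafkaRecord = MQRecord(null, null, headers, bytes)

    // RocketMQ: headers travel as message user properties instead
    val rocketRecord = MQRecord(null, null, Map("serializer_type" -> "FURY"), bytes)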
diff --git a/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala b/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala
index 8f8f6fd5..92103114 100644
--- a/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala
+++ b/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala
@@ -24,8 +24,8 @@ import com.zto.fire.common.enu.{Datasource, Operation}
import com.zto.fire.common.lineage.parser.connector.KafkaConnectorParser
import com.zto.fire.common.util.MQType.MQType
import com.zto.fire.predef._
-import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, RecordMetadata}
-import org.apache.kafka.common.serialization.StringSerializer
+import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import org.apache.rocketmq.client.producer.{DefaultMQProducer, SendCallback, SendResult}
import java.util.Properties
@@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean
* @since 2.3.1
*/
class MQProducer(url: String, mqType: MQType = MQType.kafka,
- otherConf: Map[String, String] = Map.empty, throwable: Boolean = true) extends Logging {
+ otherConf: Map[String, String] = Map.empty, throwable: Boolean = true, sendBytes: Boolean = false) extends Logging {
private lazy val maxRetries = FireFrameworkConf.exceptionTraceSendMQMaxRetries
private lazy val sendTimeout = FireFrameworkConf.exceptionSendTimeout
private var sendErrorCount = 0
@@ -71,18 +71,23 @@ class MQProducer(url: String, mqType: MQType = MQType.kafka,
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, FireKafkaConf.kafkaBrokers(this.url))
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+ if (sendBytes) {
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
+ } else {
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+ }
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, this.sendTimeout.toString)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
props.put(ProducerConfig.ACKS_CONFIG, "all")
props.put(ProducerConfig.RETRIES_CONFIG, "3")
this.otherConf.foreach(prop => props.put(prop._1, prop._2))
- val producer = new KafkaProducer[String, String](props)
+ val producer = new KafkaProducer[Any, Any](props)
this.useKafka = true
producer
}
+
// rocketmq producer
private lazy val rocketmqProducer = {
val producer = new DefaultMQProducer("fire")
@@ -116,7 +121,11 @@ class MQProducer(url: String, mqType: MQType = MQType.kafka,
* message record
*/
def sendKafkaRecord(record: MQRecord): Unit = {
- requireNonNull(record, record.topic, record.msg)("Invalid message record, please check the parameters")
+ if (sendBytes) {
+ requireNonNull(record, record.topic, record.msgBytes)("[byte send] invalid message record, please check the parameters")
+ } else {
+ requireNonNull(record, record.topic, record.msg)("[msg send] invalid message record, please check the parameters")
+ }
if (this.sendErrorCount >= this.maxRetries) {
this.kafkaProducer.close()
@@ -124,9 +133,11 @@ class MQProducer(url: String, mqType: MQType = MQType.kafka,
return
}
- val kafkaRecord = record.toKafka
+ val kafkaRecord = record.toKafkaByFlag(sendBytes)
this.addLineage(record.topic)
- kafkaProducer.send(kafkaRecord, new Callback() {
+
+ kafkaProducer.send(kafkaRecord.asInstanceOf[ProducerRecord[Any, Any]], new Callback() {
override def onCompletion(recordMetadata: RecordMetadata, exception: Exception): Unit = {
if (exception != null) {
sendErrorCount += 1
@@ -137,6 +148,7 @@ class MQProducer(url: String, mqType: MQType = MQType.kafka,
})
}
+
/**
* Send a message to kafka
*
@@ -158,14 +170,19 @@ class MQProducer(url: String, mqType: MQType = MQType.kafka,
* send timeout
*/
def sendRocketMQRecord(record: MQRecord, timeout: Long = 10000): Unit = {
- requireNonNull(record, record.topic, record.msg)("Invalid message record, please check the parameters")
+ if (sendBytes) {
+ requireNonNull(record, record.topic, record.msgBytes)("[byte send] invalid message record, please check the parameters")
+ } else {
+ requireNonNull(record, record.topic, record.msg)("[msg send] invalid message record, please check the parameters")
+ }
if (this.sendErrorCount >= this.maxRetries) {
this.rocketmqProducer.shutdown()
return
}
- val rocketRecord = record.toRocketMQ
+ val rocketRecord = record.toRocketMQByFlag(sendBytes)
+ record.mqHeaders.foreach(header => rocketRecord.putUserProperty(header._1, header._2))
this.addLineage(record.topic)
this.rocketmqProducer.send(rocketRecord, new SendCallback {
override def onSuccess(sendResult: SendResult): Unit = {
@@ -252,7 +269,8 @@ object MQProducer {
}
def apply(url: String, mqType: MQType = MQType.kafka,
- otherConf: Map[String, String] = Map.empty, throwable: Boolean = true) = new MQProducer(url, mqType, otherConf, throwable)
+ otherConf: Map[String, String] = Map.empty, throwable: Boolean = true, sendByte: Boolean = false) = new MQProducer(url, mqType, otherConf, throwable, sendByte)
+
/**
* Send a message to the given mq topic
@@ -276,8 +294,8 @@ object MQProducer {
* tuning parameters
*/
def sendRecord(url: String, record: MQRecord,
- mqType: MQType = MQType.kafka, otherConf: Map[String, String] = Map.empty, throwable: Boolean = true): Unit = {
- val producer = this.producerMap.mergeGet(url + ":" + record.topic)(new MQProducer(url, mqType, otherConf, throwable))
+ mqType: MQType = MQType.kafka, otherConf: Map[String, String] = Map.empty, throwable: Boolean = true, sendByte: Boolean = false): Unit = {
+ val producer = this.producerMap.mergeGet(url + ":" + record.topic)(new MQProducer(url, mqType, otherConf, throwable, sendByte))
producer.send(record)
}
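
Reviewer note: with sendByte = true the producer registers ByteArraySerializer
for values and validates record.msgBytes instead of record.msg; a minimal
sketch (the broker address is a placeholder):

    val record = MQRecord(null, null, headers, bytes)
    record.topic = "fire_test_kafka_serialize"
    MQProducer.sendRecord("kafka01:9092", record, MQType.kafka, sendByte = true)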
diff --git a/fire-connectors/base-connectors/fire-hbase/pom.xml b/fire-connectors/base-connectors/fire-hbase/pom.xml
index 58734d82..27ca451f 100644
--- a/fire-connectors/base-connectors/fire-hbase/pom.xml
+++ b/fire-connectors/base-connectors/fire-hbase/pom.xml
@@ -26,7 +26,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-connectors-common</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-connectors/base-connectors/fire-jdbc/pom.xml b/fire-connectors/base-connectors/fire-jdbc/pom.xml
index a834e58a..07677c89 100644
--- a/fire-connectors/base-connectors/fire-jdbc/pom.xml
+++ b/fire-connectors/base-connectors/fire-jdbc/pom.xml
@@ -26,7 +26,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-connectors-common</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-connectors/base-connectors/fire-mq/pom.xml b/fire-connectors/base-connectors/fire-mq/pom.xml
new file mode 100644
index 00000000..aa738473
--- /dev/null
+++ b/fire-connectors/base-connectors/fire-mq/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>fire-connector-mq_${scala.binary.version}</artifactId>
+    <packaging>jar</packaging>
+    <name>Fire : Connectors : Common : mq</name>
+
+    <parent>
+        <groupId>com.zto.fire</groupId>
+        <artifactId>fire-connectors-common</artifactId>
+        <version>2.7.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka${contains.scala.binary.version}</artifactId>
+            <version>${flink.kafka.connector.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.zto.fire</groupId>
+            <artifactId>fire-connector-flink-rocketmq_${flink.reference}</artifactId>
+            <version>${fire.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+    </build>
+</project>
diff --git a/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/conf/ZtoSerializeConf.scala b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/conf/ZtoSerializeConf.scala
new file mode 100644
index 00000000..1b9a9d56
--- /dev/null
+++ b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/conf/ZtoSerializeConf.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.zto.fire.flink.conf
+
+
+/**
+ * ZTO serialization constants
+ *
+ * @author zhanggougou
+ * @since 1.1.0
+ * @create 2025-12-02 14:55
+ */
+private[fire] object ZtoSerializeConf {
+
+ lazy val SERIALIZER_CLASS_NAME = "com.zto.bigdata.integeration.util.ZtoSerializeUtil"
+
+ lazy val SERIALIZER_CLASS_INIT_METHOD_NAME = "initSerializerManager"
+
+ lazy val SERIALIZER_CLASS_DES_METHOD_NAME = "deserializeValue"
+
+ lazy val SERIALIZER_HEAD_KEY = "serializer_type"
+
+
+}
+
+
+/**
+ * Serialization type enumeration
+ */
+object SerializeType extends Enumeration {
+ type SerializeType = Value
+
+ val FURY = Value("FURY")
+ val JSON = Value("JSON")
+ // auto-detect
+ val JAVA = Value("JAVA")
+}
diff --git a/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/BaseDeserializeEntity.java b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/BaseDeserializeEntity.java
new file mode 100644
index 00000000..adf7dbf6
--- /dev/null
+++ b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/BaseDeserializeEntity.java
@@ -0,0 +1,10 @@
+package com.zto.fire.mq.deserializer;
+
+import java.io.Serializable;
+
+/**
+ * Base entity class for deserialization
+ */
+public class BaseDeserializeEntity implements Serializable {
+
+}
diff --git a/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoKafkaMessageDeserializationSchema.scala b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoKafkaMessageDeserializationSchema.scala
new file mode 100644
index 00000000..998717f0
--- /dev/null
+++ b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoKafkaMessageDeserializationSchema.scala
@@ -0,0 +1,46 @@
+package com.zto.fire.mq.deserializer
+
+import com.zto.fire.flink.conf.ZtoSerializeConf
+import com.zto.fire.mq.utils.DeserializeUtil
+import org.apache.flink.api.common.serialization.DeserializationSchema
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import java.nio.charset.StandardCharsets
+import scala.reflect.ClassTag
+import com.zto.fire.predef._
+
+/**
+ * Custom deserializer for Kafka messages; deserializes the message value into a typed entity
+ */
+class ZtoKafkaMessageDeserializationSchema[T <: BaseDeserializeEntity : ClassTag](topic: String) extends KafkaDeserializationSchema[T] {
+
+ override def isEndOfStream(t: T): Boolean = false
+
+ override def open(context: DeserializationSchema.InitializationContext): Unit = {
+ DeserializeUtil.initZtoSerializer(topic)
+ }
+
+ override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): T = {
+ try {
+ val headerVal = record.headers().lastHeader(ZtoSerializeConf.SERIALIZER_HEAD_KEY)
+ if (headerVal == null) {
+ throw new RuntimeException("Kafka deserialization failed: serializer header missing or initialization error, please check the message contents")
+ }
+ val headType = new String(headerVal.value(), StandardCharsets.UTF_8)
+ DeserializeUtil.deserializeAndFill(getGeneric[T](), headType, record.value())
+ } catch {
+ case e: Throwable =>
+ throw new RuntimeException("Kafka deserialization failed", e)
+ }
+ }
+
+ override def getProducedType: TypeInformation[T] = TypeInformation.of(getGeneric[T]())
+}
+
+
diff --git a/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoRocketMessageDeserializationSchema.scala b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoRocketMessageDeserializationSchema.scala
new file mode 100644
index 00000000..6c1fab45
--- /dev/null
+++ b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoRocketMessageDeserializationSchema.scala
@@ -0,0 +1,46 @@
+package com.zto.fire.mq.deserializer
+
+import com.zto.fire.flink.conf.ZtoSerializeConf
+import com.zto.fire.mq.utils.DeserializeUtil
+import com.zto.fire.predef._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.rocketmq.common.message.MessageExt
+import org.apache.rocketmq.flink.common.serialization.TagKeyValueDeserializationSchema
+
+import scala.reflect.ClassTag
+
+/**
+ * Custom deserializer for RocketMQ messages; deserializes the message body into a typed entity
+ */
+class ZtoRocketMessageDeserializationSchema[T <: BaseDeserializeEntity : ClassTag](topic: String) extends TagKeyValueDeserializationSchema[T] {
+
+ @volatile private var initialized = false
+
+ private def initOnce(): Unit = synchronized {
+ if (!initialized) {
+ DeserializeUtil.initZtoSerializer(topic)
+ initialized = true
+ }
+ }
+
+ override def deserializeTagKeyAndValue(msg: MessageExt): T = {
+ try {
+ this.initOnce()
+ val headerVal = msg.getProperty(ZtoSerializeConf.SERIALIZER_HEAD_KEY)
+ if (headerVal == null) {
+ throw new RuntimeException("rocket 反序列化失败,未获取到消息头或初始化异常,请检查消息内容是否有误")
+ }
+ DeserializeUtil.deserializeAndFill(getGeneric[T](),headerVal,msg.getBody)
+ } catch {
+ case e: Throwable => {
+ throw new RuntimeException("rocket 反序列化失败", e)
+ }
+ }
+ }
+
+ override def getProducedType: TypeInformation[T] = TypeInformation.of(getGeneric[T]())
+
+}
+
+
+
diff --git a/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/utils/DeserializeUtil.java b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/utils/DeserializeUtil.java
new file mode 100644
index 00000000..f6575779
--- /dev/null
+++ b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/utils/DeserializeUtil.java
@@ -0,0 +1,52 @@
+package com.zto.fire.mq.utils;
+
+import com.zto.bigdata.integeration.util.ZtoSerializeUtil;
+import com.zto.fire.common.util.ReflectionUtils;
+import com.zto.fire.flink.conf.ZtoSerializeConf$;
+import com.zto.zdata.schema.format.InputData;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+
+/**
+ * 序列化工具类
+ */
+public class DeserializeUtil {
+ /**
+ * Initialize the serializer for the given topic
+ * @param topic topic name
+ */
+ public static void initZtoSerializer(String topic) throws Throwable {
+ Class<?> deserializationClass = ReflectionUtils.forName(ZtoSerializeConf$.MODULE$.SERIALIZER_CLASS_NAME());
+ if (deserializationClass != null) {
+ ReflectionUtils.getMethodByName(deserializationClass, ZtoSerializeConf$.MODULE$.SERIALIZER_CLASS_INIT_METHOD_NAME()).invoke(null, topic, null);
+ }
+ }
+
+ /**
+ * Unified deserialization plus reflective field assignment
+ *
+ * @param <T> target object type
+ * @param clazz target Class
+ * @param headType serialization type (the value from the message header)
+ * @param value raw byte[]
+ * @return the populated instance of T
+ */
+ public static <T> T deserializeAndFill(Class<T> clazz, String headType, byte[] value) {
+ try {
+ InputData inputData = ZtoSerializeUtil.serializerManager.deserialize(headType, value);
+ T target = clazz.getDeclaredConstructor().newInstance();
+ // assign field values via reflection (including private fields)
+ Collection<Field> fields = ReflectionUtils.getAllFields(clazz).values();
+ for (Field field : fields) {
+ field.setAccessible(true);
+ Object fieldValue = inputData.getObject(field.getName());
+ field.set(target, fieldValue);
+ }
+ return target;
+ } catch (Exception e) {
+ throw new RuntimeException("反序列化失败,class=" + clazz.getName(), e);
+ }
+ }
+}
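
Reviewer note: a hedged sketch of calling the reflective fill from Scala
(OrderInfo is the example entity added later in this patch; in real use
headType comes from the message header rather than being hard-coded):

    val order: OrderInfo =
      DeserializeUtil.deserializeAndFill(classOf[OrderInfo], "FURY", record.value())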
diff --git a/fire-connectors/base-connectors/pom.xml b/fire-connectors/base-connectors/pom.xml
index a7744a25..650588e8 100644
--- a/fire-connectors/base-connectors/pom.xml
+++ b/fire-connectors/base-connectors/pom.xml
@@ -26,13 +26,14 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-connectors</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<modules>
<module>fire-jdbc</module>
<module>fire-hbase</module>
+ <module>fire-mq</module>
</modules>
diff --git a/fire-connectors/flink-connectors/flink-clickhouse/pom.xml b/fire-connectors/flink-connectors/flink-clickhouse/pom.xml
index 3095eb3c..c8012564 100644
--- a/fire-connectors/flink-connectors/flink-clickhouse/pom.xml
+++ b/fire-connectors/flink-connectors/flink-clickhouse/pom.xml
@@ -26,7 +26,7 @@
<artifactId>fire-flink-connectors</artifactId>
<groupId>com.zto.fire</groupId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
diff --git a/fire-connectors/flink-connectors/flink-format/pom.xml b/fire-connectors/flink-connectors/flink-format/pom.xml
index e54b1228..0b9d64a3 100644
--- a/fire-connectors/flink-connectors/flink-format/pom.xml
+++ b/fire-connectors/flink-connectors/flink-format/pom.xml
@@ -8,7 +8,7 @@
<artifactId>fire-flink-connectors</artifactId>
<groupId>com.zto.fire</groupId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-connectors/flink-connectors/flink-paimon/pom.xml b/fire-connectors/flink-connectors/flink-paimon/pom.xml
index 7e91a1c7..7db9cf9d 100644
--- a/fire-connectors/flink-connectors/flink-paimon/pom.xml
+++ b/fire-connectors/flink-connectors/flink-paimon/pom.xml
@@ -10,7 +10,7 @@
<artifactId>fire-flink-connectors</artifactId>
<groupId>com.zto.fire</groupId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-connectors/flink-connectors/flink-rocketmq/pom.xml b/fire-connectors/flink-connectors/flink-rocketmq/pom.xml
index 2026444e..659ea15a 100644
--- a/fire-connectors/flink-connectors/flink-rocketmq/pom.xml
+++ b/fire-connectors/flink-connectors/flink-rocketmq/pom.xml
@@ -25,7 +25,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-flink-connectors</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-connectors/flink-connectors/pom.xml b/fire-connectors/flink-connectors/pom.xml
index b41ac446..a5985c68 100644
--- a/fire-connectors/flink-connectors/pom.xml
+++ b/fire-connectors/flink-connectors/pom.xml
@@ -32,7 +32,7 @@
<artifactId>fire-connectors</artifactId>
<groupId>com.zto.fire</groupId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-connectors/pom.xml b/fire-connectors/pom.xml
index 3a736f15..ecd6abcf 100644
--- a/fire-connectors/pom.xml
+++ b/fire-connectors/pom.xml
@@ -26,7 +26,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-parent</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-connectors/spark-connectors/pom.xml b/fire-connectors/spark-connectors/pom.xml
index aee51d44..cb86dbe5 100644
--- a/fire-connectors/spark-connectors/pom.xml
+++ b/fire-connectors/spark-connectors/pom.xml
@@ -32,7 +32,7 @@
<artifactId>fire-connectors</artifactId>
<groupId>com.zto.fire</groupId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-connectors/spark-connectors/spark-hbase/pom.xml b/fire-connectors/spark-connectors/spark-hbase/pom.xml
index e45d7a3a..fca1a7ba 100644
--- a/fire-connectors/spark-connectors/spark-hbase/pom.xml
+++ b/fire-connectors/spark-connectors/spark-hbase/pom.xml
@@ -26,7 +26,7 @@
<artifactId>fire-spark-connectors</artifactId>
<groupId>com.zto.fire</groupId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
diff --git a/fire-connectors/spark-connectors/spark-hudi/pom.xml b/fire-connectors/spark-connectors/spark-hudi/pom.xml
index 68a33ec8..20e2174f 100644
--- a/fire-connectors/spark-connectors/spark-hudi/pom.xml
+++ b/fire-connectors/spark-connectors/spark-hudi/pom.xml
@@ -27,7 +27,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-spark-connectors</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-connectors/spark-connectors/spark-paimon/pom.xml b/fire-connectors/spark-connectors/spark-paimon/pom.xml
index 6859bccf..e66e448a 100644
--- a/fire-connectors/spark-connectors/spark-paimon/pom.xml
+++ b/fire-connectors/spark-connectors/spark-paimon/pom.xml
@@ -8,7 +8,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-spark-connectors</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-connectors/spark-connectors/spark-rocketmq/pom.xml b/fire-connectors/spark-connectors/spark-rocketmq/pom.xml
index ebca0cf1..545e9b1d 100644
--- a/fire-connectors/spark-connectors/spark-rocketmq/pom.xml
+++ b/fire-connectors/spark-connectors/spark-rocketmq/pom.xml
@@ -26,7 +26,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-spark-connectors</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-core/pom.xml b/fire-core/pom.xml
index b63e696c..a38fa45e 100644
--- a/fire-core/pom.xml
+++ b/fire-core/pom.xml
@@ -26,7 +26,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-parent</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-engines/fire-flink/pom.xml b/fire-engines/fire-flink/pom.xml
index bab3900b..20aca242 100644
--- a/fire-engines/fire-flink/pom.xml
+++ b/fire-engines/fire-flink/pom.xml
@@ -26,7 +26,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-engines</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
@@ -43,6 +43,11 @@
<version>${fire.version}</version>
<scope>${maven.scope}</scope>
</dependency>
+ <dependency>
+ <groupId>com.zto.fire</groupId>
+ <artifactId>fire-connector-mq_${scala.binary.version}</artifactId>
+ <version>${fire.version}</version>
+ </dependency>
<dependency>
<groupId>com.sparkjava</groupId>
diff --git a/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/KafkaSink.scala b/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/KafkaSink.scala
index cefd4bbf..6e4938e5 100644
--- a/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/KafkaSink.scala
+++ b/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/KafkaSink.scala
@@ -36,6 +36,7 @@ abstract class KafkaSink[IN, T <: MQRecord : ClassTag](params: Map[String, Objec
url: String, topic: String,
batch: Int = 100,
flushInterval: Long = 5000,
+ sendByte: Boolean = false,
keyNum: Int = KeyNum._1) extends BaseSink[IN, T](batch, flushInterval) {
private lazy val (finalBrokers, finalTopic, finalConf) = KafkaUtils.getConfByKeyNum(params, url, topic, keyNum)
@@ -47,7 +48,7 @@ abstract class KafkaSink[IN, T <: MQRecord : ClassTag](params: Map[String, Objec
override def sink(dataList: List[T]): Unit = {
dataList.foreach(record => {
if (isEmpty(record.topic)) record.topic = finalTopic
- MQProducer.sendRecord(finalBrokers, record, MQType.kafka, finalConf)
+ MQProducer.sendRecord(finalBrokers, record, MQType.kafka, finalConf, sendByte = sendByte)
})
}
}
diff --git a/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/RocketMQSink.scala b/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/RocketMQSink.scala
index c13705e7..fde43cec 100644
--- a/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/RocketMQSink.scala
+++ b/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/RocketMQSink.scala
@@ -33,7 +33,7 @@ import scala.reflect.ClassTag
abstract class RocketMQSink[IN, T <: MQRecord : ClassTag](params: Map[String, Object],
url: String, topic: String,
tag: String = "*", batch: Int = 100,
- flushInterval: Long = 1000, keyNum: Int = KeyNum._1) extends BaseSink[IN, T](batch, flushInterval) {
+ flushInterval: Long = 1000, sendByte: Boolean = false, keyNum: Int = KeyNum._1) extends BaseSink[IN, T](batch, flushInterval) {
private lazy val (finalBrokers, finalTopic, finalTag, finalConf) = RocketMQUtils.getConfByKeyNum(url, topic, tag, params, keyNum)
@@ -46,7 +46,7 @@ abstract class RocketMQSink[IN, T <: MQRecord : ClassTag](params: Map[String, Ob
dataList.foreach(record => {
if (isEmpty(record.topic)) record.topic = finalTopic
if (isEmpty(record.tag) && noEmpty(finalTag)) record.tag = finalTag
- MQProducer.sendRecord(finalBrokers, record, MQType.rocketmq, finalConf)
+ MQProducer.sendRecord(finalBrokers, record, MQType.rocketmq, finalConf, sendByte = sendByte)
})
}
}
diff --git a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/DataStreamExt.scala b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/DataStreamExt.scala
index 1a8f8ae1..4b9e63da 100644
--- a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/DataStreamExt.scala
+++ b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/DataStreamExt.scala
@@ -303,8 +303,9 @@ class DataStreamExt[T](stream: DataStream[T]) extends DataStreamHelperImpl[T](st
tag: String = "*",
mqType: MQType = MQType.kafka,
batch: Int = 100, flushInterval: Long = 1000,
+ sendByte: Boolean = false,
keyNum: Int = KeyNum._1): DataStreamSink[_] = {
- this.sinkMQFun[E](params, url, topic, tag, mqType, batch, flushInterval, keyNum)(_.asInstanceOf[E])
+ this.sinkMQFun[E](params, url, topic, tag, mqType, batch, flushInterval, sendByte, keyNum)(_.asInstanceOf[E])
}
/**
@@ -327,11 +328,12 @@ class DataStreamExt[T](stream: DataStream[T]) extends DataStreamHelperImpl[T](st
tag: String = "*",
mqType: MQType = MQType.kafka,
batch: Int = 100, flushInterval: Long = 1000,
+ sendByte: Boolean = false,
keyNum: Int = KeyNum._1)(mapFunction: T => E): DataStreamSink[_] = {
if (mqType == MQType.rocketmq) {
- this.sinkRocketMQFun[E](params, url, topic, tag, batch, flushInterval, keyNum)(mapFunction)
+ this.sinkRocketMQFun[E](params, url, topic, tag, batch, flushInterval, sendByte, keyNum)(mapFunction)
} else {
- this.sinkKafkaFun[E](params, url, topic, batch, flushInterval, keyNum)(mapFunction)
+ this.sinkKafkaFun[E](params, url, topic, batch, flushInterval, sendByte, keyNum)(mapFunction)
}
}
@@ -344,15 +346,41 @@ class DataStreamExt[T](stream: DataStream[T]) extends DataStreamHelperImpl[T](st
* the message queue URL
* @param topic
* the topic to send messages to
+ * @param sendByte
+ * whether to send the message value as raw bytes; defaults to false (send as string)
* @param keyNum
* the keyNum identifying the configuration set, resolved from config files or annotations
*/
def sinkKafka[E <: MQRecord : ClassTag](params: Map[String, Object] = null,
url: String = null, topic: String = null,
batch: Int = 1000, flushInterval: Long = 5000,
+ sendByte: Boolean = false,
keyNum: Int = KeyNum._1): DataStreamSink[_] = {
- this.sinkKafkaFun[E](params, url, topic, batch, flushInterval, keyNum)(_.asInstanceOf[E])
+ this.sinkKafkaFun[E](params, url, topic, batch, flushInterval, sendByte, keyNum)(_.asInstanceOf[E])
+ }
+
+ /**
+ * Sink the stream to the given kafka topic, sending the value as raw bytes
+ *
+ * @param params
+ * additional producer parameters
+ * @param url
+ * the message queue URL
+ * @param topic
+ * the topic to send messages to
+ * @param sendByte
+ * whether to send the message value as raw bytes; defaults to true for this method
+ * @param keyNum
+ * the keyNum identifying the configuration set, resolved from config files or annotations
+ */
+ def sinkKafkaByte[E <: MQRecord : ClassTag](params: Map[String, Object] = null,
+ url: String = null, topic: String = null,
+ batch: Int = 1000, flushInterval: Long = 5000,
+ sendByte: Boolean = true,
+ keyNum: Int = KeyNum._1): DataStreamSink[_] = {
+
+ this.sinkKafkaFun[E](params, url, topic, batch, flushInterval, sendByte, keyNum)(_.asInstanceOf[E])
}
/**
@@ -370,11 +398,12 @@ class DataStreamExt[T](stream: DataStream[T]) extends DataStreamHelperImpl[T](st
def sinkKafkaFun[E <: MQRecord : ClassTag](params: Map[String, Object] = null,
url: String = null, topic: String = null,
batch: Int = 1000, flushInterval: Long = 5000,
+ sendByte: Boolean = false,
keyNum: Int = KeyNum._1)(fun: T => E): DataStreamSink[_] = {
val finalBatch = if (FireKafkaConf.kafkaSinkBatch(keyNum) > 0) FireKafkaConf.kafkaSinkBatch(keyNum) else batch
val finalInterval = if (FireKafkaConf.kafkaFlushInterval(keyNum) > 0) FireKafkaConf.kafkaFlushInterval(keyNum) else flushInterval
- this.addSinkWrap(new KafkaSink[T, E](params, url, topic, finalBatch, finalInterval, keyNum) {
+ this.addSinkWrap(new KafkaSink[T, E](params, url, topic, finalBatch, finalInterval, sendByte, keyNum) {
override def map(value: T): E = fun(value)
})
}
@@ -394,8 +423,29 @@ class DataStreamExt[T](stream: DataStream[T]) extends DataStreamHelperImpl[T](st
def sinkRocketMQ[E <: MQRecord : ClassTag](params: Map[String, Object] = null,
url: String = null, topic: String = null, tag: String = "*",
batch: Int = 1000, flushInterval: Long = 5000,
+ sendByte: Boolean = false,
+ keyNum: Int = KeyNum._1): DataStreamSink[_] = {
+ this.sinkRocketMQFun[E](params, url, topic, tag, batch, flushInterval, sendByte, keyNum)(_.asInstanceOf[E])
+ }
+
+ /**
+ * Sink the stream to the given rocketmq topic, sending the value as raw bytes
+ *
+ * @param params
+ * additional producer parameters
+ * @param url
+ * the message queue URL
+ * @param topic
+ * the topic to send messages to
+ * @param keyNum
+ * the keyNum identifying the configuration set, resolved from config files or annotations
+ */
+ def sinkRocketMQByte[E <: MQRecord : ClassTag](params: Map[String, Object] = null,
+ url: String = null, topic: String = null, tag: String = "*",
+ batch: Int = 1000, flushInterval: Long = 5000,
+ sendByte: Boolean = true,
keyNum: Int = KeyNum._1): DataStreamSink[_] = {
- this.sinkRocketMQFun[E](params, url, topic, tag, batch, flushInterval, keyNum)(_.asInstanceOf[E])
+ this.sinkRocketMQFun[E](params, url, topic, tag, batch, flushInterval, sendByte, keyNum)(_.asInstanceOf[E])
}
/**
@@ -412,12 +462,12 @@ class DataStreamExt[T](stream: DataStream[T]) extends DataStreamHelperImpl[T](st
*/
def sinkRocketMQFun[E <: MQRecord : ClassTag](params: Map[String, Object] = null,
url: String = null, topic: String = null, tag: String = "*",
- batch: Int = 1000, flushInterval: Long = 5000,
+ batch: Int = 1000, flushInterval: Long = 5000, sendByte: Boolean = false,
keyNum: Int = KeyNum._1)(fun: T => E): DataStreamSink[_] = {
val finalBatch = if (FireRocketMQConf.rocketSinkBatch(keyNum) > 0) FireRocketMQConf.rocketSinkBatch(keyNum) else batch
val finalInterval = if (FireRocketMQConf.rocketSinkFlushInterval(keyNum) > 0) FireRocketMQConf.rocketSinkFlushInterval(keyNum) else flushInterval
- this.addSinkWrap(new RocketMQSink[T, E](params, url, topic, tag, finalBatch, finalInterval, keyNum) {
+ this.addSinkWrap(new RocketMQSink[T, E](params, url, topic, tag, finalBatch, finalInterval, sendByte, keyNum) {
override def map(value: T): E = fun(value)
})
}
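
Reviewer note: sinkKafkaByte and sinkRocketMQByte are thin wrappers whose only
difference is sendByte defaulting to true, so these two calls are equivalent:

    stream.sinkKafkaByte()            // byte payload by default
    stream.sinkKafka(sendByte = true) // same effect via the general API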
diff --git a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/StreamExecutionEnvExt.scala b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/StreamExecutionEnvExt.scala
index 19d1812f..d304f017 100644
--- a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/StreamExecutionEnvExt.scala
+++ b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/StreamExecutionEnvExt.scala
@@ -32,6 +32,7 @@ import com.zto.fire.flink.ext.provider.{HBaseConnectorProvider, JdbcFlinkProvide
import com.zto.fire.flink.sql.FlinkSqlExtensionsParser
import com.zto.fire.flink.util.{FlinkRocketMQUtils, FlinkSingletonFactory, FlinkUtils, TableUtils}
import com.zto.fire.jdbc.JdbcConnectorBridge
+import com.zto.fire.mq.deserializer.{BaseDeserializeEntity, ZtoKafkaMessageDeserializationSchema, ZtoRocketMessageDeserializationSchema}
import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema}
@@ -154,9 +155,7 @@ class StreamExecutionEnvExt(env: StreamExecutionEnvironment) extends StreamExecu
runtimeContext: RuntimeContext = null,
deserializer: Any = new SimpleStringSchema,
keyNum: Int = KeyNum._1): DataStream[T] = {
-
val kafkaConsumer = this.createKafkaConsumer[T](kafkaParams, topics, deserializer, keyNum)
-
if (runtimeContext != null) kafkaConsumer.setRuntimeContext(runtimeContext)
if (specificStartupOffsets != null) kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets)
// start consuming kafka from the given timestamp
@@ -173,6 +172,24 @@ class StreamExecutionEnvExt(env: StreamExecutionEnvironment) extends StreamExecu
this.addSourceWrap(kafkaConsumer)
}
+ /**
+ * Create a Kafka DStream using the ZTO deserializer
+ *
+ * @param kafkaParams
+ * Kafka configuration parameters
+ * @return
+ * DStream of deserialized entities
+ */
+ def createDirectStreamWithZtoDeserialize[T <: BaseDeserializeEntity : ClassTag](kafkaParams: Map[String, Object] = null,
+ topics: Set[String] = null,
+ specificStartupOffsets: Map[KafkaTopicPartition, java.lang.Long] = null,
+ runtimeContext: RuntimeContext = null,
+ keyNum: Int = KeyNum._1)(implicit typeInfo: TypeInformation[T]): DataStream[T] = {
+ val confTopics = FireKafkaConf.kafkaTopics(keyNum)
+ val ztoDeserialize = new ZtoKafkaMessageDeserializationSchema[T](confTopics)
+ this.createDirectStreamBySchema[T](kafkaParams, topics, specificStartupOffsets, runtimeContext, ztoDeserialize, keyNum = keyNum)
+ }
+
/**
* Create a DStream
*
@@ -261,6 +278,28 @@ class StreamExecutionEnvExt(env: StreamExecutionEnvironment) extends StreamExecu
this.addSourceWrap(new RocketMQSourceWithTag[(String, String, String)](new SimpleTagKeyValueDeserializationSchema, props)).name("RocketMQ Source")
}
+ /**
+ * Build a RocketMQ pull DStream that deserializes each message into a typed entity
+ *
+ * @param rocketParam
+ * RocketMQ consumer parameters
+ * @param groupId
+ * groupId
+ * @param topics
+ * topic list
+ * @return
+ * RocketMQ DStream
+ */
+ def createRocketMqPullStreamWithEntity[T <: BaseDeserializeEntity : ClassTag : TypeInformation](rocketParam: Map[String, String] = null,
+ groupId: String = null,
+ topics: String = null,
+ tag: String = null,
+ keyNum: Int = KeyNum._1): DataStream[T] = {
+ val props = buildRocketMQProps(rocketParam, groupId, topics, tag, keyNum)
+ val confTopics = FireRocketMQConf.rocketTopics(keyNum)
+ this.addSourceWrap(new RocketMQSourceWithTag[T](new ZtoRocketMessageDeserializationSchema[T](confTopics), props)).name("RocketMQ Entity Source")
+ }
+
/**
* Build a RocketMQ pull DStream exposing the tag, key, value, and related metadata of each message
*
diff --git a/fire-engines/fire-spark/pom.xml b/fire-engines/fire-spark/pom.xml
index de5945a5..5436ae0b 100644
--- a/fire-engines/fire-spark/pom.xml
+++ b/fire-engines/fire-spark/pom.xml
@@ -26,7 +26,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-engines</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-engines/pom.xml b/fire-engines/pom.xml
index 8a81a48c..c6923f58 100644
--- a/fire-engines/pom.xml
+++ b/fire-engines/pom.xml
@@ -26,7 +26,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-parent</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-enhance/apache-arthas/pom.xml b/fire-enhance/apache-arthas/pom.xml
index b1a103c5..b60f5ca3 100644
--- a/fire-enhance/apache-arthas/pom.xml
+++ b/fire-enhance/apache-arthas/pom.xml
@@ -19,14 +19,14 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>fire-enhance-arthas_${scala.binary.version}</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Fire : Enhance : Arthas</name>
<parent>
<groupId>com.zto.fire</groupId>
<artifactId>fire-enhance</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-enhance/apache-flink/pom.xml b/fire-enhance/apache-flink/pom.xml
index 0b3a56cb..229bed08 100644
--- a/fire-enhance/apache-flink/pom.xml
+++ b/fire-enhance/apache-flink/pom.xml
@@ -19,14 +19,14 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>fire-enhance-flink_${flink.reference}</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Fire : Enhance : Flink</name>
<parent>
<groupId>com.zto.fire</groupId>
<artifactId>fire-enhance</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-enhance/apache-spark/pom.xml b/fire-enhance/apache-spark/pom.xml
index 72483511..f2ee634f 100644
--- a/fire-enhance/apache-spark/pom.xml
+++ b/fire-enhance/apache-spark/pom.xml
@@ -19,14 +19,14 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>fire-enhance-spark_${spark.reference}</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Fire : Enhance : Spark</name>
<parent>
<groupId>com.zto.fire</groupId>
<artifactId>fire-enhance</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-enhance/pom.xml b/fire-enhance/pom.xml
index 2ad08a69..5edd1a81 100644
--- a/fire-enhance/pom.xml
+++ b/fire-enhance/pom.xml
@@ -25,7 +25,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-parent</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-examples/flink-examples/pom.xml b/fire-examples/flink-examples/pom.xml
index 87fc6343..3236dbcb 100644
--- a/fire-examples/flink-examples/pom.xml
+++ b/fire-examples/flink-examples/pom.xml
@@ -26,7 +26,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-examples</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSinkTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSinkTest.scala
new file mode 100644
index 00000000..6f067674
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSinkTest.scala
@@ -0,0 +1,54 @@
+package com.zto.fire.examples.flink.serialize
+
+import com.zto.bigdata.integeration.util.ZtoSerializeUtil
+import com.zto.fire._
+import com.zto.fire.common.anno.Config
+import com.zto.fire.common.bean.MQRecord
+import com.zto.fire.common.conf.FireKafkaConf
+import com.zto.fire.flink.FlinkStreaming
+import com.zto.fire.flink.anno.Streaming
+import com.zto.fire.flink.conf.{SerializeType, ZtoSerializeConf}
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
+
+import java.util.Date
+
+/**
+ * Sends to Kafka using the custom ZTO serializer
+ * Test: 10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092
+ * Production: oggkafkanewb1.ztosys.com:9092,oggkafkanewb2.ztosys.com:9092,oggkafkanewb3.ztosys.com:9092
+ * @author zhanggougou 2025-12-08 10:09:19
+ */
+@Config(
+ """
+ |kafka.brokers.name=10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092
+ |kafka.topics=fire_test_kafka_serialize
+ |""")
+@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180)
+object KafkaSinkTest extends FlinkStreaming {
+
+ override def process(): Unit = {
+ this.fire.createRandomIntStream(1).map(new RichMapFunction[Int, MQRecord] {
+
+ override def open(parameters: Configuration): Unit = {
+ ZtoSerializeUtil.initSerializerManager(FireKafkaConf.kafkaTopics(1), null)
+ }
+
+ override def map(x: Int): MQRecord = {
+ val outputData = ZtoSerializeUtil.serializerManager.outputData(ZtoSerializeUtil.classInfoMetadata.newInstance)
+ outputData.setObject("id", x)
+ outputData.setObject("name", x + "我")
+ outputData.setObject("createTime", new Date())
+ outputData.setObject("money", 1.6 + x)
+ val bytes = ZtoSerializeUtil.serializerManager.serialize(outputData)
+ val headers = new RecordHeaders()
+ headers.add(new RecordHeader(ZtoSerializeConf.SERIALIZER_HEAD_KEY, SerializeType.FURY.toString.getBytes()))
+ MQRecord(null, null, headers, bytes)
+ }
+ }).sinkKafkaByte().name("testSink")
+ }
+}
+
+
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSourceTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSourceTest.scala
new file mode 100644
index 00000000..1d8e7cfa
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSourceTest.scala
@@ -0,0 +1,36 @@
+
+package com.zto.fire.examples.flink.serialize
+
+import com.zto.fire._
+import com.zto.fire.common.anno.Config
+import com.zto.fire.flink.FlinkStreaming
+import com.zto.fire.flink.anno.Streaming
+import org.apache.flink.api.scala.createTypeInformation
+
+
+/**
+ * Consumes from Kafka using the custom ZTO deserializer
+ * Test: 10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092
+ * Production: oggkafkanewb1.ztosys.com:9092,oggkafkanewb2.ztosys.com:9092,oggkafkanewb3.ztosys.com:9092
+ * @author zhanggougou 2025-12-08 10:09:19
+ */
+@Config(
+ """
+ |kafka.brokers.name=10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092
+ |kafka.topics=fire_test_kafka_serialize
+ |kafka.group.id=fire_test_kafka_serialize_consumer
+ |kafka.starting.offsets=earliest
+ |""")
+@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180)
+object KafkaSourceTest extends FlinkStreaming {
+
+ override def process(): Unit = {
+ this.fire.createDirectStreamWithZtoDeserialize[OrderInfo]().map(order => {
+ println("开始执行")
+ println(order.toString)
+ println("结束执行")
+ }).print()
+ }
+}
+
+
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/OrderInfo.java b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/OrderInfo.java
new file mode 100644
index 00000000..9d0f5e13
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/OrderInfo.java
@@ -0,0 +1,65 @@
+package com.zto.fire.examples.flink.serialize;
+
+import com.zto.fire.mq.deserializer.BaseDeserializeEntity;
+
+import java.io.Serializable;
+import java.util.Date;
+
+public class OrderInfo extends BaseDeserializeEntity {
+ private Integer id;
+ private String name;
+ private Date createTime;
+ private Double money;
+
+ public OrderInfo() {
+ }
+
+ public OrderInfo(Integer id, String name, Date createTime, Double money) {
+ this.id = id;
+ this.name = name;
+ this.createTime = createTime;
+ this.money = money;
+ }
+
+ public Integer getId() {
+ return id;
+ }
+
+ public void setId(Integer id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public Date getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(Date createTime) {
+ this.createTime = createTime;
+ }
+
+ public Double getMoney() {
+ return money;
+ }
+
+ public void setMoney(Double money) {
+ this.money = money;
+ }
+
+ @Override
+ public String toString() {
+ return "OrderInfo{" +
+ "id=" + id +
+ ", name='" + name + '\'' +
+ ", createTime=" + createTime +
+ ", money=" + money +
+ '}';
+ }
+}
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSinkTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSinkTest.scala
new file mode 100644
index 00000000..a9254f00
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSinkTest.scala
@@ -0,0 +1,50 @@
+package com.zto.fire.examples.flink.serialize
+
+import com.zto.bigdata.integeration.util.ZtoSerializeUtil
+import com.zto.fire._
+import com.zto.fire.common.anno.Config
+import com.zto.fire.common.bean.MQRecord
+import com.zto.fire.common.conf.FireRocketMQConf
+import com.zto.fire.flink.FlinkStreaming
+import com.zto.fire.flink.anno.Streaming
+import com.zto.fire.flink.conf.{SerializeType, ZtoSerializeConf}
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
+
+import java.util.Date
+
+/**
+ * @author zhanggougou 2025-12-08 10:09:19
+ */
+@Config(
+ """
+ |flink.rocket.brokers.name=testmq02ns1.test.ztosys.com:9876;testmq02ns2.test.ztosys.com:9876;testmq02ns3.test.ztosys.com:9876
+ |flink.rocket.topics=fire_test_rocket_serialize
+ |""")
+@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180)
+object RocketSinkTest extends FlinkStreaming {
+
+ override def process(): Unit = {
+ this.fire.createRandomIntStream(1).map(new RichMapFunction[Int, MQRecord] {
+
+ override def open(parameters: Configuration): Unit = {
+ ZtoSerializeUtil.initSerializerManager(FireRocketMQConf.rocketTopics(1), null)
+ }
+
+ override def map(x: Int): MQRecord = {
+ val outputData = ZtoSerializeUtil.serializerManager.outputData(ZtoSerializeUtil.classInfoMetadata.newInstance)
+ outputData.setObject("id", x)
+ outputData.setObject("name", x + "我是rocket")
+ outputData.setObject("createTime", new Date())
+ outputData.setObject("money", 1.6 + x)
+ val bytes = ZtoSerializeUtil.serializerManager.serialize(outputData)
+ val headMap = Map(ZtoSerializeConf.SERIALIZER_HEAD_KEY -> SerializeType.FURY.toString)
+ MQRecord(null, null, headMap, bytes)
+ }
+ }).sinkRocketMQByte().name("testSink")
+ }
+}
+
+
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSourceTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSourceTest.scala
new file mode 100644
index 00000000..1fcda3ce
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSourceTest.scala
@@ -0,0 +1,34 @@
+
+package com.zto.fire.examples.flink.serialize
+
+import com.zto.fire._
+import com.zto.fire.common.anno.Config
+import com.zto.fire.flink.FlinkStreaming
+import com.zto.fire.flink.anno.Streaming
+import org.apache.flink.api.scala.createTypeInformation
+
+
+/**
+ * @author zhanggougou 2025-12-08 10:09:19
+ */
+@Config(
+ """
+ |flink.rocket.brokers.name=testmq02ns1.test.ztosys.com:9876;testmq02ns2.test.ztosys.com:9876;testmq02ns3.test.ztosys.com:9876
+ |flink.rocket.topics=fire_test_rocket_serialize
+ |flink.rocket.group.id=fire_test_rocket_serialize_consumer
+ |flink.rocket.starting.offsets=earliest
+ |flink.rocket.consumer.tag=*
+ |""")
+@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180)
+object RocketSourceTest extends FlinkStreaming {
+
+ override def process(): Unit = {
+ this.fire.createRocketMqPullStreamWithEntity[OrderInfo]().map(order => {
+ println("开始执行")
+ println(order.toString)
+ println("结束执行")
+ }).print()
+ }
+}
+
+
diff --git a/fire-examples/pom.xml b/fire-examples/pom.xml
index 71c4eb55..41279d1a 100644
--- a/fire-examples/pom.xml
+++ b/fire-examples/pom.xml
@@ -31,7 +31,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-parent</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
@@ -47,7 +47,7 @@
<groupId>com.zto.bigdata</groupId>
<artifactId>cluster-env</artifactId>
- <version>1.0.7-SNAPSHOT</version>
+ <version>1.0.8-SNAPSHOT</version>
diff --git a/fire-examples/spark-examples/pom.xml b/fire-examples/spark-examples/pom.xml
index e5d52889..1a0da013 100644
--- a/fire-examples/spark-examples/pom.xml
+++ b/fire-examples/spark-examples/pom.xml
@@ -26,7 +26,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-examples</artifactId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/fire-shell/flink-shell/pom.xml b/fire-shell/flink-shell/pom.xml
index e9526496..75593803 100644
--- a/fire-shell/flink-shell/pom.xml
+++ b/fire-shell/flink-shell/pom.xml
@@ -7,7 +7,7 @@
<artifactId>fire-shell</artifactId>
<groupId>com.zto.fire</groupId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
diff --git a/fire-shell/pom.xml b/fire-shell/pom.xml
index 54014e7a..97ee38bb 100644
--- a/fire-shell/pom.xml
+++ b/fire-shell/pom.xml
@@ -15,7 +15,7 @@
<artifactId>fire-parent</artifactId>
<groupId>com.zto.fire</groupId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
diff --git a/fire-shell/spark-shell/pom.xml b/fire-shell/spark-shell/pom.xml
index 8950eed0..dbb83550 100644
--- a/fire-shell/spark-shell/pom.xml
+++ b/fire-shell/spark-shell/pom.xml
@@ -9,7 +9,7 @@
<artifactId>fire-shell</artifactId>
<groupId>com.zto.fire</groupId>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
diff --git a/pom.xml b/pom.xml
index ff0b7b0d..09543034 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
<groupId>com.zto.fire</groupId>
<artifactId>fire-parent</artifactId>
<packaging>pom</packaging>
- <version>2.6.3-SNAPSHOT</version>
+ <version>2.7.0-SNAPSHOT</version>
<name>Fire :</name>
<description>Fire framework is a development framework for real-time computing task development. The framework provides a simple and easy-to-use API that can significantly reduce the development threshold of Spark and Flink and improve development efficiency.</description>
<url>https://github.com/fire-framework/fire</url>
@@ -1047,7 +1047,7 @@
<groupId>com.zto.bigdata</groupId>
<artifactId>cluster-env</artifactId>
- <version>1.0.7-SNAPSHOT</version>
+ <version>1.0.8-SNAPSHOT</version>
--
Gitee
From 51592c2eed978d2d8131a12781ee7955b865b260 Mon Sep 17 00:00:00 2001
From: zhanggougou <15651908511@163.com>
Date: Mon, 29 Dec 2025 16:49:45 +0800
Subject: [PATCH 2/3] [fire-1268] Optimize checkpoint selection on automatic
 Flink restart to avoid restoring the wrong checkpoint
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../FireNoHaCheckPointException.java | 29 +++++++++++++++++++
.../ApplicationDispatcherBootstrap.java | 18 ++++++++++++
.../checkpoint/CheckpointCoordinator.java | 26 +++++++++++++++--
3 files changed, 70 insertions(+), 3 deletions(-)
create mode 100644 fire-common/src/main/java/com/zto/fire/common/exception/FireNoHaCheckPointException.java
diff --git a/fire-common/src/main/java/com/zto/fire/common/exception/FireNoHaCheckPointException.java b/fire-common/src/main/java/com/zto/fire/common/exception/FireNoHaCheckPointException.java
new file mode 100644
index 00000000..4e629b6d
--- /dev/null
+++ b/fire-common/src/main/java/com/zto/fire/common/exception/FireNoHaCheckPointException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.zto.fire.common.exception;
+
+/**
+ * Custom exception indicating that HA recovery found no checkpoint.
+ */
+public class FireNoHaCheckPointException extends RuntimeException {
+
+ public FireNoHaCheckPointException() {
+ super("flink am retry,but find no checkpoint on ha,need failed");
+ }
+
+}
diff --git a/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java b/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
index e8abcc50..48d6b117 100644
--- a/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
+++ b/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
@@ -18,6 +18,7 @@
package org.apache.flink.client.deployment.application;
+import com.zto.fire.common.exception.FireNoHaCheckPointException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
@@ -146,6 +147,23 @@ public class ApplicationDispatcherBootstrap implements DispatcherBootstrap {
return dispatcherGateway.shutDownCluster(
ApplicationStatus.SUCCEEDED);
}
+
+ // TODO: ------------ start: custom code --------------- //
+ final Optional<FireNoHaCheckPointException> noCkException =
+ ExceptionUtils.findThrowable(
+ t, FireNoHaCheckPointException.class);
+ LOG.info("Checked for FireNoHaCheckPointException, present: {}", noCkException.isPresent());
+ if (noCkException.isPresent()) {
+ LOG.error(
+ "Application failed due to FireNoHaCheckPointException, "
+ + "shutdown cluster with UNKNOWN status.",
+ noCkException.get());
+
+ return dispatcherGateway.shutDownCluster(
+ ApplicationStatus.UNKNOWN);
+ }
+ // TODO: ------------ end: custom code --------------- //
+
final Optional<UnsuccessfulExecutionException> maybeException =
ExceptionUtils.findThrowable(
t, UnsuccessfulExecutionException.class);
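For context, a minimal self-contained Scala sketch (not part of the patch; it only assumes flink-core and fire-common on the classpath) of the detection mechanism the hunk above relies on: ExceptionUtils.findThrowable walks the nested cause chain of a job failure and returns the first FireNoHaCheckPointException it finds, which is what lets the dispatcher map that specific failure to a cluster shutdown with UNKNOWN status.

    import java.util.Optional
    import org.apache.flink.util.ExceptionUtils
    import com.zto.fire.common.exception.FireNoHaCheckPointException

    object FindNoCkExceptionSketch {
      def main(args: Array[String]): Unit = {
        // A job failure typically reaches the dispatcher wrapped in several
        // layers of causes; build one such chain with the custom exception
        // at the root.
        val failure = new RuntimeException("job failed",
          new IllegalStateException("restore failed", new FireNoHaCheckPointException()))

        // findThrowable unwraps the cause chain and returns the first match.
        val noCk: Optional[FireNoHaCheckPointException] =
          ExceptionUtils.findThrowable(failure, classOf[FireNoHaCheckPointException])

        println(s"FireNoHaCheckPointException present: ${noCk.isPresent}") // true
      }
    }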
diff --git a/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index f428c117..3ae8b2e3 100644
--- a/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint;
+import com.zto.fire.common.exception.FireNoHaCheckPointException;
import com.zto.fire.common.util.FireEngineUtils;
import com.zto.fire.common.util.TimeExpression;
import org.apache.flink.annotation.VisibleForTesting;
@@ -45,6 +46,8 @@ import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1612,11 +1615,24 @@ public class CheckpointCoordinator {
*/
public boolean restoreInitialCheckpointIfPresent(final Set<ExecutionJobVertex> tasks)
throws Exception {
+ // TODO: ------------ start: custom code --------------- //
+ // If not running on YARN, or if anything else goes wrong, validate the presence of the latest checkpoint in the strictest way to prevent faults
+ int attemptId = 2;
+ try {
+ attemptId = ConverterUtils.toContainerId(System.getenv(ApplicationConstants.Environment.CONTAINER_ID.key())).getApplicationAttemptId().getAttemptId();
+ } catch (Throwable e) {
+ LOG.error("Failed to obtain the YARN attempt id; defaulting the AM attempt count to 2", e);
+ }
+ LOG.info("restoreInitialCheckpointIfPresent start, attemptId is {}", attemptId);
+ // TODO: ------------ end: custom code --------------- //
final OptionalLong restoredCheckpointId =
restoreLatestCheckpointedStateInternal(
tasks,
OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT,
- false, // initial checkpoints exist only on JobManager failover. ok if not
+ // TODO: ------------ start: custom code --------------- //
+ //false, // initial checkpoints exist only on JobManager failover. ok if not
+ attemptId != 1, // avoid the low-probability case where ZK was cleaned up and the AM retry restores the wrong checkpoint
+ // TODO: ------------ end: custom code --------------- //
// present.
false,
true); // JobManager failover means JobGraphs match exactly.
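The attempt-id lookup above can be tried in isolation. A standalone sketch, assuming only hadoop-yarn-common on the classpath: YARN exports a CONTAINER_ID environment variable inside every container (e.g. container_1700000000000_0001_02_000001, where the 02 segment is the application attempt), and parsing it yields the AM attempt number; outside YARN the variable is absent, parsing fails, and the code keeps the strict default of 2, i.e. it behaves as if this were a retry.

    import org.apache.hadoop.yarn.api.ApplicationConstants
    import org.apache.hadoop.yarn.util.ConverterUtils

    object YarnAttemptIdSketch {
      def main(args: Array[String]): Unit = {
        val attemptId =
          try {
            // CONTAINER_ID is set by YARN inside every container.
            ConverterUtils
              .toContainerId(System.getenv(ApplicationConstants.Environment.CONTAINER_ID.key()))
              .getApplicationAttemptId
              .getAttemptId
          } catch {
            // Not on YARN (or parsing failed): keep the strict default so the
            // restore path validates the checkpoint as if this were a retry.
            case _: Throwable => 2
          }
        println(s"application attempt id: $attemptId")
      }
    }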
@@ -1665,10 +1681,14 @@ public class CheckpointCoordinator {
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
if (latest == null) {
- LOG.info("No checkpoint found during restore.");
+ LOG.info("No checkpoint found during restore.judge need throw Exception,{}",errorIfNoCheckpoint);
if (errorIfNoCheckpoint) {
- throw new IllegalStateException("No completed checkpoint available");
+ //throw new IllegalStateException("No completed checkpoint available");
+ // TODO: ------------ start: custom code --------------- //
+ // Throw a dedicated exception for ApplicationDispatcherBootstrap to catch, so it can decide whether to terminate the Flink job immediately instead of retrying pointlessly
+ throw new FireNoHaCheckPointException();
+ // TODO: ------------ end: custom code --------------- //
}
LOG.debug("Resetting the master hooks.");
--
Gitee
From 84941d9d893eb689c61c70d42bbcea7170fd4c47 Mon Sep 17 00:00:00 2001
From: zhanggougou <15651908511@163.com>
Date: Wed, 31 Dec 2025 10:14:55 +0800
Subject: [PATCH 3/3] [fire-1269] Fix: on network failure, sinkKafka may retry forever and Flink cannot detect the sink state
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../com/zto/fire/common/util/MQProducer.scala | 3 ++
.../flink/serialize/KafkaCommonTest.scala | 43 +++++++++++++++++
.../flink/serialize/KafkaSendErrorTest.scala | 39 +++++++++++++++
.../flink/serialize/RocketCommonTest.scala | 48 +++++++++++++++++++
4 files changed, 133 insertions(+)
create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaCommonTest.scala
create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSendErrorTest.scala
create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketCommonTest.scala
diff --git a/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala b/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala
index 92103114..2f02f3fb 100644
--- a/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala
+++ b/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala
@@ -130,6 +130,7 @@ class MQProducer(url: String, mqType: MQType = MQType.kafka,
if (this.sendErrorCount >= this.maxRetries) {
this.kafkaProducer.close()
logError(s"异常信息发送MQ重试${this.sendErrorCount}次仍失败,将退出异常信息发送!")
+ if (throwable) throw new RuntimeException("发送kafka消息失败,请检查是否存在网络问题或集群维护")
return
}
@@ -178,6 +179,8 @@ class MQProducer(url: String, mqType: MQType = MQType.kafka,
if (this.sendErrorCount >= this.maxRetries) {
this.rocketmqProducer.shutdown()
+ logError(s"异常信息发送rocket重试${this.sendErrorCount}次仍失败,将退出异常信息发送!")
+ if (throwable) throw new RuntimeException("发送rocket消息失败,请检查是否存在网络问题或集群维护")
return
}
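The shape of the fix, reduced to a self-contained Scala sketch (with a hypothetical sendWithRetry helper, not the fire API): retries stay bounded, and once they are exhausted the failure is rethrown instead of swallowed, so a Flink sink task using it fails visibly and the checkpointing/restart machinery can react, rather than silently dropping data.

    object BoundedRetrySketch {
      /** Calls send up to maxRetries times. On exhaustion, either rethrows
       *  (throwable = true, the new behavior) or gives up silently (the old one). */
      def sendWithRetry(send: () => Unit, maxRetries: Int, throwable: Boolean): Unit = {
        var errors = 0
        while (true) {
          try {
            send()
            return // success
          } catch {
            case e: Exception =>
              errors += 1
              if (errors >= maxRetries) {
                if (throwable) throw new RuntimeException(
                  s"send failed after $errors retries; check for network problems or cluster maintenance", e)
                return // legacy behavior: exit without surfacing the failure
              }
          }
        }
      }

      def main(args: Array[String]): Unit = {
        var calls = 0
        // Fails twice, then succeeds: stays within the 3-retry budget.
        sendWithRetry(() => { calls += 1; if (calls < 3) sys.error("boom") },
          maxRetries = 3, throwable = true)
        println(s"succeeded after $calls calls")
      }
    }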
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaCommonTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaCommonTest.scala
new file mode 100644
index 00000000..fece8fa9
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaCommonTest.scala
@@ -0,0 +1,43 @@
+package com.zto.fire.examples.flink.serialize
+
+import com.zto.fire._
+import com.zto.fire.common.anno.Config
+import com.zto.fire.common.bean.MQRecord
+import com.zto.fire.common.util.JSONUtils
+import com.zto.fire.flink.FlinkStreaming
+import com.zto.fire.flink.anno.Streaming
+import org.apache.flink.api.scala._
+
+import java.util.Date
+
+/**
+ * Tests the generic Kafka API
+ * @author zhanggougou 2025-12-08 10:09:19
+ */
+@Config(
+ """
+ |kafka.brokers.name=10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092
+ |kafka.topics=fire-test-kafka
+ |kafka.group.id=fire-test-kafka-consumer
+ |kafka.starting.offsets=earliest
+ |""")
+@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180)
+object KafkaCommonTest extends FlinkStreaming {
+
+ override def process(): Unit = {
+ this.fire.createRandomIntStream(1).map(x => {
+ val orderInfo = new OrderInfo(x, x + "我是kafka common test", new Date(), 1.6 + x)
+ MQRecord(JSONUtils.toJSONString(orderInfo))
+ })
+ .sinkKafka()
+ .uname("sink-kafka-common")
+
+ this.fire.createKafkaDirectStream()
+ .uname("source-kafka-common").map(x => {
+ JSONUtils.parseObject(x, classOf[OrderInfo])
+ }).print().name("print-kafka-common")
+
+ }
+}
+
+
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSendErrorTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSendErrorTest.scala
new file mode 100644
index 00000000..d3ff9e5c
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSendErrorTest.scala
@@ -0,0 +1,39 @@
+package com.zto.fire.examples.flink.serialize
+
+import com.zto.fire._
+import com.zto.fire.common.anno.Config
+import com.zto.fire.common.bean.MQRecord
+import com.zto.fire.common.util.JSONUtils
+import com.zto.fire.flink.FlinkStreaming
+import com.zto.fire.flink.anno.Streaming
+import org.apache.flink.api.scala._
+
+import java.util.Date
+
+/**
+ * Writes a wrong broker port to simulate Kafka send failures
+ *
+ * @author zhanggougou 2025-12-08 10:09:19
+ */
+@Config(
+ """
+ |kafka.brokers.name=10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092
+ |kafka.topics=fire-test-kafka
+ |kafka.group.id=fire-test-kafka-consumer
+ |kafka.starting.offsets=earliest
+ |""")
+@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180)
+object KafkaSendErrorTest extends FlinkStreaming {
+
+ override def process(): Unit = {
+ this.fire.createRandomIntStream(1).map(x => {
+ val orderInfo = new OrderInfo(x, x + "我是kafka common test", new Date(), 1.6 + x)
+ MQRecord(JSONUtils.toJSONString(orderInfo))
+ })
+ .sinkKafka()
+ .uname("sink-kafka-common")
+
+ }
+}
+
+
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketCommonTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketCommonTest.scala
new file mode 100644
index 00000000..9f2e33de
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketCommonTest.scala
@@ -0,0 +1,48 @@
+package com.zto.fire.examples.flink.serialize
+
+import com.zto.bigdata.integeration.util.ZtoSerializeUtil
+import com.zto.fire._
+import com.zto.fire.common.anno.Config
+import com.zto.fire.common.bean.MQRecord
+import com.zto.fire.common.conf.FireRocketMQConf
+import com.zto.fire.common.util.JSONUtils
+import com.zto.fire.flink.FlinkStreaming
+import com.zto.fire.flink.anno.Streaming
+import com.zto.fire.flink.conf.{SerializeType, ZtoSerializeConf}
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+
+import java.util.Date
+
+/**
+ * Tests the generic RocketMQ API
+ * @author zhanggougou 2025-12-08 10:09:19
+ */
+@Config(
+ """
|flink.rocket.brokers.name=testmq02ns1.test.ztosys.com:9876;testmq02ns2.test.ztosys.com:9876;testmq02ns3.test.ztosys.com:9876
+ |flink.rocket.topics=fire_test_rocket
+ |flink.rocket.group.id=fire_test_rocket_consumer
+ |flink.rocket.starting.offsets=earliest
+ |flink.rocket.consumer.tag=*
+ |""")
+@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180)
+object RocketCommonTest extends FlinkStreaming {
+
+ override def process(): Unit = {
+ this.fire.createRandomIntStream(1).map(x => {
+ val orderInfo = new OrderInfo(x, x + "我是rocket common test", new Date(), 1.7 + x)
+ MQRecord(JSONUtils.toJSONString(orderInfo))
+ })
+ .sinkRocketMQ()
+ .uname("sink-rocket-common")
+
+ this.fire.createRocketMqPullStream()
+ .uname("source-rocket-common").map(x => {
+ JSONUtils.parseObject(x, classOf[OrderInfo])
+ }).print().name("print-rocket-common")
+
+ }
+}
+
+
--
Gitee