From e39ac5764bb81078bd5367b29bac6857f5dfb7c8 Mon Sep 17 00:00:00 2001 From: zhanggougou <15651908511@163.com> Date: Tue, 23 Dec 2025 15:26:34 +0800 Subject: [PATCH 1/3] =?UTF-8?q?[fire-1265]=20=E6=96=B0=E5=A2=9Efire-mq?= =?UTF-8?q?=E6=A8=A1=E5=9D=97,=E6=94=AF=E6=8C=81=E8=87=AA=E5=AE=9A?= =?UTF-8?q?=E4=B9=89=E5=BA=8F=E5=88=97=E5=8C=96=E5=99=A8,=E6=96=B0?= =?UTF-8?q?=E5=A2=9Ezto=E5=86=85=E9=83=A8=E5=BA=8F=E5=88=97=E5=8C=96?= =?UTF-8?q?=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fire-bundle/fire-bundle-flink/pom.xml | 2 +- fire-bundle/fire-bundle-spark/pom.xml | 2 +- fire-bundle/pom.xml | 2 +- fire-common/pom.xml | 2 +- .../com/zto/fire/common/bean/MQRecord.scala | 68 +++++++++++++++++- .../com/zto/fire/common/util/MQProducer.scala | 44 ++++++++---- .../base-connectors/fire-hbase/pom.xml | 2 +- .../base-connectors/fire-jdbc/pom.xml | 2 +- .../base-connectors/fire-mq/pom.xml | 70 +++++++++++++++++++ .../zto/fire/mq/conf/ZtoSerializeConf.scala | 52 ++++++++++++++ .../deserializer/BaseDeserializeEntity.java | 10 +++ ...ZtoKafkaMessageDeserializationSchema.scala | 46 ++++++++++++ ...toRocketMessageDeserializationSchema.scala | 46 ++++++++++++ .../zto/fire/mq/utils/DeserializeUtil.java | 52 ++++++++++++++ fire-connectors/base-connectors/pom.xml | 3 +- .../flink-connectors/flink-clickhouse/pom.xml | 2 +- .../flink-connectors/flink-format/pom.xml | 2 +- .../flink-connectors/flink-paimon/pom.xml | 2 +- .../flink-connectors/flink-rocketmq/pom.xml | 2 +- fire-connectors/flink-connectors/pom.xml | 2 +- fire-connectors/pom.xml | 2 +- fire-connectors/spark-connectors/pom.xml | 2 +- .../spark-connectors/spark-hbase/pom.xml | 2 +- .../spark-connectors/spark-hudi/pom.xml | 2 +- .../spark-connectors/spark-paimon/pom.xml | 2 +- .../spark-connectors/spark-rocketmq/pom.xml | 2 +- fire-core/pom.xml | 2 +- fire-engines/fire-flink/pom.xml | 7 +- .../com/zto/fire/flink/sink/KafkaSink.scala | 3 +- .../zto/fire/flink/sink/RocketMQSink.scala | 4 +- .../fire/flink/ext/stream/DataStreamExt.scala | 66 ++++++++++++++--- .../ext/stream/StreamExecutionEnvExt.scala | 43 +++++++++++- fire-engines/fire-spark/pom.xml | 2 +- fire-engines/pom.xml | 2 +- fire-enhance/apache-arthas/pom.xml | 4 +- fire-enhance/apache-flink/pom.xml | 4 +- fire-enhance/apache-spark/pom.xml | 4 +- fire-enhance/pom.xml | 2 +- fire-examples/flink-examples/pom.xml | 2 +- .../flink/serialize/KafkaSinkTest.scala | 54 ++++++++++++++ .../flink/serialize/KafkaSourceTest.scala | 36 ++++++++++ .../examples/flink/serialize/OrderInfo.java | 65 +++++++++++++++++ .../flink/serialize/RocketSinkTest.scala | 50 +++++++++++++ .../flink/serialize/RocketSourceTest.scala | 34 +++++++++ fire-examples/pom.xml | 4 +- fire-examples/spark-examples/pom.xml | 2 +- fire-shell/flink-shell/pom.xml | 2 +- fire-shell/pom.xml | 2 +- fire-shell/spark-shell/pom.xml | 2 +- pom.xml | 4 +- 50 files changed, 760 insertions(+), 65 deletions(-) create mode 100644 fire-connectors/base-connectors/fire-mq/pom.xml create mode 100644 fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/conf/ZtoSerializeConf.scala create mode 100644 fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/BaseDeserializeEntity.java create mode 100644 fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoKafkaMessageDeserializationSchema.scala create mode 100644 fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoRocketMessageDeserializationSchema.scala create mode 100644 fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/utils/DeserializeUtil.java create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSinkTest.scala create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSourceTest.scala create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/OrderInfo.java create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSinkTest.scala create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSourceTest.scala diff --git a/fire-bundle/fire-bundle-flink/pom.xml b/fire-bundle/fire-bundle-flink/pom.xml index cbae5041..c2797473 100644 --- a/fire-bundle/fire-bundle-flink/pom.xml +++ b/fire-bundle/fire-bundle-flink/pom.xml @@ -8,7 +8,7 @@ com.zto.fire fire-bundle - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-bundle/fire-bundle-spark/pom.xml b/fire-bundle/fire-bundle-spark/pom.xml index 91ed93e9..b5f57577 100644 --- a/fire-bundle/fire-bundle-spark/pom.xml +++ b/fire-bundle/fire-bundle-spark/pom.xml @@ -8,7 +8,7 @@ com.zto.fire fire-bundle - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-bundle/pom.xml b/fire-bundle/pom.xml index e8fba457..c8545b8f 100644 --- a/fire-bundle/pom.xml +++ b/fire-bundle/pom.xml @@ -15,7 +15,7 @@ com.zto.fire fire-parent - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-common/pom.xml b/fire-common/pom.xml index e8adbe9f..cc5c25d4 100644 --- a/fire-common/pom.xml +++ b/fire-common/pom.xml @@ -26,7 +26,7 @@ com.zto.fire fire-parent - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-common/src/main/scala/com/zto/fire/common/bean/MQRecord.scala b/fire-common/src/main/scala/com/zto/fire/common/bean/MQRecord.scala index 3415bfd9..5a9e6534 100644 --- a/fire-common/src/main/scala/com/zto/fire/common/bean/MQRecord.scala +++ b/fire-common/src/main/scala/com/zto/fire/common/bean/MQRecord.scala @@ -19,6 +19,7 @@ package com.zto.fire.common.bean import com.zto.fire.predef._ import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.header.internals.RecordHeaders import org.apache.rocketmq.common.message.Message import org.apache.rocketmq.remoting.common.RemotingHelper @@ -34,6 +35,33 @@ class MQRecord(var topic: String, var msg: String) { var tag: String = null var flag: Int = 0 var waitStoreMsgOK: Boolean = true + var kafkaHeaders: RecordHeaders = null + // mq自定义头部信息,message的property属性 + var mqHeaders: Map[String, String] = Map() + //特殊场景需要value发送byte + var msgBytes: Array[Byte] = null + + def this(topic: String, msg: String, partition: JInt, key: String, tag: String, flag: Int, waitStoreMsgOK: Boolean, mqHeaders: Map[String, String], msgBytes: Array[Byte]) = { + this(topic, msg) + this.partition = partition + this.key = key + this.tag = tag + this.flag = flag + this.waitStoreMsgOK = waitStoreMsgOK + this.mqHeaders = mqHeaders + this.msgBytes = msgBytes + } + + def this(topic: String, msg: String, partition: JInt, key: String, tag: String, flag: Int, waitStoreMsgOK: Boolean,kafkaHeaders:RecordHeaders,msgBytes:Array[Byte]) = { + this(topic, msg) + this.partition = partition + this.key = key + this.tag = tag + this.flag = flag + this.waitStoreMsgOK = waitStoreMsgOK + this.kafkaHeaders = kafkaHeaders + this.msgBytes = msgBytes + } def this(topic: String, msg: String, partition: JInt, key: String, tag: String, flag: Int, waitStoreMsgOK: Boolean) = { this(topic, msg) @@ -79,12 +107,44 @@ class MQRecord(var topic: String, var msg: String) { new Message(topic, if (isEmpty(tag)) "*" else tag, key, flag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET), waitStoreMsgOK) } + /** + * 转为RocketMQ 原生bbyte消息体 + */ + def toRocketMQBytes: Message = { + new Message(topic, if (isEmpty(tag)) "*" else tag, key, flag, msgBytes, waitStoreMsgOK) + } + + def toRocketMQByFlag(sendBytes: Boolean): Message = { + if(sendBytes){ + this.toRocketMQBytes + } else{ + this.toRocketMQ + } + } + + /** * 转为Kafka消息体 */ def toKafka: ProducerRecord[String, String] = { - new ProducerRecord[String, String](topic, partition, key, msg) + new ProducerRecord[String, String](topic, partition, key, msg, kafkaHeaders) + } + + /** + * 转为Kafka byte消息体 + */ + def toKafkaBytes: ProducerRecord[String, Array[Byte]] = { + new ProducerRecord[String, Array[Byte]](topic, partition, key, msgBytes, kafkaHeaders) + } + + def toKafkaByFlag(sendBytes: Boolean): ProducerRecord[_, _] = { + if (sendBytes) { + this.toKafkaBytes + } else { + this.toKafka + } } + } object MQRecord { @@ -94,6 +154,12 @@ object MQRecord { def apply(msg: String, partition: JInt, key: String, tags: JString, flag: Int): MQRecord = new MQRecord(null, msg, partition, key, tags, flag, true) + def apply(key: String,tags: JString, headers: RecordHeaders,msgBytes:Array[Byte]): MQRecord = new MQRecord(null, null, null, key, tags, 0, true,headers,msgBytes) + + def apply(key: String,tags: JString, mqHeaders: Map[String, String],msgBytes:Array[Byte]): MQRecord = new MQRecord(null, null, null, key, tags, 0, true,mqHeaders,msgBytes) + + def apply(msg: String, partition: JInt, key: String, tags: JString, headers: RecordHeaders,msgBytes:Array[Byte]): MQRecord = new MQRecord(null, msg, partition, key, tags, 0, true,headers,msgBytes) + def apply(msg: String, partition: JInt, key: String, tags: JString): MQRecord = new MQRecord(null, msg, partition, key, tags, 0, true) def apply(msg: String, partition: JInt, key: String): MQRecord = new MQRecord(null, msg, partition, key, null, 0, true) diff --git a/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala b/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala index 8f8f6fd5..92103114 100644 --- a/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala +++ b/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala @@ -24,8 +24,8 @@ import com.zto.fire.common.enu.{Datasource, Operation} import com.zto.fire.common.lineage.parser.connector.KafkaConnectorParser import com.zto.fire.common.util.MQType.MQType import com.zto.fire.predef._ -import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, RecordMetadata} -import org.apache.kafka.common.serialization.StringSerializer +import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} +import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer} import org.apache.rocketmq.client.producer.{DefaultMQProducer, SendCallback, SendResult} import java.util.Properties @@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean * @since 2.3.1 */ class MQProducer(url: String, mqType: MQType = MQType.kafka, - otherConf: Map[String, String] = Map.empty, throwable: Boolean = true) extends Logging { + otherConf: Map[String, String] = Map.empty, throwable: Boolean = true,sendBytes:Boolean = false) extends Logging { private lazy val maxRetries = FireFrameworkConf.exceptionTraceSendMQMaxRetries private lazy val sendTimeout = FireFrameworkConf.exceptionSendTimeout private var sendErrorCount = 0 @@ -71,18 +71,23 @@ class MQProducer(url: String, mqType: MQType = MQType.kafka, val props = new Properties() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, FireKafkaConf.kafkaBrokers(this.url)) props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + if(sendBytes){ + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer]) + } else{ + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + } props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, this.sendTimeout.toString) props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") props.put(ProducerConfig.ACKS_CONFIG, "all") props.put(ProducerConfig.RETRIES_CONFIG, "3") this.otherConf.foreach(prop => props.put(prop._1, prop._2)) - val producer = new KafkaProducer[String, String](props) + val producer = new KafkaProducer[Any, Any](props) this.useKafka = true producer } + // rocketmq producer private lazy val rocketmqProducer = { val producer = new DefaultMQProducer("fire") @@ -116,7 +121,11 @@ class MQProducer(url: String, mqType: MQType = MQType.kafka, * 消息体 */ def sendKafkaRecord(record: MQRecord): Unit = { - requireNonNull(record, record.topic, record.msg)("消息体参数不合法,请检查") + if(sendBytes){ + requireNonNull(record, record.topic, record.msgBytes)("【发送byte】消息体参数不合法,请检查") + } else{ + requireNonNull(record, record.topic, record.msg)("【发送msg】消息体参数不合法,请检查") + } if (this.sendErrorCount >= this.maxRetries) { this.kafkaProducer.close() @@ -124,9 +133,11 @@ class MQProducer(url: String, mqType: MQType = MQType.kafka, return } - val kafkaRecord = record.toKafka + //val kafkaRecord = record.toKafka + val kafkaRecord = record.toKafkaByFlag(sendBytes) this.addLineage(record.topic) - kafkaProducer.send(kafkaRecord, new Callback() { + + kafkaProducer.send(kafkaRecord.asInstanceOf[ProducerRecord[Any, Any]], new Callback() { override def onCompletion(recordMetadata: RecordMetadata, exception: Exception): Unit = { if (exception != null) { sendErrorCount += 1 @@ -137,6 +148,7 @@ class MQProducer(url: String, mqType: MQType = MQType.kafka, }) } + /** * 发送消息到kafka * @@ -158,14 +170,19 @@ class MQProducer(url: String, mqType: MQType = MQType.kafka, * 发送超时时间 */ def sendRocketMQRecord(record: MQRecord, timeout: Long = 10000): Unit = { - requireNonNull(record, record.topic, record.msg)("消息体参数不合法,请检查") + if (sendBytes) { + requireNonNull(record, record.topic, record.msgBytes)("【发送byte】消息体参数不合法,请检查") + } else { + requireNonNull(record, record.topic, record.msg)("【发送msg】消息体参数不合法,请检查") + } if (this.sendErrorCount >= this.maxRetries) { this.rocketmqProducer.shutdown() return } - val rocketRecord = record.toRocketMQ + val rocketRecord = record.toRocketMQByFlag(sendBytes) + record.mqHeaders.foreach(header => rocketRecord.putUserProperty(header._1, header._2)) this.addLineage(record.topic) this.rocketmqProducer.send(rocketRecord, new SendCallback { override def onSuccess(sendResult: SendResult): Unit = { @@ -252,7 +269,8 @@ object MQProducer { } def apply(url: String, mqType: MQType = MQType.kafka, - otherConf: Map[String, String] = Map.empty, throwable: Boolean = true) = new MQProducer(url, mqType, otherConf, throwable) + otherConf: Map[String, String] = Map.empty, throwable: Boolean = true, sendByte: Boolean = false) = new MQProducer(url, mqType, otherConf, throwable,sendByte) + /** * 发送消息到指定的mq topic @@ -276,8 +294,8 @@ object MQProducer { * 优化参数 */ def sendRecord(url: String, record: MQRecord, - mqType: MQType = MQType.kafka, otherConf: Map[String, String] = Map.empty, throwable: Boolean = true): Unit = { - val producer = this.producerMap.mergeGet(url + ":" + record.topic)(new MQProducer(url, mqType, otherConf, throwable)) + mqType: MQType = MQType.kafka, otherConf: Map[String, String] = Map.empty, throwable: Boolean = true, sendByte: Boolean = false): Unit = { + val producer = this.producerMap.mergeGet(url + ":" + record.topic)(new MQProducer(url, mqType, otherConf, throwable,sendByte)) producer.send(record) } diff --git a/fire-connectors/base-connectors/fire-hbase/pom.xml b/fire-connectors/base-connectors/fire-hbase/pom.xml index 58734d82..27ca451f 100644 --- a/fire-connectors/base-connectors/fire-hbase/pom.xml +++ b/fire-connectors/base-connectors/fire-hbase/pom.xml @@ -26,7 +26,7 @@ com.zto.fire fire-connectors-common - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-connectors/base-connectors/fire-jdbc/pom.xml b/fire-connectors/base-connectors/fire-jdbc/pom.xml index a834e58a..07677c89 100644 --- a/fire-connectors/base-connectors/fire-jdbc/pom.xml +++ b/fire-connectors/base-connectors/fire-jdbc/pom.xml @@ -26,7 +26,7 @@ com.zto.fire fire-connectors-common - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-connectors/base-connectors/fire-mq/pom.xml b/fire-connectors/base-connectors/fire-mq/pom.xml new file mode 100644 index 00000000..aa738473 --- /dev/null +++ b/fire-connectors/base-connectors/fire-mq/pom.xml @@ -0,0 +1,70 @@ + + + + + 4.0.0 + fire-connector-mq_${scala.binary.version} + jar + Fire : Connectors : Common : mq + + + com.zto.fire + fire-connectors-common + 2.7.0-SNAPSHOT + ../pom.xml + + + + + org.apache.flink + flink-connector-kafka${contains.scala.binary.version} + ${flink.kafka.connector.version} + provided + + + com.zto.fire + fire-connector-flink-rocketmq_${flink.reference} + ${fire.version} + + + org.apache.rocketmq + rocketmq-client + ${rocketmq.version} + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 8 + 8 + + + + + + src/main/resources + true + + + + diff --git a/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/conf/ZtoSerializeConf.scala b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/conf/ZtoSerializeConf.scala new file mode 100644 index 00000000..1b9a9d56 --- /dev/null +++ b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/conf/ZtoSerializeConf.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.zto.fire.flink.conf + + +/** + * zto序列化常量 + * + * @author zhanggougou + * @since 1.1.0 + * @create 2025-12-02 14:55 + */ +private[fire] object ZtoSerializeConf { + + lazy val SERIALIZER_CLASS_NAME = "com.zto.bigdata.integeration.util.ZtoSerializeUtil" + + lazy val SERIALIZER_CLASS_INIT_METHOD_NAME = "initSerializerManager" + + lazy val SERIALIZER_CLASS_DES_METHOD_NAME = "deserializeValue" + + lazy val SERIALIZER_HEAD_KEY = "serializer_type" + + +} + + +/** + * 序列化类型枚举 + */ +object SerializeType extends Enumeration { + type SerializeType = Value + + val FURY = Value("FURY") + val JSON = Value("JSON") + // 自动识别 + val JAVA = Value("JAVA") +} diff --git a/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/BaseDeserializeEntity.java b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/BaseDeserializeEntity.java new file mode 100644 index 00000000..adf7dbf6 --- /dev/null +++ b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/BaseDeserializeEntity.java @@ -0,0 +1,10 @@ +package com.zto.fire.mq.deserializer; + +import java.io.Serializable; + +/** + * 基础反序列化实体类 + */ +public class BaseDeserializeEntity implements Serializable { + +} diff --git a/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoKafkaMessageDeserializationSchema.scala b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoKafkaMessageDeserializationSchema.scala new file mode 100644 index 00000000..998717f0 --- /dev/null +++ b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoKafkaMessageDeserializationSchema.scala @@ -0,0 +1,46 @@ +package com.zto.fire.mq.deserializer + +import com.zto.bigdata.integeration.util.ZtoSerializeUtil +import com.zto.fire.common.util.ReflectionUtils +import com.zto.fire.flink.conf.ZtoSerializeConf +import com.zto.fire.mq.utils.DeserializeUtil +import org.apache.flink.api.common.serialization.DeserializationSchema +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema +import org.apache.kafka.clients.consumer.ConsumerRecord + +import java.nio.charset.StandardCharsets +import scala.reflect.ClassTag +import com.zto.fire.predef._ + +/** + * 自定义反序列化器,支持反序列化kafka消息中的key与value + */ +class ZtoKafkaMessageDeserializationSchema[T <: BaseDeserializeEntity:ClassTag](topic: String) extends KafkaDeserializationSchema[T] { + + override def isEndOfStream(t: T): Boolean = false + + override def open(context: DeserializationSchema.InitializationContext): Unit = { + DeserializeUtil.initZtoSerializer(topic) + } + + override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): T = { + try{ + val headerVal = record.headers().lastHeader(ZtoSerializeConf.SERIALIZER_HEAD_KEY); + if (headerVal == null) { + throw new RuntimeException("Kafka 反序列化失败,未获取到消息头或初始化异常,请检查消息内容是否有误") + } + val headType = new String(headerVal.value(), StandardCharsets.UTF_8) + DeserializeUtil.deserializeAndFill(getGeneric[T](),headType,record.value()) + } catch { + case e: Throwable => { + throw new RuntimeException("Kafka 反序列化失败", e) + } + } + + } + + override def getProducedType: TypeInformation[T] = TypeInformation.of(getGeneric[T]()) +} + + diff --git a/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoRocketMessageDeserializationSchema.scala b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoRocketMessageDeserializationSchema.scala new file mode 100644 index 00000000..6c1fab45 --- /dev/null +++ b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoRocketMessageDeserializationSchema.scala @@ -0,0 +1,46 @@ +package com.zto.fire.mq.deserializer + +import com.zto.fire.flink.conf.ZtoSerializeConf +import com.zto.fire.mq.utils.DeserializeUtil +import com.zto.fire.predef._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.rocketmq.common.message.MessageExt +import org.apache.rocketmq.flink.common.serialization.TagKeyValueDeserializationSchema + +import scala.reflect.ClassTag + +/** + * 自定义反序列化器,支持反序列化kafka消息中的key与value + */ +class ZtoRocketMessageDeserializationSchema[T <: BaseDeserializeEntity:ClassTag](topic: String) extends TagKeyValueDeserializationSchema[T] { + + @volatile private var initialized = false + + private def initOnce(): Unit = synchronized { + if (!initialized) { + DeserializeUtil.initZtoSerializer(topic) + initialized = true + } + } + + override def deserializeTagKeyAndValue(msg: MessageExt): T = { + try { + this.initOnce() + val headerVal = msg.getProperty(ZtoSerializeConf.SERIALIZER_HEAD_KEY) + if (headerVal == null) { + throw new RuntimeException("rocket 反序列化失败,未获取到消息头或初始化异常,请检查消息内容是否有误") + } + DeserializeUtil.deserializeAndFill(getGeneric[T](),headerVal,msg.getBody) + } catch { + case e: Throwable => { + throw new RuntimeException("rocket 反序列化失败", e) + } + } + } + + override def getProducedType: TypeInformation[T] = TypeInformation.of(getGeneric[T]()) + +} + + + diff --git a/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/utils/DeserializeUtil.java b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/utils/DeserializeUtil.java new file mode 100644 index 00000000..f6575779 --- /dev/null +++ b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/utils/DeserializeUtil.java @@ -0,0 +1,52 @@ +package com.zto.fire.mq.utils; + +import com.zto.bigdata.integeration.util.ZtoSerializeUtil; +import com.zto.fire.common.util.ReflectionUtils; +import com.zto.fire.flink.conf.ZtoSerializeConf$; +import com.zto.zdata.schema.format.InputData; +import javafx.print.Collation; + +import java.lang.reflect.Field; +import java.util.Collection; + +/** + * 序列化工具类 + */ +public class DeserializeUtil { + /** + * 初始化序列化器(根据 topic) + * @param topic 主题名称 + */ + public static void initZtoSerializer(String topic) throws Throwable { + Class deserializationClass = ReflectionUtils.forName(ZtoSerializeConf$.MODULE$.SERIALIZER_CLASS_NAME()); + if (deserializationClass != null) { + ReflectionUtils.getMethodByName(deserializationClass, ZtoSerializeConf$.MODULE$.SERIALIZER_CLASS_INIT_METHOD_NAME()).invoke(null, topic, null); + } + } + + /** + * 统一反序列化 + 反射赋值 + * + * @param 目标对象类型 + * @param clazz 目标 Class + * @param headType 序列化类型(header 中的值) + * @param value 原始 byte[] + * @return T + */ + public static T deserializeAndFill(Class clazz, String headType, byte[] value) { + try { + InputData inputData = ZtoSerializeUtil.serializerManager.deserialize(headType, value); + T target = clazz.getDeclaredConstructor().newInstance(); + // 3. 反射字段赋值(包含 private) + Collection fields = ReflectionUtils.getAllFields(clazz).values(); + for(Field field : fields){ + field.setAccessible(true); + Object fieldValue = inputData.getObject(field.getName()); + field.set(target, fieldValue); + } + return target; + } catch (Exception e) { + throw new RuntimeException("反序列化失败,class=" + clazz.getName(), e); + } + } +} diff --git a/fire-connectors/base-connectors/pom.xml b/fire-connectors/base-connectors/pom.xml index a7744a25..650588e8 100644 --- a/fire-connectors/base-connectors/pom.xml +++ b/fire-connectors/base-connectors/pom.xml @@ -26,13 +26,14 @@ com.zto.fire fire-connectors - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml fire-jdbc fire-hbase + fire-mq diff --git a/fire-connectors/flink-connectors/flink-clickhouse/pom.xml b/fire-connectors/flink-connectors/flink-clickhouse/pom.xml index 3095eb3c..c8012564 100644 --- a/fire-connectors/flink-connectors/flink-clickhouse/pom.xml +++ b/fire-connectors/flink-connectors/flink-clickhouse/pom.xml @@ -26,7 +26,7 @@ fire-flink-connectors com.zto.fire - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT diff --git a/fire-connectors/flink-connectors/flink-format/pom.xml b/fire-connectors/flink-connectors/flink-format/pom.xml index e54b1228..0b9d64a3 100644 --- a/fire-connectors/flink-connectors/flink-format/pom.xml +++ b/fire-connectors/flink-connectors/flink-format/pom.xml @@ -8,7 +8,7 @@ fire-flink-connectors com.zto.fire - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-connectors/flink-connectors/flink-paimon/pom.xml b/fire-connectors/flink-connectors/flink-paimon/pom.xml index 7e91a1c7..7db9cf9d 100644 --- a/fire-connectors/flink-connectors/flink-paimon/pom.xml +++ b/fire-connectors/flink-connectors/flink-paimon/pom.xml @@ -10,7 +10,7 @@ fire-flink-connectors com.zto.fire - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-connectors/flink-connectors/flink-rocketmq/pom.xml b/fire-connectors/flink-connectors/flink-rocketmq/pom.xml index 2026444e..659ea15a 100644 --- a/fire-connectors/flink-connectors/flink-rocketmq/pom.xml +++ b/fire-connectors/flink-connectors/flink-rocketmq/pom.xml @@ -25,7 +25,7 @@ com.zto.fire fire-flink-connectors - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-connectors/flink-connectors/pom.xml b/fire-connectors/flink-connectors/pom.xml index b41ac446..a5985c68 100644 --- a/fire-connectors/flink-connectors/pom.xml +++ b/fire-connectors/flink-connectors/pom.xml @@ -32,7 +32,7 @@ fire-connectors com.zto.fire - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-connectors/pom.xml b/fire-connectors/pom.xml index 3a736f15..ecd6abcf 100644 --- a/fire-connectors/pom.xml +++ b/fire-connectors/pom.xml @@ -26,7 +26,7 @@ com.zto.fire fire-parent - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-connectors/spark-connectors/pom.xml b/fire-connectors/spark-connectors/pom.xml index aee51d44..cb86dbe5 100644 --- a/fire-connectors/spark-connectors/pom.xml +++ b/fire-connectors/spark-connectors/pom.xml @@ -32,7 +32,7 @@ fire-connectors com.zto.fire - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-connectors/spark-connectors/spark-hbase/pom.xml b/fire-connectors/spark-connectors/spark-hbase/pom.xml index e45d7a3a..fca1a7ba 100644 --- a/fire-connectors/spark-connectors/spark-hbase/pom.xml +++ b/fire-connectors/spark-connectors/spark-hbase/pom.xml @@ -26,7 +26,7 @@ fire-spark-connectors com.zto.fire - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT diff --git a/fire-connectors/spark-connectors/spark-hudi/pom.xml b/fire-connectors/spark-connectors/spark-hudi/pom.xml index 68a33ec8..20e2174f 100644 --- a/fire-connectors/spark-connectors/spark-hudi/pom.xml +++ b/fire-connectors/spark-connectors/spark-hudi/pom.xml @@ -27,7 +27,7 @@ com.zto.fire fire-spark-connectors - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-connectors/spark-connectors/spark-paimon/pom.xml b/fire-connectors/spark-connectors/spark-paimon/pom.xml index 6859bccf..e66e448a 100644 --- a/fire-connectors/spark-connectors/spark-paimon/pom.xml +++ b/fire-connectors/spark-connectors/spark-paimon/pom.xml @@ -8,7 +8,7 @@ com.zto.fire fire-spark-connectors - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-connectors/spark-connectors/spark-rocketmq/pom.xml b/fire-connectors/spark-connectors/spark-rocketmq/pom.xml index ebca0cf1..545e9b1d 100644 --- a/fire-connectors/spark-connectors/spark-rocketmq/pom.xml +++ b/fire-connectors/spark-connectors/spark-rocketmq/pom.xml @@ -26,7 +26,7 @@ com.zto.fire fire-spark-connectors - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-core/pom.xml b/fire-core/pom.xml index b63e696c..a38fa45e 100644 --- a/fire-core/pom.xml +++ b/fire-core/pom.xml @@ -26,7 +26,7 @@ com.zto.fire fire-parent - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-engines/fire-flink/pom.xml b/fire-engines/fire-flink/pom.xml index bab3900b..20aca242 100644 --- a/fire-engines/fire-flink/pom.xml +++ b/fire-engines/fire-flink/pom.xml @@ -26,7 +26,7 @@ com.zto.fire fire-engines - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml @@ -43,6 +43,11 @@ ${fire.version} ${maven.scope} + + com.zto.fire + fire-connector-mq_${scala.binary.version} + ${fire.version} + com.sparkjava diff --git a/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/KafkaSink.scala b/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/KafkaSink.scala index cefd4bbf..6e4938e5 100644 --- a/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/KafkaSink.scala +++ b/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/KafkaSink.scala @@ -36,6 +36,7 @@ abstract class KafkaSink[IN, T <: MQRecord : ClassTag](params: Map[String, Objec url: String, topic: String, batch: Int = 100, flushInterval: Long = 5000, + sendByte: Boolean = false, keyNum: Int = KeyNum._1) extends BaseSink[IN, T](batch, flushInterval) { private lazy val (finalBrokers, finalTopic, finalConf) = KafkaUtils.getConfByKeyNum(params, url, topic, keyNum) @@ -47,7 +48,7 @@ abstract class KafkaSink[IN, T <: MQRecord : ClassTag](params: Map[String, Objec override def sink(dataList: List[T]): Unit = { dataList.foreach(record => { if (isEmpty(record.topic)) record.topic = finalTopic - MQProducer.sendRecord(finalBrokers, record, MQType.kafka, finalConf) + MQProducer.sendRecord(finalBrokers, record, MQType.kafka, finalConf,sendByte=sendByte) }) } } diff --git a/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/RocketMQSink.scala b/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/RocketMQSink.scala index c13705e7..fde43cec 100644 --- a/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/RocketMQSink.scala +++ b/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/RocketMQSink.scala @@ -33,7 +33,7 @@ import scala.reflect.ClassTag abstract class RocketMQSink[IN, T <: MQRecord : ClassTag](params: Map[String, Object], url: String, topic: String, tag: String = "*", batch: Int = 100, - flushInterval: Long = 1000, keyNum: Int = KeyNum._1) extends BaseSink[IN, T](batch, flushInterval) { + flushInterval: Long = 1000, sendByte: Boolean = false,keyNum: Int = KeyNum._1) extends BaseSink[IN, T](batch, flushInterval) { private lazy val (finalBrokers, finalTopic, finalTag, finalConf) = RocketMQUtils.getConfByKeyNum(url, topic, tag, params, keyNum) @@ -46,7 +46,7 @@ abstract class RocketMQSink[IN, T <: MQRecord : ClassTag](params: Map[String, Ob dataList.foreach(record => { if (isEmpty(record.topic)) record.topic = finalTopic if (isEmpty(record.tag) && noEmpty(finalTag)) record.tag = finalTag - MQProducer.sendRecord(finalBrokers, record, MQType.rocketmq, finalConf) + MQProducer.sendRecord(finalBrokers, record, MQType.rocketmq, finalConf,sendByte=sendByte) }) } } diff --git a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/DataStreamExt.scala b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/DataStreamExt.scala index 1a8f8ae1..4b9e63da 100644 --- a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/DataStreamExt.scala +++ b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/DataStreamExt.scala @@ -303,8 +303,9 @@ class DataStreamExt[T](stream: DataStream[T]) extends DataStreamHelperImpl[T](st tag: String = "*", mqType: MQType = MQType.kafka, batch: Int = 100, flushInterval: Long = 1000, + sendByte: Boolean = false, keyNum: Int = KeyNum._1): DataStreamSink[_] = { - this.sinkMQFun[E](params, url, topic, tag, mqType, batch, flushInterval, keyNum)(_.asInstanceOf[E]) + this.sinkMQFun[E](params, url, topic, tag, mqType, batch, flushInterval, sendByte,keyNum)(_.asInstanceOf[E]) } /** @@ -327,11 +328,12 @@ class DataStreamExt[T](stream: DataStream[T]) extends DataStreamHelperImpl[T](st tag: String = "*", mqType: MQType = MQType.kafka, batch: Int = 100, flushInterval: Long = 1000, + sendByte: Boolean = false, keyNum: Int = KeyNum._1)(mapFunction: T => E): DataStreamSink[_] = { if (mqType == MQType.rocketmq) { - this.sinkRocketMQFun[E](params, url, topic, tag, batch, flushInterval, keyNum)(mapFunction) + this.sinkRocketMQFun[E](params, url, topic, tag, batch, flushInterval,sendByte, keyNum)(mapFunction) } else { - this.sinkKafkaFun[E](params, url, topic, batch, flushInterval, keyNum)(mapFunction) + this.sinkKafkaFun[E](params, url, topic, batch, flushInterval,sendByte, keyNum)(mapFunction) } } @@ -344,15 +346,41 @@ class DataStreamExt[T](stream: DataStream[T]) extends DataStreamHelperImpl[T](st * 消息队列的url * @param topic * 发送消息到指定的主题 + * * @param sendByte + * 是否发送byte值的消息,默认false发送string * @param keyNum * 指定配置的keyNum,可从配置或注解中获取对应配置信息 */ def sinkKafka[E <: MQRecord : ClassTag](params: Map[String, Object] = null, url: String = null, topic: String = null, batch: Int = 1000, flushInterval: Long = 5000, + sendByte: Boolean = false, keyNum: Int = KeyNum._1): DataStreamSink[_] = { - this.sinkKafkaFun[E](params, url, topic, batch, flushInterval, keyNum)(_.asInstanceOf[E]) + this.sinkKafkaFun[E](params, url, topic, batch, flushInterval, sendByte,keyNum)(_.asInstanceOf[E]) + } + + /** + * 将数据实时sink到指定的kafka topic + * + * @param params + * 额外的producer参数 + * @param url + * 消息队列的url + * @param topic + * 发送消息到指定的主题 + * * @param sendByte + * 是否发送byte值的消息,默认false发送string + * @param keyNum + * 指定配置的keyNum,可从配置或注解中获取对应配置信息 + */ + def sinkKafkaByte[E <: MQRecord : ClassTag](params: Map[String, Object] = null, + url: String = null, topic: String = null, + batch: Int = 1000, flushInterval: Long = 5000, + sendByte: Boolean = true, + keyNum: Int = KeyNum._1): DataStreamSink[_] = { + + this.sinkKafkaFun[E](params, url, topic, batch, flushInterval, sendByte, keyNum)(_.asInstanceOf[E]) } /** @@ -370,11 +398,12 @@ class DataStreamExt[T](stream: DataStream[T]) extends DataStreamHelperImpl[T](st def sinkKafkaFun[E <: MQRecord : ClassTag](params: Map[String, Object] = null, url: String = null, topic: String = null, batch: Int = 1000, flushInterval: Long = 5000, + sendByte: Boolean = false, keyNum: Int = KeyNum._1)(fun: T => E): DataStreamSink[_] = { val finalBatch = if (FireKafkaConf.kafkaSinkBatch(keyNum) > 0) FireKafkaConf.kafkaSinkBatch(keyNum) else batch val finalInterval = if (FireKafkaConf.kafkaFlushInterval(keyNum) > 0) FireKafkaConf.kafkaFlushInterval(keyNum) else flushInterval - this.addSinkWrap(new KafkaSink[T, E](params, url, topic, finalBatch, finalInterval, keyNum) { + this.addSinkWrap(new KafkaSink[T, E](params, url, topic, finalBatch, finalInterval, sendByte,keyNum) { override def map(value: T): E = fun(value) }) } @@ -394,8 +423,29 @@ class DataStreamExt[T](stream: DataStream[T]) extends DataStreamHelperImpl[T](st def sinkRocketMQ[E <: MQRecord : ClassTag](params: Map[String, Object] = null, url: String = null, topic: String = null, tag: String = "*", batch: Int = 1000, flushInterval: Long = 5000, + sendByte: Boolean = false, + keyNum: Int = KeyNum._1): DataStreamSink[_] = { + this.sinkRocketMQFun[E](params, url, topic, tag, batch, flushInterval,sendByte,keyNum)(_.asInstanceOf[E]) + } + + /** + * 将数据实时sink到指定的rocketmq topic,发送byte类型的数据 + * + * @param params + * 额外的producer参数 + * @param url + * 消息队列的url + * @param topic + * 发送消息到指定的主题 + * @param keyNum + * 指定配置的keyNum,可从配置或注解中获取对应配置信息 + */ + def sinkRocketMQByte[E <: MQRecord : ClassTag](params: Map[String, Object] = null, + url: String = null, topic: String = null, tag: String = "*", + batch: Int = 1000, flushInterval: Long = 5000, + sendByte: Boolean = true, keyNum: Int = KeyNum._1): DataStreamSink[_] = { - this.sinkRocketMQFun[E](params, url, topic, tag, batch, flushInterval, keyNum)(_.asInstanceOf[E]) + this.sinkRocketMQFun[E](params, url, topic, tag, batch, flushInterval, sendByte, keyNum)(_.asInstanceOf[E]) } /** @@ -412,12 +462,12 @@ class DataStreamExt[T](stream: DataStream[T]) extends DataStreamHelperImpl[T](st */ def sinkRocketMQFun[E <: MQRecord : ClassTag](params: Map[String, Object] = null, url: String = null, topic: String = null, tag: String = "*", - batch: Int = 1000, flushInterval: Long = 5000, + batch: Int = 1000, flushInterval: Long = 5000, sendByte: Boolean = false, keyNum: Int = KeyNum._1)(fun: T => E): DataStreamSink[_] = { val finalBatch = if (FireRocketMQConf.rocketSinkBatch(keyNum) > 0) FireRocketMQConf.rocketSinkBatch(keyNum) else batch val finalInterval = if (FireRocketMQConf.rocketSinkFlushInterval(keyNum) > 0) FireRocketMQConf.rocketSinkFlushInterval(keyNum) else flushInterval - this.addSinkWrap(new RocketMQSink[T, E](params, url, topic, tag, finalBatch, finalInterval, keyNum) { + this.addSinkWrap(new RocketMQSink[T, E](params, url, topic, tag, finalBatch, finalInterval,sendByte, keyNum) { override def map(value: T): E = fun(value) }) } diff --git a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/StreamExecutionEnvExt.scala b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/StreamExecutionEnvExt.scala index 19d1812f..d304f017 100644 --- a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/StreamExecutionEnvExt.scala +++ b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/StreamExecutionEnvExt.scala @@ -32,6 +32,7 @@ import com.zto.fire.flink.ext.provider.{HBaseConnectorProvider, JdbcFlinkProvide import com.zto.fire.flink.sql.FlinkSqlExtensionsParser import com.zto.fire.flink.util.{FlinkRocketMQUtils, FlinkSingletonFactory, FlinkUtils, TableUtils} import com.zto.fire.jdbc.JdbcConnectorBridge +import com.zto.fire.mq.deserializer.{BaseDeserializeEntity, ZtoKafkaMessageDeserializationSchema, ZtoRocketMessageDeserializationSchema} import org.apache.commons.lang3.StringUtils import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema} @@ -154,9 +155,7 @@ class StreamExecutionEnvExt(env: StreamExecutionEnvironment) extends StreamExecu runtimeContext: RuntimeContext = null, deserializer: Any = new SimpleStringSchema, keyNum: Int = KeyNum._1): DataStream[T] = { - val kafkaConsumer = this.createKafkaConsumer[T](kafkaParams, topics, deserializer, keyNum) - if (runtimeContext != null) kafkaConsumer.setRuntimeContext(runtimeContext) if (specificStartupOffsets != null) kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets) // 设置从指定时间戳位置开始消费kafka @@ -173,6 +172,24 @@ class StreamExecutionEnvExt(env: StreamExecutionEnvironment) extends StreamExecu this.addSourceWrap(kafkaConsumer) } + /** + * 创建DStream流 + * + * @param kafkaParams + * kafka相关的配置参数 + * @return + * DStream + */ + def createDirectStreamWithZtoDeserialize[T <: BaseDeserializeEntity:ClassTag](kafkaParams: Map[String, Object] = null, + topics: Set[String] = null, + specificStartupOffsets: Map[KafkaTopicPartition, java.lang.Long] = null, + runtimeContext: RuntimeContext = null, + keyNum: Int = KeyNum._1)(implicit typeInfo: TypeInformation[T]): DataStream[T] = { + val confTopics = FireKafkaConf.kafkaTopics(keyNum) + val ztoDeserialize = new ZtoKafkaMessageDeserializationSchema[T](confTopics); + this.createDirectStreamBySchema[T](kafkaParams, topics, specificStartupOffsets, runtimeContext, ztoDeserialize, keyNum = keyNum) + } + /** * 创建DStream流 * @@ -261,6 +278,28 @@ class StreamExecutionEnvExt(env: StreamExecutionEnvironment) extends StreamExecu this.addSourceWrap(new RocketMQSourceWithTag[(String, String, String)](new SimpleTagKeyValueDeserializationSchema, props)).name("RocketMQ Source") } + /** + * 构建RocketMQ拉取消息的DStream流,获取消息中的tag、key以及value + * + * @param rocketParam + * rocketMQ相关消费参数 + * @param groupId + * groupId + * @param topics + * topic列表 + * @return + * rocketMQ DStream + */ + def createRocketMqPullStreamWithEntity[T <: BaseDeserializeEntity:ClassTag:TypeInformation](rocketParam: Map[String, String] = null, + groupId: String = null, + topics: String = null, + tag: String = null, + keyNum: Int = KeyNum._1): DataStream[T] = { + val props = buildRocketMQProps(rocketParam, groupId, topics, tag, keyNum) + val confTopics = FireRocketMQConf.rocketTopics(keyNum) + this.addSourceWrap(new RocketMQSourceWithTag[T](new ZtoRocketMessageDeserializationSchema[T](confTopics), props)).name("RocketMQ Entity Source") + } + /** * 构建RocketMQ拉取消息的DStream流,获取消息中的tag、key以及value等相关元数据信息 * diff --git a/fire-engines/fire-spark/pom.xml b/fire-engines/fire-spark/pom.xml index de5945a5..5436ae0b 100644 --- a/fire-engines/fire-spark/pom.xml +++ b/fire-engines/fire-spark/pom.xml @@ -26,7 +26,7 @@ com.zto.fire fire-engines - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-engines/pom.xml b/fire-engines/pom.xml index 8a81a48c..c6923f58 100644 --- a/fire-engines/pom.xml +++ b/fire-engines/pom.xml @@ -26,7 +26,7 @@ com.zto.fire fire-parent - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-enhance/apache-arthas/pom.xml b/fire-enhance/apache-arthas/pom.xml index b1a103c5..b60f5ca3 100644 --- a/fire-enhance/apache-arthas/pom.xml +++ b/fire-enhance/apache-arthas/pom.xml @@ -19,14 +19,14 @@ 4.0.0 fire-enhance-arthas_${scala.binary.version} - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT jar Fire : Enhance : Arthas com.zto.fire fire-enhance - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-enhance/apache-flink/pom.xml b/fire-enhance/apache-flink/pom.xml index 0b3a56cb..229bed08 100644 --- a/fire-enhance/apache-flink/pom.xml +++ b/fire-enhance/apache-flink/pom.xml @@ -19,14 +19,14 @@ 4.0.0 fire-enhance-flink_${flink.reference} - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT jar Fire : Enhance : Flink com.zto.fire fire-enhance - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-enhance/apache-spark/pom.xml b/fire-enhance/apache-spark/pom.xml index 72483511..f2ee634f 100644 --- a/fire-enhance/apache-spark/pom.xml +++ b/fire-enhance/apache-spark/pom.xml @@ -19,14 +19,14 @@ 4.0.0 fire-enhance-spark_${spark.reference} - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT jar Fire : Enhance : Spark com.zto.fire fire-enhance - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-enhance/pom.xml b/fire-enhance/pom.xml index 2ad08a69..5edd1a81 100644 --- a/fire-enhance/pom.xml +++ b/fire-enhance/pom.xml @@ -25,7 +25,7 @@ com.zto.fire fire-parent - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-examples/flink-examples/pom.xml b/fire-examples/flink-examples/pom.xml index 87fc6343..3236dbcb 100644 --- a/fire-examples/flink-examples/pom.xml +++ b/fire-examples/flink-examples/pom.xml @@ -26,7 +26,7 @@ com.zto.fire fire-examples - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSinkTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSinkTest.scala new file mode 100644 index 00000000..6f067674 --- /dev/null +++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSinkTest.scala @@ -0,0 +1,54 @@ +package com.zto.fire.examples.flink.serialize + +import com.zto.bigdata.integeration.util.ZtoSerializeUtil +import com.zto.fire._ +import com.zto.fire.common.anno.Config +import com.zto.fire.common.bean.MQRecord +import com.zto.fire.common.conf.FireKafkaConf +import com.zto.fire.flink.FlinkStreaming +import com.zto.fire.flink.anno.Streaming +import com.zto.fire.flink.conf.{SerializeType, ZtoSerializeConf} +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.scala._ +import org.apache.flink.configuration.Configuration +import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders} + +import java.util.Date + +/** + * 使用自定义反序列化器发送kafka + * 测试:10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092 + * 生产:oggkafkanewb1.ztosys.com:9092,oggkafkanewb2.ztosys.com:9092,oggkafkanewb3.ztosys.com:9092 + * @author zhanggougou 2025-12-08 10:09:19 + */ +@Config( + """ + |kafka.brokers.name=10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092 + |kafka.topics=fire_test_kafka_serialize + |""") +@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180) +object KafkaSinkTest extends FlinkStreaming { + + override def process(): Unit = { + this.fire.createRandomIntStream(1).map(new RichMapFunction[Int,MQRecord] { + + override def open(parameters: Configuration): Unit = { + ZtoSerializeUtil.initSerializerManager(FireKafkaConf.kafkaTopics(1) ,null) + } + + override def map(x: Int): MQRecord = { + val outputData = ZtoSerializeUtil.serializerManager.outputData(ZtoSerializeUtil.classInfoMetadata.newInstance) + outputData.setObject("id", x) + outputData.setObject("name", x + "我") + outputData.setObject("createTime", new Date()) + outputData.setObject("money", 1.6 + x) + val bytes = ZtoSerializeUtil.serializerManager.serialize(outputData) + val headers = new RecordHeaders() + headers.add(new RecordHeader(ZtoSerializeConf.SERIALIZER_HEAD_KEY, SerializeType.FURY.toString.getBytes())) + MQRecord(null, null, headers,bytes) + } + }).sinkKafkaByte().name("testSink") + } +} + + diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSourceTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSourceTest.scala new file mode 100644 index 00000000..1d8e7cfa --- /dev/null +++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSourceTest.scala @@ -0,0 +1,36 @@ + +package com.zto.fire.examples.flink.serialize + +import com.zto.fire._ +import com.zto.fire.common.anno.Config +import com.zto.fire.flink.FlinkStreaming +import com.zto.fire.flink.anno.Streaming +import org.apache.flink.api.scala.createTypeInformation + + +/** + * 使用自定义反序列化器消费kafka + * 测试:10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092 + * 生产:oggkafkanewb1.ztosys.com:9092,oggkafkanewb2.ztosys.com:9092,oggkafkanewb3.ztosys.com:9092 + * @author zhanggougou 2025-12-08 10:09:19 + */ +@Config( + """ + |kafka.brokers.name=10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092 + |kafka.topics=fire_test_kafka_serialize + |kafka.group.id=fire_test_kafka_serialize_consumer + |kafka.starting.offsets=earliest + |""") +@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180) +object KafkaSourceTest extends FlinkStreaming { + + override def process(): Unit = { + this.fire.createDirectStreamWithZtoDeserialize[OrderInfo]().map(order => { + println("开始执行") + println(order.toString) + println("结束执行") + }).print() + } +} + + diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/OrderInfo.java b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/OrderInfo.java new file mode 100644 index 00000000..9d0f5e13 --- /dev/null +++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/OrderInfo.java @@ -0,0 +1,65 @@ +package com.zto.fire.examples.flink.serialize; + +import com.zto.fire.mq.deserializer.BaseDeserializeEntity; + +import java.io.Serializable; +import java.util.Date; + +public class OrderInfo extends BaseDeserializeEntity { + private Integer id; + private String name; + private Date createTime; + private Double money; + + public OrderInfo() { + } + + public OrderInfo(Integer id, String name, Date createTime, Double money) { + this.id = id; + this.name = name; + this.createTime = createTime; + this.money = money; + } + + public Integer getId() { + return id; + } + + public void setId(Integer id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public Double getMoney() { + return money; + } + + public void setMoney(Double money) { + this.money = money; + } + + @Override + public String toString() { + return "OrderInfo{" + + "id=" + id + + ", name='" + name + '\'' + + ", createTime=" + createTime + + ", money=" + money + + '}'; + } +} diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSinkTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSinkTest.scala new file mode 100644 index 00000000..a9254f00 --- /dev/null +++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSinkTest.scala @@ -0,0 +1,50 @@ +package com.zto.fire.examples.flink.serialize + +import com.zto.bigdata.integeration.util.ZtoSerializeUtil +import com.zto.fire._ +import com.zto.fire.common.anno.Config +import com.zto.fire.common.bean.MQRecord +import com.zto.fire.common.conf.{FireKafkaConf, FireRocketMQConf} +import com.zto.fire.flink.FlinkStreaming +import com.zto.fire.flink.anno.Streaming +import com.zto.fire.flink.conf.{SerializeType, ZtoSerializeConf} +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.scala._ +import org.apache.flink.configuration.Configuration +import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders} + +import java.util.Date + +/** + * @author zhanggougou 2025-12-08 10:09:19 + */ +@Config( + """ + |flink.rocket.brokers.name=testmq02ns1.test.ztosys.com:9876; testmq02ns2.test.ztosys.com:9876;testmq02ns3.test.ztosys.com:9876 + |flink.rocket.topics=fire_test_rocket_serialize + |""") +@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180) +object RocketSinkTest extends FlinkStreaming { + + override def process(): Unit = { + this.fire.createRandomIntStream(1).map(new RichMapFunction[Int,MQRecord] { + + override def open(parameters: Configuration): Unit = { + ZtoSerializeUtil.initSerializerManager(FireRocketMQConf.rocketTopics(1) ,null) + } + + override def map(x: Int): MQRecord = { + val outputData = ZtoSerializeUtil.serializerManager.outputData(ZtoSerializeUtil.classInfoMetadata.newInstance) + outputData.setObject("id", x) + outputData.setObject("name", x + "我是rocket") + outputData.setObject("createTime", new Date()) + outputData.setObject("money", 1.6 + x) + val bytes = ZtoSerializeUtil.serializerManager.serialize(outputData) + val headMap = Map((ZtoSerializeConf.SERIALIZER_HEAD_KEY,SerializeType.FURY.toString)) + MQRecord(null, null, headMap,bytes) + } + }).sinkRocketMQByte().name("testSink") + } +} + + diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSourceTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSourceTest.scala new file mode 100644 index 00000000..1fcda3ce --- /dev/null +++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSourceTest.scala @@ -0,0 +1,34 @@ + +package com.zto.fire.examples.flink.serialize + +import com.zto.fire._ +import com.zto.fire.common.anno.Config +import com.zto.fire.flink.FlinkStreaming +import com.zto.fire.flink.anno.Streaming +import org.apache.flink.api.scala.createTypeInformation + + +/** + * @author zhanggougou 2025-12-08 10:09:19 + */ +@Config( + """ + |flink.rocket.brokers.name=testmq02ns1.test.ztosys.com:9876; testmq02ns2.test.ztosys.com:9876;testmq02ns3.test.ztosys.com:9876 + |flink.rocket.topics=fire_test_rocket_serialize + |flink.rocket.group.id=fire_test_rocket_serialize_consumer + |flink.rocket.starting.offsets=earliest + |flink.rocket.consumer.tag=* + |""") +@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180) +object RocketSourceTest extends FlinkStreaming { + + override def process(): Unit = { + this.fire.createRocketMqPullStreamWithEntity[OrderInfo]().map(order => { + println("开始执行") + println(order.toString) + println("结束执行") + }).print() + } +} + + diff --git a/fire-examples/pom.xml b/fire-examples/pom.xml index 71c4eb55..41279d1a 100644 --- a/fire-examples/pom.xml +++ b/fire-examples/pom.xml @@ -31,7 +31,7 @@ com.zto.fire fire-parent - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml @@ -47,7 +47,7 @@ com.zto.bigdata cluster-env - 1.0.7-SNAPSHOT + 1.0.8-SNAPSHOT diff --git a/fire-examples/spark-examples/pom.xml b/fire-examples/spark-examples/pom.xml index e5d52889..1a0da013 100644 --- a/fire-examples/spark-examples/pom.xml +++ b/fire-examples/spark-examples/pom.xml @@ -26,7 +26,7 @@ com.zto.fire fire-examples - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT ../pom.xml diff --git a/fire-shell/flink-shell/pom.xml b/fire-shell/flink-shell/pom.xml index e9526496..75593803 100644 --- a/fire-shell/flink-shell/pom.xml +++ b/fire-shell/flink-shell/pom.xml @@ -7,7 +7,7 @@ fire-shell com.zto.fire - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT diff --git a/fire-shell/pom.xml b/fire-shell/pom.xml index 54014e7a..97ee38bb 100644 --- a/fire-shell/pom.xml +++ b/fire-shell/pom.xml @@ -15,7 +15,7 @@ fire-parent com.zto.fire - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT diff --git a/fire-shell/spark-shell/pom.xml b/fire-shell/spark-shell/pom.xml index 8950eed0..dbb83550 100644 --- a/fire-shell/spark-shell/pom.xml +++ b/fire-shell/spark-shell/pom.xml @@ -9,7 +9,7 @@ fire-shell com.zto.fire - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT diff --git a/pom.xml b/pom.xml index ff0b7b0d..09543034 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ com.zto.fire fire-parent pom - 2.6.3-SNAPSHOT + 2.7.0-SNAPSHOT Fire : Fire framework is a development framework for real-time computing task development . The framework provides a simple and easy to use API that can significantly reduce the development threshold of Spark and Flink and improve development efficiency. https://github.com/fire-framework/fire @@ -1047,7 +1047,7 @@ com.zto.bigdata cluster-env - 1.0.7-SNAPSHOT + 1.0.8-SNAPSHOT -- Gitee From 51592c2eed978d2d8131a12781ee7955b865b260 Mon Sep 17 00:00:00 2001 From: zhanggougou <15651908511@163.com> Date: Mon, 29 Dec 2025 16:49:45 +0800 Subject: [PATCH 2/3] =?UTF-8?q?[fire-1268]=20=E4=BC=98=E5=8C=96flink?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E9=87=8D=E5=90=AF=E6=97=B6=E7=9A=84ck?= =?UTF-8?q?=E9=80=89=E6=8B=A9=EF=BC=8C=E9=81=BF=E5=85=8D=E7=94=A8=E9=94=99?= =?UTF-8?q?ck?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../FireNoHaCheckPointException.java | 29 +++++++++++++++++++ .../ApplicationDispatcherBootstrap.java | 18 ++++++++++++ .../checkpoint/CheckpointCoordinator.java | 26 +++++++++++++++-- 3 files changed, 70 insertions(+), 3 deletions(-) create mode 100644 fire-common/src/main/java/com/zto/fire/common/exception/FireNoHaCheckPointException.java diff --git a/fire-common/src/main/java/com/zto/fire/common/exception/FireNoHaCheckPointException.java b/fire-common/src/main/java/com/zto/fire/common/exception/FireNoHaCheckPointException.java new file mode 100644 index 00000000..4e629b6d --- /dev/null +++ b/fire-common/src/main/java/com/zto/fire/common/exception/FireNoHaCheckPointException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.zto.fire.common.exception; + +/** + * 自定义ha无ck异常 + */ +public class FireNoHaCheckPointException extends RuntimeException { + + public FireNoHaCheckPointException() { + super("flink am retry,but find no checkpoint on ha,need failed"); + } + +} diff --git a/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java b/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java index e8abcc50..48d6b117 100644 --- a/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java +++ b/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java @@ -18,6 +18,7 @@ package org.apache.flink.client.deployment.application; +import com.zto.fire.common.exception.FireNoHaCheckPointException; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; @@ -146,6 +147,23 @@ public class ApplicationDispatcherBootstrap implements DispatcherBootstrap { return dispatcherGateway.shutDownCluster( ApplicationStatus.SUCCEEDED); } + + // TODO: ------------ start:二次开发代码 --------------- // + final Optional noCkException = + ExceptionUtils.findThrowable( + t, FireNoHaCheckPointException.class); + LOG.info("try find no ck exception,{}", noCkException.isPresent()); + if (noCkException.isPresent()) { + LOG.error( + "Application failed due to FireNoHaCheckPointException, " + + "shutdown cluster with UNKNOWN status.", + noCkException.get()); + + return dispatcherGateway.shutDownCluster( + ApplicationStatus.UNKNOWN); + } + // TODO: ------------ end:二次开发代码 --------------- // + final Optional maybeException = ExceptionUtils.findThrowable( t, UnsuccessfulExecutionException.class); diff --git a/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index f428c117..3ae8b2e3 100644 --- a/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; +import com.zto.fire.common.exception.FireNoHaCheckPointException; import com.zto.fire.common.util.FireEngineUtils; import com.zto.fire.common.util.TimeExpression; import org.apache.flink.annotation.VisibleForTesting; @@ -45,6 +46,8 @@ import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.ScheduledExecutor; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1612,11 +1615,24 @@ public class CheckpointCoordinator { */ public boolean restoreInitialCheckpointIfPresent(final Set tasks) throws Exception { + // TODO: ------------ start:二次开发代码 --------------- // + //如未采用yarn或出现其他异常,按最严格的方式来校验是否存在最新ck,防止出现故障 + int attemptId = 2; + try{ + attemptId = ConverterUtils.toContainerId(System.getenv(ApplicationConstants.Environment.CONTAINER_ID.key())).getApplicationAttemptId().getAttemptId(); + } catch (Throwable e){ + LOG.error("获取yarn重试id出现异常,异常默认am重试次数2"); + } + LOG.info("restoreInitialCheckpointIfPresent start,AttemptId is {}=========",attemptId); + // TODO: ------------ end:二次开发代码 --------------- // final OptionalLong restoredCheckpointId = restoreLatestCheckpointedStateInternal( tasks, OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT, - false, // initial checkpoints exist only on JobManager failover. ok if not + // TODO: ------------ start:二次开发代码 --------------- // + //false, // initial checkpoints exist only on JobManager failover. ok if not + attemptId != 1 ,//避免低概率zk被清理后 am重试后用错ck + // TODO: ------------ end:二次开发代码 --------------- // // present. false, true); // JobManager failover means JobGraphs match exactly. @@ -1665,10 +1681,14 @@ public class CheckpointCoordinator { CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(); if (latest == null) { - LOG.info("No checkpoint found during restore."); + LOG.info("No checkpoint found during restore.judge need throw Exception,{}",errorIfNoCheckpoint); if (errorIfNoCheckpoint) { - throw new IllegalStateException("No completed checkpoint available"); + //throw new IllegalStateException("No completed checkpoint available"); + // TODO: ------------ start:二次开发代码 --------------- // + // 抛出固定异常用于ApplicationDispatcherBootstrap捕获,用以判断是否需要直接终止flink任务,避免无意义的重试 + throw new FireNoHaCheckPointException(); + // TODO: ------------ end:二次开发代码 --------------- // } LOG.debug("Resetting the master hooks."); -- Gitee From 84941d9d893eb689c61c70d42bbcea7170fd4c47 Mon Sep 17 00:00:00 2001 From: zhanggougou <15651908511@163.com> Date: Wed, 31 Dec 2025 10:14:55 +0800 Subject: [PATCH 3/3] =?UTF-8?q?[fire-1269]=E4=BF=AE=E5=A4=8D=EF=BC=9A?= =?UTF-8?q?=E7=BD=91=E7=BB=9C=E6=95=85=E9=9A=9C=E6=97=B6=EF=BC=8CsinkKafka?= =?UTF-8?q?=E5=8F=AF=E8=83=BD=E4=BC=9A=E4=B8=80=E7=9B=B4=E9=87=8D=E8=AF=95?= =?UTF-8?q?=EF=BC=8Cflink=E6=97=A0=E6=B3=95=E6=84=9F=E7=9F=A5sink=E7=8A=B6?= =?UTF-8?q?=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/zto/fire/common/util/MQProducer.scala | 3 ++ .../flink/serialize/KafkaCommonTest.scala | 43 +++++++++++++++++ .../flink/serialize/KafkaSendErrorTest.scala | 39 +++++++++++++++ .../flink/serialize/RocketCommonTest.scala | 48 +++++++++++++++++++ 4 files changed, 133 insertions(+) create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaCommonTest.scala create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSendErrorTest.scala create mode 100644 fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketCommonTest.scala diff --git a/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala b/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala index 92103114..2f02f3fb 100644 --- a/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala +++ b/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala @@ -130,6 +130,7 @@ class MQProducer(url: String, mqType: MQType = MQType.kafka, if (this.sendErrorCount >= this.maxRetries) { this.kafkaProducer.close() logError(s"异常信息发送MQ重试${this.sendErrorCount}次仍失败,将退出异常信息发送!") + if (throwable) throw new RuntimeException("发送kafka消息失败,请检查是否存在网络问题或集群维护") return } @@ -178,6 +179,8 @@ class MQProducer(url: String, mqType: MQType = MQType.kafka, if (this.sendErrorCount >= this.maxRetries) { this.rocketmqProducer.shutdown() + logError(s"异常信息发送rocket重试${this.sendErrorCount}次仍失败,将退出异常信息发送!") + if (throwable) throw new RuntimeException("发送rocket消息失败,请检查是否存在网络问题或集群维护") return } diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaCommonTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaCommonTest.scala new file mode 100644 index 00000000..fece8fa9 --- /dev/null +++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaCommonTest.scala @@ -0,0 +1,43 @@ +package com.zto.fire.examples.flink.serialize + +import com.zto.fire._ +import com.zto.fire.common.anno.Config +import com.zto.fire.common.bean.MQRecord +import com.zto.fire.common.util.JSONUtils +import com.zto.fire.flink.FlinkStreaming +import com.zto.fire.flink.anno.Streaming +import org.apache.flink.api.scala._ + +import java.util.Date + +/** + * 测试kafka通用api + * @author zhanggougou 2025-12-08 10:09:19 + */ +@Config( + """ + |kafka.brokers.name=10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092 + |kafka.topics=fire-test-kafka + |kafka.group.id=fire-test-kafka-consumer + |kafka.starting.offsets=earliest + |""") +@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180) +object KafkaCommonTest extends FlinkStreaming { + + override def process(): Unit = { + this.fire.createRandomIntStream(1).map(x => { + val orderInfo = new OrderInfo(x, x + "我是kafka common test", new Date(), 1.6 + x) + MQRecord(JSONUtils.toJSONString(orderInfo)) + }) + .sinkKafka() + .uname("sink-kafka-common") + + this.fire.createKafkaDirectStream() + .uname("source-kafka-common").map(x => { + JSONUtils.parseObject(x, classOf[OrderInfo]) + }).print().name("print-kafka-common") + + } +} + + diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSendErrorTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSendErrorTest.scala new file mode 100644 index 00000000..d3ff9e5c --- /dev/null +++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSendErrorTest.scala @@ -0,0 +1,39 @@ +package com.zto.fire.examples.flink.serialize + +import com.zto.fire._ +import com.zto.fire.common.anno.Config +import com.zto.fire.common.bean.MQRecord +import com.zto.fire.common.util.JSONUtils +import com.zto.fire.flink.FlinkStreaming +import com.zto.fire.flink.anno.Streaming +import org.apache.flink.api.scala._ + +import java.util.Date + +/** + * 写错端口 模拟kafka发送失败 + * + * @author zhanggougou 2025-12-08 10:09:19 + */ +@Config( + """ + |kafka.brokers.name=10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092 + |kafka.topics=fire-test-kafka + |kafka.group.id=fire-test-kafka-consumer + |kafka.starting.offsets=earliest + |""") +@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180) +object KafkaSendErrorTest extends FlinkStreaming { + + override def process(): Unit = { + this.fire.createRandomIntStream(1).map(x => { + val orderInfo = new OrderInfo(x, x + "我是kafka common test", new Date(), 1.6 + x) + MQRecord(JSONUtils.toJSONString(orderInfo)) + }) + .sinkKafka() + .uname("sink-kafka-common") + + } +} + + diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketCommonTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketCommonTest.scala new file mode 100644 index 00000000..9f2e33de --- /dev/null +++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketCommonTest.scala @@ -0,0 +1,48 @@ +package com.zto.fire.examples.flink.serialize + +import com.zto.bigdata.integeration.util.ZtoSerializeUtil +import com.zto.fire._ +import com.zto.fire.common.anno.Config +import com.zto.fire.common.bean.MQRecord +import com.zto.fire.common.conf.FireRocketMQConf +import com.zto.fire.common.util.JSONUtils +import com.zto.fire.flink.FlinkStreaming +import com.zto.fire.flink.anno.Streaming +import com.zto.fire.flink.conf.{SerializeType, ZtoSerializeConf} +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.scala._ +import org.apache.flink.configuration.Configuration + +import java.util.Date + +/** + * @author zhanggougou 2025-12-08 10:09:19 + */ +@Config( + """ + |flink.rocket.brokers.name=testmq02ns1.test.ztosys.com:9876; testmq02ns2.test.ztosys.com:9876;testmq02ns3.test.ztosys.com:9876 + |flink.rocket.topics=fire_test_rocket + |flink.rocket.group.id=fire_test_rocket_consumer + |flink.rocket.starting.offsets=earliest + |flink.rocket.consumer.tag=* + |""") +@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180) +object RocketCommonTest extends FlinkStreaming { + + override def process(): Unit = { + this.fire.createRandomIntStream(1).map(x => { + val orderInfo = new OrderInfo(x, x + "我是rocket common test", new Date(), 1.7 + x) + MQRecord(JSONUtils.toJSONString(orderInfo)) + }) + .sinkRocketMQ() + .uname("sink-rocket-common") + + this.fire.createRocketMqPullStream() + .uname("source-rocket-common").map(x => { + JSONUtils.parseObject(x, classOf[OrderInfo]) + }).print().name("print-rocket-common") + + } +} + + -- Gitee