diff --git a/fire-bundle/fire-bundle-flink/pom.xml b/fire-bundle/fire-bundle-flink/pom.xml
index cbae50418a7248d1445662521d3cabd9987da053..c2797473b272801308b9791f4f67e712acd4c2c7 100644
--- a/fire-bundle/fire-bundle-flink/pom.xml
+++ b/fire-bundle/fire-bundle-flink/pom.xml
@@ -8,7 +8,7 @@
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-bundle</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-bundle/fire-bundle-spark/pom.xml b/fire-bundle/fire-bundle-spark/pom.xml
index 91ed93e9fa85900b6868348f80f9406f2e21874c..b5f5757746580bd7d024e81e305b8b31c02ad546 100644
--- a/fire-bundle/fire-bundle-spark/pom.xml
+++ b/fire-bundle/fire-bundle-spark/pom.xml
@@ -8,7 +8,7 @@
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-bundle</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-bundle/pom.xml b/fire-bundle/pom.xml
index e8fba457247f89c847f10ea075d0160519bc0a3e..c8545b8f43c8e119ed3439343bed43bd4bd42df5 100644
--- a/fire-bundle/pom.xml
+++ b/fire-bundle/pom.xml
@@ -15,7 +15,7 @@
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-parent</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-common/pom.xml b/fire-common/pom.xml
index e8adbe9f1b1c8e9beac0945823005c4a7c125a70..cc5c25d4437de27ff86a2125cae452f017d511b3 100644
--- a/fire-common/pom.xml
+++ b/fire-common/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-parent</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-common/src/main/java/com/zto/fire/common/exception/FireNoHaCheckPointException.java b/fire-common/src/main/java/com/zto/fire/common/exception/FireNoHaCheckPointException.java
new file mode 100644
index 0000000000000000000000000000000000000000..4e629b6d87e276ad1ba4e2506d9ca01f6b303c35
--- /dev/null
+++ b/fire-common/src/main/java/com/zto/fire/common/exception/FireNoHaCheckPointException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.zto.fire.common.exception;
+
+/**
+ * Custom exception: thrown when no checkpoint exists in HA storage
+ */
+public class FireNoHaCheckPointException extends RuntimeException {
+
+    public FireNoHaCheckPointException() {
+        super("Flink AM retried, but no checkpoint was found in HA storage; the job must fail");
+    }
+
+}
diff --git a/fire-common/src/main/scala/com/zto/fire/common/bean/MQRecord.scala b/fire-common/src/main/scala/com/zto/fire/common/bean/MQRecord.scala
index 3415bfd9150339b6af63088ccece8baad8757808..5a9e6534bdfc4f8dd050ac7d0065a2424eab82fa 100644
--- a/fire-common/src/main/scala/com/zto/fire/common/bean/MQRecord.scala
+++ b/fire-common/src/main/scala/com/zto/fire/common/bean/MQRecord.scala
@@ -19,6 +19,7 @@ package com.zto.fire.common.bean
 
 import com.zto.fire.predef._
 import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.header.internals.RecordHeaders
 import org.apache.rocketmq.common.message.Message
 import org.apache.rocketmq.remoting.common.RemotingHelper
 
@@ -34,6 +35,33 @@ class MQRecord(var topic: String, var msg: String) {
   var tag: String = null
   var flag: Int = 0
   var waitStoreMsgOK: Boolean = true
+  var kafkaHeaders: RecordHeaders = null
+  // Custom MQ headers, carried as the message's property attributes
+  var mqHeaders: Map[String, String] = Map()
+  // For special cases where the value must be sent as raw bytes
+  var msgBytes: Array[Byte] = null
+
+  def this(topic: String, msg: String, partition: JInt, key: String, tag: String, flag: Int, waitStoreMsgOK: Boolean, mqHeaders: Map[String, String], msgBytes: Array[Byte]) = {
+    this(topic, msg)
+    this.partition = partition
+    this.key = key
+    this.tag = tag
+    this.flag = flag
+    this.waitStoreMsgOK = waitStoreMsgOK
+    this.mqHeaders = mqHeaders
+    this.msgBytes = msgBytes
+  }
+
+  def this(topic: String, msg: String, partition: JInt, key: String, tag: String, flag: Int, waitStoreMsgOK: Boolean, kafkaHeaders: RecordHeaders, msgBytes: Array[Byte]) = {
+    this(topic, msg)
+    this.partition = partition
+    this.key = key
+    this.tag = tag
+    this.flag = flag
+    this.waitStoreMsgOK = waitStoreMsgOK
+    this.kafkaHeaders = kafkaHeaders
+    this.msgBytes = msgBytes
+  }
 
   def this(topic: String, msg: String, partition: JInt, key: String, tag: String, flag: Int, waitStoreMsgOK: Boolean) = {
     this(topic, msg)
@@ -79,12 +107,44 @@
     new Message(topic, if (isEmpty(tag)) "*" else tag, key, flag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET), waitStoreMsgOK)
   }
 
+  /**
+   * Convert to a native RocketMQ byte-array message body
+   */
+  def toRocketMQBytes: Message = {
+    new Message(topic, if (isEmpty(tag)) "*" else tag, key, flag, msgBytes, waitStoreMsgOK)
+  }
+
+  def toRocketMQByFlag(sendBytes: Boolean): Message = {
+    if (sendBytes) {
+      this.toRocketMQBytes
+    } else {
+      this.toRocketMQ
+    }
+  }
+
+
   /**
    * Convert to a Kafka message body
    */
   def toKafka: ProducerRecord[String, String] = {
-    new ProducerRecord[String, String](topic, partition, key, msg)
+    new ProducerRecord[String, String](topic, partition, key, msg, kafkaHeaders)
+  }
+
+  /**
+   * Convert to a Kafka byte-array message body
+   */
+  def toKafkaBytes: ProducerRecord[String, Array[Byte]] = {
+    new ProducerRecord[String, Array[Byte]](topic, partition, key, msgBytes, kafkaHeaders)
+  }
+
+  def toKafkaByFlag(sendBytes: Boolean): ProducerRecord[_, _] = {
+    if (sendBytes) {
+      this.toKafkaBytes
+    } else {
+      this.toKafka
+    }
   }
+
 }
 
 object MQRecord {
@@ -94,6 +154,12 @@
   def apply(msg: String, partition: JInt, key: String, tags: JString, flag: Int): MQRecord = new MQRecord(null, msg, partition, key, tags, flag, true)
 
+  def apply(key: String, tags: JString, headers: RecordHeaders, msgBytes: Array[Byte]): MQRecord = new MQRecord(null, null, null, key, tags, 0, true, headers, msgBytes)
+
+  def apply(key: String, tags: JString, mqHeaders: Map[String, String], msgBytes: Array[Byte]): MQRecord = new MQRecord(null, null, null, key, tags, 0, true, mqHeaders, msgBytes)
+
+  def apply(msg: String, partition: JInt, key: String, tags: JString, headers: RecordHeaders, msgBytes: Array[Byte]): MQRecord = new MQRecord(null, msg, partition, key, tags, 0, true, headers, msgBytes)
+
   def apply(msg: String, partition: JInt, key: String, tags: JString): MQRecord = new MQRecord(null, msg, partition, key, tags, 0, true)
 
   def apply(msg: String, partition: JInt, key: String): MQRecord = new MQRecord(null, msg, partition, key, null, 0, true)
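For reference, the byte-oriented overloads above combine like this — a minimal sketch (the "serializer_type" header value and the payload bytes are illustrative; topic is left null so a downstream sink can fill it in):

import com.zto.fire.common.bean.MQRecord
import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}

object MQRecordByteSketch {
  def main(args: Array[String]): Unit = {
    val headers = new RecordHeaders()
    headers.add(new RecordHeader("serializer_type", "FURY".getBytes("UTF-8")))
    // key + tag + Kafka headers + raw payload; msg stays null in byte mode
    val record = MQRecord("order-key", "*", headers, Array[Byte](1, 2, 3))
    // sendBytes = true routes to toKafkaBytes, which carries the headers along
    val producerRecord = record.toKafkaByFlag(sendBytes = true)
    println(producerRecord)
  }
}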
diff --git a/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala b/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala
index 8f8f6fd57a2ae7eee9fc32dc7234b9567bdf7cbd..2f02f3fb9b431e41d7fb411d2afa20d45a63818a 100644
--- a/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala
+++ b/fire-common/src/main/scala/com/zto/fire/common/util/MQProducer.scala
@@ -24,8 +24,8 @@ import com.zto.fire.common.enu.{Datasource, Operation}
 import com.zto.fire.common.lineage.parser.connector.KafkaConnectorParser
 import com.zto.fire.common.util.MQType.MQType
 import com.zto.fire.predef._
-import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, RecordMetadata}
-import org.apache.kafka.common.serialization.StringSerializer
+import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
 import org.apache.rocketmq.client.producer.{DefaultMQProducer, SendCallback, SendResult}
 
 import java.util.Properties
@@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean
  * @since 2.3.1
  */
 class MQProducer(url: String, mqType: MQType = MQType.kafka,
-                 otherConf: Map[String, String] = Map.empty, throwable: Boolean = true) extends Logging {
+                 otherConf: Map[String, String] = Map.empty, throwable: Boolean = true, sendBytes: Boolean = false) extends Logging {
   private lazy val maxRetries = FireFrameworkConf.exceptionTraceSendMQMaxRetries
   private lazy val sendTimeout = FireFrameworkConf.exceptionSendTimeout
   private var sendErrorCount = 0
@@ -71,18 +71,23 @@
     val props = new Properties()
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, FireKafkaConf.kafkaBrokers(this.url))
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    if (sendBytes) {
+      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
+    } else {
+      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    }
     props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, this.sendTimeout.toString)
     props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
     props.put(ProducerConfig.ACKS_CONFIG, "all")
     props.put(ProducerConfig.RETRIES_CONFIG, "3")
     this.otherConf.foreach(prop => props.put(prop._1, prop._2))
-    val producer = new KafkaProducer[String, String](props)
+    val producer = new KafkaProducer[Any, Any](props)
     this.useKafka = true
     producer
   }
 
+
   // rocketmq producer
   private lazy val rocketmqProducer = {
     val producer = new DefaultMQProducer("fire")
@@ -116,17 +121,24 @@
    * the message body
    */
   def sendKafkaRecord(record: MQRecord): Unit = {
-    requireNonNull(record, record.topic, record.msg)("Invalid message body arguments, please check")
+    if (sendBytes) {
+      requireNonNull(record, record.topic, record.msgBytes)("[byte mode] Invalid message body arguments, please check")
+    } else {
+      requireNonNull(record, record.topic, record.msg)("[string mode] Invalid message body arguments, please check")
+    }
     if (this.sendErrorCount >= this.maxRetries) {
       this.kafkaProducer.close()
       logError(s"Sending exception info to MQ still failed after ${this.sendErrorCount} retries; giving up.")
+      if (throwable) throw new RuntimeException("Failed to send the Kafka message; check for network issues or cluster maintenance")
       return
     }
 
-    val kafkaRecord = record.toKafka
+    val kafkaRecord = record.toKafkaByFlag(sendBytes)
     this.addLineage(record.topic)
-    kafkaProducer.send(kafkaRecord, new Callback() {
+
+    kafkaProducer.send(kafkaRecord.asInstanceOf[ProducerRecord[Any, Any]], new Callback() {
       override def onCompletion(recordMetadata: RecordMetadata, exception: Exception): Unit = {
         if (exception != null) {
           sendErrorCount += 1
@@ -137,6 +149,7 @@
     })
   }
 
+
   /**
    * Send a message to Kafka
    *
@@ -158,14 +171,21 @@
    * send timeout
    */
   def sendRocketMQRecord(record: MQRecord, timeout: Long = 10000): Unit = {
-    requireNonNull(record, record.topic, record.msg)("Invalid message body arguments, please check")
+    if (sendBytes) {
+      requireNonNull(record, record.topic, record.msgBytes)("[byte mode] Invalid message body arguments, please check")
+    } else {
+      requireNonNull(record, record.topic, record.msg)("[string mode] Invalid message body arguments, please check")
+    }
     if (this.sendErrorCount >= this.maxRetries) {
       this.rocketmqProducer.shutdown()
+      logError(s"Sending exception info to RocketMQ still failed after ${this.sendErrorCount} retries; giving up.")
+      if (throwable) throw new RuntimeException("Failed to send the RocketMQ message; check for network issues or cluster maintenance")
       return
     }
 
-    val rocketRecord = record.toRocketMQ
+    val rocketRecord = record.toRocketMQByFlag(sendBytes)
+    record.mqHeaders.foreach(header => rocketRecord.putUserProperty(header._1, header._2))
     this.addLineage(record.topic)
     this.rocketmqProducer.send(rocketRecord, new SendCallback {
       override def onSuccess(sendResult: SendResult): Unit = {
@@ -252,7 +272,8 @@ object MQProducer {
   }
 
   def apply(url: String, mqType: MQType = MQType.kafka,
-            otherConf: Map[String, String] = Map.empty, throwable: Boolean = true) = new MQProducer(url, mqType, otherConf, throwable)
+            otherConf: Map[String, String] = Map.empty, throwable: Boolean = true, sendByte: Boolean = false) = new MQProducer(url, mqType, otherConf, throwable, sendByte)
+
 
   /**
    * Send a message to the given MQ topic
@@ -276,8 +297,8 @@
    * tuning parameters
    */
   def sendRecord(url: String, record: MQRecord,
-                 mqType: MQType = MQType.kafka, otherConf: Map[String, String] = Map.empty, throwable: Boolean = true): Unit = {
-    val producer = this.producerMap.mergeGet(url + ":" + record.topic)(new MQProducer(url, mqType, otherConf, throwable))
+                 mqType: MQType = MQType.kafka, otherConf: Map[String, String] = Map.empty, throwable: Boolean = true, sendByte: Boolean = false): Unit = {
+    val producer = this.producerMap.mergeGet(url + ":" + record.topic)(new MQProducer(url, mqType, otherConf, throwable, sendByte))
     producer.send(record)
   }
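A usage sketch for the new byte mode (broker address and topic are placeholders; MQType is imported from com.zto.fire.common.util, matching MQProducer's own import):

import com.zto.fire.common.bean.MQRecord
import com.zto.fire.common.util.{MQProducer, MQType}
import org.apache.kafka.common.header.internals.RecordHeaders

object ByteProducerSketch {
  def main(args: Array[String]): Unit = {
    val record = MQRecord("key", "*", new RecordHeaders(), "hello".getBytes("UTF-8"))
    record.topic = "fire-test-kafka" // placeholder topic
    // sendByte = true makes the cached producer use ByteArraySerializer for values
    MQProducer.sendRecord("localhost:9092", record, MQType.kafka, sendByte = true)
  }
}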
diff --git a/fire-connectors/base-connectors/fire-hbase/pom.xml b/fire-connectors/base-connectors/fire-hbase/pom.xml
index 58734d826bd3711a1529534a33764369da9cd4d5..27ca451fa063d73860b030ab2e787434c9168643 100644
--- a/fire-connectors/base-connectors/fire-hbase/pom.xml
+++ b/fire-connectors/base-connectors/fire-hbase/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>com.zto.fire</groupId>
        <artifactId>fire-connectors-common</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-connectors/base-connectors/fire-jdbc/pom.xml b/fire-connectors/base-connectors/fire-jdbc/pom.xml
index a834e58ae5a4ec75e6cbfae2a80162c476e559ae..07677c899b093460cc0a1e1489fa24e78a1f1f1a 100644
--- a/fire-connectors/base-connectors/fire-jdbc/pom.xml
+++ b/fire-connectors/base-connectors/fire-jdbc/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-connectors-common</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-connectors/base-connectors/fire-mq/pom.xml b/fire-connectors/base-connectors/fire-mq/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..aa7384732ef39f4b3bdbf1747795b4db0f47e9a5
--- /dev/null
+++ b/fire-connectors/base-connectors/fire-mq/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>fire-connector-mq_${scala.binary.version}</artifactId>
+    <packaging>jar</packaging>
+    <name>Fire : Connectors : Common : mq</name>
+
+    <parent>
+        <groupId>com.zto.fire</groupId>
+        <artifactId>fire-connectors-common</artifactId>
+        <version>2.7.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka${contains.scala.binary.version}</artifactId>
+            <version>${flink.kafka.connector.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.zto.fire</groupId>
+            <artifactId>fire-connector-flink-rocketmq_${flink.reference}</artifactId>
+            <version>${fire.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+    </build>
+</project>
diff --git a/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/conf/ZtoSerializeConf.scala b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/conf/ZtoSerializeConf.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1b9a9d56d1576432ea7fbcefc33c3ca6f64167e3
--- /dev/null
+++ b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/conf/ZtoSerializeConf.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.zto.fire.flink.conf
+
+
+/**
+ * ZTO serialization constants
+ *
+ * @author zhanggougou
+ * @since 1.1.0
+ * @create 2025-12-02 14:55
+ */
+private[fire] object ZtoSerializeConf {
+
+  lazy val SERIALIZER_CLASS_NAME = "com.zto.bigdata.integeration.util.ZtoSerializeUtil"
+
+  lazy val SERIALIZER_CLASS_INIT_METHOD_NAME = "initSerializerManager"
+
+  lazy val SERIALIZER_CLASS_DES_METHOD_NAME = "deserializeValue"
+
+  lazy val SERIALIZER_HEAD_KEY = "serializer_type"
+
+
+}
+
+
+/**
+ * Serialization type enum
+ */
+object SerializeType extends Enumeration {
+  type SerializeType = Value
+
+  val FURY = Value("FURY")
+  val JSON = Value("JSON")
+  // auto-detect
+  val JAVA = Value("JAVA")
+}
diff --git a/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/BaseDeserializeEntity.java b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/BaseDeserializeEntity.java
new file mode 100644
index 0000000000000000000000000000000000000000..adf7dbf6a17565c635ddfe853b8bb7fa220fbfa6
--- /dev/null
+++ b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/BaseDeserializeEntity.java
@@ -0,0 +1,10 @@
+package com.zto.fire.mq.deserializer;
+
+import java.io.Serializable;
+
+/**
+ * Base entity class for deserialization
+ */
+public class BaseDeserializeEntity implements Serializable {
+
+}
diff --git a/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoKafkaMessageDeserializationSchema.scala b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoKafkaMessageDeserializationSchema.scala
new file mode 100644
index 0000000000000000000000000000000000000000..998717f010061a2ac1844f8df48725d09f1cdce2
--- /dev/null
+++ b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoKafkaMessageDeserializationSchema.scala
@@ -0,0 +1,46 @@
+package com.zto.fire.mq.deserializer
+
+import com.zto.fire.flink.conf.ZtoSerializeConf
+import com.zto.fire.mq.utils.DeserializeUtil
+import org.apache.flink.api.common.serialization.DeserializationSchema
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import java.nio.charset.StandardCharsets
+import scala.reflect.ClassTag
+import com.zto.fire.predef._
+
+/**
+ * Custom deserializer that supports deserializing the key and value of Kafka messages
+ */
+class ZtoKafkaMessageDeserializationSchema[T <: BaseDeserializeEntity : ClassTag](topic: String) extends KafkaDeserializationSchema[T] {
+
+  override def isEndOfStream(t: T): Boolean = false
+
+  override def open(context: DeserializationSchema.InitializationContext): Unit = {
+    DeserializeUtil.initZtoSerializer(topic)
+  }
+
+  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): T = {
+    try {
+      val headerVal = record.headers().lastHeader(ZtoSerializeConf.SERIALIZER_HEAD_KEY)
+      if (headerVal == null) {
+        throw new RuntimeException("Kafka deserialization failed: missing message header or initialization error; please check the message content")
+      }
+      val headType = new String(headerVal.value(), StandardCharsets.UTF_8)
+      DeserializeUtil.deserializeAndFill(getGeneric[T](), headType, record.value())
+    } catch {
+      case e: Throwable =>
+        throw new RuntimeException("Kafka deserialization failed", e)
+    }
+  }
+
+  override def getProducedType: TypeInformation[T] = TypeInformation.of(getGeneric[T]())
+}
diff --git a/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoRocketMessageDeserializationSchema.scala b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoRocketMessageDeserializationSchema.scala
new file mode 100644
index 0000000000000000000000000000000000000000..6c1fab45fa62b72b222de219129f1e86826f5c0b
--- /dev/null
+++ b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/deserializer/ZtoRocketMessageDeserializationSchema.scala
@@ -0,0 +1,46 @@
+package com.zto.fire.mq.deserializer
+
+import com.zto.fire.flink.conf.ZtoSerializeConf
+import com.zto.fire.mq.utils.DeserializeUtil
+import com.zto.fire.predef._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.rocketmq.common.message.MessageExt
+import org.apache.rocketmq.flink.common.serialization.TagKeyValueDeserializationSchema
+
+import scala.reflect.ClassTag
+
+/**
+ * Custom deserializer that supports deserializing the key and value of RocketMQ messages
+ */
+class ZtoRocketMessageDeserializationSchema[T <: BaseDeserializeEntity : ClassTag](topic: String) extends TagKeyValueDeserializationSchema[T] {
+
+  @volatile private var initialized = false
+
+  private def initOnce(): Unit = synchronized {
+    if (!initialized) {
+      DeserializeUtil.initZtoSerializer(topic)
+      initialized = true
+    }
+  }
+
+  override def deserializeTagKeyAndValue(msg: MessageExt): T = {
+    try {
+      this.initOnce()
+      val headerVal = msg.getProperty(ZtoSerializeConf.SERIALIZER_HEAD_KEY)
+      if (headerVal == null) {
+        throw new RuntimeException("RocketMQ deserialization failed: missing message property or initialization error; please check the message content")
+      }
+      DeserializeUtil.deserializeAndFill(getGeneric[T](), headerVal, msg.getBody)
+    } catch {
+      case e: Throwable =>
+        throw new RuntimeException("RocketMQ deserialization failed", e)
+    }
+  }
+
+  override def getProducedType: TypeInformation[T] = TypeInformation.of(getGeneric[T]())
+
+}
diff --git a/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/utils/DeserializeUtil.java b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/utils/DeserializeUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..f6575779679035d0e9dd12d3b6751a32162b8221
--- /dev/null
+++ b/fire-connectors/base-connectors/fire-mq/src/main/scala/com/zto/fire/mq/utils/DeserializeUtil.java
@@ -0,0 +1,52 @@
+package com.zto.fire.mq.utils;
+
+import com.zto.bigdata.integeration.util.ZtoSerializeUtil;
+import com.zto.fire.common.util.ReflectionUtils;
+import com.zto.fire.flink.conf.ZtoSerializeConf$;
+import com.zto.zdata.schema.format.InputData;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+
+/**
+ * Serialization utility class
+ */
+public class DeserializeUtil {
+    /**
+     * Initialize the serializer (by topic)
+     * @param topic topic name
+     */
+    public static void initZtoSerializer(String topic) throws Throwable {
+        Class<?> deserializationClass = ReflectionUtils.forName(ZtoSerializeConf$.MODULE$.SERIALIZER_CLASS_NAME());
+        if (deserializationClass != null) {
+            ReflectionUtils.getMethodByName(deserializationClass, ZtoSerializeConf$.MODULE$.SERIALIZER_CLASS_INIT_METHOD_NAME()).invoke(null, topic, null);
+        }
+    }
+
+    /**
+     * Unified deserialization plus reflective field assignment
+     *
+     * @param <T>      target object type
+     * @param clazz    target Class
+     * @param headType serialization type (value from the header)
+     * @param value    raw byte[]
+     * @return T
+     */
+    public static <T> T deserializeAndFill(Class<T> clazz, String headType, byte[] value) {
+        try {
+            InputData inputData = ZtoSerializeUtil.serializerManager.deserialize(headType, value);
+            T target = clazz.getDeclaredConstructor().newInstance();
+            // Assign fields via reflection (including private ones)
+            Collection<Field> fields = ReflectionUtils.getAllFields(clazz).values();
+            for (Field field : fields) {
+                field.setAccessible(true);
+                Object fieldValue = inputData.getObject(field.getName());
+                field.set(target, fieldValue);
+            }
+            return target;
+        } catch (Exception e) {
+            throw new RuntimeException("Deserialization failed, class=" + clazz.getName(), e);
+        }
+    }
+}
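Called directly, the flow is: initialize the serializer manager for the topic once, then turn each (header value, body) pair into an entity. A sketch under stated assumptions — the empty payload is a placeholder and OrderInfo stands in for any BaseDeserializeEntity subclass (one is defined in the examples module below):

import com.zto.fire.mq.utils.DeserializeUtil

object DeserializeSketch {
  def main(args: Array[String]): Unit = {
    // one-time init, mirroring what the deserialization schemas do in open()/initOnce()
    DeserializeUtil.initZtoSerializer("fire_test_kafka_serialize")
    val serializerType = "FURY"                     // value of the serializer_type header
    val payload: Array[Byte] = Array.emptyByteArray // placeholder for a real serialized body
    val order = DeserializeUtil.deserializeAndFill(classOf[OrderInfo], serializerType, payload)
    println(order)
  }
}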
diff --git a/fire-connectors/base-connectors/pom.xml b/fire-connectors/base-connectors/pom.xml
index a7744a251380eb8f532da02975f2e0da6bdc3720..650588e8550936e9e6005085d3101d3a0fdacefd 100644
--- a/fire-connectors/base-connectors/pom.xml
+++ b/fire-connectors/base-connectors/pom.xml
@@ -26,13 +26,14 @@
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-connectors</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
 
     <modules>
         <module>fire-jdbc</module>
         <module>fire-hbase</module>
+        <module>fire-mq</module>
     </modules>
diff --git a/fire-connectors/flink-connectors/flink-clickhouse/pom.xml b/fire-connectors/flink-connectors/flink-clickhouse/pom.xml
index 3095eb3c7753d885fc25332334b7354cced5e7a9..c80125641702ca7f65ed043c2c39cd16d1e365e5 100644
--- a/fire-connectors/flink-connectors/flink-clickhouse/pom.xml
+++ b/fire-connectors/flink-connectors/flink-clickhouse/pom.xml
@@ -26,7 +26,7 @@
     <parent>
        <artifactId>fire-flink-connectors</artifactId>
        <groupId>com.zto.fire</groupId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
     </parent>
diff --git a/fire-connectors/flink-connectors/flink-format/pom.xml b/fire-connectors/flink-connectors/flink-format/pom.xml
index e54b12280e0c0d35fdf089bf223cd290a2f6f797..0b9d64a351ac4ccc5e99b878d57d78843d5db59d 100644
--- a/fire-connectors/flink-connectors/flink-format/pom.xml
+++ b/fire-connectors/flink-connectors/flink-format/pom.xml
@@ -8,7 +8,7 @@
     <parent>
        <artifactId>fire-flink-connectors</artifactId>
        <groupId>com.zto.fire</groupId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-connectors/flink-connectors/flink-paimon/pom.xml b/fire-connectors/flink-connectors/flink-paimon/pom.xml
index 7e91a1c7f505e829674807f997c9c56bdf850f17..7db9cf9dd63a9352a2b5599279214247a7d085bc 100644
--- a/fire-connectors/flink-connectors/flink-paimon/pom.xml
+++ b/fire-connectors/flink-connectors/flink-paimon/pom.xml
@@ -10,7 +10,7 @@
     <parent>
        <artifactId>fire-flink-connectors</artifactId>
        <groupId>com.zto.fire</groupId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-connectors/flink-connectors/flink-rocketmq/pom.xml b/fire-connectors/flink-connectors/flink-rocketmq/pom.xml
index 2026444e53edacb310e9bac9044a70cc94860956..659ea15a8555ec01f761621f3d68331eef87709d 100644
--- a/fire-connectors/flink-connectors/flink-rocketmq/pom.xml
+++ b/fire-connectors/flink-connectors/flink-rocketmq/pom.xml
@@ -25,7 +25,7 @@
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-flink-connectors</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-connectors/flink-connectors/pom.xml b/fire-connectors/flink-connectors/pom.xml
index b41ac446eca244f81bd0f7cc2068b80e979be167..a5985c68544eb48d2efc29b3e6d2d8c0d22a3a1a 100644
--- a/fire-connectors/flink-connectors/pom.xml
+++ b/fire-connectors/flink-connectors/pom.xml
@@ -32,7 +32,7 @@
     <parent>
        <artifactId>fire-connectors</artifactId>
        <groupId>com.zto.fire</groupId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-connectors/pom.xml b/fire-connectors/pom.xml
index 3a736f159a9ae1738745bdaef6eba46420187191..ecd6abcfb96a974ed2f1700a45f88003175db130 100644
--- a/fire-connectors/pom.xml
+++ b/fire-connectors/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-parent</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-connectors/spark-connectors/pom.xml b/fire-connectors/spark-connectors/pom.xml
index aee51d4447d261f88056201b154ee8e416685c2e..cb86dbe5eeb96026e0882f2a64760e85baf2fe53 100644
--- a/fire-connectors/spark-connectors/pom.xml
+++ b/fire-connectors/spark-connectors/pom.xml
@@ -32,7 +32,7 @@
     <parent>
        <artifactId>fire-connectors</artifactId>
        <groupId>com.zto.fire</groupId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-connectors/spark-connectors/spark-hbase/pom.xml b/fire-connectors/spark-connectors/spark-hbase/pom.xml
index e45d7a3a341dde88d5d14506e1e15b7bbb515b15..fca1a7ba241bfa2cee5c7d674c117d67fdb8ad64 100644
--- a/fire-connectors/spark-connectors/spark-hbase/pom.xml
+++ b/fire-connectors/spark-connectors/spark-hbase/pom.xml
@@ -26,7 +26,7 @@
     <parent>
        <artifactId>fire-spark-connectors</artifactId>
        <groupId>com.zto.fire</groupId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
     </parent>
diff --git a/fire-connectors/spark-connectors/spark-hudi/pom.xml b/fire-connectors/spark-connectors/spark-hudi/pom.xml
index 68a33ec82e5dc58aab9c1844b4c811ede871fdba..20e2174fbf361d844768b62d195a7463b09d4899 100644
--- a/fire-connectors/spark-connectors/spark-hudi/pom.xml
+++ b/fire-connectors/spark-connectors/spark-hudi/pom.xml
@@ -27,7 +27,7 @@
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-spark-connectors</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-connectors/spark-connectors/spark-paimon/pom.xml b/fire-connectors/spark-connectors/spark-paimon/pom.xml
index 6859bccfdf3081477b382e7d691b078c205a5547..e66e448ac843d5485785ee980f82f5f96c1bc153 100644
--- a/fire-connectors/spark-connectors/spark-paimon/pom.xml
+++ b/fire-connectors/spark-connectors/spark-paimon/pom.xml
@@ -8,7 +8,7 @@
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-spark-connectors</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-connectors/spark-connectors/spark-rocketmq/pom.xml b/fire-connectors/spark-connectors/spark-rocketmq/pom.xml
index ebca0cf1023665f44d2df39da5de2cd75ae063e4..545e9b1dea546fff80d921112c410cf18d6c5849 100644
--- a/fire-connectors/spark-connectors/spark-rocketmq/pom.xml
+++ b/fire-connectors/spark-connectors/spark-rocketmq/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-spark-connectors</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-core/pom.xml b/fire-core/pom.xml
index b63e696c74c822f7e946f47711bb8bd670a25c6a..a38fa45e70471dc3537dd74e10dd9dab4c7e0b79 100644
--- a/fire-core/pom.xml
+++ b/fire-core/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-parent</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-engines/fire-flink/pom.xml b/fire-engines/fire-flink/pom.xml
index bab3900b1c950c47af9aa27bdd4f8b00e7233050..20aca242b40c97bbeedb9a2b8d7d2c5b1cb2f53e 100644
--- a/fire-engines/fire-flink/pom.xml
+++ b/fire-engines/fire-flink/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-engines</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
@@ -43,6 +43,11 @@
             <version>${fire.version}</version>
             <scope>${maven.scope}</scope>
         </dependency>
+        <dependency>
+            <groupId>com.zto.fire</groupId>
+            <artifactId>fire-connector-mq_${scala.binary.version}</artifactId>
+            <version>${fire.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.sparkjava</groupId>
diff --git a/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/KafkaSink.scala b/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/KafkaSink.scala
index cefd4bbfe2e22afa80338b43832f3ac5de55ec96..6e4938e51c6e7790313ad76038119d7e6df5dde7 100644
--- a/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/KafkaSink.scala
+++ b/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/KafkaSink.scala
@@ -36,6 +36,7 @@
 abstract class KafkaSink[IN, T <: MQRecord : ClassTag](params: Map[String, Object],
                                                        url: String, topic: String,
                                                        batch: Int = 100, flushInterval: Long = 5000,
+                                                       sendByte: Boolean = false,
                                                        keyNum: Int = KeyNum._1) extends BaseSink[IN, T](batch, flushInterval) {
 
   private lazy val (finalBrokers, finalTopic, finalConf) = KafkaUtils.getConfByKeyNum(params, url, topic, keyNum)
@@ -47,7 +48,7 @@
   override def sink(dataList: List[T]): Unit = {
     dataList.foreach(record => {
       if (isEmpty(record.topic)) record.topic = finalTopic
-      MQProducer.sendRecord(finalBrokers, record, MQType.kafka, finalConf)
+      MQProducer.sendRecord(finalBrokers, record, MQType.kafka, finalConf, sendByte = sendByte)
     })
   }
 }
diff --git a/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/RocketMQSink.scala b/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/RocketMQSink.scala
index c13705e79965aa486ca53545d43ceef05e421208..fde43cecc9c10d00f96b6b8254d71da7a198ef3f 100644
--- a/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/RocketMQSink.scala
+++ b/fire-engines/fire-flink/src/main/java/com/zto/fire/flink/sink/RocketMQSink.scala
@@ -33,7 +33,7 @@
 abstract class RocketMQSink[IN, T <: MQRecord : ClassTag](params: Map[String, Object],
                                                           url: String, topic: String,
                                                           tag: String = "*", batch: Int = 100,
-                                                          flushInterval: Long = 1000, keyNum: Int = KeyNum._1) extends BaseSink[IN, T](batch, flushInterval) {
+                                                          flushInterval: Long = 1000, sendByte: Boolean = false, keyNum: Int = KeyNum._1) extends BaseSink[IN, T](batch, flushInterval) {
 
   private lazy val (finalBrokers, finalTopic, finalTag, finalConf) = RocketMQUtils.getConfByKeyNum(url, topic, tag, params, keyNum)
@@ -46,7 +46,7 @@
     dataList.foreach(record => {
       if (isEmpty(record.topic)) record.topic = finalTopic
       if (isEmpty(record.tag) && noEmpty(finalTag)) record.tag = finalTag
-      MQProducer.sendRecord(finalBrokers, record, MQType.rocketmq, finalConf)
+      MQProducer.sendRecord(finalBrokers, record, MQType.rocketmq, finalConf, sendByte = sendByte)
     })
   }
 }
diff --git a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/DataStreamExt.scala b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/DataStreamExt.scala
index 1a8f8ae18386a90037dc4ad888379823639d7e5d..4b9e63da601948259969cf1824df1dea4bf8df19 100644
--- a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/DataStreamExt.scala
+++ b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/DataStreamExt.scala
@@ -303,8 +303,9 @@
                                        tag: String = "*", mqType: MQType = MQType.kafka,
                                        batch: Int = 100, flushInterval: Long = 1000,
+                                       sendByte: Boolean = false,
                                        keyNum: Int = KeyNum._1): DataStreamSink[_] = {
-    this.sinkMQFun[E](params, url, topic, tag, mqType, batch, flushInterval, keyNum)(_.asInstanceOf[E])
+    this.sinkMQFun[E](params, url, topic, tag, mqType, batch, flushInterval, sendByte, keyNum)(_.asInstanceOf[E])
   }
 
   /**
@@ -327,11 +328,12 @@
                                           tag: String = "*", mqType: MQType = MQType.kafka,
                                           batch: Int = 100, flushInterval: Long = 1000,
+                                          sendByte: Boolean = false,
                                           keyNum: Int = KeyNum._1)(mapFunction: T => E): DataStreamSink[_] = {
     if (mqType == MQType.rocketmq) {
-      this.sinkRocketMQFun[E](params, url, topic, tag, batch, flushInterval, keyNum)(mapFunction)
+      this.sinkRocketMQFun[E](params, url, topic, tag, batch, flushInterval, sendByte, keyNum)(mapFunction)
     } else {
-      this.sinkKafkaFun[E](params, url, topic, batch, flushInterval, keyNum)(mapFunction)
+      this.sinkKafkaFun[E](params, url, topic, batch, flushInterval, sendByte, keyNum)(mapFunction)
     }
   }
 
@@ -344,15 +346,41 @@
    * MQ url
    * @param topic
    *        the target topic
+   * @param sendByte
+   *        whether to send the byte payload; defaults to false (send the string value)
    * @param keyNum
   *        keyNum of the configuration, resolved from config files or annotations
    */
   def sinkKafka[E <: MQRecord : ClassTag](params: Map[String, Object] = null,
                                           url: String = null, topic: String = null,
                                           batch: Int = 1000, flushInterval: Long = 5000,
+                                          sendByte: Boolean = false,
                                           keyNum: Int = KeyNum._1): DataStreamSink[_] = {
-    this.sinkKafkaFun[E](params, url, topic, batch, flushInterval, keyNum)(_.asInstanceOf[E])
+    this.sinkKafkaFun[E](params, url, topic, batch, flushInterval, sendByte, keyNum)(_.asInstanceOf[E])
+  }
+
+  /**
+   * Sink the stream to the given Kafka topic in real time
+   *
+   * @param params
+   *        extra producer parameters
+   * @param url
+   *        MQ url
+   * @param topic
+   *        the target topic
+   * @param sendByte
+   *        whether to send the byte payload; defaults to true here
+   * @param keyNum
+   *        keyNum of the configuration, resolved from config files or annotations
+   */
+  def sinkKafkaByte[E <: MQRecord : ClassTag](params: Map[String, Object] = null,
+                                              url: String = null, topic: String = null,
+                                              batch: Int = 1000, flushInterval: Long = 5000,
+                                              sendByte: Boolean = true,
+                                              keyNum: Int = KeyNum._1): DataStreamSink[_] = {
+
+    this.sinkKafkaFun[E](params, url, topic, batch, flushInterval, sendByte, keyNum)(_.asInstanceOf[E])
   }
 
   /**
@@ -370,11 +398,12 @@
   def sinkKafkaFun[E <: MQRecord : ClassTag](params: Map[String, Object] = null,
                                              url: String = null, topic: String = null,
                                              batch: Int = 1000, flushInterval: Long = 5000,
+                                             sendByte: Boolean = false,
                                              keyNum: Int = KeyNum._1)(fun: T => E): DataStreamSink[_] = {
     val finalBatch = if (FireKafkaConf.kafkaSinkBatch(keyNum) > 0) FireKafkaConf.kafkaSinkBatch(keyNum) else batch
     val finalInterval = if (FireKafkaConf.kafkaFlushInterval(keyNum) > 0) FireKafkaConf.kafkaFlushInterval(keyNum) else flushInterval
 
-    this.addSinkWrap(new KafkaSink[T, E](params, url, topic, finalBatch, finalInterval, keyNum) {
+    this.addSinkWrap(new KafkaSink[T, E](params, url, topic, finalBatch, finalInterval, sendByte, keyNum) {
       override def map(value: T): E = fun(value)
     })
   }
@@ -394,8 +423,29 @@
   def sinkRocketMQ[E <: MQRecord : ClassTag](params: Map[String, Object] = null,
                                              url: String = null, topic: String = null, tag: String = "*",
                                              batch: Int = 1000, flushInterval: Long = 5000,
+                                             sendByte: Boolean = false,
+                                             keyNum: Int = KeyNum._1): DataStreamSink[_] = {
+    this.sinkRocketMQFun[E](params, url, topic, tag, batch, flushInterval, sendByte, keyNum)(_.asInstanceOf[E])
+  }
+
+  /**
+   * Sink the stream to the given RocketMQ topic in real time, sending byte payloads
+   *
+   * @param params
+   *        extra producer parameters
+   * @param url
+   *        MQ url
+   * @param topic
+   *        the target topic
+   * @param keyNum
+   *        keyNum of the configuration, resolved from config files or annotations
+   */
+  def sinkRocketMQByte[E <: MQRecord : ClassTag](params: Map[String, Object] = null,
+                                                 url: String = null, topic: String = null, tag: String = "*",
+                                                 batch: Int = 1000, flushInterval: Long = 5000,
+                                                 sendByte: Boolean = true,
                                                  keyNum: Int = KeyNum._1): DataStreamSink[_] = {
-    this.sinkRocketMQFun[E](params, url, topic, tag, batch, flushInterval, keyNum)(_.asInstanceOf[E])
+    this.sinkRocketMQFun[E](params, url, topic, tag, batch, flushInterval, sendByte, keyNum)(_.asInstanceOf[E])
   }
 
   /**
@@ -412,12 +462,12 @@
   def sinkRocketMQFun[E <: MQRecord : ClassTag](params: Map[String, Object] = null,
                                                 url: String = null, topic: String = null, tag: String = "*",
-                                                batch: Int = 1000, flushInterval: Long = 5000,
+                                                batch: Int = 1000, flushInterval: Long = 5000, sendByte: Boolean = false,
                                                 keyNum: Int = KeyNum._1)(fun: T => E): DataStreamSink[_] = {
     val finalBatch = if (FireRocketMQConf.rocketSinkBatch(keyNum) > 0) FireRocketMQConf.rocketSinkBatch(keyNum) else batch
     val finalInterval = if (FireRocketMQConf.rocketSinkFlushInterval(keyNum) > 0) FireRocketMQConf.rocketSinkFlushInterval(keyNum) else flushInterval
 
-    this.addSinkWrap(new RocketMQSink[T, E](params, url, topic, tag, finalBatch, finalInterval, keyNum) {
+    this.addSinkWrap(new RocketMQSink[T, E](params, url, topic, tag, finalBatch, finalInterval, sendByte, keyNum) {
       override def map(value: T): E = fun(value)
     })
   }
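Putting the sink extension together — a minimal job sketch modeled on the example jobs further below (the JSON header value and the payload are illustrative):

import com.zto.fire._
import com.zto.fire.common.bean.MQRecord
import com.zto.fire.flink.FlinkStreaming
import org.apache.flink.api.scala._
import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}

object SinkKafkaByteSketch extends FlinkStreaming {
  override def process(): Unit = {
    this.fire.createRandomIntStream(1).map(i => {
      val headers = new RecordHeaders()
      headers.add(new RecordHeader("serializer_type", "JSON".getBytes("UTF-8")))
      // msg stays null; msgBytes carries the payload
      MQRecord(null, null, headers, i.toString.getBytes("UTF-8"))
    }).sinkKafkaByte() // sendByte defaults to true here, so msgBytes is what gets produced
  }
}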
diff --git a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/StreamExecutionEnvExt.scala b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/StreamExecutionEnvExt.scala
index 19d1812f5ae509eab0dd772e6e2159d78a5ebff5..d304f01795bf3819675e4adeb4f6a012ac7e4777 100644
--- a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/StreamExecutionEnvExt.scala
+++ b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/ext/stream/StreamExecutionEnvExt.scala
@@ -32,6 +32,7 @@
 import com.zto.fire.flink.ext.provider.{HBaseConnectorProvider, JdbcFlinkProvider}
 import com.zto.fire.flink.sql.FlinkSqlExtensionsParser
 import com.zto.fire.flink.util.{FlinkRocketMQUtils, FlinkSingletonFactory, FlinkUtils, TableUtils}
 import com.zto.fire.jdbc.JdbcConnectorBridge
+import com.zto.fire.mq.deserializer.{BaseDeserializeEntity, ZtoKafkaMessageDeserializationSchema, ZtoRocketMessageDeserializationSchema}
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.common.functions.RuntimeContext
 import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema}
@@ -154,9 +155,7 @@
                                runtimeContext: RuntimeContext = null,
                                deserializer: Any = new SimpleStringSchema,
                                keyNum: Int = KeyNum._1): DataStream[T] = {
-
     val kafkaConsumer = this.createKafkaConsumer[T](kafkaParams, topics, deserializer, keyNum)
-
     if (runtimeContext != null) kafkaConsumer.setRuntimeContext(runtimeContext)
     if (specificStartupOffsets != null) kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets)
 
     // Start consuming Kafka from the given timestamp
@@ -173,6 +172,24 @@
     this.addSourceWrap(kafkaConsumer)
   }
 
+  /**
+   * Create a DStream that consumes Kafka with the ZTO deserializer
+   *
+   * @param kafkaParams
+   *        Kafka configuration parameters
+   * @return
+   *        DStream
+   */
+  def createDirectStreamWithZtoDeserialize[T <: BaseDeserializeEntity : ClassTag](kafkaParams: Map[String, Object] = null,
+                                                                                  topics: Set[String] = null,
+                                                                                  specificStartupOffsets: Map[KafkaTopicPartition, java.lang.Long] = null,
+                                                                                  runtimeContext: RuntimeContext = null,
+                                                                                  keyNum: Int = KeyNum._1)(implicit typeInfo: TypeInformation[T]): DataStream[T] = {
+    val confTopics = FireKafkaConf.kafkaTopics(keyNum)
+    val ztoDeserialize = new ZtoKafkaMessageDeserializationSchema[T](confTopics)
+    this.createDirectStreamBySchema[T](kafkaParams, topics, specificStartupOffsets, runtimeContext, ztoDeserialize, keyNum = keyNum)
+  }
+
   /**
    * Create a DStream
    *
@@ -261,6 +278,28 @@
     this.addSourceWrap(new RocketMQSourceWithTag[(String, String, String)](new SimpleTagKeyValueDeserializationSchema, props)).name("RocketMQ Source")
   }
 
+  /**
+   * Build a RocketMQ pull DStream whose messages are deserialized into entity objects
+   *
+   * @param rocketParam
+   *        RocketMQ consumer parameters
+   * @param groupId
+   *        groupId
+   * @param topics
+   *        topic list
+   * @return
+   *        RocketMQ DStream
+   */
+  def createRocketMqPullStreamWithEntity[T <: BaseDeserializeEntity : ClassTag : TypeInformation](rocketParam: Map[String, String] = null,
+                                                                                                  groupId: String = null,
+                                                                                                  topics: String = null,
+                                                                                                  tag: String = null,
+                                                                                                  keyNum: Int = KeyNum._1): DataStream[T] = {
+    val props = buildRocketMQProps(rocketParam, groupId, topics, tag, keyNum)
+    val confTopics = FireRocketMQConf.rocketTopics(keyNum)
+    this.addSourceWrap(new RocketMQSourceWithTag[T](new ZtoRocketMessageDeserializationSchema[T](confTopics), props)).name("RocketMQ Entity Source")
+  }
+
   /**
    * Build a RocketMQ pull DStream exposing each message's tag, key, value and related metadata
    *
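And on the consuming side, the new source helpers resolve the topic from configuration and deserialize straight into entities — a sketch assuming the OrderInfo entity and the kafka.* / flink.rocket.* settings from the examples module:

import com.zto.fire._
import com.zto.fire.flink.FlinkStreaming
import org.apache.flink.api.scala._

object EntitySourceSketch extends FlinkStreaming {
  override def process(): Unit = {
    // Kafka: the serializer_type header drives which deserializer is applied
    this.fire.createDirectStreamWithZtoDeserialize[OrderInfo]().print()
    // RocketMQ: the same, via the message's serializer_type user property
    this.fire.createRocketMqPullStreamWithEntity[OrderInfo]().print()
  }
}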
"Application failed due to FireNoHaCheckPointException, " + + "shutdown cluster with UNKNOWN status.", + noCkException.get()); + + return dispatcherGateway.shutDownCluster( + ApplicationStatus.UNKNOWN); + } + // TODO: ------------ end:二次开发代码 --------------- // + final Optional maybeException = ExceptionUtils.findThrowable( t, UnsuccessfulExecutionException.class); diff --git a/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index f428c1175223ceb67502582ea35a420ae9f8ac57..3ae8b2e30c021054c3484e2c431f1267096d996f 100644 --- a/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/fire-enhance/apache-flink/src/main/java-flink-1.14/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; +import com.zto.fire.common.exception.FireNoHaCheckPointException; import com.zto.fire.common.util.FireEngineUtils; import com.zto.fire.common.util.TimeExpression; import org.apache.flink.annotation.VisibleForTesting; @@ -45,6 +46,8 @@ import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.ScheduledExecutor; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1612,11 +1615,24 @@ public class CheckpointCoordinator { */ public boolean restoreInitialCheckpointIfPresent(final Set tasks) throws Exception { + // TODO: ------------ start:二次开发代码 --------------- // + //如未采用yarn或出现其他异常,按最严格的方式来校验是否存在最新ck,防止出现故障 + int attemptId = 2; + try{ + attemptId = ConverterUtils.toContainerId(System.getenv(ApplicationConstants.Environment.CONTAINER_ID.key())).getApplicationAttemptId().getAttemptId(); + } catch (Throwable e){ + LOG.error("获取yarn重试id出现异常,异常默认am重试次数2"); + } + LOG.info("restoreInitialCheckpointIfPresent start,AttemptId is {}=========",attemptId); + // TODO: ------------ end:二次开发代码 --------------- // final OptionalLong restoredCheckpointId = restoreLatestCheckpointedStateInternal( tasks, OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT, - false, // initial checkpoints exist only on JobManager failover. ok if not + // TODO: ------------ start:二次开发代码 --------------- // + //false, // initial checkpoints exist only on JobManager failover. ok if not + attemptId != 1 ,//避免低概率zk被清理后 am重试后用错ck + // TODO: ------------ end:二次开发代码 --------------- // // present. false, true); // JobManager failover means JobGraphs match exactly. 
@@ -1665,10 +1681,14 @@
             CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
 
             if (latest == null) {
-                LOG.info("No checkpoint found during restore.");
+                LOG.info("No checkpoint found during restore; errorIfNoCheckpoint={}", errorIfNoCheckpoint);
 
                 if (errorIfNoCheckpoint) {
-                    throw new IllegalStateException("No completed checkpoint available");
+                    // throw new IllegalStateException("No completed checkpoint available");
+                    // TODO: ------------ start: custom modification --------------- //
+                    // Throw a dedicated exception for ApplicationDispatcherBootstrap to catch, so it
+                    // can terminate the Flink job directly instead of retrying pointlessly.
+                    throw new FireNoHaCheckPointException();
+                    // TODO: ------------ end: custom modification --------------- //
                 }
 
                 LOG.debug("Resetting the master hooks.");
diff --git a/fire-enhance/apache-spark/pom.xml b/fire-enhance/apache-spark/pom.xml
index 72483511304c93e422868ecc857b3e46a46e6661..f2ee634fe19b5e7602ba088d9f7bcbc287259d77 100644
--- a/fire-enhance/apache-spark/pom.xml
+++ b/fire-enhance/apache-spark/pom.xml
@@ -19,14 +19,14 @@
     <modelVersion>4.0.0</modelVersion>
     <artifactId>fire-enhance-spark_${spark.reference}</artifactId>
-    <version>2.6.3-SNAPSHOT</version>
+    <version>2.7.0-SNAPSHOT</version>
     <packaging>jar</packaging>
     <name>Fire : Enhance : Spark</name>
 
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-enhance</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-enhance/pom.xml b/fire-enhance/pom.xml
index 2ad08a69b5800ffa417568a4d623f164c8cc2ccd..5edd1a810c5503423c83501b4c05796c756c9595 100644
--- a/fire-enhance/pom.xml
+++ b/fire-enhance/pom.xml
@@ -25,7 +25,7 @@
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-parent</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-examples/flink-examples/pom.xml b/fire-examples/flink-examples/pom.xml
index 87fc6343f5ada4694372e96a3e52f4791f3e48a6..3236dbcbdc7382335b2437cd7bcc2805c780f66b 100644
--- a/fire-examples/flink-examples/pom.xml
+++ b/fire-examples/flink-examples/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>com.zto.fire</groupId>
         <artifactId>fire-examples</artifactId>
-        <version>2.6.3-SNAPSHOT</version>
+        <version>2.7.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaCommonTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaCommonTest.scala
new file mode 100644
index 0000000000000000000000000000000000000000..fece8fa929576ef2dd0c8faa109c5f797be19332
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaCommonTest.scala
@@ -0,0 +1,43 @@
+package com.zto.fire.examples.flink.serialize
+
+import com.zto.fire._
+import com.zto.fire.common.anno.Config
+import com.zto.fire.common.bean.MQRecord
+import com.zto.fire.common.util.JSONUtils
+import com.zto.fire.flink.FlinkStreaming
+import com.zto.fire.flink.anno.Streaming
+import org.apache.flink.api.scala._
+
+import java.util.Date
+
+/**
+ * Test the generic Kafka API
+ * @author zhanggougou 2025-12-08 10:09:19
+ */
+@Config(
+  """
+    |kafka.brokers.name=10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092
+    |kafka.topics=fire-test-kafka
+    |kafka.group.id=fire-test-kafka-consumer
+    |kafka.starting.offsets=earliest
+    |""")
+@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180)
+object KafkaCommonTest extends FlinkStreaming {
+
+  override def process(): Unit = {
+    this.fire.createRandomIntStream(1).map(x => {
+      val orderInfo = new OrderInfo(x, x + "我是kafka common test", new Date(), 1.6 + x)
+      MQRecord(JSONUtils.toJSONString(orderInfo))
+    })
+      .sinkKafka()
+      .uname("sink-kafka-common")
+
+    this.fire.createKafkaDirectStream()
+      .uname("source-kafka-common").map(x => {
+      JSONUtils.parseObject(x, classOf[OrderInfo])
+    }).print().name("print-kafka-common")
+
+  }
+}
+
+
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSendErrorTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSendErrorTest.scala
new file mode 100644
index 0000000000000000000000000000000000000000..d3ff9e5cbc837ad16aec7bda3d963615e5ccfaa9
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSendErrorTest.scala
@@ -0,0 +1,39 @@
+package com.zto.fire.examples.flink.serialize
+
+import com.zto.fire._
+import com.zto.fire.common.anno.Config
+import com.zto.fire.common.bean.MQRecord
+import com.zto.fire.common.util.JSONUtils
+import com.zto.fire.flink.FlinkStreaming
+import com.zto.fire.flink.anno.Streaming
+import org.apache.flink.api.scala._
+
+import java.util.Date
+
+/**
+ * Use a deliberately wrong broker port to simulate Kafka send failures
+ *
+ * @author zhanggougou 2025-12-08 10:09:19
+ */
+@Config(
+  """
+    |kafka.brokers.name=10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092
+    |kafka.topics=fire-test-kafka
+    |kafka.group.id=fire-test-kafka-consumer
+    |kafka.starting.offsets=earliest
+    |""")
+@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180)
+object KafkaSendErrorTest extends FlinkStreaming {
+
+  override def process(): Unit = {
+    this.fire.createRandomIntStream(1).map(x => {
+      val orderInfo = new OrderInfo(x, x + "我是kafka common test", new Date(), 1.6 + x)
+      MQRecord(JSONUtils.toJSONString(orderInfo))
+    })
+      .sinkKafka()
+      .uname("sink-kafka-common")
+
+  }
+}
+
+
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSinkTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSinkTest.scala
new file mode 100644
index 0000000000000000000000000000000000000000..6f067674323f6bd7b5e9a639bf9b16642afb0393
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSinkTest.scala
@@ -0,0 +1,54 @@
+package com.zto.fire.examples.flink.serialize
+
+import com.zto.bigdata.integeration.util.ZtoSerializeUtil
+import com.zto.fire._
+import com.zto.fire.common.anno.Config
+import com.zto.fire.common.bean.MQRecord
+import com.zto.fire.common.conf.FireKafkaConf
+import com.zto.fire.flink.FlinkStreaming
+import com.zto.fire.flink.anno.Streaming
+import com.zto.fire.flink.conf.{SerializeType, ZtoSerializeConf}
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
+
+import java.util.Date
+
+/**
+ * Send to Kafka using the custom serializer
+ * Test: 10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092
+ * Prod: oggkafkanewb1.ztosys.com:9092,oggkafkanewb2.ztosys.com:9092,oggkafkanewb3.ztosys.com:9092
+ * @author zhanggougou 2025-12-08 10:09:19
+ */
+@Config(
+  """
+    |kafka.brokers.name=10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092
+    |kafka.topics=fire_test_kafka_serialize
+    |""")
+@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180)
+object KafkaSinkTest extends FlinkStreaming {
+
+  override def process(): Unit = {
+    this.fire.createRandomIntStream(1).map(new RichMapFunction[Int, MQRecord] {
+
+      override def open(parameters: Configuration): Unit = {
+        ZtoSerializeUtil.initSerializerManager(FireKafkaConf.kafkaTopics(1), null)
+      }
+
+      override def map(x: Int): MQRecord = {
+        val outputData = ZtoSerializeUtil.serializerManager.outputData(ZtoSerializeUtil.classInfoMetadata.newInstance)
+        outputData.setObject("id", x)
+        outputData.setObject("name", x + "我")
+        outputData.setObject("createTime", new Date())
+        outputData.setObject("money", 1.6 + x)
+        val bytes = ZtoSerializeUtil.serializerManager.serialize(outputData)
+        val headers = new RecordHeaders()
+        headers.add(new RecordHeader(ZtoSerializeConf.SERIALIZER_HEAD_KEY, SerializeType.FURY.toString.getBytes()))
+        MQRecord(null, null, headers, bytes)
+      }
+    }).sinkKafkaByte().name("testSink")
+  }
+}
+
+
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSourceTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSourceTest.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1d8e7cfaebbf47415e899d1c2c1edd82b23361cc
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/KafkaSourceTest.scala
@@ -0,0 +1,36 @@
+
+package com.zto.fire.examples.flink.serialize
+
+import com.zto.fire._
+import com.zto.fire.common.anno.Config
+import com.zto.fire.flink.FlinkStreaming
+import com.zto.fire.flink.anno.Streaming
+import org.apache.flink.api.scala.createTypeInformation
+
+
+/**
+ * Consume Kafka using the custom deserializer
+ * Test: 10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092
+ * Prod: oggkafkanewb1.ztosys.com:9092,oggkafkanewb2.ztosys.com:9092,oggkafkanewb3.ztosys.com:9092
+ * @author zhanggougou 2025-12-08 10:09:19
+ */
+@Config(
+  """
+    |kafka.brokers.name=10.7.114.156:9092,10.7.114.157:9092,10.7.114.158:9092
+    |kafka.topics=fire_test_kafka_serialize
+    |kafka.group.id=fire_test_kafka_serialize_consumer
+    |kafka.starting.offsets=earliest
+    |""")
+@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180)
+object KafkaSourceTest extends FlinkStreaming {
+
+  override def process(): Unit = {
+    this.fire.createDirectStreamWithZtoDeserialize[OrderInfo]().map(order => {
+      println("processing start")
+      println(order.toString)
+      println("processing end")
+    }).print()
+  }
+}
+
+
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/OrderInfo.java b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/OrderInfo.java
new file mode 100644
index 0000000000000000000000000000000000000000..9d0f5e1391e25a05e010d883d8402b4bc101e971
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/OrderInfo.java
@@ -0,0 +1,65 @@
+package com.zto.fire.examples.flink.serialize;
+
+import com.zto.fire.mq.deserializer.BaseDeserializeEntity;
+
+import java.io.Serializable;
+import java.util.Date;
+
+public class OrderInfo extends BaseDeserializeEntity {
+    private Integer id;
+    private String name;
+    private Date createTime;
+    private Double money;
+
+    public OrderInfo() {
+    }
+
+    public OrderInfo(Integer id, String name, Date createTime, Double money) {
+        this.id = id;
+        this.name = name;
+        this.createTime = createTime;
+        this.money = money;
+    }
+
+    public Integer getId() {
+        return id;
+    }
+
+    public void setId(Integer id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public Date getCreateTime() {
+        return createTime;
+    }
+
+    public void setCreateTime(Date createTime) {
+        this.createTime = createTime;
+    }
+
+    public Double getMoney() {
+        return money;
+    }
+
+    public void setMoney(Double money) {
+        this.money = money;
+    }
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/OrderInfo.java b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/OrderInfo.java
new file mode 100644
index 0000000000000000000000000000000000000000..9d0f5e1391e25a05e010d883d8402b4bc101e971
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/OrderInfo.java
@@ -0,0 +1,65 @@
+package com.zto.fire.examples.flink.serialize;
+
+import com.zto.fire.mq.deserializer.BaseDeserializeEntity;
+
+import java.util.Date;
+
+public class OrderInfo extends BaseDeserializeEntity {
+    private Integer id;
+    private String name;
+    private Date createTime;
+    private Double money;
+
+    public OrderInfo() {
+    }
+
+    public OrderInfo(Integer id, String name, Date createTime, Double money) {
+        this.id = id;
+        this.name = name;
+        this.createTime = createTime;
+        this.money = money;
+    }
+
+    public Integer getId() {
+        return id;
+    }
+
+    public void setId(Integer id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public Date getCreateTime() {
+        return createTime;
+    }
+
+    public void setCreateTime(Date createTime) {
+        this.createTime = createTime;
+    }
+
+    public Double getMoney() {
+        return money;
+    }
+
+    public void setMoney(Double money) {
+        this.money = money;
+    }
+
+    @Override
+    public String toString() {
+        return "OrderInfo{" +
+                "id=" + id +
+                ", name='" + name + '\'' +
+                ", createTime=" + createTime +
+                ", money=" + money +
+                '}';
+    }
+}
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketCommonTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketCommonTest.scala
new file mode 100644
index 0000000000000000000000000000000000000000..9f2e33def41d2234bdef6daed4ce72e873aab7c3
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketCommonTest.scala
@@ -0,0 +1,48 @@
+package com.zto.fire.examples.flink.serialize
+
+import com.zto.fire._
+import com.zto.fire.common.anno.Config
+import com.zto.fire.common.bean.MQRecord
+import com.zto.fire.common.util.JSONUtils
+import org.apache.flink.api.scala._
+import com.zto.fire.flink.FlinkStreaming
+import com.zto.fire.flink.anno.Streaming
+
+import java.util.Date
+
+/**
+ * Round trip with plain JSON bodies: sink OrderInfo as JSON, then pull and parse it back.
+ * @author zhanggougou 2025-12-08 10:09:19
+ */
+@Config(
+  """
+    |flink.rocket.brokers.name=testmq02ns1.test.ztosys.com:9876;testmq02ns2.test.ztosys.com:9876;testmq02ns3.test.ztosys.com:9876
+    |flink.rocket.topics=fire_test_rocket
+    |flink.rocket.group.id=fire_test_rocket_consumer
+    |flink.rocket.starting.offsets=earliest
+    |flink.rocket.consumer.tag=*
+    |""")
+@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180)
+object RocketCommonTest extends FlinkStreaming {
+
+  override def process(): Unit = {
+    this.fire.createRandomIntStream(1).map(x => {
+      val orderInfo = new OrderInfo(x, x + " rocket common test", new Date(), 1.7 + x)
+      MQRecord(JSONUtils.toJSONString(orderInfo))
+    })
+      .sinkRocketMQ()
+      .uname("sink-rocket-common")
+
+    this.fire.createRocketMqPullStream()
+      .uname("source-rocket-common").map(x => {
+        JSONUtils.parseObject(x, classOf[OrderInfo])
+      }).print().name("print-rocket-common")
+
+  }
+}
+
+
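Aside (editor's sketch, not part of the change set): RocketCommonTest round-trips plain JSON, while RocketSinkTest below instead tags each message with the serializer type. RocketMQ has no header collection like Kafka's RecordHeaders, so that metadata travels as user properties on the Message itself. A minimal sketch of how a Map[String, String] of headers maps onto properties; only standard rocketmq-client APIs are used:

```scala
import org.apache.rocketmq.common.message.Message

// Sketch: apply a header map to a RocketMQ Message as user properties,
// one putUserProperty call per entry.
object MessageHeaders {
  def withHeaders(msg: Message, headers: Map[String, String]): Message = {
    headers.foreach { case (k, v) => msg.putUserProperty(k, v) }
    msg
  }
}
```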
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSinkTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSinkTest.scala
new file mode 100644
index 0000000000000000000000000000000000000000..a9254f003dd8ba3525988d1e3ea5218866d91830
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSinkTest.scala
@@ -0,0 +1,50 @@
+package com.zto.fire.examples.flink.serialize
+
+import com.zto.bigdata.integeration.util.ZtoSerializeUtil
+import com.zto.fire._
+import com.zto.fire.common.anno.Config
+import com.zto.fire.common.bean.MQRecord
+import com.zto.fire.common.conf.FireRocketMQConf
+import com.zto.fire.flink.FlinkStreaming
+import com.zto.fire.flink.anno.Streaming
+import com.zto.fire.flink.conf.{SerializeType, ZtoSerializeConf}
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+
+import java.util.Date
+
+/**
+ * Write fury-serialized byte bodies to RocketMQ, tagging each message with the serializer type.
+ * @author zhanggougou 2025-12-08 10:09:19
+ */
+@Config(
+  """
+    |flink.rocket.brokers.name=testmq02ns1.test.ztosys.com:9876;testmq02ns2.test.ztosys.com:9876;testmq02ns3.test.ztosys.com:9876
+    |flink.rocket.topics=fire_test_rocket_serialize
+    |""")
+@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180)
+object RocketSinkTest extends FlinkStreaming {
+
+  override def process(): Unit = {
+    this.fire.createRandomIntStream(1).map(new RichMapFunction[Int, MQRecord] {
+
+      override def open(parameters: Configuration): Unit = {
+        ZtoSerializeUtil.initSerializerManager(FireRocketMQConf.rocketTopics(1), null)
+      }
+
+      override def map(x: Int): MQRecord = {
+        val outputData = ZtoSerializeUtil.serializerManager.outputData(ZtoSerializeUtil.classInfoMetadata.newInstance)
+        outputData.setObject("id", x)
+        outputData.setObject("name", x + " rocket")
+        outputData.setObject("createTime", new Date())
+        outputData.setObject("money", 1.6 + x)
+        val bytes = ZtoSerializeUtil.serializerManager.serialize(outputData)
+        val headMap = Map(ZtoSerializeConf.SERIALIZER_HEAD_KEY -> SerializeType.FURY.toString)
+        MQRecord(null, null, headMap, bytes)
+      }
+    }).sinkRocketMQByte().name("testSink")
+  }
+}
+
+
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSourceTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSourceTest.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1fcda3ceb716a3cf01648d0be09268176caaedb6
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/serialize/RocketSourceTest.scala
@@ -0,0 +1,34 @@
+
+package com.zto.fire.examples.flink.serialize
+
+import com.zto.fire._
+import com.zto.fire.common.anno.Config
+import com.zto.fire.flink.FlinkStreaming
+import com.zto.fire.flink.anno.Streaming
+import org.apache.flink.api.scala.createTypeInformation
+
+
+/**
+ * Consume RocketMQ and deserialize message bodies into OrderInfo entities.
+ * @author zhanggougou 2025-12-08 10:09:19
+ */
+@Config(
+  """
+    |flink.rocket.brokers.name=testmq02ns1.test.ztosys.com:9876;testmq02ns2.test.ztosys.com:9876;testmq02ns3.test.ztosys.com:9876
+    |flink.rocket.topics=fire_test_rocket_serialize
+    |flink.rocket.group.id=fire_test_rocket_serialize_consumer
+    |flink.rocket.starting.offsets=earliest
+    |flink.rocket.consumer.tag=*
+    |""")
+@Streaming(interval = 300, unaligned = true, disableOperatorChaining = false, timeout = 180)
+object RocketSourceTest extends FlinkStreaming {
+
+  override def process(): Unit = {
+    this.fire.createRocketMqPullStreamWithEntity[OrderInfo]().map(order => {
+      println("start processing")
+      println(order.toString)
+      println("finish processing")
+    }).print()
+  }
+}
+
+
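Aside (editor's sketch, not part of the change set): `createRocketMqPullStreamWithEntity[OrderInfo]` implies per-message work on the consumer side: inspect the serializer-type property written by the sink, then decode the body accordingly. The "serializer.type" key and the `furyDecode` hook below are assumptions; the JSON branch uses plain Jackson:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.rocketmq.common.message.MessageExt

// Sketch: dispatch on the serializer-type user property of a consumed message.
object EntityDecoder {
  private val mapper = new ObjectMapper()

  def decode[T](msg: MessageExt, clazz: Class[T], furyDecode: Array[Byte] => T): T = {
    val serializerType = msg.getProperty("serializer.type") // property set by the byte sink
    if ("FURY".equals(serializerType)) {
      furyDecode(msg.getBody) // binary body from sinkRocketMQByte()
    } else {
      mapper.readValue(msg.getBody, clazz) // default: JSON body, as in RocketCommonTest
    }
  }
}
```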
diff --git a/fire-examples/pom.xml b/fire-examples/pom.xml
index 71c4eb55b91aa1575ab5b775835f265b3a7808f8..41279d1acc68a1cc4448e461a2869eb01f36c3a8 100644
--- a/fire-examples/pom.xml
+++ b/fire-examples/pom.xml
@@ -31,7 +31,7 @@
         com.zto.fire
         fire-parent
-        2.6.3-SNAPSHOT
+        2.7.0-SNAPSHOT
         ../pom.xml
@@ -47,7 +47,7 @@
             com.zto.bigdata
             cluster-env
-            1.0.7-SNAPSHOT
+            1.0.8-SNAPSHOT
diff --git a/fire-examples/spark-examples/pom.xml b/fire-examples/spark-examples/pom.xml
index e5d52889efa52bb5b0da9891026055b9f7164bd0..1a0da01306a052a4a7d1496a822c9f3c2833a3b8 100644
--- a/fire-examples/spark-examples/pom.xml
+++ b/fire-examples/spark-examples/pom.xml
@@ -26,7 +26,7 @@
         com.zto.fire
         fire-examples
-        2.6.3-SNAPSHOT
+        2.7.0-SNAPSHOT
         ../pom.xml
diff --git a/fire-shell/flink-shell/pom.xml b/fire-shell/flink-shell/pom.xml
index e9526496c17557f4dbd708640cc5921bec152c45..755938032e183febecd286d3545e6195338d1527 100644
--- a/fire-shell/flink-shell/pom.xml
+++ b/fire-shell/flink-shell/pom.xml
@@ -7,7 +7,7 @@
         fire-shell
         com.zto.fire
-        2.6.3-SNAPSHOT
+        2.7.0-SNAPSHOT
diff --git a/fire-shell/pom.xml b/fire-shell/pom.xml
index 54014e7ae893d9ec209022cd791af526e2c3b0e7..97ee38bbc2210379df504a599b152333a0c88138 100644
--- a/fire-shell/pom.xml
+++ b/fire-shell/pom.xml
@@ -15,7 +15,7 @@
         fire-parent
         com.zto.fire
-        2.6.3-SNAPSHOT
+        2.7.0-SNAPSHOT
diff --git a/fire-shell/spark-shell/pom.xml b/fire-shell/spark-shell/pom.xml
index 8950eed01c4b53da8a1c3e21649ce7dc1d271ba5..dbb83550be117260213a4d59f11c7990fae073c5 100644
--- a/fire-shell/spark-shell/pom.xml
+++ b/fire-shell/spark-shell/pom.xml
@@ -9,7 +9,7 @@
        fire-shell
        com.zto.fire
-       2.6.3-SNAPSHOT
+       2.7.0-SNAPSHOT
diff --git a/pom.xml b/pom.xml
index ff0b7b0d449bc4804ff09117d54c76b424ff0020..09543034506c926aa77f7b936a3c07ff02c942da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
     com.zto.fire
     fire-parent
     pom
-    2.6.3-SNAPSHOT
+    2.7.0-SNAPSHOT
     Fire : Fire framework is a development framework for real-time computing task development . The framework provides a simple and easy to use API that can significantly reduce the development threshold of Spark and Flink and improve development efficiency.
     https://github.com/fire-framework/fire
@@ -1047,7 +1047,7 @@
             com.zto.bigdata
             cluster-env
-            1.0.7-SNAPSHOT
+            1.0.8-SNAPSHOT