From 8eb72bb384db8cd4bf2c8c260f36c53481927227 Mon Sep 17 00:00:00 2001 From: zhanggougou <15651908511@163.com> Date: Thu, 4 Sep 2025 10:31:13 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E4=BC=98=E5=8C=96spark=E8=A1=80=E7=BC=98?= =?UTF-8?q?=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../SparkSqlParser.scala | 36 +++++++++++++++++-- .../fire/spark/sql/SparkSqlParserBase.scala | 4 +-- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala index 5573e9ff..b16c11ed 100644 --- a/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala +++ b/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala @@ -30,6 +30,8 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable} +import org.apache.spark.sql.execution.adaptive.QueryStageInput +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec /** * Spark SQL解析器,用于解析Spark SQL语句中的库、表、分区、操作类型等信息 @@ -159,7 +161,7 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase { */ override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = { var sinkTable: Option[TableIdentifier] = None - + LineageManager.printLog(s"开始解析物理执行计划, $sparkPlan") sparkPlan.collect { //Hive表扫描信息 case plan if plan.getClass.getName == "org.apache.spark.sql.hive.execution.HiveTableScanExec" => @@ -167,9 +169,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase { relationField.setAccessible(true) val relation = relationField.get(plan).asInstanceOf[HiveTableRelation] val tableIdentifier = this.toFireTableIdentifier(relation.tableMeta.identifier) - LineageManager.printLog(s"解析到select表名: $tableIdentifier") + LineageManager.printLog(s"hive scan解析到select表名: $tableIdentifier") this.addCatalog(tableIdentifier, Operation.SELECT) sinkTable = Some(tableIdentifier) + //cache scan + case p: InMemoryTableScanExec => + handleInMemoryTableScan(p).foreach(x => { + LineageManager.printLog(s"cache scan中解析到select表名: $x") + this.addCatalog(x, Operation.SELECT) + sinkTable = Some(x) + }) //表写入信息 case plan: DataWritingCommandExec => plan.cmd match { @@ -273,12 +282,33 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase { case StreamingExplainCommand(queryExecution, extended) => case TruncateTableCommand(tableName, partitionSpec) => case UncacheTableCommand(tableIdent, ifExists) => - case _ => + case _ => LineageManager.printLog(s"解析物理执行计划异常,无法匹配该Statement") } } sinkTable } + /** + * 处理执行计划中InMemoryTableScanExec + * + * @param plan 物理执行计划 + * @return 表操作信息 + */ + def handleInMemoryTableScan(plan: SparkPlan): Seq[TableIdentifier] = { + plan match { + case p if p.getClass.getName == "org.apache.spark.sql.hive.execution.HiveTableScanExec" => + val relationField = p.getClass.getDeclaredField("relation") + relationField.setAccessible(true) + val relation = relationField.get(plan).asInstanceOf[HiveTableRelation] + val tableIdentifier = this.toFireTableIdentifier(relation.tableMeta.identifier) + Seq(tableIdentifier) + case p: QueryStageInput => handleInMemoryTableScan(p.childStage) + case p: InMemoryTableScanExec => handleInMemoryTableScan(p.relation.child) + case p: SparkPlan => p.children.flatMap(handleInMemoryTableScan) + } + } + + /** * 用于判断给定的表是否为临时表 */ diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala index 3c9b3e3b..8177b570 100644 --- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala +++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala @@ -136,12 +136,12 @@ private[fire] trait SparkSqlParserBase extends SqlParser { sinkTable = this.ddlParserWithPlan(sparkPlan) } catch { case e: Throwable => { - LineageManager.printLog(s"可忽略异常:实时血缘解析SQL报错,sparkPlan: ${sparkPlan}") + LineageManager.printLog(s"可忽略异常:实时血缘解析物理执行计划SQL报错,sparkPlan: ${sparkPlan}") } } finally { tryWithLog { this.queryParser(queryExecution.optimizedPlan, sinkTable) - }(this.logger, catchLog = s"可忽略异常:实时血缘解析SQL报错,sparkPlan: ${sparkPlan}", isThrow = FireFrameworkConf.lineageDebugEnable, hook = false) + }(this.logger, catchLog = s"可忽略异常:实时血缘解析物理执行计划SQL报错,sparkPlan: ${sparkPlan}", isThrow = FireFrameworkConf.lineageDebugEnable, hook = false) } } -- Gitee From a0351c71ff77b18b635aee9240816604e8f231d4 Mon Sep 17 00:00:00 2001 From: zhanggougou <15651908511@163.com> Date: Tue, 16 Sep 2025 09:12:47 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BF=AE=E5=A4=8DflinkSQL=E8=AF=AF?= =?UTF-8?q?=E8=AF=86=E5=88=ABhive=E4=B8=BApaimon?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scala/com/zto/fire/flink/sql/FlinkSqlParserBase.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/sql/FlinkSqlParserBase.scala b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/sql/FlinkSqlParserBase.scala index b08acc1e..e738c6df 100644 --- a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/sql/FlinkSqlParserBase.scala +++ b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/sql/FlinkSqlParserBase.scala @@ -288,7 +288,9 @@ private[fire] trait FlinkSqlParserBase extends SqlParser { } val paimonCatalog = this.tableEnv.paimonCatalog.get() - paimonCatalog.listTables(tableIdentifier.database).contains(tableIdentifier.table) + val checkTable = paimonCatalog.getTable(new ObjectPath(tableIdentifier.database, tableIdentifier.table)); + //目前只能通过该方法判断,hive表为org.apache.paimon.flink.FormatCatalogTable + checkTable != null && checkTable.getClass.getName.equals("org.apache.paimon.flink.DataCatalogTable") }(this.logger, catchLog = s"判断${tableIdentifier}是否为paimon表失败", hook = false) } } -- Gitee From 34c36bf698b09378a2c96a3c00629129ce0052fd Mon Sep 17 00:00:00 2001 From: zhanggougou <15651908511@163.com> Date: Tue, 16 Sep 2025 09:12:47 +0800 Subject: [PATCH 3/3] =?UTF-8?q?[fire-1242]1.=E4=BF=AE=E5=A4=8DflinkSQL?= =?UTF-8?q?=E8=AF=AF=E8=AF=86=E5=88=ABhive=E4=B8=BApaimon=202.=E5=AE=8C?= =?UTF-8?q?=E5=96=84spark=E8=A1=80=E7=BC=98=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scala/com/zto/fire/flink/sql/FlinkSqlParserBase.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/sql/FlinkSqlParserBase.scala b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/sql/FlinkSqlParserBase.scala index b08acc1e..e738c6df 100644 --- a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/sql/FlinkSqlParserBase.scala +++ b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/sql/FlinkSqlParserBase.scala @@ -288,7 +288,9 @@ private[fire] trait FlinkSqlParserBase extends SqlParser { } val paimonCatalog = this.tableEnv.paimonCatalog.get() - paimonCatalog.listTables(tableIdentifier.database).contains(tableIdentifier.table) + val checkTable = paimonCatalog.getTable(new ObjectPath(tableIdentifier.database, tableIdentifier.table)); + //目前只能通过该方法判断,hive表为org.apache.paimon.flink.FormatCatalogTable + checkTable != null && checkTable.getClass.getName.equals("org.apache.paimon.flink.DataCatalogTable") }(this.logger, catchLog = s"判断${tableIdentifier}是否为paimon表失败", hook = false) } } -- Gitee