From 05cbde5e8fc30e3bad438f45d0a472b68b794b93 Mon Sep 17 00:00:00 2001 From: read-to Date: Tue, 9 Jul 2024 16:03:06 +0800 Subject: [PATCH] =?UTF-8?q?[fire-1176]=E6=8F=90=E4=BE=9Bspark.read.format(?= =?UTF-8?q?jdbc).options().load=E7=9A=84api=E5=B0=81=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/zto/fire/jdbc/conf/FireJdbcConf.scala | 1 + .../fire/spark/ext/core/SQLContextExt.scala | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/fire-connectors/base-connectors/fire-jdbc/src/main/scala/com/zto/fire/jdbc/conf/FireJdbcConf.scala b/fire-connectors/base-connectors/fire-jdbc/src/main/scala/com/zto/fire/jdbc/conf/FireJdbcConf.scala index e5686246..c82ad64b 100644 --- a/fire-connectors/base-connectors/fire-jdbc/src/main/scala/com/zto/fire/jdbc/conf/FireJdbcConf.scala +++ b/fire-connectors/base-connectors/fire-jdbc/src/main/scala/com/zto/fire/jdbc/conf/FireJdbcConf.scala @@ -61,6 +61,7 @@ private[fire] object FireJdbcConf { // 通过JdbcConnector查询后将数据集放到多少个分区中,需根据实际的结果集做配置 lazy val jdbcQueryPartition = PropUtils.getInt(this.FIRE_JDBC_QUERY_REPARTITION, 10) + def jdbcPartition(keyNum: Int = KeyNum._1): String = PropUtils.getString(this.FIRE_JDBC_QUERY_REPARTITION, "1", keyNum = keyNum) // db.jdbc.url def url(keyNum: Int = KeyNum._1): String = PropUtils.getString(this.JDBC_URL, "", keyNum) // jdbc url与别名映射 diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/ext/core/SQLContextExt.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/ext/core/SQLContextExt.scala index 3aac1777..c4c133aa 100644 --- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/ext/core/SQLContextExt.scala +++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/ext/core/SQLContextExt.scala @@ -381,6 +381,32 @@ class SQLContextExt(sqlContext: SQLContext) { sqlContext.read.jdbc(FireJdbcConf.jdbcUrl(keyNum), tableName, DBUtils.getJdbcProps(jdbcProps, keyNum)) } + /** + * + * 
@param querySql
 * 查询sql
 * @param jdbcProps
 * 调用者指定的数据库连接信息,如果为空,则默认读取配置文件
 * @param keyNum
 * 配置文件中数据源配置的数字后缀,用于应对多数据源的情况,如果仅一个数据源,可不填
 * 比如需要操作另一个数据库,那么配置文件中key需携带相应的数字后缀:spark.db.jdbc.url2,那么此处方法调用传参为2,以此类推
 * @return
 * DataFrame
 */
 def jdbcSqlLoad(querySql: String, jdbcProps: Properties = null, keyNum: Int = KeyNum._1): DataFrame = {
 
 sqlContext.read.format("jdbc").options(
 Map(
 "url" -> FireJdbcConf.url(keyNum),
 "user" -> FireJdbcConf.user(keyNum),
 "password" -> FireJdbcConf.password(keyNum),
 "dbtable" -> s"($querySql) t",
 "driver" -> FireJdbcConf.driverClass(keyNum),
 "batchsize" -> FireJdbcConf.batchSize(keyNum).toString,
 "numPartitions" -> FireJdbcConf.jdbcPartition(keyNum)
 )
 ).load()
 } /**
 * 指定load的条件,从关系型数据库中并行的load数据,并转为DataFrame
 *
 -- Gitee