diff --git a/fire-connectors/base-connectors/fire-jdbc/src/main/scala/com/zto/fire/jdbc/conf/FireJdbcConf.scala b/fire-connectors/base-connectors/fire-jdbc/src/main/scala/com/zto/fire/jdbc/conf/FireJdbcConf.scala index e56862469d89b99a26bb5b930358ec5a90209c12..c82ad64bf7fdcccd26c37fa94633766511d28dc4 100644 --- a/fire-connectors/base-connectors/fire-jdbc/src/main/scala/com/zto/fire/jdbc/conf/FireJdbcConf.scala +++ b/fire-connectors/base-connectors/fire-jdbc/src/main/scala/com/zto/fire/jdbc/conf/FireJdbcConf.scala @@ -61,6 +61,7 @@ private[fire] object FireJdbcConf { // 通过JdbcConnector查询后将数据集放到多少个分区中,需根据实际的结果集做配置 lazy val jdbcQueryPartition = PropUtils.getInt(this.FIRE_JDBC_QUERY_REPARTITION, 10) + def jdbcPartition(keyNum: Int = KeyNum._1): String = PropUtils.getString(this.FIRE_JDBC_QUERY_REPARTITION, "1", keyNum = keyNum) // db.jdbc.url def url(keyNum: Int = KeyNum._1): String = PropUtils.getString(this.JDBC_URL, "", keyNum) // jdbc url与别名映射 diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/ext/core/SQLContextExt.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/ext/core/SQLContextExt.scala index 3aac1777a61bad5593732db35d066a9ad42496cd..c4c133aab72b3fbf21cb1fc63687d24ec8dfbb71 100644 --- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/ext/core/SQLContextExt.scala +++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/ext/core/SQLContextExt.scala @@ -381,6 +381,32 @@ class SQLContextExt(sqlContext: SQLContext) { sqlContext.read.jdbc(FireJdbcConf.jdbcUrl(keyNum), tableName, DBUtils.getJdbcProps(jdbcProps, keyNum)) } + /** + * + * @param querySql + * 查询sql + * @param jdbcProps + * 调用者指定的数据库连接信息,如果为空,则默认读取配置文件 + * @param keyNum + * 配置文件中数据源配置的数字后缀,用于应对多数据源的情况,如果仅一个数据源,可不填 + * 比如需要操作另一个数据库,那么配置文件中key需携带相应的数字后缀:spark.db.jdbc.url2,那么此处方法调用传参为3,以此类推 + * @return + * DataFrame + */ + def jdbcSqlLoad(querySql: String, jdbcProps: Properties = null, keyNum: Int = KeyNum._1): DataFrame = { + + sqlContext.read.format("jdbc").options( + Map( + "url" -> FireJdbcConf.url(keyNum), + "user" -> FireJdbcConf.user(keyNum), + "password" -> FireJdbcConf.password(keyNum), + "dbtable" -> s"($querySql) t", + "driver" -> FireJdbcConf.driverClass(keyNum), + "batchsize" -> FireJdbcConf.batchSize(keyNum).toString, + "numPartitions" -> FireJdbcConf.jdbcPartition(keyNum) + ) + ).load() + } /** * 指定load的条件,从关系型数据库中并行的load数据,并转为DataFrame *