# spark-file-source **Repository Path**: zhiling-chen/spark-file-source ## Basic Information - **Project Name**: spark-file-source - **Description**: spark 自定义数据源 通过使用spark,将指定路径下的所有文件转为Rdd[Row] schema属性包括文件名,路径,文件大小,后缀,文件内容(字节数组切分) - **Primary Language**: Scala - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2021-08-30 - **Last Updated**: 2023-01-16 ## Categories & Tags **Categories**: Uncategorized **Tags**: spark自定义数据源, Scala ## README # 1.commonFile数据源 ## 1.1 内容概述 commonFile数据源文件按照指定的schema结构变成相应的Rdd形式,通过指定别名"commonFile"使用,支持流批读写。 Schema结构如下 | 文件名 | 路径 | 后缀名 | 大小 | 字节数组 | | :--------: | :--------: | :--------: | :------: | :--------------------------------------------------: | | StringType | StringType | StringType | LongType | StructFiled[ StructedType(BinaryType,BinaryType,……)] | - 你也可以通过配置文件,规定每行存储的字节数组的上限和字节数组嵌套的数量 - 配置默认每行存储上限128Mb(1024 * 1024 * 128字节),字节数组嵌套个数16 ## 1.2 spark 启动 在shell环境输入以下内容,进行启动。 ```shell spark-shell --packages io.delta:delta-core_2.12:1.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" ``` ## 1.2 批处理 ### 1.2.1 读取文件 你可以通过指定文件夹路径的方式,读取文件,通过 ```scala val dataFrame = spark.read.format("commonFile").load("File:///tmp/commonFile/") dataFrame.show() ``` 因为嵌套存储的原因,字节数组显示会是乱码,但内容不会丢失。 也支持单个文件,全路径等 - 单个文件 ```scala spark.read.format("commonFile").load("tmp/commonFile/test.txt") ``` - 文件夹所有的文件 ```scala spark.read.format("commonFile").load("tmp/commonFile/*/") ``` - 通配符 ```scala spark.read.format("commonFile").load("tmp/commonFile/*.xxx") ``` - 对文件名进行模糊匹配 ```scala spark.read.format("commonFile").load("tmp/commonFile/t*.txt") ``` ### 1.2.2 过滤函数 你可以通过filter函数,过滤你不想读取的文件,目前支持通过文件大小过滤,通过。 ```scala spark.read.format("commonFile").load(localReadPath).filter("size>1000") ``` ### 1.2.3 写出文件 你可以通过指定文件夹路径的方式,写出文件,通过如下代码: ```scala dataFrame.write.format("commonFile").mode("append").save("tmp/commonFile/save/") ```