数据源
核心数据源
Spark的6大核心数据源:
- CSV
- JSON
- Parquet
- ORC
- JDBC/ODBC连接
- 纯文本文件
由社区创建的数据源
- Cassandra
- HBase
- MongoDB
- AWS Redshift
- XML
- 其他
数据源API
Read API
读取数据的核心结构:DataFrameReader.format(...).option("key", "value").schema(...).load()
format()
:可选,默认情况下Spark将使用Parquet格式option()
:配置键值对来参数化读取数据的方式
Spark数据读取使用DataFrameReader
,通过SparkSession的read属性得到
1 | Spark.read |
1 | // 例子 |
读取模式
读取模式指定当Spark遇到错误格式的记录时应采取的操作
- 默认是
permissive
读取模式 | 说明 |
---|---|
permissive |
遇到错误格式的记录时,将所有字段设置为null并将所有错误格式的记录放在名为_corrupt_record 字符串列中 |
dropMalformed |
删除包含错误格式记录的行 |
failFast |
遇到错误格式的记录后,立即返回失败 |
Write API
写数据的核心结构:
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
format()
:可选,默认情况下Spark将使用Parquet格式option()
:配置写出数据的方式PartitionBy
、bucketBy
、sortBy
仅适用基于文件的数据源
Spark的写数据使用DataFrameWriter
,通过DataFrame的write属性来获取DataFrameWriter:
1 | dataFrame.write |
1 | // in Scala |
保存模式
保存模式指明如果Spark在指定目标路径发现有其他数据占用时应采取的操作。
- 默认是
errorIfExists
保存模式 | 说明 |
---|---|
append |
将输出文件追加到目标路径已存在的文件上或目录的文件列表 |
overwrite |
将完全覆盖目标路径中已存在的任何数据 |
errorIfExists |
如果目标路径已存在数据或文件,则抛出错误并返回写入操作失败 |
Ignore |
如果目标路径已存在数据或文件,则不执行任何操作 |
核心数据源
CSV
逗号分隔值(CSV,comma-separated values),一种常见的文本文件格式;每行表示一条记录,用逗号分隔记录中的每个字段
- 是最难处理的文件格式之一
CSV数据源选项:
读取/写入 | Key | 取值范围 | 默认值 | 说明 |
---|---|---|---|---|
Read | escapeQuotes | true, false | true | 声明Spark是否应该转义在行中找到的引号 |
maxColumns | 任意整数 | 20480 | 声明文件中的最大列数 | |
maxCharsPerColumn | 任意整数 | 1000000 | 声明列中的最大字符数 | |
maxMalformedLogPerPartition | 任意整数 | 10 | 设置Spark将为每个分区记录错误格式的行的最大数目,超出此数目的格式错误的记录将被忽略 | |
multiline | true, false | false | 此选项用于读取多行CSV文件,其中CSV文件中的每个逻辑行可能跨越文件本身中的多行 | |
Write | quoteAll | true, false | false | 指定是否将所有值括在引号中,而不是仅转义具有引号字符的值 |
Read & Write | Compression 或 codec | None, uncompressed, bzip2, deflate, gzip, lz4, snappy | none | 声明Spark应该使用什么压缩编码器来读取或写入文件 |
dateFormat | 任何符合Java的SimpleDataFormat式的字符串或字符 | yyyy-MM-dd | 日期类型的列的日期格式 | |
escape | 任意字符串字符 | \ | 用于转义的字符 | |
header | true, false | false | 声明文件中的第一行是否为列的名称 | |
ignoreLeadingWhiteSpace | true, false | false | 声明是否应跳过读取值中的前导空格 | |
ignoreTrailingWhiteSpace | true, false | false | 声明是否应跳过读取值中的尾部空格 | |
inferSchema | true, false | false | 指定在读取文件时Spark是否自动推断列类型 | |
nanValue | 任意字符串字符 | NaN | 声明在CSV文件中表示NaN或缺失字符的字符 | |
negativeInf | 任意字符串或字符 | -Inf | 声明什么字符表示负无穷大 | |
nullValue | 任意字符串字符 | "" | 声明在文件中表示null值的字符 | |
positiveInf | 任意字符串或字符 | Inf | 声明什么字符表示正无穷大 | |
sep | 任意单个字符串字符 | , | 用作每个字段和值的分隔符的单个字符 | |
timestampFormat | 任何符合Java的SimpleDataFormat的字符串或字符 | MMdd HH:mm:ss.SSSZZ | 时间戳类型时间戳格式 | |
- 通常,Spark只会在作业执行而不是DataFrame定义时发生失败
1 | // in Scala |
1 | // in Scala |
制表符分隔值(TSV,Tab-separated values)
- 每一行储存一条记录
- 每条记录的各个字段间以制表符作为分隔
JSON
JSON(JavaScript Object Notation)
- Spark中的JSON文件指的是换行符分隔的JSON,每行必须包含一个单独的、独立的有效JSON对象
- 使用换行符分隔的JSON可以在文件末尾追加新记录
- JSON对象具有结构化信息
- 当
multiLine
为true
时,可以将整个JSON文件作为一个JSON对象读取
JSON数据源选项:
读取/写入 | Key | 取值范围 | 默认值 | 说明 |
---|---|---|---|---|
Read & Write | Compression 或 codec | None, uncompressed, bzip2, deflate, gzip, lz4, snappy | none | 声明Spark应该使用什么压缩编码器来读取或写入文件 |
dateFormat | 任何符合Java的SimpleDataFormat式的字符串或字符 | yyyy-MM-dd | 日期类型的列的日期格式 | |
timestampFormat | 任何符合Java的SimpleDataFormat的字符串或字符 | MMdd HH:mm:ss.SSSZZ | 时间戳类型时间戳格式 | |
Read | allowComments | true, false | false | 忽略JSON记录中的Java/C++样式注释 |
allowBackslashEscAPIngAny | true, false | false | 是否允许反斜杠机制接收所有字符 | |
allowNumericLeadingZeros | true, false | false | 是否允许数字中存在前导零 | |
allowSingleQuotes | true, false | true | 除双引号外,是否允许使用单引号 | |
allowUnquotedFieldNames | true, false | false | 允许不带引号的JSON字段名 | |
columnNameOfCorruptRecord | 任意字符串或字符 | spark.sql.column & NameOfCorruptRecord | 允许重命名permissive模式下添加的新字段,会覆盖重写 | |
multiLine | true, false | false | 允许读取非换行符分隔的JSON文件 | |
primitiveAsString | true, false | false | 将所有原始值推断为字符串类型 | |
1 | // in Scala |
- 将DataFrame写入JSON文件:每个数据分片作为一个文件写出,整个DataFrame将输出到一个文件夹;文件中的每行仍代表一个JSON对象
Parquet文件
Parquet是一种开源的面向列的数据存储格式
-
提供了各种存储优化,尤其适合数据分析
-
提供列压缩,从而节省空间
-
支持按列读取而非读取整个文件
-
是Spark的默认文件格式
-
从Parquet文件读取比从JSON或CSV文件效率更高
-
支持复杂类型:列是一个数组、map映射、struct结构体,都可以正常读取和写入;而CSV文件无法存储数组列
-
Parquet在存储数据时执行本身的schema
-
一般在读取的时候使用默认的schema
1 | // in Scala |
- Parquet只有很少的可选项
Parquet数据源选项:
读取/写入 | Key | 取值范围 | 默认值 | 说明 |
---|---|---|---|---|
Read & Write | Compression 或 codec | None, uncompressed, bzip2, deflate, gzip, lz4, snappy | none | 声明Spark应该使用什么压缩编码器来读取或写入文件 |
Read | mergeSchema | true, false | 配置值spark.sql.par.quet.mergeSchema | 增量地添加列到同一表/文件夹中的Parquet文件里;此选项用于启用或禁用此功能 |
- 写Parquet文件和读取Parquet文件,都只需指定文件的位置即可
1 | // in Scala |
ORC文件
ORC文件是为Hadoop作业设计的自描述、类型感知的列存储文件格式
- 针对大型流式数据读取进行优化
- 读取ORC文件数据时没有可选项
ORC和Parquet有何区别?
- 在大多数情况下,二者非常相似
- 本质区别:
- Parquet针对Spark进行优化
- ORC针对Hive进行优化
1 | // in Scala |
JDBC/ODBC连接
即从数据库读写数据
JDBC数据源选项:
读取/写入 | Key | 取值范围 | 默认值 | 说明 |
---|---|---|---|---|
Read & Write | dbtable | 可以使用SQL查询的FROM子句中的任何有效内容 | 表示要读取的JDBC表 | |
driver | 用于连接到此URL的JDBC驱动器的类名 | |||
numPartitions | 在读取和写入数据表时,数据表可用于并行的最大分区数(决定了并发JDBC连接的最大数目) | |||
url | 表明要连接的JDBC URL,可以在URL中指定特定源的连接属性 如:`jdbc:postgresql://localhost/test?user=fred&password=secret` |
|||
Read | batchsize | 1000 | 表示JDBC批处理大小,用于指定每次写入多少条记录。 | |
createTableColumnTypes | 有效的Spark SQL数据类型 | 表示创建表时使用的数据库列数据类型,而不使用默认值。 应该使用与`CREATE TABLE`列语法相同的格式来指定数据类型信息,指定的类型应是有效的Spark SQL数据类型 |
||
createTableOptions | 用于在创建表时设置特定数据库的表和分区选项 | |||
isolationLevel | NONE, READ_COMMITED, READ_UNCOMMITTED, REPEATABLE_READ, SERIALIZABLE | READ_UNCOMMITTED | 表示数据库的事务隔离级别(适用于当前连接)。 可取值分别对应于JDBC的Connection对象定义的标准事务隔离级别。 |
|
truncate | true, false | false | 待补充 | |
Write | fetchsize | 表示JDBC每次读取多少条记录 | ||
partitionColumn lowerBound upperBound |
如果指定了其中一个选项,则必须设置其他所有选项;此外,还必须指定`numPartitions`。 这些属性描述了如何在从多个worker并行读取时对表格进行划分。 `partitionColumn`是要分区的列,必须是整数类型。 `lowerBound`和`upperBound`仅用于确定分区跨度,而不用于过滤表中的行(因此表中的所有行都将被划分并返回)。 |
从数据库读取文件:先指定格式(format)和选项,然后加载数据
1 | // in Scala |
1 | # in Python |
- SQLite与其他数据库不同,SQLite只是计算机上的一个文件
- 如果是其他数据库,需要测试连接:
1 | // in Scala |
- SQLite需要的配置很简单,而其他数据库需要配置更多的参数
1 | // in Scala |
1 | // in Scala |
1 | # in Python |
写入SQL数据库,只需指定URL并指定写入模式。
1 | // in Scala |
1 | # in Python |
文本文件
- 文本文件中的每一行将被解析为DataFrame中的一条记录,然后根据要求进行转换
- 文本文件能够充分利用原生类型(native type)的灵活性,因此很适合作为Dataset API的输入
- 读取文本文件时,只需指定类型为
textFile
即可 - 写文本文件时,需确保仅有一个字符串类型的列写出;否则,写操作将失败
- 如果在执行写操作时,同时执行某些数据分片操作,则可以写入更多的列(这些列将在要写入的文件夹中显示为目录,而不是在每个文件中存在多列)
1 | // in Scala |
其他
-
可以通过在写入之前空值数据分片来控制写入文件的并行度
-
可以通过控制数据分桶(bucketing)和数据划分(partitioning)来控制特定的数据布局方式
-
如果使用的是Hadoop分布式文件系统(HDFS),则如果该文件包含多个文件块,分割文件可进一步优化提高性能。同时需要进行压缩管理
-
并非所有的压缩格式都是可分割的
-
推荐采用gzip压缩格式的Parquet文件格式
-
多个执行器不能同时读取同一文件,但可同时读取不同的文件
- 当从包含多个文件的文件夹中读取时,每个文件都将被视为DataFrame的一个分片,并由执行器并行读取,多余的文件会进入读取队列等候
-
写数据涉及的文件数量取决于DataFrame的分区数
- 默认情况:每个数据分片都会有一定的数据写入
1 | csvFile.repartition(5).write.format("csv").save("/tmp/multiple.csv") |
- 使用
partitionBy
进行数据划分,可以在后续读取时跳过大量的数据,只读入与问题相关的数据- 基于日期来划分数据最常见
1 | // in Scala |
数据分桶:具有相同桶ID(哈希分桶的ID)的数据将放置到一个物理分区中;可以避免在后续读取数据时进行shuffle
1 | // in Scala |
- 数据分桶仅支持Spark管理的表