DataFrameReader — Loading Data From External Data Sources
DataFrameReader is the public interface for describing how to load data from an external data source (e.g. files, tables, JDBC or Dataset[String]).
DataFrameReader is available using SparkSession.read.
```scala
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.DataFrameReader
val reader: DataFrameReader = spark.read
```
DataFrameReader supports many file formats natively and offers the interface to define custom formats.
Note: DataFrameReader assumes the parquet data source file format by default, which you can change using the spark.sql.sources.default Spark property.
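The default format can be inspected and changed at runtime through the session configuration. A minimal sketch (the local session setup below is only for demonstration):

```scala
import org.apache.spark.sql.SparkSession

// A local session purely for demonstration
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("default-format-demo")
  .getOrCreate()

// The default data source format is parquet...
assert(spark.conf.get("spark.sql.sources.default") == "parquet")

// ...and can be changed at runtime, e.g. to json,
// so spark.read.load(...) would then expect JSON files
spark.conf.set("spark.sql.sources.default", "json")
```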
After you have described the loading pipeline (i.e. the “Extract” part of ETL in Spark SQL), you eventually “trigger” the loading using format-agnostic load or format-specific (e.g. json, csv, jdbc) operators.
```scala
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.DataFrame

// Using format-agnostic load operator
val csvs: DataFrame = spark
  .read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("*.csv")

// Using format-specific load operator
val jsons: DataFrame = spark
  .read
  .json("metrics/*.json")
```
Note: All methods of DataFrameReader merely describe the process of loading data and do not trigger a Spark job (until an action is called).
DataFrameReader can read text files using the textFile methods that return typed Datasets.
```scala
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.Dataset
val lines: Dataset[String] = spark
  .read
  .textFile("README.md")
```
Note: Loading datasets using the textFile methods allows for additional preprocessing before final processing of the string values as json or csv lines.
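A sketch of that workflow (the file contents here are made up for illustration): load raw lines with textFile, clean them up, and only then hand the cleaned lines to spark.read.json.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("textfile-preprocessing-demo")
  .getOrCreate()

// A tiny JSON-lines file with a comment line to strip (illustrative data)
val path = Files.createTempFile("metrics", ".jsonl")
Files.write(path, java.util.Arrays.asList(
  "# metrics dump",
  """{"name":"requests","value":10}""",
  """{"name":"errors","value":1}"""))

// Load raw lines first...
val lines: Dataset[String] = spark.read.textFile(path.toString)

// ...preprocess (drop comment lines)...
val jsonLines = lines.filter(line => !line.startsWith("#"))

// ...and only then parse the cleaned lines as JSON
val metrics = spark.read.json(jsonLines)
assert(metrics.count() == 2)
```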
(New in Spark 2.2) DataFrameReader can load datasets from Dataset[String] (with lines being complete “files”) using the format-specific csv and json operators.
```scala
val csvLine = "0,Warsaw,Poland"

import org.apache.spark.sql.Dataset
val cities: Dataset[String] = Seq(csvLine).toDS

scala> cities.show
+---------------+
|          value|
+---------------+
|0,Warsaw,Poland|
+---------------+

// Define schema explicitly (as below)
// or
// option("header", true) + option("inferSchema", true)
import org.apache.spark.sql.types.StructType
val schema = new StructType()
  .add($"id".long.copy(nullable = false))
  .add($"city".string)
  .add($"country".string)

scala> schema.printTreeString
root
 |-- id: long (nullable = false)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)

import org.apache.spark.sql.DataFrame
val citiesDF: DataFrame = spark
  .read
  .schema(schema)
  .csv(cities)

scala> citiesDF.show
+---+------+-------+
| id|  city|country|
+---+------+-------+
|  0|Warsaw| Poland|
+---+------+-------+
```
Name | Description
---|---
source | Name of the input data source (aka format or provider) with the default format per the spark.sql.sources.default configuration property (default: parquet). Used exclusively when…
userSpecifiedSchema | Optional user-specified schema (default: …). Set when… Used when…
Specifying Format Of Input Data Source — format method

```scala
format(source: String): DataFrameReader
```
You use format to configure DataFrameReader to use the appropriate source format.

Supported data formats include json, csv, parquet, orc and text (see the format-specific operators below).
Note: Spark SQL allows for developing custom data source formats.
Specifying Schema — schema method

```scala
schema(schema: StructType): DataFrameReader
```
schema allows for specifying the schema of a data source (that the DataFrameReader is about to read a dataset from).
```scala
import org.apache.spark.sql.types.StructType
val schema = new StructType()
  .add($"id".long.copy(nullable = false))
  .add($"city".string)
  .add($"country".string)

scala> schema.printTreeString
root
 |-- id: long (nullable = false)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)

import org.apache.spark.sql.DataFrameReader
val r: DataFrameReader = spark.read.schema(schema)
```
Note: Some formats can infer schema from datasets (e.g. csv or json) using the inferSchema option.
Tip: Read up on Schema.
Specifying Load Options — option and options Methods

```scala
option(key: String, value: String): DataFrameReader
option(key: String, value: Boolean): DataFrameReader
option(key: String, value: Long): DataFrameReader
option(key: String, value: Double): DataFrameReader
```
You can also use the options method to describe different options in a single Map.

```scala
options(options: scala.collection.Map[String, String]): DataFrameReader
```
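A small example of combining option and options (the file and option values are illustrative):

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("options-demo")
  .getOrCreate()

// Illustrative CSV file with a header line
val path = Files.createTempFile("people", ".csv")
Files.write(path, java.util.Arrays.asList("id,name", "1,Ola", "2,Jacek"))

// Single options via option(), or many at once via options(Map(...))
val people = spark.read
  .option("header", true)
  .options(Map("inferSchema" -> "true", "sep" -> ","))
  .csv(path.toString)

// The header gives column names; inferSchema makes id an integer
assert(people.columns.sameElements(Array("id", "name")))
assert(people.schema("id").dataType.typeName == "integer")
```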
Loading Datasets from Files (into DataFrames) Using Format-Specific Load Operators
DataFrameReader supports the following file formats:
json method

```scala
json(path: String): DataFrame
json(paths: String*): DataFrame
json(jsonDataset: Dataset[String]): DataFrame
json(jsonRDD: RDD[String]): DataFrame
```
New in 2.0.0: prefersDecimal
csv method

```scala
csv(path: String): DataFrame
csv(paths: String*): DataFrame
csv(csvDataset: Dataset[String]): DataFrame
```
parquet method

```scala
parquet(path: String): DataFrame
parquet(paths: String*): DataFrame
```
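A minimal parquet round trip might look like this (paths and data are illustrative). Note that parquet files carry their schema, so no schema inference options are needed on read:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("parquet-demo")
  .getOrCreate()
import spark.implicits._

// Write a small DataFrame out as parquet, then read it back
val dir = Files.createTempDirectory("tokens").resolve("tokens.parquet").toString
Seq((0, "hello"), (1, "world")).toDF("id", "token")
  .write.parquet(dir)

val tokens = spark.read.parquet(dir)
assert(tokens.count() == 2)
// The schema was stored with the data, so id comes back as an integer
assert(tokens.schema("id").dataType.typeName == "integer")
```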
The supported options:

- compression (default: snappy)

  New in 2.0.0: snappy is the default Parquet codec. See [SPARK-14482][SQL] Change default Parquet codec from gzip to snappy.

  The available codecs are:

  - none or uncompressed
  - snappy – the default codec in Spark 2.0.0 and later
  - gzip – the default codec in Spark before 2.0.0
  - lzo
```scala
val tokens = Seq("hello", "henry", "and", "harry")
  .zipWithIndex
  .map(_.swap)
  .toDF("id", "token")

val parquetWriter = tokens.write
parquetWriter.option("compression", "none").save("hello-none")

// The exception is mostly for my learning purposes
// so I know where and how to find the trace to the compressions
// Sorry...
scala> parquetWriter.option("compression", "unsupported").save("hello-unsupported")
java.lang.IllegalArgumentException: Codec [unsupported] is not available. Available codecs are uncompressed, gzip, lzo, snappy, none.
  at org.apache.spark.sql.execution.datasources.parquet.ParquetOptions.<init>(ParquetOptions.scala:43)
  at org.apache.spark.sql.execution.datasources.parquet.DefaultSource.prepareWrite(ParquetRelation.scala:77)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
  at org.apache.spark.sql.execution.datasources.BaseWriterContainer.driverSideSetup(WriterContainer.scala:103)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:141)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:61)
  at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult(commands.scala:59)
  at org.apache.spark.sql.execution.command.ExecutedCommand.doExecute(commands.scala:73)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:137)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:134)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:65)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:65)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:390)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
  ... 48 elided
```
orc method

```scala
orc(path: String): DataFrame
orc(paths: String*): DataFrame
```
Optimized Row Columnar (ORC) is a highly efficient columnar file format for storing Hive data that improves performance, particularly for tables with more than 1,000 columns. The ORC format was introduced in Hive 0.11 to use and retain the type information from the table definition.
Tip: Read the ORC Files document to learn about the ORC file format.
text method

text loads a text file.

```scala
text(path: String): DataFrame
text(paths: String*): DataFrame
```
Example
```scala
val lines: Dataset[String] = spark.read.text("README.md").as[String]

scala> lines.show
+--------------------+
|               value|
+--------------------+
|      # Apache Spark|
|                    |
|Spark is a fast a...|
|high-level APIs i...|
|supports general ...|
|rich set of highe...|
|MLlib for machine...|
|and Spark Streami...|
|                    |
|<http://spark.apa...|
|                    |
|                    |
|## Online Documen...|
|                    |
|You can find the ...|
|guide, on the [pr...|
|and [project wiki...|
|This README file ...|
|                    |
|  ## Building Spark|
+--------------------+
only showing top 20 rows
```
Loading Table to DataFrame — table Method

```scala
table(tableName: String): DataFrame
```
table loads the content of the tableName table into an untyped DataFrame.
```scala
scala> spark.catalog.tableExists("t1")
res1: Boolean = true

// t1 exists in the catalog
// let's load it
val t1 = spark.read.table("t1")
```
Note: table simply passes the call to SparkSession.table after making sure that a user-defined schema has not been specified.
Loading Data From External Table using JDBC Data Source — jdbc Method

```scala
jdbc(url: String, table: String, properties: Properties): DataFrame
jdbc(
  url: String,
  table: String,
  predicates: Array[String],
  connectionProperties: Properties): DataFrame
jdbc(
  url: String,
  table: String,
  columnName: String,
  lowerBound: Long,
  upperBound: Long,
  numPartitions: Int,
  connectionProperties: Properties): DataFrame
```
jdbc loads data from an external table using the JDBC data source.

Internally, jdbc creates a JDBCOptions from the input url, table and extraOptions with connectionProperties.

jdbc then creates one JDBCPartition per predicates.

In the end, jdbc requests the SparkSession to create a DataFrame for a JDBCRelation (with the JDBCPartitions and JDBCOptions created earlier).
Note: The jdbc method uses java.util.Properties (and appears overly Java-centric). Use format("jdbc") instead.
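A sketch of both jdbc variants (the URL, table name, and credentials below are hypothetical, and a matching JDBC driver must be on the classpath):

```scala
import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark: SparkSession = ???  // an existing session

// Hypothetical connection details -- adjust to your database
val url = "jdbc:postgresql://localhost:5432/sparkdb"
val props = new Properties()
props.setProperty("user", "spark")
props.setProperty("password", "secret")
props.setProperty("driver", "org.postgresql.Driver")

// Simple variant: a single partition reads the whole table
val projects = spark.read.jdbc(url, "projects", props)

// Partitioned variant: 4 parallel reads split on the id column
// over the value range [0, 1000)
val projectsPartitioned = spark.read.jdbc(
  url, "projects",
  columnName = "id", lowerBound = 0L, upperBound = 1000L, numPartitions = 4,
  connectionProperties = props)
```

Note that reading the schema requires a live connection, so unlike file-based reads this touches the database as soon as jdbc is called.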
Tip: Review the exercise Creating DataFrames from Tables using JDBC and PostgreSQL.
Loading Datasets From Text Files — textFile Method

```scala
textFile(path: String): Dataset[String]
textFile(paths: String*): Dataset[String]
```
textFile loads one or many text files into a typed Dataset[String].

```scala
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

import org.apache.spark.sql.Dataset
val lines: Dataset[String] = spark
  .read
  .textFile("README.md")
```
Note: The textFile methods are similar to the text family of methods in that they both read text files, but the text methods return an untyped DataFrame while the textFile methods return a typed Dataset[String].
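A quick way to see the difference (the file here is illustrative):

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("text-vs-textfile-demo")
  .getOrCreate()

// A tiny text file for demonstration
val path = Files.createTempFile("readme", ".md")
Files.write(path, java.util.Arrays.asList("# Title", "Some text"))

// text gives an untyped DataFrame with a single `value` column...
val df: DataFrame = spark.read.text(path.toString)
assert(df.columns.sameElements(Array("value")))

// ...while textFile gives a typed Dataset[String]
val ds: Dataset[String] = spark.read.textFile(path.toString)
assert(ds.collect().sorted.sameElements(Array("# Title", "Some text")))
```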
Loading Dataset (Data Source API V1) — loadV1Source Internal Method

```scala
loadV1Source(paths: String*): DataFrame
```
loadV1Source creates a DataSource and requests it to resolve the underlying relation (as a BaseRelation).

In the end, loadV1Source requests SparkSession to create a DataFrame from the BaseRelation.
Note: loadV1Source is used when DataFrameReader is requested to load (and the data source is not of DataSourceV2 type, or a DataSourceReader could not be created).
Loading Dataset from Data Source — load Method

```scala
load(): DataFrame
load(path: String): DataFrame
load(paths: String*): DataFrame
```
load loads a dataset from a data source (with optional support for multiple paths) as an untyped DataFrame.
Internally, load looks up the data source (lookupDataSource) for the source. load then branches off per its type (i.e. whether it is of the DataSourceV2 marker type or not).

For a “Data Source V2” data source, load…FIXME

Otherwise, if the source is not a “Data Source V2” data source, load simply uses loadV1Source.
load throws an AnalysisException when the source format is hive.
```
Hive data source can only be used with tables, you can not read files of Hive data source directly.
```
assertNoSpecifiedSchema Internal Method

```scala
assertNoSpecifiedSchema(operation: String): Unit
```
assertNoSpecifiedSchema throws an AnalysisException if the userSpecifiedSchema is defined.
```
User specified schema not supported with `[operation]`
```