DataFrameWriter — Saving Data To External Data Sources
DataFrameWriter is the interface to describe how data (the result of executing a structured query) should be saved to an external data source.
Method | Description
---|---
bucketBy | Buckets the output by the given columns
csv | Saves the content of the DataFrame in CSV format at the specified path
format | Specifies the data source (by alias or fully-qualified class name)
insertInto | Inserts (the results of) a DataFrame into a table
jdbc | Saves the content of the DataFrame to an external database table via JDBC
json | Saves the content of the DataFrame in JSON format at the specified path
mode | Specifies the save mode
option | Adds a single writer option
options | Adds writer options
orc | Saves the content of the DataFrame in ORC format at the specified path
parquet | Saves the content of the DataFrame in Parquet format at the specified path
partitionBy | Specifies the partitioning columns
save | Saves the rows of a structured query (a DataFrame) to a data source
saveAsTable | Saves the content of a DataFrame to a table
sortBy | Specifies sorting columns (to be used together with bucketBy)
text | Saves the content of the DataFrame in a text file at the specified path
DataFrameWriter is available using the Dataset.write operator.
```scala
scala> :type df
org.apache.spark.sql.DataFrame

val writer = df.write

scala> :type writer
org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row]
```
DataFrameWriter supports many file formats and JDBC databases. It also allows for plugging in new formats.

DataFrameWriter defaults to the parquet data source format. You can change the default format using the spark.sql.sources.default configuration property, the format method, or the format-specific methods (e.g. json, parquet).
```scala
// see above for the writer definition

// Save the dataset in the (default) Parquet format
writer.save(path = "nums")

// Save the dataset in JSON format
writer.format("json").save(path = "nums-json")

// Alternatively, use the format-specific method
writer.json(path = "nums-json")
```
In the end, you trigger the actual saving of the content of a Dataset (i.e. the result of executing a structured query) using the save method.
```scala
writer.save
```
DataFrameWriter uses internal mutable attributes to build a properly-defined "write specification" for the insertInto, save and saveAsTable methods.
Attribute | Setters
---|---
source | format
mode | mode
extraOptions | option, options, save
partitioningColumns | partitionBy
bucketColumnNames | bucketBy
numBuckets | bucketBy
sortColumnNames | sortBy
Note: DataFrameWriter is a type constructor in Scala that keeps an internal reference to the source DataFrame for the whole lifecycle (starting right from the moment it was created).
Note: Spark Structured Streaming's DataStreamWriter is responsible for writing the content of streaming Datasets in a streaming fashion.
Executing Logical Command(s) — runCommand Internal Method
```scala
runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit
```
runCommand uses the input SparkSession to access the SessionState that is in turn requested to execute the logical command (that simply creates a QueryExecution).
runCommand records the current time (start time) and uses the SQLExecution helper object to execute the action (under a new execution id) that simply requests the QueryExecution for the RDD[InternalRow] (and triggers execution of logical commands).
Tip: Use web UI's SQL tab to see the execution or a SparkListener to be notified when the execution is started and finished. The SparkListener should intercept SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events.
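The following is a minimal sketch of such a listener; it is not part of the internals walkthrough and the println messages are placeholders, but the event classes and the onOtherEvent hook are the actual Spark listener API.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

// Prints a line when a SQL execution starts and when it finishes
class SQLExecutionListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${e.executionId} started: ${e.description}")
    case e: SparkListenerSQLExecutionEnd =>
      println(s"SQL execution ${e.executionId} finished at ${e.time}")
    case _ => // not interested in other events
  }
}

// Register the listener (e.g. in spark-shell) before triggering a write
spark.sparkContext.addSparkListener(new SQLExecutionListener)
```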
runCommand records the current time (end time).
In the end, runCommand uses the input SparkSession to access the ExecutionListenerManager and requests it to onSuccess (with the input name, the QueryExecution and the duration).
In case of any exceptions, runCommand requests the ExecutionListenerManager to onFailure (with the exception) and (re)throws it.
Note: runCommand is used when DataFrameWriter is requested to save the rows of a structured query (a DataFrame) to a data source (and indirectly executes a logical command for writing to a data source V1), insert the rows of a structured query (a DataFrame) into a table and create a table (that is used exclusively for saveAsTable).
Saving Rows of Structured Query (DataFrame) to Table — saveAsTable Method
```scala
saveAsTable(tableName: String): Unit
// PRIVATE API
saveAsTable(tableIdent: TableIdentifier): Unit
```
saveAsTable saves the content of a DataFrame to the tableName table.
```scala
val ids = spark.range(5)
ids.write.
  option("path", "/tmp/five_ids").
  saveAsTable("five_ids")

// Check out if saveAsTable as five_ids was successful
val q = spark.catalog.listTables.filter($"name" === "five_ids")
scala> q.show
+--------+--------+-----------+---------+-----------+
|    name|database|description|tableType|isTemporary|
+--------+--------+-----------+---------+-----------+
|five_ids| default|       null| EXTERNAL|      false|
+--------+--------+-----------+---------+-----------+
```
Internally, saveAsTable requests the current ParserInterface to parse the input table name.
Note: saveAsTable uses the internal DataFrame to access the SparkSession that is used to access the SessionState and in the end the ParserInterface.
saveAsTable then requests the SessionCatalog to check whether the table exists or not.
Note: saveAsTable uses the internal DataFrame to access the SparkSession that is used to access the SessionState and in the end the SessionCatalog.
In the end, saveAsTable branches off per whether the table exists or not and the save mode.
Does table exist? | Save Mode | Behaviour
---|---|---
yes | Ignore | Does nothing
yes | ErrorIfExists | Reports an AnalysisException
yes | Overwrite | FIXME
anything | anything | Uses createTable
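A short sketch of how the save mode changes saveAsTable's behaviour for an existing table; the table name nums_table is made up for the example.

```scala
// Create the table first
spark.range(5).write.saveAsTable("nums_table")

// Ignore: the existing table is left untouched, nothing happens
spark.range(10).write.mode("ignore").saveAsTable("nums_table")

// ErrorIfExists (the default save mode): an AnalysisException is reported
// spark.range(10).write.saveAsTable("nums_table")
```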
Saving Rows of Structured Query (DataFrame) to Data Source — save Method
```scala
save(): Unit
```
save saves the rows of a structured query (a Dataset) to a data source.
Internally, save uses DataSource to look up the class of the requested data source (for the source option and the SQLConf).
If the class is a DataSourceV2…FIXME
Otherwise, if not a DataSourceV2, save simply uses saveToV1Source.
save does not support saving to Hive (i.e. when the source is hive) and throws an AnalysisException in that case:
```
Hive data source can only be used with tables, you can not write files of Hive data source directly.
```
save does not support bucketing (i.e. when the numBuckets or sortColumnNames options are defined) and throws an AnalysisException in that case:
```
'[operation]' does not support bucketing right now
```
Saving Data to Table Using JDBC Data Source — jdbc Method
```scala
jdbc(url: String, table: String, connectionProperties: Properties): Unit
```
jdbc saves the content of the DataFrame to an external database table via JDBC.
You can use mode to control the save mode, i.e. what happens when an external table exists when save is executed.
It is assumed that the jdbc save pipeline is neither partitioned nor bucketed.

All options are overridden by the input connectionProperties.
The required options are:

- driver, which is the class name of the JDBC driver (that is passed to Spark's own DriverRegistry.register and later used to connect(url, properties))
When the table exists and the Overwrite save mode is in use, DROP TABLE table is executed.

It creates the input table (using CREATE TABLE table (schema) where schema is the schema of the DataFrame).
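Below is a hedged sketch of the jdbc save pipeline; the URL, table name, credentials and the PostgreSQL driver are placeholders (any JDBC driver on the classpath works the same way).

```scala
import java.util.Properties

val props = new Properties()
props.setProperty("user", "spark")                    // placeholder credentials
props.setProperty("password", "secret")
props.setProperty("driver", "org.postgresql.Driver")  // required: JDBC driver class name

spark.range(5).write
  .mode("overwrite")  // an existing table is dropped and re-created
  .jdbc("jdbc:postgresql://localhost:5432/demo", "nums", props)
```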
bucketBy Method
```scala
bucketBy(numBuckets: Int, colName: String, colNames: String*): DataFrameWriter[T]
```
bucketBy simply sets the internal numBuckets and bucketColumnNames to the input numBuckets and colName with colNames, respectively.
```scala
val df = spark.range(5)

import org.apache.spark.sql.DataFrameWriter
val writer: DataFrameWriter[java.lang.Long] = df.write

val bucketedTable = writer.bucketBy(numBuckets = 8, "col1", "col2")

scala> :type bucketedTable
org.apache.spark.sql.DataFrameWriter[Long]
```
Specifying Save Mode — mode Method
```scala
mode(saveMode: String): DataFrameWriter[T]
mode(saveMode: SaveMode): DataFrameWriter[T]
```
mode defines the behaviour of save when an external file or table (Spark writes to) already exists, i.e. SaveMode.
Name | Description
---|---
Append | Records are appended to the existing data
ErrorIfExists | An exception is thrown if the data already exists
Ignore | Do not save the records and do not change the existing data in any way
Overwrite | Existing data is overwritten by the contents of the DataFrame
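Both variants of mode are equivalent, as the following sketch shows (the path nums is a placeholder).

```scala
import org.apache.spark.sql.SaveMode

// Request the Overwrite save mode by name or by the SaveMode constant
spark.range(5).write.mode("overwrite").save("nums")
spark.range(5).write.mode(SaveMode.Overwrite).save("nums")
```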
Specifying Sorting Columns — sortBy Method
```scala
sortBy(colName: String, colNames: String*): DataFrameWriter[T]
```
sortBy simply sets sorting columns to the input colName and colNames column names.
Note: sortBy must be used together with bucketBy or DataFrameWriter reports an IllegalArgumentException.
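For example, the following sketch (with a made-up table name) satisfies that requirement by pairing sortBy with bucketBy.

```scala
// Bucket the output by id into 4 buckets and sort each bucket by id
spark.range(100).write
  .bucketBy(numBuckets = 4, colName = "id")
  .sortBy("id")
  .saveAsTable("bucketed_sorted_nums")
```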
Note: assertNotBucketed asserts that bucketing is not used by some methods.
Specifying Writer Configuration — option Method
```scala
option(key: String, value: Boolean): DataFrameWriter[T]
option(key: String, value: Double): DataFrameWriter[T]
option(key: String, value: Long): DataFrameWriter[T]
option(key: String, value: String): DataFrameWriter[T]
```
option…FIXME
Specifying Writer Configuration — options Method
```scala
options(options: scala.collection.Map[String, String]): DataFrameWriter[T]
```
options…FIXME
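Both methods simply add entries to the writer's extraOptions that the underlying data source interprets. A hedged sketch with options of the built-in csv data source follows; the keys and the path are examples only.

```scala
// header, sep and compression are options of the csv data source
spark.range(5).write
  .option("header", true)
  .options(Map("sep" -> ";", "compression" -> "gzip"))
  .csv("nums-csv")
```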
Specifying Data Source (by Alias or Fully-Qualified Class Name) — format Method
```scala
format(source: String): DataFrameWriter[T]
```
format simply sets the source internal property.
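format accepts either a short alias or a fully-qualified data source class name, as in the sketch below; the fully-qualified name is the Spark 2.x built-in CSV format and may differ across versions, and the paths are placeholders.

```scala
// By alias
spark.range(5).write.format("json").save("nums-json")

// By fully-qualified class name
spark.range(5).write
  .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
  .save("nums-csv")
```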
Inserting Rows of Structured Query (DataFrame) into Table — insertInto Method
```scala
insertInto(tableName: String): Unit  // (1)
insertInto(tableIdent: TableIdentifier): Unit
```
1. Parses tableName and calls the other insertInto with a TableIdentifier
insertInto inserts the content of the DataFrame into the specified tableName table.
Note: insertInto ignores column names and just uses a position-based resolution, i.e. the order (not the names!) of the columns in (the output of) the Dataset matters.
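A sketch illustrating the position-based resolution; the table name t_pos and the columns are made up.

```scala
import spark.implicits._

// Table with columns (a, b)
spark.range(3).select($"id".as("a"), ($"id" * 10).as("b")).write.saveAsTable("t_pos")

// The columns are deliberately swapped: values of "b" land in the table's "a"
// column (and vice versa), because only the column order matters for insertInto.
spark.range(3, 6).select(($"id" * 10).as("b"), $"id".as("a")).write.insertInto("t_pos")
```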
Internally, insertInto creates an InsertIntoTable logical operator (with UnresolvedRelation operator as the only child) and executes it right away (that submits a Spark job).
insertInto reports an AnalysisException for bucketed DataFrames, i.e. when buckets or sortColumnNames are defined.
```
'insertInto' does not support bucketing right now
```
```scala
val writeSpec = spark.range(4).
  write.
  bucketBy(numBuckets = 3, colName = "id")

scala> writeSpec.insertInto("t1")
org.apache.spark.sql.AnalysisException: 'insertInto' does not support bucketing right now;
  at org.apache.spark.sql.DataFrameWriter.assertNotBucketed(DataFrameWriter.scala:334)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:302)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:298)
  ... 49 elided
```
insertInto reports an AnalysisException for partitioned DataFrames, i.e. when partitioningColumns is defined.
```
insertInto() can't be used together with partitionBy(). Partition columns have already been defined for the table. It is not necessary to use partitionBy().
```
```scala
val writeSpec = spark.range(4).
  write.
  partitionBy("id")

scala> writeSpec.insertInto("t1")
org.apache.spark.sql.AnalysisException: insertInto() can't be used together with partitionBy(). Partition columns have already be defined for the table. It is not necessary to use partitionBy().;
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:305)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:298)
  ... 49 elided
```
getBucketSpec Internal Method
```scala
getBucketSpec: Option[BucketSpec]
```
getBucketSpec returns a new BucketSpec if numBuckets was defined (with bucketColumnNames and sortColumnNames).
getBucketSpec throws an IllegalArgumentException when numBuckets is not defined but sortColumnNames is:
```
sortBy must be used together with bucketBy
```
Note: getBucketSpec is used exclusively when DataFrameWriter is requested to create a table.
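A sketch of the failure: sortBy without a matching bucketBy makes getBucketSpec (via createTable and saveAsTable) throw the IllegalArgumentException above. The table name is made up.

```scala
spark.range(5).write
  .sortBy("id")               // no bucketBy(...)
  .saveAsTable("sorted_only") // java.lang.IllegalArgumentException: sortBy must be used together with bucketBy
```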
Creating Table — createTable Internal Method
```scala
createTable(tableIdent: TableIdentifier): Unit
```
createTable builds a CatalogStorageFormat per extraOptions.
createTable assumes CatalogTableType.EXTERNAL when the location URI of CatalogStorageFormat is defined and CatalogTableType.MANAGED otherwise.
createTable creates a CatalogTable (with the bucketSpec per getBucketSpec).
In the end, createTable creates a CreateTable logical command (with the CatalogTable, mode and the logical query plan of the dataset) and runs it.
Note: createTable is used when DataFrameWriter is requested to saveAsTable.
assertNotBucketed Internal Method
```scala
assertNotBucketed(operation: String): Unit
```
assertNotBucketed simply throws an AnalysisException if either the numBuckets or sortColumnNames internal property is defined:
```
'[operation]' does not support bucketing right now
```
Note: assertNotBucketed is used when DataFrameWriter is requested to save, insertInto and jdbc.
Executing Logical Command for Writing to Data Source V1 — saveToV1Source Internal Method
```scala
saveToV1Source(): Unit
```
saveToV1Source creates a DataSource (for the source class name, the partitioningColumns and the extraOptions) and requests it for the logical command for writing (with the mode and the analyzed logical plan of the structured query).
Note: While requesting the analyzed logical plan of the structured query, saveToV1Source triggers execution of logical commands.
In the end, saveToV1Source runs the logical command for writing.
Note: The logical command for writing can be either a SaveIntoDataSourceCommand or an InsertIntoHadoopFsRelationCommand.
Note: saveToV1Source is used exclusively when DataFrameWriter is requested to save the rows of a structured query (a DataFrame) to a data source (for all but DataSourceV2 writers with WriteSupport).
assertNotPartitioned Internal Method
```scala
assertNotPartitioned(operation: String): Unit
```
assertNotPartitioned…FIXME
Note: assertNotPartitioned is used when…FIXME
partitionBy Method
```scala
partitionBy(colNames: String*): DataFrameWriter[T]
```
partitionBy simply sets the partitioningColumns internal property.
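A minimal sketch of partitionBy in action; the derived column and the output path are made up. The output directory gets one subdirectory per distinct value of the partitioning column.

```scala
import spark.implicits._

// Partition the output files by a derived column
val nums = spark.range(100).withColumn("bucket", $"id" % 4)
nums.write.partitionBy("bucket").parquet("nums-partitioned")
```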