
DataFrameWriter — Saving Data To External Data Sources

DataFrameWriter is the interface to describe how data (as the result of executing a structured query) should be saved to an external data source.

Table 1. DataFrameWriter API / Writing Operators

  • bucketBy
  • csv
  • format
  • insertInto: inserts (the results of) a DataFrame into a table
  • jdbc
  • json
  • mode
  • option
  • options
  • orc
  • parquet
  • partitionBy
  • save
  • saveAsTable
  • sortBy
  • text

DataFrameWriter is available using the Dataset.write operator.

DataFrameWriter supports many file formats and JDBC databases. It also allows for plugging in new formats.

DataFrameWriter defaults to the parquet data source format. You can change the default format using the spark.sql.sources.default configuration property, or set the format per write using format or one of the format-specific methods.

In the end, you trigger the actual saving of the content of a Dataset (i.e. the result of executing a structured query) using save method.
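
For example (a minimal sketch, assuming a SparkSession named spark and a placeholder output path):

  // Build a small Dataset and describe how it should be saved.
  val nums = spark.range(5)
  nums.write
    .format("csv")            // data source format (parquet is the default)
    .option("header", "true") // format-specific option
    .mode("overwrite")        // what to do if the target already exists
    .save("/tmp/dataframewriter-demo")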

DataFrameWriter uses internal mutable attributes to build a properly-defined “write specification” for insertInto, save and saveAsTable methods.

Table 2. Internal Attributes and Corresponding Setters

  • source: set by format
  • mode: set by mode
  • extraOptions: set by option, options, save
  • partitioningColumns: set by partitionBy
  • bucketColumnNames: set by bucketBy
  • numBuckets: set by bucketBy
  • sortColumnNames: set by sortBy

Note
DataFrameWriter is a type constructor in Scala that keeps an internal reference to the source DataFrame for the whole lifecycle (starting right from the moment it was created).
Note
Spark Structured Streaming’s DataStreamWriter is responsible for writing the content of streaming Datasets in a streaming fashion.

Executing Logical Command(s) — runCommand Internal Method

runCommand uses the input SparkSession to access the SessionState that is in turn requested to execute the logical command (that simply creates a QueryExecution).

runCommand records the current time (start time) and uses the SQLExecution helper object to execute the action (under a new execution id) that simply requests the QueryExecution for the RDD[InternalRow] (and triggers execution of logical commands).

Tip
Use web UI’s SQL tab to see the execution or a SparkListener to be notified when the execution is started and finished. The SparkListener should intercept SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events.
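
A minimal sketch of such a SparkListener (the two events are delivered through onOtherEvent; the listener name is made up for illustration):

  import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
  import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

  // Prints a line when a SQL execution starts and when it ends.
  class SQLExecutionAuditor extends SparkListener {
    override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
      case s: SparkListenerSQLExecutionStart =>
        println(s"SQL execution ${s.executionId} started: ${s.description}")
      case e: SparkListenerSQLExecutionEnd =>
        println(s"SQL execution ${e.executionId} finished at ${e.time}")
      case _ => // not interested in other events
    }
  }

  // Register with the active SparkContext (assuming a SparkSession named spark).
  spark.sparkContext.addSparkListener(new SQLExecutionAuditor)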

runCommand records the current time (end time).

In the end, runCommand uses the input SparkSession to access the ExecutionListenerManager and requests it to onSuccess (with the input name, the QueryExecution and the duration).

In case of any exceptions, runCommand requests the ExecutionListenerManager to onFailure (with the exception) and (re)throws it.

Saving Rows of Structured Query (DataFrame) to Table — saveAsTable Method

saveAsTable saves the content of a DataFrame to the tableName table.
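
For example (a sketch, assuming a SparkSession named spark; the table and column names are illustrative):

  import spark.implicits._

  // Save a small DataFrame as a table in the session catalog.
  val people = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
  people.write
    .mode("ignore")         // do nothing if the table already exists
    .saveAsTable("people")  // the table name is parsed and looked up in the SessionCatalog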

Internally, saveAsTable requests the current ParserInterface to parse the input table name.

Note
saveAsTable uses the internal DataFrame to access the SparkSession that is used to access the SessionState and in the end the ParserInterface.

saveAsTable then requests the SessionCatalog to check whether the table exists or not.

Note
saveAsTable uses the internal DataFrame to access the SparkSession that is used to access the SessionState and in the end the SessionCatalog.

In the end, saveAsTable branches off per whether the table exists or not and the save mode.

Table 3. saveAsTable’s Behaviour per Save Mode

  • Table exists, Ignore: does nothing
  • Table exists, ErrorIfExists: reports an AnalysisException with the error message "Table [tableIdent] already exists."
  • Table exists, Overwrite: FIXME
  • Any other case (in particular, when the table does not exist): createTable

Saving Rows of Structured Query (DataFrame) to Data Source — save Method

save saves the rows of a structured query (a Dataset) to a data source.

Internally, save uses DataSource to look up the class of the requested data source (for the source option and the SQLConf).

Note

save uses SparkSession to access the SessionState that is in turn used to access the SQLConf.

If the class is a DataSourceV2…​FIXME

Otherwise (i.e. not a DataSourceV2), save simply delegates to saveToV1Source.

save does not support saving to Hive (i.e. the source is hive) and throws an AnalysisException when requested to do so.

save does not support bucketing (i.e. when the numBuckets or sortColumnNames options are defined) and throws an AnalysisException when requested to do so.

Saving Data to Table Using JDBC Data Source — jdbc Method

jdbc method saves the content of the DataFrame to an external database table via JDBC.

You can use mode to control save mode, i.e. what happens when an external table exists when save is executed.

It is assumed that the jdbc save pipeline is neither partitioned nor bucketed.

All options are overridden by the input connectionProperties.

The required options are:

  • driver which is the class name of the JDBC driver (that is passed to Spark’s own DriverRegistry.register and later used to connect(url, properties)).

When the table exists and the Overwrite save mode is in use, DROP TABLE table is executed.

It creates the input table (using CREATE TABLE table (schema) where schema is the schema of the DataFrame).
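
A sketch of a jdbc write (the URL, table name, credentials and driver are illustrative placeholders; df stands for any DataFrame and the driver has to be available on the classpath):

  import java.util.Properties

  val connectionProperties = new Properties()
  connectionProperties.put("user", "spark")
  connectionProperties.put("password", "secret")
  connectionProperties.put("driver", "org.postgresql.Driver") // required: JDBC driver class name

  df.write
    .mode("overwrite") // DROP TABLE and CREATE TABLE when the table already exists
    .jdbc("jdbc:postgresql://localhost:5432/demo", "public.people", connectionProperties)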

bucketBy Method

bucketBy simply sets the internal numBuckets and bucketColumnNames properties to the input numBuckets and the input column names (colName and colNames), respectively.
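
For example (a sketch, assuming a SparkSession named spark; bucketed writes have to go through saveAsTable since save does not support bucketing):

  // Write a table with 4 buckets on the id column, sorted within each bucket.
  spark.range(100)
    .write
    .bucketBy(4, "id")
    .sortBy("id")           // sortBy requires bucketBy
    .saveAsTable("bucketed_ids")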

partitionBy Method

Caution
FIXME

Specifying Save Mode — mode Method

mode defines the behaviour of save when an external file or table (Spark writes to) already exists, i.e. SaveMode.

Table 4. Types of SaveMode

  • Append: records are appended to existing data.
  • ErrorIfExists: an exception is thrown.
  • Ignore: the records are not saved and the existing data is not changed in any way.
  • Overwrite: existing data is overwritten by new records.
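
mode accepts a SaveMode value or its case-insensitive name, e.g. (df stands for any DataFrame; the path is a placeholder):

  import org.apache.spark.sql.SaveMode

  // Equivalent ways of selecting the Append save mode.
  df.write.mode(SaveMode.Append).parquet("/tmp/append-demo")
  df.write.mode("append").parquet("/tmp/append-demo")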

Specifying Sorting Columns — sortBy Method

sortBy simply sets sorting columns to the input colName and colNames column names.

Note
sortBy must be used together with bucketBy or DataFrameWriter reports an IllegalArgumentException.
Note
assertNotBucketed asserts that bucketing is not used by some methods.

Specifying Writer Configuration — option Method

option…​FIXME

Specifying Writer Configuration — options Method

options…​FIXME
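
For example (a sketch using options of the built-in csv data source; df stands for any DataFrame and the path is a placeholder):

  // A single option vs. a map of options.
  df.write
    .option("header", "true")
    .option("sep", "|")
    .csv("/tmp/csv-demo")

  df.write
    .options(Map("header" -> "true", "sep" -> "|"))
    .csv("/tmp/csv-demo")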

Writing DataFrames to Files

Caution
FIXME

Specifying Data Source (by Alias or Fully-Qualified Class Name) — format Method

format simply sets the source internal property.
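
The source can be a short alias or a fully-qualified class name of a data source provider, e.g. (a sketch; df stands for any DataFrame and the class name shown is Spark 2.x's built-in JSON provider, given here for illustration):

  // By the short alias of the data source.
  df.write.format("json").save("/tmp/json-by-alias")

  // By a fully-qualified provider class name.
  df.write
    .format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
    .save("/tmp/json-by-class")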

Parquet

Caution
FIXME
Note
Parquet is the default data source format.

Inserting Rows of Structured Query (DataFrame) into Table — insertInto Method

  1. Parses tableName and calls the other insertInto with a TableIdentifier

insertInto inserts the content of the DataFrame to the specified tableName table.

Note
insertInto ignores column names and just uses a position-based resolution, i.e. the order (not the names!) of the columns in (the output of) the Dataset matters.

Internally, insertInto creates an InsertIntoTable logical operator (with UnresolvedRelation operator as the only child) and executes it right away (that submits a Spark job).
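
A sketch illustrating the position-based resolution (assuming a SparkSession named spark; the table and column names are made up):

  import spark.implicits._

  // Target table with columns (id, name).
  Seq((0, "zero")).toDF("id", "name").write.saveAsTable("names")

  // insertInto matches columns by position, not by name: although the columns
  // below are named (n, label), n lands in id and label lands in name.
  Seq((1, "one")).toDF("n", "label").write.insertInto("names")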

Figure 1. DataFrameWriter.insertInto Executes SQL Command (as a Spark job)

insertInto reports an AnalysisException for bucketed DataFrames, i.e. when numBuckets or sortColumnNames is defined.

insertInto reports an AnalysisException for partitioned DataFrames, i.e. when partitioningColumns is defined.

getBucketSpec Internal Method

getBucketSpec returns a new BucketSpec if numBuckets was defined (with bucketColumnNames and sortColumnNames).

getBucketSpec throws an IllegalArgumentException when numBuckets is not defined but sortColumnNames is.

Note
getBucketSpec is used exclusively when DataFrameWriter is requested to create a table.

Creating Table — createTable Internal Method

createTable assumes CatalogTableType.EXTERNAL when location URI of CatalogStorageFormat is defined and CatalogTableType.MANAGED otherwise.

createTable creates a CatalogTable (with the bucketSpec per getBucketSpec).

In the end, createTable creates a CreateTable logical command (with the CatalogTable, mode and the logical query plan of the dataset) and runs it.

Note
createTable is used when DataFrameWriter is requested to saveAsTable.

assertNotBucketed Internal Method

assertNotBucketed simply throws an AnalysisException if either the numBuckets or sortColumnNames internal property is defined.

Note
assertNotBucketed is used when DataFrameWriter is requested to save, insertInto and jdbc.

Executing Logical Command for Writing to Data Source V1 — saveToV1Source Internal Method

saveToV1Source creates a DataSource (for the source class name, the partitioningColumns and the extraOptions) and requests it for the logical command for writing (with the mode and the analyzed logical plan of the structured query).

Note
While requesting the analyzed logical plan of the structured query, saveToV1Source triggers execution of logical commands.

In the end, saveToV1Source runs the logical command for writing.

Note
saveToV1Source is used exclusively when DataFrameWriter is requested to save the rows of a structured query (a DataFrame) to a data source (for all but DataSourceV2 writers with WriteSupport).

assertNotPartitioned Internal Method

assertNotPartitioned…​FIXME

Note
assertNotPartitioned is used when…​FIXME

csv Method

csv…​FIXME

json Method

json…​FIXME

orc Method

orc…​FIXME

parquet Method

parquet…​FIXME

text Method

text…​FIXME
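
These format-specific methods are shorthands for format followed by save, e.g. (a sketch; df stands for any DataFrame and the paths are placeholders):

  // The two lines below are equivalent.
  df.write.parquet("/tmp/out-parquet")
  df.write.format("parquet").save("/tmp/out-parquet")

  // Similarly for the other formats.
  df.write.json("/tmp/out-json")
  df.write.orc("/tmp/out-orc")
  df.write.csv("/tmp/out-csv")
  df.write.text("/tmp/out-text") // text requires a Dataset with a single string column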

partitionBy Method

partitionBy simply sets the partitioningColumns internal property.
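
For example (a sketch, assuming a DataFrame named events with year and month columns; the path is a placeholder):

  // Lay out the output directory by year and month subdirectories,
  // e.g. /tmp/events/year=2018/month=1/...
  events.write
    .partitionBy("year", "month")
    .parquet("/tmp/events")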
