DataFrameStatFunctions — Working With Statistic Functions

DataFrameStatFunctions is used to work with statistic functions in a structured query (a DataFrame).

Table 1. DataFrameStatFunctions API
Method Description

approxQuantile

bloomFilter

corr

countMinSketch

cov

crosstab

freqItems

sampleBy

DataFrameStatFunctions is available using stat untyped transformation.
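
For illustration, a minimal sketch of the stat API, assuming a spark-shell session (the DataFrame and column names are made up):

```scala
import org.apache.spark.sql.functions.col

// Hypothetical DataFrame with two numeric columns
val nums = spark.range(0, 100).toDF("id").withColumn("v", col("id") % 10)

// Approximate 25th/50th/75th percentiles of "v" with a 10% relative error
val quantiles = nums.stat.approxQuantile("v", Array(0.25, 0.5, 0.75), 0.1)

// Pearson correlation and sample covariance between two columns
val correlation = nums.stat.corr("id", "v")
val covariance  = nums.stat.cov("id", "v")
```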

approxQuantile Method

approxQuantile…​FIXME

bloomFilter Method

bloomFilter…​FIXME

buildBloomFilter Internal Method

buildBloomFilter…​FIXME

Note
buildBloomFilter is used when…​FIXME

corr Method

corr…​FIXME

countMinSketch Method

countMinSketch…​FIXME

cov Method

cov…​FIXME

crosstab Method

crosstab…​FIXME

freqItems Method

freqItems…​FIXME

sampleBy Method

sampleBy…​FIXME

DataFrameNaFunctions — Working With Missing Data

DataFrameNaFunctions is used to work with missing data in a structured query (a DataFrame).

Table 1. DataFrameNaFunctions API
Method Description

drop

fill

replace

DataFrameNaFunctions is available using na untyped transformation.
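
For illustration, a minimal sketch of the na API, assuming a spark-shell session (the DataFrame and values are made up):

```scala
import spark.implicits._

// Hypothetical DataFrame with a missing value
val people = Seq(("alice", Some(30)), ("bob", None)).toDF("name", "age")

people.na.drop()                                   // drop rows containing any null values
people.na.fill(Map("age" -> 0))                    // fill nulls in "age" with 0
people.na.replace("name", Map("alice" -> "Alice")) // value-for-value replacement
```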

convertToDouble Internal Method

convertToDouble…​FIXME

Note
convertToDouble is used when…​FIXME

drop Method

drop…​FIXME

fill Method

fill…​FIXME

fillCol Internal Method

fillCol…​FIXME

Note
fillCol is used when…​FIXME

fillMap Internal Method

fillMap…​FIXME

Note
fillMap is used when…​FIXME

fillValue Internal Method

fillValue…​FIXME

Note
fillValue is used when…​FIXME

replace0 Internal Method

replace0…​FIXME

Note
replace0 is used when…​FIXME

replace Method

replace…​FIXME

replaceCol Internal Method

replaceCol…​FIXME

Note
replaceCol is used when…​FIXME

Dataset API — Actions

Actions are part of the Dataset API for…​FIXME

Note
Actions are the methods in the Dataset Scala class that are grouped in action group name, i.e. @group action.
Table 1. Dataset API’s Actions
Action Description

collect

count

describe

first

foreach

foreachPartition

head

reduce

show

summary

Computes specified statistics for numeric and string columns. The default statistics are: count, mean, stddev, min, max and 25%, 50%, 75% percentiles.

Note
summary is an extended version of the describe action that simply calculates count, mean, stddev, min and max statistics.

take

toLocalIterator

collect Action

collect…​FIXME

count Action

count…​FIXME

Calculating Basic Statistics — describe Action

describe…​FIXME

first Action

first…​FIXME

foreach Action

foreach…​FIXME

foreachPartition Action

foreachPartition…​FIXME

head Action

  1. Calls the other head with n as 1 and takes the first element

head…​FIXME

reduce Action

reduce…​FIXME

show Action

show…​FIXME

Calculating Statistics — summary Action

summary calculates specified statistics for numeric and string columns.

The default statistics are: count, mean, stddev, min, max and 25%, 50%, 75% percentiles.

Note
summary accepts arbitrary approximate percentiles specified as a percentage (e.g. 10%).

Internally, summary uses the StatFunctions to calculate the requested summaries for the Dataset.
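
A minimal sketch of the default and custom statistics, assuming a spark-shell session:

```scala
val ds = spark.range(100)
ds.summary().show()                              // count, mean, stddev, min, 25%, 50%, 75%, max
ds.summary("count", "min", "10%", "90%").show()  // arbitrary percentiles given as percentages
```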

Taking First Records — take Action

take is an action on a Dataset that returns a collection of n records.

Warning
take loads all the data into the memory of the Spark application’s driver process and for a large n could result in OutOfMemoryError.

Internally, take creates a new Dataset with a Limit logical operator (over a Literal expression for n and the current LogicalPlan). It then runs the SparkPlan that produces an Array[InternalRow], which is in turn decoded to Array[T] using a bound encoder.
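
A minimal sketch, assuming a spark-shell session:

```scala
// take collects at most n records to the driver (keep n small for large Datasets)
val ds = spark.range(1000000)
val first3 = ds.take(3)   // Array with the first 3 records
```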

toLocalIterator Action

toLocalIterator…​FIXME

Dataset API — Basic Actions

Basic actions are part of the Dataset API for transforming a Dataset into a session-scoped or global temporary view and other basic actions (FIXME).

Note
Basic actions are the methods in the Dataset Scala class that are grouped in basic group name, i.e. @group basic.
Table 1. Dataset API’s Basic Actions
Action Description

cache

Caches the Dataset

checkpoint

Checkpoints the Dataset in a reliable way (using a reliable HDFS-compliant file system, e.g. Hadoop HDFS or Amazon S3)

columns

createGlobalTempView

createOrReplaceGlobalTempView

createOrReplaceTempView

createTempView

dtypes

explain

Displays the logical and physical plans of the Dataset (with optional cost and codegen summaries) to the standard output

hint

inputFiles

isEmpty

(New in 2.4.0)

isLocal

localCheckpoint

Checkpoints the Dataset locally on executors (and therefore unreliably)

persist

Persists the Dataset

printSchema

rdd

schema

storageLevel

toDF

unpersist

Unpersists the Dataset

write

Returns a DataFrameWriter for saving the content of the (non-streaming) Dataset out to an external storage

Caching Dataset — cache Basic Action

cache merely executes the no-argument persist basic action.

Reliably Checkpointing Dataset — checkpoint Basic Action

  1. eager and reliableCheckpoint flags enabled

  2. reliableCheckpoint flag enabled

Note
checkpoint is an experimental operator and the API is evolving towards becoming stable.

checkpoint simply requests the Dataset to checkpoint with the given eager flag and the reliableCheckpoint flag enabled.
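
A minimal sketch, assuming a spark-shell session (the checkpoint directory path is made up):

```scala
// A checkpoint directory on an HDFS-compliant file system is required
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

val ds = spark.range(100)
val eagerly = ds.checkpoint()              // eager = true by default
val lazily  = ds.checkpoint(eager = false) // checkpointed on the first action
```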

createTempView Basic Action

createTempView…​FIXME

Note
createTempView is used when…​FIXME

createOrReplaceTempView Basic Action

createOrReplaceTempView…​FIXME

Note
createOrReplaceTempView is used when…​FIXME

createGlobalTempView Basic Action

createGlobalTempView…​FIXME

Note
createGlobalTempView is used when…​FIXME

createOrReplaceGlobalTempView Basic Action

createOrReplaceGlobalTempView…​FIXME

Note
createOrReplaceGlobalTempView is used when…​FIXME

createTempViewCommand Internal Method

createTempViewCommand…​FIXME

Note
createTempViewCommand is used when the following Dataset operators are used: Dataset.createTempView, Dataset.createOrReplaceTempView, Dataset.createGlobalTempView and Dataset.createOrReplaceGlobalTempView.

Displaying Logical and Physical Plans, Their Cost and Codegen — explain Basic Action

  1. Turns the extended flag on

explain prints the logical and (with extended flag enabled) physical plans, their cost and codegen to the console.

Tip
Use explain to review the structured queries and optimizations applied.

Internally, explain creates an ExplainCommand logical command and requests SessionState to execute it (to get a QueryExecution back).

Note
explain uses ExplainCommand logical command that, when executed, gives different text representations of QueryExecution (for the Dataset’s LogicalPlan) depending on the flags (e.g. extended, codegen, and cost which are disabled by default).

explain then requests QueryExecution for the optimized physical query plan and collects the records (as InternalRow objects).

Note

explain uses Dataset’s SparkSession to access the current SessionState.

In the end, explain goes over the InternalRow records and converts them to lines to display to console.

Note
explain “converts” an InternalRow record to a line using getString at position 0.
Tip
If you are serious about query debugging you could also use the Debugging Query Execution facility.
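
A minimal sketch, assuming a spark-shell session:

```scala
val q = spark.range(10).where("id > 5")
q.explain()                 // physical plan only
q.explain(extended = true)  // parsed, analyzed and optimized logical plans plus the physical plan
```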

Specifying Hint — hint Basic Action

hint operator is part of Hint Framework to specify a hint (by name and parameters) for a Dataset.

Internally, hint simply attaches UnresolvedHint unary logical operator to an “analyzed” Dataset (i.e. the analyzed logical plan of a Dataset).

Note
hint adds an UnresolvedHint unary logical operator to an analyzed logical plan, which indirectly triggers the analysis phase that executes logical commands and their unions as well as resolves all hints that have already been added to the logical plan.
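
A minimal sketch of a hint by name, assuming a spark-shell session (the table sizes are made up):

```scala
val large = spark.range(1000000).toDF("id")
val small = spark.range(100).toDF("id")
val joined = large.join(small.hint("broadcast"), "id")
joined.explain()   // the physical plan should pick a broadcast join strategy
```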

Locally Checkpointing Dataset — localCheckpoint Basic Action

  1. eager flag enabled

localCheckpoint simply uses Dataset.checkpoint operator with the input eager flag and reliableCheckpoint flag disabled (false).

checkpoint Internal Method

checkpoint requests QueryExecution (of the Dataset) to generate an RDD of internal binary rows (aka internalRdd) and then requests the RDD to make a copy of all the rows (by adding a MapPartitionsRDD).

Depending on reliableCheckpoint flag, checkpoint marks the RDD for (reliable) checkpointing (true) or local checkpointing (false).

With eager flag on, checkpoint counts the number of records in the RDD (by executing RDD.count) that gives the effect of immediate eager checkpointing.

checkpoint requests QueryExecution (of the Dataset) for optimized physical query plan (the plan is used to get the outputPartitioning and outputOrdering for the result Dataset).

Note
checkpoint is used in the Dataset untyped transformations, i.e. checkpoint and localCheckpoint.

Persisting Dataset — persist Basic Action

persist caches the Dataset using the default storage level MEMORY_AND_DISK or newLevel and returns it.

Internally, persist requests CacheManager to cache the structured query (that is accessible through SharedState of the current SparkSession).

Caution
FIXME

Generating RDD of Internal Binary Rows — rdd Basic Action

Whenever you need to convert a Dataset into an RDD, executing the rdd method gives you an RDD of the proper input object type (not Row as in DataFrames) that sits behind the Dataset.

Internally, it looks up the ExpressionEncoder (for the Dataset) and accesses the deserializer expression. That gives the DataType of the result of evaluating the expression.

Note
A deserializer expression is used to decode an InternalRow to an object of type T. See ExpressionEncoder.

It then executes a DeserializeToObject logical operator that will produce an RDD[InternalRow] that is converted into the proper RDD[T] using the DataType and T.

Note
It is a lazy operation that “produces” an RDD[T].
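
A minimal sketch, assuming a spark-shell session (the domain class is made up):

```scala
import spark.implicits._
import org.apache.spark.rdd.RDD

case class Person(name: String, age: Int)
val people = Seq(Person("alice", 30), Person("bob", 25)).toDS
val rdd: RDD[Person] = people.rdd   // lazy; elements are Person, not Row
```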

Accessing Schema — schema Basic Action

A Dataset has a schema.

Tip

You may also use the following methods to learn about the schema:

Converting Typed Dataset to Untyped DataFrame — toDF Basic Action

toDF converts a Dataset into a DataFrame.

Internally, the empty-argument toDF creates a Dataset[Row] using the Dataset‘s SparkSession and QueryExecution with the encoder being RowEncoder.

Caution
FIXME Describe toDF(colNames: String*)

Unpersisting Cached Dataset — unpersist Basic Action

unpersist uncaches the Dataset, possibly blocking the call (per the blocking flag).

Internally, unpersist requests CacheManager to uncache the query.

Caution
FIXME

Accessing DataFrameWriter (to Describe Writing Dataset) — write Basic Action

write gives DataFrameWriter for records of type T.

isEmpty Basic Action

isEmpty…​FIXME

isLocal Basic Action

isLocal…​FIXME

Dataset API — Untyped Transformations

Untyped transformations are part of the Dataset API for transforming a Dataset to a DataFrame, a Column, a RelationalGroupedDataset, a DataFrameNaFunctions or a DataFrameStatFunctions (and hence untyped).

Note
Untyped transformations are the methods in the Dataset Scala class that are grouped in untypedrel group name, i.e. @group untypedrel.
Table 1. Dataset API’s Untyped Transformations
Transformation Description

agg

apply

Selects a column based on the column name (i.e. maps a Dataset onto a Column)

col

Selects a column based on the column name (i.e. maps a Dataset onto a Column)

colRegex

Selects a column based on the column name specified as a regex (i.e. maps a Dataset onto a Column)

crossJoin

cube

drop

groupBy

join

na

rollup

select

selectExpr

stat

withColumn

withColumnRenamed

agg Untyped Transformation

agg…​FIXME

apply Untyped Transformation

apply selects a column based on the column name (i.e. maps a Dataset onto a Column).

col Untyped Transformation

col selects a column based on the column name (i.e. maps a Dataset onto a Column).

Internally, col branches off per the input column name.

If the column name is * (a star), col simply creates a Column with ResolvedStar expression (with the schema output attributes of the analyzed logical plan of the QueryExecution).

Otherwise, col uses colRegex untyped transformation when spark.sql.parser.quotedRegexColumnNames configuration property is enabled.

In the case when the column name is not * and spark.sql.parser.quotedRegexColumnNames configuration property is disabled, col creates a Column with the column name resolved (as a NamedExpression).
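
A minimal sketch, assuming a spark-shell session (the DataFrame is made up):

```scala
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
val idCol   = df.col("id")   // Column resolved by name
val sameCol = df("id")       // apply is equivalent
val allCols = df.col("*")    // star: all output attributes of the analyzed plan
df.select(allCols).show()
```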

colRegex Untyped Transformation

colRegex selects a column based on the column name specified as a regex (i.e. maps a Dataset onto a Column).

Note
colRegex is used in col when spark.sql.parser.quotedRegexColumnNames configuration property is enabled (and the column name is not *).

Internally, colRegex matches the input column name to different regular expressions (in the order):

  1. For column names with quotes without a qualifier, colRegex simply creates a Column with an UnresolvedRegex (with no table)

  2. For column names with quotes with a qualifier, colRegex simply creates a Column with an UnresolvedRegex (with a table specified)

  3. For other column names, colRegex (behaves like col and) creates a Column with the column name resolved (as a NamedExpression)

crossJoin Untyped Transformation

crossJoin…​FIXME

cube Untyped Transformation

cube…​FIXME

Dropping One or More Columns — drop Untyped Transformation

drop…​FIXME

groupBy Untyped Transformation

groupBy…​FIXME

join Untyped Transformation

join…​FIXME

na Untyped Transformation

na simply creates a DataFrameNaFunctions to work with missing data.

rollup Untyped Transformation

rollup…​FIXME

select Untyped Transformation

select…​FIXME

Projecting Columns using SQL Statements — selectExpr Untyped Transformation

selectExpr is like select, but accepts SQL expressions.

Internally, it executes select with every expression in exprs mapped to Column (using SparkSqlParser.parseExpression).
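
A minimal sketch, assuming a spark-shell session (the DataFrame is made up):

```scala
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
df.selectExpr("id * 2 AS doubled", "upper(name) AS name").show()
```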

stat Untyped Transformation

stat simply creates a DataFrameStatFunctions to work with statistic functions.

withColumn Untyped Transformation

withColumn…​FIXME

withColumnRenamed Untyped Transformation

withColumnRenamed…​FIXME

Dataset API — Typed Transformations

Typed transformations are part of the Dataset API for transforming a Dataset with an Encoder (except the RowEncoder).

Note
Typed transformations are the methods in the Dataset Scala class that are grouped in typedrel group name, i.e. @group typedrel.
Table 1. Dataset API’s Typed Transformations
Transformation Description

alias

as

as

coalesce

Repartitions a Dataset

distinct

dropDuplicates

except

filter

flatMap

groupByKey

intersect

joinWith

limit

map

mapPartitions

orderBy

randomSplit

repartition

repartitionByRange

sample

select

sort

sortWithinPartitions

toJSON

transform

union

unionByName

where

as Typed Transformation

as…​FIXME

Enforcing Type — as Typed Transformation

as[T] allows for converting from a weakly-typed Dataset of Rows to Dataset[T] with T being a domain class (that can enforce a stronger schema).
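
A minimal sketch, assuming a spark-shell session (the domain class and data are made up):

```scala
import spark.implicits._

case class Person(name: String, age: Long)
val df = Seq(("alice", 30L), ("bob", 25L)).toDF("name", "age")
val people = df.as[Person]   // Dataset[Person]; columns are matched by name
```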

Repartitioning Dataset with Shuffle Disabled — coalesce Typed Transformation

coalesce operator repartitions the Dataset to exactly numPartitions partitions.

Internally, coalesce creates a Repartition logical operator with shuffle disabled (which is marked as false in the below explain‘s output).
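
A minimal sketch, assuming a spark-shell session:

```scala
val ds = spark.range(0, 100, 1, 8)   // start with 8 partitions
val fewer = ds.coalesce(2)
fewer.rdd.getNumPartitions           // 2
fewer.explain()                      // note the repartitioning step with shuffle disabled
```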

dropDuplicates Typed Transformation

dropDuplicates…​FIXME

except Typed Transformation

except…​FIXME

exceptAll Typed Transformation

exceptAll…​FIXME

filter Typed Transformation

filter…​FIXME

Creating Zero or More Records — flatMap Typed Transformation

flatMap returns a new Dataset (of type U) with all records (of type T) mapped over using the function func and then flattening the results.

Note
flatMap can create new records. It deprecated explode.

Internally, flatMap calls mapPartitions with the partitions flatMap(ped).

intersect Typed Transformation

intersect…​FIXME

intersectAll Typed Transformation

intersectAll…​FIXME

joinWith Typed Transformation

joinWith…​FIXME

limit Typed Transformation

limit…​FIXME

map Typed Transformation

map…​FIXME

mapPartitions Typed Transformation

mapPartitions…​FIXME

Randomly Split Dataset Into Two or More Datasets Per Weight — randomSplit Typed Transformation

randomSplit randomly splits the Dataset per weights.

weights doubles should sum up to 1 and will be normalized if they do not.

You can define a seed; if you don’t, a random seed will be used.

Note
randomSplit is commonly used in Spark MLlib to split an input Dataset into two datasets for training and validation.
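
A minimal sketch, assuming a spark-shell session:

```scala
// An 80/20 split with a fixed seed for reproducibility
val ds = spark.range(1000)
val Array(training, validation) = ds.randomSplit(Array(0.8, 0.2), seed = 42)
```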

Repartitioning Dataset (Shuffle Enabled) — repartition Typed Transformation

repartition operators repartition the Dataset to exactly numPartitions partitions or using partitionExprs expressions.

Internally, repartition creates a Repartition or RepartitionByExpression logical operator with shuffle enabled (which is true in the below explain‘s output beside Repartition).

Note
repartition methods correspond to SQL’s DISTRIBUTE BY or CLUSTER BY clauses.
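
A minimal sketch, assuming a spark-shell session (the DataFrame is made up):

```scala
import spark.implicits._
import org.apache.spark.sql.functions.col

val df = Seq((1, "a"), (2, "b"), (3, "a")).toDF("id", "grp")
df.repartition(4)             // Repartition logical operator (number of partitions only)
df.repartition(col("grp"))    // RepartitionByExpression, like SQL's DISTRIBUTE BY
df.repartition(4, col("grp")) // both number of partitions and expressions
```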

repartitionByRange Typed Transformation

  1. Uses spark.sql.shuffle.partitions configuration property for the number of partitions to use

repartitionByRange simply creates a Dataset with a RepartitionByExpression logical operator.

repartitionByRange uses a SortOrder with the Ascending sort order, i.e. ascending nulls first, when no explicit sort order is specified.

repartitionByRange throws an IllegalArgumentException when no partitionExprs partition-by expression is specified.

sample Typed Transformation

sample…​FIXME

select Typed Transformation

select…​FIXME

sort Typed Transformation

sort…​FIXME

sortWithinPartitions Typed Transformation

sortWithinPartitions simply calls the internal sortInternal method with the global flag disabled (false).

toJSON Typed Transformation

toJSON maps the content of Dataset to a Dataset of strings in JSON format.

Internally, toJSON grabs the RDD[InternalRow] (of the QueryExecution of the Dataset) and maps the records (per RDD partition) into JSON.

Note
toJSON uses Jackson’s JSON parser — jackson-module-scala.

Transforming Datasets — transform Typed Transformation

transform applies the t function to the source Dataset[T] to produce a result Dataset[U]. It is meant for chaining custom transformations.

Internally, transform executes t function on the current Dataset[T].

union Typed Transformation

union…​FIXME

unionByName Typed Transformation

unionByName creates a new Dataset that is a union of the rows in this and the other Dataset column-wise, i.e. the order of columns in the Datasets does not matter as long as their names and number match.

Internally, unionByName creates a Union logical operator for this Dataset and Project logical operator with the other Dataset.

In the end, unionByName applies the CombineUnions logical optimization to the Union logical operator and requests the result LogicalPlan to wrap the child operators with AnalysisBarriers.

unionByName throws an AnalysisException if there are duplicate columns in either Dataset.

unionByName throws an AnalysisException if this Dataset has a column that is not available in the other Dataset.
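
A minimal sketch, assuming a spark-shell session (the DataFrames are made up):

```scala
import spark.implicits._

// Columns are matched by name, so the column order may differ
val left  = Seq((1, "a")).toDF("id", "name")
val right = Seq(("b", 2)).toDF("name", "id")
left.unionByName(right).show()
```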

where Typed Transformation

where is simply a synonym of the filter operator, i.e. passes the input parameters along to filter.

Creating Streaming Dataset with EventTimeWatermark Logical Operator — withWatermark Streaming Typed Transformation

Internally, withWatermark creates a Dataset with EventTimeWatermark logical plan for streaming Datasets.

Note
withWatermark uses EliminateEventTimeWatermark logical rule to eliminate EventTimeWatermark logical plan for non-streaming batch Datasets.

Note

delayThreshold is parsed using CalendarInterval.fromString with interval formatted as described in TimeWindow unary expression.

Note
delayThreshold must not be negative (i.e. milliseconds and months should both be equal to or greater than 0).
Note
withWatermark is used when…​FIXME

Dataset API — Dataset Operators

Dataset API is a set of operators with typed and untyped transformations, and actions to work with a structured query (as a Dataset) as a whole.

Table 1. Dataset Operators (Transformations and Actions)
Operator Description

agg

An untyped transformation

alias

A typed transformation that is a mere synonym of as.

apply

An untyped transformation to select a column based on the column name (i.e. maps a Dataset onto a Column)

as

A typed transformation

as

A typed transformation to enforce a type, i.e. marking the records in the Dataset as of a given data type (data type conversion). as simply changes the view of the data that is passed into typed operations (e.g. map) and does not eagerly project away any columns that are not present in the specified class.

cache

A basic action that is a mere synonym of persist.

checkpoint

A basic action to checkpoint the Dataset in a reliable way (using a reliable HDFS-compliant file system, e.g. Hadoop HDFS or Amazon S3)

coalesce

A typed transformation to repartition a Dataset

col

An untyped transformation to create a column (reference) based on the column name

collect

An action

colRegex

An untyped transformation to create a column (reference) based on the column name specified as a regex

columns

A basic action

count

An action to count the number of rows

createGlobalTempView

A basic action

createOrReplaceGlobalTempView

A basic action

createOrReplaceTempView

A basic action

createTempView

A basic action

crossJoin

An untyped transformation

cube

An untyped transformation

describe

An action

distinct

A typed transformation that is a mere synonym of dropDuplicates (with all the columns of the Dataset)

drop

An untyped transformation

dropDuplicates

A typed transformation

dtypes

A basic action

except

A typed transformation

exceptAll

(New in 2.4.0) A typed transformation

explain

A basic action to display the logical and physical plans of the Dataset, i.e. displays the logical and physical plans (with optional cost and codegen summaries) to the standard output

filter

A typed transformation

first

An action that is a mere synonym of head

flatMap

A typed transformation

foreach

An action

foreachPartition

An action

groupBy

An untyped transformation

groupByKey

A typed transformation

head

  1. Uses 1 for n

An action

hint

A basic action to specify a hint (and optional parameters)

inputFiles

A basic action

intersect

A typed transformation

intersectAll

(New in 2.4.0) A typed transformation

isEmpty

(New in 2.4.0) A basic action

isLocal

A basic action

isStreaming

join

An untyped transformation

joinWith

A typed transformation

limit

A typed transformation

localCheckpoint

A basic action to checkpoint the Dataset locally on executors (and therefore unreliably)

map

A typed transformation

mapPartitions

A typed transformation

na

An untyped transformation

orderBy

A typed transformation

persist

A basic action to persist the Dataset

printSchema

A basic action

randomSplit

A typed transformation to split a Dataset randomly into two Datasets

rdd

A basic action

reduce

An action to reduce the records of the Dataset using the specified binary function.

repartition

A typed transformation to repartition a Dataset

repartitionByRange

A typed transformation

rollup

An untyped transformation

sample

A typed transformation

schema

A basic action

select

An (untyped and typed) transformation

selectExpr

An untyped transformation

show

An action

sort

A typed transformation to sort elements globally (across partitions). Use sortWithinPartitions transformation for partition-local sort

sortWithinPartitions

A typed transformation to sort elements within partitions (aka local sort). Use sort transformation for global sort (across partitions)

stat

An untyped transformation

storageLevel

A basic action

summary

An action to calculate statistics (e.g. count, mean, stddev, min, max and 25%, 50%, 75% percentiles)

take

An action to take the first records of a Dataset

toDF

A basic action to convert a Dataset to a DataFrame

toJSON

A typed transformation

toLocalIterator

An action that returns an iterator with all rows in the Dataset. The iterator will consume as much memory as the largest partition in the Dataset.

transform

A typed transformation for chaining custom transformations

union

A typed transformation

unionByName

A typed transformation

unpersist

  1. Uses unpersist with blocking disabled (false)

A basic action to unpersist the Dataset

where

A typed transformation

withColumn

An untyped transformation

withColumnRenamed

An untyped transformation

write

A basic action that returns a DataFrameWriter for saving the content of the (non-streaming) Dataset out to an external storage

DataFrameWriter — Saving Data To External Data Sources

DataFrameWriter is the interface to describe how data (as the result of executing a structured query) should be saved to an external data source.

Table 1. DataFrameWriter API / Writing Operators
Method Description

bucketBy

csv

format

insertInto

Inserts (the results of) a DataFrame into a table

jdbc

json

mode

option

options

orc

parquet

partitionBy

save

saveAsTable

sortBy

text

DataFrameWriter is available using Dataset.write operator.

DataFrameWriter supports many file formats and JDBC databases. It also allows for plugging in new formats.

DataFrameWriter defaults to parquet data source format. You can change the default format using spark.sql.sources.default configuration property or format or the format-specific methods.

In the end, you trigger the actual saving of the content of a Dataset (i.e. the result of executing a structured query) using save method.
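
A minimal sketch of the write path, assuming a spark-shell session (the path and column names are made up):

```scala
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
df.write
  .format("parquet")           // the default unless spark.sql.sources.default says otherwise
  .mode("overwrite")
  .partitionBy("name")
  .save("/tmp/example-output") // triggers the actual write
```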

DataFrameWriter uses internal mutable attributes to build a properly-defined “write specification” for insertInto, save and saveAsTable methods.

Table 2. Internal Attributes and Corresponding Setters
Attribute Setters

source

format

mode

mode

extraOptions

option, options, save

partitioningColumns

partitionBy

bucketColumnNames

bucketBy

numBuckets

bucketBy

sortColumnNames

sortBy

Note
DataFrameWriter is a type constructor in Scala that keeps an internal reference to the source DataFrame for the whole lifecycle (starting right from the moment it was created).
Note
Spark Structured Streaming’s DataStreamWriter is responsible for writing the content of streaming Datasets in a streaming fashion.

Executing Logical Command(s) — runCommand Internal Method

runCommand uses the input SparkSession to access the SessionState that is in turn requested to execute the logical command (that simply creates a QueryExecution).

runCommand records the current time (start time) and uses the SQLExecution helper object to execute the action (under a new execution id) that simply requests the QueryExecution for the RDD[InternalRow] (and triggers execution of logical commands).

Tip
Use web UI’s SQL tab to see the execution or a SparkListener to be notified when the execution is started and finished. The SparkListener should intercept SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events.

runCommand records the current time (end time).

In the end, runCommand uses the input SparkSession to access the ExecutionListenerManager and requests it to onSuccess (with the input name, the QueryExecution and the duration).

In case of any exceptions, runCommand requests the ExecutionListenerManager to onFailure (with the exception) and (re)throws it.

Saving Rows of Structured Query (DataFrame) to Table — saveAsTable Method

saveAsTable saves the content of a DataFrame to the tableName table.

Internally, saveAsTable requests the current ParserInterface to parse the input table name.

Note
saveAsTable uses the internal DataFrame to access the SparkSession that is used to access the SessionState and in the end the ParserInterface.

saveAsTable then requests the SessionCatalog to check whether the table exists or not.

Note
saveAsTable uses the internal DataFrame to access the SparkSession that is used to access the SessionState and in the end the SessionCatalog.

In the end, saveAsTable branches off per whether the table exists or not and the save mode.

Table 3. saveAsTable’s Behaviour per Save Mode
Does table exist? Save Mode Behaviour

yes

Ignore

Does nothing

yes

ErrorIfExists

Reports an AnalysisException with Table [tableIdent] already exists. error message

yes

Overwrite

FIXME

anything

anything

createTable
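
A minimal sketch of saveAsTable with an explicit save mode, assuming a spark-shell session (the table name is made up):

```scala
import spark.implicits._
import org.apache.spark.sql.SaveMode

val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
df.write
  .mode(SaveMode.Overwrite)
  .saveAsTable("demo_people")
spark.table("demo_people").show()
```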

Saving Rows of Structured Query (DataFrame) to Data Source — save Method

save saves the rows of a structured query (a Dataset) to a data source.

Internally, save uses DataSource to look up the class of the requested data source (for the source option and the SQLConf).

Note

save uses SparkSession to access the SessionState that is in turn used to access the SQLConf.

If the class is a DataSourceV2…​FIXME

Otherwise, if not a DataSourceV2, save simply delegates to saveToV1Source.

save does not support saving to Hive (i.e. the source is hive) and throws an AnalysisException when requested so.

save does not support bucketing (i.e. when the numBuckets or sortColumnNames options are defined) and throws an AnalysisException when requested so.

Saving Data to Table Using JDBC Data Source — jdbc Method

jdbc method saves the content of the DataFrame to an external database table via JDBC.

You can use mode to control save mode, i.e. what happens when an external table exists when save is executed.

It is assumed that the jdbc save pipeline is neither partitioned nor bucketed.

All options are overridden by the input connectionProperties.

The required options are:

  • driver which is the class name of the JDBC driver (that is passed to Spark’s own DriverRegistry.register and later used to connect(url, properties)).

When the table exists and the Overwrite save mode is in use, DROP TABLE table is executed.

It creates the input table (using CREATE TABLE table (schema) where schema is the schema of the DataFrame).
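
A minimal sketch, assuming a spark-shell session with the JDBC driver on the classpath (the URL, table name, credentials and driver are all made up):

```scala
import spark.implicits._
import java.util.Properties

val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
val props = new Properties()
props.setProperty("user", "sa")
props.setProperty("password", "secret")
props.setProperty("driver", "org.postgresql.Driver") // required: JDBC driver class name

df.write
  .mode("append")
  .jdbc("jdbc:postgresql://localhost/testdb", "people", props)
```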

bucketBy Method

bucketBy simply sets the internal numBuckets and bucketColumnNames to the input numBuckets and colName with colNames, respectively.

partitionBy Method

Caution
FIXME

Specifying Save Mode — mode Method

mode defines the behaviour of save when an external file or table (Spark writes to) already exists, i.e. SaveMode.

Table 4. Types of SaveMode
Name Description

Append

Records are appended to existing data.

ErrorIfExists

Exception is thrown.

Ignore

Does not save the records and does not change the existing data in any way.

Overwrite

Existing data is overwritten by new records.

Specifying Sorting Columns — sortBy Method

sortBy simply sets sorting columns to the input colName and colNames column names.

Note
sortBy must be used together with bucketBy or DataFrameWriter reports an IllegalArgumentException.
Note
assertNotBucketed asserts that bucketing is not used by some methods.
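
A minimal sketch of bucketBy with sortBy, assuming a spark-shell session (the table name is made up; bucketing requires saveAsTable, as save would throw an AnalysisException):

```scala
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
df.write
  .bucketBy(4, "id")
  .sortBy("name")       // valid only together with bucketBy
  .mode("overwrite")
  .saveAsTable("bucketed_people")
```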

Specifying Writer Configuration — option Method

option…​FIXME

Specifying Writer Configuration — options Method

options…​FIXME

Writing DataFrames to Files

Caution
FIXME

Specifying Data Source (by Alias or Fully-Qualified Class Name) — format Method

format simply sets the source internal property.

Parquet

Caution
FIXME
Note
Parquet is the default data source format.

Inserting Rows of Structured Query (DataFrame) into Table — insertInto Method

  1. Parses tableName and calls the other insertInto with a TableIdentifier

insertInto inserts the content of the DataFrame to the specified tableName table.

Note
insertInto ignores column names and just uses a position-based resolution, i.e. the order (not the names!) of the columns in (the output of) the Dataset matters.

Internally, insertInto creates an InsertIntoTable logical operator (with UnresolvedRelation operator as the only child) and executes it right away (that submits a Spark job).

Figure 1. DataFrameWriter.insertInto Executes SQL Command (as a Spark job)

insertInto reports an AnalysisException for bucketed DataFrames, i.e. when buckets or sortColumnNames are defined.

insertInto reports an AnalysisException for partitioned DataFrames, i.e. when partitioningColumns is defined.

getBucketSpec Internal Method

getBucketSpec returns a new BucketSpec if numBuckets was defined (with bucketColumnNames and sortColumnNames).

getBucketSpec throws an IllegalArgumentException when sortColumnNames are defined but numBuckets is not.

Note
getBucketSpec is used exclusively when DataFrameWriter is requested to create a table.

Creating Table — createTable Internal Method

createTable assumes CatalogTableType.EXTERNAL when location URI of CatalogStorageFormat is defined and CatalogTableType.MANAGED otherwise.

createTable creates a CatalogTable (with the bucketSpec per getBucketSpec).

In the end, createTable creates a CreateTable logical command (with the CatalogTable, mode and the logical query plan of the dataset) and runs it.

Note
createTable is used when DataFrameWriter is requested to saveAsTable.

assertNotBucketed Internal Method

assertNotBucketed simply throws an AnalysisException if either the numBuckets or sortColumnNames internal property is defined.

Note
assertNotBucketed is used when DataFrameWriter is requested to save, insertInto and jdbc.

Executing Logical Command for Writing to Data Source V1 — saveToV1Source Internal Method

saveToV1Source creates a DataSource (for the source class name, the partitioningColumns and the extraOptions) and requests it for the logical command for writing (with the mode and the analyzed logical plan of the structured query).

Note
While requesting the analyzed logical plan of the structured query, saveToV1Source triggers execution of logical commands.

In the end, saveToV1Source runs the logical command for writing.

Note
saveToV1Source is used exclusively when DataFrameWriter is requested to save the rows of a structured query (a DataFrame) to a data source (for all but DataSourceV2 writers with WriteSupport).

assertNotPartitioned Internal Method

assertNotPartitioned…​FIXME

Note
assertNotPartitioned is used when…​FIXME

csv Method

csv…​FIXME

json Method

json…​FIXME

orc Method

orc…​FIXME

parquet Method

parquet…​FIXME

text Method

text…​FIXME

partitionBy Method

partitionBy simply sets the partitioningColumns internal property.

DataFrameReader — Loading Data From External Data Sources

DataFrameReader is the public interface to describe how to load data from an external data source (e.g. files, tables, JDBC or Dataset[String]).

Table 1. DataFrameReader API
Method Description

csv

format

jdbc

json

load

option

options

orc

parquet

schema

table

text

textFile

DataFrameReader is available using SparkSession.read.

DataFrameReader supports many file formats natively and offers the interface to define custom formats.

Note
DataFrameReader assumes parquet data source file format by default that you can change using spark.sql.sources.default Spark property.

After you have described the loading pipeline (i.e. the “Extract” part of ETL in Spark SQL), you eventually “trigger” the loading using format-agnostic load or format-specific (e.g. json, csv, jdbc) operators.

Note
All methods of DataFrameReader merely describe a process of loading data and do not trigger a Spark job (until an action is called).
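
A minimal sketch of the loading pipeline, assuming a spark-shell session (the paths are made up):

```scala
val csvDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/tmp/people.csv")       // format-agnostic load

val jsonDF = spark.read.json("/tmp/people.json")   // format-specific operator
```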

DataFrameReader can read text files using textFile methods that return typed Datasets.

Note
Loading datasets using textFile methods allows for additional preprocessing before final processing of the string values as json or csv lines.

(New in Spark 2.2) DataFrameReader can load datasets from Dataset[String] (with lines being complete “files”) using format-specific csv and json operators.

Table 2. DataFrameReader’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

extraOptions

Used when…​FIXME

source

Name of the input data source (aka format or provider) with the default format per spark.sql.sources.default configuration property (default: parquet).

source can be changed using format method.

Used exclusively when DataFrameReader is requested to load.

userSpecifiedSchema

Optional user-specified schema (default: None, i.e. undefined)

Set when DataFrameReader is requested to set a schema, load data from an external data source, loadV1Source (when creating a DataSource), and load data using the json and csv file formats

Used when DataFrameReader is requested to assertNoSpecifiedSchema (while loading data using jdbc, table and textFile)

Specifying Format Of Input Data Source — format method

You use format to configure DataFrameReader to use appropriate source format.

Supported data formats:

  • json

  • csv (since 2.0.0)

  • parquet (see Parquet)

  • orc

  • text

  • jdbc

  • libsvm — only when used in format("libsvm")

Note
Spark SQL allows for developing custom data source formats.

Specifying Schema — schema method

schema allows for specifying the schema of a data source (that the DataFrameReader is about to read a dataset from).

Note
Some formats can infer schema from datasets (e.g. csv or json) using inferSchema option.
Tip
Read up on Schema.

Specifying Load Options — option and options Methods

You can also use options method to describe different options in a single Map.

Loading Datasets from Files (into DataFrames) Using Format-Specific Load Operators

DataFrameReader supports the following file formats:

json method

New in 2.0.0: prefersDecimal

csv method

parquet method

The supported options:

New in 2.0.0: snappy is the default Parquet codec. See [SPARK-14482][SQL] Change default Parquet codec from gzip to snappy.

The compressions supported:

  • none or uncompressed

  • snappy – the default codec in Spark 2.0.0.

  • gzip – the default codec in Spark before 2.0.0

  • lzo

orc method

Optimized Row Columnar (ORC) file format is a highly efficient columnar format to store Hive data with more than 1,000 columns and improve performance. ORC format was introduced in Hive version 0.11 to use and retain the type information from the table definition.

Tip
Read ORC Files document to learn about the ORC file format.

text method

text method loads a text file.

Example

Loading Table to DataFrame — table Method

table loads the content of the tableName table into an untyped DataFrame.

Note
table simply passes the call to SparkSession.table after making sure that a user-defined schema has not been specified.

Loading Data From External Table using JDBC Data Source — jdbc Method

jdbc loads data from an external table using the JDBC data source.

Internally, jdbc creates a JDBCOptions from the input url, table and extraOptions with connectionProperties.

jdbc then creates one JDBCPartition per predicate.

In the end, jdbc requests the SparkSession to create a DataFrame for a JDBCRelation (with JDBCPartitions and JDBCOptions created earlier).

Note

jdbc does not support a custom schema and throws an AnalysisException if one is defined.

Note
jdbc method uses java.util.Properties (and appears overly Java-centric). Use format("jdbc") instead.

Loading Datasets From Text Files — textFile Method

textFile loads one or many text files into a typed Dataset[String].

Note
textFile methods are similar to the text family of methods in that they both read text files, but text methods return an untyped DataFrame while textFile methods return a typed Dataset[String].

Internally, textFile passes calls on to the text method and selects the only value column before applying the Encoders.STRING encoder.
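
A minimal sketch, assuming a spark-shell session (the path is made up):

```scala
import org.apache.spark.sql.Dataset

val lines: Dataset[String] = spark.read.textFile("/tmp/notes.txt")
lines.filter(_.nonEmpty).count()
// compare: spark.read.text(...) returns an untyped DataFrame with a single "value" column
```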

Creating DataFrameReader Instance

DataFrameReader takes the following when created:

Loading Dataset (Data Source API V1) — loadV1Source Internal Method

loadV1Source creates a DataSource and requests it to resolve the underlying relation (as a BaseRelation).

In the end, loadV1Source requests SparkSession to create a DataFrame from the BaseRelation.

Note
loadV1Source is used when DataFrameReader is requested to load (and the data source is not of DataSourceV2 type, or a DataSourceReader could not be created).

Loading Dataset from Data Source — load Method

load loads a dataset from a data source (with optional support for multiple paths) as an untyped DataFrame.

Internally, load looks up the data source class (lookupDataSource) for the source. load then branches off per its type (i.e. whether it is of the DataSourceV2 marker type or not).

For a “Data Source V2” data source, load…​FIXME

Otherwise, if the source is not a “Data Source V2” data source, load simply delegates to loadV1Source.

load throws an AnalysisException when the source format is hive.

assertNoSpecifiedSchema Internal Method

assertNoSpecifiedSchema throws an AnalysisException if the userSpecifiedSchema is defined.

Note
assertNoSpecifiedSchema is used when DataFrameReader is requested to load data using jdbc, table and textFile.

verifyColumnNameOfCorruptRecord Internal Method

verifyColumnNameOfCorruptRecord…​FIXME

Note
verifyColumnNameOfCorruptRecord is used when DataFrameReader is requested to load data using json and csv.

DataSource API — Managing Datasets in External Data Sources

Reading Datasets

Spark SQL can read data from external storage systems like files, Hive tables and JDBC databases through DataFrameReader interface.

You use SparkSession to access DataFrameReader using read operation.

DataFrameReader is an interface to create DataFrames (aka Dataset[Row]) from files, Hive tables or tables using JDBC.

As of Spark 2.0, DataFrameReader can read text files using textFile methods that return Dataset[String] (not DataFrames).

There are two operation modes in Spark SQL, i.e. batch and streaming (part of Spark Structured Streaming).

You can access DataStreamReader for reading streaming datasets through SparkSession.readStream method.

The available methods in DataStreamReader are similar to DataFrameReader.

Saving Datasets

Spark SQL can save data to external storage systems like files, Hive tables and JDBC databases through DataFrameWriter interface.

You use write method on a Dataset to access DataFrameWriter.

DataFrameWriter is an interface to persist a Dataset to an external storage system in a batch fashion.

You can access DataStreamWriter for writing streaming datasets through Dataset.writeStream method.

The available methods in DataStreamWriter are similar to DataFrameWriter.
