Dataset API — Basic Actions
Basic actions are part of the Dataset API for transforming a Dataset into a session-scoped or global temporary view and other basic actions (FIXME).
Note: Basic actions are the methods in the Dataset Scala class that are grouped in the basic group, i.e. @group basic.
Action | Description
---|---
cache | Caches the Dataset
checkpoint | Checkpoints the Dataset in a reliable way
columns | Returns the column names
createGlobalTempView | Creates a global temporary view
createOrReplaceGlobalTempView | Creates or replaces a global temporary view
createOrReplaceTempView | Creates or replaces a session-scoped temporary view
createTempView | Creates a session-scoped temporary view
dtypes | Returns the column names with their data types
explain | Displays the logical and physical plans of the Dataset
hint | Specifies a hint (by name and parameters) for the Dataset
inputFiles | Returns the input files of the underlying data sources
isEmpty | (New in 2.4.0) Whether the Dataset is empty
isLocal | Whether the Dataset can be executed locally (without executors)
localCheckpoint | Checkpoints the Dataset locally (on executors)
persist | Persists the Dataset
printSchema | Prints out the schema to the console
rdd | Gives the RDD that sits behind the Dataset
schema | Returns the schema of the Dataset (as a StructType)
storageLevel | Returns the current storage level of the Dataset
toDF | Converts the typed Dataset to an untyped DataFrame
unpersist | Unpersists the Dataset
write | Returns a DataFrameWriter for saving the content of the (non-streaming) Dataset out to an external storage
Caching Dataset — cache Basic Action
```scala
cache(): this.type
```
cache merely executes the no-argument persist basic action.
```scala
val ds = spark.range(5).cache
```
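Since cache is just persist with the default storage level, the effect can be verified with the storageLevel basic action, as in this quick sketch:

```scala
import org.apache.spark.storage.StorageLevel

val ds = spark.range(5).cache
// cache uses the default MEMORY_AND_DISK storage level
assert(ds.storageLevel == StorageLevel.MEMORY_AND_DISK)
```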
Reliably Checkpointing Dataset — checkpoint Basic Action
```scala
checkpoint(): Dataset[T]                // (1)
checkpoint(eager: Boolean): Dataset[T]  // (2)
```

1. eager and reliableCheckpoint flags enabled
2. reliableCheckpoint flag enabled
Note: checkpoint is an experimental operator and the API is evolving towards becoming stable.
checkpoint simply requests the Dataset to checkpoint with the given eager flag and the reliableCheckpoint flag enabled.
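A minimal demo (the checkpoint directory below is an arbitrary example path; reliable checkpointing requires one to be set first):

```scala
// Reliable checkpointing saves to a checkpoint directory,
// which has to be set up front (the path is just an example)
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

val nums = spark.range(5)

// eager (default): the Dataset is checkpointed immediately
val eagerly = nums.checkpoint()

// lazy: merely marked for checkpointing, materialized on the first action
val lazily = nums.checkpoint(eager = false)
```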
createTempView Basic Action
```scala
createTempView(viewName: String): Unit
```
createTempView …FIXME
Note: createTempView is used when…FIXME
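For illustration, a session-scoped temporary view can be created and then queried with SQL (the view name is arbitrary):

```scala
val nums = spark.range(3)
nums.createTempView("nums")

// The view is session-scoped and available to SQL queries
spark.sql("SELECT * FROM nums").show
```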
createOrReplaceTempView Basic Action
```scala
createOrReplaceTempView(viewName: String): Unit
```
createOrReplaceTempView …FIXME
Note: createOrReplaceTempView is used when…FIXME
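Unlike createTempView, which fails if the view name is already taken, createOrReplaceTempView simply replaces an existing view:

```scala
val nums = spark.range(3)
nums.createOrReplaceTempView("nums")
// Re-registering under the same name replaces the earlier view
spark.range(10).createOrReplaceTempView("nums")
```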
createGlobalTempView Basic Action
```scala
createGlobalTempView(viewName: String): Unit
```
createGlobalTempView …FIXME
Note: createGlobalTempView is used when…FIXME
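Global temporary views are registered in the global_temp database and are available across sessions of the same Spark application, e.g.:

```scala
val nums = spark.range(3)
nums.createGlobalTempView("nums")

// Global temporary views have to be qualified with the global_temp database
spark.sql("SELECT * FROM global_temp.nums").show
```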
createOrReplaceGlobalTempView Basic Action
```scala
createOrReplaceGlobalTempView(viewName: String): Unit
```
createOrReplaceGlobalTempView …FIXME
Note: createOrReplaceGlobalTempView is used when…FIXME
createTempViewCommand Internal Method
```scala
createTempViewCommand(
  viewName: String,
  replace: Boolean,
  global: Boolean): CreateViewCommand
```
createTempViewCommand …FIXME
Note: createTempViewCommand is used when the following Dataset operators are used: Dataset.createTempView, Dataset.createOrReplaceTempView, Dataset.createGlobalTempView and Dataset.createOrReplaceGlobalTempView.
Displaying Logical and Physical Plans, Their Cost and Codegen — explain Basic Action
```scala
explain(): Unit  // (1)
explain(extended: Boolean): Unit
```

1. Turns the extended flag off
explain prints the physical plan of the Dataset (and, with the extended flag enabled, the logical plans as well) to the console.
Tip: Use explain to review the structured queries and optimizations applied.
Internally, explain creates an ExplainCommand logical command and requests the SessionState to execute it (to get a QueryExecution back).
Note: explain uses the ExplainCommand logical command that, when executed, gives different text representations of the QueryExecution (for the Dataset's LogicalPlan) depending on the flags (e.g. extended, codegen and cost, which are disabled by default).
explain then requests the QueryExecution for the optimized physical query plan and collects the records (as InternalRow objects).
In the end, explain goes over the InternalRow records and converts them to lines to display to the console.
Note: explain "converts" an InternalRow record to a line using getString at position 0.
Tip: If you are serious about query debugging you could also use the Debugging Query Execution facility.
```scala
scala> spark.range(10).explain(extended = true)
== Parsed Logical Plan ==
Range (0, 10, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint
Range (0, 10, step=1, splits=Some(8))

== Optimized Logical Plan ==
Range (0, 10, step=1, splits=Some(8))

== Physical Plan ==
*Range (0, 10, step=1, splits=Some(8))
```
Specifying Hint — hint Basic Action
```scala
hint(name: String, parameters: Any*): Dataset[T]
```
hint operator is part of the Hint Framework to specify a hint (by name and parameters) for a Dataset.
Internally, hint simply attaches an UnresolvedHint unary logical operator to an "analyzed" Dataset (i.e. the analyzed logical plan of a Dataset).
```scala
val ds = spark.range(3)
val plan = ds.queryExecution.logical

scala> println(plan.numberedTreeString)
00 Range (0, 3, step=1, splits=Some(8))

// Attach a hint
val dsHinted = ds.hint("myHint", 100, true)
val plan = dsHinted.queryExecution.logical

scala> println(plan.numberedTreeString)
00 'UnresolvedHint myHint, [100, true]
01 +- Range (0, 3, step=1, splits=Some(8))
```
Note: hint adds an UnresolvedHint unary logical operator to an analyzed logical plan, which indirectly triggers the analysis phase that executes logical commands and their unions as well as resolves all hints that have already been added to a logical plan.
```scala
// FIXME Demo with UnresolvedHint
```
Locally Checkpointing Dataset — localCheckpoint Basic Action
```scala
localCheckpoint(): Dataset[T]  // (1)
localCheckpoint(eager: Boolean): Dataset[T]
```

1. eager flag enabled
localCheckpoint simply uses the internal checkpoint method with the input eager flag and the reliableCheckpoint flag disabled (false).
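A quick example (local checkpointing requires no checkpoint directory since the data is stored on the executors):

```scala
// Unlike reliable checkpointing, no checkpoint directory is needed
val ds = spark.range(5).localCheckpoint()
```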
checkpoint Internal Method
```scala
checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T]
```
checkpoint requests the QueryExecution (of the Dataset) to generate an RDD of internal binary rows (aka internalRdd) and then requests the RDD to make a copy of all the rows (by adding a MapPartitionsRDD).
Depending on the reliableCheckpoint flag, checkpoint marks the RDD for (reliable) checkpointing (true) or local checkpointing (false).
With the eager flag on, checkpoint counts the number of records in the RDD (by executing RDD.count) which gives the effect of immediate eager checkpointing.
checkpoint requests the QueryExecution (of the Dataset) for the optimized physical query plan (the plan is used to get the outputPartitioning and outputOrdering for the result Dataset).
In the end, checkpoint creates a DataFrame with a new logical plan node for scanning data from an RDD of InternalRows (LogicalRDD).
Note: checkpoint is used in the checkpoint and localCheckpoint basic actions.
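The LogicalRDD node can be seen in the logical plan of a checkpointed Dataset, as in this sketch (the expression ID in the output will differ per session):

```scala
spark.sparkContext.setCheckpointDir("/tmp/checkpoints") // example path
val checkpointed = spark.range(5).checkpoint()

// The result Dataset scans the checkpointed RDD through LogicalRDD
scala> println(checkpointed.queryExecution.logical.numberedTreeString)
00 LogicalRDD [id#3L], false
```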
Persisting Dataset — persist Basic Action
```scala
persist(): this.type
persist(newLevel: StorageLevel): this.type
```
persist caches the Dataset using the default storage level MEMORY_AND_DISK or newLevel and returns it.
Internally, persist requests the CacheManager to cache the structured query (that is accessible through the SharedState of the current SparkSession).
Caution: FIXME
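A short example with an explicit storage level (standard Spark API):

```scala
import org.apache.spark.storage.StorageLevel

val ds = spark.range(5)
// Override the default MEMORY_AND_DISK storage level
ds.persist(StorageLevel.MEMORY_ONLY)
assert(ds.storageLevel == StorageLevel.MEMORY_ONLY)
```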
Generating RDD of Internal Binary Rows — rdd Basic Action
```scala
rdd: RDD[T]
```
Whenever you need to convert a Dataset into an RDD, the rdd method gives you the RDD of the proper input object type (not Row as in DataFrames) that sits behind the Dataset.
```scala
// tokens is assumed to be a Dataset[Token] defined earlier, e.g.
// case class Token(name: String)
// val tokens = Seq(Token("hello"), Token("world")).toDS

scala> val rdd = tokens.rdd
rdd: org.apache.spark.rdd.RDD[Token] = MapPartitionsRDD[11] at rdd at <console>:30
```
Internally, it looks up the ExpressionEncoder (for the Dataset) and accesses the deserializer expression. That gives the DataType of the result of evaluating the expression.
Note: A deserializer expression is used to decode an InternalRow to an object of type T. See ExpressionEncoder.
It then executes a DeserializeToObject logical operator that will produce an RDD[InternalRow] that is converted into the proper RDD[T] using the DataType and T.
Note: It is a lazy operation that "produces" an RDD[T].
Accessing Schema — schema Basic Action
A Dataset has a schema.

```scala
schema: StructType
```
Tip: You may also use printSchema or explain to learn about the schema.
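For example (the schema of spark.range is a single non-nullable id column):

```scala
val nums = spark.range(2)

scala> nums.printSchema
root
 |-- id: long (nullable = false)
```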
Converting Typed Dataset to Untyped DataFrame — toDF Basic Action
```scala
toDF(): DataFrame
toDF(colNames: String*): DataFrame
```
Internally, the empty-argument toDF creates a Dataset[Row] using the Dataset's SparkSession and QueryExecution with the encoder being RowEncoder.
Caution: FIXME Describe toDF(colNames: String*)
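A quick example of both variants (the column name below is arbitrary):

```scala
val ds = spark.range(2)

// The empty-argument variant keeps the existing column names
val df = ds.toDF

// The renaming variant takes one name per column
val renamed = ds.toDF("n")

scala> renamed.printSchema
root
 |-- n: long (nullable = false)
```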
Unpersisting Cached Dataset — unpersist Basic Action
```scala
unpersist(): this.type
unpersist(blocking: Boolean): this.type
```
unpersist uncaches the Dataset, possibly blocking until all the blocks are deleted (per the blocking flag).
Internally, unpersist requests the CacheManager to uncache the query.
Caution: FIXME
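For example:

```scala
val ds = spark.range(5).cache
// Block until all cached blocks are deleted
ds.unpersist(blocking = true)
```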
Accessing DataFrameWriter (to Describe Writing Dataset) — write Basic Action
```scala
write: DataFrameWriter[T]
```
write gives the DataFrameWriter for records of type T.
```scala
import org.apache.spark.sql.{DataFrameWriter, Dataset}
val ints: Dataset[Int] = (0 to 5).toDS
val writer: DataFrameWriter[Int] = ints.write
```
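The writer can then describe the save, e.g. (the output path below is a hypothetical example):

```scala
// format, mode and save are all part of the DataFrameWriter API
ints.write
  .format("json")
  .mode("overwrite")
  .save("/tmp/ints")  // hypothetical output path
```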