import org.apache.spark.sql.functions.rand
val nums = spark.range(5).withColumn("random", rand()).filter($"random" > 0.5)
scala> nums.show
+---+------------------+
| id|            random|
+---+------------------+
|  0| 0.752877642067488|
|  1|0.5271005540026181|
+---+------------------+
scala> println(nums.queryExecution.toRdd.toDebugString)
(8) MapPartitionsRDD[7] at toRdd at <console>:27 []
 |  MapPartitionsRDD[6] at toRdd at <console>:27 []
 |  ParallelCollectionRDD[5] at toRdd at <console>:27 []
// Checkpointing requires a checkpoint directory to be set first; watch what happens when it is not
scala> nums.checkpoint
org.apache.spark.SparkException: Checkpoint directory has not been set in the SparkContext
  at org.apache.spark.rdd.RDD.checkpoint(RDD.scala:1548)
  at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:594)
  at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:539)
  ... 49 elided
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
val checkpointDir = spark.sparkContext.getCheckpointDir.get
scala> println(checkpointDir)
file:/tmp/checkpoints/b1f413dc-3eaf-46a0-99de-d795252035e0
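// Note (environment assumption): on a real cluster the checkpoint directory should be
// on a fault-tolerant, cluster-visible file system, e.g.
//   spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")
// The local file:/tmp path above is only suitable for a single-machine spark-shell demo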
val numsCheckpointed = nums.checkpoint
scala> println(numsCheckpointed.queryExecution.toRdd.toDebugString)
(8) MapPartitionsRDD[11] at toRdd at <console>:27 []
 |  MapPartitionsRDD[9] at checkpoint at <console>:26 []
 |  ReliableCheckpointRDD[10] at checkpoint at <console>:26 []
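// Note how the lineage got truncated: the new parent is a ReliableCheckpointRDD,
// so any recomputation now reads the checkpoint files instead of re-running the original plan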
// Set the org.apache.spark.rdd.ReliableRDDCheckpointData logger to INFO
// to see what happens while an RDD is being checkpointed
// Let's use the log4j API
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark.rdd.ReliableRDDCheckpointData").setLevel(Level.INFO)
scala> nums.checkpoint
18/03/23 00:05:15 INFO ReliableRDDCheckpointData: Done checkpointing RDD 12 to file:/tmp/checkpoints/b1f413dc-3eaf-46a0-99de-d795252035e0/rdd-12, new parent is RDD 13
res7: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, random: double]
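// Side note: Dataset.checkpoint is eager by default. A minimal sketch of the lazy variant
// (the checkpoint files are only written once the Dataset is actually computed, e.g. by an action):
val numsLazilyCheckpointed = nums.checkpoint(eager = false)
numsLazilyCheckpointed.count  // triggers the computation and writes the checkpoint files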
// Save the schema as it is going to be used to reconstruct the nums dataset from an RDD
val schema = nums.schema
// Recover the nums dataset from the checkpoint files:
// start by recovering the underlying RDD
// and then create a Dataset on top of it
// First, get the path to the checkpoint files of the Dataset's checkpointed RDD
import org.apache.spark.sql.execution.LogicalRDD
val logicalRDD = numsCheckpointed.queryExecution.optimizedPlan.asInstanceOf[LogicalRDD]
val checkpointFiles = logicalRDD.rdd.getCheckpointFile.get
scala> println(checkpointFiles)
file:/tmp/checkpoints/b1f413dc-3eaf-46a0-99de-d795252035e0/rdd-9
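// Optional: list the part files under the checkpoint path
// (a sketch using Hadoop's FileSystem API with the session's Hadoop configuration)
import org.apache.hadoop.fs.Path
val fs = new Path(checkpointFiles).getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(checkpointFiles)).map(_.getPath.getName).foreach(println)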
// SparkContext.checkpointFile is a `protected[spark]` method
// Use the Spark shell's :paste -raw mode and define a helper object to "escape" the package lock-in
scala> :paste -raw
// Entering paste mode (ctrl-D to finish)
package org.apache.spark
object my {
  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD
  def recover[T: ClassTag](sc: SparkContext, path: String): RDD[T] = {
    sc.checkpointFile[T](path)
  }
}
// Exiting paste mode, now interpreting.
// Make sure to use the same checkpoint directory
import org.apache.spark.my
import org.apache.spark.sql.catalyst.InternalRow
val numsRddRecovered = my.recover[InternalRow](spark.sparkContext, checkpointFiles)
scala> :type numsRddRecovered
org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow]
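// A quick sanity check (sketch): the recovered RDD should have as many rows
// as were checkpointed (2 in this session)
numsRddRecovered.count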
// We have to convert the RDD[InternalRow] back to a DataFrame
// Use :paste -raw again as we rely on a `private[sql]` method
scala> :pa -raw
// Entering paste mode (ctrl-D to finish)
package org.apache.spark.sql
object my2 {
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SparkSession}
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.types.StructType
  def createDataFrame(spark: SparkSession, catalystRows: RDD[InternalRow], schema: StructType): DataFrame = {
    spark.internalCreateDataFrame(catalystRows, schema)
  }
}
// Exiting paste mode, now interpreting.
import org.apache.spark.sql.my2
val numsRecovered = my2.createDataFrame(spark, numsRddRecovered, schema)
scala> numsRecovered.show
+---+------------------+
| id|            random|
+---+------------------+
|  0| 0.752877642067488|
|  1|0.5271005540026181|
+---+------------------+
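// Optional verification (sketch): the recovered DataFrame should contain exactly
// the same rows as the checkpointed one, so the difference should be empty
numsRecovered.except(numsCheckpointed).count  // expected: 0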