Dataset Checkpointing
Dataset Checkpointing is a feature of Spark SQL that truncates the logical query plan of a Dataset. It can specifically be useful for highly iterative data algorithms (e.g. in Spark MLlib, which uses Spark SQL's Dataset API for data manipulation).
Note: Checkpointing is actually a feature of Spark Core (that Spark SQL uses for distributed computations) that allows a driver to be restarted on failure with the previously computed state of a distributed computation described as an RDD. Checkpointing truncates the lineage of the RDD to be checkpointed, which has been used successfully in Spark MLlib in iterative machine learning algorithms like ALS. Dataset checkpointing in Spark SQL uses checkpointing to truncate the lineage of the underlying RDD of the Dataset being checkpointed.
Checkpointing can be eager or lazy, per the eager flag of the checkpoint operator. Eager checkpointing is the default and happens immediately when requested. Lazy checkpointing does not; it happens only when an action is executed.
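A minimal sketch of the two modes (assuming the checkpoint directory has already been set, as described below, and nums is any Dataset):

```scala
// Eager (default): checkpointing happens immediately when requested
val eagerlyCheckpointed = nums.checkpoint()

// Lazy: the Dataset is only marked for checkpointing;
// the checkpoint files are written with the first action
val lazilyCheckpointed = nums.checkpoint(eager = false)
lazilyCheckpointed.count() // triggers the actual checkpoint
```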
Using Dataset checkpointing requires that you specify the checkpoint directory. The directory stores the checkpoint files for RDDs to be checkpointed. Use SparkContext.setCheckpointDir to set the path to a checkpoint directory.
Checkpointing can be local or reliable, which defines how reliable the checkpoint directory is. Local checkpointing writes checkpoint files to executor storage and, due to the executor lifecycle, is considered unreliable. Reliable checkpointing uses reliable data storage like Hadoop HDFS (see the sketch after the table below).
| | Eager | Lazy |
|---|---|---|
| Reliable | checkpoint | checkpoint(eager = false) |
| Local | localCheckpoint | localCheckpoint(eager = false) |
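For example, a minimal sketch contrasting reliable and local checkpointing (nums is any Dataset):

```scala
// Reliable checkpointing requires a checkpoint directory
// (use a fault-tolerant file system like HDFS in production)
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
val reliablyCheckpointed = nums.checkpoint()

// Local checkpointing writes to executor storage instead:
// no checkpoint directory is needed, but the data does not survive executor loss
val locallyCheckpointed = nums.localCheckpoint()
```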
An RDD can be recovered from checkpoint files using SparkContext.checkpointFile. You can then use the SparkSession.internalCreateDataFrame method to (re)create the DataFrame from the RDD of internal binary rows.
Tip: Enable INFO logging level for the org.apache.spark.rdd.ReliableRDDCheckpointData logger to see what happens while an RDD is checkpointed. Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.rdd.ReliableRDDCheckpointData=INFO. Refer to Logging.
```scala
import org.apache.spark.sql.functions.rand
val nums = spark.range(5).withColumn("random", rand()).filter($"random" > 0.5)
scala> nums.show
+---+------------------+
| id|            random|
+---+------------------+
|  0| 0.752877642067488|
|  1|0.5271005540026181|
+---+------------------+

scala> println(nums.queryExecution.toRdd.toDebugString)
(8) MapPartitionsRDD[7] at toRdd at <console>:27 []
 |  MapPartitionsRDD[6] at toRdd at <console>:27 []
 |  ParallelCollectionRDD[5] at toRdd at <console>:27 []

// Remember to set the checkpoint directory
scala> nums.checkpoint
org.apache.spark.SparkException: Checkpoint directory has not been set in the SparkContext
  at org.apache.spark.rdd.RDD.checkpoint(RDD.scala:1548)
  at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:594)
  at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:539)
  ... 49 elided

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

val checkpointDir = spark.sparkContext.getCheckpointDir.get
scala> println(checkpointDir)
file:/tmp/checkpoints/b1f413dc-3eaf-46a0-99de-d795252035e0

val numsCheckpointed = nums.checkpoint
scala> println(numsCheckpointed.queryExecution.toRdd.toDebugString)
(8) MapPartitionsRDD[11] at toRdd at <console>:27 []
 |  MapPartitionsRDD[9] at checkpoint at <console>:26 []
 |  ReliableCheckpointRDD[10] at checkpoint at <console>:26 []

// Set org.apache.spark.rdd.ReliableRDDCheckpointData logger to INFO
// to see what happens while an RDD is checkpointed
// Let's use the log4j API
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark.rdd.ReliableRDDCheckpointData").setLevel(Level.INFO)

scala> nums.checkpoint
18/03/23 00:05:15 INFO ReliableRDDCheckpointData: Done checkpointing RDD 12 to file:/tmp/checkpoints/b1f413dc-3eaf-46a0-99de-d795252035e0/rdd-12, new parent is RDD 13
res7: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, random: double]

// Save the schema as it is going to be used to reconstruct the nums dataset from an RDD
val schema = nums.schema

// Recover the nums dataset from the checkpoint files:
// 1. Recover the underlying RDD
// 2. Create a Dataset based on the RDD

// Get the path to the checkpoint files of the checkpointed RDD of the Dataset
import org.apache.spark.sql.execution.LogicalRDD
val logicalRDD = numsCheckpointed.queryExecution.optimizedPlan.asInstanceOf[LogicalRDD]
val checkpointFiles = logicalRDD.rdd.getCheckpointFile.get
scala> println(checkpointFiles)
file:/tmp/checkpoints/b1f413dc-3eaf-46a0-99de-d795252035e0/rdd-9

// SparkContext.checkpointFile is a `protected[spark]` method
// Use :paste -raw mode in the Spark shell and define a helper object to "escape" the package lock-in
scala> :paste -raw
// Entering paste mode (ctrl-D to finish)

package org.apache.spark

object my {
  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD
  def recover[T: ClassTag](sc: SparkContext, path: String): RDD[T] = {
    sc.checkpointFile[T](path)
  }
}

// Exiting paste mode, now interpreting.

// Make sure to use the same checkpoint directory

import org.apache.spark.my
import org.apache.spark.sql.catalyst.InternalRow
val numsRddRecovered = my.recover[InternalRow](spark.sparkContext, checkpointFiles)
scala> :type numsRddRecovered
org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow]

// We have to convert RDD[InternalRow] to a DataFrame
// Use :paste -raw again as we use a `private[sql]` method
scala> :pa -raw
// Entering paste mode (ctrl-D to finish)

package org.apache.spark.sql

object my2 {
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SparkSession}
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.types.StructType
  def createDataFrame(
      spark: SparkSession,
      catalystRows: RDD[InternalRow],
      schema: StructType): DataFrame = {
    spark.internalCreateDataFrame(catalystRows, schema)
  }
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.my2
val numsRecovered = my2.createDataFrame(spark, numsRddRecovered, schema)
scala> numsRecovered.show
+---+------------------+
| id|            random|
+---+------------------+
|  0| 0.752877642067488|
|  1|0.5271005540026181|
+---+------------------+
```
Specifying Checkpoint Directory — SparkContext.setCheckpointDir Method
```scala
SparkContext.setCheckpointDir(directory: String): Unit
```
setCheckpointDir sets the checkpoint directory.
Internally, setCheckpointDir …FIXME
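A minimal usage sketch (the HDFS URI is a placeholder; any reliable file-system path works):

```scala
// Use a fault-tolerant location (e.g. HDFS) in production;
// a local path like /tmp/checkpoints is fine for single-machine demos
spark.sparkContext.setCheckpointDir("hdfs://namenode:8020/checkpoints")

// The directory is recorded in the SparkContext
// (a unique subdirectory is created underneath, as in the demo above)
scala> spark.sparkContext.getCheckpointDir
res0: Option[String] = Some(hdfs://namenode:8020/checkpoints/...)
```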
Recovering RDD From Checkpoint Files — SparkContext.checkpointFile Method
```scala
SparkContext.checkpointFile[T: ClassTag](directory: String): RDD[T]
```
checkpointFile reads (recovers) an RDD from a checkpoint directory.
Note: SparkContext.checkpointFile is a protected[spark] method, so the code that accesses it has to be in the org.apache.spark package.
Internally, checkpointFile creates a ReliableCheckpointRDD in an RDD operation scope.
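For reference, a simplified sketch of what checkpointFile does internally (based on the Spark 2.3 sources; the body is essentially a one-liner):

```scala
// Simplified from org.apache.spark.SparkContext:
// the recovered RDD is a ReliableCheckpointRDD over the checkpoint files,
// created within an RDD operation scope (withScope)
protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withScope {
  new ReliableCheckpointRDD[T](this, path)
}
```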