HadoopRDD
HadoopRDD is an RDD that provides core functionality for reading data stored in HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI using the older MapReduce API (org.apache.hadoop.mapred).
HadoopRDD is created as a result of calling the following methods in SparkContext:
-
hadoopFile
-
textFile
(the most often used in examples!) -
sequenceFile
Partitions are of type HadoopPartition
.
When an HadoopRDD is computed, i.e. an action is called, you should see the INFO message Input split:
in the logs.
1 2 3 4 5 6 7 8 9 |
scala> sc.textFile("README.md").count ... 15/10/10 18:03:21 INFO HadoopRDD: Input split: file:/Users/jacek/dev/oss/spark/README.md:0+1784 15/10/10 18:03:21 INFO HadoopRDD: Input split: file:/Users/jacek/dev/oss/spark/README.md:1784+1784 ... |
The following properties are set upon partition execution:
-
mapred.tip.id – task id of this task’s attempt
-
mapred.task.id – task attempt’s id
-
mapred.task.is.map as
true
-
mapred.task.partition – split id
-
mapred.job.id
Spark settings for HadoopRDD
:
-
spark.hadoop.cloneConf (default:
false
) – shouldCloneJobConf – should a Hadoop job configurationJobConf
object be cloned before spawning a Hadoop job. Refer to [SPARK-2546] Configuration object thread safety issue. Whentrue
, you should see a DEBUG messageCloning Hadoop Configuration
.
You can register callbacks on TaskContext.
HadoopRDDs are not checkpointed. They do nothing when checkpoint()
is called.
Caution
|
|
getPreferredLocations
Method
Caution
|
FIXME |
getPartitions
Method
The number of partition for HadoopRDD, i.e. the return value of getPartitions
, is calculated using InputFormat.getSplits(jobConf, minPartitions)
where minPartitions
is only a hint of how many partitions one may want at minimum. As a hint it does not mean the number of partitions will be exactly the number given.
For SparkContext.textFile
the input format class is org.apache.hadoop.mapred.TextInputFormat.
FileInputFormat is the base class for all file-based InputFormats. This provides a generic implementation of getSplits(JobConf, int). Subclasses of FileInputFormat can also override the isSplitable(FileSystem, Path) method to ensure input-files are not split-up and are processed as a whole by Mappers.
Tip
|
You may find the sources of org.apache.hadoop.mapred.FileInputFormat.getSplits enlightening. |