DataSource — Pluggable Data Provider Framework
DataSource is one of the main parts of the Data Source API in Spark SQL (together with DataFrameReader for loading datasets, DataFrameWriter for saving datasets and StreamSourceProvider for creating streaming sources).

DataSource models a pluggable data provider framework with extension points for Spark SQL integrators to expand the list of supported external data sources in Spark SQL.
DataSource takes a list of file system paths that hold data. The list is empty by default, but can be different per data source:

- The location URI of a HiveTableRelation (when HiveMetastoreCatalog is requested to convert a HiveTableRelation to a LogicalRelation)
- The table name of an UnresolvedRelation (when the ResolveSQLOnFile logical evaluation rule is executed)
- The files in a directory when Spark Structured Streaming's FileStreamSource is requested for batches
DataSource is created when:

- DataFrameWriter is requested to save to a data source (per Data Source V1 contract)
- FindDataSourceTable and ResolveSQLOnFile logical evaluation rules are executed
- CreateDataSourceTableCommand, CreateDataSourceTableAsSelectCommand, InsertIntoDataSourceDirCommand, CreateTempViewUsing are executed
- HiveMetastoreCatalog is requested to convertToLogicalRelation
- Spark Structured Streaming's FileStreamSource, DataStreamReader and DataStreamWriter
Extension Point | Description |
---|---|
CreatableRelationProvider | Data source that saves the result of a structured query per save mode and returns the schema |
FileFormat | |
RelationProvider | Data source that supports schema inference and can be accessed using SQL's USING clause |
SchemaRelationProvider | |
StreamSinkProvider | |
StreamSourceProvider | |
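To get a feel for the extension points, the following is a minimal sketch of what an integrator might implement for a custom batch data source: a provider that mixes in RelationProvider and DataSourceRegister. The numbers alias, the n option and the produced relation are made up for illustration only; a real data source would usually also implement pruning and filtering contracts.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, TableScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical provider that exposes the numbers 0 until `n` as a one-column relation
class DefaultSource extends RelationProvider with DataSourceRegister {

  // Alias resolved by lookupDataSource through ServiceLoader / DataSourceRegister
  override def shortName(): String = "numbers"

  override def createRelation(
      context: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    // `n` is a made-up option of this example data source
    val upTo = parameters.getOrElse("n", "10").toLong
    new BaseRelation with TableScan {
      override def sqlContext: SQLContext = context
      override def schema: StructType = StructType(StructField("id", LongType) :: Nil)
      override def buildScan(): RDD[Row] =
        context.sparkContext.range(0, upTo).map(Row(_))
    }
  }
}
```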
As a user, you interact with DataSource through DataFrameReader (when you execute spark.read or spark.readStream) or SQL's CREATE TABLE USING.
```scala
// Batch reading
val people: DataFrame = spark.read
  .format("csv")
  .load("people.csv")

// Streamed reading
val messages: DataFrame = spark.readStream
  .format("kafka")
  .option("subscribe", "topic")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load
```
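The same data source machinery can be wired up from SQL with CREATE TABLE ... USING. A small sketch (the table name, path and options are only illustrative):

```scala
// SQL DDL path: defines a table backed by the csv data source
spark.sql(
  """CREATE TABLE people
    |USING csv
    |OPTIONS (path 'people.csv', header 'true')
    |""".stripMargin)
```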
DataSource uses a SparkSession, a class name, a collection of paths, optional user-specified schema, a collection of partition columns, a bucket specification, and configuration options.

Note: Data source is also called a table provider.
When requested to resolve a batch (non-streaming) FileFormat, DataSource creates a HadoopFsRelation with the optional bucketing specification.
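A bucketing specification typically originates from DataFrameWriter.bucketBy when saving to a table. A small illustration (the table name is arbitrary):

```scala
import org.apache.spark.sql.SaveMode

// The bucket spec recorded for this table is what DataSource later resolves
// back into a HadoopFsRelation when the table is read
spark.range(100).toDF("id")
  .write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .bucketBy(4, "id")
  .sortBy("id")
  .saveAsTable("bucketed_ids")
```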
Writing Data to Data Source per Save Mode Followed by Reading Rows Back (as BaseRelation) — writeAndRead Method

```scala
writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation
```

Caution: FIXME
Note: writeAndRead is used exclusively when the CreateDataSourceTableAsSelectCommand logical command is executed.
Writing DataFrame to Data Source Per Save Mode — write Method

```scala
write(mode: SaveMode, data: DataFrame): BaseRelation
```

write writes the result of executing a structured query (as a DataFrame) to a data source per save mode.

Internally, write looks up the data source and branches off per providingClass.
providingClass | Description |
---|---|
CreatableRelationProvider | Executes CreatableRelationProvider.createRelation |
FileFormat | writeInFileFormat |
others | Reports a RuntimeException |
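From the user's perspective the branch is driven by the format you choose. For instance (connection settings below are illustrative and need a JDBC driver on the classpath), parquet is a FileFormat while the jdbc provider is a CreatableRelationProvider:

```scala
import org.apache.spark.sql.SaveMode

val df = spark.range(5).toDF("id")

// parquet is a FileFormat, so the write goes through the FileFormat branch
df.write.mode(SaveMode.Overwrite).format("parquet").save("/tmp/ids.parquet")

// the jdbc provider is a CreatableRelationProvider, so createRelation is executed
df.write
  .mode(SaveMode.Append)
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/testdb")  // illustrative connection
  .option("dbtable", "ids")
  .save()
```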
Note: write does not support the internal CalendarIntervalType in the schema of the data DataFrame and throws an AnalysisException when there is one.
Note: write is used exclusively when SaveIntoDataSourceCommand is executed.
writeInFileFormat Internal Method

Caution: FIXME
For FileFormat data sources, write takes all the paths and the path option and makes sure that there is only one path.
Note: write uses Hadoop's Path to access the FileSystem and calculate the qualified output path.
write requests PartitioningUtils to validatePartitionColumn.
When appending to a table, …FIXME
In the end, write (for a FileFormat data source) prepares an InsertIntoHadoopFsRelationCommand logical plan and executes it.
Caution: FIXME Is toRdd a job execution?
createSink Method

Caution: FIXME
sourceSchema Internal Method

```scala
sourceSchema(): SourceInfo
```

sourceSchema returns the name and schema of the data source for streamed reading.
Caution: FIXME Why is the method called? Why does this bother with streamed reading and data sources?!
It supports two class hierarchies, i.e. FileFormat and Structured Streaming's StreamSourceProvider data sources.
Internally, sourceSchema first creates an instance of the data source and…

Caution: FIXME Finish…
For Structured Streaming's StreamSourceProvider data sources, sourceSchema relays calls to StreamSourceProvider.sourceSchema.
For FileFormat data sources, sourceSchema makes sure that the path option was specified.
Tip: path is looked up in a case-insensitive way, so paTh, PATH and pAtH are all acceptable. Use the lower-case version of path, though.
Note: path can use a glob pattern (not regex syntax), i.e. contain any of the {}[]*?\ characters.
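For example (the directory below is illustrative), the following streamed read uses an upper-case PATH option together with a glob pattern and an explicit schema:

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val lineSchema = StructType(StructField("line", StringType) :: Nil)

// `PATH` is accepted because the option lookup is case-insensitive;
// the value may contain a glob pattern
val lines = spark.readStream
  .format("csv")
  .schema(lineSchema)
  .option("PATH", "/tmp/incoming/*.csv")
  .load
```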
It checks whether the path exists if a glob pattern is not used. If it does not exist, you will see the following AnalysisException in the logs:
```text
scala> spark.read.load("the.file.does.not.exist.parquet")
org.apache.spark.sql.AnalysisException: Path does not exist: file:/Users/jacek/dev/oss/spark/the.file.does.not.exist.parquet;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:375)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:364)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:344)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:132)
  ... 48 elided
```
If spark.sql.streaming.schemaInference is disabled, the data source is different than TextFileFormat, and the input userSpecifiedSchema is not specified, the following IllegalArgumentException is thrown:

```text
Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.
```
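In other words, for a file-based streaming source you either pass the schema explicitly or enable spark.sql.streaming.schemaInference. A small sketch (the directory is illustrative):

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val peopleSchema = new StructType().add("id", LongType).add("name", StringType)

// Option 1: user-specified schema
val withSchema = spark.readStream
  .format("csv")
  .schema(peopleSchema)
  .load("/tmp/people")

// Option 2: let Spark infer the schema of file-based streaming sources
spark.conf.set("spark.sql.streaming.schemaInference", "true")
val inferred = spark.readStream
  .format("csv")
  .load("/tmp/people")
```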
Caution: FIXME I don't think the exception will ever happen for non-streaming sources since the schema is going to be defined earlier. When?
Eventually, it returns a SourceInfo with FileSource[path] and the schema (as calculated using the inferFileFormatSchema internal method).
For any other data source, it throws an UnsupportedOperationException:

```text
Data source [className] does not support streamed reading
```
Note: sourceSchema is used exclusively when DataSource is requested for the sourceInfo.
inferFileFormatSchema Internal Method

```scala
inferFileFormatSchema(format: FileFormat): StructType
```
inferFileFormatSchema private method computes (aka infers) the schema (as a StructType). It returns userSpecifiedSchema if specified or uses FileFormat.inferSchema. It throws an AnalysisException when it is unable to infer the schema.

It uses the path option for the list of directory paths.
Note: It is used by DataSource.sourceSchema and DataSource.createSource when FileFormat is processed.
Resolving Relation (Creating BaseRelation) — resolveRelation Method

```scala
resolveRelation(checkFilesExist: Boolean = true): BaseRelation
```

resolveRelation resolves (i.e. creates) a BaseRelation.

Internally, resolveRelation tries to create an instance of the providingClass and branches off per its type and whether the optional user-specified schema was specified or not.
Provider | Behaviour |
---|---|
SchemaRelationProvider (with a user-specified schema) | Executes SchemaRelationProvider.createRelation with the provided schema |
RelationProvider | Executes RelationProvider.createRelation |
FileFormat | Creates a HadoopFsRelation |
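On the read path this roughly maps to whether you call DataFrameReader.schema before load. A small sketch (the paths are illustrative):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// User-specified schema: handed to the provider as-is (no inference)
val explicit = spark.read
  .schema(new StructType().add("id", IntegerType).add("name", StringType))
  .format("csv")
  .load("/tmp/people.csv")

// No schema: the provider (here a FileFormat) has to infer one
val inferred = spark.read
  .format("csv")
  .option("inferSchema", "true")
  .load("/tmp/people.csv")
```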
buildStorageFormatFromOptions Method

```scala
buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat
```

buildStorageFormatFromOptions …FIXME

Note: buildStorageFormatFromOptions is used when…FIXME
Creating DataSource Instance
DataSource takes the following when created:

- (optional) User-specified schema (default: None, i.e. undefined)
- (optional) Bucketing specification (default: None)
- (optional) CatalogTable (default: None)

DataSource initializes the internal registries and counters.
Looking Up Class By Name Of Data Source Provider — lookupDataSource Method

```scala
lookupDataSource(provider: String, conf: SQLConf): Class[_]
```

lookupDataSource looks up the class name in the backwardCompatibilityMap and then replaces the class name exclusively for the orc provider per the spark.sql.orc.impl internal configuration property:

- For hive (default), lookupDataSource uses org.apache.spark.sql.hive.orc.OrcFileFormat
- For native, lookupDataSource uses the canonical class name of OrcFileFormat, i.e. org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
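For example, the ORC implementation behind the orc alias can be switched at runtime (the path below is illustrative):

```scala
// Use the native (non-Hive) ORC file format for the `orc` provider alias
spark.conf.set("spark.sql.orc.impl", "native")
val orcDf = spark.read.format("orc").load("/tmp/data.orc")
```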
With the provider's class name (aka provider1 internally), lookupDataSource assumes another name variant of format [provider1].DefaultSource (aka provider2 internally).

lookupDataSource then uses Java's ServiceLoader to find all DataSourceRegister provider classes on the CLASSPATH.
lookupDataSource filters the DataSourceRegister provider classes (by their alias) that match provider1 (case-insensitively), e.g. parquet or kafka.

If a single provider class was found for the alias, lookupDataSource simply returns the provider class.
If no DataSourceRegister could be found by the short name (alias), lookupDataSource considers the format provider names as fully-qualified class names and tries to load them instead (using Java's ClassLoader.loadClass).
Note: You can reference your own custom DataSource in your code using the DataFrameWriter.format method, which accepts either the alias or a fully-qualified class name.
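Continuing the hypothetical numbers provider sketched earlier, it could be registered for ServiceLoader and then referenced by its alias or by its fully-qualified class name (the package name is made up):

```scala
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister should list:
//   com.example.numbers.DefaultSource

// By alias (resolved through DataSourceRegister.shortName)
val byAlias = spark.read.format("numbers").option("n", "5").load()

// By fully-qualified class name (falls back to ClassLoader.loadClass)
val byClassName = spark.read.format("com.example.numbers.DefaultSource").load()
```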
Caution: FIXME Describe the other cases (orc and avro)
If no provider class could be found, lookupDataSource throws a ClassNotFoundException:

```text
java.lang.ClassNotFoundException: Failed to find data source: [provider1]. Please find packages at http://spark.apache.org/third-party-projects.html
```
If, however, lookupDataSource found multiple registered aliases for the provider name…FIXME
Creating Logical Command for Writing (for CreatableRelationProvider and FileFormat Data Sources) — planForWriting Method

```scala
planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan
```

planForWriting creates an instance of the providingClass and branches off per its type as follows:

- For a CreatableRelationProvider, planForWriting creates a SaveIntoDataSourceCommand (with the input data and mode, the CreatableRelationProvider data source and the caseInsensitiveOptions)
- For a FileFormat, planForWriting uses planForWritingFileFormat (with the FileFormat format and the input mode and data)
- For other types, planForWriting simply throws a RuntimeException:

```text
[providingClass] does not allow create table as select.
```
Planning for Writing (Using FileFormat) — planForWritingFileFormat Internal Method

```scala
planForWritingFileFormat(
  format: FileFormat,
  mode: SaveMode,
  data: LogicalPlan): InsertIntoHadoopFsRelationCommand
```

planForWritingFileFormat takes the paths and the path option (from the caseInsensitiveOptions) together and (assuming that there is only one path available among the paths combined) creates a fully-qualified HDFS-compatible output path for writing.
Note: planForWritingFileFormat uses Hadoop's Path to request the FileSystem that owns it (using a Hadoop Configuration).
planForWritingFileFormat uses the PartitioningUtils helper object to validate partition columns in the partitionColumns.

In the end, planForWritingFileFormat returns a new InsertIntoHadoopFsRelationCommand.
When the number of the paths is different than 1, planForWritingFileFormat throws an IllegalArgumentException:

```text
Expected exactly one path to be specified, but got: [allPaths]
```
Note: planForWritingFileFormat is used when DataSource is requested to write data to a data source per save mode followed by reading rows back (when CreateDataSourceTableAsSelectCommand logical command is executed) and for the logical command for writing.
getOrInferFileFormatSchema Internal Method

```scala
getOrInferFileFormatSchema(
  format: FileFormat,
  fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType)
```

getOrInferFileFormatSchema …FIXME
Note: getOrInferFileFormatSchema is used when DataSource is requested for the sourceSchema and to resolveRelation.