Custom Data Source Formats
Caution: FIXME. See the spark-mf-format project at GitHub for a complete solution.
DataSource is one of the main parts of Data Source API in Spark SQL (together with DataFrameReader for loading datasets, DataFrameWriter for saving datasets and StreamSourceProvider for creating streaming sources).
DataSource models a pluggable data provider framework with the extension points for Spark SQL integrators to expand the list of supported external data sources in Spark SQL.
DataSource takes a list of file system paths that hold data. The list is empty by default, but can be different per data source:
The location URI of a HiveTableRelation (when HiveMetastoreCatalog is requested to convert a HiveTableRelation to a LogicalRelation)
The table name of a UnresolvedRelation (when ResolveSQLOnFile logical evaluation rule is executed)
The files in a directory when Spark Structured Streaming’s FileStreamSource is requested for batches
DataSource is created when:
DataFrameWriter is requested to save to a data source (per Data Source V1 contract)
FindDataSourceTable and ResolveSQLOnFile logical evaluation rules are executed
CreateDataSourceTableCommand, CreateDataSourceTableAsSelectCommand, InsertIntoDataSourceDirCommand, CreateTempViewUsing are executed
HiveMetastoreCatalog is requested to convertToLogicalRelation
Spark Structured Streaming’s FileStreamSource, DataStreamReader and DataStreamWriter
| Extension Point | Description |
|---|---|
| CreatableRelationProvider | Data source that saves the result of a structured query per save mode and returns the schema |
| RelationProvider | Data source that supports schema inference and can be accessed using SQL's USING clause |
As a user, you interact with DataSource through DataFrameReader (when you execute spark.read or spark.readStream) or SQL's CREATE TABLE USING.

// Batch reading
val people: DataFrame = spark.read
  .format("csv")
  .load("people.csv")

// Streamed reading
val messages: DataFrame = spark.readStream
  .format("kafka")
  .option("subscribe", "topic")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load
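The SQL variant goes through the same DataSource machinery. A minimal sketch (the table name, path and options are illustrative):

// SQL's CREATE TABLE ... USING a data source
spark.sql("""
  CREATE TABLE people
  USING csv
  OPTIONS (path 'people.csv', header 'true')
""")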
DataSource uses a SparkSession, a class name, a collection of paths, optional user-specified schema, a collection of partition columns, a bucket specification, and configuration options.
Note: Data source is also called a table provider.
When requested to resolve a batch (non-streaming) FileFormat, DataSource creates a HadoopFsRelation with the optional bucketing specification.
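A bucketing specification typically comes from DataFrameWriter.bucketBy. A minimal sketch (the table and column names are illustrative):

// Writing a bucketed (and sorted) table defines the bucketing specification
spark.range(10)
  .write
  .bucketBy(4, "id")
  .sortBy("id")
  .saveAsTable("bucketed_ids")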
writeAndRead Method

writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation

Caution: FIXME

Note: writeAndRead is used exclusively when the CreateDataSourceTableAsSelectCommand logical command is executed.
write Method

write(mode: SaveMode, data: DataFrame): BaseRelation
write writes the result of executing a structured query (as DataFrame) to a data source per save mode.
Internally, write looks up the data source and branches off per providingClass.
| providingClass | Description |
|---|---|
| CreatableRelationProvider | Requests the CreatableRelationProvider to create a relation (per the save mode) |
| FileFormat | writeInFileFormat (see below) |
| others | Reports a RuntimeException |
Note
|
write does not support the internal CalendarIntervalType in the schema of data DataFrame and throws a AnalysisException when there is one.
|
|
Note
|
write is used exclusively when SaveIntoDataSourceCommand is executed.
|
writeInFileFormat Internal Method

Caution: FIXME

For FileFormat data sources, write takes all paths and the path option together and makes sure that there is exactly one path.
Note: write uses Hadoop's Path to access the FileSystem and calculate the qualified output path.
write requests PartitioningUtils to validatePartitionColumn.
When appending to a table, …FIXME
In the end, write (for a FileFormat data source) prepares an InsertIntoHadoopFsRelationCommand logical plan and executes it.
Caution: FIXME Is toRdd a job execution?
createSink Method

Caution: FIXME
sourceSchema Internal Method

sourceSchema(): SourceInfo
sourceSchema returns the name and schema of the data source for streamed reading.
Caution: FIXME Why is the method called? Why does this bother with streamed reading and data sources?!
It supports two class hierarchies, i.e. FileFormat and Structured Streaming’s StreamSourceProvider data sources.
Internally, sourceSchema first creates an instance of the data source and…
Caution: FIXME Finish…
For Structured Streaming’s StreamSourceProvider data sources, sourceSchema relays calls to StreamSourceProvider.sourceSchema.
For FileFormat data sources, sourceSchema makes sure that path option was specified.
Tip: path is looked up in a case-insensitive way, so paTh, PATH and pAtH are all acceptable. Use the lower-case version of path, though.

Note: path can use a glob pattern (not regex syntax), i.e. contain any of the {}[]*?\ characters.
sourceSchema checks whether the path exists if a glob pattern is not used. If it does not exist, you will see the following AnalysisException in the logs:
scala> spark.read.load("the.file.does.not.exist.parquet")
org.apache.spark.sql.AnalysisException: Path does not exist: file:/Users/jacek/dev/oss/spark/the.file.does.not.exist.parquet;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:375)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:364)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:344)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:132)
  ... 48 elided
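When a glob pattern is used, the existence check is skipped and the pattern is resolved at read time. A minimal sketch (the paths are illustrative):

// Glob pattern (resolved by Hadoop's globbing, not regex)
val logs = spark.read
  .format("json")
  .load("/var/logs/2018-01-*/*.json")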
If spark.sql.streaming.schemaInference is disabled, the data source is different from TextFileFormat, and the input userSpecifiedSchema is not specified, the following IllegalArgumentException is thrown:

Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.
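To avoid the exception, specify the schema explicitly when defining the streaming source. A minimal sketch (the schema and directory are illustrative):

import org.apache.spark.sql.types._

val userSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))

val users = spark.readStream
  .schema(userSchema)   // user-specified schema, so no inference is needed
  .format("csv")
  .load("/tmp/users")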
Caution: FIXME I don’t think the exception will ever happen for non-streaming sources since the schema is going to be defined earlier. When?
Eventually, it returns a SourceInfo with FileSource[path] and the schema (as calculated using the inferFileFormatSchema internal method).
For any other data source, it throws an UnsupportedOperationException:

Data source [className] does not support streamed reading
Note: sourceSchema is used exclusively when DataSource is requested for the sourceInfo.
inferFileFormatSchema Internal Method

inferFileFormatSchema(format: FileFormat): StructType

inferFileFormatSchema private method computes (aka infers) the schema (as a StructType). It returns userSpecifiedSchema if specified or uses FileFormat.inferSchema. It throws an AnalysisException when it is unable to infer the schema.
It uses path option for the list of directory paths.
Note: It is used by DataSource.sourceSchema and DataSource.createSource when FileFormat is processed.
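To illustrate the two schema paths from the user side, a user-specified schema skips inference entirely. A minimal sketch (the file and columns are illustrative):

import org.apache.spark.sql.types._

// Schema inference (delegates to FileFormat.inferSchema under the covers)
val inferred = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("people.csv")

// User-specified schema takes precedence, so no inference happens
val peopleSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)))
val explicit = spark.read
  .schema(peopleSchema)
  .option("header", "true")
  .csv("people.csv")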
resolveRelation Method

resolveRelation(checkFilesExist: Boolean = true): BaseRelation
resolveRelation resolves (i.e. creates) a BaseRelation.
Internally, resolveRelation tries to create an instance of the providingClass and branches off per its type and whether the optional user-specified schema was specified or not.
| Provider | Behaviour |
|---|---|
| SchemaRelationProvider | Executes SchemaRelationProvider.createRelation with the provided schema |
| RelationProvider | Executes RelationProvider.createRelation |
| FileFormat | Creates a HadoopFsRelation |
buildStorageFormatFromOptions Method

buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat

buildStorageFormatFromOptions…FIXME

Note: buildStorageFormatFromOptions is used when…FIXME
DataSource takes the following when created:
(optional) User-specified schema (default: None, i.e. undefined)
Optional bucketing specification (default: None)
(optional) CatalogTable (default: None)
DataSource initializes the internal registries and counters.
lookupDataSource Method

lookupDataSource(provider: String, conf: SQLConf): Class[_]
lookupDataSource looks up the class name in the backwardCompatibilityMap and then replaces the class name exclusively for the orc provider per spark.sql.orc.impl internal configuration property:
For hive (default), lookupDataSource uses org.apache.spark.sql.hive.orc.OrcFileFormat
For native, lookupDataSource uses the canonical class name of OrcFileFormat, i.e. org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
With the provider’s class name (aka provider1 internally) lookupDataSource assumes another name variant of format [provider1].DefaultSource (aka provider2 internally).
lookupDataSource then uses Java’s ServiceLoader to find all DataSourceRegister provider classes on the CLASSPATH.
lookupDataSource filters out the DataSourceRegister provider classes (by their alias) that match the provider1 (case-insensitive), e.g. parquet or kafka.
If a single provider class was found for the alias, lookupDataSource simply returns the provider class.
If no DataSourceRegister could be found by the short name (alias), lookupDataSource considers the names of the format provider as the fully-qualified class names and tries to load them instead (using Java’s ClassLoader.loadClass).
Note: You can reference your own custom DataSource in your code using the DataFrameWriter.format method, giving either the alias or a fully-qualified class name.
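A minimal sketch of such a custom data source (the package, class names, and the myformat alias are illustrative; registering the alias additionally requires a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file that lists the provider class):

package com.example.datasource

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A RelationProvider registered under a short alias via DataSourceRegister
class DefaultSource extends RelationProvider with DataSourceRegister {

  // The alias usable in .format("myformat") or SQL's USING myformat
  override def shortName(): String = "myformat"

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    new MyTextRelation(sqlContext, parameters("path"))
}

// A relation with a fixed single-column schema backed by a text file
class MyTextRelation(val sqlContext: SQLContext, path: String)
    extends BaseRelation with TableScan {

  override def schema: StructType =
    StructType(StructField("value", StringType) :: Nil)

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.textFile(path).map(Row(_))
}

With the classes on the CLASSPATH, the data source can then be used by the registered alias or by the fully-qualified class name:

// By the registered alias (requires the META-INF/services entry) ...
val byAlias = spark.read.format("myformat").option("path", "data.txt").load()
// ... or by the fully-qualified class name
val byClass = spark.read.format("com.example.datasource.DefaultSource").option("path", "data.txt").load()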
Caution: FIXME Describe the other cases (orc and avro)

If no provider class could be found, lookupDataSource throws a ClassNotFoundException:

java.lang.ClassNotFoundException: Failed to find data source: [provider1]. Please find packages at http://spark.apache.org/third-party-projects.html
If however, lookupDataSource found multiple registered aliases for the provider name…FIXME
planForWriting Method

planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan
planForWriting creates an instance of the providingClass and branches off per its type as follows:
For a CreatableRelationProvider, planForWriting creates a SaveIntoDataSourceCommand (with the input data and mode, the CreatableRelationProvider data source and the caseInsensitiveOptions)
For a FileFormat, planForWriting executes planForWritingFileFormat (with the FileFormat format and the input mode and data)
For other types, planForWriting simply throws a RuntimeException:
[providingClass] does not allow create table as select.
planForWritingFileFormat Internal Method

planForWritingFileFormat(
  format: FileFormat,
  mode: SaveMode,
  data: LogicalPlan): InsertIntoHadoopFsRelationCommand
planForWritingFileFormat takes the paths and the path option (from the caseInsensitiveOptions) together and (assuming that there is only one path available among the paths combined) creates a fully-qualified HDFS-compatible output path for writing.
Note: planForWritingFileFormat uses Hadoop HDFS’s Path to request the FileSystem that owns it (using a Hadoop Configuration).
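A minimal sketch of how a fully-qualified output path can be computed with the Hadoop API (the path and configuration are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val hadoopConf = new Configuration()
val outputPath = new Path("/tmp/output")
// The FileSystem that owns the path resolves it to a fully-qualified form
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)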
planForWritingFileFormat uses the PartitioningUtils helper object to validate partition columns in the partitionColumns.
In the end, planForWritingFileFormat returns a new InsertIntoHadoopFsRelationCommand.
When the number of the paths is different than 1, planForWritingFileFormat throws an IllegalArgumentException:
Expected exactly one path to be specified, but got: [allPaths]
Note: planForWritingFileFormat is used when DataSource is requested to write data to a data source per save mode followed by reading rows back (when CreateDataSourceTableAsSelectCommand logical command is executed) and for the logical command for writing.
getOrInferFileFormatSchema Internal Method

getOrInferFileFormatSchema(
  format: FileFormat,
  fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType)
getOrInferFileFormatSchema…FIXME
Note: getOrInferFileFormatSchema is used when DataSource is requested for the sourceSchema and to resolveRelation.
HiveUtils is used to create a HiveClientImpl that HiveExternalCatalog uses to interact with a Hive metastore.
Tip: Enable logging for HiveUtils to see what happens inside. Refer to Logging.
newClientForMetadata Method

newClientForMetadata(
  conf: SparkConf,
  hadoopConf: Configuration): HiveClient  // (1)
newClientForMetadata(
  conf: SparkConf,
  hadoopConf: Configuration,
  configurations: Map[String, String]): HiveClient

(1) Executes the other newClientForMetadata with time configurations formatted
Internally, newClientForMetadata creates a new SQLConf with spark.sql properties only (from the input SparkConf).
newClientForMetadata then creates an IsolatedClientLoader per the input parameters and the following configuration properties:
You should see one of the following INFO messages in the logs:
Initializing HiveMetastoreConnection version [hiveMetastoreVersion] using Spark classes.
Initializing HiveMetastoreConnection version [hiveMetastoreVersion] using maven.
Initializing HiveMetastoreConnection version [hiveMetastoreVersion] using [jars]
In the end, newClientForMetadata requests the IsolatedClientLoader to create a HiveClientImpl.
Note: newClientForMetadata is used exclusively when HiveExternalCatalog is requested for a HiveClient.
HiveClientImpl is the only available HiveClient in Spark SQL that does/uses…FIXME
HiveClientImpl is created exclusively when IsolatedClientLoader is requested to create a new Hive client. When created, HiveClientImpl is given the location of the default database for the Hive metastore warehouse (i.e. warehouseDir that is the value of hive.metastore.warehouse.dir Hive-specific Hadoop configuration property).
Note: The location of the default database for the Hive metastore warehouse is /user/hive/warehouse by default.

Note: You may be interested in SPARK-19664 put ‘hive.metastore.warehouse.dir’ in hadoopConf place if you use Spark before 2.1 (which you should not really as it is not supported anymore).

Note: The Hadoop configuration is what HiveExternalCatalog was given when created (which is the default Hadoop configuration from Spark Core’s SparkContext.hadoopConfiguration with the Spark properties with the spark.hadoop prefix).
Tip: Enable logging for HiveClientImpl to see what happens inside. Refer to Logging.
renamePartitions Method

renamePartitions(
  db: String,
  table: String,
  specs: Seq[TablePartitionSpec],
  newSpecs: Seq[TablePartitionSpec]): Unit

Note: renamePartitions is part of the HiveClient Contract to…FIXME.
renamePartitions…FIXME
alterPartitions Method

alterPartitions(
  db: String,
  table: String,
  newParts: Seq[CatalogTablePartition]): Unit

Note: alterPartitions is part of the HiveClient Contract to…FIXME.
alterPartitions…FIXME
getPartitions Method

getPartitions(
  table: CatalogTable,
  spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]

Note: getPartitions is part of the HiveClient Contract to…FIXME.
getPartitions…FIXME
getPartitionsByFilter Method

getPartitionsByFilter(
  table: CatalogTable,
  predicates: Seq[Expression]): Seq[CatalogTablePartition]

Note: getPartitionsByFilter is part of the HiveClient Contract to…FIXME.
getPartitionsByFilter…FIXME
getPartitionOption Method

getPartitionOption(
  table: CatalogTable,
  spec: TablePartitionSpec): Option[CatalogTablePartition]

Note: getPartitionOption is part of the HiveClient Contract to…FIXME.
getPartitionOption…FIXME
HiveClientImpl takes the following when created:
HiveClientImpl initializes the internal registries and counters.
getTableOption Method

getTableOption(dbName: String, tableName: String): Option[CatalogTable]

Note: getTableOption is part of the HiveClient Contract to…FIXME.
When executed, getTableOption prints out the following DEBUG message to the logs:
Looking up [dbName].[tableName]
getTableOption requests Hive client to retrieve the metadata of the table and creates a CatalogTable.
readHiveStats Internal Method

readHiveStats(properties: Map[String, String]): Option[CatalogStatistics]
readHiveStats creates a CatalogStatistics from the input Hive table or partition parameters (if available and greater than 0).
| Hive Parameter | Table Statistics |
|---|---|
| totalSize | sizeInBytes |
| rawDataSize | sizeInBytes |
| numRows | rowCount |
Note
|
totalSize Hive parameter has a higher precedence over rawDataSize for sizeInBytes table statistic.
|
|
Note
|
readHiveStats is used when HiveClientImpl is requested for the metadata of a table or table partition.
|
fromHivePartition Method

fromHivePartition(hp: HivePartition): CatalogTablePartition
fromHivePartition simply creates a CatalogTablePartition with the following:
spec from Hive’s Partition.getSpec if available
storage from Hive’s StorageDescriptor of the table partition
parameters from Hive’s Partition.getParameters if available
stats from Hive’s Partition.getParameters if available and converted to table statistics format
Note: fromHivePartition is used when HiveClientImpl is requested for getPartitionOption, getPartitions and getPartitionsByFilter.
toHiveTable Method

toHiveTable(table: CatalogTable, userName: Option[String] = None): HiveTable
toHiveTable simply creates a new Hive Table and copies the properties from the input CatalogTable.
HiveClient is the contract for…FIXME
Note: HiveClientImpl is the only available HiveClient in Spark SQL.
HiveClient offers safe variants of many methods that do not report exceptions when a relational entity is not found in a Hive metastore, e.g. getTableOption for getTable.
package org.apache.spark.sql.hive.client

trait HiveClient {
  // only required methods that have no implementation
  // FIXME List of the methods
  def alterPartitions(
    db: String,
    table: String,
    newParts: Seq[CatalogTablePartition]): Unit
  def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
  def getPartitions(
    catalogTable: CatalogTable,
    partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
  def getPartitionsByFilter(
    catalogTable: CatalogTable,
    predicates: Seq[Expression]): Seq[CatalogTablePartition]
  def getPartitionOption(
    table: CatalogTable,
    spec: TablePartitionSpec): Option[CatalogTablePartition]
  def renamePartitions(
    db: String,
    table: String,
    specs: Seq[TablePartitionSpec],
    newSpecs: Seq[TablePartitionSpec]): Unit
}
Note: HiveClient is a private[hive] contract.
| Method | Description |
|---|---|
| getPartitionOption | Returns the CatalogTablePartition of a table. Used exclusively when… |
| getTableOption | Retrieves a table metadata if available. Used exclusively when… |
getTable Method

getTable(dbName: String, tableName: String): CatalogTable
getTable retrieves the metadata of a table in a Hive metastore if available or reports a NoSuchTableException.
HiveFileFormat is a FileFormat for writing Hive tables.
HiveFileFormat is a DataSourceRegister and registers itself as hive data source.
Note: The hive data source can only be used with tables; you cannot read or write files of the hive data source directly. Use DataFrameReader.table or DataFrameWriter.saveAsTable for loading from or writing data to the hive data source, respectively.
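For example, a minimal sketch of writing to and then loading a Hive table (the table name is illustrative):

// Write through the table API (not directly to files)
spark.range(5).write.saveAsTable("demo_numbers")

// Load the table back
val numbers = spark.read.table("demo_numbers")
// or equivalently
val numbersAgain = spark.table("demo_numbers")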
HiveFileFormat is created exclusively when SaveAsHiveFile is requested to saveAsHiveFile (when InsertIntoHiveDirCommand and InsertIntoHiveTable logical commands are executed).
HiveFileFormat takes a FileSinkDesc when created.
HiveFileFormat throws an UnsupportedOperationException when requested to inferSchema:

inferSchema is not supported for hive data source.
prepareWrite Method

prepareWrite(
  sparkSession: SparkSession,
  job: Job,
  options: Map[String, String],
  dataSchema: StructType): OutputWriterFactory
Note: prepareWrite is part of the FileFormat Contract to prepare a write job.
prepareWrite sets the mapred.output.format.class property to be the getOutputFileFormatClassName of the Hive TableDesc of the FileSinkDesc.
prepareWrite requests the HiveTableUtil helper object to configureJobPropertiesForStorageHandler.
prepareWrite requests the Hive Utilities helper object to copyTableJobPropertiesToConf.
In the end, prepareWrite creates a new OutputWriterFactory that creates a new HiveOutputWriter when requested for a new OutputWriter instance.
Caution: FIXME

Caution: FIXME
Tip: Read about Spark SQL CLI in Spark’s official documentation in Running the Spark SQL CLI.

spark-sql> describe function `<>`;
Function: <>
Usage: a <> b - Returns TRUE if a is not equal to b
Tip: Functions are registered in FunctionRegistry.

spark-sql> show functions;

spark-sql> explain extended show tables;
Spark SQL uses a Hive metastore to manage the metadata of persistent relational entities (e.g. databases, tables, columns, partitions) in a relational database (for fast access).
A Hive metastore warehouse (aka spark-warehouse) is the directory where Spark SQL persists tables whereas a Hive metastore (aka metastore_db) is a relational database to manage the metadata of the persistent relational entities, e.g. databases, tables, columns, partitions.
By default, Spark SQL uses the embedded deployment mode of a Hive metastore with a Apache Derby database.
Important: The default embedded deployment mode is not recommended for production use due to the limitation of only one active SparkSession at a time. Read Cloudera’s Configuring the Hive Metastore for CDH document that explains the available deployment modes of a Hive metastore.
When SparkSession is created with Hive support the external catalog (aka metastore) is HiveExternalCatalog. HiveExternalCatalog uses spark.sql.warehouse.dir directory for the location of the databases and javax.jdo.option properties for the connection to the Hive metastore database.
Note: The metadata of relational entities is persisted in a metastore database over JDBC and DataNucleus AccessPlatform that uses javax.jdo.option properties. Read Hive Metastore Administration to learn how to manage a Hive Metastore.
| Name | Description |
|---|---|
| javax.jdo.option.ConnectionURL | The JDBC connection URL of a Hive metastore database to use |
| javax.jdo.option.ConnectionDriverName | The JDBC driver of a Hive metastore database to use |
| javax.jdo.option.ConnectionUserName | The user name to use to connect to a Hive metastore database |
| javax.jdo.option.ConnectionPassword | The password to use to connect to a Hive metastore database |
You can configure javax.jdo.option properties in hive-site.xml or using options with spark.hadoop prefix.
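For example, a minimal sketch of passing one of the javax.jdo.option properties with the spark.hadoop prefix when building a SparkSession (the Derby connection URL is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  // Properties with the spark.hadoop prefix end up in the Hadoop configuration
  .config("spark.hadoop.javax.jdo.option.ConnectionURL",
    "jdbc:derby:;databaseName=metastore_db;create=true")
  .getOrCreate()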
You can access the current connection properties for a Hive metastore in a Spark SQL application using the Spark internal classes.
scala> :type spark
org.apache.spark.sql.SparkSession

scala> spark.sharedState.externalCatalog
res1: org.apache.spark.sql.catalyst.catalog.ExternalCatalog = org.apache.spark.sql.hive.HiveExternalCatalog@79dd79eb

// Use `:paste -raw` to paste the following code
// This is to pass the private[spark] "gate"
// BEGIN
package org.apache.spark

import org.apache.spark.sql.SparkSession

object jacek {
  def open(spark: SparkSession) = {
    import org.apache.spark.sql.hive.HiveExternalCatalog
    spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
  }
}
// END

import org.apache.spark.jacek
val hiveClient = jacek.open(spark)

scala> hiveClient.getConf("javax.jdo.option.ConnectionURL", "")
res2: String = jdbc:derby:;databaseName=metastore_db;create=true
The benefits of using an external Hive metastore:
Allow multiple Spark applications (sessions) to access it concurrently
Allow a single Spark application to use table statistics without running “ANALYZE TABLE” every execution
Note: As of Spark 2.2 (see SPARK-18112 Spark2.x does not support read data from Hive 2.x metastore) Spark SQL supports reading data from Hive 2.1.1 metastore.
Caution: FIXME Describe hive-site.xml vs config method vs --conf with spark.hadoop prefix.
Spark SQL uses the Hive-specific configuration properties that further fine-tune the Hive integration, e.g. spark.sql.hive.metastore.version or spark.sql.hive.metastore.jars.
spark.sql.warehouse.dir Configuration Property

spark.sql.warehouse.dir is a static configuration property that sets Hive’s hive.metastore.warehouse.dir property, i.e. the location of the Hive local/embedded metastore database (using Derby).
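Since the property is static, it has to be set before the SparkSession is created. A minimal sketch (the warehouse path is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
  .enableHiveSupport()
  .getOrCreate()

// The value can be read afterwards, but no longer changed
spark.conf.get("spark.sql.warehouse.dir")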
Tip: Refer to SharedState to learn about (the low-level details of) Spark SQL support for Apache Hive. See also the official Hive Metastore Administration document.
In order to use an external Hive metastore you should do the following:
Enable Hive support in SparkSession (that makes sure that the Hive classes are on CLASSPATH and sets spark.sql.catalogImplementation internal configuration property to hive)
spark.sql.warehouse.dir required?
Define hive.metastore.warehouse.dir in hive-site.xml configuration resource
Check out warehousePath
Execute ./bin/run-example sql.hive.SparkHiveExample to verify Hive configuration
When not configured by hive-site.xml, SparkSession automatically creates metastore_db in the current directory and creates a directory configured by spark.sql.warehouse.dir, which defaults to the spark-warehouse directory in the current directory where the Spark application is started.
Note: You may need to grant write privilege to the user who starts the Spark application.
| Name | Description |
|---|---|
| hive.metastore.uris | The Thrift URI of a remote Hive metastore, i.e. one that is in a separate JVM process or on a remote node |
You may also want to use the following Hive configuration properties that (seem to) cause exceptions with an empty metastore database as of Hive 2.1.
datanucleus.schema.autoCreateAll set to true
Caution: FIXME Describe the purpose of spark.hadoop.* properties
You can specify any of the Hadoop configuration properties, e.g. hive.metastore.warehouse.dir with spark.hadoop prefix.
$ spark-shell --conf spark.hadoop.hive.metastore.warehouse.dir=/tmp/hive-warehouse
...
scala> spark.sharedState
18/01/08 10:46:19 INFO SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('/tmp/hive-warehouse').
18/01/08 10:46:19 INFO SharedState: Warehouse path is '/tmp/hive-warehouse'.
res1: org.apache.spark.sql.internal.SharedState = org.apache.spark.sql.internal.SharedState@5a69b3cf
hive-site.xml configures Hive clients (e.g. Spark SQL) with the Hive Metastore configuration.
hive-site.xml is loaded when SharedState is created (which is…FIXME).
Configuration of Hive is done by placing your hive-site.xml, core-site.xml (for security configuration),
and hdfs-site.xml (for HDFS configuration) file in conf/ (that is automatically added to the CLASSPATH of a Spark application).
Tip: You can use --driver-class-path or spark.driver.extraClassPath to point to the directory with configuration resources, e.g. hive-site.xml.
<configuration>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/tmp/hive-warehouse</value>
    <description>Hive Metastore location</description>
  </property>
</configuration>
Tip: Read the Resources section in Hadoop’s Configuration javadoc to learn more about configuration resources.
The following steps are for Hive and Hadoop 2.7.5.
$ ./bin/hdfs version
Hadoop 2.7.5
Subversion https://shv@git-wip-us.apache.org/repos/asf/hadoop.git -r 18065c2b6806ed4aa6a3187d77cbe21bb3dba075
Compiled by kshvachk on 2017-12-16T01:06Z
Compiled with protoc 2.5.0
From source with checksum 9f118f95f47043332d51891e37f736e9
This command was run using /Users/jacek/dev/apps/hadoop-2.7.5/share/hadoop/common/hadoop-common-2.7.5.jar
Tip: Read the section Pseudo-Distributed Operation about how to run Hadoop HDFS “on a single-node in a pseudo-distributed mode where each Hadoop daemon runs in a separate Java process.”
Edit etc/hadoop/core-site.xml to add the following:
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
Run ./bin/hdfs namenode -format right after you’ve installed Hadoop and before starting any HDFS services (the NameNode in particular).
$ ./bin/hdfs namenode -format
18/01/09 15:48:28 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = japila.local/192.168.1.2
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 2.7.5
...
18/01/09 15:48:28 INFO namenode.NameNode: createNameNode [-format]
...
Formatting using clusterid: CID-bfdc81da-6941-4a93-8371-2c254d503a97
...
18/01/09 15:48:29 INFO common.Storage: Storage directory /tmp/hadoop-jacek/dfs/name has been successfully formatted.
18/01/09 15:48:29 INFO namenode.FSImageFormatProtobuf: Saving image file /tmp/hadoop-jacek/dfs/name/current/fsimage.ckpt_0000000000000000000 using no compression
18/01/09 15:48:29 INFO namenode.FSImageFormatProtobuf: Image file /tmp/hadoop-jacek/dfs/name/current/fsimage.ckpt_0000000000000000000 of size 322 bytes saved in 0 seconds.
18/01/09 15:48:29 INFO namenode.NNStorageRetentionManager: Going to retain 1 images with txid >= 0
18/01/09 15:48:29 INFO util.ExitUtil: Exiting with status 0
Start Hadoop HDFS using ./sbin/start-dfs.sh (and tail -f logs/hadoop-*-datanode-*.log)
$ ./sbin/start-dfs.sh
Starting namenodes on [localhost]
localhost: starting namenode, logging to /Users/jacek/dev/apps/hadoop-2.7.5/logs/hadoop-jacek-namenode-japila.local.out
localhost: starting datanode, logging to /Users/jacek/dev/apps/hadoop-2.7.5/logs/hadoop-jacek-datanode-japila.local.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /Users/jacek/dev/apps/hadoop-2.7.5/logs/hadoop-jacek-secondarynamenode-japila.local.out
Use jps -lm to list Hadoop’s JVM processes.
$ jps -lm
26576 org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode
26468 org.apache.hadoop.hdfs.server.datanode.DataNode
26381 org.apache.hadoop.hdfs.server.namenode.NameNode
Create hive-site.xml in $SPARK_HOME/conf with the following:
<?xml version="1.0"?>
<configuration>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>hdfs://localhost:9000/jacek/hive_warehouse</value>
    <description>Warehouse Location</description>
  </property>
</configuration>
JdbcUtils is a Scala object with methods to support JDBCRDD, JDBCRelation and JdbcRelationProvider.
| Name | Description |
|---|---|
| getCustomSchema | Replaces data types in a table schema. Used exclusively when… |
createConnectionFactory Method

createConnectionFactory(options: JDBCOptions): () => Connection

createConnectionFactory…FIXME
getCommonJDBCType Method

getCommonJDBCType(dt: DataType): Option[JdbcType]

getCommonJDBCType…FIXME

Note: getCommonJDBCType is used when…FIXME
getCatalystType Internal Method

getCatalystType(
  sqlType: Int,
  precision: Int,
  scale: Int,
  signed: Boolean): DataType

getCatalystType…FIXME

Note: getCatalystType is used when…FIXME
getSchemaOption Method

getSchemaOption(conn: Connection, options: JDBCOptions): Option[StructType]

getSchemaOption…FIXME

Note: getSchemaOption is used when…FIXME
getSchema Method

getSchema(
  resultSet: ResultSet,
  dialect: JdbcDialect,
  alwaysNullable: Boolean = false): StructType

getSchema…FIXME

Note: getSchema is used when…FIXME
resultSetToRows Method

resultSetToRows(resultSet: ResultSet, schema: StructType): Iterator[Row]

resultSetToRows…FIXME

Note: resultSetToRows is used when…FIXME
resultSetToSparkInternalRows Method

resultSetToSparkInternalRows(
  resultSet: ResultSet,
  schema: StructType,
  inputMetrics: InputMetrics): Iterator[InternalRow]

resultSetToSparkInternalRows…FIXME

Note: resultSetToSparkInternalRows is used when…FIXME
schemaString Method

schemaString(
  df: DataFrame,
  url: String,
  createTableColumnTypes: Option[String] = None): String

schemaString…FIXME

Note: schemaString is used exclusively when JdbcUtils is requested to create a table.
parseUserSpecifiedCreateTableColumnTypes Internal Method

parseUserSpecifiedCreateTableColumnTypes(
  df: DataFrame,
  createTableColumnTypes: String): Map[String, String]

parseUserSpecifiedCreateTableColumnTypes…FIXME

Note: parseUserSpecifiedCreateTableColumnTypes is used exclusively when JdbcUtils is requested to schemaString.
saveTable Method

saveTable(
  df: DataFrame,
  tableSchema: Option[StructType],
  isCaseSensitive: Boolean,
  options: JDBCOptions): Unit

saveTable takes the url, table, batchSize, isolationLevel options and createConnectionFactory.

saveTable then builds the INSERT statement with getInsertStatement.
saveTable takes the numPartitions option and applies coalesce operator to the input DataFrame if the number of partitions of its RDD is less than the numPartitions option.
In the end, saveTable requests the possibly-repartitioned DataFrame for its RDD (it may have changed after the coalesce operator) and executes savePartition for every partition (using RDD.foreachPartition).
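From the user side, these options are given through the DataFrameWriter jdbc interface. A minimal sketch (the URL, table, and credentials are illustrative):

import java.util.Properties

val df = spark.range(0, 10000).toDF("id")

val props = new Properties()
props.setProperty("user", "sa")
props.setProperty("password", "secret")
props.setProperty("batchsize", "1000")      // rows per JDBC batch
props.setProperty("numPartitions", "4")     // upper bound on concurrent JDBC connections

df.write
  .mode("append")
  .jdbc("jdbc:postgresql://localhost/mydb", "people", props)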
Note: saveTable is used exclusively when JdbcRelationProvider is requested to write the rows of a structured query (a DataFrame) to a table.
getCustomSchema Method

getCustomSchema(
  tableSchema: StructType,
  customSchema: String,
  nameEquality: Resolver): StructType
getCustomSchema replaces the data type of the fields in the input tableSchema schema that are included in the input customSchema (if defined).
Internally, getCustomSchema branches off per the input customSchema.
If the input customSchema is undefined or empty, getCustomSchema simply returns the input tableSchema unchanged.
Otherwise, if the input customSchema is not empty, getCustomSchema requests CatalystSqlParser to parse it (i.e. create a new StructType for the given customSchema canonical schema representation).
getCustomSchema then uses SchemaUtils to checkColumnNameDuplication (in the column names of the user-defined customSchema schema with the input nameEquality).
In the end, getCustomSchema replaces the data type of the fields in the input tableSchema that are included in the input userSchema.
Note: getCustomSchema is used exclusively when JDBCRelation is created (and the customSchema JDBC option was defined).
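For example, a minimal sketch of overriding column types with the customSchema JDBC option (the URL, table, and columns are illustrative):

val people = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/mydb")
  .option("dbtable", "people")
  .option("customSchema", "id DECIMAL(38, 0), name STRING")
  .load()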
dropTable Method

dropTable(conn: Connection, table: String): Unit

dropTable…FIXME

Note: dropTable is used when…FIXME
createTable Method

createTable(
  conn: Connection,
  df: DataFrame,
  options: JDBCOptions): Unit
createTable builds the table schema (given the input DataFrame with the url and createTableColumnTypes options).
createTable uses the table and createTableOptions options.
In the end, createTable concatenates all the above texts into a CREATE TABLE [table] ([strSchema]) [createTableOptions] SQL DDL statement and executes it (using the input JDBC Connection).
Note: createTable is used exclusively when JdbcRelationProvider is requested to write the rows of a structured query (a DataFrame) to a table.
getInsertStatement Method

getInsertStatement(
  table: String,
  rddSchema: StructType,
  tableSchema: Option[StructType],
  isCaseSensitive: Boolean,
  dialect: JdbcDialect): String

getInsertStatement…FIXME

Note: getInsertStatement is used when…FIXME
getJdbcType Internal Method

getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType

getJdbcType…FIXME

Note: getJdbcType is used when…FIXME
tableExists Method

tableExists(conn: Connection, options: JDBCOptions): Boolean

tableExists…FIXME

Note: tableExists is used exclusively when JdbcRelationProvider is requested to write the rows of a structured query (a DataFrame) to a table.
truncateTable Method

truncateTable(conn: Connection, options: JDBCOptions): Unit

truncateTable…FIXME

Note: truncateTable is used exclusively when JdbcRelationProvider is requested to write the rows of a structured query (a DataFrame) to a table.
savePartition Method

savePartition(
  getConnection: () => Connection,
  table: String,
  iterator: Iterator[Row],
  rddSchema: StructType,
  insertStmt: String,
  batchSize: Int,
  dialect: JdbcDialect,
  isolationLevel: Int): Iterator[Byte]
savePartition creates a JDBC Connection using the input getConnection function.
savePartition tries to set the input isolationLevel if it is different than TRANSACTION_NONE and the database supports transactions.
savePartition then writes rows (in the input Iterator[Row]) using batches that are submitted after batchSize rows were added.
Note: savePartition is used exclusively when JdbcUtils is requested to saveTable.