Custom Data Source Formats
Caution: FIXME. See the spark-mf-format project on GitHub for a complete solution.
DataSource is one of the main parts of the Data Source API in Spark SQL (together with DataFrameReader for loading datasets, DataFrameWriter for saving datasets, and StreamSourceProvider for creating streaming sources).

DataSource models a pluggable data provider framework with extension points for Spark SQL integrators to expand the list of supported external data sources.
DataSource takes a list of file system paths that hold data. The list is empty by default, but can be different per data source:

* The location URI of a HiveTableRelation (when HiveMetastoreCatalog is requested to convert a HiveTableRelation to a LogicalRelation)
* The table name of an UnresolvedRelation (when the ResolveSQLOnFile logical evaluation rule is executed)
* The files in a directory when Spark Structured Streaming's FileStreamSource is requested for batches
DataSource is created when:

* DataFrameWriter is requested to save to a data source (per the Data Source V1 contract)
* FindDataSourceTable and ResolveSQLOnFile logical evaluation rules are executed
* CreateDataSourceTableCommand, CreateDataSourceTableAsSelectCommand, InsertIntoDataSourceDirCommand or CreateTempViewUsing are executed
* HiveMetastoreCatalog is requested to convertToLogicalRelation
* Spark Structured Streaming's FileStreamSource, DataStreamReader and DataStreamWriter
Extension Point | Description |
---|---|
CreatableRelationProvider | Data source that saves the result of a structured query per save mode and returns the schema |
FileFormat | |
RelationProvider | Data source that supports schema inference and can be accessed using SQL's USING clause |
SchemaRelationProvider | |
StreamSourceProvider | |
As a user, you interact with DataSource through DataFrameReader (when you execute spark.read or spark.readStream) or SQL's CREATE TABLE USING.
```scala
// Batch reading
val people: DataFrame = spark.read
  .format("csv")
  .load("people.csv")

// Streamed reading
val messages: DataFrame = spark.readStream
  .format("kafka")
  .option("subscribe", "topic")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load
```
DataSource uses a SparkSession, a class name, a collection of paths, an optional user-specified schema, a collection of partition columns, a bucket specification, and configuration options.

Note: A data source is also called a table provider.
When requested to resolve a batch (non-streaming) FileFormat, DataSource creates a HadoopFsRelation with the optional bucketing specification.
writeAndRead Method

```scala
writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation
```

Caution: FIXME

Note: writeAndRead is used exclusively when the CreateDataSourceTableAsSelectCommand logical command is executed.
write Method

```scala
write(mode: SaveMode, data: DataFrame): BaseRelation
```

write writes the result of executing a structured query (as a DataFrame) to a data source per save mode.

Internally, write looks up the data source and branches off per providingClass.
providingClass | Description |
---|---|
CreatableRelationProvider | Executes CreatableRelationProvider.createRelation |
FileFormat | writeInFileFormat |
others | Reports a RuntimeException |
Note: write does not support the internal CalendarIntervalType in the schema of the data DataFrame and throws an AnalysisException when there is one.

Note: write is used exclusively when SaveIntoDataSourceCommand is executed.
writeInFileFormat Internal Method

Caution: FIXME
For FileFormat data sources, write takes all the paths and the path option and makes sure that there is exactly one path.

Note: write uses Hadoop's Path to access the FileSystem and calculate the qualified output path.
write requests PartitioningUtils to validatePartitionColumn.

When appending to a table, …FIXME

In the end, write (for a FileFormat data source) prepares an InsertIntoHadoopFsRelationCommand logical plan and executes it.

Caution: FIXME Is toRdd a job execution?
createSink Method

Caution: FIXME

sourceSchema Internal Method

```scala
sourceSchema(): SourceInfo
```

sourceSchema returns the name and schema of the data source for streamed reading.
Caution: FIXME Why is the method called? Why does this bother with streamed reading and data sources?!

sourceSchema supports two class hierarchies, i.e. FileFormat and Structured Streaming's StreamSourceProvider data sources.

Internally, sourceSchema first creates an instance of the data source and…

Caution: FIXME Finish…

For Structured Streaming's StreamSourceProvider data sources, sourceSchema relays calls to StreamSourceProvider.sourceSchema.

For FileFormat data sources, sourceSchema makes sure that the path option was specified.
Tip: path is looked up in a case-insensitive way, so paTh, PATH and pAtH are all acceptable. Use the lower-case version path, though.

Note: path can use glob patterns (not regex syntax), i.e. contain any of the {}[]*?\ characters.
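For example, a glob in the load path could select only some partition directories. This is a minimal sketch; the directory layout below is made up:

```scala
// Loads only the January-to-March partitions using a glob in the path
val q1 = spark.read
  .format("parquet")
  .load("data/year=2018/month={01,02,03}/*.parquet")
```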
sourceSchema checks whether the path exists when a glob pattern is not used. If the path does not exist, you will see the following AnalysisException in the logs:

```text
scala> spark.read.load("the.file.does.not.exist.parquet")
org.apache.spark.sql.AnalysisException: Path does not exist: file:/Users/jacek/dev/oss/spark/the.file.does.not.exist.parquet;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:375)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:364)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:344)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:132)
  ... 48 elided
```
If spark.sql.streaming.schemaInference is disabled, the data source is different than TextFileFormat, and the input userSpecifiedSchema is not specified, the following IllegalArgumentException is thrown:

```text
Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.
```
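To avoid the error above for a streamed file source, you can either provide the schema explicitly or enable spark.sql.streaming.schemaInference. A minimal sketch (input path and column names are made up):

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Option 1: user-specified schema for the streaming file source
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))
val stream = spark.readStream
  .schema(schema)
  .json("input/")

// Option 2: let Spark infer the schema of the file source
spark.conf.set("spark.sql.streaming.schemaInference", true)
```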
Caution: FIXME I don't think the exception will ever happen for non-streaming sources since the schema is going to be defined earlier. When?

Eventually, sourceSchema returns a SourceInfo with FileSource[path] and the schema (as calculated using the inferFileFormatSchema internal method).

For any other data source, sourceSchema throws an UnsupportedOperationException:

```text
Data source [className] does not support streamed reading
```

Note: sourceSchema is used exclusively when DataSource is requested for the sourceInfo.
inferFileFormatSchema Internal Method

```scala
inferFileFormatSchema(format: FileFormat): StructType
```

The inferFileFormatSchema private method computes (aka infers) the schema (as a StructType). It returns userSpecifiedSchema if specified or uses FileFormat.inferSchema. It throws an AnalysisException when it is unable to infer the schema.

It uses the path option for the list of directory paths.

Note: inferFileFormatSchema is used by DataSource.sourceSchema and DataSource.createSource when FileFormat is processed.
resolveRelation Method

```scala
resolveRelation(checkFilesExist: Boolean = true): BaseRelation
```

resolveRelation resolves (i.e. creates) a BaseRelation.

Internally, resolveRelation tries to create an instance of the providingClass and branches off per its type and whether the optional user-specified schema was specified or not.

Provider | Behaviour |
---|---|
SchemaRelationProvider | Executes SchemaRelationProvider.createRelation with the provided schema |
RelationProvider | Executes RelationProvider.createRelation |
FileFormat | Creates a HadoopFsRelation |
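As a hedged illustration of the RelationProvider path above, a minimal batch data source could look like the following sketch. The package, class and column names are made up for the example; only the Data Source V1 interfaces shown (RelationProvider, BaseRelation, TableScan) are part of Spark SQL:

```scala
package com.example.datasource

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// A RelationProvider that resolveRelation can instantiate and call createRelation on
class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    new NumbersRelation(sqlContext, parameters.getOrElse("count", "10").toLong)
}

// A BaseRelation with a TableScan that produces rows of a single bigint column
class NumbersRelation(val sqlContext: SQLContext, count: Long)
    extends BaseRelation with TableScan {

  override def schema: StructType = StructType(StructField("id", LongType) :: Nil)

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.range(0, count).map(Row(_))
}
```

With the sketch above on the CLASSPATH, `spark.read.format("com.example.datasource").load` resolves to com.example.datasource.DefaultSource per the [provider].DefaultSource naming convention described under lookupDataSource below.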
buildStorageFormatFromOptions Method

```scala
buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat
```

buildStorageFormatFromOptions…FIXME

Note: buildStorageFormatFromOptions is used when…FIXME
DataSource takes the following when created:

* SparkSession
* Fully-qualified class name of the data source provider
* Paths (default: empty)
* (optional) User-specified schema (default: None, i.e. undefined)
* Partition columns (default: empty)
* Optional bucketing specification (default: None)
* Configuration options (default: empty)
* (optional) CatalogTable (default: None)

DataSource initializes the internal registries and counters.
lookupDataSource Method

```scala
lookupDataSource(provider: String, conf: SQLConf): Class[_]
```

lookupDataSource looks up the class name in the backwardCompatibilityMap and then replaces the class name exclusively for the orc provider per the spark.sql.orc.impl internal configuration property (see the example after this list):

* For hive (default), lookupDataSource uses org.apache.spark.sql.hive.orc.OrcFileFormat
* For native, lookupDataSource uses the canonical class name of OrcFileFormat, i.e. org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
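A minimal sketch of switching the ORC implementation (the input path is made up; this assumes the property is settable at runtime in your Spark version):

```scala
// Chooses the native (vectorized) ORC implementation for the orc provider
spark.conf.set("spark.sql.orc.impl", "native")

val orcDf = spark.read.format("orc").load("data/orc/")
```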
With the provider's class name (aka provider1 internally), lookupDataSource assumes another name variant of the format, [provider1].DefaultSource (aka provider2 internally).

lookupDataSource then uses Java's ServiceLoader to find all DataSourceRegister provider classes on the CLASSPATH.

lookupDataSource keeps only the DataSourceRegister provider classes whose alias matches provider1 (case-insensitively), e.g. parquet or kafka.

If a single provider class was found for the alias, lookupDataSource simply returns the provider class.

If no DataSourceRegister could be found by the short name (alias), lookupDataSource considers the names of the format provider as fully-qualified class names and tries to load them instead (using Java's ClassLoader.loadClass).
Note: You can reference your own custom DataSource in your code by the DataFrameWriter.format method, which accepts either the alias or a fully-qualified class name.
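To make an alias available to lookupDataSource, a custom data source can implement DataSourceRegister and register itself for Java's ServiceLoader. The sketch below reuses the hypothetical NumbersRelation from the earlier example; the package, class and alias names are made up:

```scala
package com.example.datasource

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

// Registers the "numbers" alias so spark.read.format("numbers") resolves to this class.
// To make ServiceLoader find it, list the class name on a single line in
// src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
class NumbersSourceRegister extends RelationProvider with DataSourceRegister {
  override def shortName(): String = "numbers"

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    new NumbersRelation(sqlContext, parameters.getOrElse("count", "10").toLong)
}

// Usage (once the jar is on the CLASSPATH):
// spark.read.format("numbers").option("count", "5").load
```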
Caution: FIXME Describe the other cases (orc and avro)

If no provider class could be found, lookupDataSource throws a RuntimeException:

```text
java.lang.ClassNotFoundException: Failed to find data source: [provider1]. Please find packages at http://spark.apache.org/third-party-projects.html
```

If, however, lookupDataSource finds multiple registered aliases for the provider name…FIXME
planForWriting Method

```scala
planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan
```

planForWriting creates an instance of the providingClass and branches off per its type as follows:

* For a CreatableRelationProvider, planForWriting creates a SaveIntoDataSourceCommand (with the input data and mode, the CreatableRelationProvider data source and the caseInsensitiveOptions)
* For a FileFormat, planForWriting calls planForWritingFileFormat (with the FileFormat format and the input mode and data)
* For other types, planForWriting simply throws a RuntimeException:

```text
[providingClass] does not allow create table as select.
```
planForWritingFileFormat Internal Method

```scala
planForWritingFileFormat(
  format: FileFormat,
  mode: SaveMode,
  data: LogicalPlan): InsertIntoHadoopFsRelationCommand
```

planForWritingFileFormat takes the paths and the path option (from the caseInsensitiveOptions) together and (assuming that there is only one path available among the paths combined) creates a fully-qualified HDFS-compatible output path for writing.

Note: planForWritingFileFormat uses Hadoop's Path to request the FileSystem that owns it (using a Hadoop Configuration).

planForWritingFileFormat uses the PartitioningUtils helper object to validate partition columns in the partitionColumns.

In the end, planForWritingFileFormat returns a new InsertIntoHadoopFsRelationCommand.
When the number of paths is different than 1, planForWritingFileFormat throws an IllegalArgumentException:

```text
Expected exactly one path to be specified, but got: [allPaths]
```

Note: planForWritingFileFormat is used when DataSource is requested to write data to a data source per save mode followed by reading rows back (when the CreateDataSourceTableAsSelectCommand logical command is executed) and for the logical command for writing.
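In practice, the single output path typically comes from DataFrameWriter.save. A minimal sketch reusing the people DataFrame from the earlier example (the output path is made up):

```scala
// Exactly one output path, given either as the save() argument or as the "path" option
people.write
  .format("parquet")
  .mode("overwrite")
  .save("output/people")
```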
getOrInferFileFormatSchema Internal Method

```scala
getOrInferFileFormatSchema(
  format: FileFormat,
  fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType)
```

getOrInferFileFormatSchema…FIXME

Note: getOrInferFileFormatSchema is used when DataSource is requested for the sourceSchema and to resolveRelation.
HiveUtils

HiveUtils is used to create a HiveClientImpl that HiveExternalCatalog uses to interact with a Hive metastore.
Tip: Enable logging for org.apache.spark.sql.hive.HiveUtils by adding the appropriate configuration to conf/log4j.properties. Refer to Logging.
newClientForMetadata Method

```scala
newClientForMetadata(
  conf: SparkConf,
  hadoopConf: Configuration): HiveClient  // (1)
newClientForMetadata(
  conf: SparkConf,
  hadoopConf: Configuration,
  configurations: Map[String, String]): HiveClient
```

1. Executes the other newClientForMetadata with time configurations formatted

Internally, newClientForMetadata creates a new SQLConf with spark.sql properties only (from the input SparkConf).

newClientForMetadata then creates an IsolatedClientLoader per the input parameters and the Hive metastore configuration properties (e.g. spark.sql.hive.metastore.version and spark.sql.hive.metastore.jars).
You should see one of the following INFO messages in the logs:

```text
Initializing HiveMetastoreConnection version [hiveMetastoreVersion] using Spark classes.
Initializing HiveMetastoreConnection version [hiveMetastoreVersion] using maven.
Initializing HiveMetastoreConnection version [hiveMetastoreVersion] using [jars]
```

In the end, newClientForMetadata requests the IsolatedClientLoader to create a HiveClientImpl.

Note: newClientForMetadata is used exclusively when HiveExternalCatalog is requested for a HiveClient.
HiveClientImpl

HiveClientImpl is the only available HiveClient in Spark SQL that does/uses…FIXME

HiveClientImpl is created exclusively when IsolatedClientLoader is requested to create a new Hive client. When created, HiveClientImpl is given the location of the default database for the Hive metastore warehouse (i.e. warehouseDir, which is the value of the hive.metastore.warehouse.dir Hive-specific Hadoop configuration property).

Note: The location of the default database for the Hive metastore warehouse is /user/hive/warehouse by default.
Note: You may be interested in SPARK-19664 "put 'hive.metastore.warehouse.dir' in hadoopConf place" if you use Spark before 2.1 (which you should not really, as it is no longer supported).

Note: The Hadoop configuration is what HiveExternalCatalog was given when created (which is the default Hadoop configuration from Spark Core's SparkContext.hadoopConfiguration with the Spark properties with the spark.hadoop prefix).
Tip: Enable logging for org.apache.spark.sql.hive.client.HiveClientImpl by adding the appropriate configuration to conf/log4j.properties. Refer to Logging.
renamePartitions Method

```scala
renamePartitions(
  db: String,
  table: String,
  specs: Seq[TablePartitionSpec],
  newSpecs: Seq[TablePartitionSpec]): Unit
```

Note: renamePartitions is part of the HiveClient contract to…FIXME.

renamePartitions…FIXME

alterPartitions Method

```scala
alterPartitions(
  db: String,
  table: String,
  newParts: Seq[CatalogTablePartition]): Unit
```

Note: alterPartitions is part of the HiveClient contract to…FIXME.

alterPartitions…FIXME

getPartitions Method

```scala
getPartitions(
  table: CatalogTable,
  spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]
```

Note: getPartitions is part of the HiveClient contract to…FIXME.

getPartitions…FIXME

getPartitionsByFilter Method

```scala
getPartitionsByFilter(
  table: CatalogTable,
  predicates: Seq[Expression]): Seq[CatalogTablePartition]
```

Note: getPartitionsByFilter is part of the HiveClient contract to…FIXME.

getPartitionsByFilter…FIXME

getPartitionOption Method

```scala
getPartitionOption(
  table: CatalogTable,
  spec: TablePartitionSpec): Option[CatalogTablePartition]
```

Note: getPartitionOption is part of the HiveClient contract to…FIXME.

getPartitionOption…FIXME
HiveClientImpl takes the following when created:

* Hive version
* Location of the default database for the Hive metastore warehouse (warehouseDir)
* SparkConf
* Hadoop configuration
* Extra configuration properties
* Initial ClassLoader
* IsolatedClientLoader

HiveClientImpl initializes the internal registries and counters.
getTableOption Method

```scala
def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
```

Note: getTableOption is part of the HiveClient contract to…FIXME.

When executed, getTableOption prints out the following DEBUG message to the logs:

```text
Looking up [dbName].[tableName]
```

getTableOption requests the Hive client to retrieve the metadata of the table and creates a CatalogTable.
readHiveStats Internal Method

```scala
readHiveStats(properties: Map[String, String]): Option[CatalogStatistics]
```

readHiveStats creates a CatalogStatistics from the input Hive table or partition parameters (if available and greater than 0).
Hive Parameter | Table Statistics |
---|---|
totalSize | sizeInBytes |
rawDataSize | sizeInBytes |
numRows | rowCount |
Note: The totalSize Hive parameter has a higher precedence over rawDataSize for the sizeInBytes table statistic.

Note: readHiveStats is used when HiveClientImpl is requested for the metadata of a table or table partition.
fromHivePartition Method

```scala
fromHivePartition(hp: HivePartition): CatalogTablePartition
```

fromHivePartition simply creates a CatalogTablePartition with the following:

* spec from Hive's Partition.getSpec if available
* storage from Hive's StorageDescriptor of the table partition
* parameters from Hive's Partition.getParameters if available
* stats from Hive's Partition.getParameters if available and converted to the table statistics format

Note: fromHivePartition is used when HiveClientImpl is requested for getPartitionOption, getPartitions and getPartitionsByFilter.
toHiveTable Method

```scala
toHiveTable(table: CatalogTable, userName: Option[String] = None): HiveTable
```

toHiveTable simply creates a new Hive Table and copies the properties from the input CatalogTable.
HiveClient

HiveClient is the contract for…FIXME

Note: HiveClientImpl is the only available HiveClient in Spark SQL.

HiveClient offers safe variants of many methods that do not report exceptions when a relational entity is not found in a Hive metastore, e.g. getTableOption for getTable.
```scala
package org.apache.spark.sql.hive.client

trait HiveClient {
  // only required methods that have no implementation
  // FIXME List of the methods
  def alterPartitions(
    db: String,
    table: String,
    newParts: Seq[CatalogTablePartition]): Unit
  def getTableOption(dbName: String, tableName: String): Option[CatalogTable]
  def getPartitions(
    catalogTable: CatalogTable,
    partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
  def getPartitionsByFilter(
    catalogTable: CatalogTable,
    predicates: Seq[Expression]): Seq[CatalogTablePartition]
  def getPartitionOption(
    table: CatalogTable,
    spec: TablePartitionSpec): Option[CatalogTablePartition]
  def renamePartitions(
    db: String,
    table: String,
    specs: Seq[TablePartitionSpec],
    newSpecs: Seq[TablePartitionSpec]): Unit
}
```
Note: HiveClient is a private[hive] contract.
Method | Description |
---|---|
getPartitionOption | Returns the CatalogTablePartition of a table partition |
getTableOption | Retrieves a table metadata if available |
getTable Method

```scala
getTable(dbName: String, tableName: String): CatalogTable
```

getTable retrieves the metadata of a table in a Hive metastore if available or reports a NoSuchTableException.
HiveFileFormat

HiveFileFormat is a FileFormat for writing Hive tables.

HiveFileFormat is a DataSourceRegister and registers itself as the hive data source.

Note: The hive data source can only be used with tables, and you cannot read or write files of the hive data source directly. Use DataFrameReader.table or DataFrameWriter.saveAsTable for loading from or writing data to the hive data source, respectively (see the example below).
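A minimal sketch of the table-based APIs mentioned in the note above, assuming spark is a SparkSession with Hive support enabled (the table name is made up):

```scala
import spark.implicits._

// Write a DataFrame as a Hive table using the hive data source
Seq((1L, "one"), (2L, "two"))
  .toDF("id", "name")
  .write
  .format("hive")
  .saveAsTable("example_table")

// Read the table back by name (not by path)
val t = spark.read.table("example_table")
```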
HiveFileFormat is created exclusively when SaveAsHiveFile is requested to saveAsHiveFile (when the InsertIntoHiveDirCommand and InsertIntoHiveTable logical commands are executed).

HiveFileFormat takes a FileSinkDesc when created.

HiveFileFormat throws an UnsupportedOperationException when requested to inferSchema:

```text
inferSchema is not supported for hive data source.
```
prepareWrite Method

```scala
prepareWrite(
  sparkSession: SparkSession,
  job: Job,
  options: Map[String, String],
  dataSchema: StructType): OutputWriterFactory
```

Note: prepareWrite is part of the FileFormat contract to prepare a write job.

prepareWrite sets the mapred.output.format.class property to the getOutputFileFormatClassName of the Hive TableDesc of the FileSinkDesc.

prepareWrite requests the HiveTableUtil helper object to configureJobPropertiesForStorageHandler.

prepareWrite requests the Hive Utilities helper object to copyTableJobPropertiesToConf.

In the end, prepareWrite creates a new OutputWriterFactory that creates a new HiveOutputWriter when requested for a new OutputWriter instance.
Caution: FIXME
Tip: Read about the Spark SQL CLI in Spark's official documentation in Running the Spark SQL CLI.

```text
spark-sql> describe function `<>`;
Function: <>
Usage: a <> b - Returns TRUE if a is not equal to b
```

Tip: Functions are registered in FunctionRegistry.

```text
spark-sql> show functions;
```

```text
spark-sql> explain extended show tables;
```
Hive Metastore

Spark SQL uses a Hive metastore to manage the metadata of persistent relational entities (e.g. databases, tables, columns, partitions) in a relational database (for fast access).

A Hive metastore warehouse (aka spark-warehouse) is the directory where Spark SQL persists tables, whereas a Hive metastore (aka metastore_db) is a relational database to manage the metadata of the persistent relational entities, e.g. databases, tables, columns, partitions.

By default, Spark SQL uses the embedded deployment mode of a Hive metastore with an Apache Derby database.
Important: The default embedded deployment mode is not recommended for production use due to the limitation of only one active SparkSession at a time. Read Cloudera's Configuring the Hive Metastore for CDH document that explains the available deployment modes of a Hive metastore.

When SparkSession is created with Hive support, the external catalog (aka metastore) is HiveExternalCatalog. HiveExternalCatalog uses the spark.sql.warehouse.dir directory for the location of the databases and the javax.jdo.option properties for the connection to the Hive metastore database.

Note: The metadata of relational entities is persisted in a metastore database over JDBC and DataNucleus AccessPlatform that uses javax.jdo.option properties. Read Hive Metastore Administration to learn how to manage a Hive Metastore.
Name | Description |
---|---|
javax.jdo.option.ConnectionURL | The JDBC connection URL of a Hive metastore database to use |
javax.jdo.option.ConnectionDriverName | The JDBC driver of a Hive metastore database to use |
javax.jdo.option.ConnectionUserName | The user name to use to connect to a Hive metastore database |
javax.jdo.option.ConnectionPassword | The password to use to connect to a Hive metastore database |
You can configure javax.jdo.option properties in hive-site.xml or using options with spark.hadoop prefix.
You can access the current connection properties for a Hive metastore in a Spark SQL application using the Spark internal classes.
```text
scala> :type spark
org.apache.spark.sql.SparkSession

scala> spark.sharedState.externalCatalog
res1: org.apache.spark.sql.catalyst.catalog.ExternalCatalog = org.apache.spark.sql.hive.HiveExternalCatalog@79dd79eb

// Use `:paste -raw` to paste the following code
// This is to pass the private[spark] "gate"
// BEGIN
package org.apache.spark
import org.apache.spark.sql.SparkSession
object jacek {
  def open(spark: SparkSession) = {
    import org.apache.spark.sql.hive.HiveExternalCatalog
    spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
  }
}
// END

import org.apache.spark.jacek
val hiveClient = jacek.open(spark)

scala> hiveClient.getConf("javax.jdo.option.ConnectionURL", "")
res2: String = jdbc:derby:;databaseName=metastore_db;create=true
```
The benefits of using an external Hive metastore:

* Allow multiple Spark applications (sessions) to access it concurrently
* Allow a single Spark application to use table statistics without running "ANALYZE TABLE" every execution
Note: As of Spark 2.2 (see SPARK-18112 Spark2.x does not support read data from Hive 2.x metastore), Spark SQL supports reading data from a Hive 2.1.1 metastore.

Caution: FIXME Describe hive-site.xml vs the config method vs --conf with the spark.hadoop prefix.

Spark SQL uses Hive-specific configuration properties that further fine-tune the Hive integration, e.g. spark.sql.hive.metastore.version or spark.sql.hive.metastore.jars.
spark.sql.warehouse.dir Configuration Property

spark.sql.warehouse.dir is a static configuration property that sets Hive's hive.metastore.warehouse.dir property, i.e. the location of the Hive local/embedded metastore database (using Derby).
Tip: Refer to SharedState to learn about (the low-level details of) Spark SQL support for Apache Hive. See also the official Hive Metastore Administration document.
In order to use an external Hive metastore, you should do the following:

1. Enable Hive support in SparkSession (that makes sure that the Hive classes are on the CLASSPATH and sets the spark.sql.catalogImplementation internal configuration property to hive)
2. Define hive.metastore.warehouse.dir in the hive-site.xml configuration resource (is spark.sql.warehouse.dir required?)
3. Check out warehousePath
4. Execute ./bin/run-example sql.hive.SparkHiveExample to verify the Hive configuration
When not configured by hive-site.xml, SparkSession automatically creates metastore_db in the current directory and creates a directory configured by spark.sql.warehouse.dir, which defaults to the spark-warehouse directory in the current directory in which the Spark application is started.

Note: You may need to grant write privilege to the user who starts the Spark application.
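A minimal sketch of enabling Hive support and setting the warehouse location programmatically (the application name and path are made up):

```scala
import org.apache.spark.sql.SparkSession

// Enables Hive support (sets spark.sql.catalogImplementation to hive)
// and points the warehouse at a custom location
val spark = SparkSession.builder
  .appName("hive-metastore-demo")
  .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
  .enableHiveSupport()
  .getOrCreate()

// The databases known to the (Hive) external catalog
spark.catalog.listDatabases.show(truncate = false)
```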
Name | Description |
---|---|
hive.metastore.uris | The Thrift URI of a remote Hive metastore, i.e. one that is in a separate JVM process or on a remote node |
You may also want to use the following Hive configuration properties that (seem to) cause exceptions with an empty metastore database as of Hive 2.1:

* datanucleus.schema.autoCreateAll set to true
Caution: FIXME Describe the purpose of spark.hadoop.* properties.

You can specify any of the Hadoop configuration properties, e.g. hive.metastore.warehouse.dir, with the spark.hadoop prefix.
```text
$ spark-shell --conf spark.hadoop.hive.metastore.warehouse.dir=/tmp/hive-warehouse
...
scala> spark.sharedState
18/01/08 10:46:19 INFO SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('/tmp/hive-warehouse').
18/01/08 10:46:19 INFO SharedState: Warehouse path is '/tmp/hive-warehouse'.
res1: org.apache.spark.sql.internal.SharedState = org.apache.spark.sql.internal.SharedState@5a69b3cf
```
hive-site.xml configures Hive clients (e.g. Spark SQL) with the Hive Metastore configuration.

hive-site.xml is loaded when SharedState is created (which is…FIXME).

Configuration of Hive is done by placing your hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) files in conf/ (that is automatically added to the CLASSPATH of a Spark application).

Tip: You can use --driver-class-path or spark.driver.extraClassPath to point to the directory with configuration resources, e.g. hive-site.xml.
```xml
<configuration>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/tmp/hive-warehouse</value>
    <description>Hive Metastore location</description>
  </property>
</configuration>
```
Tip: Read the Resources section in Hadoop's Configuration javadoc to learn more about configuration resources.
The following steps are for Hive and Hadoop 2.7.5.
```text
$ ./bin/hdfs version
Hadoop 2.7.5
Subversion https://shv@git-wip-us.apache.org/repos/asf/hadoop.git -r 18065c2b6806ed4aa6a3187d77cbe21bb3dba075
Compiled by kshvachk on 2017-12-16T01:06Z
Compiled with protoc 2.5.0
From source with checksum 9f118f95f47043332d51891e37f736e9
This command was run using /Users/jacek/dev/apps/hadoop-2.7.5/share/hadoop/common/hadoop-common-2.7.5.jar
```
Tip: Read the section Pseudo-Distributed Operation about how to run Hadoop HDFS "on a single-node in a pseudo-distributed mode where each Hadoop daemon runs in a separate Java process."
Edit etc/hadoop/core-site.xml to add the following:

```xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
```
Execute ./bin/hdfs namenode -format right after you've installed Hadoop and before starting any HDFS services (NameNode in particular):
```text
$ ./bin/hdfs namenode -format
18/01/09 15:48:28 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = japila.local/192.168.1.2
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 2.7.5
...
18/01/09 15:48:28 INFO namenode.NameNode: createNameNode [-format]
...
Formatting using clusterid: CID-bfdc81da-6941-4a93-8371-2c254d503a97
...
18/01/09 15:48:29 INFO common.Storage: Storage directory /tmp/hadoop-jacek/dfs/name has been successfully formatted.
18/01/09 15:48:29 INFO namenode.FSImageFormatProtobuf: Saving image file /tmp/hadoop-jacek/dfs/name/current/fsimage.ckpt_0000000000000000000 using no compression
18/01/09 15:48:29 INFO namenode.FSImageFormatProtobuf: Image file /tmp/hadoop-jacek/dfs/name/current/fsimage.ckpt_0000000000000000000 of size 322 bytes saved in 0 seconds.
18/01/09 15:48:29 INFO namenode.NNStorageRetentionManager: Going to retain 1 images with txid >= 0
18/01/09 15:48:29 INFO util.ExitUtil: Exiting with status 0
```
Start Hadoop HDFS using ./sbin/start-dfs.sh (and tail -f logs/hadoop-*-datanode-*.log):
```text
$ ./sbin/start-dfs.sh
Starting namenodes on [localhost]
localhost: starting namenode, logging to /Users/jacek/dev/apps/hadoop-2.7.5/logs/hadoop-jacek-namenode-japila.local.out
localhost: starting datanode, logging to /Users/jacek/dev/apps/hadoop-2.7.5/logs/hadoop-jacek-datanode-japila.local.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /Users/jacek/dev/apps/hadoop-2.7.5/logs/hadoop-jacek-secondarynamenode-japila.local.out
```
Use jps -lm to list Hadoop's JVM processes:

```text
$ jps -lm
26576 org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode
26468 org.apache.hadoop.hdfs.server.datanode.DataNode
26381 org.apache.hadoop.hdfs.server.namenode.NameNode
```
Create hive-site.xml in $SPARK_HOME/conf with the following:

```xml
<?xml version="1.0"?>
<configuration>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>hdfs://localhost:9000/jacek/hive_warehouse</value>
    <description>Warehouse Location</description>
  </property>
</configuration>
```
JdbcUtils

JdbcUtils is a Scala object with methods to support JDBCRDD, JDBCRelation and JdbcRelationProvider.
The individual JdbcUtils methods (e.g. getCustomSchema, which replaces data types in a table schema) are described in the sections below.
createConnectionFactory Method

```scala
createConnectionFactory(options: JDBCOptions): () => Connection
```

createConnectionFactory…FIXME
getCommonJDBCType Method

```scala
getCommonJDBCType(dt: DataType): Option[JdbcType]
```

getCommonJDBCType…FIXME

Note: getCommonJDBCType is used when…FIXME

getCatalystType Internal Method

```scala
getCatalystType(
  sqlType: Int,
  precision: Int,
  scale: Int,
  signed: Boolean): DataType
```

getCatalystType…FIXME

Note: getCatalystType is used when…FIXME

getSchemaOption Method

```scala
getSchemaOption(conn: Connection, options: JDBCOptions): Option[StructType]
```

getSchemaOption…FIXME

Note: getSchemaOption is used when…FIXME

getSchema Method

```scala
getSchema(
  resultSet: ResultSet,
  dialect: JdbcDialect,
  alwaysNullable: Boolean = false): StructType
```

getSchema…FIXME

Note: getSchema is used when…FIXME

resultSetToRows Method

```scala
resultSetToRows(resultSet: ResultSet, schema: StructType): Iterator[Row]
```

resultSetToRows…FIXME

Note: resultSetToRows is used when…FIXME

resultSetToSparkInternalRows Method

```scala
resultSetToSparkInternalRows(
  resultSet: ResultSet,
  schema: StructType,
  inputMetrics: InputMetrics): Iterator[InternalRow]
```

resultSetToSparkInternalRows…FIXME

Note: resultSetToSparkInternalRows is used when…FIXME

schemaString Method

```scala
schemaString(
  df: DataFrame,
  url: String,
  createTableColumnTypes: Option[String] = None): String
```

schemaString…FIXME

Note: schemaString is used exclusively when JdbcUtils is requested to create a table.

parseUserSpecifiedCreateTableColumnTypes Internal Method

```scala
parseUserSpecifiedCreateTableColumnTypes(
  df: DataFrame,
  createTableColumnTypes: String): Map[String, String]
```

parseUserSpecifiedCreateTableColumnTypes…FIXME

Note: parseUserSpecifiedCreateTableColumnTypes is used exclusively when JdbcUtils is requested to schemaString.
saveTable Method

```scala
saveTable(
  df: DataFrame,
  tableSchema: Option[StructType],
  isCaseSensitive: Boolean,
  options: JDBCOptions): Unit
```

saveTable takes the url, table, batchSize and isolationLevel options and createConnectionFactory.

saveTable creates the insert statement using getInsertStatement.

saveTable takes the numPartitions option and applies the coalesce operator to the input DataFrame if the number of partitions of its RDD is less than the numPartitions option.

In the end, saveTable requests the possibly-repartitioned DataFrame for its RDD (it may have changed after the coalesce operator) and executes savePartition for every partition (using RDD.foreachPartition).

Note: saveTable is used exclusively when JdbcRelationProvider is requested to write the rows of a structured query (a DataFrame) to a table.
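A minimal sketch of a JDBC write that eventually goes through saveTable, reusing the people DataFrame from the earlier example (the URL, table name and credentials are made up):

```scala
people
  .write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/demo")
  .option("dbtable", "public.people")
  .option("user", "demo")
  .option("password", "demo")
  .option("numPartitions", "4")               // caps the number of concurrent JDBC connections
  .option("batchsize", "1000")                // rows per JDBC batch in savePartition
  .option("isolationLevel", "READ_COMMITTED") // transaction isolation for each partition write
  .save
```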
getCustomSchema Method

```scala
getCustomSchema(
  tableSchema: StructType,
  customSchema: String,
  nameEquality: Resolver): StructType
```

getCustomSchema replaces the data type of the fields in the input tableSchema schema that are included in the input customSchema (if defined).

Internally, getCustomSchema branches off per the input customSchema.

If the input customSchema is undefined or empty, getCustomSchema simply returns the input tableSchema unchanged.

Otherwise, if the input customSchema is not empty, getCustomSchema requests CatalystSqlParser to parse it (i.e. create a new StructType for the given customSchema canonical schema representation).

getCustomSchema then uses SchemaUtils to checkColumnNameDuplication (in the column names of the user-defined customSchema schema with the input nameEquality).

In the end, getCustomSchema replaces the data type of the fields in the input tableSchema that are included in the input userSchema.

Note: getCustomSchema is used exclusively when JDBCRelation is created (and the customSchema JDBC option was defined).
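A minimal sketch of the customSchema JDBC option that triggers getCustomSchema (the URL, table and column names are made up):

```scala
// customSchema overrides the data types JDBC would map by default
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/demo")
  .option("dbtable", "public.people")
  .option("customSchema", "id DECIMAL(38, 0), name STRING")
  .load
```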
dropTable Method

```scala
dropTable(conn: Connection, table: String): Unit
```

dropTable…FIXME

Note: dropTable is used when…FIXME

createTable Method

```scala
createTable(
  conn: Connection,
  df: DataFrame,
  options: JDBCOptions): Unit
```

createTable builds the table schema (given the input DataFrame with the url and createTableColumnTypes options).

createTable uses the table and createTableOptions options.

In the end, createTable concatenates all the above texts into a CREATE TABLE [table] ([strSchema]) [createTableOptions] SQL DDL statement and executes it (using the input JDBC Connection).

Note: createTable is used exclusively when JdbcRelationProvider is requested to write the rows of a structured query (a DataFrame) to a table.
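A minimal sketch of customizing the column types that end up in the generated CREATE TABLE statement, again reusing the people DataFrame (URL and table name are made up):

```scala
// createTableColumnTypes customizes the column types used in the generated DDL
people
  .write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/demo")
  .option("dbtable", "public.people")
  .option("user", "demo")
  .option("password", "demo")
  .option("createTableColumnTypes", "id BIGINT, name VARCHAR(128)")
  .save
```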
getInsertStatement Method

```scala
getInsertStatement(
  table: String,
  rddSchema: StructType,
  tableSchema: Option[StructType],
  isCaseSensitive: Boolean,
  dialect: JdbcDialect): String
```

getInsertStatement…FIXME

Note: getInsertStatement is used when…FIXME

getJdbcType Internal Method

```scala
getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType
```

getJdbcType…FIXME

Note: getJdbcType is used when…FIXME

tableExists Method

```scala
tableExists(conn: Connection, options: JDBCOptions): Boolean
```

tableExists…FIXME

Note: tableExists is used exclusively when JdbcRelationProvider is requested to write the rows of a structured query (a DataFrame) to a table.

truncateTable Method

```scala
truncateTable(conn: Connection, options: JDBCOptions): Unit
```

truncateTable…FIXME

Note: truncateTable is used exclusively when JdbcRelationProvider is requested to write the rows of a structured query (a DataFrame) to a table.
savePartition Method

```scala
savePartition(
  getConnection: () => Connection,
  table: String,
  iterator: Iterator[Row],
  rddSchema: StructType,
  insertStmt: String,
  batchSize: Int,
  dialect: JdbcDialect,
  isolationLevel: Int): Iterator[Byte]
```

savePartition creates a JDBC Connection using the input getConnection function.

savePartition tries to set the input isolationLevel if it is different than TRANSACTION_NONE and the database supports transactions.

savePartition then writes rows (in the input Iterator[Row]) using batches that are submitted after batchSize rows were added (a sketch of this pattern follows below).
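The following is a minimal sketch of the batching pattern savePartition uses, written with plain JDBC for one partition of rows. It is not Spark's actual implementation; the connection URL, table and column names are made up:

```scala
import java.sql.{Connection, DriverManager}

// Writes a partition of (id, name) rows in JDBC batches of batchSize
def writeBatched(rows: Iterator[(Long, String)], batchSize: Int): Unit = {
  val conn: Connection =
    DriverManager.getConnection("jdbc:postgresql://localhost/demo", "demo", "demo")
  conn.setAutoCommit(false)  // commit once per partition, like savePartition
  val stmt = conn.prepareStatement("INSERT INTO people (id, name) VALUES (?, ?)")
  try {
    var count = 0
    rows.foreach { case (id, name) =>
      stmt.setLong(1, id)
      stmt.setString(2, name)
      stmt.addBatch()
      count += 1
      if (count % batchSize == 0) stmt.executeBatch()  // submit a full batch
    }
    if (count % batchSize != 0) stmt.executeBatch()    // submit the trailing partial batch
    conn.commit()
  } finally {
    stmt.close()
    conn.close()
  }
}
```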
Note: savePartition is used exclusively when JdbcUtils is requested to saveTable.