DataSource — Pluggable Data Provider Framework

DataSource is one of the main parts of Data Source API in Spark SQL (together with DataFrameReader for loading datasets, DataFrameWriter for saving datasets and StreamSourceProvider for creating streaming sources).

DataSource models a pluggable data provider framework with the extension points for Spark SQL integrators to expand the list of supported external data sources in Spark SQL.
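
As a sketch of what such an extension can look like (the package, class names and alias below are hypothetical, not part of Spark), a minimal data source implements RelationProvider and registers an alias through DataSourceRegister:

```scala
package com.example.datasource  // hypothetical package

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// The provider class that Spark SQL discovers and instantiates
class DefaultSource extends RelationProvider with DataSourceRegister {

  // Short alias usable in DataFrameReader.format(...) and SQL's USING clause
  override def shortName(): String = "example"

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    new ExampleRelation(sqlContext)
}

// A trivial relation that produces two rows with a single string column
class ExampleRelation(val sqlContext: SQLContext) extends BaseRelation with TableScan {
  override def schema: StructType = StructType(StructField("value", StringType) :: Nil)
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row("hello"), Row("world")))
}
```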

DataSource takes a list of file system paths that hold data. The list is empty by default, but can be different per data source.

DataSource is created when:

Table 1. DataSource’s Provider (and Format) Contracts
Extension Point Description

CreatableRelationProvider

Data source that saves the result of a structured query per save mode and returns the schema

FileFormat

Used in:

  • sourceSchema for streamed reading

  • write for writing a DataFrame to a DataSource (as part of creating a table as select)

RelationProvider

Data source that supports schema inference and can be accessed using SQL’s USING clause

SchemaRelationProvider

Data source that requires a user-defined schema

StreamSourceProvider

Used in:

As a user, you interact with DataSource through DataFrameReader (when you execute spark.read or spark.readStream) or SQL’s CREATE TABLE USING clause.
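
For example (the paths and the table name are hypothetical):

```scala
// Batch reading through DataFrameReader (spark.read)
val parquetDF = spark.read.format("parquet").load("/tmp/data")

// Streamed reading through DataStreamReader (spark.readStream)
val streamDF = spark.readStream.format("text").load("/tmp/inbox")

// SQL's CREATE TABLE ... USING clause
spark.sql("CREATE TABLE people USING csv OPTIONS (path '/tmp/people.csv', header 'true')")
```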

DataSource uses a SparkSession, a class name, a collection of paths, an optional user-specified schema, a collection of partition columns, a bucket specification, and configuration options.

Note
Data source is also called a table provider.

When requested to resolve a batch (non-streaming) FileFormat, DataSource creates a HadoopFsRelation with the optional bucketing specification.

Table 2. DataSource’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

providingClass

The Java class (java.lang.Class) that…​FIXME

Used when…​FIXME

sourceInfo

SourceInfo

Used when…​FIXME

caseInsensitiveOptions

FIXME

Used when…​FIXME

equality

FIXME

Used when…​FIXME

backwardCompatibilityMap

FIXME

Used when…​FIXME

Writing Data to Data Source per Save Mode Followed by Reading Rows Back (as BaseRelation) — writeAndRead Method

Caution
FIXME
Note
writeAndRead is used exclusively when CreateDataSourceTableAsSelectCommand logical command is executed.

Writing DataFrame to Data Source Per Save Mode — write Method

write writes the result of executing a structured query (as DataFrame) to a data source per save mode.
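
As a quick illustration of the user-facing side (the output path is hypothetical), a save mode is requested through DataFrameWriter before it reaches this write path:

```scala
// SaveMode.Append requested through DataFrameWriter
spark.range(5).write.format("json").mode("append").save("/tmp/events")
```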

Internally, write looks up the data source and branches off per providingClass.

Table 3. write’s Branches per Supported providingClass (in execution order)
providingClass Description

CreatableRelationProvider

Executes CreatableRelationProvider.createRelation

FileFormat

writeInFileFormat

others

Reports a RuntimeException

Note
write does not support the internal CalendarIntervalType in the schema of the DataFrame and throws an AnalysisException when there is one.
Note
write is used exclusively when SaveIntoDataSourceCommand is executed.

writeInFileFormat Internal Method

Caution
FIXME

For FileFormat data sources, write takes all the paths and the path option and makes sure that there is only one path.

Note
write uses Hadoop’s Path to access the FileSystem and calculate the qualified output path.

write requests PartitioningUtils to validatePartitionColumn.

When appending to a table, …​FIXME

In the end, write (for a FileFormat data source) prepares an InsertIntoHadoopFsRelationCommand logical plan and executes it.

Caution
FIXME Is toRdd a job execution?

createSource Method

Caution
FIXME

createSink Method

Caution
FIXME

sourceSchema Internal Method

sourceSchema returns the name and schema of the data source for streamed reading.

Caution
FIXME Why is the method called? Why does this bother with streamed reading and data sources?!

It supports two class hierarchies, i.e. FileFormat and Structured Streaming’s StreamSourceProvider data sources.

Internally, sourceSchema first creates an instance of the data source and…​

Caution
FIXME Finish…​

For Structured Streaming’s StreamSourceProvider data sources, sourceSchema relays calls to StreamSourceProvider.sourceSchema.

For FileFormat data sources, sourceSchema makes sure that path option was specified.

Tip
path is looked up in a case-insensitive way so paTh and PATH and pAtH are all acceptable. Use the lower-case version of path, though.
Note
path can use a glob pattern (not regex syntax), i.e. contain any of the {}[]*?\ characters.

It checks whether the path exists if a glob pattern is not used. If it does not exist, you will see an AnalysisException in the logs.

If spark.sql.streaming.schemaInference is disabled, the data source is not TextFileFormat, and the input userSpecifiedSchema is not specified, an IllegalArgumentException is thrown.

Caution
FIXME I don’t think the exception will ever happen for non-streaming sources since the schema is going to be defined earlier. When?

Eventually, it returns a SourceInfo with FileSource[path] and the schema (as calculated using the inferFileFormatSchema internal method).

For any other data source, sourceSchema throws an UnsupportedOperationException.

Note
sourceSchema is used exclusively when DataSource is requested for the sourceInfo.

inferFileFormatSchema Internal Method

The inferFileFormatSchema private method computes (aka infers) the schema (as a StructType). It returns userSpecifiedSchema if specified or uses FileFormat.inferSchema otherwise. It throws an AnalysisException when it is unable to infer the schema.

It uses path option for the list of directory paths.
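
As an illustration (the directory path is hypothetical), the schema is either supplied by the user or inferred by the FileFormat:

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val userSchema = new StructType().add("id", LongType).add("name", StringType)

// userSpecifiedSchema given -- no inference needed
val withUserSchema = spark.read.schema(userSchema).csv("/tmp/people")

// no user schema -- the CSV FileFormat infers one (inferSchema is a CSV option)
val withInferredSchema = spark.read.option("inferSchema", "true").csv("/tmp/people")
```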

Note
It is used by DataSource.sourceSchema and DataSource.createSource when FileFormat is processed.

Resolving Relation (Creating BaseRelation) — resolveRelation Method

resolveRelation resolves (i.e. creates) a BaseRelation.

Internally, resolveRelation tries to create an instance of the providingClass and branches off per its type and whether the optional user-specified schema was specified or not.

Table 4. Resolving BaseRelation per Provider and User-Specified Schema
Provider Behaviour

SchemaRelationProvider

Executes SchemaRelationProvider.createRelation with the provided schema

RelationProvider

Executes RelationProvider.createRelation

FileFormat

Creates a HadoopFsRelation

Note

resolveRelation is used when:

buildStorageFormatFromOptions Method

buildStorageFormatFromOptions…​FIXME

Note
buildStorageFormatFromOptions is used when…​FIXME

Creating DataSource Instance

DataSource takes the following when created:

  • SparkSession

  • Name of the provider class (aka input data source format)

  • A list of file system paths that hold data (default: empty)

  • (optional) User-specified schema (default: None, i.e. undefined)

  • (optional) Names of the partition columns (default: empty)

  • Optional bucketing specification (default: None)

  • Options (default: empty)

  • (optional) CatalogTable (default: None)

DataSource initializes the internal registries and counters.

Looking Up Class By Name Of Data Source Provider — lookupDataSource Method

lookupDataSource looks up the class name in the backwardCompatibilityMap and then replaces the class name exclusively for the orc provider per spark.sql.orc.impl internal configuration property:

  • For hive (default), lookupDataSource uses org.apache.spark.sql.hive.orc.OrcFileFormat

  • For native, lookupDataSource uses the canonical class name of OrcFileFormat, i.e. org.apache.spark.sql.execution.datasources.orc.OrcFileFormat

With the provider’s class name (aka provider1 internally), lookupDataSource assumes another name variant in the format [provider1].DefaultSource (aka provider2 internally).

lookupDataSource then uses Java’s ServiceLoader to find all DataSourceRegister provider classes on the CLASSPATH.

lookupDataSource filters the DataSourceRegister provider classes down to those whose alias matches provider1 (case-insensitively), e.g. parquet or kafka.

If a single provider class was found for the alias, lookupDataSource simply returns the provider class.

If no DataSourceRegister could be found by the short name (alias), lookupDataSource considers the provider names (provider1 and then provider2) as fully-qualified class names and tries to load them instead (using Java’s ClassLoader.loadClass).

Note
You can reference your own custom DataSource in your code with the DataFrameWriter.format method, using either the alias or a fully-qualified class name.
Caution
FIXME Describe the other cases (orc and avro)

If no provider class could be found, lookupDataSource throws a RuntimeException:

If however, lookupDataSource found multiple registered aliases for the provider name…​FIXME
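
As a sketch (the class name and alias are hypothetical and match the earlier example), a data source becomes discoverable by ServiceLoader through a resource file, and can then be referenced by its alias or its fully-qualified class name:

```scala
// Register the provider for ServiceLoader by shipping a resource file named
//   META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
// that contains the single line:
//   com.example.datasource.DefaultSource

// Reference the data source by its alias...
val byAlias = spark.read.format("example").load()

// ...or by the fully-qualified class name
val byClassName = spark.read.format("com.example.datasource.DefaultSource").load()
```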

Creating Logical Command for Writing (for CreatableRelationProvider and FileFormat Data Sources) — planForWriting Method

planForWriting creates an instance of the providingClass and branches off per its type as follows:

Note

planForWriting is used when:

Planning for Writing (Using FileFormat) — planForWritingFileFormat Internal Method

planForWritingFileFormat takes the paths and the path option (from the caseInsensitiveOptions) together and (assuming that there is only one path available among the paths combined) creates a fully-qualified HDFS-compatible output path for writing.

Note
planForWritingFileFormat uses Hadoop’s Path to request the FileSystem that owns it (using a Hadoop Configuration).

planForWritingFileFormat uses the PartitioningUtils helper object to validate partition columns in the partitionColumns.

In the end, planForWritingFileFormat returns a new InsertIntoHadoopFsRelationCommand.

When the number of paths is not exactly one, planForWritingFileFormat throws an IllegalArgumentException.

Note
planForWritingFileFormat is used when DataSource is requested to write data to a data source per save mode followed by reading rows back (when CreateDataSourceTableAsSelectCommand logical command is executed) and for the logical command for writing.

getOrInferFileFormatSchema Internal Method

getOrInferFileFormatSchema…​FIXME

Note
getOrInferFileFormatSchema is used when DataSource is requested for the sourceSchema and to resolveRelation.

HiveUtils

HiveUtils is used to create a HiveClientImpl that HiveExternalCatalog uses to interact with a Hive metastore.

Tip

Enable INFO logging level for org.apache.spark.sql.hive.HiveUtils logger to see what happens inside.

Add the following line to conf/log4j.properties:
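
```
log4j.logger.org.apache.spark.sql.hive.HiveUtils=INFO
```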

Refer to Logging.

Creating HiveClientImpl — newClientForMetadata Method

  1. Executes the other newClientForMetadata with time configurations formatted

Internally, newClientForMetadata creates a new SQLConf with spark.sql properties only (from the input SparkConf).

newClientForMetadata then creates an IsolatedClientLoader per the input parameters and the following configuration properties:

You should see one of the following INFO messages in the logs:

In the end, newClientForMetadata requests the IsolatedClientLoader to create a HiveClientImpl.

Note
newClientForMetadata is used exclusively when HiveExternalCatalog is requested for a HiveClient.

inferSchema Method

inferSchema…​FIXME

Note
inferSchema is used when…​FIXME

HiveClientImpl — The One and Only HiveClient

HiveClientImpl is the only available HiveClient in Spark SQL that does/uses…​FIXME

HiveClientImpl is created exclusively when IsolatedClientLoader is requested to create a new Hive client. When created, HiveClientImpl is given the location of the default database for the Hive metastore warehouse (i.e. warehouseDir that is the value of hive.metastore.warehouse.dir Hive-specific Hadoop configuration property).

Note
The location of the default database for the Hive metastore warehouse is /user/hive/warehouse by default.
Note
You may be interested in SPARK-19664 (Put ‘hive.metastore.warehouse.dir’ in hadoopConf place) if you use a Spark version before 2.1 (which you should not really, as it is no longer supported).
Note
The Hadoop configuration is what HiveExternalCatalog was given when created (which is the default Hadoop configuration from Spark Core’s SparkContext.hadoopConfiguration with the Spark properties with spark.hadoop prefix).
Tip

Enable DEBUG logging level for org.apache.spark.sql.hive.client.HiveClientImpl logger to see what happens inside.

Add the following line to conf/log4j.properties:
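
```
log4j.logger.org.apache.spark.sql.hive.client.HiveClientImpl=DEBUG
```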

Refer to Logging.

renamePartitions Method

Note
renamePartitions is part of HiveClient Contract to…​FIXME.

renamePartitions…​FIXME

alterPartitions Method

Note
alterPartitions is part of HiveClient Contract to…​FIXME.

alterPartitions…​FIXME

client Internal Method

client…​FIXME

Note
client is used…​FIXME

getPartitions Method

Note
getPartitions is part of HiveClient Contract to…​FIXME.

getPartitions…​FIXME

getPartitionsByFilter Method

Note
getPartitionsByFilter is part of HiveClient Contract to…​FIXME.

getPartitionsByFilter…​FIXME

getPartitionOption Method

Note
getPartitionOption is part of HiveClient Contract to…​FIXME.

getPartitionOption…​FIXME

Creating HiveClientImpl Instance

HiveClientImpl takes the following when created:

  • HiveVersion

  • Location of the default database for the Hive metastore warehouse if defined (aka warehouseDir)

  • SparkConf

  • Hadoop configuration

  • Extra configuration

  • Initial ClassLoader

  • IsolatedClientLoader

HiveClientImpl initializes the internal registries and counters.

Retrieving Table Metadata If Available — getTableOption Method

Note
getTableOption is part of HiveClient Contract to…​FIXME.

When executed, getTableOption prints out the following DEBUG message to the logs:

getTableOption requests Hive client to retrieve the metadata of the table and creates a CatalogTable.

Creating Table Statistics from Hive’s Table or Partition Parameters — readHiveStats Internal Method

readHiveStats creates a CatalogStatistics from the input Hive table or partition parameters (if available and greater than 0).

Table 1. Table Statistics and Hive Parameters
Hive Parameter Table Statistics

totalSize

sizeInBytes

rawDataSize

sizeInBytes

numRows

rowCount

Note
The totalSize Hive parameter takes precedence over rawDataSize for the sizeInBytes table statistic.
Note
readHiveStats is used when HiveClientImpl is requested for the metadata of a table or table partition.
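
A minimal sketch of that precedence (not the actual implementation), assuming the Hive parameters arrive as a plain Map[String, String]:

```scala
// totalSize wins over rawDataSize for sizeInBytes; numRows becomes rowCount.
// Only positive values are taken into account.
def hiveStatsSketch(parameters: Map[String, String]): (Option[BigInt], Option[BigInt]) = {
  def positive(key: String): Option[BigInt] =
    parameters.get(key).map(BigInt(_)).filter(_ > 0)

  val sizeInBytes = positive("totalSize").orElse(positive("rawDataSize"))
  val rowCount    = positive("numRows")
  (sizeInBytes, rowCount)
}
```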

Retrieving Table Partition Metadata (Converting Table Partition Metadata from Hive Format to Spark SQL Format) — fromHivePartition Method

fromHivePartition simply creates a CatalogTablePartition with the following:

Note
fromHivePartition is used when HiveClientImpl is requested for getPartitionOption, getPartitions and getPartitionsByFilter.

Converting Native Table Metadata to Hive’s Table — toHiveTable Method

toHiveTable simply creates a new Hive Table and copies the properties from the input CatalogTable.

Note

toHiveTable is used when:

HiveClient — Contract for Retrieving Metadata from Hive Metastore

HiveClient is the contract for…​FIXME

Note
HiveClientImpl is the only available HiveClient in Spark SQL.

HiveClient offers safe variants of many methods that do not report exceptions when a relational entity is not found in a Hive metastore, e.g. getTableOption for getTable.
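
The pattern can be sketched as follows (a simplified rendering, not the actual source):

```scala
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTable

trait SafeLookups {
  // The "safe" variant returns None instead of throwing
  def getTableOption(dbName: String, tableName: String): Option[CatalogTable]

  // The throwing variant can be expressed in terms of the safe one
  def getTable(dbName: String, tableName: String): CatalogTable =
    getTableOption(dbName, tableName)
      .getOrElse(throw new NoSuchTableException(dbName, tableName))
}
```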

Note
HiveClient is a private[hive] contract.
Table 1. (Subset of) HiveClient Contract
Method Description

alterPartitions

getPartitions

Returns the CatalogTablePartition of a table

Used exclusively when HiveExternalCatalog is requested to list the partitions of a table.

getPartitionsByFilter

Used when…​FIXME

getPartitionOption

Used when…​FIXME

getTableOption

Retrieves a table metadata if available

Used exclusively when HiveClient is requested for a table metadata

Note
getTableOption is a safe version of getTable as it does not throw a NoSuchTableException, but simply returns None.

renamePartitions

Used when…​FIXME

Retrieving Table Metadata If Available or Throwing NoSuchTableException — getTable Method

getTable retrieves the metadata of a table in a Hive metastore if available or reports a NoSuchTableException.

Note

getTable is used when:

HiveFileFormat — FileFormat For Writing Hive Tables

HiveFileFormat is a FileFormat for writing Hive tables.

HiveFileFormat is a DataSourceRegister and registers itself as hive data source.

Note
The Hive data source can only be used with tables; you cannot read or write files of the Hive data source directly. Use DataFrameReader.table or DataFrameWriter.saveAsTable for loading from or writing data to the Hive data source, respectively.
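
For example (the database and table names are hypothetical):

```scala
// Load a Hive table through the table API (not through a file path)
val people = spark.read.table("db1.people")   // or simply spark.table("db1.people")

// Write a DataFrame back as a Hive table
people.write.format("hive").saveAsTable("db1.people_copy")
```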

HiveFileFormat is created exclusively when SaveAsHiveFile is requested to saveAsHiveFile (when InsertIntoHiveDirCommand and InsertIntoHiveTable logical commands are executed).

HiveFileFormat takes a FileSinkDesc when created.

HiveFileFormat throws an UnsupportedOperationException when requested to inferSchema.

Preparing Write Job — prepareWrite Method

Note
prepareWrite is part of the FileFormat Contract to prepare a write job.

prepareWrite sets the mapred.output.format.class property to be the getOutputFileFormatClassName of the Hive TableDesc of the FileSinkDesc.

prepareWrite requests the HiveTableUtil helper object to configureJobPropertiesForStorageHandler.

prepareWrite requests the Hive Utilities helper object to copyTableJobPropertiesToConf.

In the end, prepareWrite creates a new OutputWriterFactory that creates a new HiveOutputWriter when requested for a new OutputWriter instance.

Spark SQL CLI — spark-sql

Caution
FIXME
Tip
Read about Spark SQL CLI in Spark’s official documentation in Running the Spark SQL CLI.

Tip
Functions are registered in FunctionRegistry.

Hive Metastore

Spark SQL uses a Hive metastore to manage the metadata of persistent relational entities (e.g. databases, tables, columns, partitions) in a relational database (for fast access).

A Hive metastore warehouse (aka spark-warehouse) is the directory where Spark SQL persists tables whereas a Hive metastore (aka metastore_db) is a relational database to manage the metadata of the persistent relational entities, e.g. databases, tables, columns, partitions.

By default, Spark SQL uses the embedded deployment mode of a Hive metastore with an Apache Derby database.

Important

The default embedded deployment mode is not recommended for production use due to the limitation of only one active SparkSession at a time.

Read Cloudera’s Configuring the Hive Metastore for CDH document that explains the available deployment modes of a Hive metastore.

When SparkSession is created with Hive support, the external catalog (aka metastore) is HiveExternalCatalog. HiveExternalCatalog uses the spark.sql.warehouse.dir directory for the location of the databases and the javax.jdo.option properties for the connection to the Hive metastore database.

Note

The metadata of relational entities is persisted in a metastore database over JDBC and DataNucleus AccessPlatform that uses javax.jdo.option properties.

Read Hive Metastore Administration to learn how to manage a Hive Metastore.

Table 1. Hive Metastore Database Connection Properties
Name Description

javax.jdo.option.ConnectionURL

The JDBC connection URL of a Hive metastore database to use

javax.jdo.option.ConnectionDriverName

The JDBC driver of a Hive metastore database to use

javax.jdo.option.ConnectionUserName

The user name to use to connect to a Hive metastore database

javax.jdo.option.ConnectionPassword

The password to use to connect to a Hive metastore database

You can configure javax.jdo.option properties in hive-site.xml or using options with spark.hadoop prefix.

You can access the current connection properties for a Hive metastore in a Spark SQL application using the Spark internal classes.
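
A sketch of both (the connection values are hypothetical): set the properties with the spark.hadoop prefix when building the SparkSession, then inspect the effective Hadoop configuration afterwards.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.hadoop.javax.jdo.option.ConnectionURL", "jdbc:mysql://metastore-host:3306/metastore")
  .config("spark.hadoop.javax.jdo.option.ConnectionDriverName", "com.mysql.jdbc.Driver")
  .config("spark.hadoop.javax.jdo.option.ConnectionUserName", "hive")
  .config("spark.hadoop.javax.jdo.option.ConnectionPassword", "secret")
  .getOrCreate()

// One way to check what the Hadoop configuration ended up with
// (hive-site.xml and spark.hadoop.* properties both land here)
spark.sparkContext.hadoopConfiguration.get("javax.jdo.option.ConnectionURL")
```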

The benefits of using an external Hive metastore:

  1. Allow multiple Spark applications (sessions) to access it concurrently

  2. Allow a single Spark application to use table statistics without running “ANALYZE TABLE” on every execution

Note
As of Spark 2.2 (see SPARK-18112 Spark2.x does not support read data from Hive 2.x metastore) Spark SQL supports reading data from Hive 2.1.1 metastore.
Caution
FIXME Describe hive-site.xml vs config method vs --conf with spark.hadoop prefix.

Spark SQL uses the Hive-specific configuration properties that further fine-tune the Hive integration, e.g. spark.sql.hive.metastore.version or spark.sql.hive.metastore.jars.

spark.sql.warehouse.dir Configuration Property

spark.sql.warehouse.dir is a static configuration property that sets Hive’s hive.metastore.warehouse.dir property, i.e. the location of the Hive local/embedded metastore database (using Derby).
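
For instance (the location is hypothetical); being a static property, it has to be set before the SparkSession is created:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("warehouse-demo")
  .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
  .enableHiveSupport()
  .getOrCreate()

// The effective warehouse location
spark.conf.get("spark.sql.warehouse.dir")
```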

Tip

Refer to SharedState to learn about (the low-level details of) Spark SQL support for Apache Hive.

See also the official Hive Metastore Administration document.

Hive Metastore Deployment Modes

Configuring External Hive Metastore in Spark SQL

In order to use an external Hive metastore you should do the following:

  1. Enable Hive support in SparkSession (that makes sure that the Hive classes are on CLASSPATH and sets spark.sql.catalogImplementation internal configuration property to hive)

  2. spark.sql.warehouse.dir required?

  3. Define hive.metastore.warehouse.dir in hive-site.xml configuration resource

  4. Check out warehousePath

  5. Execute ./bin/run-example sql.hive.SparkHiveExample to verify Hive configuration

When not configured by hive-site.xml, SparkSession automatically creates metastore_db in the current directory and creates a directory configured by spark.sql.warehouse.dir, which defaults to the spark-warehouse directory in the current directory of the Spark application.

Note

The hive.metastore.warehouse.dir property in hive-site.xml has been deprecated since Spark 2.0.0. Use spark.sql.warehouse.dir to specify the default location of the databases in a Hive warehouse.

You may need to grant write privilege to the user who starts the Spark application.

Hadoop Configuration Properties for Hive

Table 2. Hadoop Configuration Properties for Hive
Name Description

hive.metastore.uris

The Thrift URI of a remote Hive metastore, i.e. one that is in a separate JVM process or on a remote node

hive.metastore.warehouse.dir

SharedState uses hive.metastore.warehouse.dir to set spark.sql.warehouse.dir if the latter is undefined.

Caution
FIXME How is hive.metastore.warehouse.dir related to spark.sql.warehouse.dir? SharedState.warehousePath? Review https://github.com/apache/spark/pull/16996/files

hive.metastore.schema.verification

Set to false (as it seems to cause exceptions with an empty metastore database as of Hive 2.1)

You may also want to set the following Hive configuration properties to deal with the exceptions observed with an empty metastore database as of Hive 2.1.

  • datanucleus.schema.autoCreateAll set to true

spark.hadoop Configuration Properties

Caution
FIXME Describe the purpose of spark.hadoop.* properties

You can specify any of the Hadoop configuration properties, e.g. hive.metastore.warehouse.dir with spark.hadoop prefix.

hive-site.xml Configuration Resource

hive-site.xml configures Hive clients (e.g. Spark SQL) with the Hive Metastore configuration.

hive-site.xml is loaded when SharedState is created (which is…​FIXME).

Configuration of Hive is done by placing your hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) files in conf/ (which is automatically added to the CLASSPATH of a Spark application).

Tip
You can use --driver-class-path or spark.driver.extraClassPath to point to the directory with configuration resources, e.g. hive-site.xml.

Tip
Read Resources section in Hadoop’s Configuration javadoc to learn more about configuration resources.
Tip

Use SparkContext.hadoopConfiguration to know which configuration resources have already been registered.

Enable org.apache.spark.sql.internal.SharedState logger to INFO logging level to know where hive-site.xml comes from.

Starting Hive

The following steps are for Hive and Hadoop 2.7.5.

Tip
Read the section Pseudo-Distributed Operation about how to run Hadoop HDFS “on a single-node in a pseudo-distributed mode where each Hadoop daemon runs in a separate Java process.”
Tip

Use hadoop.tmp.dir configuration property as the base for temporary directories.

Use ./bin/hdfs getconf -confKey hadoop.tmp.dir to check out the value

  1. Edit etc/hadoop/core-site.xml to add the following:

  2. ./bin/hdfs namenode -format right after you’ve installed Hadoop and before starting any HDFS services (NameNode in particular)

    Note

    Use ./bin/hdfs namenode to start a NameNode that will tell you that the local filesystem is not ready.

  3. Start Hadoop HDFS using ./sbin/start-dfs.sh (and tail -f logs/hadoop-*-datanode-*.log)

  4. Use jps -lm to list Hadoop’s JVM processes.

  5. Create hive-site.xml in $SPARK_HOME/conf with the following:

JdbcUtils Helper Object

JdbcUtils is a Scala object with methods to support JDBCRDD, JDBCRelation and JdbcRelationProvider.

Table 1. JdbcUtils API
Name Description

createConnectionFactory

Used when:

createTable

dropTable

getCommonJDBCType

getCustomSchema

Replaces data types in a table schema

Used exclusively when JDBCRelation is created (and customSchema JDBC option was defined)

getInsertStatement

getSchema

Used when JDBCRDD is requested to resolveTable

getSchemaOption

Used when JdbcRelationProvider is requested to write the rows of a structured query (a DataFrame) to a table

resultSetToRows

Used when…​FIXME

resultSetToSparkInternalRows

Used when JDBCRDD is requested to compute a partition

schemaString

saveTable

tableExists

Used when JdbcRelationProvider is requested to write the rows of a structured query (a DataFrame) to a table

truncateTable

Used when…​FIXME

createConnectionFactory Method

createConnectionFactory…​FIXME

Note

createConnectionFactory is used when:

getCommonJDBCType Method

getCommonJDBCType…​FIXME

Note
getCommonJDBCType is used when…​FIXME

getCatalystType Internal Method

getCatalystType…​FIXME

Note
getCatalystType is used when…​FIXME

getSchemaOption Method

getSchemaOption…​FIXME

Note
getSchemaOption is used when…​FIXME

getSchema Method

getSchema…​FIXME

Note
getSchema is used when…​FIXME

resultSetToRows Method

resultSetToRows…​FIXME

Note
resultSetToRows is used when…​FIXME

resultSetToSparkInternalRows Method

resultSetToSparkInternalRows…​FIXME

Note
resultSetToSparkInternalRows is used when…​FIXME

schemaString Method

schemaString…​FIXME

Note
schemaString is used exclusively when JdbcUtils is requested to create a table.

parseUserSpecifiedCreateTableColumnTypes Internal Method

parseUserSpecifiedCreateTableColumnTypes…​FIXME

Note
parseUserSpecifiedCreateTableColumnTypes is used exclusively when JdbcUtils is requested to schemaString.

saveTable Method

saveTable takes the url, table, batchSize, isolationLevel options and createConnectionFactory.

saveTable builds the insert statement for the table using getInsertStatement.

saveTable takes the numPartitions option and applies the coalesce operator to the input DataFrame if the numPartitions option is less than the number of partitions of its RDD.

In the end, saveTable requests the possibly-repartitioned DataFrame for its RDD (it may have changed after the coalesce operator) and executes savePartition for every partition (using RDD.foreachPartition).
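
A minimal sketch of the repartitioning decision described above (df and numPartitions stand for the input DataFrame and the parsed numPartitions option; both are assumptions here):

```scala
import org.apache.spark.sql.DataFrame

def repartitionForWrite(df: DataFrame, numPartitions: Option[Int]): DataFrame =
  numPartitions match {
    // coalesce only when the requested number of partitions is lower
    case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
    case _ => df
  }
// Each partition of the resulting DataFrame's RDD is then written with savePartition.
```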

Note
saveTable is used exclusively when JdbcRelationProvider is requested to write the rows of a structured query (a DataFrame) to a table.

Replacing Data Types In Table Schema — getCustomSchema Method

getCustomSchema replaces the data type of the fields in the input tableSchema schema that are included in the input customSchema (if defined).

Internally, getCustomSchema branches off per the input customSchema.

If the input customSchema is undefined or empty, getCustomSchema simply returns the input tableSchema unchanged.

Otherwise, if the input customSchema is not empty, getCustomSchema requests CatalystSqlParser to parse it (i.e. create a new StructType for the given customSchema canonical schema representation).

getCustomSchema then uses SchemaUtils to checkColumnNameDuplication (in the column names of the user-defined customSchema schema with the input nameEquality).

In the end, getCustomSchema replaces the data type of the fields in the input tableSchema that are included in the input userSchema.
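
For example (the JDBC URL and table are hypothetical), the customSchema option that reaches getCustomSchema is given as a DDL-formatted string:

```scala
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "public.people")
  .option("customSchema", "id DECIMAL(38, 0), name STRING")
  .load()
```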

Note
getCustomSchema is used exclusively when JDBCRelation is created (and customSchema JDBC option was defined).

dropTable Method

dropTable…​FIXME

Note
dropTable is used when…​FIXME

Creating Table Using JDBC — createTable Method

createTable builds the table schema (given the input DataFrame with the url and createTableColumnTypes options).

createTable uses the table and createTableOptions options.

In the end, createTable concatenates all the above texts into a CREATE TABLE [table] ([strSchema]) [createTableOptions] SQL DDL statement and executes it (using the input JDBC Connection).

Note
createTable is used exclusively when JdbcRelationProvider is requested to write the rows of a structured query (a DataFrame) to a table.

getInsertStatement Method

getInsertStatement…​FIXME

Note
getInsertStatement is used when…​FIXME

getJdbcType Internal Method

getJdbcType…​FIXME

Note
getJdbcType is used when…​FIXME

tableExists Method

tableExists…​FIXME

Note
tableExists is used exclusively when JdbcRelationProvider is requested to write the rows of a structured query (a DataFrame) to a table.

truncateTable Method

truncateTable…​FIXME

Note
truncateTable is used exclusively when JdbcRelationProvider is requested to write the rows of a structured query (a DataFrame) to a table.

Saving Rows (Per Partition) to Table — savePartition Method

savePartition creates a JDBC Connection using the input getConnection function.

savePartition tries to set the input isolationLevel if it is different than TRANSACTION_NONE and the database supports transactions.

savePartition then writes rows (in the input Iterator[Row]) using batches that are submitted after batchSize rows were added.

Note
savePartition is used exclusively when JdbcUtils is requested to saveTable.
