SparkSession — The Entry Point to Spark SQL
SparkSession is the entry point to Spark SQL. It is one of the very first objects you create while developing a Spark SQL application.
As a Spark developer, you create a SparkSession using the SparkSession.builder method (which gives you access to the Builder API that you use to configure the session).
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("My Spark Application")  // optional and will be autogenerated if not specified
  .master("local[*]")               // only for demo and testing purposes, use spark-submit instead
  .enableHiveSupport()              // self-explanatory, isn't it?
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .withExtensions { extensions =>
    extensions.injectResolutionRule { session => ... }
    extensions.injectOptimizerRule { session => ... }
  }
  .getOrCreate
```
Once created, SparkSession allows for creating a DataFrame (based on an RDD or a Scala Seq), creating a Dataset, accessing the Spark SQL services (e.g. ExperimentalMethods, ExecutionListenerManager, UDFRegistration), executing a SQL query, loading a table, and, last but not least, accessing the DataFrameReader interface to load a dataset in the format of your choice (to some extent).
You can enable Apache Hive support, which includes support for an external Hive metastore.
Note: In order to disable the pre-configured Hive support in the spark object, set the spark.sql.catalogImplementation internal configuration property to in-memory.
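A minimal sketch of building a session backed by the in-memory (non-Hive) catalog; the application name and master below are illustrative only:

```scala
import org.apache.spark.sql.SparkSession

// A session that uses the in-memory (non-Hive) catalog
val inMemorySpark = SparkSession.builder
  .appName("No Hive Support")   // illustrative
  .master("local[*]")           // illustrative
  .config("spark.sql.catalogImplementation", "in-memory")
  .getOrCreate()
```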
You can have as many SparkSessions as you want in a single Spark application. The common use case is to keep relational entities separate logically in catalogs per SparkSession.
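As an illustration (a sketch, assuming a running SparkSession named spark), a session created with newSession shares the underlying SparkContext but keeps its own catalog of temporary views:

```scala
// Sessions share the same SparkContext but keep separate temporary views
val otherSession = spark.newSession()
assert(otherSession.sparkContext == spark.sparkContext)

spark.range(1).createOrReplaceTempView("only_here")

// The temporary view is visible in spark but not in otherSession
assert(spark.catalog.tableExists("only_here"))
assert(!otherSession.catalog.tableExists("only_here"))
```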
In the end, you stop a SparkSession using SparkSession.stop method.
```scala
spark.stop
```
| Method | Description |
|---|---|
| active | (New in 2.4.0) |
| builder | Object method to create a Builder to get the current SparkSession or create a new one |
| catalog | Access to the current metadata catalog of relational entities, e.g. database(s), tables, functions, table columns, and temporary views |
| clearActiveSession | Object method |
| clearDefaultSession | Object method |
| conf | Access to the current runtime configuration |
| createDataFrame | Creates a DataFrame from a local collection or an RDD |
| createDataset | Creates a Dataset from a local collection or an RDD |
| emptyDataFrame | Creates an empty DataFrame |
| emptyDataset | Creates an empty Dataset (given an Encoder) |
| experimental | Access to the current ExperimentalMethods |
| getActiveSession | Object method |
| getDefaultSession | Object method |
| listenerManager | Access to the current ExecutionListenerManager |
| newSession | Creates a new SparkSession |
| range | Creates a Dataset with a single Long column |
| read | Access to the current DataFrameReader to load data from external data sources |
| sessionState | Access to the current SessionState |
| setActiveSession | Object method |
| setDefaultSession | Object method |
| sharedState | Access to the current SharedState |
| sparkContext | Access to the underlying SparkContext |
| sql | "Executes" a SQL query |
| sqlContext | Access to the underlying SQLContext |
| stop | Stops the associated SparkContext |
| table | Loads data from a table |
| time | Executes a code block and prints out (to standard output) the time taken to execute it |
| udf | Access to the current UDFRegistration |
| version | Returns the version of Apache Spark |
Note: baseRelationToDataFrame acts as a mechanism to plug the BaseRelation object hierarchy into the LogicalPlan object hierarchy that SparkSession uses to bridge them.
Creating SparkSession Using Builder Pattern — builder Object Method
```scala
builder(): Builder
```
builder creates a new Builder that you use to build a fully-configured SparkSession using a fluent API.
```scala
import org.apache.spark.sql.SparkSession

val builder = SparkSession.builder
```
Tip: Read about the Fluent interface design pattern in Wikipedia, the free encyclopedia.
Accessing Version of Spark — version Method
```scala
version: String
```
version returns the version of Apache Spark in use.
Internally, version uses the spark.SPARK_VERSION value, which is the version property in the spark-version-info.properties file on the CLASSPATH.
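A quick illustration (assuming a running SparkSession named spark):

```scala
// Prints the version of the Spark build in use, e.g. 2.4.x
println(spark.version)
```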
Creating Empty Dataset (Given Encoder) — emptyDataset Operator
```scala
emptyDataset[T: Encoder]: Dataset[T]
```
emptyDataset creates an empty Dataset (assuming that future records will be of type T).
```scala
scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]

scala> strings.printSchema
root
 |-- value: string (nullable = true)
```
emptyDataset creates a LocalRelation logical query plan.
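You can confirm this by peeking at the Dataset's logical plan (a sketch, assuming a running SparkSession named spark):

```scala
// The logical plan behind an empty Dataset is a LocalRelation
// (the exact attribute id in the output will vary)
val plan = spark.emptyDataset[String].queryExecution.logical
println(plan) // e.g. LocalRelation <empty>, [value#...]
```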
Creating Dataset from Local Collections or RDDs — createDataset methods
```scala
createDataset[T : Encoder](data: RDD[T]): Dataset[T]
createDataset[T : Encoder](data: Seq[T]): Dataset[T]
```
createDataset is an experimental API to create a Dataset from a local Scala collection, i.e. Seq[T], Java’s List[T], or a distributed RDD[T].
```scala
scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> one.show
+-----+
|value|
+-----+
|    1|
+-----+
```
createDataset creates a LocalRelation (for the input data collection) or LogicalRDD (for the input RDD[T]) logical operators.
Tip: You may want to consider using the implicits object and the toDS method instead.
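For example (a sketch, assuming a running SparkSession named spark):

```scala
// With the session implicits in scope, a local collection converts to a Dataset directly
import spark.implicits._

val numbers = Seq(1, 2, 3).toDS
numbers.show()
```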
Internally, createDataset first looks up the implicit expression encoder in scope to access the AttributeReferences (of the schema).
Note: Only unresolved expression encoders are currently supported.
The expression encoder is then used to map elements (of the input Seq[T]) into a collection of InternalRows. With the references and rows, createDataset returns a Dataset with a LocalRelation logical query plan.
Creating Dataset With Single Long Column — range Operator
```scala
range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
```
The range family of methods creates a Dataset of Long numbers.
```scala
scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
|  0|
|  2|
+---+
```
Note: The first three variants (that do not specify numPartitions explicitly) use SparkContext.defaultParallelism for the number of partitions (numPartitions).
Internally, range creates a new Dataset[Long] with Range logical plan and Encoders.LONG encoder.
Creating Empty DataFrame — emptyDataFrame method
```scala
emptyDataFrame: DataFrame
```
emptyDataFrame creates an empty DataFrame (with no rows and columns).
It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).
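A quick illustration (assuming a running SparkSession named spark):

```scala
// An empty DataFrame has no columns and no rows
val empty = spark.emptyDataFrame
println(empty.schema)   // empty schema (no fields)
println(empty.count())  // 0
```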
Creating DataFrames from Local Collections or RDDs — createDataFrame Method
```scala
createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame

// private[sql]
createDataFrame(rowRDD: RDD[Row], schema: StructType, needsConversion: Boolean): DataFrame
```
createDataFrame creates a DataFrame using RDD[Row] and the input schema. It is assumed that the rows in rowRDD all match the schema.
Caution: FIXME
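Until then, the following sketch shows the RDD[Row] plus schema variant in action (the column names and values are illustrative only):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// A schema describing the rows below (illustrative column names)
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true)))

// Rows are expected to match the schema
val rows = spark.sparkContext.parallelize(Seq(Row(0L, "zero"), Row(1L, "one")))

val df = spark.createDataFrame(rows, schema)
df.show()
```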
Executing SQL Queries (aka SQL Mode) — sql Method
```scala
sql(sqlText: String): DataFrame
```
sql executes the sqlText SQL statement and creates a DataFrame.
```scala
scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]

scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []

// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")

scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata|      false|
+---------+-----------+
```
Internally, sql requests the current ParserInterface to execute a SQL query that gives a LogicalPlan.
Note: sql uses SessionState to access the current ParserInterface.
sql then creates a DataFrame using the current SparkSession (itself) and the LogicalPlan.
Tip: spark-sql is the main SQL environment in Spark to work with pure SQL statements (where you do not have to use Scala to execute them).
Accessing UDFRegistration — udf Attribute
```scala
udf: UDFRegistration
```
udf attribute gives access to UDFRegistration that allows registering user-defined functions for SQL-based queries.
```scala
val spark: SparkSession = ...
spark.udf.register("myUpper", (s: String) => s.toUpperCase)

val strs = ('a' to 'c').map(_.toString).toDS
strs.registerTempTable("strs")

scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
|    a|    A|
|    b|    B|
|    c|    C|
+-----+-----+
```
Internally, it is simply an alias for SessionState.udfRegistration.
Loading Data From Table — table Method
```scala
table(tableName: String): DataFrame  // (1)

// private[sql]
table(tableIdent: TableIdentifier): DataFrame
```

1. Parses tableName to a TableIdentifier and calls the other table
table creates a DataFrame (wrapper) from the input tableName table (but only if available in the session catalog).
```scala
scala> spark.catalog.tableExists("t1")
res1: Boolean = true

// t1 exists in the catalog
// let's load it
val t1 = spark.table("t1")
```
Accessing Metastore — catalog Attribute
```scala
catalog: Catalog
```
catalog attribute is a (lazy) interface to the current metastore, i.e. data catalog (of relational entities like databases, tables, functions, table columns, and views).
Tip: All methods in Catalog return Datasets.
```scala
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
|              name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default|       null|  MANAGED|      false|
|              strs|    null|       null|TEMPORARY|       true|
+------------------+--------+-----------+---------+-----------+
```
Internally, catalog creates a CatalogImpl (that uses the current SparkSession).
Accessing DataFrameReader — read method
```scala
read: DataFrameReader
```
read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.
```scala
val spark: SparkSession = // create instance
val dfReader: DataFrameReader = spark.read
```
Getting Runtime Configuration — conf Attribute
```scala
conf: RuntimeConfig
```
conf returns the current RuntimeConfig.
Internally, conf creates a RuntimeConfig (when requested the very first time and cached afterwards) with the SQLConf of the SessionState.
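A quick sketch of reading and setting runtime-configurable properties through conf:

```scala
// Runtime-configurable Spark SQL properties can be read and set at any time
spark.conf.set("spark.sql.shuffle.partitions", 8L)
println(spark.conf.get("spark.sql.shuffle.partitions")) // 8
```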
streams Attribute
```scala
streams: StreamingQueryManager
```
streams attribute gives access to StreamingQueryManager (through SessionState).
```scala
val spark: SparkSession = ...
spark.streams.active.foreach(println)
```
experimentalMethods Attribute
```scala
experimental: ExperimentalMethods
```
experimentalMethods is an extension point with ExperimentalMethods that is a per-session collection of extra strategies and Rule[LogicalPlan]s.
Note: experimental is used in SparkPlanner and SparkOptimizer. Hive and Structured Streaming use it for their own extra strategies and optimization rules.
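A minimal sketch of registering an extra (no-op) optimization rule for a session:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A no-op rule, for illustration only
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Registered rules take part in the logical optimization of this session
spark.experimental.extraOptimizations = Seq(NoopRule)
```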
Creating SparkSession Instance — newSession method
```scala
newSession(): SparkSession
```
newSession creates (starts) a new SparkSession (with the current SparkContext and SharedState).
```scala
scala> val newSession = spark.newSession
newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a
```
Stopping SparkSession — stop Method
```scala
stop(): Unit
```
stop stops the SparkSession, i.e. stops the underlying SparkContext.
Create DataFrame from BaseRelation — baseRelationToDataFrame Method
```scala
baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame
```
Internally, baseRelationToDataFrame creates a DataFrame from the input BaseRelation wrapped inside LogicalRelation.
Note: LogicalRelation is a logical plan adapter for BaseRelation (so BaseRelation can be part of a logical plan).
Creating SessionState Instance — instantiateSessionState Internal Method
```scala
instantiateSessionState(className: String, sparkSession: SparkSession): SessionState
```
instantiateSessionState finds the className that is then used to create and build a BaseSessionStateBuilder.
instantiateSessionState may report an IllegalArgumentException while instantiating the class of a SessionState:
```
Error while instantiating '[className]'
```
Note: instantiateSessionState is used exclusively when SparkSession is requested for SessionState per the spark.sql.catalogImplementation configuration property (and one is not available yet).
sessionStateClassName Internal Method
```scala
sessionStateClassName(conf: SparkConf): String
```
sessionStateClassName gives the name of the class of the SessionState per spark.sql.catalogImplementation, i.e. org.apache.spark.sql.hive.HiveSessionStateBuilder for hive and org.apache.spark.sql.internal.SessionStateBuilder for in-memory.
Note: sessionStateClassName is used exclusively when SparkSession is requested for the SessionState (and one is not available yet).
Creating DataFrame From RDD Of Internal Binary Rows and Schema — internalCreateDataFrame Internal Method
```scala
internalCreateDataFrame(
  catalystRows: RDD[InternalRow],
  schema: StructType,
  isStreaming: Boolean = false): DataFrame
```
internalCreateDataFrame creates a DataFrame with a LogicalRDD.
Creating SparkSession Instance
SparkSession takes the following when created:
- Optional SharedState
- Optional SessionState
Accessing ExperimentalMethods — experimental Method
```scala
experimental: ExperimentalMethods
```
experimental…FIXME
getDefaultSession Object Method
```scala
getDefaultSession: Option[SparkSession]
```
getDefaultSession…FIXME
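A quick sketch of what calling the object method looks like:

```scala
import org.apache.spark.sql.SparkSession

// The default session is set when a SparkSession gets created via the builder
val defaultSession: Option[SparkSession] = SparkSession.getDefaultSession
```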
Accessing ExecutionListenerManager — listenerManager Method
```scala
listenerManager: ExecutionListenerManager
```
listenerManager…FIXME
Accessing SessionState — sessionState Lazy Attribute
```scala
sessionState: SessionState
```
sessionState…FIXME
setActiveSession Object Method
```scala
setActiveSession(session: SparkSession): Unit
```
setActiveSession…FIXME