SparkSession — The Entry Point to Spark SQL
SparkSession is the entry point to Spark SQL. It is one of the very first objects you create while developing a Spark SQL application.
As a Spark developer, you create a SparkSession using the SparkSession.builder method (which gives you access to the Builder API that you use to configure the session).
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("My Spark Application")  // optional and will be autogenerated if not specified
  .master("local[*]")               // only for demo and testing purposes, use spark-submit instead
  .enableHiveSupport()              // self-explanatory, isn't it?
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .withExtensions { extensions =>
    extensions.injectResolutionRule { session => ... }
    extensions.injectOptimizerRule { session => ... }
  }
  .getOrCreate
```
Once created, SparkSession allows for creating a DataFrame (based on an RDD or a Scala Seq), creating a Dataset, accessing the Spark SQL services (e.g. ExperimentalMethods, ExecutionListenerManager, UDFRegistration), executing a SQL query, loading a table and, last but not least, accessing the DataFrameReader interface to load a dataset of the format of your choice (to some extent).
You can enable Apache Hive support, which includes support for an external Hive metastore.
Note
|
In order to disable the pre-configured Hive support in the spark object, set the spark.sql.catalogImplementation internal configuration property to in-memory (which uses an in-memory catalog instead).
|
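As a minimal sketch (assuming nothing beyond a standard Spark 2.x setup), you could request the in-memory catalog explicitly while building the session:

```scala
import org.apache.spark.sql.SparkSession

// Ask for the in-memory (non-Hive) catalog while building the session.
// spark.sql.catalogImplementation is the property that enableHiveSupport sets to "hive".
val spark = SparkSession.builder
  .master("local[*]")  // demo/testing only
  .config("spark.sql.catalogImplementation", "in-memory")
  .getOrCreate()
```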
You can have as many SparkSessions
as you want in a single Spark application. The common use case is to keep relational entities separate logically in catalogs per SparkSession
.
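A short sketch (assuming a running SparkSession named spark) of how session-scoped entities, e.g. temporary views, stay separate across sessions:

```scala
// Temporary views are registered per session, so a sibling session does not see them.
val other = spark.newSession()
spark.range(3).createOrReplaceTempView("numbers")

assert(spark.catalog.tableExists("numbers"))
assert(!other.catalog.tableExists("numbers"))
```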
In the end, you stop a SparkSession
using SparkSession.stop method.
```scala
spark.stop
```
Method | Description
---|---
active | (New in 2.4.0)
builder | Object method to create a Builder to get the current SparkSession or create a new one
catalog | Access to the current metadata catalog of relational entities, e.g. database(s), tables, functions, table columns, and temporary views
clearActiveSession | Object method
clearDefaultSession | Object method
conf | Access to the current runtime configuration
createDataFrame | Creates a DataFrame from local collections or RDDs
createDataset | Creates a Dataset from local collections or RDDs
emptyDataFrame | Creates an empty DataFrame
emptyDataset | Creates an empty Dataset (given an Encoder)
experimental | Access to the current ExperimentalMethods
getActiveSession | Object method
getDefaultSession | Object method
implicits | Implicit Scala conversions and encoders
listenerManager | Access to the current ExecutionListenerManager
newSession | Creates a new SparkSession
range | Creates a Dataset with a single Long column
read | Access to the current DataFrameReader to load data from external data sources
sessionState | Access to the current SessionState
setActiveSession | Object method
setDefaultSession | Object method
sharedState | Access to the current SharedState
sparkContext | Access to the underlying SparkContext
sql | “Executes” a SQL query
sqlContext | Access to the underlying SQLContext
stop | Stops the associated SparkContext
table | Loads data from a table
time | Executes a code block and prints out (to standard output) the time taken to execute it
udf | Access to the current UDFRegistration
version | Returns the version of Apache Spark
Note
|
baseRelationToDataFrame acts as a mechanism to plug the BaseRelation object hierarchy into the LogicalPlan object hierarchy; SparkSession uses it to bridge the two.
|
Creating SparkSession Using Builder Pattern — builder
Object Method
```scala
builder(): Builder
```
builder
creates a new Builder that you use to build a fully-configured SparkSession
using a fluent API.
```scala
import org.apache.spark.sql.SparkSession

val builder = SparkSession.builder
```
Tip
|
Read about Fluent interface design pattern in Wikipedia, the free encyclopedia. |
Accessing Version of Spark — version
Method
```scala
version: String
```
version
returns the version of Apache Spark in use.
Internally, version uses the spark.SPARK_VERSION value, which is the version property in the spark-version-info.properties file on the CLASSPATH.
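For quick reference, a one-line usage sketch (assuming a running SparkSession named spark):

```scala
println(s"Running Spark ${spark.version}")
```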
Creating Empty Dataset (Given Encoder) — emptyDataset
Operator
```scala
emptyDataset[T: Encoder]: Dataset[T]
```
emptyDataset creates an empty Dataset (assuming that future records will be of type T).
```scala
scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]

scala> strings.printSchema
root
 |-- value: string (nullable = true)
```
emptyDataset
creates a LocalRelation
logical query plan.
Creating Dataset from Local Collections or RDDs — createDataset
methods
```scala
createDataset[T : Encoder](data: RDD[T]): Dataset[T]
createDataset[T : Encoder](data: Seq[T]): Dataset[T]
```
createDataset
is an experimental API to create a Dataset from a local Scala collection, i.e. Seq[T]
, Java’s List[T]
, or a distributed RDD[T]
.
```scala
scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> one.show
+-----+
|value|
+-----+
|    1|
+-----+
```
createDataset creates a LocalRelation (for the input data collection) or a LogicalRDD (for the input RDD[T]) logical operator.
Tip
|
You may want to consider using the implicits object and the toDS method instead.
|
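A short sketch (assuming a running SparkSession named spark) that contrasts the explicit createDataset call with the more idiomatic toDS:

```scala
import spark.implicits._

// Explicitly through SparkSession...
val explicitly = spark.createDataset(Seq(1, 2, 3))

// ...or idiomatically through the implicits object
val idiomatically = Seq(1, 2, 3).toDS
```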
Internally, createDataset first looks up the implicit expression encoder in scope to access the AttributeReferences (of the schema).
Note
|
Only unresolved expression encoders are currently supported. |
The expression encoder is then used to map elements (of the input Seq[T]
) into a collection of InternalRows. With the references and rows, createDataset
returns a Dataset with a LocalRelation
logical query plan.
Creating Dataset With Single Long Column — range
Operator
```scala
range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
```
The range family of methods creates a Dataset of Long numbers.
```scala
scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
|  0|
|  2|
+---+
```
Note
|
The first three variants (that do not specify numPartitions explicitly) use SparkContext.defaultParallelism for the number of partitions (numPartitions).
|
Internally, range
creates a new Dataset[Long]
with Range
logical plan and Encoders.LONG
encoder.
Creating Empty DataFrame — emptyDataFrame
method
```scala
emptyDataFrame: DataFrame
```
emptyDataFrame
creates an empty DataFrame
(with no rows and columns).
It calls createDataFrame with an empty RDD[Row]
and an empty schema StructType(Nil).
Creating DataFrames from Local Collections or RDDs — createDataFrame
Method
```scala
createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame

// private[sql]
createDataFrame(rowRDD: RDD[Row], schema: StructType, needsConversion: Boolean): DataFrame
```
createDataFrame
creates a DataFrame
using RDD[Row]
and the input schema
. It is assumed that the rows in rowRDD
all match the schema
.
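A minimal sketch (assuming a running SparkSession named spark) of the RDD[Row]-plus-schema variant:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// The rows have to match the schema below.
val rowRDD = spark.sparkContext.parallelize(Seq(Row(1, "one"), Row(2, "two")))
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true)))

val df = spark.createDataFrame(rowRDD, schema)
```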
Caution
|
FIXME |
Executing SQL Queries (aka SQL Mode) — sql
Method
```scala
sql(sqlText: String): DataFrame
```
sql
executes the sqlText
SQL statement and creates a DataFrame.
Note
|
sql is imported in spark-shell so you can execute SQL statements as if sql were a part of the environment (as in the examples below).
|
```scala
scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]

scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []

// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")

scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata|      false|
+---------+-----------+
```
Internally, sql
requests the current ParserInterface
to execute a SQL query that gives a LogicalPlan.
Note
|
sql uses SessionState to access the current ParserInterface.
|
sql
then creates a DataFrame using the current SparkSession
(itself) and the LogicalPlan.
Tip
|
spark-sql is the main SQL environment in Spark to work with pure SQL statements (where you do not have to use Scala to execute them).
|
Accessing UDFRegistration — udf
Attribute
```scala
udf: UDFRegistration
```
udf
attribute gives access to UDFRegistration that allows registering user-defined functions for SQL-based queries.
```scala
val spark: SparkSession = ...
spark.udf.register("myUpper", (s: String) => s.toUpperCase)

val strs = ('a' to 'c').map(_.toString).toDS
strs.registerTempTable("strs")

scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
|    a|    A|
|    b|    B|
|    c|    C|
+-----+-----+
```
Internally, it is simply an alias for SessionState.udfRegistration.
Loading Data From Table — table
Method
```scala
table(tableName: String): DataFrame  (1)

// private[sql]
table(tableIdent: TableIdentifier): DataFrame
```
1. Parses tableName to a TableIdentifier and calls the other table
table
creates a DataFrame (wrapper) from the input tableName
table (but only if available in the session catalog).
```scala
scala> spark.catalog.tableExists("t1")
res1: Boolean = true

// t1 exists in the catalog
// let's load it
val t1 = spark.table("t1")
```
Accessing Metastore — catalog
Attribute
```scala
catalog: Catalog
```
catalog
attribute is a (lazy) interface to the current metastore, i.e. data catalog (of relational entities like databases, tables, functions, table columns, and views).
Tip
|
All methods in Catalog return Datasets.
|
```scala
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
|              name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default|       null|  MANAGED|      false|
|              strs|    null|       null|TEMPORARY|       true|
+------------------+--------+-----------+---------+-----------+
```
Internally, catalog
creates a CatalogImpl (that uses the current SparkSession
).
Accessing DataFrameReader — read
method
```scala
read: DataFrameReader
```
read
method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame
.
```scala
val spark: SparkSession = // create instance
val dfReader: DataFrameReader = spark.read
```
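A hedged usage sketch follows; the format, option and path below are made up for illustration only:

```scala
// Load a (hypothetical) CSV file into a DataFrame.
val people = spark.read
  .format("csv")
  .option("header", "true")
  .load("/tmp/people.csv")
```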
Getting Runtime Configuration — conf
Attribute
```scala
conf: RuntimeConfig
```
conf
returns the current RuntimeConfig.
Internally, conf
creates a RuntimeConfig (when requested the very first time and cached afterwards) with the SQLConf of the SessionState.
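For illustration (assuming a running SparkSession named spark), RuntimeConfig lets you read and set runtime-configurable properties:

```scala
// Set and read back a runtime-configurable Spark SQL property.
spark.conf.set("spark.sql.shuffle.partitions", 8L)
val numShufflePartitions = spark.conf.get("spark.sql.shuffle.partitions")
```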
streams
Attribute
```scala
streams: StreamingQueryManager
```
streams
attribute gives access to StreamingQueryManager (through SessionState).
```scala
val spark: SparkSession = ...
spark.streams.active.foreach(println)
```
experimentalMethods
Attribute
```scala
experimental: ExperimentalMethods
```
experimentalMethods
is an extension point with ExperimentalMethods that is a per-session collection of extra strategies and Rule[LogicalPlan]
s.
Note
|
experimental is used in SparkPlanner and SparkOptimizer. Hive and Structured Streaming use it for their own extra strategies and optimization rules.
|
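A hedged sketch (MyStrategy is hypothetical and does nothing) of registering an extra planning strategy through this extension point:

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A no-op strategy: returning Nil means "I do not plan this operator".
object MyStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

spark.experimental.extraStrategies = MyStrategy +: spark.experimental.extraStrategies
```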
Creating SparkSession Instance — newSession
method
```scala
newSession(): SparkSession
```
newSession
creates (starts) a new SparkSession
(with the current SparkContext and SharedState).
```scala
scala> val newSession = spark.newSession
newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a
```
Stopping SparkSession — stop
Method
```scala
stop(): Unit
```
stop
stops the SparkSession
, i.e. stops the underlying SparkContext
.
Create DataFrame from BaseRelation — baseRelationToDataFrame
Method
```scala
baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame
```
Internally, baseRelationToDataFrame
creates a DataFrame from the input BaseRelation wrapped inside LogicalRelation.
Note
|
LogicalRelation is a logical plan adapter for BaseRelation (so BaseRelation can be part of a logical plan).
|
Creating SessionState Instance — instantiateSessionState
Internal Method
```scala
instantiateSessionState(className: String, sparkSession: SparkSession): SessionState
```
instantiateSessionState
finds the className
that is then used to create and build a BaseSessionStateBuilder
.
instantiateSessionState
may report an IllegalArgumentException
while instantiating the class of a SessionState
:
```
Error while instantiating '[className]'
```
Note
|
instantiateSessionState is used exclusively when SparkSession is requested for SessionState per spark.sql.catalogImplementation configuration property (and one is not available yet).
|
sessionStateClassName
Internal Method
```scala
sessionStateClassName(conf: SparkConf): String
```
sessionStateClassName gives the name of the class of the SessionState per spark.sql.catalogImplementation, i.e.

- org.apache.spark.sql.hive.HiveSessionStateBuilder for hive
- org.apache.spark.sql.internal.SessionStateBuilder for in-memory
Note
|
sessionStateClassName is used exclusively when SparkSession is requested for the SessionState (and one is not available yet).
|
Creating DataFrame From RDD Of Internal Binary Rows and Schema — internalCreateDataFrame
Internal Method
```scala
internalCreateDataFrame(
  catalystRows: RDD[InternalRow],
  schema: StructType,
  isStreaming: Boolean = false): DataFrame
```
internalCreateDataFrame
creates a DataFrame with a LogicalRDD.
Creating SparkSession Instance
SparkSession
takes the following when created:
- SparkContext
- Optional SharedState
- Optional SessionState
Accessing ExperimentalMethods — experimental
Method
```scala
experimental: ExperimentalMethods
```
experimental
…FIXME
getDefaultSession
Object Method
```scala
getDefaultSession: Option[SparkSession]
```
getDefaultSession
…FIXME
Accessing ExecutionListenerManager — listenerManager
Method
```scala
listenerManager: ExecutionListenerManager
```
listenerManager
…FIXME
Accessing SessionState — sessionState
Lazy Attribute
```scala
sessionState: SessionState
```
sessionState
…FIXME
setActiveSession
Object Method
```scala
setActiveSession(session: SparkSession): Unit
```
setActiveSession
…FIXME