SQLContext
Caution | As of Spark 2.0.0, SQLContext has been superseded by SparkSession and is kept for backward compatibility. |
In the pre-Spark 2.0 era, SQLContext was the entry point for Spark SQL. Whatever you did in Spark SQL had to start from creating an instance of SQLContext.
A SQLContext object requires a SparkContext, a CacheManager, and a SQLListener. They are all transient and do not participate in serializing a SQLContext.

You should use SQLContext for the following:
- Creating Datasets
- Creating Dataset[Long] (range method)
- Creating DataFrames
- Creating DataFrames for Table
- Accessing DataFrameReader
- Accessing StreamingQueryManager
- Registering User-Defined Functions (UDF)
- Caching DataFrames in In-Memory Cache
- Setting Configuration Properties
- Bringing Converter Objects into Scope
- Creating External Tables
- Dropping Temporary Tables
- Listing Existing Tables
- Managing Active SQLContext for JVM
- Executing SQL Queries
Creating SQLContext Instance
You can create a SQLContext using the following constructors:

SQLContext(sc: SparkContext)
SQLContext.getOrCreate(sc: SparkContext)
SQLContext.newSession()

SQLContext.newSession() allows for creating a new instance of SQLContext with a separate SQL configuration (through a shared SparkContext).
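For example, a minimal sketch of creating a SQLContext for an existing SparkContext (the application name and master below are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// a local SparkContext, for illustration only
val sc = new SparkContext(new SparkConf().setAppName("sqlcontext-demo").setMaster("local[*]"))

// create (or reuse) a SQLContext for this SparkContext
val sqlContext = SQLContext.getOrCreate(sc)

// a separate session with its own SQL configuration, sharing the same SparkContext
val session = sqlContext.newSession()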
Setting Configuration Properties
You can set Spark SQL configuration properties using:
setConf(props: Properties): Unit
setConf(key: String, value: String): Unit
You can get the current value of a configuration property by key using:
getConf(key: String): String
getConf(key: String, defaultValue: String): String
getAllConfs: immutable.Map[String, String]
Note | Properties that start with spark.sql are reserved for Spark SQL. |
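For example, a quick sketch of setting and reading configuration properties (spark.sql.shuffle.partitions is just one property picked for illustration):

// set a Spark SQL property
sqlContext.setConf("spark.sql.shuffle.partitions", "8")

// read it back, with and without a default value
sqlContext.getConf("spark.sql.shuffle.partitions")         // "8"
sqlContext.getConf("spark.sql.sources.default", "parquet") // the default if unset

// all Spark SQL properties as a Map
sqlContext.getAllConfs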
Creating DataFrames
emptyDataFrame
emptyDataFrame: DataFrame

emptyDataFrame creates an empty DataFrame. It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).
createDataFrame for RDD and Seq
createDataFrame[A <: Product](rdd: RDD[A]): DataFrame
createDataFrame[A <: Product](data: Seq[A]): DataFrame

The createDataFrame family of methods can create a DataFrame from an RDD of Scala's Product types, like case classes or tuples, or a Seq thereof.
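For illustration, a minimal sketch using a hypothetical Person case class:

case class Person(name: String, age: Int)

val people = Seq(Person("Alice", 30), Person("Bob", 42))

// from a Seq of case class instances
val df = sqlContext.createDataFrame(people)

// or from an RDD of the same
val peopleRDD = sc.parallelize(people)
val df2 = sqlContext.createDataFrame(peopleRDD)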
createDataFrame for RDD of Row with Explicit Schema
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

This variant of createDataFrame creates a DataFrame from an RDD of Row and an explicit schema.
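For example, a sketch with an explicitly defined schema (the column names are arbitrary):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val rows = sc.parallelize(Seq(Row("hello", 0), Row("world!", 1)))

val schema = StructType(Seq(
  StructField("text", StringType, nullable = false),
  StructField("id", IntegerType, nullable = false)))

val df = sqlContext.createDataFrame(rows, schema)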
Registering User-Defined Functions (UDF)
udf: UDFRegistration

The udf method gives you access to UDFRegistration to manipulate user-defined functions. Functions registered using udf are available for Hive queries only.
Tip | Read up on UDFs in the UDFs — User-Defined Functions document. |
// Create a DataFrame
val df = Seq("hello", "world!").zip(0 to 1).toDF("text", "id")

// Register the DataFrame as a temporary table in Hive
df.registerTempTable("texts")

scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|    texts|       true|
+---------+-----------+

scala> sql("SELECT * FROM texts").show
+------+---+
|  text| id|
+------+---+
| hello|  0|
|world!|  1|
+------+---+

// Just a Scala function
val my_upper: String => String = _.toUpperCase

// Register the function as UDF
spark.udf.register("my_upper", my_upper)

scala> sql("SELECT *, my_upper(text) AS MY_UPPER FROM texts").show
+------+---+--------+
|  text| id|MY_UPPER|
+------+---+--------+
| hello|  0|   HELLO|
|world!|  1|  WORLD!|
+------+---+--------+
Caching DataFrames in In-Memory Cache
isCached(tableName: String): Boolean

The isCached method asks CacheManager whether the tableName table is cached in memory or not. It simply requests CacheManager for the CachedData and, when it exists, assumes the table is cached.
cacheTable(tableName: String): Unit

You can cache a table in memory using cacheTable.
Caution | Why would I want to cache a table? |
uncacheTable(tableName: String)
clearCache(): Unit

uncacheTable and clearCache remove one or all in-memory cached tables.
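Putting these together — a sketch that assumes a temporary table called texts is already registered (as in the UDF example above):

sqlContext.cacheTable("texts")
sqlContext.isCached("texts")      // true

sqlContext.uncacheTable("texts")  // remove a single table from the cache
sqlContext.clearCache()           // or remove them all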
Implicits — SQLContext.implicits
The implicits object is a helper class with methods to convert objects into Datasets and DataFrames, and also comes with many Encoders for "primitive" types as well as the collections thereof.
Note | Import the implicits with import sqlContext.implicits._ (where sqlContext is an instance of SQLContext). |
It holds Encoders for Scala "primitive" types like Int, Double, String, and their collections.

It offers support for creating a Dataset from an RDD of any type (for which an encoder exists in scope), or case classes or tuples, and Seq.

It also offers conversions from Scala's Symbol or $ to Column.

It also offers conversions from an RDD or Seq of Product types (e.g. case classes or tuples) to DataFrame. It has direct conversions from an RDD of Int, Long and String to a DataFrame with the single column name _1.
Note | It is not possible to call toDF methods on RDD objects of other "primitive" types except Int, Long, and String. |
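A short sketch of what the implicits give you — toDF on a Seq of tuples, the $ and Symbol conversions to Column, and a single-column DataFrame from an RDD[Int]:

val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._

// Seq of tuples (Product types) to DataFrame
val df = Seq(("hello", 0), ("world!", 1)).toDF("text", "id")

// $"..." and 'symbol both convert to Column
df.select($"text", 'id).show

// RDD[Int] to a DataFrame with the single column _1
sc.parallelize(1 to 3).toDF().show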
Creating Datasets
createDataset[T: Encoder](data: Seq[T]): Dataset[T]
createDataset[T: Encoder](data: RDD[T]): Dataset[T]

The createDataset family of methods creates a Dataset from a collection of elements of type T, be it a regular Scala Seq or Spark's RDD.
It requires that there is an encoder in scope.
Note | Importing SQLContext.implicits brings many encoders into scope. |
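For example, a minimal sketch (the Encoder for Int comes from the implicits import):

import sqlContext.implicits._

val ds = sqlContext.createDataset(Seq(1, 2, 3))
ds.show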
Accessing DataFrameReader (read method)
read: DataFrameReader

The experimental read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.
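For example (the file path below is made up):

// load a JSON file into a DataFrame
val people = sqlContext.read.format("json").load("people.json")

// or use the format-specific shortcut
val people2 = sqlContext.read.json("people.json")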
Creating External Tables
createExternalTable(tableName: String, path: String): DataFrame
createExternalTable(tableName: String, path: String, source: String): DataFrame
createExternalTable(tableName: String, source: String, options: Map[String, String]): DataFrame
createExternalTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame

The experimental createExternalTable family of methods is used to create an external table tableName and return a corresponding DataFrame.
Caution | FIXME What is an external table? |
It assumes parquet as the default data source format, which you can change using the spark.sql.sources.default setting.
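For example, a sketch with a hypothetical table name and path (with no source given, the default parquet format is assumed):

// register an external table backed by files at the given path
val df = sqlContext.createExternalTable("ext_people", "/tmp/people.parquet")

// or spell out the data source and its options explicitly
val csv = sqlContext.createExternalTable(
  "ext_people_csv", "csv", Map("path" -> "/tmp/people.csv", "header" -> "true"))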
Dropping Temporary Tables
dropTempTable(tableName: String): Unit

The dropTempTable method drops a temporary table tableName.
Caution | FIXME What is a temporary table? |
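For example, assuming the texts temporary table registered in the UDF example above:

sqlContext.dropTempTable("texts")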
Creating Dataset[Long] (range method)
range(end: Long): Dataset[Long]
range(start: Long, end: Long): Dataset[Long]
range(start: Long, end: Long, step: Long): Dataset[Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Long]

The range family of methods creates a Dataset[Long] with the sole id column of LongType for the given start, end, and step.
Note | The first three variants use SparkContext.defaultParallelism for the number of partitions numPartitions. |
scala> spark.range(5)
res0: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> .show
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+
Creating DataFrames for Table
table(tableName: String): DataFrame

The table method returns the tableName table as a DataFrame.
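For example, assuming a texts table has been registered:

val texts = sqlContext.table("texts")
texts.show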
Listing Existing Tables
tables(): DataFrame
tables(databaseName: String): DataFrame

The tables methods return a DataFrame that holds the names of existing tables in a database.
scala> spark.tables.show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|        t|       true|
|       t2|       true|
+---------+-----------+
The schema consists of two columns – tableName of StringType and isTemporary of BooleanType.
Note | tables is a result of SHOW TABLES [IN databaseName]. |
tableNames(): Array[String]
tableNames(databaseName: String): Array[String]

The tableNames methods are similar to tables, with the only difference that they return an Array[String], i.e. a collection of table names.
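For example:

// names of tables in the current database
sqlContext.tableNames().foreach(println)

// or in a specific database
sqlContext.tableNames("default")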
Accessing StreamingQueryManager
streams: StreamingQueryManager

The streams method returns a StreamingQueryManager that is used to…TK
Caution | FIXME |
Managing Active SQLContext for JVM
SQLContext.getOrCreate(sparkContext: SparkContext): SQLContext

The SQLContext.getOrCreate method returns an active SQLContext object for the JVM or creates a new one using a given sparkContext.
Note | It is a factory-like method that works on the SQLContext class. |
Interestingly, there are two helper methods to set and clear the active SQLContext object – setActive and clearActive respectively.

setActive(spark: SQLContext): Unit
clearActive(): Unit
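A short sketch of managing the active SQLContext (note that in some Spark versions setActive and clearActive are private[sql] helpers, so they may not be callable from user code):

// returns the active SQLContext for the JVM or creates one for sc
val sqlContext = SQLContext.getOrCreate(sc)

// SQLContext.setActive(sqlContext)  // make a SQLContext the active one
// SQLContext.clearActive()          // and clear it again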
Executing SQL Queries
sql(sqlText: String): DataFrame

sql executes the sqlText SQL query.
Note | It supports Hive statements through HiveContext. |
scala> sql("set spark.sql.hive.version").show(false)
16/04/10 15:19:36 INFO HiveSqlParser: Parsing command: set spark.sql.hive.version
+----------------------+-----+
|key                   |value|
+----------------------+-----+
|spark.sql.hive.version|1.2.1|
+----------------------+-----+

scala> sql("describe database extended default").show(false)
16/04/10 15:21:14 INFO HiveSqlParser: Parsing command: describe database extended default
+-------------------------+--------------------------+
|database_description_item|database_description_value|
+-------------------------+--------------------------+
|Database Name            |default                   |
|Description              |Default Hive database     |
|Location                 |file:/user/hive/warehouse |
|Properties               |                          |
+-------------------------+--------------------------+

// Create temporary table
scala> spark.range(10).registerTempTable("t")
16/04/14 23:34:31 INFO HiveSqlParser: Parsing command: t

scala> sql("CREATE temporary table t2 USING PARQUET OPTIONS (PATH 'hello') AS SELECT * FROM t")
16/04/14 23:34:38 INFO HiveSqlParser: Parsing command: CREATE temporary table t2 USING PARQUET OPTIONS (PATH 'hello') AS SELECT * FROM t

scala> spark.tables.show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|        t|       true|
|       t2|       true|
+---------+-----------+
sql parses sqlText using a dialect that can be set up using the spark.sql.dialect setting.
Tip | You may also use the spark-sql shell script to interact with Hive. |
Internally, it uses the SessionState.sqlParser.parsePlan(sql) method to create a LogicalPlan.
Caution | FIXME Review |
scala> sql("show tables").show(false)
16/04/09 13:05:32 INFO HiveSqlParser: Parsing command: show tables
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|dafa     |false      |
+---------+-----------+
Tip | Enable logging by adding the appropriate line to conf/log4j.properties. Refer to Logging. |
Creating New Session
newSession(): SQLContext

You can use the newSession method to create a new session without the cost of instantiating a new SQLContext from scratch.

newSession returns a new SQLContext that shares the SparkContext, CacheManager, SQLListener, and ExternalCatalog.
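For example, a sketch of two sessions with independent SQL configurations over the same SparkContext:

val session1 = sqlContext.newSession()
val session2 = sqlContext.newSession()

// configuration is per session...
session1.setConf("spark.sql.shuffle.partitions", "4")
session2.setConf("spark.sql.shuffle.partitions", "16")

// ...while the underlying SparkContext is shared
assert(session1.sparkContext == session2.sparkContext)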
Caution | FIXME Why would I need that? |