
SQLExecution Helper Object


SQLExecution defines the spark.sql.execution.id Spark property that is used to track multiple Spark jobs that together constitute a single structured query execution (so they can easily be reported as a single execution unit).

Structured query actions are executed using the SQLExecution.withNewExecutionId static method, which sets spark.sql.execution.id as a Spark Core local property and “stitches” different Spark jobs together as parts of one structured query action (that you can then see in web UI’s SQL tab).

Tip

Use a SparkListener to listen to SparkListenerSQLExecutionStart events and learn the execution ids of the structured queries that have been executed in a Spark SQL application.
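For example, a minimal sketch of such a listener (assuming the Spark session is available as spark):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

// Print the execution id of every structured query execution in this application
val sqlExecutionListener = new SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      println(s"Structured query execution started: id=${e.executionId} description=${e.description}")
    case _ => // not a SQL execution event
  }
}
spark.sparkContext.addSparkListener(sqlExecutionListener)
```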

Note
Jobs without spark.sql.execution.id key are not considered to belong to SQL query executions.

SQLExecution keeps track of all execution ids and their QueryExecutions in executionIdToQueryExecution internal registry.

Tip
Use SQLExecution.getQueryExecution to find the QueryExecution for an execution id.

Executing Dataset Action (with Zero or More Spark Jobs) Under New Execution Id — withNewExecutionId Method

withNewExecutionId executes the body query action with a new execution id (either given as the input executionId or auto-generated), so that all Spark jobs scheduled by the query action can be marked as parts of the same Dataset action execution.

withNewExecutionId allows for collecting all the Spark jobs (even those executed on separate threads) together under a single SQL query execution for reporting purposes, e.g. reporting them as one single structured query in web UI.

Note
If there is another execution id already set, it is replaced for the course of the current action.

In addition, the QueryExecution variant posts SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events (to LiveListenerBus event bus) before and after executing the body action, respectively. It is used to inform SQLListener when a SQL query execution starts and ends.

Note
Nested execution ids are not supported in the QueryExecution variant.
Note

withNewExecutionId is used when:

  • Dataset is requested to Dataset.withNewExecutionId

  • Dataset is requested to withAction

  • DataFrameWriter is requested to run a command

  • Spark Structured Streaming’s StreamExecution commits a batch to a streaming sink

  • Spark Thrift Server’s SparkSQLDriver runs a command

Finding QueryExecution for Execution ID — getQueryExecution Method

getQueryExecution gives the QueryExecution for the executionId or null if not found.
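A minimal sketch (the execution id below is hypothetical, e.g. one captured from a SparkListenerSQLExecutionStart event):

```scala
import org.apache.spark.sql.execution.SQLExecution

val executionId = 0L  // hypothetical execution id
// Returns the tracked QueryExecution, or null if the id is unknown
val queryExecution = SQLExecution.getQueryExecution(executionId)
```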

Executing Action (with Zero or More Spark Jobs) Tracked Under Given Execution Id — withExecutionId Method

withExecutionId executes the body action as part of executing multiple Spark jobs under the executionId execution identifier.

Note

withExecutionId is used when:

SparkSQLEnv


Caution
FIXME

Thrift JDBC/ODBC Server — Spark Thrift Server (STS)


Thrift JDBC/ODBC Server (aka Spark Thrift Server or STS) is Spark SQL’s port of Apache Hive’s HiveServer2 that allows JDBC/ODBC clients to execute SQL queries over JDBC and ODBC protocols on Apache Spark.

With Spark Thrift Server, business users can work with their shiny Business Intelligence (BI) tools, e.g. Tableau or Microsoft Excel, and connect to Apache Spark using the ODBC interface. That brings the in-memory distributed capabilities of Spark SQL’s query engine (with all the Catalyst query optimizations you surely like very much) to environments that were initially “disconnected”.

Besides, SQL queries in Spark Thrift Server share the same SparkContext, which helps further improve performance of SQL queries using the same data sources.

Spark Thrift Server is a Spark standalone application that you start using start-thriftserver.sh and stop using stop-thriftserver.sh shell scripts.

Spark Thrift Server has its own tab in web UI — JDBC/ODBC Server available at /sqlserver URL.

Figure 1. Spark Thrift Server’s web UI

Spark Thrift Server can work in HTTP or binary transport modes.

Use beeline command-line tool or SQuirreL SQL Client or Spark SQL’s DataSource API to connect to Spark Thrift Server through the JDBC interface.

Spark Thrift Server extends spark-submit‘s command-line options with --hiveconf [prop=value].

Important

You have to enable hive-thriftserver build profile to include Spark Thrift Server in your build.

Tip

Enable INFO or DEBUG logging levels for org.apache.spark.sql.hive.thriftserver and org.apache.hive.service.server loggers to see what happens inside.

Add the following lines to conf/log4j.properties:
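```
log4j.logger.org.apache.spark.sql.hive.thriftserver=DEBUG
log4j.logger.org.apache.hive.service.server=INFO
```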

Refer to Logging.

Starting Thrift JDBC/ODBC Server — start-thriftserver.sh

You can start Thrift JDBC/ODBC Server using ./sbin/start-thriftserver.sh shell script.

With INFO logging level enabled, when you execute the script you should see the following INFO messages in the logs:

Internally, start-thriftserver.sh script submits org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 standalone application for execution (using spark-submit).

Tip
Starting Spark Thrift Server with the more explicit spark-submit approach can make execution easier to trace, since the logs are printed out to standard output and hence directly to the terminal.

Using Beeline JDBC Client to Connect to Spark Thrift Server

beeline is a command-line tool that allows you to access Spark Thrift Server using the JDBC interface on the command line. It is included in the Spark distribution in the bin directory.

You can connect to Spark Thrift Server using beeline’s !connect command as follows:
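```
beeline> !connect jdbc:hive2://localhost:10000
```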

When connecting in non-secure mode, simply enter the username on your machine and a blank password.

Once connected, you can send SQL queries (as if Spark SQL were a JDBC-compliant database).

Connecting to Spark Thrift Server using SQuirreL SQL Client 3.7.1

Spark Thrift Server allows for remote access to Spark SQL using JDBC protocol.

Note
This section was tested with SQuirreL SQL Client 3.7.1 (squirrelsql-3.7.1-standard.zip) on Mac OS X.

SQuirreL SQL Client is a Java SQL client for JDBC-compliant databases.

Run the client using java -jar squirrel-sql.jar.

Figure 2. SQuirreL SQL Client

You first have to configure a JDBC driver for Spark Thrift Server. Spark Thrift Server uses the org.spark-project.hive:hive-jdbc:1.2.1.spark2 dependency as the JDBC driver (which also pulls in transitive dependencies).

Tip
The Hive JDBC Driver, i.e. hive-jdbc-1.2.1.spark2.jar, and the other jar files are in the jars directory of the Apache Spark distribution (or assembly/target/scala-2.11/jars for local builds).
Table 1. SQuirreL SQL Client’s Connection Parameters

Name: Spark Thrift Server

Example URL: jdbc:hive2://localhost:10000

Extra Class Path: all the jar files of your Spark distribution

Class Name: org.apache.hive.jdbc.HiveDriver

Figure 3. Adding Hive JDBC Driver in SQuirreL SQL Client

With the Hive JDBC Driver defined, you can connect to Spark SQL Thrift Server.

Figure 4. Adding Hive JDBC Driver in SQuirreL SQL Client

Since you did not specify the database to use, Spark SQL’s default is used.

Figure 5. SQuirreL SQL Client Connected to Spark Thrift Server (Metadata Tab)

Below is the show tables SQL query in SQuirreL SQL Client, executed in Spark SQL through Spark Thrift Server.

Figure 6. show tables SQL Query in SQuirreL SQL Client using Spark Thrift Server

Using Spark SQL’s DataSource API to Connect to Spark Thrift Server

What might seem a quite artificial setup at first is accessing Spark Thrift Server using Spark SQL’s DataSource API, i.e. DataFrameReader‘s jdbc method.

Tip

When executed in local mode, Spark Thrift Server and spark-shell will try to access the same Hive warehouse directory, which will inevitably lead to an error.

Use spark.sql.warehouse.dir to point to another directory for spark-shell.

You should also not share the same home directory between them since metastore_db becomes an issue.
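A minimal sketch (assuming Spark Thrift Server listens on the default port 10000 at localhost; the numbered callouts are explained below):

```scala
// Run in spark-shell; the Hive JDBC driver must be available on the classpath
val people = spark.read
  .format("jdbc")
  .option("driver", "org.apache.hive.jdbc.HiveDriver")
  .option("url", "jdbc:hive2://localhost:10000")  // (1)
  .option("dbtable", "people")                    // (2)
  .load()
```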

  1. Connect to Spark Thrift Server at localhost on port 10000

  2. Use the people table (it assumes that the people table is available).

ThriftServerTab — web UI’s Tab for Spark Thrift Server

ThriftServerTab is…​FIXME

Caution
FIXME Elaborate

Stopping Thrift JDBC/ODBC Server — stop-thriftserver.sh

You can stop a running instance of Thrift JDBC/ODBC Server using ./sbin/stop-thriftserver.sh shell script.

With DEBUG logging level enabled, you should see the following messages in the logs:

Tip
You can also send SIGTERM signal to the process of Thrift JDBC/ODBC Server, i.e. kill [PID] that triggers the same sequence of shutdown steps as stop-thriftserver.sh.

Transport Mode

Spark Thrift Server can be configured to listen in two modes (aka transport modes):

  1. Binary mode — clients should send thrift requests in binary

  2. HTTP mode — clients send thrift requests over HTTP.

You can control the transport mode using the HIVE_SERVER2_TRANSPORT_MODE environment variable (e.g. HIVE_SERVER2_TRANSPORT_MODE=http) or the hive.server2.transport.mode property. Either can be binary (the default) or http.

main method

Thrift JDBC/ODBC Server is a Spark standalone application that you…​

Caution
FIXME

HiveThriftServer2Listener

Caution
FIXME

SparkSqlParser — Default SQL Parser


SparkSqlParser is the default parser of the SQL statements supported in Spark SQL.

SparkSqlParser supports variable substitution.

SparkSqlParser uses SparkSqlAstBuilder (as AstBuilder).

Note
Spark SQL supports SQL statements as described in SqlBase.g4 ANTLR grammar.

SparkSqlParser is available as sqlParser of a SessionState.

SparkSqlParser is used to translate an expression to the corresponding Column in the following:

SparkSqlParser is used to parse table strings into their corresponding table identifiers in the following:

SparkSqlParser is used to translate a SQL text to its corresponding logical operator in SparkSession.sql method.
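A minimal sketch of two common entry points that use SparkSqlParser under the covers (assuming the Spark session is available as spark):

```scala
import org.apache.spark.sql.functions.expr

val plan = spark.sql("SELECT 1 AS id")  // SQL text parsed to a logical operator (parsePlan)
val col  = expr("id + 1")               // expression text parsed to a Column (parseExpression)
```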

Tip

Enable INFO logging level for org.apache.spark.sql.execution.SparkSqlParser logger to see what happens inside.

Add the following line to conf/log4j.properties:
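```
log4j.logger.org.apache.spark.sql.execution.SparkSqlParser=INFO
```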

Refer to Logging.

Variable Substitution

Caution
FIXME See SparkSqlParser and substitutor.

SparkSqlAstBuilder


SparkSqlAstBuilder is an AstBuilder that converts valid Spark SQL statements into Catalyst expressions, logical plans or table identifiers (using visit callback methods).

Note
Spark SQL uses ANTLR parser generator for parsing structured text.

SparkSqlAstBuilder is created exclusively when SparkSqlParser is created (which is when SparkSession is requested for the lazily-created SessionState).

Figure 1. Creating SparkSqlAstBuilder

SparkSqlAstBuilder takes a SQLConf when created.

Note

SparkSqlAstBuilder can also be temporarily created for expr standard function (to create column expressions).

Table 1. SparkSqlAstBuilder’s Visit Callback Methods
Callback Method ANTLR rule / labeled alternative Spark SQL Entity

visitAnalyze

#analyze

  • AnalyzeColumnCommand logical command for ANALYZE TABLE with FOR COLUMNS clause (but no PARTITION specification)

  • AnalyzePartitionCommand logical command for ANALYZE TABLE with PARTITION specification (but no FOR COLUMNS clause)

  • AnalyzeTableCommand logical command for ANALYZE TABLE with neither PARTITION specification nor FOR COLUMNS clause

Note

visitAnalyze supports the NOSCAN identifier only (and reports a ParseException if any other identifier is used).

NOSCAN is used for AnalyzePartitionCommand and AnalyzeTableCommand logical commands only.

visitBucketSpec

#bucketSpec

visitCacheTable

#cacheTable

visitCreateHiveTable

#createHiveTable

CreateTable

visitCreateTable

#createTable

visitCreateView

#createView

CreateViewCommand for CREATE VIEW AS SQL statement

visitCreateTempViewUsing

#createTempViewUsing

CreateTempViewUsing for CREATE TEMPORARY VIEW … USING

visitDescribeTable

#describeTable

  • DescribeColumnCommand logical command for DESCRIBE TABLE with a single column only (i.e. no PARTITION specification).

  • DescribeTableCommand logical command for all other variants of DESCRIBE TABLE (i.e. no column)

visitInsertOverwriteHiveDir

#insertOverwriteHiveDir

visitShowCreateTable

#showCreateTable

ShowCreateTableCommand logical command for SHOW CREATE TABLE SQL statement
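As a minimal sketch, the following SQL statements (assuming a table t1 exists) exercise some of the visit callbacks above:

```scala
spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS NOSCAN")  // visitAnalyze -> AnalyzeTableCommand
spark.sql("DESCRIBE TABLE t1")                           // visitDescribeTable -> DescribeTableCommand
spark.sql("SHOW CREATE TABLE t1")                        // visitShowCreateTable -> ShowCreateTableCommand
```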

Table 2. SparkSqlAstBuilder’s Parsing Handlers
Parsing Handler LogicalPlan Added

withRepartitionByExpression

ParserInterface — SQL Parser Contract


ParserInterface is the contract of SQL parsers that can parse Expressions, LogicalPlans, TableIdentifiers, and StructTypes given the textual representation of SQL statements.

Note
AbstractSqlParser is the one and only known extension of the ParserInterface Contract in Spark SQL.

ParserInterface is available as sqlParser in SessionState.
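A minimal sketch of the contract in action, using the session’s default parser (the SQL texts are example inputs only):

```scala
val parser = spark.sessionState.sqlParser  // a ParserInterface (SparkSqlParser by default)

val expression      = parser.parseExpression("a + 1")                 // Expression
val plan            = parser.parsePlan("SELECT * FROM t")             // LogicalPlan
val tableIdentifier = parser.parseTableIdentifier("db.t")             // TableIdentifier
val schema          = parser.parseTableSchema("id INT, name STRING")  // StructType
```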

Table 1. ParserInterface Contract
Method Description

parseExpression

Parses a SQL text to an Expression

Used in the following:

parsePlan

Parses a SQL text to a LogicalPlan

Used when:

parseTableIdentifier

Parses a SQL text to a TableIdentifier

Used when:

parseTableSchema

Parses a SQL text to a StructType

Used when:

CatalystSqlParser — DataTypes and StructTypes Parser


CatalystSqlParser is an AbstractSqlParser with AstBuilder as the required astBuilder.

CatalystSqlParser is used to translate DataTypes from their canonical string representation (e.g. when adding fields to a schema or casting column to a different data type) or StructTypes.
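A minimal sketch (the data type and schema strings are example inputs only):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

val dataType = CatalystSqlParser.parseDataType("decimal(10, 2)")          // DataType
val schema   = CatalystSqlParser.parseTableSchema("id INT, name STRING")  // StructType
```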

When parsing, you should see INFO messages in the logs:

It is also used in HiveClientImpl (when converting columns from Hive to Spark) and in OrcFileOperator (when inferring the schema for ORC files).

Tip

Enable INFO logging level for org.apache.spark.sql.catalyst.parser.CatalystSqlParser logger to see what happens inside.

Add the following line to conf/log4j.properties:
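```
log4j.logger.org.apache.spark.sql.catalyst.parser.CatalystSqlParser=INFO
```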

Refer to Logging.

AstBuilder — ANTLR-based SQL Parser


AstBuilder converts SQL statements into Spark SQL’s relational entities (i.e. data types, Catalyst expressions, logical plans or TableIdentifiers) using visit callback methods.

AstBuilder is the AST builder of AbstractSqlParser (i.e. the base SQL parsing infrastructure in Spark SQL).

Tip

Spark SQL supports SQL statements as described in SqlBase.g4. Using the file can tell you (almost) exactly what Spark SQL supports at any given time.

“Almost” because, although the grammar may accept a SQL statement, AstBuilder can still report it as not allowed.

AstBuilder is an ANTLR AbstractParseTreeVisitor (as SqlBaseBaseVisitor) that is generated from the SqlBase.g4 ANTLR grammar for Spark SQL.

Note

SqlBaseBaseVisitor is an ANTLR-specific base class that is auto-generated at build time from the ANTLR grammar in SqlBase.g4.

SqlBaseBaseVisitor is an ANTLR AbstractParseTreeVisitor.

Table 1. AstBuilder’s Visit Callback Methods
Callback Method ANTLR rule / labeled alternative Spark SQL Entity

visitAliasedQuery

visitColumnReference

visitDereference

visitExists

#exists labeled alternative

Exists expression

visitExplain

explain rule

Note

Can be a OneRowRelation for an EXPLAIN of an unexplainable DescribeTableCommand logical command (as created for a DESCRIBE TABLE SQL statement).

visitFirst

#first labeled alternative

First aggregate function expression

visitFromClause

fromClause

Supports multiple comma-separated relations (that all together build a condition-less INNER JOIN) with optional LATERAL VIEW.

A relation can be one of the following or a combination thereof:

  • Table identifier

  • Inline table using VALUES exprs AS tableIdent

  • Table-valued function (currently only range is supported)

visitFunctionCall

functionCall labeled alternative

Tip
See the function examples below.

visitInlineTable

inlineTable rule

UnresolvedInlineTable unary logical operator (as the child of SubqueryAlias for tableAlias)

expression can be as follows:

tableAlias can be specified explicitly or defaults to colN for every column (starting from 1 for N).

visitInsertIntoTable

#insertIntoTable labeled alternative

InsertIntoTable (indirectly)

A 3-element tuple with a TableIdentifier, optional partition keys and the exists flag disabled

Note
insertIntoTable is part of insertInto that is in turn used only as a helper labeled alternative in singleInsertQuery and multiInsertQueryBody rules.

visitInsertOverwriteTable

#insertOverwriteTable labeled alternative

InsertIntoTable (indirectly)

A 3-element tuple with a TableIdentifier, optional partition keys and the exists flag

In a way, visitInsertOverwriteTable is simply a more general version of visitInsertIntoTable, with the exists flag on or off depending on whether IF NOT EXISTS is used. The main difference is that dynamic partitions are used only when IF NOT EXISTS is not used.

Note
insertOverwriteTable is part of insertInto that is in turn used only as a helper labeled alternative in singleInsertQuery and multiInsertQueryBody rules.

visitMultiInsertQuery

multiInsertQueryBody

A logical operator with an InsertIntoTable (and an UnresolvedRelation leaf operator)

visitNamedExpression

namedExpression

  • Alias (for a single alias)

  • MultiAlias (for a parenthesis-enclosed alias list)

  • a bare Expression

visitNamedQuery

SubqueryAlias

visitQuerySpecification

querySpecification

OneRowRelation or LogicalPlan

Note

visitQuerySpecification creates a OneRowRelation for a SELECT without a FROM clause.

visitPredicated

predicated

Expression

visitRelation

relation

LogicalPlan for a FROM clause.

visitRowConstructor

visitSingleDataType

singleDataType

DataType

visitSingleExpression

singleExpression

Expression

Takes the named expression and relays to visitNamedExpression

visitSingleInsertQuery

#singleInsertQuery labeled alternative

A logical operator with an InsertIntoTable

visitSortItem

sortItem

SortOrder unevaluable unary expression

```
sortItem
    : expression ordering=(ASC | DESC)? (NULLS nullOrder=(LAST | FIRST))?
    ;

ORDER BY order+=sortItem (',' order+=sortItem)*
SORT BY sort+=sortItem (',' sort+=sortItem)*

((ORDER | SORT) BY sortItem (',' sortItem)*)?
```

visitSingleStatement

singleStatement

LogicalPlan from a single statement

Note
A single statement can be quite involved.

visitSingleTableIdentifier

singleTableIdentifier

TableIdentifier

visitStar

#star labeled alternative

UnresolvedStar

visitStruct

visitSubqueryExpression

#subqueryExpression labeled alternative

ScalarSubquery

visitWindowDef

windowDef labeled alternative

Table 2. AstBuilder’s Parsing Handlers
Parsing Handler LogicalPlan Added

withAggregation

  • GroupingSets for GROUP BY … GROUPING SETS (…)

  • Aggregate for GROUP BY … (WITH CUBE | WITH ROLLUP)?

withGenerate

Generate with an UnresolvedGenerator and the join flag turned on for LATERAL VIEW (in SELECT or FROM clauses).

withHints

Hint for /*+ hint */ in SELECT queries.

Tip
Note + (plus) between /* and */

hint is of the format name or name (param1, param2, …​).

withInsertInto

withJoinRelations

Join for a FROM clause and relation alone.

The following join types are supported:

  • INNER (default)

  • CROSS

  • LEFT (with optional OUTER)

  • LEFT SEMI

  • RIGHT (with optional OUTER)

  • FULL (with optional OUTER)

  • ANTI (optionally prefixed with LEFT)

The following join criteria are supported:

  • ON booleanExpression

  • USING '(' identifier (',' identifier)* ')'

Joins can be NATURAL (with no join criteria).

withQueryResultClauses

withQuerySpecification

Adds a query specification to a logical operator.

For transform SELECT (with TRANSFORM, MAP or REDUCE qualifiers), withQuerySpecification does…​FIXME


For regular SELECT (no TRANSFORM, MAP or REDUCE qualifiers), withQuerySpecification adds (in that order):

  1. Generate unary logical operators (if used in the parsed SQL text)

  2. Filter unary logical plan (if used in the parsed SQL text)

  3. GroupingSets or Aggregate unary logical operators (if used in the parsed SQL text)

  4. Project and/or Filter unary logical operators

  5. WithWindowDefinition unary logical operator (if used in the parsed SQL text)

  6. UnresolvedHint unary logical operator (if used in the parsed SQL text)

withPredicate

  • NOT? IN '(' query ')' gives an In predicate expression with a ListQuery subquery expression

  • NOT? IN '(' expression (',' expression)* ')' gives an In predicate expression

withWindows

WithWindowDefinition for window aggregates (given WINDOW definitions).

Used in withQueryResultClauses and withQuerySpecification when a WINDOW definition is present.

Tip
Consult windows, namedWindow, windowSpec, windowFrame, and frameBound (with windowRef and windowDef) ANTLR parsing rules for Spark SQL in SqlBase.g4.
Note
AstBuilder belongs to org.apache.spark.sql.catalyst.parser package.

Function Examples

The examples are handled by visitFunctionCall.
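A minimal sketch (upper and abs are just example built-in functions):

```scala
// Both function calls below are handled by visitFunctionCall
spark.sql("SELECT upper('spark') AS name, abs(-1) AS one").show
```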

aliasPlan Internal Method

aliasPlan…​FIXME

Note
aliasPlan is used when…​FIXME

mayApplyAliasPlan Internal Method

mayApplyAliasPlan…​FIXME

Note
mayApplyAliasPlan is used when…​FIXME

AbstractSqlParser — Base SQL Parsing Infrastructure


AbstractSqlParser is the base of ParserInterfaces that use an AstBuilder to parse SQL statements and convert them to Spark SQL entities, i.e. DataType, StructType, Expression, LogicalPlan and TableIdentifier.

AbstractSqlParser is the foundation of the SQL parsing infrastructure.

Table 1. AbstractSqlParser Contract
Method Description

astBuilder

AstBuilder for parsing SQL statements.

Used in all the parse methods, i.e. parseDataType, parseExpression, parsePlan, parseTableIdentifier, and parseTableSchema.

Table 2. AbstractSqlParser’s Implementations
Name Description

SparkSqlParser

The default SQL parser in SessionState available as sqlParser property.

CatalystSqlParser

Creates a DataType or a StructType (schema) from their canonical string representation.

Setting Up SqlBaseLexer and SqlBaseParser for Parsing — parse Method

parse sets up a proper ANTLR parsing infrastructure with SqlBaseLexer and SqlBaseParser (which are the ANTLR-specific classes of Spark SQL that are auto-generated at build time from the SqlBase.g4 grammar).

Tip
Review the definition of ANTLR grammar for Spark SQL in sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4.

Internally, parse first prints out the following INFO message to the logs:

Tip
Enable INFO logging level for the custom AbstractSqlParser, i.e. SparkSqlParser or CatalystSqlParser, to see the above INFO message.

parse then creates and sets up a SqlBaseLexer and a SqlBaseParser, and passes the latter on to the input toResult function, where the parsing finally happens.

Note
parse uses SLL prediction mode for parsing first before falling back to LL mode.

In case of parsing errors, parse reports a ParseException.

Note
parse is used in all the parse methods, i.e. parseDataType, parseExpression, parsePlan, parseTableIdentifier, and parseTableSchema.

parseDataType Method

Note
parseDataType is part of ParserInterface Contract to…​FIXME.

parseDataType…​FIXME

parseExpression Method

Note
parseExpression is part of ParserInterface Contract to…​FIXME.

parseExpression…​FIXME

parseFunctionIdentifier Method

Note
parseFunctionIdentifier is part of ParserInterface Contract to…​FIXME.

parseFunctionIdentifier…​FIXME

parseTableIdentifier Method

Note
parseTableIdentifier is part of ParserInterface Contract to…​FIXME.

parseTableIdentifier…​FIXME

parseTableSchema Method

Note
parseTableSchema is part of ParserInterface Contract to…​FIXME.

parseTableSchema…​FIXME

parsePlan Method

Note
parsePlan is part of ParserInterface Contract to…​FIXME.

parsePlan creates a LogicalPlan for a given SQL textual statement.

Internally, parsePlan builds a SqlBaseParser and requests AstBuilder to parse a single SQL statement.

If a SQL statement could not be parsed, parsePlan throws a ParseException:
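A minimal sketch (the SQL text is deliberately invalid):

```scala
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}

try {
  CatalystSqlParser.parsePlan("SELEKT * FROM t")  // not a valid SQL statement
} catch {
  case e: ParseException => println(e.getMessage)
}
```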

SQL Parsing Framework


The SQL Parsing Framework in Spark SQL uses ANTLR to translate a SQL text to a DataType, Expression, TableIdentifier or LogicalPlan.

The contract of the framework is described by the ParserInterface contract. The contract is partially implemented by the AbstractSqlParser class, so concrete parsers only have to provide a custom AstBuilder.

There are two concrete implementations of AbstractSqlParser:

  1. SparkSqlParser, the default parser of the SQL statements supported in Spark SQL.

  2. CatalystSqlParser, which is used to parse data types and schemas from their canonical string representation.
