
Row


Row is a generic row object with an ordered collection of fields that can be accessed by an ordinal / index (aka generic access by ordinal), by a name (aka native primitive access), or using Scala’s pattern matching.

Note
Row is also called Catalyst Row.

Row may have an optional schema.

The traits of Row:

  • length or size — Row knows the number of elements (columns)

  • schema — Row knows the schema

Row belongs to the org.apache.spark.sql package.

Creating Row — apply Factory Method

Caution
FIXME

Field Access by Index — apply and get methods

Fields of a Row instance can be accessed by index (starting from 0) using apply or get.

Note
Generic access by ordinal (using apply or get) returns a value of type Any.
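
A minimal sketch of generic access by ordinal (the sample values are illustrative):

```scala
import org.apache.spark.sql.Row

val row = Row(1, "hello")

// Generic access by ordinal returns Any
val first: Any  = row(0)      // same as row.apply(0)
val second: Any = row.get(1)
```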

Get Field As Type — getAs method

You can query for fields with their proper types using getAs with an index.

Note

FIXME
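
A hedged sketch of getAs with an index (the row below is illustrative):

```scala
import org.apache.spark.sql.Row

val row = Row(1, "hello")

// getAs returns the field with the requested type
// (a ClassCastException is thrown at runtime if the type does not match)
val id   = row.getAs[Int](0)
val word = row.getAs[String](1)
```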

Schema

A Row instance can have a schema defined.

Note
Unless you are instantiating Row yourself (using Row Object), a Row always has a schema.
Note
It is RowEncoder that takes care of assigning a schema to a Row when toDF is called on a Dataset or when a DataFrame is created through DataFrameReader.

Row Object

Row companion object offers factory methods to create Row instances from a collection of elements (apply), a sequence of elements (fromSeq) and tuples (fromTuple).

Row object can merge Row instances.

It can also return an empty Row instance.
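
A minimal sketch of the Row companion object’s factory methods:

```scala
import org.apache.spark.sql.Row

val r1 = Row(1, "a")               // apply: from a collection of elements
val r2 = Row.fromSeq(Seq(2, "b"))  // from a sequence of elements
val r3 = Row.fromTuple((3, "c"))   // from a tuple

val merged = Row.merge(r1, r2)     // concatenates the fields of both rows
val empty  = Row.empty             // an empty Row instance
```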

Pattern Matching on Row

Row can be used in pattern matching (since Row Object comes with unapplySeq).
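
A short sketch of pattern matching on Row (the field types are assumptions about the sample row):

```scala
import org.apache.spark.sql.Row

def describe(row: Row): String = row match {
  case Row(id: Int, name: String) => s"$id -> $name"   // matched via Row.unapplySeq
  case _                          => "unknown shape"
}

describe(Row(1, "hello"))   // "1 -> hello"
```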

DataFrame — Dataset of Rows with RowEncoder


Spark SQL introduces a tabular functional data abstraction called DataFrame. It is designed to ease developing Spark applications for processing large amounts of structured tabular data on Spark infrastructure.

DataFrame is a data abstraction or a domain-specific language (DSL) for working with structured and semi-structured data, i.e. datasets that you can specify a schema for.

DataFrame is a collection of rows with a schema that is the result of executing a structured query (once it has been executed).

DataFrame uses the immutable, in-memory, resilient, distributed and parallel capabilities of RDD, and applies a structure called schema to the data.

Note

In Spark 2.0.0 DataFrame is a mere type alias for Dataset[Row].
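
The alias as it appears in the org.apache.spark.sql package object:

```scala
type DataFrame = Dataset[Row]
```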

DataFrame is a distributed collection of tabular data organized into rows and named columns. It is conceptually equivalent to a table in a relational database with operations to project (select), filter, intersect, join, group, sort, aggregate, or convert to an RDD (consult DataFrame API).

Spark SQL borrowed the concept of DataFrame from pandas’ DataFrame and made it immutable, parallel (one machine, perhaps with many processors and cores) and distributed (many machines, perhaps with many processors and cores).

Note
Hey, big data consultants, time to help teams migrate the code from pandas’ DataFrame into Spark’s DataFrames (at least to PySpark’s DataFrame) and offer services to set up large clusters!

DataFrames in Spark SQL strongly rely on the features of RDD – a DataFrame is basically an RDD exposed as a structured DataFrame by appropriate operations to handle very big data from day one. So, petabytes of data should not scare you (unless you’re the administrator who has to create such a clustered Spark environment – contact me when you feel alone with the task).

You can create DataFrames by loading data from structured files (JSON, Parquet, CSV), RDDs, tables in Hive, or external databases (JDBC). You can also create DataFrames from scratch and build upon them (as in the sections below). See DataFrame API. You can read any format as long as an appropriate Spark SQL extension of DataFrameReader is available to parse the dataset.

Caution
FIXME Diagram of reading data from sources to create DataFrame

You can execute queries over DataFrames using two approaches:

  • the good ol’ SQL – helps with migrating from the “SQL databases” world into the world of DataFrame in Spark SQL

  • Query DSL – an API that helps ensure proper syntax at compile time.

DataFrame also allows you to do the following tasks:

DataFrames use the Catalyst query optimizer to produce efficient queries (and so they are supposed to be faster than corresponding RDD-based queries).

Note
Your DataFrames can also be type-safe and moreover further improve their performance through specialized encoders that can significantly cut serialization and deserialization times.

You can enforce types on generic rows and hence bring type safety (at compile time) by encoding rows into a type-safe Dataset object. As of Spark 2.0 it is the preferred way of developing Spark applications.

Features of DataFrame

A DataFrame is a collection of “generic” Row instances (as RDD[Row]) and a schema.

Note
Regardless of how you create a DataFrame, it will always be a pair of RDD[Row] and StructType.

SQLContext, spark, and Spark shell

You use org.apache.spark.sql.SQLContext to build DataFrames and execute SQL queries.

The quickest and easiest way to work with Spark SQL is to use Spark shell and spark object.

As you may have noticed, spark in the Spark shell is actually a SparkSession with Hive support enabled, which integrates the Spark SQL execution engine with data stored in Apache Hive.

The Apache Hive™ data warehouse software facilitates querying and managing large datasets residing in distributed storage.

Creating DataFrames from Scratch

Use Spark shell as described in Spark shell.

Using toDF

After you import spark.implicits._ (which is done for you by Spark shell) you may apply toDF method to convert objects to DataFrames.
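
A minimal sketch (assuming a SparkSession named spark and that spark.implicits._ has been imported):

```scala
import spark.implicits._

// Convert a local Seq of tuples to a DataFrame with named columns
val df = Seq((1, "one"), (2, "two")).toDF("id", "name")
df.show()
```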

Creating DataFrame using Case Classes in Scala

This method assumes the data comes from a Scala case class that will describe the schema.
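
A hedged sketch in which the schema is inferred from a case class (Person is a hypothetical example):

```scala
case class Person(id: Long, name: String)

import spark.implicits._
val people = Seq(Person(0, "Jacek"), Person(1, "Agata")).toDF
people.printSchema()   // id: bigint, name: string
```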

Custom DataFrame Creation using createDataFrame

SQLContext offers a family of createDataFrame operations.
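
A sketch of one createDataFrame variant that takes an RDD[Row] and an explicit schema (shown here on a SparkSession; SQLContext offers the same operation):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType)))

val rows = spark.sparkContext.parallelize(Seq(Row(1, "one"), Row(2, "two")))
val df = spark.createDataFrame(rows, schema)
```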

Loading data from structured files

Creating DataFrame from CSV file

Let’s start with an example in which schema inference relies on a custom case class in Scala.

Creating DataFrame from CSV files using spark-csv module

You’re going to use the spark-csv module to load data from a CSV data source; it handles proper parsing and loading.

Note
Support for CSV data sources is available by default in Spark 2.0.0. No need for an external module.

Start the Spark shell using --packages option as follows:

Reading Data from External Data Sources (read method)

You can create DataFrames by loading data from structured files (JSON, Parquet, CSV), RDDs, tables in Hive, or external databases (JDBC) using SQLContext.read method.

read returns a DataFrameReader instance.

Among the supported structured data (file) formats are (consult Specifying Data Format (format method) for DataFrameReader):

  • JSON

  • parquet

  • JDBC

  • ORC

  • Tables in Hive and any JDBC-compliant database

  • libsvm

Querying DataFrame

Note
Spark SQL offers a Pandas-like Query DSL.

Using Query DSL

You can select specific columns using select method.

Note
This variant (in which you use stringified column names) can only select existing columns, i.e. you cannot create new ones using select expressions.

In the following example you query for the top 5 most active bidders.

Note the tiny $ and desc together with the column name to sort the rows by.
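
A hedged sketch of such a query (the auction DataFrame and its bidder column are assumed from the surrounding example; spark.implicits._ is assumed to be imported for the $ syntax):

```scala
auction
  .groupBy("bidder")
  .count()
  .sort($"count".desc)   // note the $ and desc on the count column
  .show(5)
```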

Using SQL

Register a DataFrame as a named temporary table to run SQL.

  1. Register a temporary table so SQL queries make sense

You can execute a SQL query on a DataFrame using sql operation, but before the query is executed it is optimized by Catalyst query optimizer. You can print the physical plan for a DataFrame using the explain operation.
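
A minimal sketch (df is an assumed DataFrame; createOrReplaceTempView is the Spark 2.0 way of registering a temporary view):

```scala
// 1. Register a temporary view so SQL queries make sense
df.createOrReplaceTempView("auctions")

val top5 = spark.sql(
  "SELECT bidder, count(*) AS cnt FROM auctions GROUP BY bidder ORDER BY cnt DESC LIMIT 5")

top5.explain()   // prints the physical plan
top5.show()
```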

Filtering

Handling data in Avro format

Use custom serializer using spark-avro.

Run the Spark shell with --packages com.databricks:spark-avro_2.11:2.0.0 (see 2.0.0 artifact is not in any public maven repo for why --repositories is required).

And then…​

See org.apache.spark.sql.SaveMode (and perhaps org.apache.spark.sql.SaveMode from Scala’s perspective).

Dataset — Structured Query with Data Encoder


Dataset is a strongly-typed data structure in Spark SQL that represents a structured query.

Note
A structured query can be written using SQL or Dataset API.

The following figure shows the relationship between different entities of Spark SQL that all together give the Dataset data structure.

spark sql Dataset.png
Figure 1. Dataset’s Internals

It is therefore fair to say that Dataset consists of the following three elements:

  1. QueryExecution (with the parsed unanalyzed LogicalPlan of a structured query)

  2. Encoder (of the type of the records for fast serialization and deserialization to and from InternalRow)

  3. SparkSession

When created, Dataset takes such a 3-element tuple with a SparkSession, a QueryExecution and an Encoder.

Dataset is created when:

Datasets are lazy: structured query operators and expressions are only triggered when an action is invoked.

The Dataset API offers declarative and type-safe operators that make for an improved data processing experience (compared to DataFrames, which were a set of index- or column-name-based Rows).

Note

Dataset was first introduced in Apache Spark 1.6.0 as an experimental feature, and has since turned itself into a fully supported API.

As of Spark 2.0.0, DataFrame – the flagship data abstraction of previous versions of Spark SQL – is currently a mere type alias for Dataset[Row]:

Dataset offers the convenience of RDDs with the performance optimizations of DataFrames and the strong static type-safety of Scala. The last feature, bringing strong type-safety to DataFrame, is what makes Dataset so appealing. All the features together give you a more functional programming interface to work with structured data.

It is only with Datasets that you have syntax and analysis checks at compile time (which was not possible using DataFrame, regular SQL queries or even RDDs).

Using Dataset objects turns DataFrames of Row instances into DataFrames of case classes with proper names and types (following their equivalents in the case classes). Instead of using indices to access fields in a DataFrame and casting them to a type, all this is handled automatically by Datasets and checked by the Scala compiler.
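
A hedged sketch of moving from a DataFrame of Rows to a typed Dataset (Person is a hypothetical case class):

```scala
case class Person(id: Long, name: String)

import spark.implicits._
val df = Seq((0L, "Jacek"), (1L, "Agata")).toDF("id", "name")

val people = df.as[Person]    // checked by the Scala compiler from here on
people.map(_.name).show()
```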

If however a LogicalPlan is used to create a Dataset, the logical plan is first executed (using the current SessionState in the SparkSession) that yields the QueryExecution plan.

A Dataset is Queryable and Serializable, i.e. can be saved to persistent storage.

Note
SparkSession and QueryExecution are transient attributes of a Dataset and therefore do not participate in Dataset serialization. The only firmly-tied feature of a Dataset is the Encoder.

You can request the “untyped” view of a Dataset or access the RDD that is generated after executing the query. It is supposed to give you a more pleasant experience while transitioning from the legacy RDD-based or DataFrame-based APIs you may have used in the earlier versions of Spark SQL or encourage migrating from Spark Core’s RDD API to Spark SQL’s Dataset API.

The default storage level for Datasets is MEMORY_AND_DISK because recomputing the in-memory columnar representation of the underlying table is expensive. You can however persist a Dataset.
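
A minimal sketch of persisting a Dataset:

```scala
val q = spark.range(10).filter(_ % 2 == 0)

q.cache()       // same as q.persist() -- MEMORY_AND_DISK by default
q.count()       // materializes (and caches) the result
q.unpersist()   // frees the storage when no longer needed
```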

Note
Spark 2.0 has introduced a new query model called Structured Streaming for continuous incremental execution of structured queries. That made it possible to consider Datasets both static and bounded as well as streaming and unbounded data sets with a single unified API for different execution models.

A Dataset is local if it was created from local collections using SparkSession.emptyDataset or SparkSession.createDataset methods and their derivatives like toDF. If so, the queries on the Dataset can be optimized and run locally, i.e. without using Spark executors.

Note
Dataset makes sure that the underlying QueryExecution is analyzed and checked.
Table 1. Dataset’s Properties
Name Description

boundEnc

ExpressionEncoder

Used when…​FIXME

deserializer

Deserializer expression to convert internal rows to objects of type T

Created lazily by requesting the ExpressionEncoder to resolveAndBind

Used when:

exprEnc

Implicit ExpressionEncoder

Used when…​FIXME

logicalPlan

Analyzed logical plan with all logical commands executed and turned into a LocalRelation.

When initialized, logicalPlan requests the QueryExecution for analyzed logical plan. If the plan is a logical command or a union thereof, logicalPlan executes the QueryExecution (using executeCollect).

planWithBarrier

rdd

(lazily-created) RDD of JVM objects of type T (as converted from rows in Dataset in the internal binary row format).

Note
rdd gives an RDD with the extra execution step to convert rows from their internal binary row format to JVM objects that will impact the JVM memory as the objects are inside the JVM (while they were outside before). You should not use rdd directly.

Internally, rdd first creates a new logical plan that deserializes the Dataset’s logical plan.

rdd then requests SessionState to execute the logical plan to get the corresponding RDD of binary rows.

Note
rdd uses SparkSession to access SessionState.

rdd then requests the Dataset’s ExpressionEncoder for the data type of the rows (using deserializer expression) and maps over them (per partition) to create records of the expected type T.

Note
rdd is at the “boundary” between the internal binary row format and the JVM type of the dataset. Avoid the extra deserialization step to lower JVM memory requirements of your Spark application.

sqlContext

Lazily-created SQLContext

Used when…​FIXME

Getting Input Files of Relations (in Structured Query) — inputFiles Method

inputFiles requests QueryExecution for optimized logical plan and collects the following logical operators:

inputFiles then requests the logical operators for their underlying files:

resolve Internal Method

Caution
FIXME

Creating Dataset Instance

Dataset takes the following when created:

Note
You can also create a Dataset using LogicalPlan that is immediately executed using SessionState.

Internally, Dataset requests QueryExecution to analyze itself.

Dataset initializes the internal registries and counters.

Is Dataset Local? — isLocal Method

isLocal flag is enabled (i.e. true) when operators like collect or take could be run locally, i.e. without using executors.

Internally, isLocal checks whether the logical query plan of a Dataset is LocalRelation.

Is Dataset Streaming? — isStreaming method

isStreaming is enabled (i.e. true) when the logical plan is streaming.

Internally, isStreaming takes the Dataset’s logical plan and gives whether the plan is streaming or not.

Queryable

Caution
FIXME

withNewRDDExecutionId Internal Method

withNewRDDExecutionId executes the input body action under new execution id.

Caution
FIXME What’s the difference between withNewRDDExecutionId and withNewExecutionId?
Note
withNewRDDExecutionId is used when Dataset.foreach and Dataset.foreachPartition actions are used.

Creating DataFrame (For Logical Query Plan and SparkSession) — ofRows Internal Factory Method

Note
ofRows is part of the Dataset Scala object that is marked as private[sql] and so can only be accessed from code in the org.apache.spark.sql package.

ofRows returns DataFrame (which is the type alias for Dataset[Row]). ofRows uses RowEncoder to convert the schema (based on the input logicalPlan logical plan).

Internally, ofRows prepares the input logicalPlan for execution and creates a Dataset[Row] with the current SparkSession, the QueryExecution and RowEncoder.

Note

ofRows is used when:

Tracking Multi-Job Structured Query Execution (PySpark) — withNewExecutionId Internal Method

withNewExecutionId executes the input body action under new execution id.

Note
withNewExecutionId sets a unique execution id so that all Spark jobs belong to the Dataset action execution.
Note

withNewExecutionId is used exclusively when Dataset is executing Python-based actions (i.e. collectToPython, collectAsArrowToPython and toPythonIterator) that are not of much interest in this gitbook.

Feel free to contact me at jacek@japila.pl if you think I should re-consider my decision.

Executing Action Under New Execution ID — withAction Internal Method

withAction requests QueryExecution for the optimized physical query plan and resets the metrics of every physical operator (in the physical plan).

withAction requests SQLExecution to execute the input action with the executable physical plan (tracked under a new execution id).

In the end, withAction notifies ExecutionListenerManager that the name action has finished successfully or with an exception.

Note
withAction uses SparkSession to access ExecutionListenerManager.
Note

withAction is used when Dataset is requested for the following:

Creating Dataset Instance (For LogicalPlan and SparkSession) — apply Internal Factory Method

Note
apply is part of the Dataset Scala object that is marked as private[sql] and so can only be accessed from code in the org.apache.spark.sql package.

apply…​FIXME

Note

apply is used when:

Collecting All Rows From Spark Plan — collectFromPlan Internal Method

collectFromPlan…​FIXME

Note
collectFromPlan is used for Dataset.head, Dataset.collect and Dataset.collectAsList operators.

selectUntyped Internal Method

selectUntyped…​FIXME

Note
selectUntyped is used exclusively when Dataset.select typed transformation is used.

Helper Method for Typed Transformations — withTypedPlan Internal Method

withTypedPlan…​FIXME

Note
withTypedPlan is annotated with Scala’s @inline annotation that requests the Scala compiler to try especially hard to inline it.
Note
withTypedPlan is used in the Dataset typed transformations, i.e. withWatermark, joinWith, hint, as, filter, limit, sample, dropDuplicates, filter, map, repartition, repartitionByRange, coalesce and sort with sortWithinPartitions (through the sortInternal internal method).

Helper Method for Set-Based Typed Transformations — withSetOperator Internal Method

withSetOperator…​FIXME

Note
withSetOperator is annotated with Scala’s @inline annotation that requests the Scala compiler to try especially hard to inline it.
Note
withSetOperator is used in the Dataset typed transformations, i.e. union, unionByName, intersect and except.

sortInternal Internal Method

sortInternal creates a Dataset with Sort unary logical operator (and the logicalPlan as the child logical plan).

Internally, sortInternal first builds ordering expressions for the given sortExprs columns, i.e. takes the sortExprs columns and makes sure that they are already SortOrder expressions (leaving them untouched) or wraps them into SortOrder expressions with the Ascending sort direction.

In the end, sortInternal creates a Dataset with Sort unary logical operator (with the ordering expressions, the given global flag, and the logicalPlan as the child logical plan).

Note
sortInternal is used for the sort and sortWithinPartitions typed transformations in the Dataset API (with the only change of the global flag being enabled and disabled, respectively).

Helper Method for Untyped Transformations and Basic Actions — withPlan Internal Method

withPlan simply uses ofRows internal factory method to create a DataFrame for the input LogicalPlan and the current SparkSession.

Note
withPlan is annotated with Scala’s @inline annotation that requests the Scala compiler to try especially hard to inline it.

Further Reading and Watching

SparkSessionExtensions


SparkSessionExtensions is an interface that a Spark developer can use to extend a SparkSession with custom query execution rules and a relational entity parser.

As a Spark developer, you use Builder.withExtensions method (while building a new SparkSession) to access the session-bound SparkSessionExtensions.
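
A hedged sketch of registering a custom optimizer rule through Builder.withExtensions (MyRule is a hypothetical no-op rule, used only for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object MyRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan   // no-op
}

val spark = SparkSession.builder
  .withExtensions { extensions =>
    extensions.injectOptimizerRule(_ => MyRule)
  }
  .getOrCreate()
```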

Table 1. SparkSessionExtensions API
Method Description

injectCheckRule

injectOptimizerRule

Registering a custom operator optimization rule

injectParser

injectPlannerStrategy

injectPostHocResolutionRule

injectResolutionRule

SparkSessionExtensions is an integral part of SparkSession (and is indirectly required to create one).

Table 2. SparkSessionExtensions’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

optimizerRules

Collection of RuleBuilder functions (i.e. SparkSession ⇒ Rule[LogicalPlan])

Used when SparkSessionExtensions is requested to:

Associating Custom Operator Optimization Rules with SparkSession — buildOptimizerRules Method

buildOptimizerRules gives the optimizerRules logical rules that are associated with the input SparkSession.

Note
buildOptimizerRules is used exclusively when BaseSessionStateBuilder is requested for the custom operator optimization rules to add to the base Operator Optimization batch.

Registering Custom Check Analysis Rule (Builder) — injectCheckRule Method

injectCheckRule…​FIXME

Registering Custom Operator Optimization Rule (Builder) — injectOptimizerRule Method

injectOptimizerRule simply registers a custom operator optimization rule (as a RuleBuilder function) to the optimizerRules internal registry.

Registering Custom Parser (Builder) — injectParser Method

injectParser…​FIXME

Registering Custom Planner Strategy (Builder) — injectPlannerStrategy Method

injectPlannerStrategy…​FIXME

Registering Custom Post-Hoc Resolution Rule (Builder) — injectPostHocResolutionRule Method

injectPostHocResolutionRule…​FIXME

Registering Custom Resolution Rule (Builder) — injectResolutionRule Method

injectResolutionRule…​FIXME

implicits Object — Implicits Conversions


implicits object gives implicit conversions for converting Scala objects (incl. RDDs) into a Dataset, DataFrame or Columns, and supports such conversions (through Encoders).

Table 1. implicits API
Name Description

localSeqToDatasetHolder

Creates a DatasetHolder with the input Seq[T] converted to a Dataset[T] (using SparkSession.createDataset).

Encoders

Encoders for primitive and object types in Scala and Java (aka boxed types)

StringToColumn

Converts $"name" into a Column

rddToDatasetHolder

symbolToColumn

implicits object is defined inside SparkSession and hence requires that you build a SparkSession instance first before importing implicits conversions.
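
A minimal sketch (the session name spark is a convention; the conversions come from that instance’s implicits):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("demo").getOrCreate()
import spark.implicits._

val ds  = Seq(1, 2, 3).toDS()   // localSeqToDatasetHolder + toDS
val col = $"name"               // StringToColumn turns $"name" into a Column
```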

Tip

In Scala REPL-based environments, e.g. spark-shell, use :imports to know what imports are in scope.

implicits object extends SQLImplicits abstract class.

DatasetHolder Scala Case Class

DatasetHolder is a Scala case class that, when created, takes a Dataset[T].

DatasetHolder is created (implicitly) when rddToDatasetHolder and localSeqToDatasetHolder implicit conversions are used.

DatasetHolder has toDS and toDF methods that simply return the Dataset[T] (it was created with) or a DataFrame (using Dataset.toDF operator), respectively.

Builder — Building SparkSession using Fluent API


Builder is the fluent API to create a SparkSession.

Table 1. Builder API
Method Description

appName

config

enableHiveSupport

Enables Hive support

getOrCreate

Gets the current SparkSession or creates a new one.

master

withExtensions

Access to the SparkSessionExtensions

Builder is available using the builder object method of a SparkSession.
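
A sketch of the fluent API (the configuration values are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("My Spark App")
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "4")
  .enableHiveSupport()   // optional; requires Hive classes on the CLASSPATH
  .getOrCreate()
```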

Note
You can have multiple SparkSessions in a single Spark application for different data catalogs (through relational entities).
Table 2. Builder’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

extensions

SparkSessionExtensions

Used when…​FIXME

options

Used when…​FIXME

Getting Or Creating SparkSession Instance — getOrCreate Method

getOrCreate…​FIXME

Enabling Hive Support — enableHiveSupport Method

enableHiveSupport enables Hive support, i.e. running structured queries on Hive tables (and a persistent Hive metastore, support for Hive serdes and Hive user-defined functions).

Note

You do not need any existing Hive installation to use Spark’s Hive support. SparkSession context will automatically create metastore_db in the current directory of a Spark application and a directory configured by spark.sql.warehouse.dir.

Refer to SharedState.

Internally, enableHiveSupport makes sure that the Hive classes are on CLASSPATH, i.e. Spark SQL’s org.apache.hadoop.hive.conf.HiveConf, and sets spark.sql.catalogImplementation internal configuration property to hive.

withExtensions Method

withExtensions simply executes the input f function with the SparkSessionExtensions.

appName Method

appName…​FIXME

config Method

config…​FIXME

master Method

master…​FIXME

SparkSession — The Entry Point to Spark SQL


SparkSession is the entry point to Spark SQL. It is one of the very first objects you create while developing a Spark SQL application.

As a Spark developer, you create a SparkSession using the SparkSession.builder method (that gives you access to Builder API that you use to configure the session).

Once created, SparkSession allows for creating a DataFrame (based on an RDD or a Scala Seq), creating a Dataset, accessing the Spark SQL services (e.g. ExperimentalMethods, ExecutionListenerManager, UDFRegistration), executing a SQL query, loading a table and, last but not least, accessing the DataFrameReader interface to load a dataset in the format of your choice (to some extent).

Note

spark object in spark-shell (the instance of SparkSession that is auto-created) has Hive support enabled.

In order to disable the pre-configured Hive support in the spark object, set the spark.sql.catalogImplementation internal configuration property to in-memory (which uses the InMemoryCatalog external catalog instead).

You can have as many SparkSessions as you want in a single Spark application. The common use case is to keep relational entities separate logically in catalogs per SparkSession.

In the end, you stop a SparkSession using SparkSession.stop method.

Table 1. SparkSession API (Object and Instance Methods)
Method Description

active

(New in 2.4.0)

builder

Object method to create a Builder to get the current SparkSession instance or create a new one.

catalog

Access to the current metadata catalog of relational entities, e.g. database(s), tables, functions, table columns, and temporary views.

clearActiveSession

Object method

clearDefaultSession

Object method

close

conf

Access to the current runtime configuration

createDataFrame

createDataset

emptyDataFrame

emptyDataset

experimental

Access to the current ExperimentalMethods

getActiveSession

Object method

getDefaultSession

Object method

implicits

listenerManager

Access to the current ExecutionListenerManager

newSession

Creates a new SparkSession

range

Creates a Dataset[java.lang.Long]

read

Access to the current DataFrameReader to load data from external data sources

sessionState

Access to the current SessionState

Internally, sessionState clones the optional parent SessionState (if given when creating the SparkSession) or creates a new SessionState using BaseSessionStateBuilder as defined by spark.sql.catalogImplementation configuration property:

setActiveSession

Object method

setDefaultSession

Object method

sharedState

Access to the current SharedState

sparkContext

Access to the underlying SparkContext

sql

“Executes” a SQL query

sqlContext

Access to the underlying SQLContext

stop

Stops the associated SparkContext

table

Loads data from a table

time

Executes a code block and prints out (to standard output) the time taken to execute it

udf

Access to the current UDFRegistration

version

Returns the version of Apache Spark

Note
baseRelationToDataFrame acts as a mechanism to plug the BaseRelation object hierarchy into the LogicalPlan object hierarchy that SparkSession uses to bridge them.

Creating SparkSession Using Builder Pattern — builder Object Method

builder creates a new Builder that you use to build a fully-configured SparkSession using a fluent API.

Tip
Read about Fluent interface design pattern in Wikipedia, the free encyclopedia.

Accessing Version of Spark — version Method

version returns the version of Apache Spark in use.

Internally, version uses spark.SPARK_VERSION value that is the version property in spark-version-info.properties properties file on CLASSPATH.

Creating Empty Dataset (Given Encoder) — emptyDataset Operator

emptyDataset creates an empty Dataset (assuming that future records will be of type T).

emptyDataset creates a LocalRelation logical query plan.
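
A minimal sketch (an implicit Encoder[T] must be in scope, e.g. via spark.implicits._):

```scala
import spark.implicits._

val strings = spark.emptyDataset[String]
strings.printSchema()   // a single column named "value" of type string
```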

Creating Dataset from Local Collections or RDDs — createDataset methods

createDataset is an experimental API to create a Dataset from a local Scala collection, i.e. Seq[T], Java’s List[T], or a distributed RDD[T].

createDataset creates a LocalRelation (for the input data collection) or LogicalRDD (for the input RDD[T]) logical operators.

Tip

You may want to consider implicits object and toDS method instead.

Internally, createDataset first looks up the implicit expression encoder in scope to access the AttributeReferences (of the schema).

Note
Only unresolved expression encoders are currently supported.

The expression encoder is then used to map elements (of the input Seq[T]) into a collection of InternalRows. With the references and rows, createDataset returns a Dataset with a LocalRelation logical query plan.
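
A minimal sketch of both variants:

```scala
import spark.implicits._

val fromSeq = spark.createDataset(Seq(1, 2, 3))                                   // local collection
val fromRdd = spark.createDataset(spark.sparkContext.parallelize(Seq("a", "b")))  // RDD[T]
```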

Creating Dataset With Single Long Column — range Operator

range family of methods create a Dataset of Long numbers.

Note
The first three variants (that do not specify numPartitions explicitly) use SparkContext.defaultParallelism for the number of partitions numPartitions.

Internally, range creates a new Dataset[Long] with Range logical plan and Encoders.LONG encoder.
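
A minimal sketch of the range variants:

```scala
spark.range(5)             // 0, 1, 2, 3, 4
spark.range(0, 10, 2)      // 0, 2, 4, 6, 8
spark.range(0, 10, 2, 2)   // same values, explicitly in 2 partitions
```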

Creating Empty DataFrame — emptyDataFrame method

emptyDataFrame creates an empty DataFrame (with no rows and columns).

It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).

Creating DataFrames from Local Collections or RDDs — createDataFrame Method

createDataFrame creates a DataFrame using RDD[Row] and the input schema. It is assumed that the rows in rowRDD all match the schema.

Caution
FIXME

Executing SQL Queries (aka SQL Mode) — sql Method

sql executes the sqlText SQL statement and creates a DataFrame.

Note

sql is imported in spark-shell so you can execute SQL statements as if sql were a part of the environment.

Internally, sql requests the current ParserInterface to execute a SQL query that gives a LogicalPlan.

Note
sql uses SessionState to access the current ParserInterface.

sql then creates a DataFrame using the current SparkSession (itself) and the LogicalPlan.
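
A minimal sketch (the view name is illustrative):

```scala
spark.range(3).createOrReplaceTempView("nums")

val doubled = spark.sql("SELECT id * 2 AS doubled FROM nums")
doubled.show()
```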

Tip

spark-sql is the main SQL environment in Spark to work with pure SQL statements (where you do not have to use Scala to execute them).

Accessing UDFRegistration — udf Attribute

udf attribute gives access to UDFRegistration that allows registering user-defined functions for SQL-based queries.

Internally, it is simply an alias for SessionState.udfRegistration.

Loading Data From Table — table Method

  1. Parses tableName to a TableIdentifier and calls the other table variant

table creates a DataFrame (wrapper) from the input tableName table (but only if available in the session catalog).
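
A minimal sketch (the temporary view below stands in for any table available in the session catalog):

```scala
spark.range(3).createOrReplaceTempView("nums")

val nums = spark.table("nums")   // a DataFrame over the nums table
```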

Accessing Metastore — catalog Attribute

catalog attribute is a (lazy) interface to the current metastore, i.e. data catalog (of relational entities like databases, tables, functions, table columns, and views).

Tip
All methods in Catalog return Datasets.

Internally, catalog creates a CatalogImpl (that uses the current SparkSession).

Accessing DataFrameReader — read method

read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.
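
A sketch of typical DataFrameReader usage (the file paths and options are illustrative):

```scala
val people = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("people.csv")

val events = spark.read.json("events.json")
```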

Getting Runtime Configuration — conf Attribute

conf returns the current RuntimeConfig.

Internally, conf creates a RuntimeConfig (when requested the very first time and cached afterwards) with the SQLConf of the SessionState.

readStream method

readStream returns a new DataStreamReader.

streams Attribute

streams attribute gives access to StreamingQueryManager (through SessionState).

experimentalMethods Attribute

experimentalMethods is an extension point with ExperimentalMethods that is a per-session collection of extra strategies and Rule[LogicalPlan]s.

Note
experimental is used in SparkPlanner and SparkOptimizer. Hive and Structured Streaming use it for their own extra strategies and optimization rules.

Creating SparkSession Instance — newSession method

newSession creates (starts) a new SparkSession (with the current SparkContext and SharedState).

Stopping SparkSession — stop Method

stop stops the SparkSession, i.e. stops the underlying SparkContext.

Create DataFrame from BaseRelation — baseRelationToDataFrame Method

Internally, baseRelationToDataFrame creates a DataFrame from the input BaseRelation wrapped inside LogicalRelation.

Note
LogicalRelation is a logical plan adapter for BaseRelation (so BaseRelation can be part of a logical plan).
Note

baseRelationToDataFrame is used when:

Creating SessionState Instance — instantiateSessionState Internal Method

instantiateSessionState finds the className that is then used to create and build a BaseSessionStateBuilder.

instantiateSessionState may report an IllegalArgumentException while instantiating the class of a SessionState:

Note
instantiateSessionState is used exclusively when SparkSession is requested for SessionState per spark.sql.catalogImplementation configuration property (and one is not available yet).

sessionStateClassName Internal Method

sessionStateClassName gives the name of the class of the SessionState per spark.sql.catalogImplementation, i.e.

Note
sessionStateClassName is used exclusively when SparkSession is requested for the SessionState (and one is not available yet).

Creating DataFrame From RDD Of Internal Binary Rows and Schema — internalCreateDataFrame Internal Method

internalCreateDataFrame creates a DataFrame with a LogicalRDD.

Note

internalCreateDataFrame is used when:

Creating SparkSession Instance

SparkSession takes the following when created:

clearActiveSession Object Method

clearActiveSession…​FIXME

clearDefaultSession Object Method

clearDefaultSession…​FIXME

Accessing ExperimentalMethods — experimental Method

experimental…​FIXME

getActiveSession Object Method

getActiveSession…​FIXME

getDefaultSession Object Method

getDefaultSession…​FIXME

Accessing ExecutionListenerManager — listenerManager Method

listenerManager…​FIXME

Accessing SessionState — sessionState Lazy Attribute

sessionState…​FIXME

setActiveSession Object Method

setActiveSession…​FIXME

setDefaultSession Object Method

setDefaultSession…​FIXME

Accessing SharedState — sharedState Method

sharedState…​FIXME

Measuring Duration of Executing Code Block — time Method

time…​FIXME

Fundamentals of Spark SQL Application Development


Development of a Spark SQL application requires the following steps:

  1. Setting up Development Environment (IntelliJ IDEA, Scala and sbt)

  2. Specifying Library Dependencies

  3. Creating SparkSession

  4. Loading Data from Data Sources

  5. Processing Data Using Dataset API

  6. Saving Data to Persistent Storage

  7. Deploying Spark Application to Cluster (using spark-submit)

Dataset API vs SQL


Spark SQL supports two “modes” to write structured queries: Dataset API and SQL.

It turns out that some structured queries can be expressed more easily using the Dataset API, while some are only possible in SQL. In other words, you may find mixing the Dataset API and SQL modes challenging yet rewarding.

You could at some point consider writing structured queries using Catalyst data structures directly, hoping to avoid the differences and focus on what is supported in Spark SQL, but that could quickly become unwieldy to maintain (i.e. finding Spark SQL developers who are comfortable with it, as well as it being fairly low-level and therefore possibly too dependent on a specific Spark SQL version).

This section describes the differences between Spark SQL features to develop Spark applications using Dataset API and SQL mode.

  1. RuntimeReplaceable Expressions are only available using SQL mode by means of SQL functions like nvl, nvl2, ifnull, nullif, etc. (see the sketch after this list)

  2. Column.isin and SQL IN predicate with a subquery (and In Predicate Expression)
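
A hedged sketch for the first point: nvl is a RuntimeReplaceable expression and so is reachable only through SQL, while the Dataset API reaches for coalesce instead:

```scala
import org.apache.spark.sql.functions.{coalesce, lit}

// SQL mode: nvl is available as a SQL function
spark.sql("SELECT nvl(NULL, 'fallback') AS value").show()

// Dataset API: no nvl Column function -- use coalesce
spark.range(1)
  .select(coalesce(lit(null).cast("string"), lit("fallback")).as("value"))
  .show()
```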

Datasets vs DataFrames vs RDDs


Many may have been asking themselves why they should use Datasets rather than the foundation of all Spark – RDDs with case classes.

This document collects advantages of Dataset vs RDD[CaseClass] to answer the question Dan has asked on twitter:

“In #Spark, what is the advantage of a DataSet over an RDD[CaseClass]?”

Saving to or Writing from Data Sources

With the Dataset API, loading data from a data source or saving it to one is as simple as using the SparkSession.read or Dataset.write methods, respectively.

Accessing Fields / Columns

You select columns in a Dataset without worrying about the positions of the columns.

With an RDD, you have to do an additional hop over a case class and access fields by name.
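
A sketch contrasting the two (Token is a hypothetical case class):

```scala
case class Token(name: String, productId: Int, score: Double)

import spark.implicits._
val ds = Seq(Token("aaa", 100, 0.12), Token("bbb", 200, 0.29)).toDS
ds.select("name").show()    // no need to know the column's position

val rdd = spark.sparkContext.parallelize(Seq(Token("aaa", 100, 0.12)))
rdd.map(_.name).collect()   // the extra hop goes through the case class
```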
