DataFrame — Dataset of Rows with RowEncoder

Spark SQL introduces a tabular functional data abstraction called DataFrame. It is designed to ease developing Spark applications for processing large amounts of structured tabular data on Spark infrastructure.

DataFrame is a data abstraction or a domain-specific language (DSL) for working with structured and semi-structured data, i.e. datasets that you can specify a schema for.

DataFrame is a collection of rows with a schema that is the result of executing a structured query (once it has been executed).

DataFrame uses the immutable, in-memory, resilient, distributed and parallel capabilities of RDD, and applies a structure called schema to the data.

Note

In Spark 2.0.0 DataFrame is a mere type alias for Dataset[Row].
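
For reference, the alias is defined in the org.apache.spark.sql package object (shown here simplified):

    package object sql {
      type DataFrame = Dataset[Row]
    }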

DataFrame is a distributed collection of tabular data organized into rows and named columns. It is conceptually equivalent to a table in a relational database with operations to project (select), filter, intersect, join, group, sort, aggregate, or convert to an RDD (consult the DataFrame API).

Spark SQL borrowed the concept of DataFrame from pandas’ DataFrame and made it immutable, parallel (one machine, perhaps with many processors and cores) and distributed (many machines, perhaps with many processors and cores).

Note
Hey, big data consultants, time to help teams migrate the code from pandas’ DataFrame into Spark’s DataFrames (at least to PySpark’s DataFrame) and offer services to set up large clusters!

DataFrames in Spark SQL strongly rely on the features of RDDs – a DataFrame is basically an RDD exposed as a structured abstraction with operations built to handle very big data from day one. So, petabytes of data should not scare you (unless you’re the administrator who has to create such a clustered Spark environment – contact me when you feel alone with the task).

You can create DataFrames by loading data from structured files (JSON, Parquet, CSV), RDDs, tables in Hive, or external databases (JDBC). You can also create DataFrames from scratch and build upon them (as in the examples below). See DataFrame API. You can read any format as long as you have the appropriate Spark SQL extension of DataFrameReader to load the dataset properly.

Caution
FIXME Diagram of reading data from sources to create DataFrame

You can execute queries over DataFrames using two approaches:

  • the good ol’ SQL – helps when migrating from the “SQL databases” world into the world of DataFrames in Spark SQL

  • Query DSL – an API that helps ensure proper syntax at compile time.

DataFrame also allows you to perform a number of other tasks (see the DataFrame API).

DataFrames use the Catalyst query optimizer to produce efficient queries (and so they are supposed to be faster than corresponding RDD-based queries).

Note
Your DataFrames can also be type-safe and moreover further improve their performance through specialized encoders that can significantly cut serialization and deserialization times.

You can enforce types on generic rows and hence bring type safety (at compile time) by encoding rows into a type-safe Dataset object. As of Spark 2.0 it is the preferred way of developing Spark applications.

Features of DataFrame

A DataFrame is a collection of “generic” Row instances (as RDD[Row]) and a schema.

Note
Regardless of how you create a DataFrame, it will always be a pair of RDD[Row] and StructType.
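
A quick way to see both parts in the Spark shell (the sample data is arbitrary):

    val df = Seq((0, "zero"), (1, "one")).toDF("id", "name")

    df.rdd     // org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
    df.schema  // org.apache.spark.sql.types.StructType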

SQLContext, spark, and Spark shell

You use org.apache.spark.sql.SQLContext to build DataFrames and execute SQL queries.

The quickest and easiest way to work with Spark SQL is to use Spark shell and spark object.

As you may have noticed, spark in Spark shell is actually an org.apache.spark.sql.SparkSession (created with Hive support when the Hive classes are available) that integrates the Spark SQL execution engine with data stored in Apache Hive.

The Apache Hive™ data warehouse software facilitates querying and managing large datasets residing in distributed storage.

Creating DataFrames from Scratch

Use Spark shell as described in Spark shell.

Using toDF

After you import spark.implicits._ (which is done for you by Spark shell) you may apply the toDF method to convert objects to DataFrames.
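
A minimal sketch in the Spark shell (the column name is arbitrary):

    import spark.implicits._   // already done for you in Spark shell

    val df = Seq("hello", "spark").toDF("text")
    df.show
    // +-----+
    // | text|
    // +-----+
    // |hello|
    // |spark|
    // +-----+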

Creating DataFrame using Case Classes in Scala

This method assumes the data comes from a Scala case class that will describe the schema.
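
A sketch with a hypothetical Person case class:

    // The case class describes the schema (column names and types).
    case class Person(name: String, age: Int)

    val df = Seq(Person("Jacek", 42), Person("Agata", 22)).toDF
    df.printSchema
    // root
    //  |-- name: string (nullable = true)
    //  |-- age: integer (nullable = false)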

Custom DataFrame Creation using createDataFrame

SQLContext offers a family of createDataFrame operations.
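
One variant takes an RDD[Row] together with an explicit StructType schema; a sketch (column names and data are made up):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("text", StringType, nullable = true)))

    val rows = sc.parallelize(Seq(Row(0, "hello"), Row(1, "world")))

    // spark.createDataFrame in Spark 2.0; sqlContext.createDataFrame works the same way
    val df = spark.createDataFrame(rows, schema)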

Loading data from structured files

Creating DataFrame from CSV file

Let’s start with an example in which schema inference relies on a custom case class in Scala.
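
A sketch: read the file as plain text, split each line, and map it onto a hypothetical Auction case class so the schema is inferred from the class (the file name and columns are made up, and the file is assumed to have no header line):

    // Each CSV line is assumed to look like: auctionid,bid,bidder
    case class Auction(auctionid: String, bid: Float, bidder: String)

    val lines = sc.textFile("auctions.csv")
    val auction = lines
      .map(_.split(","))
      .map(r => Auction(r(0), r(1).toFloat, r(2)))
      .toDF
    auction.printSchema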

Creating DataFrame from CSV files using spark-csv module

You’re going to use the spark-csv module to load data from a CSV data source that handles proper parsing and loading.

Note
Support for CSV data sources is available by default in Spark 2.0.0. No need for an external module.

Start the Spark shell using the --packages option as follows:
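
The spark-csv version below is only an illustration – pick one that matches your Scala and Spark versions:

    $ ./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0

Inside the shell you can then read a CSV file through the spark-csv data source (the file name and options are illustrative):

    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")       // use the first line as column names
      .option("inferSchema", "true")  // infer column types from the data
      .load("auctions.csv")

    // On Spark 2.0+ the built-in reader does the same:
    // spark.read.option("header", "true").option("inferSchema", "true").csv("auctions.csv")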

Reading Data from External Data Sources (read method)

You can create DataFrames by loading data from structured files (JSON, Parquet, CSV), RDDs, tables in Hive, or external databases (JDBC) using SQLContext.read method.

read returns a DataFrameReader instance.
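
A sketch (the file name is made up; sqlContext.read works the same way):

    val reader = spark.read            // org.apache.spark.sql.DataFrameReader
    val df = reader.format("json").load("people.json")

    // or, equivalently, using the format-specific shortcut
    val df2 = spark.read.json("people.json")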

Among the supported structured data (file) formats are (consult Specifying Data Format (format method) for DataFrameReader):

  • JSON

  • parquet

  • JDBC

  • ORC

  • Tables in Hive and any JDBC-compliant database

  • libsvm

Querying DataFrame

Note
Spark SQL offers a Pandas-like Query DSL.

Using Query DSL

You can select specific columns using select method.
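
For example, with a hypothetical auction DataFrame (like the one built from the CSV examples above):

    auction.select("auctionid", "bidder").show(5)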

Note
This variant (in which you use stringified column names) can only select existing columns, i.e. you cannot create new ones using select expressions.

In the following example you query for the top 5 of the most active bidders.

Note the tiny $ and desc together with the column name to sort the rows by.
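
A sketch of such a query (the auction DataFrame and its bidder column are the same assumptions as above):

    auction
      .groupBy("bidder")
      .count()
      .sort($"count".desc)   // $ turns a name into a Column; desc sorts in descending order
      .show(5)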

Using SQL

Register a DataFrame as a named temporary table to run SQL.

  1. Register a temporary table so SQL queries make sense

You can execute a SQL query on a DataFrame using sql operation, but before the query is executed it is optimized by Catalyst query optimizer. You can print the physical plan for a DataFrame using the explain operation.
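
A sketch, assuming the same hypothetical auction DataFrame:

    // 1. Register the DataFrame as a temporary view (registerTempTable before Spark 2.0)
    auction.createOrReplaceTempView("auction")

    // 2. Run SQL against it; Catalyst optimizes the query before execution
    val topBidders = spark.sql(
      "SELECT bidder, count(*) AS cnt FROM auction GROUP BY bidder ORDER BY cnt DESC LIMIT 5")

    // 3. Print the physical plan
    topBidders.explain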

Filtering
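
A minimal sketch, again assuming the hypothetical auction DataFrame and its columns:

    auction.filter($"bid" > 100).show(5)       // filter with a Column expression
    auction.where("bidder = 'alice'").count    // where accepts a SQL-like condition string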

Handling data in Avro format

Use custom serializer using spark-avro.

Run Spark shell with --packages com.databricks:spark-avro_2.11:2.0.0 (the 2.0.0 artifact is not in any public Maven repository, which is why the --repositories option is required).

And then…
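
A sketch of writing and reading Avro through the spark-avro data source (the DataFrame df and the path are assumptions):

    import org.apache.spark.sql.SaveMode

    df.write
      .format("com.databricks.spark.avro")
      .mode(SaveMode.Overwrite)
      .save("people.avro")

    val avroDf = spark.read
      .format("com.databricks.spark.avro")
      .load("people.avro")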

See org.apache.spark.sql.SaveMode for the available save modes (Append, Overwrite, ErrorIfExists, Ignore).
