Avro Data Source
Spark SQL supports structured queries over Avro files as well as over Avro-encoded binary data in DataFrame columns.
Note: Apache Avro is a data serialization format.
The Avro data source is provided by the spark-avro external module. You should include it as a dependency in your Spark application (e.g. via spark-submit --packages or in build.sbt).
```
org.apache.spark:spark-avro_2.12:2.4.0
```
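The same coordinate can be declared in build.sbt. A minimal sketch (the Spark and Scala versions here are illustrative; match them to your build):

```scala
// build.sbt (sketch) -- spark-avro must match your Spark and Scala versions
libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.0"
```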
The following shows how to include the spark-avro
module in a spark-shell
session.
```
$ ./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:2.4.0
```
| Name | Description |
|---|---|
| from_avro | Parses an Avro-encoded binary column and converts it to a Catalyst value per a JSON-encoded Avro schema |
| to_avro | Converts a column to an Avro-encoded binary column |
After the module is loaded, you should import the org.apache.spark.sql.avro
package to have the from_avro and to_avro functions available.
```scala
import org.apache.spark.sql.avro._
```
Converting Column to Avro-Encoded Binary Column — to_avro Method
```scala
to_avro(data: Column): Column
```
to_avro
creates a Column with the CatalystDataToAvro unary expression (with the Catalyst expression of the given data
column).
```scala
import org.apache.spark.sql.avro._
val q = spark.range(1).withColumn("to_avro_id", to_avro('id))

scala> q.show
+---+----------+
| id|to_avro_id|
+---+----------+
|  0|      [00]|
+---+----------+

val logicalPlan = q.queryExecution.logical
scala> println(logicalPlan.numberedTreeString)
00 'Project [id#33L, catalystdatatoavro('id) AS to_avro_id#35]
01 +- Range (0, 1, step=1, splits=Some(8))

import org.apache.spark.sql.avro.CatalystDataToAvro
// Let's use QueryExecution.analyzed instead
// https://issues.apache.org/jira/browse/SPARK-26063
val analyzedPlan = q.queryExecution.analyzed
val toAvroExpr = analyzedPlan.expressions.drop(1).head.children.head.asInstanceOf[CatalystDataToAvro]
scala> println(toAvroExpr.sql)
to_avro(`id`, bigint)
```
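Why does id = 0 serialize to the single byte [00]? Avro encodes longs with zigzag coding followed by variable-length bytes, so 0 maps to 0x00. The helper below is not part of spark-avro; it is a minimal sketch of that encoding per the Avro specification:

```scala
// A minimal sketch of Avro's binary encoding for longs (zigzag + varint),
// illustrating why to_avro renders id = 0 as the single byte 0x00.
object AvroLongEncoding {
  // ZigZag maps small negative numbers to small positive ones:
  // 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
  def zigZag(n: Long): Long = (n << 1) ^ (n >> 63)

  // Emit 7 bits per byte, least-significant group first;
  // the high bit flags that more bytes follow.
  def encodeLong(n: Long): Seq[Int] = {
    var v = zigZag(n)
    val out = scala.collection.mutable.ArrayBuffer.empty[Int]
    var done = false
    while (!done) {
      val b = (v & 0x7f).toInt
      v = v >>> 7
      if (v != 0) out += (b | 0x80) else { out += b; done = true }
    }
    out.toSeq
  }
}
```

For example, `AvroLongEncoding.encodeLong(0L)` yields `Seq(0)`, matching the `[00]` shown in the `q.show` output above.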
Converting Avro-Encoded Column to Catalyst Value — from_avro Method
```scala
from_avro(data: Column, jsonFormatSchema: String): Column
```
from_avro
creates a Column with the AvroDataToCatalyst unary expression (with the Catalyst expression of the given data
column and the jsonFormatSchema
JSON-encoded schema).
```scala
import org.apache.spark.sql.avro._
val data = spark.range(1).withColumn("to_avro_id", to_avro('id))

// Use from_avro to decode the to_avro-encoded id column
val jsonFormatSchema = s"""
  |{
  |  "type": "long",
  |  "name": "id"
  |}
""".stripMargin
val q = data.select(from_avro('to_avro_id, jsonFormatSchema) as "id_from_avro")

scala> q.show
+------------+
|id_from_avro|
+------------+
|           0|
+------------+

val logicalPlan = q.queryExecution.logical
scala> println(logicalPlan.numberedTreeString)
00 'Project [avrodatatocatalyst('to_avro_id,
01 {
02   "type": "long",
03   "name": "id"
04 }
05 ) AS id_from_avro#77]
06 +- Project [id#66L, catalystdatatoavro(id#66L) AS to_avro_id#68]
07 +- Range (0, 1, step=1, splits=Some(8))

import org.apache.spark.sql.avro.AvroDataToCatalyst
// Let's use QueryExecution.analyzed instead
// https://issues.apache.org/jira/browse/SPARK-26063
val analyzedPlan = q.queryExecution.analyzed
val fromAvroExpr = analyzedPlan.expressions.head.children.head.asInstanceOf[AvroDataToCatalyst]
scala> println(fromAvroExpr.sql)
from_avro(`to_avro_id`, bigint)
```
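For the primitive long schema used above, from_avro performs the inverse of the zigzag/varint encoding. The helper below is again not part of spark-avro but a minimal sketch of that decoding step per the Avro specification:

```scala
// A minimal sketch of Avro's binary decoding for longs (varint + un-zigzag),
// the primitive-long counterpart of what from_avro does above.
object AvroLongDecoding {
  // Bytes carry 7 payload bits each, least-significant group first;
  // a set high bit means another byte follows.
  def decodeLong(bytes: Seq[Int]): Long = {
    var result = 0L
    var shift  = 0
    var i      = 0
    var b      = 0
    do {
      b = bytes(i)
      result |= (b & 0x7fL) << shift
      shift += 7
      i += 1
    } while ((b & 0x80) != 0)
    // Undo zigzag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
    (result >>> 1) ^ -(result & 1)
  }
}
```

Decoding `Seq(0x00)` gives back `0L`, which is exactly the round trip shown in the `q.show` output above (to_avro produced `[00]`, from_avro recovered `0`).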