Standard Functions for Collections (Collection Functions)
Name | Description
---|---
array_contains | Checks whether the given array column contains a value
explode | Creates a new row for each element in the given array or map column
explode_outer | Creates a new row for each element in the given array or map column. If the array/map is null or empty, null is produced
from_json | Extracts data from arbitrary JSON-encoded values into a StructType or ArrayType of StructType elements with the specified schema
posexplode_outer | FIXME
reverse | Returns a reversed string or an array with reverse order of elements
size | Returns the size of the given array or map. Returns -1 if null
size Collection Function
```scala
size(e: Column): Column
```
size returns the size of the given array or map. Returns -1 if null.

Internally, size creates a Column with a Size unary expression.
```scala
import org.apache.spark.sql.functions.size
val c = size('id)

scala> println(c.expr.asCode)
Size(UnresolvedAttribute(ArrayBuffer(id)))
```
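The following is a minimal sketch (with made-up sample data, assuming the spark-shell implicits are in scope) that shows size over non-empty, empty and null arrays, including the -1 result for null:

```scala
import org.apache.spark.sql.functions.size

// Hypothetical sample data: the last row has a null array to show the -1 case
val arrays = Seq((0, Seq(1, 2, 3)), (1, Seq.empty[Int]), (2, null)).toDF("id", "nums")

scala> arrays.select($"id", size($"nums") as "size").show
+---+----+
| id|size|
+---+----+
|  0|   3|
|  1|   0|
|  2|  -1|
+---+----+
```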
posexplode_outer Collection Function
```scala
posexplode_outer(e: Column): Column
```
posexplode_outer …FIXME
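Until the description is filled in, here is a minimal usage sketch (the sample data is made up; pos and col are the column names Spark generates for the positions and values, and the outer variant is expected to produce a null row for the empty array):

```scala
import org.apache.spark.sql.functions.posexplode_outer

// Hypothetical sample data with one empty array
val letters = Seq((1, Seq("a", "b")), (2, Seq.empty[String])).toDF("id", "letters")

scala> letters.select($"id", posexplode_outer($"letters")).show
+---+----+----+
| id| pos| col|
+---+----+----+
|  1|   0|   a|
|  1|   1|   b|
|  2|null|null|
+---+----+----+
```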
explode Collection Function
Caution: FIXME
```scala
scala> Seq(Array(0,1,2)).toDF("array").withColumn("num", explode('array)).show
+---------+---+
|    array|num|
+---------+---+
|[0, 1, 2]|  0|
|[0, 1, 2]|  1|
|[0, 1, 2]|  2|
+---------+---+
```
Note: The explode function is an equivalent of the flatMap operator for Dataset (see the sketch below).
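A minimal sketch of that equivalence (the sample data is made up and the spark-shell implicits are assumed) where the same flattening is expressed with Dataset.flatMap:

```scala
import org.apache.spark.sql.functions.explode

// explode over a column...
val withExplode = Seq(Array(0, 1, 2)).toDF("array").select(explode($"array") as "num")

// ...and the same flattening with Dataset.flatMap
val withFlatMap = Seq(Array(0, 1, 2)).toDS.flatMap(xs => xs.toSeq).toDF("num")

scala> withFlatMap.show
+---+
|num|
+---+
|  0|
|  1|
|  2|
+---+
```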
explode_outer Collection Function
```scala
explode_outer(e: Column): Column
```
explode_outer generates a new row for each element in the e array or map column.
Note: Unlike explode, explode_outer generates null when the array or map is null or empty.
```scala
val arrays = Seq((1,Seq.empty[String])).toDF("id", "array")

scala> arrays.printSchema
root
 |-- id: integer (nullable = false)
 |-- array: array (nullable = true)
 |    |-- element: string (containsNull = true)

scala> arrays.select(explode_outer($"array")).show
+----+
| col|
+----+
|null|
+----+
```
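For comparison, plain explode on the same arrays dataset is expected to produce no rows at all for the empty array (a minimal sketch):

```scala
import org.apache.spark.sql.functions.explode

// explode drops the row entirely when the array is empty
scala> arrays.select(explode($"array")).show
+---+
|col|
+---+
+---+
```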
Internally, explode_outer creates a Column with GeneratorOuter and Explode Catalyst expressions.
```scala
val explodeOuter = explode_outer($"array").expr

scala> println(explodeOuter.numberedTreeString)
00 generatorouter(explode('array))
01 +- explode('array)
02    +- 'array
```
Extracting Data from Arbitrary JSON-Encoded Values — from_json Collection Function
```scala
from_json(e: Column, schema: StructType, options: Map[String, String]): Column  (1)
from_json(e: Column, schema: DataType, options: Map[String, String]): Column    (2)
from_json(e: Column, schema: StructType): Column                                (3)
from_json(e: Column, schema: DataType): Column                                  (4)
from_json(e: Column, schema: String, options: Map[String, String]): Column      (5)
```
1. Calls (2) with StructType converted to DataType
2. (fixme)
3. Calls (1) with empty options
4. Relays to the other from_json with empty options
5. Uses schema as DataType in the JSON format or falls back to StructType in the DDL format (see the sketch below)
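As a quick sketch of variant (5), the schema can be passed as a DDL-formatted string (the sample JSON is made up):

```scala
import org.apache.spark.sql.functions.from_json

// Schema given as a DDL-formatted string
val q = Seq("""{"id": 0}""").toDF("json")
  .select(from_json($"json", "id INT", Map.empty[String, String]) as "ids")

scala> q.show
+---+
|ids|
+---+
|[0]|
+---+
```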
from_json parses a column with a JSON-encoded value into a StructType or ArrayType of StructType elements with the specified schema.
```scala
val jsons = Seq("""{ "id": 0 }""").toDF("json")

import org.apache.spark.sql.types._
val schema = new StructType()
  .add($"id".int.copy(nullable = false))

import org.apache.spark.sql.functions.from_json
scala> jsons.select(from_json($"json", schema) as "ids").show
+---+
|ids|
+---+
|[0]|
+---+
```
Note: A schema can be one of the following: a StructType, a DataType, or a String with a schema in JSON or DDL format (as in the from_json variants above).
```scala
// Define the schema for JSON-encoded messages
// Note that the schema is nested (on the addresses field)
import org.apache.spark.sql.types._
val addressesSchema = new StructType()
  .add($"city".string)
  .add($"state".string)
  .add($"zip".string)
val schema = new StructType()
  .add($"firstName".string)
  .add($"lastName".string)
  .add($"email".string)
  .add($"addresses".array(addressesSchema))
scala> schema.printTreeString
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- email: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- zip: string (nullable = true)

// Generate the JSON-encoded schema
// That's the variant of the schema that from_json accepts
val schemaAsJson = schema.json

// Use prettyJson to print out the JSON-encoded schema
// Only for demo purposes
scala> println(schema.prettyJson)
{
  "type" : "struct",
  "fields" : [ {
    "name" : "firstName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "lastName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "email",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "addresses",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "city",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "state",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "zip",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        } ]
      },
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : { }
  } ]
}

// Let's "validate" the JSON-encoded schema
import org.apache.spark.sql.types.DataType
val dt = DataType.fromJson(schemaAsJson)
scala> println(dt.sql)
STRUCT<`firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY<STRUCT<`city`: STRING, `state`: STRING, `zip`: STRING>>>

// No exception means that the JSON-encoded schema should be fine
// Use it with from_json
val rawJsons = Seq("""
  {
    "firstName" : "Jacek",
    "lastName" : "Laskowski",
    "email" : "jacek@japila.pl",
    "addresses" : [
      {
        "city" : "Warsaw",
        "state" : "N/A",
        "zip" : "02-791"
      }
    ]
  }
""").toDF("rawjson")
val people = rawJsons
  .select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
  .select("json.*") // <-- flatten the struct field
  .withColumn("address", explode($"addresses")) // <-- explode the array field
  .drop("addresses") // <-- no longer needed
  .select("firstName", "lastName", "email", "address.*") // <-- flatten the struct field
scala> people.show
+---------+---------+---------------+------+-----+------+
|firstName| lastName|          email|  city|state|   zip|
+---------+---------+---------------+------+-----+------+
|    Jacek|Laskowski|jacek@japila.pl|Warsaw|  N/A|02-791|
+---------+---------+---------------+------+-----+------+
```
Note: options controls how a JSON is parsed and contains the same options as the json format.
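For example, the allowUnquotedFieldNames option of the json data source should make from_json accept field names without quotes (a minimal sketch with made-up data):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// The field name id is not quoted, so strict JSON parsing would reject it
val jsons = Seq("""{ id: 0 }""").toDF("json")
val schema = new StructType().add($"id".int)

scala> jsons.select(from_json($"json", schema, Map("allowUnquotedFieldNames" -> "true")) as "ids").show
+---+
|ids|
+---+
|[0]|
+---+
```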
Internally, from_json creates a Column with a JsonToStructs unary expression.
Note: from_json (creates a JsonToStructs that) uses a JSON parser in FAILFAST parsing mode that simply fails early when a corrupted/malformed record is found (and hence does not support the columnNameOfCorruptRecord JSON option).
```scala
val jsons = Seq("""{ id: 0 }""").toDF("json")

import org.apache.spark.sql.types._
val schema = new StructType()
  .add($"id".int.copy(nullable = false))
  .add($"corrupted_records".string)
val opts = Map("columnNameOfCorruptRecord" -> "corrupted_records")

scala> jsons.select(from_json($"json", schema, opts) as "ids").show
+----+
| ids|
+----+
|null|
+----+
```
Note: from_json corresponds to SQL's from_json.
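A minimal sketch of the SQL variant (assuming a Spark version where the schema can be given as a DDL-formatted string in SQL):

```scala
// from_json in SQL with a DDL-formatted schema
scala> spark.sql("""SELECT from_json('{"id": 1}', 'id INT') AS ids""").show
+---+
|ids|
+---+
|[1]|
+---+
```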
array_contains Collection Function
```scala
array_contains(column: Column, value: Any): Column
```
array_contains creates a Column for a column argument as an array and the value of the same type as the type of the elements of the array.
Internally, array_contains creates a Column with an ArrayContains expression.
```scala
// Arguments must be an array followed by a value of same type as the array elements
import org.apache.spark.sql.functions.array_contains
val c = array_contains(column = $"ids", value = 1)

val ids = Seq(Seq(1,2,3), Seq(1), Seq(2,3)).toDF("ids")
val q = ids.filter(c)
scala> q.show
+---------+
|      ids|
+---------+
|[1, 2, 3]|
|      [1]|
+---------+
```
array_contains corresponds to SQL's array_contains.
```scala
import org.apache.spark.sql.functions.array_contains
val c = array_contains(column = $"ids", value = Array(1, 2))
val e = c.expr
scala> println(e.sql)
array_contains(`ids`, [1,2])
```
Tip: Use SQL's array_contains to use values from columns for the column and value arguments.
```scala
val codes = Seq(
  (Seq(1, 2, 3), 2),
  (Seq(1), 1),
  (Seq.empty[Int], 1),
  (Seq(2, 4, 6), 0)).toDF("codes", "cd")
scala> codes.show
+---------+---+
|    codes| cd|
+---------+---+
|[1, 2, 3]|  2|
|      [1]|  1|
|       []|  1|
|[2, 4, 6]|  0|
+---------+---+

val q = codes.where("array_contains(codes, cd)")
scala> q.show
+---------+---+
|    codes| cd|
+---------+---+
|[1, 2, 3]|  2|
|      [1]|  1|
+---------+---+

// array_contains standard function with Columns does NOT work. Why?!
// Asked this question on StackOverflow --> https://stackoverflow.com/q/50412939/1305344
val q = codes.where(array_contains($"codes", $"cd"))
scala> q.show
java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.ColumnName cd
  at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:77)
  at org.apache.spark.sql.functions$.array_contains(functions.scala:3046)
  ... 50 elided

// Thanks Russel for this excellent "workaround"
// https://stackoverflow.com/a/50413766/1305344
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.ArrayContains
val q = codes.where(new Column(ArrayContains($"codes".expr, $"cd".expr)))
scala> q.show
+---------+---+
|    codes| cd|
+---------+---+
|[1, 2, 3]|  2|
|      [1]|  1|
+---------+---+
```