Column

Column represents a column in a Dataset that holds a Catalyst Expression that produces a value per row.
Note: A Column is a value generator for every row in a Dataset.
A special column * references all columns in a Dataset.
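For example (a quick sketch with a throwaway df, assuming spark.implicits._ is in scope as in the snippets below):

```scala
import org.apache.spark.sql.functions.col

val df = Seq((0, "hello")).toDF("id", "text")

// the star column expands to every column of df
scala> df.select(col("*")).show
+---+-----+
| id| text|
+---+-----+
|  0|hello|
+---+-----+
```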
With the implicit conversions imported, you can create “free” column references using Scala’s symbols.
```scala
val spark: SparkSession = ...
import spark.implicits._

import org.apache.spark.sql.Column

scala> val nameCol: Column = 'name
nameCol: org.apache.spark.sql.Column = name
```
Note: “Free” column references are Columns with no association to a Dataset.
You can also create free column references from $-prefixed strings.
```scala
// Note that $ alone creates a ColumnName
scala> val idCol = $"id"
idCol: org.apache.spark.sql.ColumnName = id

import org.apache.spark.sql.Column

// The target type triggers the implicit conversion to Column
scala> val idCol: Column = $"id"
idCol: org.apache.spark.sql.Column = id
```
Beside the above, you can create column references with the col and column functions from org.apache.spark.sql.functions:

```scala
import org.apache.spark.sql.functions._

scala> val nameCol = col("name")
nameCol: org.apache.spark.sql.Column = name

scala> val cityCol = column("city")
cityCol: org.apache.spark.sql.Column = city
```
Finally, you can create a bound Column using the Dataset the column is supposed to be part of, with the Dataset.apply factory method or the Dataset.col operator.
Note: You can use bound Column references only with the Datasets they have been created from.
```scala
scala> val textCol = dataset.col("text")
textCol: org.apache.spark.sql.Column = text

scala> val idCol = dataset.apply("id")
idCol: org.apache.spark.sql.Column = id

scala> val idCol = dataset("id")
idCol: org.apache.spark.sql.Column = id
```
You can reference nested columns using . (dot).
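For example (a minimal sketch; the Person and Address case classes here are made up for illustration):

```scala
// "address" is a nested struct column, so its fields are reachable with a dot
case class Address(city: String, country: String)
case class Person(name: String, address: Address)
val people = Seq(Person("Jacek", Address("Warsaw", "Poland"))).toDS

scala> people.select("address.city").show
+------+
|  city|
+------+
|Warsaw|
+------+
```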
Operator | Description |
---|---|
as | Specifying type hint about the expected return value of the column |
Tip: Read about typed column references in TypedColumn Expressions.
Specifying Type Hint — as Operator
```scala
as[U : Encoder]: TypedColumn[Any, U]
```
as creates a TypedColumn (that gives a type hint about the expected return value of the column).
```scala
scala> $"id".as[Int]
res1: org.apache.spark.sql.TypedColumn[Any,Int] = id
```
Adding Column to Dataset — withColumn Method
```scala
withColumn(colName: String, col: Column): DataFrame
```
withColumn returns a new DataFrame with the col column added under the name colName.
Note: withColumn can replace an existing colName column.
```scala
scala> val df = Seq((1, "jeden"), (2, "dwa")).toDF("number", "polish")
df: org.apache.spark.sql.DataFrame = [number: int, polish: string]

scala> df.show
+------+------+
|number|polish|
+------+------+
|     1| jeden|
|     2|   dwa|
+------+------+

scala> df.withColumn("polish", lit(1)).show
+------+------+
|number|polish|
+------+------+
|     1|     1|
|     2|     1|
+------+------+
```
You can add new columns to a Dataset using the withColumn method.
```scala
val spark: SparkSession = ...
val dataset = spark.range(5)

// Add a new column called "group"
scala> dataset.withColumn("group", 'id % 2).show
+---+-----+
| id|group|
+---+-----+
|  0|    0|
|  1|    1|
|  2|    0|
|  3|    1|
|  4|    0|
+---+-----+
```
Creating Column Instance For Catalyst Expression — apply Factory Method
```scala
val spark: SparkSession = ...
case class Word(id: Long, text: String)
val dataset = Seq(Word(0, "hello"), Word(1, "spark")).toDS

scala> val idCol = dataset.apply("id")
idCol: org.apache.spark.sql.Column = id

// or using Scala's magic a little bit
// the following is equivalent to the above explicit apply call
scala> val idCol = dataset("id")
idCol: org.apache.spark.sql.Column = id
```
like Operator
Caution: FIXME
```scala
scala> df("id") like "0"
res0: org.apache.spark.sql.Column = id LIKE 0

scala> df.filter('id like "0").show
+---+-----+
| id| text|
+---+-----+
|  0|hello|
+---+-----+
```
Symbols As Column Names
```scala
scala> val df = Seq((0, "hello"), (1, "world")).toDF("id", "text")
df: org.apache.spark.sql.DataFrame = [id: int, text: string]

scala> df.select('id)
res0: org.apache.spark.sql.DataFrame = [id: int]

scala> df.select('id).show
+---+
| id|
+---+
|  0|
|  1|
+---+
```
Defining Windowing Column (Analytic Clause) — over Operator
```scala
over(): Column
over(window: WindowSpec): Column
```
over creates a windowing column (aka analytic clause) that allows you to execute an aggregate function over a window (i.e. a group of records that are in some relation to the current record).
Tip: Read up on windowed aggregation in Spark SQL in Window Aggregate Functions.
```scala
scala> val overUnspecifiedFrame = $"someColumn".over()
overUnspecifiedFrame: org.apache.spark.sql.Column = someColumn OVER (UnspecifiedFrame)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.WindowSpec
val spec: WindowSpec = Window.rangeBetween(Window.unboundedPreceding, Window.currentRow)

scala> val overRange = $"someColumn" over spec
overRange: org.apache.spark.sql.Column = someColumn OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
```
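As a fuller sketch (the salaries dataset and its column names are made up for illustration), a WindowSpec typically partitions and orders rows before a ranking or aggregate function runs over the window:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank

val salaries = Seq(("eng", 100), ("eng", 80), ("ops", 90)).toDF("dept", "salary")

// rank salaries within each department, highest first
// (the order of partitions in the output may vary between runs)
val byDept = Window.partitionBy("dept").orderBy($"salary".desc)

scala> salaries.withColumn("rank", rank() over byDept).show
+----+------+----+
|dept|salary|rank|
+----+------+----+
| eng|   100|   1|
| eng|    80|   2|
| ops|    90|   1|
+----+------+----+
```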
cast Operator
cast method casts a column to a data type. It makes for type-safe maps with Row objects of the proper type (not Any).
```scala
cast(to: String): Column
cast(to: DataType): Column
```
cast uses CatalystSqlParser to parse the data type from its canonical string representation.
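That makes the String and DataType variants interchangeable; for instance (a quick sketch, assuming the df with a float label column from the example below):

```scala
// "double" is parsed by CatalystSqlParser into DoubleType
scala> df.select($"label".cast("double")).printSchema
root
 |-- label: double (nullable = false)
```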
cast Example
```scala
scala> val df = Seq((0f, "hello")).toDF("label", "text")
df: org.apache.spark.sql.DataFrame = [label: float, text: string]

scala> df.printSchema
root
 |-- label: float (nullable = false)
 |-- text: string (nullable = true)

// without cast
import org.apache.spark.sql.Row
scala> df.select("label").map { case Row(label) => label.getClass.getName }.show(false)
+---------------+
|value          |
+---------------+
|java.lang.Float|
+---------------+

// with cast
import org.apache.spark.sql.types.DoubleType
scala> df.select(col("label").cast(DoubleType)).map { case Row(label) => label.getClass.getName }.show(false)
+----------------+
|value           |
+----------------+
|java.lang.Double|
+----------------+
```