Code example: defining a Spark SQL schema with JSON

This post shows a complete example: build a StructType with the Column DSL, print it as JSON with prettyJson, rebuild the same schema with DataType.fromJson, and use the rebuilt schema with from_json to parse raw JSON strings into a flat DataFrame.
package streaming.core

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.{functions => F}
import org.scalatest.concurrent.Eventually
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import streaming.util.Logging

/**
  * Created by sunbiaobiao on 2019/3/27.
  */
class SchemaTest extends FunSuite with BeforeAndAfter with BeforeAndAfterAll with Eventually with Logging {

  override def beforeAll(): Unit = {}

  test("spark sql schema") {
    // local[*] so the test runs without an external cluster
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Build the schema with the Column-based DSL: $"city".string returns
    // StructField("city", StringType) via the implicits imported above.
    val addressesSchema = new StructType()
      .add($"city".string)
      .add($"state".string)
      .add($"zip".string)

    val schema = new StructType()
      .add($"firstName".string)
      .add($"lastName".string)
      .add($"email".string)
      .add($"addresses".array(addressesSchema))
      .add("sons", ArrayType(MapType(StringType, new StructType().add($"firstName".string))))
      .add("keys", MapType(StringType, StringType))

    println(schema.prettyJson)

    // val schemaAsJson = schema.prettyJson
    // The JSON printed above looks like this:
    val schemaAsJson =
      """
        |{
        |  "type" : "struct",
        |  "fields" : [ {
        |    "name" : "firstName",
        |    "type" : "string",
        |    "nullable" : true,
        |    "metadata" : { }
        |  }, {
        |    "name" : "lastName",
        |    "type" : "string",
        |    "nullable" : true,
        |    "metadata" : { }
        |  }, {
        |    "name" : "email",
        |    "type" : "string",
        |    "nullable" : true,
        |    "metadata" : { }
        |  }, {
        |    "name" : "addresses",
        |    "type" : {
        |      "type" : "array",
        |      "elementType" : {
        |        "type" : "struct",
        |        "fields" : [ {
        |          "name" : "city",
        |          "type" : "string",
        |          "nullable" : true,
        |          "metadata" : { }
        |        }, {
        |          "name" : "state",
        |          "type" : "string",
        |          "nullable" : true,
        |          "metadata" : { }
        |        }, {
        |          "name" : "zip",
        |          "type" : "string",
        |          "nullable" : true,
        |          "metadata" : { }
        |        } ]
        |      },
        |      "containsNull" : true
        |    },
        |    "nullable" : true,
        |    "metadata" : { }
        |  }, {
        |    "name" : "sons",
        |    "type" : {
        |      "type" : "array",
        |      "elementType" : {
        |        "type" : "map",
        |        "keyType" : "string",
        |        "valueType" : {
        |          "type" : "struct",
        |          "fields" : [ {
        |            "name" : "firstName",
        |            "type" : "string",
        |            "nullable" : true,
        |            "metadata" : { }
        |          } ]
        |        },
        |        "valueContainsNull" : true
        |      },
        |      "containsNull" : true
        |    },
        |    "nullable" : true,
        |    "metadata" : { }
        |  }, {
        |    "name" : "keys",
        |    "type" : {
        |      "type" : "map",
        |      "keyType" : "string",
        |      "valueType" : "string",
        |      "valueContainsNull" : true
        |    },
        |    "nullable" : true,
        |    "metadata" : { }
        |  } ]
        |}
      """.stripMargin

    // Rebuild the schema from its JSON representation
    val dt = DataType.fromJson(schemaAsJson)

    val rawJsons = Seq("""
      {
        "firstName" : "Jacek",
        "lastName" : "Laskowski",
        "email" : "jacek@japila.pl",
        "addresses" : [ {
          "city" : "Warsaw",
          "state" : "N/A",
          "zip" : "02-791"
        } ]
      }
    """).toDF("rawjson")

    val people = rawJsons
      .select(F.from_json($"rawjson", dt, Map.empty[String, String]) as "json")
      .select("json.*")                               // <-- flatten the struct field
      .withColumn("address", F.explode($"addresses")) // <-- explode the array field
      .drop("addresses")                              // <-- no longer needed
      .select("firstName", "lastName", "email", "address.*") // <-- flatten the struct field

    // One input row with one address yields a single flattened row:
    // Jacek | Laskowski | jacek@japila.pl | Warsaw | N/A | 02-791
    people.show(truncate = false)
  }
}
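The test relies on schema JSON round-tripping: a StructType serializes with .json or .prettyJson, and DataType.fromJson parses the result back into an equal schema, so the JSON literal above could just as well be generated once and stored in a config file. Here is a minimal standalone sketch of that round trip; it needs no SparkSession, and the object name SchemaRoundTrip is just for illustration:

import org.apache.spark.sql.types._

object SchemaRoundTrip extends App {
  // A small schema built with the plain StructType API
  val schema = new StructType()
    .add("firstName", StringType)
    .add("addresses", ArrayType(new StructType()
      .add("city", StringType)
      .add("zip", StringType)))

  // Serialize to JSON, then parse back
  val asJson   = schema.json
  val restored = DataType.fromJson(asJson)

  // The round trip preserves the schema exactly (DataType equality is structural)
  assert(restored == schema)
  println(restored.prettyJson)
}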
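As an aside, since Spark 2.3 a schema can also be written as a compact DDL string and parsed with StructType.fromDDL, an alternative to the JSON form the post uses. A hedged sketch (the object name DdlSchemaSketch is hypothetical) expressing the same top-level fields:

import org.apache.spark.sql.types.StructType

object DdlSchemaSketch extends App {
  // Same shape as the firstName/lastName/email/addresses part of the schema
  // above, expressed as a DDL string and parsed by StructType.fromDDL
  val ddlSchema = StructType.fromDDL(
    "firstName STRING, lastName STRING, email STRING, " +
    "addresses ARRAY<STRUCT<city: STRING, state: STRING, zip: STRING>>")

  println(ddlSchema.prettyJson)
}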