Demo: StateStoreSaveExec with Complete Output Mode
The following example code shows the behaviour of StateStoreSaveExec in Complete output mode.
// START: Only for easier debugging // The state is then only for one partition // which should make monitoring it easier import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1) scala> spark.sessionState.conf.numShufflePartitions res1: Int = 1 // END: Only for easier debugging // Read datasets from a Kafka topic // ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0-SNAPSHOT // Streaming aggregation using groupBy operator is required to have StateStoreSaveExec operator val valuesPerGroup = spark. readStream. format("kafka"). option("subscribe", "topic1"). option("kafka.bootstrap.servers", "localhost:9092"). load. withColumn("tokens", split('value, ",")). withColumn("group", 'tokens(0)). withColumn("value", 'tokens(1) cast "int"). select("group", "value"). groupBy($"group"). agg(collect_list("value") as "values"). orderBy($"group".asc) // valuesPerGroup is a streaming Dataset with just one source // so it knows nothing about output mode or watermark yet // That's why StatefulOperatorStateInfo is generic // and no batch-specific values are printed out // That will be available after the first streaming batch // Use sq.explain to know the runtime-specific values scala> valuesPerGroup.explain == Physical Plan == *Sort [group#25 ASC NULLS FIRST], true, 0 +- Exchange rangepartitioning(group#25 ASC NULLS FIRST, 1) +- ObjectHashAggregate(keys=[group#25], functions=[collect_list(value#36, 0, 0)]) +- Exchange hashpartitioning(group#25, 1) +- StateStoreSave [group#25], StatefulOperatorStateInfo(<unknown>,899f0fd1-b202-45cd-9ebd-09101ca90fa8,0,0), Append, 0 +- ObjectHashAggregate(keys=[group#25], functions=[merge_collect_list(value#36, 0, 0)]) +- Exchange hashpartitioning(group#25, 1) +- StateStoreRestore [group#25], StatefulOperatorStateInfo(<unknown>,899f0fd1-b202-45cd-9ebd-09101ca90fa8,0,0) +- ObjectHashAggregate(keys=[group#25], functions=[merge_collect_list(value#36, 0, 0)]) +- 
Exchange hashpartitioning(group#25, 1) +- ObjectHashAggregate(keys=[group#25], functions=[partial_collect_list(value#36, 0, 0)]) +- *Project [split(cast(value#1 as string), ,)[0] AS group#25, cast(split(cast(value#1 as string), ,)[1] as int) AS value#36] +- StreamingRelation kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6] // Start the query and hence StateStoreSaveExec // Use Complete output mode import scala.concurrent.duration._ import org.apache.spark.sql.streaming.{OutputMode, Trigger} val sq = valuesPerGroup. writeStream. format("console"). option("truncate", false). trigger(Trigger.ProcessingTime(10.seconds)). outputMode(OutputMode.Complete). start ------------------------------------------- Batch: 0 ------------------------------------------- +-----+------+ |group|values| +-----+------+ +-----+------+ // there's only 1 stateful operator and hence 0 for the index in stateOperators scala> println(sq.lastProgress.stateOperators(0).prettyJson) { "numRowsTotal" : 0, "numRowsUpdated" : 0, "memoryUsedBytes" : 60 } // publish 1 new key-value pair in a single streaming batch // 0,1 ------------------------------------------- Batch: 1 ------------------------------------------- +-----+------+ |group|values| +-----+------+ |0 |[1] | +-----+------+ // it's Complete output mode so numRowsTotal is the number of keys in the state store // no keys were available earlier (it's just started!) 
and so numRowsUpdated is 0 scala> println(sq.lastProgress.stateOperators(0).prettyJson) { "numRowsTotal" : 1, "numRowsUpdated" : 0, "memoryUsedBytes" : 324 } // publish new key and old key in a single streaming batch // new keys // 1,1 // updates to already-stored keys // 0,2 ------------------------------------------- Batch: 2 ------------------------------------------- +-----+------+ |group|values| +-----+------+ |0 |[2, 1]| |1 |[1] | +-----+------+ // it's Complete output mode so numRowsTotal is the number of keys in the state store // key 0 was already in the state store and received a new value (0,2) in this batch, yet numRowsUpdated is...0?! // Think it's a BUG as it should've been 1 (for the row 0,2) // Aug 30: Sent out a question to the Spark user mailing list scala> println(sq.lastProgress.stateOperators(0).prettyJson) { "numRowsTotal" : 2, "numRowsUpdated" : 0, "memoryUsedBytes" : 572 } // In the end... sq.stop