StreamingDeduplicateExec Unary Physical Operator for Streaming Deduplication
StreamingDeduplicateExec is a unary physical operator (i.e. UnaryExecNode) that writes state to StateStore with support for streaming watermark.

StreamingDeduplicateExec is created exclusively when StreamingDeduplicationStrategy plans Deduplicate unary logical operators.
```scala
val uniqueValues = spark.
  readStream.
  format("rate").
  load.
  dropDuplicates("value") // <-- creates Deduplicate logical operator

scala> println(uniqueValues.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#214L], true
01 +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@4785f176,rate,List(),None,List(),None,Map(),None), rate, [timestamp#213, value#214L]

scala> uniqueValues.explain
== Physical Plan ==
StreamingDeduplicate [value#214L], StatefulOperatorStateInfo(<unknown>,5a65879c-67bc-4e77-b417-6100db6a52a2,0,0), 0
+- Exchange hashpartitioning(value#214L, 200)
   +- StreamingRelation rate, [timestamp#213, value#214L]

// Start the query and hence StreamingDeduplicateExec
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = uniqueValues.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).
  start

// sorting not supported for non-aggregate queries
// and so values are unsorted

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2017-07-25 22:12:03.018|0    |
|2017-07-25 22:12:08.018|5    |
|2017-07-25 22:12:04.018|1    |
|2017-07-25 22:12:06.018|3    |
|2017-07-25 22:12:05.018|2    |
|2017-07-25 22:12:07.018|4    |
+-----------------------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2017-07-25 22:12:10.018|7    |
|2017-07-25 22:12:09.018|6    |
|2017-07-25 22:12:12.018|9    |
|2017-07-25 22:12:13.018|10   |
|2017-07-25 22:12:15.018|12   |
|2017-07-25 22:12:11.018|8    |
|2017-07-25 22:12:14.018|11   |
|2017-07-25 22:12:16.018|13   |
|2017-07-25 22:12:17.018|14   |
|2017-07-25 22:12:18.018|15   |
+-----------------------+-----+

// Eventually...
sq.stop
```
StreamingDeduplicateExec uses the performance metrics of StateStoreWriter.

The output schema of StreamingDeduplicateExec is exactly the child's output schema.

The output partitioning of StreamingDeduplicateExec is exactly the child's output partitioning.
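How a unary physical operator can simply delegate its output schema and partitioning to its child can be pictured with the following minimal sketch (illustrative only, not Spark's actual source; the operator name and the trivial doExecute are assumptions):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// Illustrative only: a unary operator that keeps its child's schema and partitioning
case class MyDeduplicateLikeExec(
    keyExpressions: Seq[Expression],
    child: SparkPlan) extends UnaryExecNode {

  // the output schema is exactly the child's output schema
  override def output: Seq[Attribute] = child.output

  // the output partitioning is exactly the child's output partitioning
  override def outputPartitioning: Partitioning = child.outputPartitioning

  // a real stateful operator would process rows with a StateStore here
  override protected def doExecute(): RDD[InternalRow] = child.execute()
}
```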
```scala
/**
// Start spark-shell with debugging and Kafka support
SPARK_SUBMIT_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005" \
./bin/spark-shell \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0-SNAPSHOT
*/

// Reading
val topic1 = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingoffsets", "earliest").
  load

// Processing with deduplication
// Don't use watermark
// The following won't work due to https://issues.apache.org/jira/browse/SPARK-21546
/**
val records = topic1.
  withColumn("eventtime", 'timestamp).  // <-- just to put the right name given the purpose
  withWatermark(eventTime = "eventtime", delayThreshold = "30 seconds"). // <-- use the renamed eventtime column
  dropDuplicates("value"). // dropDuplicates will use watermark
                           // only when eventTime column exists
  // include the watermark column => internal design leak?
  select('key cast "string", 'value cast "string", 'eventtime).
  as[(String, String, java.sql.Timestamp)]
*/
val records = topic1.
  dropDuplicates("value").
  select('key cast "string", 'value cast "string").
  as[(String, String)]

scala> records.explain
== Physical Plan ==
*Project [cast(key#0 as string) AS key#249, cast(value#1 as string) AS value#250]
+- StreamingDeduplicate [value#1], StatefulOperatorStateInfo(<unknown>,68198b93-6184-49ae-8098-006c32cc6192,0,0), 0
   +- Exchange hashpartitioning(value#1, 200)
      +- *Project [key#0, value#1]
         +- StreamingRelation kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

// Writing
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val sq = records.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  queryName("from-kafka-topic1-to-console").
  outputMode(OutputMode.Update).
  start

// Eventually...
sq.stop
```
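For comparison, here is a sketch (not part of the original Kafka walkthrough; untested and purely illustrative) of streaming deduplication combined with a watermark on a source whose event-time column is already part of the schema, e.g. the rate source, so no column renaming is involved:

```scala
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Deduplicate by value and event time, letting the watermark bound the keys
// kept in the StateStore
val dedupedWithWatermark = spark.
  readStream.
  format("rate").
  load.                                      // schema: timestamp, value
  withWatermark("timestamp", "30 seconds").  // event-time watermark on an existing column
  dropDuplicates("value", "timestamp")       // dedup keys include the event-time column

val sq = dedupedWithWatermark.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).
  start
```

Including the event-time column among the deduplication keys is what allows keys older than the watermark to be removed from the state store.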
Tip
|
Enable logging for StreamingDeduplicateExec to see what happens inside.
Refer to Logging. |
Executing Physical Operator — doExecute Method
```scala
doExecute(): RDD[InternalRow]
```
Note
|
doExecute is part of the SparkPlan contract to produce the result of a physical operator as an RDD of internal binary rows (i.e. InternalRow).
|
Internally, doExecute initializes metrics.

doExecute executes the child physical operator and creates a StateStoreRDD with storeUpdateFunction that does the following (a simplified sketch of the per-row logic follows the list):
- Generates an unsafe projection to access the key field (using keyExpressions and the output schema of child)

- Filters out rows from Iterator[InternalRow] that match watermarkPredicateForData (when defined)

- For every remaining row (as InternalRow):

  - Extracts the key from the row (using the unsafe projection above)

  - Gets the saved state in StateStore for the key

  - (when there was a state for the key) Filters out (aka drops) the row

  - (when there was no state for the key) Stores a new (and empty) state for the key and increments the numUpdatedStateRows and numOutputRows metrics
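A minimal, self-contained sketch of this per-row logic in plain Scala (a mutable map stands in for the StateStore and the counters stand in for the metrics; all names are illustrative):

```scala
import scala.collection.mutable

// Stand-in for the StateStore: key -> (empty) saved state
val store = mutable.HashMap.empty[String, Unit]

// Stand-ins for the metrics
var numUpdatedStateRows = 0L
var numOutputRows = 0L

// Input rows as (key, payload); duplicates share the same key
val rows = Iterator(("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5))

// Keep a row only when its key has no saved state yet; otherwise drop it
val deduped = rows.filter { case (key, _) =>
  store.get(key) match {
    case Some(_) =>
      false                // state exists => duplicate => drop the row
    case None =>
      store.put(key, ())   // store a new (and empty) state for the key
      numUpdatedStateRows += 1
      numOutputRows += 1
      true                 // no state => first occurrence => keep the row
  }
}

println(deduped.toList)    // List((a,1), (b,2), (c,4))
```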
In the end, storeUpdateFunction creates a CompletionIterator that executes a completion function (aka completionFunction) after it has successfully iterated through all the elements (i.e. when a client has consumed all the rows).

The completion function does the following (see the sketch after this list):

- Updates the allUpdatesTimeMs metric (that is the total time to execute storeUpdateFunction)

- Updates the allRemovalsTimeMs metric with the time taken to remove keys older than the watermark from the StateStore

- Updates the commitTimeMs metric with the time taken to commit the changes to the StateStore
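The completion pattern can be pictured with the following sketch of an iterator wrapper that runs a function once after the wrapped iterator has been fully consumed (this only illustrates the idea behind CompletionIterator and the timing of the metric updates; it is not Spark's implementation):

```scala
// Illustrative wrapper (not Spark's CompletionIterator): runs completionFunction
// exactly once, after the underlying iterator has been fully consumed
class CompletingIterator[A](underlying: Iterator[A], completionFunction: () => Unit)
    extends Iterator[A] {
  private var completed = false
  def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) {
      completionFunction()
      completed = true
    }
    more
  }
  def next(): A = underlying.next()
}

// Timing pattern similar to the metric updates described above
val updatesStartTimeNs = System.nanoTime
var allUpdatesTimeMs = 0L

val rows = Iterator("a", "b", "c")
val withCompletion = new CompletingIterator[String](rows, () => {
  // total time between starting the iteration and consuming all rows
  allUpdatesTimeMs += (System.nanoTime - updatesStartTimeNs) / 1000000
  // a real operator would also remove keys older than the watermark
  // and commit the StateStore here, timing each step
})

withCompletion.foreach(println)
println(s"allUpdatesTimeMs = $allUpdatesTimeMs")
```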
Creating StreamingDeduplicateExec Instance
StreamingDeduplicateExec takes the following when created:
- Duplicate keys (as used in dropDuplicates operator)

- Optional StatefulOperatorStateInfo