StreamingJoinStrategy Execution Planning Strategy for Stream-Stream Equi-Joins
StreamingJoinStrategy
is an execution planning strategy (i.e. Strategy
) that IncrementalExecution uses to plan Join
binary logical operators with two streaming queries to StreamingSymmetricHashJoinExec physical operators.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
Using StreamingJoinStrategy scala> :type spark org.apache.spark.sql.SparkSession scala> :type spark.sessionState.planner org.apache.spark.sql.execution.SparkPlanner // FIXME How to use the strategy in the demo val strategy = spark.sessionState.planner.StreamingJoinStrategy val left = spark.readStream.format("rate").load val right = spark.readStream.format("rate").load val q = left.join(right, "value") scala> q.explain(true) ... == Optimized Logical Plan == Project [value#52L, timestamp#51, timestamp#55] +- Join Inner, (value#52L = value#56L) :- Filter isnotnull(value#52L) : +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@60b0015, rate, [timestamp#51, value#52L] +- Filter isnotnull(value#56L) +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@24d92ffc, rate, [timestamp#55, value#56L] == Physical Plan == *(3) Project [value#52L, timestamp#51, timestamp#55] +- StreamingSymmetricHashJoin [value#52L], [value#56L], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = f254d136-d903-4b1c-9fd5-861b541848ab, opId = 0, ver = 0, numPartitions = 200], 0, state cleanup [ left = null, right = null ] :- Exchange hashpartitioning(value#52L, 200) : +- *(1) Filter isnotnull(value#52L) : +- StreamingRelation rate, [timestamp#51, value#52L] +- ReusedExchange [timestamp#55, value#56L], Exchange hashpartitioning(value#52L, 200) |