StreamingSymmetricHashJoinExec Binary Physical Operator — Stream-Stream Joins
StreamingSymmetricHashJoinExec
is a binary physical operator that represents a Join
logical operator of two streaming structured queries with equality predicates at execution time.
Note
|
Find out more in Join Logical Operator section in The Internals of Spark SQL book. |
StreamingSymmetricHashJoinExec
is created exclusively when StreamingJoinStrategy execution planning strategy is requested to plan a logical query plan with a Join
logical operator of two streaming structured queries with equality predicates (i.e. EqualTo
and EqualNullSafe
)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
// Reduce the number of partitions // StreamingSymmetricHashJoin physical operator requires: // 1. The numbers of partitions on left and right are the same // 2. Uses the numbers of partitions for its own number of partitions // That makes debugging state stores easier to manage import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, 1) assert(spark.sessionState.conf.numShufflePartitions == 1) val left = spark.readStream.format("rate").load val right = spark.readStream.format("rate").load val equiStreamingJoin = left.join(right, "value") // You cannot use QueryExecution directly // Since Structured Streaming uses its own "restrictive" QueryExecution scala> equiStreamingJoin.explain == Physical Plan == *(3) Project [value#190L, timestamp#189, timestamp#193] +- StreamingSymmetricHashJoin [value#190L], [value#194L], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 2decf335-c6d4-4810-b95e-abf75181006a, opId = 0, ver = 0, numPartitions = 1], 0, state cleanup [ left = null, right = null ] :- Exchange hashpartitioning(value#190L, 1) : +- *(1) Filter isnotnull(value#190L) : +- StreamingRelation rate, [timestamp#189, value#190L] +- ReusedExchange [timestamp#193, value#194L], Exchange hashpartitioning(value#190L, 1) import org.apache.spark.sql.streaming.Trigger import scala.concurrent.duration._ val q = equiStreamingJoin .writeStream .format("console") .option("truncate", false) .trigger(Trigger.ProcessingTime(30.seconds)) .queryName("stream-stream-join") .start scala> q.explain == Physical Plan == WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@b807853 +- *(3) Project [value#364L, timestamp#363, timestamp#361] +- StreamingSymmetricHashJoin [value#364L], [value#362L], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = file:/private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/temporary-376c26ce-c6ae-4572-8e04-927c7a445b46/state, runId = bdfeae3b-3732-4dfe-8d1d-22a089e60fc1, opId = 0, ver = 3, numPartitions = 1], 0, state cleanup [ left = null, right = null ] :- Exchange hashpartitioning(value#364L, 1) : +- *(1) Filter isnotnull(value#364L) : +- *(1) Project [timestamp#363, value#364L] : +- *(1) ScanV2 rate[timestamp#363, value#364L] +- ReusedExchange [timestamp#361, value#362L], Exchange hashpartitioning(value#364L, 1) // In the end, stop the query q.stop |
StreamingSymmetricHashJoinExec
is a stateful physical operator that writes to a state store.
StreamingSymmetricHashJoinExec
uses the performance metrics of StateStoreWriter.
When executed, StreamingSymmetricHashJoinExec
…FIXME
The output schema of StreamingSymmetricHashJoinExec
is…FIXME
The output partitioning of StreamingSymmetricHashJoinExec
is…FIXME
Executing Physical Operator — doExecute
Method
1 2 3 4 5 |
doExecute(): RDD[InternalRow] |
Note
|
doExecute is a part of SparkPlan contract to produce the result of a physical operator as an RDD of internal binary rows (i.e. InternalRow ).
|
doExecute
…FIXME
Creating StreamingSymmetricHashJoinExec Instance
StreamingSymmetricHashJoinExec
takes the following to be created:
-
Optional StatefulOperatorStateInfo
StreamingSymmetricHashJoinExec
initializes the internal registries and counters.
processPartitions
Internal Method
1 2 3 4 5 6 7 |
processPartitions( leftInputIter: Iterator[InternalRow], rightInputIter: Iterator[InternalRow]): Iterator[InternalRow] |
processPartitions
…FIXME
Note
|
processPartitions is used exclusively when StreamingSymmetricHashJoinExec physical operator is requested to doExecute.
|