dropDuplicates Operator — Streaming Deduplication
```scala
dropDuplicates(): Dataset[T]
dropDuplicates(colNames: Seq[String]): Dataset[T]
dropDuplicates(col1: String, cols: String*): Dataset[T]
```
`dropDuplicates` operator removes duplicate rows from a Dataset, optionally considering only a subset of the columns. For a streaming Dataset, it is planned as a `Deduplicate` logical operator and executed as a stateful `StreamingDeduplicate` physical operator (as the demo below shows).
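For reference, here is a quick sketch of the three variants on a static Dataset. The `events` Dataset and its columns are made up for illustration only; the sketch assumes a `spark-shell` session (so `spark` is in scope).

```scala
import spark.implicits._

// Made-up data, used only to show the three call variants
val events = Seq((1, "a"), (1, "a"), (2, "b")).toDF("id", "value")

events.dropDuplicates()              // consider all columns
events.dropDuplicates(Seq("id"))     // consider selected columns (Seq variant)
events.dropDuplicates("id", "value") // consider selected columns (varargs variant)
```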
Note: For a streaming Dataset, `dropDuplicates` keeps all the data across triggers as intermediate state in order to drop duplicate rows. You can use the `withWatermark` operator to limit how late duplicate data can arrive, and Spark then limits the state accordingly. In addition, data older than the watermark is dropped to avoid any possibility of duplicates.
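As a minimal sketch of that pattern (not part of the demo below), assuming a `spark-shell` session and Spark's built-in `rate` source (whose columns are `timestamp` and `value`):

```scala
// Rate source emits (timestamp, value) rows; "value" stands in for a record id here.
val events = spark.readStream.format("rate").load()

val deduped = events.
  withWatermark("timestamp", "10 minutes"). // <-- duplicates arriving more than 10 minutes late are no longer guaranteed to be dropped
  dropDuplicates("value", "timestamp")      // <-- include the event-time column so state older than the watermark can be removed
```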
```scala
scala> spark.version
res0: String = 2.3.0-SNAPSHOT

// Start a streaming query
// Using old-fashioned MemoryStream (with the deprecated SQLContext)
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.SQLContext
implicit val sqlContext: SQLContext = spark.sqlContext
val source = MemoryStream[(Int, Int)]
val ids = source.toDS.toDF("time", "id").
  withColumn("time", $"time" cast "timestamp"). // <-- convert time column from Int to Timestamp
  dropDuplicates("id").
  withColumn("time", $"time" cast "long")       // <-- convert time column back from Timestamp to Int

// Conversions are only for display purposes
// Internally we need timestamps for watermark to work
// Displaying timestamps could be too much for such a simple task
scala> println(ids.queryExecution.analyzed.numberedTreeString)
00 Project [cast(time#10 as bigint) AS time#15L, id#6]
01 +- Deduplicate [id#6], true
02    +- Project [cast(time#5 as timestamp) AS time#10, id#6]
03       +- Project [_1#2 AS time#5, _2#3 AS id#6]
04          +- StreamingExecutionRelation MemoryStream[_1#2,_2#3], [_1#2, _2#3]

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val q = ids.
  writeStream.
  format("memory").
  queryName("dups").
  outputMode(OutputMode.Append).
  trigger(Trigger.ProcessingTime(30.seconds)).
  option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
  start

// Publish duplicate records
source.addData(1 -> 1)
source.addData(2 -> 1)
source.addData(3 -> 1)
q.processAllAvailable()

// Check out how dropDuplicates removes duplicates
// --> per single streaming batch (easy)
scala> spark.table("dups").show
+----+---+
|time| id|
+----+---+
|   1|  1|
+----+---+

source.addData(4 -> 1)
source.addData(5 -> 2)

// --> across streaming batches (harder)
scala> spark.table("dups").show
+----+---+
|time| id|
+----+---+
|   1|  1|
|   5|  2|
+----+---+

// Check out the internal state
scala> println(q.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 2,
  "numRowsUpdated" : 1,
  "memoryUsedBytes" : 17751
}

// You could use web UI's SQL tab instead
// Use Details for Query

source.addData(6 -> 2)

scala> spark.table("dups").show
+----+---+
|time| id|
+----+---+
|   1|  1|
|   5|  2|
+----+---+

// Check out the internal state
scala> println(q.lastProgress.stateOperators(0).prettyJson)
{
  "numRowsTotal" : 2,
  "numRowsUpdated" : 0,
  "memoryUsedBytes" : 17751
}

// Restart the streaming query
q.stop

val q = ids.
  writeStream.
  format("memory").
  queryName("dups").
  outputMode(OutputMode.Complete).  // <-- memory sink supports checkpointing for Complete output mode only
  trigger(Trigger.ProcessingTime(30.seconds)).
  option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
  start

// Doh! MemorySink is fine, but Complete is only available with a streaming aggregation
// Answer it if you know why --> https://stackoverflow.com/q/45756997/1305344

// It's high time to work on https://issues.apache.org/jira/browse/SPARK-21667
// to understand the low-level details (and the reason, it seems)

// Disabling operation checks and starting over
// ./bin/spark-shell -c spark.sql.streaming.unsupportedOperationCheck=false

// it works now --> no exception!
scala> spark.table("dups").show
+----+---+
|time| id|
+----+---+
+----+---+

source.addData(0 -> 1)
// wait till the batch is triggered
scala> spark.table("dups").show
+----+---+
|time| id|
+----+---+
|   0|  1|
+----+---+

source.addData(1 -> 1)
source.addData(2 -> 1)
// wait till the batch is triggered
scala> spark.table("dups").show
+----+---+
|time| id|
+----+---+
+----+---+

// What?! No rows?! It doesn't look as if it worked fine :(

// Use groupBy to pass the requirement of having streaming aggregation for Complete output mode
val counts = ids.groupBy("id").agg(first($"time") as "first_time")
scala> counts.explain
== Physical Plan ==
*HashAggregate(keys=[id#246], functions=[first(time#255L, false)])
+- StateStoreSave [id#246], StatefulOperatorStateInfo(<unknown>,3585583b-42d7-4547-8d62-255581c48275,0,0), Append, 0
   +- *HashAggregate(keys=[id#246], functions=[merge_first(time#255L, false)])
      +- StateStoreRestore [id#246], StatefulOperatorStateInfo(<unknown>,3585583b-42d7-4547-8d62-255581c48275,0,0)
         +- *HashAggregate(keys=[id#246], functions=[merge_first(time#255L, false)])
            +- *HashAggregate(keys=[id#246], functions=[partial_first(time#255L, false)])
               +- *Project [cast(time#250 as bigint) AS time#255L, id#246]
                  +- StreamingDeduplicate [id#246], StatefulOperatorStateInfo(<unknown>,3585583b-42d7-4547-8d62-255581c48275,1,0), 0
                     +- Exchange hashpartitioning(id#246, 200)
                        +- *Project [cast(_1#242 as timestamp) AS time#250, _2#243 AS id#246]
                           +- StreamingRelation MemoryStream[_1#242,_2#243], [_1#242, _2#243]

val q = counts.
  writeStream.
  format("memory").
  queryName("dups").
  outputMode(OutputMode.Complete).  // <-- memory sink supports checkpointing for Complete output mode only
  trigger(Trigger.ProcessingTime(30.seconds)).
  option("checkpointLocation", "checkpoint-dir"). // <-- use checkpointing to save state between restarts
  start

source.addData(0 -> 1)
source.addData(1 -> 1)
// wait till the batch is triggered
scala> spark.table("dups").show
+---+----------+
| id|first_time|
+---+----------+
|  1|         0|
+---+----------+

// Publish duplicates
// Check out how dropDuplicates removes duplicates
// Stop the streaming query
// Specify event time watermark to remove old duplicates
```