RateStreamSource
RateStreamSource is a streaming source that generates consecutive numbers, each with a timestamp, which can be useful for testing and PoCs.
RateStreamSource is created for the rate format (which is registered by RateSourceProvider).
    val rates = spark
      .readStream
      .format("rate") // <-- use RateStreamSource
      .option("rowsPerSecond", 1)
      .load
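To watch the generated rows, the rate stream can be written to the console sink. The following is a minimal sketch rather than a recommended setup: the local master, the application name, the trigger interval, and the 20-second timeout are all assumptions chosen for a quick experiment.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object RateToConsole {
  // Builds a streaming DataFrame backed by the rate source.
  def rateStream(spark: SparkSession, rowsPerSecond: Long): DataFrame =
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", rowsPerSecond)
      .load()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")          // assumption: local mode, for experimentation only
      .appName("rate-to-console")  // hypothetical application name
      .getOrCreate()

    val query = rateStream(spark, rowsPerSecond = 1)
      .writeStream
      .format("console")
      .option("truncate", false)   // show full timestamps
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination(20000)  // wait up to 20 seconds (timeout in milliseconds)
    query.stop()
    spark.stop()
  }
}
```

Each micro-batch printed to the console should contain the timestamp and value columns described in this section.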
RateStreamSource uses a predefined schema that cannot be changed.
    val schema = rates.schema
    scala> println(schema.treeString)
    root
     |-- timestamp: timestamp (nullable = true)
     |-- value: long (nullable = true)
| Name | Type |
|---|---|
| timestamp | timestamp |
| value | long |
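The fixed schema can also be checked programmatically. The sketch below builds the expected StructType from the treeString output shown earlier and compares it with what the rate source reports. It then tries to load the source with a user-specified schema and only reports whether that succeeded, since the exact behavior and error message may differ between Spark versions. The object name, the application name, and the `id` column are hypothetical.

```scala
import scala.util.Try
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType, TimestampType}

object RateSchemaCheck {
  // The schema as printed by treeString; StructField is nullable by default.
  val expected: StructType = StructType(Seq(
    StructField("timestamp", TimestampType),
    StructField("value", LongType)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")             // assumption: local mode
      .appName("rate-schema-check")   // hypothetical application name
      .getOrCreate()

    val rates = spark.readStream.format("rate").load()
    println(s"Schema matches the expected one: ${rates.schema == expected}")

    // Try to force a different schema and report the outcome without assuming it.
    val custom = StructType(Seq(StructField("id", StringType)))
    val attempt = Try(spark.readStream.format("rate").schema(custom).load())
    println(s"Loading with a user-specified schema succeeded: ${attempt.isSuccess}")

    spark.stop()
  }
}
```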
| Name | Description |
|---|---|
Tip
Enable DEBUG logging level for the RateStreamSource logger to see what happens inside.
Refer to Logging.
Getting Maximum Available Offsets — getOffset Method
    getOffset: Option[Offset]

Note
getOffset is part of the Source Contract.

Caution
FIXME
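The description of getOffset is still to be written. Since getBatch (described in the next section) interprets offsets as seconds, one plausible reading is that the offset is the number of whole seconds elapsed since the source started. The sketch below illustrates only that idea in plain Scala; `OffsetTracker`, `startTimeMs`, `lastTimeMs`, and `offsetAt` are hypothetical names, and this should not be taken as the actual implementation.

```scala
import java.util.concurrent.TimeUnit

object RateOffsetSketch {
  // Hypothetical stand-in for the clock-driven state of a rate-like source.
  final class OffsetTracker(startTimeMs: Long) {
    private var lastTimeMs: Long = startTimeMs

    // Returns the number of whole seconds elapsed since startTimeMs.
    // The offset never moves backwards, even if nowMs is earlier than a previous call.
    def offsetAt(nowMs: Long): Option[Long] = {
      if (lastTimeMs < nowMs) lastTimeMs = nowMs
      Some(TimeUnit.MILLISECONDS.toSeconds(lastTimeMs - startTimeMs))
    }
  }
}
```

Passing the current time in as a parameter (instead of reading a clock inside the method) keeps the sketch deterministic and easy to test.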
Generating DataFrame for Streaming Batch — getBatch Method
    getBatch(start: Option[Offset], end: Offset): DataFrame

Note
getBatch is part of the Source Contract.

Internally, getBatch converts the input start and end offsets to the seconds to start from and end at, and assumes 0 when an offset is not available.
getBatch then calculates the values to generate for the start and end seconds.
You should see the following DEBUG message in the logs:
    DEBUG RateStreamSource: startSeconds: [startSeconds], endSeconds: [endSeconds], rangeStart: [rangeStart], rangeEnd: [rangeEnd]
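The four values in the DEBUG message can be reproduced with a little arithmetic. The sketch below is plain Scala and makes a simplifying assumption that is not stated in this section: rows are generated at a constant rate, so the value reached at second `s` is `s * rowsPerSecond`. `SecondsOffset`, `BatchRange`, and `batchRange` are hypothetical names introduced for illustration.

```scala
object RateBatchSketch {
  // Hypothetical stand-in for an offset that holds elapsed seconds.
  final case class SecondsOffset(seconds: Long)

  final case class BatchRange(startSeconds: Long, endSeconds: Long, rangeStart: Long, rangeEnd: Long) {
    // Equal range boundaries mean there is nothing to generate.
    def isEmpty: Boolean = rangeStart == rangeEnd
  }

  // Simplifying assumption: constant rate, no ramp-up period.
  def valueAtSecond(seconds: Long, rowsPerSecond: Long): Long = seconds * rowsPerSecond

  def batchRange(start: Option[SecondsOffset], end: SecondsOffset, rowsPerSecond: Long): BatchRange = {
    val startSeconds = start.map(_.seconds).getOrElse(0L) // no start offset: assume 0
    val endSeconds = end.seconds
    BatchRange(
      startSeconds,
      endSeconds,
      valueAtSecond(startSeconds, rowsPerSecond),
      valueAtSecond(endSeconds, rowsPerSecond))
  }
}
```

For example, with `rowsPerSecond = 10`, no start offset, and an end offset of 3 seconds, the batch covers values from 0 (inclusive) to 30 (exclusive). A batch whose start and end offsets are the same second has equal range boundaries and is therefore empty.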
If the start and end ranges are equal, getBatch returns an empty DataFrame (with the schema).
Otherwise, when the ranges are different, getBatch creates a DataFrame using the SparkContext.range operator (with the start and end ranges and numPartitions partitions).
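The two branches can be sketched with public Spark APIs as follows. This is an illustration under assumptions, not the source's internal code: `startTimeMs` and `msPerValue` are hypothetical parameters used to spread timestamps evenly across the generated values, and the DataFrame is built with `toDF`, so its nullability flags may differ from the source's schema.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, SparkSession}

object RateBatchDataFrameSketch {
  def batch(
      spark: SparkSession,
      rangeStart: Long,
      rangeEnd: Long,
      numPartitions: Int,
      startTimeMs: Long,     // hypothetical: wall-clock time of the first value
      msPerValue: Double     // hypothetical: milliseconds between consecutive values
  ): DataFrame = {
    import spark.implicits._
    if (rangeStart == rangeEnd) {
      // Nothing to generate: an empty DataFrame with the same column names and types.
      Seq.empty[(Timestamp, Long)].toDF("timestamp", "value")
    } else {
      spark.sparkContext
        .range(rangeStart, rangeEnd, 1, numPartitions) // values in [rangeStart, rangeEnd)
        .map { v =>
          val relativeMs = math.round((v - rangeStart) * msPerValue)
          (new Timestamp(startTimeMs + relativeMs), v)
        }
        .toDF("timestamp", "value")
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")            // assumption: local mode
      .appName("rate-batch-sketch")  // hypothetical application name
      .getOrCreate()
    batch(spark, 0L, 10L, 2, System.currentTimeMillis(), 100.0).show(truncate = false)
    spark.stop()
  }
}
```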
Creating RateStreamSource Instance
RateStreamSource takes the following when created:
RateStreamSource initializes the internal registries and counters.