TextSocketSource
TextSocketSource is a streaming source that reads lines from a socket at the host and port (defined by parameters).
It keeps every line read from the socket in the lines internal in-memory buffer forever.
Caution
This source is not for production use due to design constraints, e.g. an unbounded in-memory collection of the lines read and no fault recovery. It is designed only for tutorials and debugging.
    import org.apache.spark.sql.SparkSession
    val spark: SparkSession = SparkSession.builder.getOrCreate()
    // Encoders for .as[String] (imported automatically in spark-shell)
    import spark.implicits._

    // Connect to localhost:9999
    // You can use "nc -lk 9999" for demos
    val textSocket = spark.
      readStream.
      format("socket").
      option("host", "localhost").
      option("port", 9999).
      load

    import org.apache.spark.sql.Dataset
    val lines: Dataset[String] = textSocket.as[String].map(_.toUpperCase)

    val query = lines.writeStream.format("console").start

    // Start typing the lines in nc session
    // They will appear UPPERCASE in the terminal

    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +---------+
    |    value|
    +---------+
    |UPPERCASE|
    +---------+

    scala> query.explain
    == Physical Plan ==
    *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#21]
    +- *MapElements <function1>, obj#20: java.lang.String
       +- *DeserializeToObject value#43.toString, obj#19: java.lang.String
          +- LocalTableScan [value#43]

    scala> query.stop
lines Internal Buffer
    lines: ArrayBuffer[(String, Timestamp)]
lines is the internal buffer of all the lines TextSocketSource read from the socket.
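A minimal sketch of such a buffer, using only the Scala and Java standard libraries. The `LinesBufferSketch` object and its `append` method are hypothetical names for illustration and are not part of Spark; the synchronization is an assumption made because a background thread would fill the buffer.

```scala
import java.sql.Timestamp
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the internal `lines` buffer (not Spark code).
object LinesBufferSketch {
  // Each entry pairs the line with the time it was read.
  val lines: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty

  // Appends a line together with the current time.
  // Nothing is ever removed, so the buffer only grows.
  def append(line: String): Unit = lines.synchronized {
    lines += ((line, new Timestamp(System.currentTimeMillis())))
  }
}
```

Because entries are never evicted, memory use grows with every line received, which is the design constraint the caution at the top of this page refers to.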
Maximum Available Offset (getOffset method)
Note
getOffset is a part of the Streaming Source Contract.
TextSocketSource's offset can either be none or a LongOffset of the number of lines in the internal lines buffer.
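The rule above can be sketched without Spark on the classpath. `LongOffsetSketch` and `OffsetSketch` are hypothetical stand-ins, not Spark classes, and the sketch follows the wording of this page (the offset is the line count); whether a given Spark version reports the count or the index of the last line is an assumption to check against the Spark sources.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's LongOffset.
final case class LongOffsetSketch(offset: Long)

// Hypothetical model of the source's offset bookkeeping (not Spark code).
final class OffsetSketch {
  private val lines = ArrayBuffer.empty[String]

  def add(line: String): Unit = synchronized { lines += line }

  // None while the buffer is empty; otherwise an offset
  // holding the number of buffered lines.
  def getOffset: Option[LongOffsetSketch] = synchronized {
    if (lines.isEmpty) None else Some(LongOffsetSketch(lines.size))
  }
}
```

Returning an `Option` lets a caller tell "no data yet" apart from an actual position in the stream.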
Schema (schema method)
TextSocketSource supports two schemas:
- A single value field of String type.
- value field of StringType type and timestamp field of TimestampType type of format yyyy-MM-dd HH:mm:ss.
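As a rough illustration of the two shapes, the sketch below models each schema as a plain list of field name and type name pairs, and formats a timestamp with the pattern mentioned above. `SchemaSketch` and its methods are hypothetical helpers, not Spark APIs, and choosing the schema from an `includeTimestamp` flag is an assumption based on the constructor parameter of the same name.

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone}

// Hypothetical helpers for illustration only (not Spark code).
object SchemaSketch {
  // Field name -> type name for each of the two schemas.
  def fields(includeTimestamp: Boolean): Seq[(String, String)] =
    if (includeTimestamp) Seq("value" -> "StringType", "timestamp" -> "TimestampType")
    else Seq("value" -> "StringType")

  // Renders a timestamp using the yyyy-MM-dd HH:mm:ss pattern.
  def render(ts: Timestamp, zone: TimeZone): String = {
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
    fmt.setTimeZone(zone)
    fmt.format(ts)
  }
}
```

For example, `SchemaSketch.render(new Timestamp(0L), TimeZone.getTimeZone("UTC"))` gives `1970-01-01 00:00:00`.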
Tip
Refer to sourceSchema for TextSocketSourceProvider.
Creating TextSocketSource Instance
    TextSocketSource(
      host: String,
      port: Int,
      includeTimestamp: Boolean,
      sqlContext: SQLContext)
When TextSocketSource is created (see TextSocketSourceProvider), it gets 4 parameters passed in:
- host
- port
- includeTimestamp flag
- SQLContext
Caution
It appears that the source was never updated to use SparkSession instead of SQLContext.
It opens a socket at the given host and port, and reads it line by line through a buffered character-input stream that uses the default charset and the default-sized input buffer (of 8192 characters).
Caution
FIXME Review Java's Charset.defaultCharset()
It starts a readThread daemon thread (called TextSocketSource(host, port)) to read lines from the socket. The lines are added to the internal lines buffer.
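The steps above (open a socket, wrap it in a buffered reader, read lines on a daemon thread, append them to a buffer) can be sketched with the standard library alone. `SocketLinesSketch` and `SocketLinesDemo` are hypothetical names and are not Spark code; the demo replaces the `nc` session with a local `ServerSocket` on an ephemeral port so that it runs on its own.

```scala
import java.io.{BufferedReader, IOException, InputStreamReader, PrintWriter}
import java.net.{ServerSocket, Socket}
import java.sql.Timestamp
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of the reading side (not Spark code).
final class SocketLinesSketch(host: String, port: Int) {
  private val socket = new Socket(host, port)
  // No charset or size arguments: default charset, default-sized buffer.
  private val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
  private val lines = ArrayBuffer.empty[(String, Timestamp)]

  // Daemon thread named after the source, as described in the text.
  private val readThread = new Thread(s"TextSocketSource($host, $port)") {
    setDaemon(true)
    override def run(): Unit =
      try {
        var line = reader.readLine()
        while (line != null) { // null signals end of stream
          val entry = (line, new Timestamp(System.currentTimeMillis()))
          lines.synchronized { lines += entry }
          line = reader.readLine()
        }
      } catch {
        case _: IOException => () // socket closed: stop reading
      }
  }
  readThread.start()

  def snapshot: Vector[String] = lines.synchronized { lines.map(_._1).toVector }
  def awaitEndOfStream(timeoutMs: Long): Unit = readThread.join(timeoutMs)
  def close(): Unit = socket.close()
}

object SocketLinesDemo {
  // Sends two lines through a local server socket and returns what was buffered.
  def run(): Vector[String] = {
    val server = new ServerSocket(0) // ephemeral port
    val source = new SocketLinesSketch("localhost", server.getLocalPort)
    val client = server.accept()
    val out = new PrintWriter(client.getOutputStream, true)
    out.println("hello")
    out.println("world")
    client.close() // end of stream for the reader
    source.awaitEndOfStream(5000)
    val result = source.snapshot
    source.close()
    server.close()
    result
  }
}
```

A daemon thread does not keep the JVM alive, so an application can exit even while the reader is blocked waiting for more input.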