Spark Structured Streaming is quite different from Spark Streaming with DStreams.
So to better understand Structured Streaming, we should put our Spark Streaming knowledge on the back burner and compare the two only when required; most of the time it is better to approach Structured Streaming with a fresh mind.
Structured Streaming provides an interface very similar to that of a batch Spark DataFrame/Dataset. A few of the sources Spark can read from are files/directories, Kafka, rate and socket.
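For instance, a Kafka source would look roughly like the sketch below (hedged: the broker address and topic name are placeholders, and it assumes the spark-sql-kafka-0-10 connector is on the classpath). The running examples in these notes stick to the simpler socket source, which is easy to drive by hand with nc.
val kstream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "some_topic")                   // placeholder topic
  .load()
// Kafka rows expose key/value as binary, so cast them to strings before working with them
val kds = kstream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")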
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
import spark.implicits._
val stream1 = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 5432)
.load()
val ds1 = stream1.as[String].flatMap(x => x.split("\\s+"))
val cnts = ds1.groupBy($"value").count()
//println(cnts.explain(true))
val wt = cnts.writeStream
.format("console")
.outputMode("complete")
.trigger(Trigger.ProcessingTime(10 seconds))
.start()
//println(wt.explain(true))
wt.awaitTermination()
Trigger.ProcessingTime ==>
The query will be executed in micro-batch mode, where micro-batches will be kicked off at the
user-specified interval.
1) If the previous micro-batch completes within the interval, then the engine will wait until the
interval is over before kicking off the next micro-batch.
2) If the previous micro-batch takes longer than the interval to complete (i.e. an interval boundary
is missed), then the next micro-batch will start as soon as the previous one completes (it will not wait for the next interval boundary).
3) If no new data is available, then no micro-batch will be kicked off.
A Simple example : Streaming2_Socket
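Trigger.ProcessingTime is not the only option. As a rough sketch of the alternatives (hedged: Trigger.Once needs Spark 2.2+ and the experimental Trigger.Continuous needs Spark 2.3+, so check your version), any of these could be passed to .trigger(...) in the example above:
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

val everyTenSeconds = Trigger.ProcessingTime(10.seconds) // micro-batch on a fixed interval (what the example uses)
val onceAndStop = Trigger.Once()                         // one micro-batch over the available data, then stop
val continuous = Trigger.Continuous("1 second")          // experimental continuous processing, 1 second checkpoint interval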
Now the thing to note here is : Spark treats the input data as a table to which new records are continuously being appended.
The other important thing to note here is the "outputMode". It can be complete, update or append.
By complete we mean that once the stream starts, Spark keeps the result table around, keeps updating it, and every time it writes, it writes out the whole result table, i.e.:
If I run the above code and then type in the terminal :
Vinyass-MacBook-Pro:~ vinyasshetty$ nc -lk 5432
vinyas shetty
hello vinyas
Then spark will give the result as :
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| hello| 1|
|vinyas| 2|
|shetty| 1|
+------+-----+
Now Spark will not be triggered again, since it sees no new data.
Spark has an internal clock which keeps running; every 10 seconds it checks whether new data has arrived, and if yes, it triggers the next micro-batch :
Now I will type :
vinyas shetty <press ENTER>
Spark will give the result as :
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| hello| 1|
|vinyas| 3|
|shetty| 2|
+------+-----+
Two things to notice : one, the count is getting updated, and two, every time you see the "complete" result table being printed. So this is the effect of complete mode. That said, Spark does NOT support non-aggregation queries in "complete" mode, because the result table it would have to keep track of could become massive.
//Below will FAIL
val stream1 = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 5432)
.load()
val ds1 = stream1.as[String].flatMap(x => x.split("\\s+"))
val cnts = ds1.map(x => x) //Just a dummy transformation without any aggregation
val wt = cnts.writeStream
.format("console")
.outputMode("complete") //If we change this to append it will work
.trigger(Trigger.ProcessingTime(10 seconds))
.start()
wt.awaitTermination()
"append" mode feature:
Ex : Streaming1_Append
val stream1 = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 5432)
.load()
val ds1 = stream1.as[String].flatMap(x => x.split("\\s+"))
val cnts = ds1.select($"value")
//println(cnts.explain(true))
val wt = cnts.writeStream
.format("console")
.outputMode("append")
.trigger(Trigger.ProcessingTime(10 seconds))
.start()
wt.awaitTermination()
Input :
Vinyass-MacBook-Pro:~ vinyasshetty$ nc -lk 5432
vinyas shetty
Spark will give result as :
-------------------------------------------
Batch: 0
-------------------------------------------
+------+
| value|
+------+
|vinyas|
|shetty|
+------+
Then I will add :
hello vinyas
-------------------------------------------
Batch: 1
-------------------------------------------
+------+
| value|
+------+
| hello|
|vinyas|
+------+
Now as you see, Spark outputs only the latest records read from the input. It does NOT maintain
history, and because of this we cannot do a regular aggregation (we have a workaround)
like we did in complete mode.
One more Example of append and aggregation(Streaming1_1_Append) :
spark.sparkContext.setLogLevel("ERROR")
val stream1 = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 5432)
.load()
val ds1 = stream1.as[String].flatMap(x => x.split("\\s+")).groupByKey(x=> x)
//Now the difference here is that groupByKey returns a KeyValueGroupedDataset
//and NOT a RelationalGroupedDataset. For an aggregation built on a RelationalGroupedDataset,
//only update and complete mode are supported.
val cnts = ds1.mapGroups((k, i) => (k, i.size))
//println(cnts.explain(true))
val wt = cnts.writeStream
.format("console")
.outputMode("append")
.trigger(Trigger.ProcessingTime(10 seconds))
.start()
wt.awaitTermination()
Input :
Vinyass-MacBook-Pro:~ vinyasshetty$ nc -lk 5432
vinyas shetty
Spark will give :
-------------------------------------------
Batch: 0
-------------------------------------------
+------+---+
| _1| _2|
+------+---+
|vinyas| 1|
|shetty| 1|
+------+---+
Then I feed input as :
hello vinyas <press ENTER>
then Spark gives the output as :
-------------------------------------------
Batch: 1
-------------------------------------------
+------+---+
| _1| _2|
+------+---+
| hello| 1|
|vinyas| 1|
+------+---+
Now as you see, there are two important points : the count of vinyas did NOT increase in Batch 1, and Batch 1 did
NOT contain the shetty row from Batch 0.
If I now feed input as :
hello vinyas vinyas
-------------------------------------------
Batch: 2
-------------------------------------------
+------+---+
| _1| _2|
+------+---+
| hello| 1|
|vinyas| 2|
+------+---+
// The data processed in this micro-batch contained vinyas twice.
"update" mode : this is a fusion of complete and append, i.e. it behaves the same as "append" if there is no aggregation. But if there is an aggregation, it behaves like complete, except that it outputs only the rows that changed in the current trigger. This will be clear with the example below :
Streaming1_1_Update
spark.sparkContext.setLogLevel("ERROR")
val stream1 = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 5432)
.load()
val ds1 = stream1.as[String].flatMap(x => x.split("\\s+")).groupBy($"value")
val cnts = ds1.count()
val wt = cnts.writeStream
.format("console")
.outputMode("update")
.trigger(Trigger.ProcessingTime(10 seconds))
.start()
wt.awaitTermination()
1st input :
Vinyass-MacBook-Pro:~ vinyasshetty$ nc -lk 5432
vinyas shetty
Spark will output as :
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|vinyas| 1|
|shetty| 1|
+------+-----+
Then I will give input as :
hello vinyas <press ENTER>
Spark will output as :
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| hello| 1|
|vinyas| 2|
+------+-----+
Now as you see it is a fusion : the count of vinyas got updated, but the result in Batch 1 does not contain the old data, i.e.
the shetty record.
Now if I type :
shetty hey <press ENTER>
-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| hey| 1|
|shetty| 2|
+------+-----+
Now shetty gets updated and the output will have records from the current data only.
Ex : Streaming_File1
Below I will show an example where the input source is a directory.
val spark = SparkSession.builder()
.config("spark.sql.streaming.schemaInference","true") //Not ideal; it is always better to give an explicit schema
.appName("file1").master("local[*]").getOrCreate()
val fstream = spark.readStream
.format("csv")
.option("maxFilesPerTrigger",1) //Reads one file at a time in the directory.
.option("header","true")
//.option("inferSchema","true") //This does NOT WORK
.option("latestFirst","true")
.load("/Users/vinyasshetty/data_spark_structured_streaming/data/people/")
fstream.printSchema()
val fwstream = fstream.writeStream
.format("console")
.outputMode("append")
.trigger(Trigger.ProcessingTime(10 seconds))
.start()
fwstream.awaitTermination()
Input :
Vinyass-MacBook-Pro:~ vinyasshetty$ ls -l /Users/vinyasshetty/data_spark_structured_streaming/data/people/
total 24
-rw-r--r-- 1 vinyasshetty staff 105 Mar 16 18:02 1.csv
-rw-r--r-- 1 vinyasshetty staff 75 Mar 16 18:02 2.csv
-rw-r--r-- 1 vinyasshetty staff 76 Mar 24 07:25 3.csv
Vinyass-MacBook-Pro:~ vinyasshetty$ cat /Users/vinyasshetty/data_spark_structured_streaming/data/people/*csv
name,city,country,age
Amy,Paris,FR,30
Bob,New York,US,22
Charlie,London,UK,35
Denise,San Francisco,US,22
name,city,country,age
Edward,London,UK,53
Francis,,FR,22
George,London,UK,
name,city,country,age
Vinyas,London,UK,53
Namratha,,FR,22
Varsha,London,UK,
Vinyass-MacBook-Pro:~ vinyasshetty$
Result :
root
|-- name: string (nullable = true)
|-- city: string (nullable = true)
|-- country: string (nullable = true)
|-- age: string (nullable = true)
-------------------------------------------
Batch: 0
-------------------------------------------
+--------+------+-------+----+
| name| city|country| age|
+--------+------+-------+----+
| Vinyas|London| UK| 53|
|Namratha| null| FR| 22|
| Varsha|London| UK|null|
+--------+------+-------+----+
-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-------------+-------+---+
| name| city|country|age|
+-------+-------------+-------+---+
| Amy| Paris| FR| 30|
| Bob| New York| US| 22|
|Charlie| London| UK| 35|
| Denise|San Francisco| US| 22|
+-------+-------------+-------+---+
-------------------------------------------
Batch: 2
-------------------------------------------
+-------+------+-------+----+
| name| city|country| age|
+-------+------+-------+----+
| Edward|London| UK| 53|
|Francis| null| FR| 22|
| George|London| UK|null|
+-------+------+-------+----+
Now as you see, since we asked Spark to infer the schema, it inferred all columns as string. So it is better to explicitly give a schema, like we do in batch.
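As a minimal sketch of that (the column names come from the CSV header above; treating age as an integer is my assumption), the readStream part of the example would become:
import org.apache.spark.sql.types._

// Explicit schema for the people CSVs (age assumed to be an integer)
val peopleSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("city", StringType),
  StructField("country", StringType),
  StructField("age", IntegerType)
))
val fstream = spark.readStream
  .format("csv")
  .option("header", "true")
  .option("maxFilesPerTrigger", 1)
  .schema(peopleSchema) // no need for spark.sql.streaming.schemaInference now
  .load("/Users/vinyasshetty/data_spark_structured_streaming/data/people/")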
For testing purposes we can use "rate"; this helps in generating quick datasets : Streaming3_Rate
val rstream = spark.readStream
.format("rate")
.option("rowsPerSecond",100)
.option("numPartitions",10)
.load() // This is a streaming DataFrame
println(rstream.isStreaming)
//println(rstream.count()) //We cannot do a count - batch actions such as count() are not supported on a streaming DataFrame
val cstream = rstream.groupBy().count()
val wstream = cstream.writeStream
.outputMode("complete")
.format("console")
.option("truncate","false")
.trigger(Trigger.ProcessingTime(5 seconds)) // So each batch will have 500 records
.start()
wstream.awaitTermination()
Try this (use append mode and remove the grouping if you want to view the actual data; see the sketch after the schema below) :
The schema of rstream will be :
root
|-- timestamp: timestamp (nullable = true)
|-- value: long (nullable = true)
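A rough sketch of that suggestion (my variation, not a separate example file): drop the groupBy().count() and write rstream itself in append mode to see the raw rows.
// Print the raw (timestamp, value) rows instead of a running count
val rawStream = rstream.writeStream
  .outputMode("append") // allowed here because there is no aggregation
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime(5 seconds))
  .start()
rawStream.awaitTermination()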