Details
Type: Question
Priority: Major
Status: Resolved
Resolution: Incomplete
Affects Version/s: 2.2.0
Fix Version/s: None
Description
I am trying to aggregate the count of records every 10 seconds using Structured Streaming for the following incoming Kafka data:
{ "ts2" : "2018/05/01 00:02:50.041", "serviceGroupId" : "123", "userId" : "avv-0", "stream" : "", "lastUserActivity" : "00:02:50", "lastUserActivityCount" : "0" }
{ "ts2" : "2018/05/01 00:09:02.079", "serviceGroupId" : "123", "userId" : "avv-0", "stream" : "", "lastUserActivity" : "00:09:02", "lastUserActivityCount" : "0" }
{ "ts2" : "2018/05/01 00:09:02.086", "serviceGroupId" : "123", "userId" : "avv-2", "stream" : "", "lastUserActivity" : "00:09:02", "lastUserActivityCount" : "0" }
With the following logic:
val sdvTuneInsAgg1 = df
  .withWatermark("ts2", "10 seconds")
  .groupBy(window(col("ts2"), "10 seconds"))
  .agg(count("*") as "count")
  .as[CountMetric1]

val query1 = sdvTuneInsAgg1.writeStream
  .format("console")
  .foreach(writer)
  .start()
I do not see any records arrive inside the writer. The only anomaly I can spot is that the current date is 2018/05/24, while the records I am processing have old dates in ts2. Will the aggregation / count work in this scenario?
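For reference, a minimal sketch of the setup described above. This is not the original code: the names `kafkaDf`, `schema`, and `parsed` are illustrative, and it assumes the Kafka value is a JSON string whose ts2 field must be converted to a real timestamp before `withWatermark` can use it (watermarks operate on timestamp columns, and they advance based on the maximum event time observed in the data, not on the wall clock):

```scala
import org.apache.spark.sql.functions._

// Assumed: kafkaDf is a streaming DataFrame read from Kafka, and `schema`
// describes the JSON payload (ts2, serviceGroupId, userId, ...).
val parsed = kafkaDf
  .selectExpr("CAST(value AS STRING) AS json")          // Kafka value bytes -> string
  .select(from_json(col("json"), schema).as("data"))    // parse the JSON payload
  .select(col("data.*"))
  // ts2 arrives as a string like "2018/05/01 00:02:50.041"; cast it to a
  // timestamp so that watermarking and windowing can be applied to it.
  .withColumn("ts2", to_timestamp(col("ts2"), "yyyy/MM/dd HH:mm:ss.SSS"))

val counts = parsed
  .withWatermark("ts2", "10 seconds")                   // tolerate 10s of lateness
  .groupBy(window(col("ts2"), "10 seconds"))            // 10-second tumbling windows
  .agg(count("*").as("count"))
```

Note that in append output mode a windowed aggregate is only emitted once the watermark passes the end of the window, i.e. only after later event times arrive in the stream.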