Description
Here is a small test case:
test("count distinct") { val inputData = MemoryStream[(Int, Seq[Int])] val aggregated = inputData.toDF() .select($"*", explode($"_2") as 'value) .groupBy($"_1") .agg(size(collect_set($"value"))) .as[(Int, Int)] testStream(aggregated, Update)( AddData(inputData, (1, Seq(1, 2))), CheckLastBatch((1, 2)) ) }