Description
The issue can be reproduced if a redundant map is added between groupBy and sum
app.source(data.lines.toList, 1, "source"). // word => (word, count) flatMap(line => line.split("[\\s]+")).map((_, 1)). // (word, count1), (word, count2) => (word, count1 + count2) groupByKey().sum.log
The problem is with AndThen#process method, where flatMap is lazily/ never executed.
override def process(value: IN): TraversableOnce[OUT] = { first.process(value).flatMap(second.process) }