Details
- Type: New Feature
- Status: Open
- Priority: Major
- Resolution: Unresolved
Description
Spark has a useful API for accumulating data in a thread-safe way (https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.util.AccumulatorV2) and ships with some useful out-of-the-box accumulators, e.g. LongAccumulator for Longs (https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.util.LongAccumulator).
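The core contract such an accumulator needs is small: thread-safe adds and a readable current value. A minimal stdlib-only sketch of that contract (the class and method names here are illustrative, not Spark's implementation):

```scala
import java.util.concurrent.atomic.LongAdder

// A minimal Spark-style long accumulator: thread-safe adds, a readable
// current value, and a reset. Hypothetical names, not Spark's classes.
final class LocalLongAccumulator {
  private val adder = new LongAdder

  def add(n: Long): Unit = adder.add(n) // safe to call from many threads
  def value: Long = adder.sum()         // a snapshot; approximate under concurrent adds
  def reset(): Unit = adder.reset()
}

object AccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val acc = new LocalLongAccumulator
    val threads = (1 to 4).map { _ =>
      new Thread(() => (1 to 1000).foreach(_ => acc.add(1)))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(acc.value) // 4000: no adds are lost under contention
  }
}
```

`LongAdder` is preferable to `AtomicLong` here because accumulators are written from many threads and read rarely, which is exactly the workload `LongAdder` is optimized for.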
I usually use accumulators to wire debugging, profiling, monitoring and diagnostics into Spark jobs. Typically I fire off a Future before running a Spark job that periodically prints the stats (e.g. TPS, histograms, counts, timings, etc.).
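The periodic-printing side task described above can be sketched with a JDK scheduler rather than a hand-rolled Future loop; everything here (the counter, the one-minute period) is illustrative:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.LongAdder

object PeriodicStats {
  def main(args: Array[String]): Unit = {
    val processed = new LongAdder
    val scheduler = Executors.newSingleThreadScheduledExecutor()

    // Report stats once a minute on a background thread while the job runs.
    scheduler.scheduleAtFixedRate(
      () => println(s"Processed ${processed.sum()} records"),
      1, 1, TimeUnit.MINUTES)

    // ... the job itself calls processed.add(1) per record processed ...
    processed.add(1)

    scheduler.shutdown() // stop the reporter once the job completes
  }
}
```

A scheduled executor gives a fixed reporting cadence and a clean shutdown, which a bare `Future { while (true) ... }` loop does not.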
So far I cannot find anything similar for Kafka Streams. Does anything exist? I imagine this is possible at least within each instance of a Kafka Streams app, but making it work across several instances would require an intermediate topic.
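Spark makes the multi-worker case work because each task holds a local copy and the driver merges them with an order-insensitive merge (AccumulatorV2's merge contract); that is the same contract an intermediate topic would have to carry between Kafka Streams instances. A stdlib-only sketch of that merge step, with all names hypothetical:

```scala
// Hypothetical per-instance snapshot: each app instance periodically publishes
// (instanceId, localCount) to an intermediate topic; a consumer derives the
// global count by keeping the latest snapshot per instance and summing.
final case class CountSnapshot(instanceId: String, count: Long)

object GlobalCount {
  // Latest-snapshot-per-instance then sum: order-insensitive across instances,
  // so reordered deliveries from different instances still converge.
  def merge(snapshots: Seq[CountSnapshot]): Long =
    snapshots
      .groupBy(_.instanceId)
      .values
      .map(_.last.count) // latest snapshot wins per instance
      .sum
}
```

Publishing monotonically increasing per-instance totals (rather than deltas) keeps the merge idempotent, so reprocessing the topic does not double-count.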
Of course, we want to be able to call this accumulator much as we would a Spark Accumulator, while preserving its guarantees. Example usage:
val countAccumulator: Accumulator[Long] = ...

Future {
  every(1 minute) {
    logger.info("Processed " + countAccumulator.value + " records")
  }
}

stream.map(x => {
  countAccumulator.add(1)
  x
})