Kafka / KAFKA-7470

Thread safe accumulator across all instances

Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      Spark has a useful API for accumulating data in a thread-safe way (https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.util.AccumulatorV2) and ships with some useful built-in accumulators, e.g. for Longs (https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.util.LongAccumulator).

      I usually use accumulators to wire debugging, profiling, monitoring and diagnostics into Spark jobs. Before running a Spark job, I typically fire off a Future that periodically prints the stats (TPS, histograms, counts, timings and so on).

      So far I cannot find anything similar for Kafka Streams. Does anything exist? I imagine this is possible within a single instance of a Kafka Streams app, but making it work across several instances would require creating an intermediate topic.
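      To make the single-instance case concrete, here is a minimal sketch using only the JDK. The `Accumulator` trait and `LongCountAccumulator` class are hypothetical names loosely modelled on Spark's AccumulatorV2; they are not part of Kafka Streams, and this does nothing to aggregate across instances.

```scala
import java.util.concurrent.atomic.LongAdder

// Hypothetical interface, loosely modelled on Spark's AccumulatorV2.
// This is NOT an existing Kafka Streams API.
trait Accumulator[T] {
  def add(v: T): Unit
  def value: T
  def reset(): Unit
}

// Thread-safe Long accumulator, valid within a single JVM instance only.
final class LongCountAccumulator extends Accumulator[Long] {
  private val adder = new LongAdder
  override def add(v: Long): Unit = adder.add(v)
  override def value: Long = adder.sum()
  override def reset(): Unit = adder.reset()
}

object AccumulatorDemo {
  // Four threads each add 1 a thousand times; returns the final total.
  def run(): Long = {
    val acc = new LongCountAccumulator
    val threads = (1 to 4).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = (1 to 1000).foreach(_ => acc.add(1L))
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    acc.value
  }
}
```

      LongAdder is used here instead of AtomicLong because it tends to behave better under heavy write contention, at the cost of `value` being only a snapshot while writers are still active.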

      Of course, we want to be able to call this accumulator in much the same way as the Spark accumulator while preserving its guarantees. Example usage:

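      // Accumulator shared by the stream processing code and the reporter
      val countAccumulator: Accumulator[Long] = ...

      // Periodically log progress in the background
      Future {
        every(1 minute) {
          logger.info("Processed " + countAccumulator.value + " records")
        }
      }

      // Count each record as it passes through, leaving it unchanged
      stream.map(x => {
        countAccumulator.add(1)
        x
      })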
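      The reporting half of the usage above could be sketched with a JDK scheduler, as below. The `every` helper is a hypothetical stand-in for the `every(1 minute)` call, a plain Scala range stands in for the stream, and a bare LongAdder stands in for the accumulator, so nothing here touches the Kafka Streams API.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.LongAdder

object PeriodicReporter {
  // Hypothetical helper: runs `body` at a fixed rate on a background thread.
  // The caller is responsible for shutting down the returned scheduler.
  def every(period: Long, unit: TimeUnit)(body: => Unit): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable { def run(): Unit = body }
    scheduler.scheduleAtFixedRate(task, period, period, unit)
    scheduler
  }

  // Counts records while a background task periodically prints progress.
  def demo(): Long = {
    val count = new LongAdder

    val reporter = every(100, TimeUnit.MILLISECONDS) {
      println("Processed " + count.sum() + " records")
    }

    // A plain range stands in for the stream; each element is counted
    // and passed through unchanged.
    val out = (1 to 10000).map { x =>
      count.increment()
      x
    }

    reporter.shutdownNow()
    count.sum()
  }
}
```

      Note that scheduleAtFixedRate stops running a task once it throws, so a real reporter would probably want to catch exceptions inside the body.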

          People

            Assignee: Unassigned
            Reporter: sam (sams)
            Votes: 0
            Watchers: 2