Project: Kafka
Issue: KAFKA-7035

Kafka Processor's init() method is sometimes not called


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Abandoned
    • Affects Version/s: 1.0.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      Scenario:

      1. We process a Kafka topic with an application implemented using the Processor API.

      2. We want to collect metrics (for simplicity, say we just count the number of processed entities).

      3. How we tried to implement this:

      • process the data in the process() method and forward it downstream via the context
      • update the counter on each call of process()
      • schedule a punctuate function that builds a metric from the counter and sends it to a dedicated metrics topic
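      The three steps above can be sketched as a small plain-Java model. It deliberately does not depend on kafka-streams. Outputs, Scheduler and CountingProcessor are hypothetical stand-ins that only mirror the shape of the Processor API (init(), process(), a scheduled punctuation, close()). They are not the Kafka classes, and this is not the attached TransformProcessor.java.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two output topics; not a Kafka API.
class Outputs {
    final List<String> sink = new ArrayList<>();   // main output (the Sink)
    final List<Long> metrics = new ArrayList<>();  // metrics topic
}

// Hypothetical stand-in for the context's scheduling facility.
class Scheduler {
    private final List<Runnable> punctuators = new ArrayList<>();

    void schedule(Runnable punctuator) {
        punctuators.add(punctuator);
    }

    // Simulates one punctuation interval elapsing: every registered punctuator fires.
    void tick() {
        punctuators.forEach(Runnable::run);
    }
}

// Models the reported pattern: the counter is updated in process(), and the
// metric is only emitted by a punctuator that init() schedules.
class CountingProcessor {
    private final Outputs out;
    private long counter = 0;

    CountingProcessor(Outputs out) {
        this.out = out;
    }

    void init(Scheduler scheduler) {
        scheduler.schedule(this::punctuate); // metrics depend on init() having run
    }

    void process(String value) {
        out.sink.add(value); // "forward" the record downstream
        counter++;           // update the metric counter
    }

    void punctuate() {
        out.metrics.add(counter); // send the metric built from the counter
        counter = 0;              // start a new reporting interval
    }

    void close() {
        // Nothing is flushed here, so counts since the last punctuation are dropped.
    }
}
```

      Driving the model by hand works as follows. Call init(), make three process() calls, then call tick(). This yields a single metric of 3. A record processed after that tick and before close() reaches the sink but is never reported, because close() flushes nothing.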

      The code is attached (we removed all business-sensitive parts, so it should be easy to read).

       

      Problematic Kafka Streams behaviour that I observed by logging every step:

      1. The input topic contains 800000 messages.

      2. Kafka Streams creates 4 Processor instances. Let's call them ProcessorA, ProcessorB, ProcessorC and ProcessorD.

      3. ProcessorA and ProcessorB receive 1-5% of the data. The data is processed correctly, the results are sent downstream, and the counter is updated.

      4. The init() method was not called for ProcessorA and ProcessorB.

      5. ProcessorC and ProcessorD are created and start to receive the rest of the data (95-99%).

      6. The init() method is called for both ProcessorC and ProcessorD. It schedules the punctuation, which periodically creates a Metrics message and sends it down the metrics stream.

      7. ProcessorA and ProcessorB are closed. Because init() was never called for them, no Metric entity was sent to the metrics topic.

      8. Processing is finished.

       

      In the end:

      Expected:

      • 800000 entities were processed and sent to the Sink
      • The counters in the Metrics entities sum up to 800000

      Actual results:

      • 800000 entities were processed and sent to the Sink
      • The counters in the Metrics entities sum up to a number 3-6% below 800000, for example 786543
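      The gap between the two totals follows directly from where the metric is published. The plain-Java model below is hypothetical: LostMetricsSimulation does not use Kafka Streams. It rests on three assumptions. Every processed record increments its instance's counter. Only instances whose init() ran ever publish that counter. For simplicity, ProcessorC and ProcessorD publish their full count before closing.

```java
// Plain-Java model of the reported run; LostMetricsSimulation is hypothetical
// and does not use Kafka Streams.
class LostMetricsSimulation {

    // Total number of records processed (and sent to the Sink) by all instances.
    static long processedTotal(long[] processed) {
        long total = 0;
        for (long n : processed) {
            total += n;
        }
        return total;
    }

    // Sum of the counters that reach the metrics topic. process() increments the
    // counter for every record, but only an instance whose init() ran has a
    // scheduled punctuation that publishes it; the others drop it on close().
    static long publishedTotal(long[] processed, boolean[] initCalled) {
        long published = 0;
        for (int i = 0; i < processed.length; i++) {
            long counter = processed[i];
            if (initCalled[i]) {
                published += counter;
            }
        }
        return published;
    }
}
```

      As a hypothetical split, give ProcessorA and ProcessorB 10000 records each and ProcessorC and ProcessorD 390000 each. processedTotal then returns 800000 while publishedTotal returns 780000. The missing amount is exactly the share handled by the instances whose init() was skipped.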

       

      Problem:

      • the call to init() is not guaranteed
      • there is no way to guarantee that the punctuate method has finished all its work before close() is called
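      One possible mitigation is sketched here in the same hypothetical plain-Java style. The idea is to publish whatever the counter still holds from close() as well as from the punctuation. That way neither a skipped init() nor a close() before the next punctuation drops counts. FlushingCountingProcessor is an illustrative name. This report does not establish whether the real Processor API in 1.0.0 supports forwarding from close(); the sketch assumes it for illustration.

```java
import java.util.List;

// Hypothetical model, not the Kafka Streams API: the metrics "topic" is a list
// and the scheduler is a list of punctuators that the caller runs.
class FlushingCountingProcessor {
    private final List<Long> metricsTopic;
    private long counter = 0;

    FlushingCountingProcessor(List<Long> metricsTopic) {
        this.metricsTopic = metricsTopic;
    }

    // Per the report, this may never be called for some instances.
    void init(List<Runnable> scheduler) {
        scheduler.add(this::punctuate);
    }

    void process(String value) {
        // The record itself would be forwarded downstream here (omitted).
        counter++;
    }

    void punctuate() {
        flush();
    }

    // Publishes whatever was counted since the last punctuation, so nothing is
    // lost when the instance is closed, whether or not init() ran.
    void close() {
        flush();
    }

    private void flush() {
        if (counter > 0) {
            metricsTopic.add(counter);
            counter = 0;
        }
    }
}
```

      In this model, an instance that never had init() called still reports its count on close(). The published metrics therefore sum to the number of processed records.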


      Attachments

        1. TransformProcessor.java (2 kB, uploaded by Oleksandr Konopko)


          People

            Assignee: Unassigned
            Reporter: Oleksandr Konopko (akonopko)
            Votes: 0
            Watchers: 5
