Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-4344

Exception when accessing partition, offset and timestamp in processor class

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Minor
    • Resolution: Not A Bug
    • 0.10.1.0
    • None
    • streams
    • None

    Description

      I have a kafka stream pipeline like below
      source topic stream -> filter for null value ->map to make it keyed by id ->custom processor to mystore ->to another topic -> ktable

      I am hitting the below type of exception in a custom processor class if I try to access offset() or partition() or timestamp() from the ProcessorContext in the process() method. I was hoping it would return the partition and offset for the enclosing topic(in this case source topic) where its consuming from or -1 based on the api docs.

      java.lang.IllegalStateException: This should not happen as offset() should only be called while a record is processed
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.offset(ProcessorContextImpl.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
      at com.sai.repo.MyStore.process(MyStore.java:72) ~[classes!/:?]
      at com.sai.repo.MyStore.process(MyStore.java:39) ~[classes!/:?]
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
      at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) ~[kafka-streams-0.10.1.0.jar!/:?]
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
      at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-0.10.1.0.jar!/:?]
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.0.jar!/:?]
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204) ~[kafka-streams-0.10.1.0.jar!/:?]
      at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.0.jar!/:?]
      at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181) ~[kafka-streams-0.10.1.0.jar!/:?]
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.0.jar!/:?]
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]

      Attachments

        Issue Links

          Activity

            People

              guozhang Guozhang Wang
              saimishra saiprasad mishra
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: