KAFKA-16355

ConcurrentModificationException in InMemoryTimeOrderedKeyValueBuffer.evictWhile

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.5.1
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      While a Streams application was restoring its state after an outage, it hit the following:

      org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_16, processor=KSTREAM-SOURCE-0000000000, topic=<TOPIC>, partition=16, offset=454875695, stacktrace=java.util.ConcurrentModificationException
      at java.base/java.util.TreeMap$PrivateEntryIterator.remove(TreeMap.java:1507)
      at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.evictWhile(InMemoryTimeOrderedKeyValueBuffer.java:423)
      at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.enforceConstraints(KTableSuppressProcessorSupplier.java:178)
      at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.process(KTableSuppressProcessorSupplier.java:165)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
      at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
      at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$4(MeteredWindowStore.java:181)
      at org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:124)
      at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:99)
      at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:158)
      at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:252)
      at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:302)
      at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:179)
      at org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:173)
      at org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:47)
      at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$put$5(MeteredWindowStore.java:201)
      at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
      at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:200)
      at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:201)
      at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:138)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
      at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
      at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
      at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
      at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810)
      at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
      at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810)
      at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741)
      at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
      at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
      at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1747)
      at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
      at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:767)
      at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
      at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
      at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1747)
      at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)

      The affected application runs Kafka Streams 3.5.1. InMemoryTimeOrderedKeyValueBuffer.evictWhile() does not appear to have changed much since that release, so the problem may still occur in later versions.
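
      For readers unfamiliar with the exception at the top of the trace, the sketch below shows the general mechanism by which TreeMap's iterator remove() can throw ConcurrentModificationException on a single thread: the map is structurally modified outside the iterator between next() and remove(). This is only an illustration, not a diagnosis of this issue. The class EvictWhileSketch, its evict method, and the re-entrant put are hypothetical and are not taken from the Kafka code base.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a time-ordered buffer backed by a TreeMap.
// It does not reproduce Kafka's InMemoryTimeOrderedKeyValueBuffer.
final class EvictWhileSketch {

    // Removes every entry through the iterator. If mutateDuringIteration is
    // true, the map is also modified directly while the iterator is live,
    // which invalidates the iterator. Returns true if a
    // ConcurrentModificationException was thrown.
    static boolean evict(TreeMap<Long, String> buffer, boolean mutateDuringIteration) {
        Iterator<Map.Entry<Long, String>> it = buffer.entrySet().iterator();
        try {
            while (it.hasNext()) {
                Map.Entry<Long, String> entry = it.next();
                if (mutateDuringIteration) {
                    // Assumed scenario: a callback writes a new key into the
                    // same map before the iterator removes the current entry.
                    buffer.put(entry.getKey() + 1000L, "late");
                }
                // Fail-fast check: throws if the map changed outside the iterator.
                it.remove();
            }
        } catch (ConcurrentModificationException e) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> clean = new TreeMap<>();
        clean.put(1L, "a");
        clean.put(2L, "b");
        System.out.println("CME without outside modification: " + evict(clean, false));

        TreeMap<Long, String> mutated = new TreeMap<>();
        mutated.put(1L, "a");
        mutated.put(2L, "b");
        System.out.println("CME with outside modification: " + evict(mutated, true));
    }
}
```

      The same fail-fast check also fires if another thread modifies the map, so the sketch does not by itself distinguish between re-entrant and cross-thread modification.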

          People

            Assignee: Ksolves India Limited (ksolves.kafka)
            Reporter: Mickael Maison (mimaison)
            Votes: 0
            Watchers: 5