KAFKA-16355

ConcurrentModificationException in InMemoryTimeOrderedKeyValueBuffer.evictWhile


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.5.1
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      While a Streams application was restoring its state after an outage, it hit the following:

      org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_16, processor=KSTREAM-SOURCE-0000000000, topic=<TOPIC>, partition=16, offset=454875695, stacktrace=java.util.ConcurrentModificationException
      at java.base/java.util.TreeMap$PrivateEntryIterator.remove(TreeMap.java:1507)
      at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.evictWhile(InMemoryTimeOrderedKeyValueBuffer.java:423)
      at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.enforceConstraints(KTableSuppressProcessorSupplier.java:178)
      at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.process(KTableSuppressProcessorSupplier.java:165)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
      at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
      at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$4(MeteredWindowStore.java:181)
      at org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:124)
      at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:99)
      at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:158)
      at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:252)
      at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:302)
      at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:179)
      at org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:173)
      at org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:47)
      at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$put$5(MeteredWindowStore.java:201)
      at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
      at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:200)
      at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:201)
      at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:138)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
      at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
      at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
      at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
      at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810)
      at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
      at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810)
      at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741)
      at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
      at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
      at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1747)
      at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
      at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:767)
      at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
      at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
      at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1747)
      at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)

      The affected application runs Kafka Streams 3.5.1. InMemoryTimeOrderedKeyValueBuffer.evictWhile() does not appear to have changed much since that release, so the problem may still occur in newer versions.
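
For context, the top frame of the trace (TreeMap$PrivateEntryIterator.remove) throws when the map has been structurally modified outside the iterator between next() and remove(). The following is a minimal sketch of that JDK behaviour only, not Kafka code: the class name, method name, and map contents are hypothetical, and it does not establish where the outside modification comes from in evictWhile().

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration; not taken from the Kafka code base.
public class EvictWhileCmeSketch {

    // Returns true if Iterator.remove() fails with a
    // ConcurrentModificationException after the map is modified
    // outside the iterator.
    static boolean removeAfterOutsideModification() {
        TreeMap<Integer, String> buffer = new TreeMap<>();
        buffer.put(1, "a");
        buffer.put(2, "b");

        Iterator<Map.Entry<Integer, String>> it = buffer.entrySet().iterator();
        it.next(); // position the iterator on the first entry

        // Structural modification that bypasses the iterator, standing in
        // for any code that touches the map while the loop is in progress.
        buffer.put(3, "c");

        try {
            it.remove(); // the iterator detects the changed modification count
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(removeAfterOutsideModification());
    }
}
```

If something similar happens in evictWhile(), the open question is what modifies the underlying map while the eviction loop is iterating over it, for example a re-entrant call on the same thread or access from another thread.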

          People

            Assignee: PoAn Yang (yangpoan)
            Reporter: Mickael Maison (mimaison)
