Apache Storm / STORM-4055

ConcurrentModificationException on KafkaConsumer when running topology with Metrics Reporters

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.6.1
    • Fix Version/s: 2.7.0
    • Component/s: storm-kafka-client
    • Labels: None

    Description

      After a recent upgrade to storm-kafka-client on Storm server 2.6.1, we are seeing a ConcurrentModificationException in our topology at runtime. I believe this is caused by the KafkaSpout sharing its KafkaConsumer instance with KafkaOffsetPartitionMetrics, which was added sometime between 2.4.0 and 2.6.1.

      Steps to Reproduce:

      Configure a topology with a basic KafkaSpout, then configure one of the metrics reporters for it. We used our own custom reporter, but also reproduced the issue with ConsoleStormReporter. The JMXReporter did not reproduce the issue for us, but we did not investigate why.

      Reporter config:

      topology.metrics.reporters: [
        {
          "filter": {
            "expression": ".*",
            "class": "org.apache.storm.metrics2.filters.RegexFilter"
          },
          "report.period": 15,
          "report.period.units": "SECONDS",
          "class": "org.apache.storm.metrics2.reporters.ConsoleStormReporter"
        }
      ]

      Stacktrace:

      [ERROR] Exception thrown from NewRelicReporter#report. Exception was suppressed.
      java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: metrics-newRelicReporter-1-thread-1, id: 24) otherThread(id: 40)
          at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2484) ~[stormjar.jar:?]
          at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2465) ~[stormjar.jar:?]
          at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2144) ~[stormjar.jar:?]
          at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2123) ~[stormjar.jar:?]
          at org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics.getBeginningOffsets(KafkaOffsetPartitionMetrics.java:181) ~[stormjar.jar:?]
          at org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics$2.getValue(KafkaOffsetPartitionMetrics.java:93) ~[stormjar.jar:?]
          at org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics$2.getValue(KafkaOffsetPartitionMetrics.java:90) ~[stormjar.jar:?]
          at com.codahale.metrics.newrelic.transformer.GaugeTransformer.transform(GaugeTransformer.java:60) ~[stormjar.jar:?]
          at com.codahale.metrics.newrelic.NewRelicReporter.lambda$transform$0(NewRelicReporter.java:154) ~[stormjar.jar:?]
          at java.base/java.util.stream.ReferencePipeline$7$1.accept(Unknown Source) ~[?:?]
          at java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet.lambda$entryConsumer$0(Unknown Source) ~[?:?]
          at java.base/java.util.TreeMap$EntrySpliterator.forEachRemaining(Unknown Source) ~[?:?]
          at java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntrySetSpliterator.forEachRemaining(Unknown Source) ~[?:?]
          at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
          at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
          at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown Source) ~[?:?]
          at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown Source) ~[?:?]
          at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
          at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source) ~[?:?]
          at java.base/java.util.stream.ReferencePipeline$7$1.accept(Unknown Source) ~[?:?]
          at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Unknown Source) ~[?:?]
          at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
          at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
          at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source) ~[?:?]
          at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
          at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
          at com.codahale.metrics.newrelic.NewRelicReporter.report(NewRelicReporter.java:138) ~[stormjar.jar:?]
          at com.codahale.metrics.ScheduledReporter.report(ScheduledReporter.java:243) ~[metrics-core-3.2.6.jar:3.2.6]
          at com.codahale.metrics.ScheduledReporter$1.run(ScheduledReporter.java:182) [metrics-core-3.2.6.jar:3.2.6]
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
          at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source) [?:?]
          at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
          at java.base/java.lang.Thread.run(Unknown Source) [?:?]

      Workaround

      Configure the metrics reporter with a RegexFilter (or similar) whose expression excludes the KafkaOffsetPartitionMetrics.
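      To illustrate the workaround, the sketch below builds a negative-lookahead expression that matches every metric name except those containing a marker substring. It assumes that RegexFilter reports a metric only when the whole name matches the expression (as with `Matcher.matches()`). The marker `kafkaOffset` and the sample metric names are hypothetical placeholders; check the names your topology actually emits before relying on this.

```java
import java.util.List;
import java.util.regex.Pattern;

public class ExcludeOffsetMetricsRegex {
    // Hypothetical: substring assumed to appear in the names of the Kafka offset metrics.
    // Verify against the metric names actually emitted by your topology.
    static final String EXCLUDED_MARKER = "kafkaOffset";

    // Negative lookahead: the full name matches only if it does NOT contain the marker.
    static final Pattern EXPRESSION =
        Pattern.compile("^(?!.*" + Pattern.quote(EXCLUDED_MARKER) + ").*$");

    // Assumption: mirrors a filter that keeps a metric when the whole name matches.
    static boolean isReported(String metricName) {
        return EXPRESSION.matcher(metricName).matches();
    }

    public static void main(String[] args) {
        List<String> names = List.of(
            "kafkaOffset.mytopic.partition_0.earliestTimeOffset", // hypothetical name
            "storm.worker.my-topology.spout.emit-count");         // hypothetical name
        for (String name : names) {
            System.out.println(name + " -> " + (isReported(name) ? "reported" : "filtered out"));
        }
    }
}
```

      Under those assumptions, the "expression" value in the reporter config would become ^(?!.*kafkaOffset).*$ instead of .*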

      Impact

      I am concerned that, depending on when the metric accesses the shared consumer, it could fast-forward or rewind the spout. I did not test further whether the consumer's internal lock could be mismanaged in a way that directly affects the spout, but it seems feasible. The impact may need to be adjusted if it is confirmed that a simple metrics reporter can cause events to be skipped or reprocessed.
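      For context on the failure mode: the KafkaConsumer in the stack trace does not block on concurrent access. Its acquire() guard throws ConcurrentModificationException in whichever thread arrives second, so the exception could just as well be raised on the spout thread (for example in poll()) while the reporter thread is inside beginningOffsets(). The class below is a simplified, stand-alone re-creation of that kind of fail-fast, single-owner guard; it is not Kafka's code, and the class and method names are hypothetical.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class SingleThreadGuard {
    // Thread currently "inside" the guarded object, or null if nobody owns it.
    private final AtomicReference<Thread> owner = new AtomicReference<>(null);
    // Re-entrancy count; only the owning thread modifies it.
    private final AtomicInteger refcount = new AtomicInteger(0);

    // Fails fast instead of blocking when another thread already owns the guard.
    public void acquire() {
        Thread me = Thread.currentThread();
        if (owner.get() != me && !owner.compareAndSet(null, me)) {
            throw new ConcurrentModificationException(
                "not safe for multi-threaded access: currentThread=" + me.getName()
                    + " otherThread=" + owner.get());
        }
        refcount.incrementAndGet();
    }

    // Gives up ownership once the outermost acquire() has been released.
    public void release() {
        if (refcount.decrementAndGet() == 0) {
            owner.set(null);
        }
    }

    // Returns true if a second thread is rejected while the first still holds the guard.
    public static boolean secondThreadIsRejected() throws InterruptedException {
        SingleThreadGuard guard = new SingleThreadGuard();
        guard.acquire(); // e.g. the spout thread, in the middle of poll()
        boolean[] rejected = {false};
        Thread reporter = new Thread(() -> {
            try {
                guard.acquire(); // e.g. the metrics reporter thread calling beginningOffsets()
                guard.release();
            } catch (ConcurrentModificationException e) {
                rejected[0] = true;
            }
        });
        reporter.start();
        reporter.join(); // join() makes the write to rejected[0] visible here
        guard.release();
        return rejected[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("second thread rejected: " + secondThreadIsRejected());
    }
}
```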

      Potential Code Issues:

      KafkaSpout.java

      private transient Consumer<K, V> consumer;
      ...

      public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
          ...
          // this consumer will be used by the spout everywhere
          consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
          tupleListener.open(conf, context);
          this.kafkaOffsetMetricManager
              = new KafkaOffsetMetricManager<>(() -> Collections.unmodifiableMap(offsetManagers), () -> consumer, context);
          ...
      }

      The supplier () -> consumer does not appear to be safe: it hands the spout's own KafkaConsumer instance to the metrics, which are read on a different thread (the reporter's scheduler thread), and KafkaConsumer is not thread-safe.
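      One possible direction, sketched here under assumptions and not necessarily what the eventual fix does, is to give the metrics their own consumer instead of the spout's. The generic supplier below (the hypothetical DedicatedInstanceSupplier) lazily builds a separate instance from a factory, which stands in for kafkaConsumerFactory.createConsumer(...). The dedicated instance is still not thread-safe, so only the reporter thread should use it, and a real implementation would also need to close it when the spout closes.

```java
import java.util.Objects;
import java.util.function.Supplier;

/**
 * Hypothetical sketch: a supplier that lazily creates its OWN instance from a factory,
 * instead of returning an instance owned by another thread (as `() -> consumer` does).
 */
public final class DedicatedInstanceSupplier<T> implements Supplier<T> {
    private final Supplier<T> factory;
    private T instance; // created on first use, then reused

    public DedicatedInstanceSupplier(Supplier<T> factory) {
        this.factory = Objects.requireNonNull(factory, "factory");
    }

    // synchronized only guarantees a single instance is created; the returned
    // object itself is still not thread-safe and should have a single user thread.
    @Override
    public synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    public static void main(String[] args) {
        // Stand-in for the consumer factory: every call builds a new object.
        Supplier<Object> factory = Object::new;
        Object spoutConsumer = factory.get(); // used only by the spout thread
        Supplier<Object> metricsConsumer = new DedicatedInstanceSupplier<>(factory);
        System.out.println(metricsConsumer.get() != spoutConsumer);         // true
        System.out.println(metricsConsumer.get() == metricsConsumer.get()); // true
    }
}
```

      The cost of this design is an extra consumer (and broker connection) per spout instance; in exchange, the spout's consumer is never touched from the reporter thread.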

      KafkaOffsetPartitionMetrics.java: getBeginningOffsets() and getEndOffsets()

      private Map<TopicPartition, Long> getBeginningOffsets(Set<TopicPartition> topicPartitions) {
          Consumer<K, V> consumer = consumerSupplier.get();
          ...
          try {
              // Runs on the metrics reporter thread, but uses the KafkaSpout's own consumer
              // instance, which could negatively impact the spout
              beginningOffsets = consumer.beginningOffsets(topicPartitions);
              ...
          }
          ...
      }

      private Map<TopicPartition, Long> getEndOffsets(Set<TopicPartition> topicPartitions) {
          Consumer<K, V> consumer = consumerSupplier.get();
          ...
          try {
              // Same problem: the KafkaSpout's own consumer instance is used from the
              // metrics reporter thread, which could negatively impact the spout
              endOffsets = consumer.endOffsets(topicPartitions);
              ...
          }
          ...
      }
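      Another option, again only a hypothetical sketch: keep the consumer confined to the spout thread and have the spout publish the offsets that the gauges read. The OffsetSnapshot class below holds an immutable map in an AtomicReference, so the reporter thread never touches the consumer. Partition keys are plain strings here so the example needs no Kafka dependency.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch: the spout thread (sole owner of the consumer) publishes offsets,
 * and the metrics reporter thread only reads the last published, immutable snapshot.
 */
public final class OffsetSnapshot {
    private final AtomicReference<Map<String, Long>> endOffsets =
        new AtomicReference<>(Map.of());

    // Called on the spout thread, e.g. right after it queried its own consumer.
    public void publish(Map<String, Long> latestEndOffsets) {
        endOffsets.set(Map.copyOf(latestEndOffsets)); // defensive immutable copy
    }

    // Called on the reporter thread; returns null if no offset was published yet.
    public Long endOffset(String partition) {
        return endOffsets.get().get(partition);
    }

    public static void main(String[] args) {
        OffsetSnapshot snapshot = new OffsetSnapshot();
        System.out.println(snapshot.endOffset("mytopic-0")); // null: nothing published yet
        snapshot.publish(Map.of("mytopic-0", 42L));
        System.out.println(snapshot.endOffset("mytopic-0")); // 42
    }
}
```

      The trade-off is that reported offsets can be stale by up to one publish interval, and the offset queries now run on the spout thread.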


        Activity


          rzo1 Richard Zowalla added a comment - Thanks for the report. Since you have already dived into the code, would you be willing to provide a PR to fix it?


          castratia Anthony Castrati added a comment - Hi rzo1, I would be happy to provide a PR for the fix. It would be my first contribution to this code base, so I want to take some time with it to make sure it introduces no other regressions. At the moment it is just difficult to find the time to pick this up, now that we have a workaround.

          rabreu Rui Abreu added a comment - Fixed by: https://github.com/apache/storm/pull/3691

          People

            Assignee: Unassigned
            Reporter: Anthony Castrati (castratia)
            Votes: 0
            Watchers: 3


              Time Tracking

                Original Estimate: Not Specified
                Remaining Estimate: 0h
                Time Spent: 40m