  Apache Storm / STORM-2494

KafkaSpout does not handle CommitFailedException

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.2.0
    • Component/s: storm-kafka-client
    • Labels: None

      Description

      When tuple processing takes longer than the session timeout, we get a CommitFailedException and, instead of recovering from it, the Storm worker dies.

      2017-04-26 11:07:04.902 o.a.s.util [ERROR] Async loop died!
      org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
          at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058) ~[stormjar.jar:3.0.2]
          at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384) ~[stormjar.jar:3.0.2]
          at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:219) ~[stormjar.jar:3.0.2]
          at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:644) ~[storm-core-1.1.0.jar:1.1.0]
          at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.0.jar:1.1.0]
          at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
          at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
      2017-04-26 11:07:04.909 o.a.s.d.executor [ERROR] 
      org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
          at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404) ~[stormjar.jar:3.0.2]
          at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058) ~[stormjar.jar:3.0.2]
          at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384) ~[stormjar.jar:3.0.2]
          at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:219) ~[stormjar.jar:3.0.2]
          at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:644) ~[storm-core-1.1.0.jar:1.1.0]
          at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.0.jar:1.1.0]
          at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
          at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
      2017-04-26 11:07:04.953 o.a.s.util [ERROR] Halting process: ("Worker died")
      java.lang.RuntimeException: ("Worker died")
          at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.1.0.jar:1.1.0]
          at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
          at org.apache.storm.daemon.worker$fn__5646$fn__5647.invoke(worker.clj:763) [storm-core-1.1.0.jar:1.1.0]
          at org.apache.storm.daemon.executor$mk_executor_data$fn__4863$fn__4864.invoke(executor.clj:274) [storm-core-1.1.0.jar:1.1.0]
          at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:494) [storm-core-1.1.0.jar:1.1.0]
          at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
          at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
      
      2017-04-26 11:07:44.507 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] Instantiated KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0, timeUnit=SECONDS}, ratio=TimeInterval{length=2, timeUnit=MILLISECONDS}, maxRetries=2147483647, maxRetryDelay=TimeInterval{length=10, timeUnit=SECONDS}}
      2017-04-26 11:07:44.516 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] Instantiated KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0, timeUnit=SECONDS}, ratio=TimeInterval{length=0, timeUnit=MILLISECONDS}, maxRetries=2147483647, maxRetryDelay=TimeInterval{length=0, timeUnit=MILLISECONDS}}
      2017-04-26 11:07:45.048 o.a.s.k.s.KafkaSpout [INFO] Kafka Spout opened with the following configuration: KafkaSpoutConfig{kafkaProps={enable.auto.commit=false, request.timeout.ms=30000, group.id=Group1, bootstrap.servers=192.168.1.143:9092, session.timeout.ms=20000}, key=org.apache.kafka.common.serialization.StringDeserializer@1b5080fd, value=org.apache.kafka.common.serialization.StringDeserializer@2720873b, pollTimeoutMs=200, offsetCommitPeriodMs=5000, maxUncommittedOffsets=1000, firstPollOffsetStrategy=UNCOMMITTED_EARLIEST, subscription=org.apache.storm.kafka.spout.NamedSubscription@7f068c1f, translator=org.apache.storm.kafka.spout.SimpleRecordTranslator@1f1ca6a2, retryService=KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0, timeUnit=SECONDS}, ratio=TimeInterval{length=2, timeUnit=MILLISECONDS}, maxRetries=2147483647, maxRetryDelay=TimeInterval{length=10, timeUnit=SECONDS}}}
      2017-04-26 11:07:45.111 o.a.s.k.s.KafkaSpout [INFO] Kafka Spout opened with the following configuration: KafkaSpoutConfig{kafkaProps={enable.auto.commit=false, request.timeout.ms=30000, group.id=Group2, bootstrap.servers=192.168.1.143:9092, session.timeout.ms=20000}, key=org.apache.kafka.common.serialization.StringDeserializer@45ffa954, value=org.apache.kafka.common.serialization.StringDeserializer@4b384f9b, pollTimeoutMs=200, offsetCommitPeriodMs=5000, maxUncommittedOffsets=1000, firstPollOffsetStrategy=UNCOMMITTED_EARLIEST, subscription=org.apache.storm.kafka.spout.NamedSubscription@4f07c224, translator=org.apache.storm.kafka.spout.SimpleRecordTranslator@a0545a0, retryService=KafkaSpoutRetryExponentialBackoff{delay=TimeInterval{length=0, timeUnit=SECONDS}, ratio=TimeInterval{length=2, timeUnit=MILLISECONDS}, maxRetries=2147483647, maxRetryDelay=TimeInterval{length=10, timeUnit=SECONDS}}}
      2017-04-26 11:07:45.297 o.a.s.k.s.NamedSubscription [INFO] Kafka consumer subscribed topics [topic-1]
      2017-04-26 11:07:45.302 o.a.s.k.s.NamedSubscription [INFO] Kafka consumer subscribed topics [topic-2]
      2017-04-26 11:07:45.456 o.a.s.k.s.KafkaSpout [INFO] Partitions revoked. [consumer-group=Group1, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32cbdbb0, topic-partitions=[]]
      2017-04-26 11:07:45.463 o.a.s.k.s.KafkaSpout [INFO] Partitions revoked. [consumer-group=Group1, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@275d5222, topic-partitions=[]]
      2017-04-26 11:07:45.545 o.a.s.k.s.KafkaSpout [INFO] Partitions reassignment. [consumer-group=Group1, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@275d5222, topic-partitions=[topic-1]]
      2017-04-26 11:07:45.546 o.a.s.k.s.KafkaSpout [INFO] Partitions reassignment. [consumer-group=Group1, consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32cbdbb0, topic-partitions=[topic-2]]
      2017-04-26 11:07:45.551 o.a.s.k.s.i.OffsetManager [DEBUG] Instantiated OffsetManager{topic-partition=topic-1, fetchOffset=11803, committedOffset=11802, ackedMsgs=[]}
      2017-04-26 11:07:45.551 o.a.s.k.s.i.OffsetManager [DEBUG] Instantiated OffsetManager{topic-partition=topic-2, fetchOffset=11801, committedOffset=11800, ackedMsgs=[]}
      2017-04-26 11:07:45.552 o.a.s.k.s.KafkaSpout [INFO] Initialization complete
      2017-04-26 11:07:45.552 o.a.s.k.s.KafkaSpout [INFO] Initialization complete
      

      I think the expected behaviour would be for the KafkaSpout to recover from the exception (the client reconnects and gets its partitions reassigned) without the worker being killed.
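      As a hedged illustration of that recovery, the commit call could tolerate the exception instead of letting it propagate out of nextTuple and kill the worker. A minimal sketch, assuming an acked-offsets map like the one KafkaSpout maintains; the class and method names here are illustrative, not the actual spout code:

          import java.util.Map;
          import org.apache.kafka.clients.consumer.CommitFailedException;
          import org.apache.kafka.clients.consumer.KafkaConsumer;
          import org.apache.kafka.clients.consumer.OffsetAndMetadata;
          import org.apache.kafka.common.TopicPartition;
          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;

          // Illustrative sketch only: tolerate a failed commit and let the
          // consumer rejoin the group on the next poll() rather than crash.
          public class GuardedCommitter {
              private static final Logger LOG = LoggerFactory.getLogger(GuardedCommitter.class);

              static void commitGuarded(KafkaConsumer<String, String> consumer,
                                        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
                  try {
                      consumer.commitSync(offsetsToCommit);
                  } catch (CommitFailedException e) {
                      // The group has already rebalanced and these partitions may
                      // belong to another member. With at-least-once semantics the
                      // uncommitted records are re-fetched and replayed after
                      // reassignment, so skipping this commit loses no data.
                      LOG.warn("Offset commit failed due to rebalance; retrying after next poll()", e);
                  }
              }
          }

      The trade-off is some replayed tuples after the rebalance instead of a dead worker.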


          Activity

          Srdo Stig Rohde Døssing added a comment -

          Hugo Louro This is caused by using the subscribe API. I don't think we should try to make the code work around this. Upgrading to 1.2.0 for https://issues.apache.org/jira/browse/STORM-2640 will solve this.
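          For readers hitting this, the difference between the two consumer APIs is the crux. A hedged sketch, reusing the broker address and group id from the configuration logged above (everything else is illustrative):

              import java.util.Collections;
              import java.util.Properties;
              import org.apache.kafka.clients.consumer.KafkaConsumer;
              import org.apache.kafka.common.TopicPartition;

              public class SubscribeVsAssign {
                  public static void main(String[] args) {
                      Properties props = new Properties();
                      props.put("bootstrap.servers", "192.168.1.143:9092");
                      props.put("group.id", "Group1");
                      props.put("enable.auto.commit", "false");
                      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

                      // subscribe(): the group coordinator owns the assignment. If
                      // poll() is not called within the session timeout, the group
                      // rebalances and a later commitSync() throws CommitFailedException.
                      KafkaConsumer<String, String> subscribed = new KafkaConsumer<>(props);
                      subscribed.subscribe(Collections.singletonList("topic-1"));

                      // assign(): manual assignment, the approach STORM-2640 moves to.
                      // There is no group-managed rebalancing, so a slow poll loop
                      // cannot produce this failure mode.
                      KafkaConsumer<String, String> assigned = new KafkaConsumer<>(props);
                      assigned.assign(Collections.singletonList(new TopicPartition("topic-1", 0)));
                  }
              }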

          hmclouro Hugo Louro added a comment -

          Stig Rohde Døssing I understand that. The only reason I put some thought into a possible fix is to cover the scenario where a customer on Storm 1.1.0 does not want to upgrade to 1.2.0. But my intention all along has been to advise moving to manual partition assignment (i.e. upgrading to Storm 1.2.0).

          Srdo Stig Rohde Døssing added a comment -

          Hugo Louro Okay, I just wanted to make sure you didn't spend time debugging to find out why this happens.

          Regarding fixing this, I think it might not be easy. We'd need to make sure that the spout always calls poll before commit in nextTuple, so that it can trigger a rebalance in case the spout has been idle for longer than the session timeout (roughly the ordering in the sketch below), and we'd probably also need to change the rebalance listener logic, which currently assumes that when onPartitionsRevoked runs, the spout was assigned the partitions it has offset managers for.
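          As an illustrative skeleton of that ordering (not the actual nextTuple implementation; all names below are assumptions):

              import java.util.Map;
              import org.apache.kafka.clients.consumer.ConsumerRecords;
              import org.apache.kafka.clients.consumer.KafkaConsumer;
              import org.apache.kafka.clients.consumer.OffsetAndMetadata;
              import org.apache.kafka.common.TopicPartition;

              // Poll before committing so the consumer heartbeats and completes any
              // pending rebalance before commitSync() runs against what would
              // otherwise be a stale assignment.
              abstract class PollFirstSpout {
                  private final KafkaConsumer<String, String> consumer;
                  private final long pollTimeoutMs;

                  PollFirstSpout(KafkaConsumer<String, String> consumer, long pollTimeoutMs) {
                      this.consumer = consumer;
                      this.pollTimeoutMs = pollTimeoutMs;
                  }

                  public void nextTuple() {
                      ConsumerRecords<String, String> records = consumer.poll(pollTimeoutMs); // may rejoin the group
                      emitTuples(records);
                      if (commitTimerExpired()) {
                          consumer.commitSync(offsetsForAckedTuples()); // assignment is now current
                      }
                  }

                  abstract void emitTuples(ConsumerRecords<String, String> records);
                  abstract boolean commitTimerExpired();
                  abstract Map<TopicPartition, OffsetAndMetadata> offsetsForAckedTuples();
              }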

          Users who don't want to upgrade storm-kafka-client for some reason can still work around this by setting a higher session timeout, and newer Kafka versions allow you to set it pretty high without losing fast detection of dead consumers due to KIP-62.
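          Concretely, that workaround amounts to something like the following (standard Kafka consumer property names; the values are arbitrary examples):

              import java.util.HashMap;
              import java.util.Map;
              import org.apache.kafka.clients.consumer.ConsumerConfig;

              public class TimeoutWorkaround {
                  public static void main(String[] args) {
                      Map<String, Object> kafkaProps = new HashMap<>();
                      // Raise the session timeout so slow tuple processing does not get
                      // the spout evicted from the group (the broker-side ceiling is
                      // group.max.session.timeout.ms).
                      kafkaProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
                      // On kafka-clients 0.10.1+ (KIP-62) heartbeats run on a background
                      // thread, so max.poll.interval.ms is what bounds the time between
                      // polls; raising it instead keeps dead-consumer detection fast.
                      kafkaProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
                      System.out.println(kafkaProps);
                  }
              }

          These properties would go into the kafkaProps map the spout is configured with (visible in the "Kafka Spout opened" log lines above).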

          hmclouro Hugo Louro added a comment -

          STORM-2640 also fixes STORM-2494.

            People

            • Assignee: hmclouro Hugo Louro
            • Reporter: yubarseg@gmail.com Yuri Barseghyan
            • Votes: 2
            • Watchers: 6

              Dates

              • Created:
              • Updated:
              • Resolved:
