Uploaded image for project: 'Apache Storm'
  1. Apache Storm
  2. STORM-3176

KafkaSpout commit offset occurs CommitFailedException which leads to worker dead

    XMLWordPrintableJSON

Details

    Description

      KafkaSpout use the commitAsync api of Consumer, if the interval time between the call of consumer.poll() more than max.poll.interval.ms or the heartbeat of consumer timeout, that will occur CommitFailedException,  and then the worker will die, the log like this: 

      // 2018-07-31 19:19:03.341 o.a.s.util [ERROR] Async loop died!
      org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer th
      an the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in
      poll() with max.poll.records.
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698) ~[stormjar.jar:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577) ~[stormjar.jar:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1126) ~[stormjar.jar:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:XXX) ~[stormjar.jar:?]
      at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:430) ~[stormjar.jar:?]
      at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:264) ~[stormjar.jar:?]
      at org.apache.storm.daemon.executor$fn__10936$fn__10951$fn__10982.invoke(executor.clj:647) ~[XXX.jar:?]
      at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [XXX.jar:?]
      at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
      at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
      2018-07-31 19:19:03.342 o.a.s.d.executor [ERROR]
      

      I find it will catch the Exception in auto-commit mode of consumer, the source code is:

      // private void maybeAutoCommitOffsetsSync(long timeoutMs) {
          if (autoCommitEnabled) {
              Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
              try {
                  log.debug("Sending synchronous auto-commit of offsets {} for group {}", allConsumedOffsets, groupId);
                  if (!commitOffsetsSync(allConsumedOffsets, timeoutMs))
                      log.debug("Auto-commit of offsets {} for group {} timed out before completion",
                              allConsumedOffsets, groupId);
              } catch (WakeupException | InterruptException e) {
                  log.debug("Auto-commit of offsets {} for group {} was interrupted before completion",
                          allConsumedOffsets, groupId);
                  // rethrow wakeups since they are triggered by the user
                  throw e;
              } catch (Exception e) {
                  // consistent with async auto-commit failures, we do not propagate the exception
                  log.warn("Auto-commit of offsets {} failed for group {}: {}", allConsumedOffsets, groupId,
                          e.getMessage());
              }
          }
      }
      

      I think KafkaSpout should do like this, catch the Exception avoid to worker die. And when the msg ack failed, Spout should judge the offset of the msgID is larger than the last commit offset(Spout can guarantee that these msgs which offset less than the last commit offset are all ack), if not, the msg should not retry.

       

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              wangzzu Matt Wang
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 0.5h
                  0.5h