KAFKA-693

Consumer rebalance fails if no leader available for a partition and stops all fetchers

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels:

      Description

      I am currently experiencing this with the MirrorMaker but I assume it happens for any rebalance. The symptoms are:

      I have a replication factor of 1.

      1. If I start the MirrorMaker (bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config mirror-consumer.properties --producer.config mirror-producer.properties --blacklist 'xdummyx' --num.streams=1 --num.producers=1) with a broker down
      1.1 I set refresh.leader.backoff.ms to 600000 (10 min) so that the ConsumerFetcherManager doesn't retry too often to get the unavailable partitions
      1.2 The rebalance starts at the init step and fails: Exception in thread "main" kafka.common.ConsumerRebalanceFailedException: KafkaMirror_mirror-01-1357893495345-fac86b15 can't rebalance after 4 retries
      1.3 After the exception, everything stops (fetchers and queues)
      1.4 I attached the full logs (info & debug) for this case

      2. If I start the MirrorMaker with all the brokers up and then kill a broker
      2.1 The first rebalance is successful
      2.2 The consumer correctly handles the broker going down and stops the associated ConsumerFetcherThread
      2.3 The refresh.leader.backoff.ms setting of 600000 works correctly
      2.4 If something triggers a rebalance (new topic, partition reassignment...), we are back in case 1: the rebalance fails and stops everything.

      I think the desired behavior is to consume whatever is available and retry the rest at some interval. I would be glad to help on this issue, although the consumer code seems a little tough to get into.

      Attachments

      1. KAFKA-693.patch (10 kB) - Maxime Brugidou
      2. KAFKA-693-v2.patch (13 kB) - Maxime Brugidou
      3. KAFKA-693-v3.patch (15 kB) - Maxime Brugidou
      4. mirror_debug.log (151 kB) - Maxime Brugidou
      5. mirror.log (70 kB) - Maxime Brugidou

        Issue Links

          Activity

          Neha Narkhede made changes -
          Labels: p2
          Neha Narkhede made changes -
          Status: Resolved -> Closed
          Jun Rao made changes -
          Status: Patch Available -> Resolved
          Fix Version/s: 0.8
          Resolution: Fixed
          Jun Rao added a comment -

          Thanks for the patch. Committed to 0.8 with the following minor changes.

          1. ConsumerFetcherManager: fixed the bug in the new warn logging.
          2. AbstractFetcherThread: moved isOffsetInvalid() to where InvalidOffset is defined.

          Maxime Brugidou made changes -
          Attachment: KAFKA-693-v3.patch
          Maxime Brugidou added a comment -

          Added v3 with your remarks

          Jun Rao added a comment -

          Thanks for patch v2. Looks good. Some minor comments:

          11. I think we still need to change ConsumerFetcherManager.doWork(): Currently, if we hit an exception when calling addFetcher(), we won't remove any partition from noLeaderPartitionSet, including those that have been processed successfully. We can change it so that we remove each partition from noLeaderPartitionSet after calling addFetcher() successfully.

          20. AbstractFetcherThread: Instead of doing initialOffset < 0, could we define an isOffsetInvalid() method?
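
          For illustration only, a minimal sketch of the constant and helper being discussed (the names InvalidOffset and isOffsetInvalid come from this thread; the enclosing object is a simplified stand-in, not the actual 0.8 code):

          // Sketch only: a sentinel offset plus a helper, as suggested in items 10 and 20.
          // Simplified stand-in, not the real PartitionTopicInfo.
          object PartitionTopicInfoSketch {
            // "No initial offset known yet", e.g. a consumer group with no offset in ZooKeeper.
            val InvalidOffset: Long = -1L

            def isOffsetInvalid(offset: Long): Boolean = offset < 0
          }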

          Maxime Brugidou added a comment -

          10. Created PartitionTopicInfo.InvalidOffset

          11. In ConsumerFetcherManager.doWork(), I believe addFetcher() is called before the partition is removed from noLeaderPartitionSet; if an exception is caught, the partition will still be in noLeaderPartitionSet, so I didn't change anything.

          12. done

          13. done

          Maxime Brugidou made changes -
          Attachment: KAFKA-693-v2.patch
          Jun Rao added a comment -

          Thanks for the patch. Some comments:

          10. ZookeeperConsumerConnector: Let's define a constant InvalidOffset, instead of using -1 directly.

          11. ConsumerFetcherManager.doWork(): After we identify the leader of a partition, the leader could change immediately. So, we may hit the exception when calling addFetcher(). When this happens, we haven't added the partition to the fetcher and we don't want to lose it. So, we should add it back to noLeaderPartitionSet so that we can find the new leader later.

          12. ReplicaFetcherThread: Yes, it should also throw an exception if getOffsetBefore returns an error.

          13. AbstractFetcherThread.doWork(): We need to handle the exception when calling handleOffsetOutOfRange(). If we get an exception, we should add the partition to partitionsWithError. This will cover both ConsumerFetcherThread and ReplicaFetcherThread.
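
          As a rough illustration of item 13, a hedged sketch of the intended guard: if resetting the offset fails (for example because no leader is reachable), the partition is parked for a later retry instead of the exception killing the fetcher thread. The types and members below are simplified assumptions, not the actual 0.8 AbstractFetcherThread code.

          import scala.collection.mutable

          // Sketch only (simplified, assumed types).
          case class TopicAndPartition(topic: String, partition: Int)

          class FetcherSketch(handleOffsetOutOfRange: TopicAndPartition => Long) {
            val partitionMap = mutable.Map.empty[TopicAndPartition, Long]
            val partitionsWithError = mutable.Set.empty[TopicAndPartition]

            def addPartition(tp: TopicAndPartition, initialOffset: Long): Unit = {
              if (initialOffset >= 0) {
                partitionMap.put(tp, initialOffset)
              } else {
                try {
                  // May need to contact the (possibly unavailable) leader.
                  partitionMap.put(tp, handleOffsetOutOfRange(tp))
                } catch {
                  case _: Throwable =>
                    // Record the failure and keep the fetcher running; retried later.
                    partitionsWithError += tp
                }
              }
            }
          }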

          Maxime Brugidou made changes -
          Assignee: Maxime Brugidou
          Maxime Brugidou made changes -
          Status: Open -> Patch Available
          Maxime Brugidou added a comment -

          Here is a patch:

          1. AbstractFetcherThread.addPartition(): call handleOffsetOutOfRange if initialOffset < 0

          2. I didn't touch ConsumerFetcherManager.doWork(), since addFetcher() is called for partitions with leaders only (which is why 3 is unnecessary).

          3. ConsumerFetcherThread.handleOffsetOutOfRange: check partitionErrorAndOffset.error and throw an appropriate exception (which should have been done anyway; I don't think this is strictly necessary for the patch)
          3.1 Note: this should probably be done in the ReplicaFetcherThread too?

          4. ZookeeperConsumerConnector.ZkRebalanceListener: Do not compute leaderIdForPartitionMap in rebalance() and set PartitionTopicInfo offsets to -1 if not in Zk (new consumer)

          5. PartitionTopicInfo: removed brokerId

          6. Fixed tests for compilation (I am having a hard time running tests since ./sbt test does not seem to work very well for me)

          7. Should we increase the default refresh.leader.backoff.ms? It's a tradeoff between quickly picking up a new leader to consume from (useful when replication is on) and not flooding the brokers when there is no leader (or replication is off). 200ms is very short, but something hybrid like "try 5 times at 200ms backoff, then every 5 min" would cover all use cases (sketched below).

          I am running this on test clusters with a MirrorMaker, and the error from my initial test case (in the description) does not occur anymore.
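
          A sketch of the hybrid backoff idea from point 7, for illustration only (the attempt count and delays are the example numbers above; they are assumptions, not actual Kafka defaults or config keys):

          // Sketch only: "try a few times quickly, then back off for a long time".
          object HybridLeaderRefreshBackoff {
            val fastBackoffMs: Long = 200L            // short backoff for the first tries
            val slowBackoffMs: Long = 5 * 60 * 1000L  // then every 5 minutes
            val fastAttempts: Int = 5

            def backoffMs(failedAttempts: Int): Long =
              if (failedAttempts <= fastAttempts) fastBackoffMs else slowBackoffMs
          }

          The leader-finder loop could then sleep for backoffMs(attempts) instead of a fixed refresh.leader.backoff.ms.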

          Maxime Brugidou made changes -
          Attachment: KAFKA-693.patch
          Jun Rao added a comment -

          Another quick thought on this: instead of using Option for the offset, we could still use AtomicLong and pass in something like -1 to indicate a non-existent offset.

          Maxime Brugidou added a comment -

          Looks good; it should work, but I still have a pain point with PartitionTopicInfo, which uses AtomicLong to track the consume/fetch offsets. Using Option[AtomicLong] looks strange, because I would have to turn the two counters into vars... and it's probably not thread safe at all, so I would need some sort of lock to "initialize" the counters (see the sketch below).
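
          To make the tradeoff concrete, a sketch of the two shapes being discussed (the classes below are simplified placeholders, not the actual PartitionTopicInfo):

          import java.util.concurrent.atomic.AtomicLong

          // Sketch only: two ways to model a "not yet known" fetch offset.

          // (a) Option[AtomicLong]: the field has to become a var and needs a lock
          //     (or volatile + synchronized) for the None -> Some transition.
          class InfoWithOption {
            @volatile private var fetchedOffset: Option[AtomicLong] = None
            def initialize(offset: Long): Unit = synchronized {
              if (fetchedOffset.isEmpty) fetchedOffset = Some(new AtomicLong(offset))
            }
            def offset: Option[Long] = fetchedOffset.map(_.get)
          }

          // (b) AtomicLong with a sentinel: the field stays a val and is updated atomically.
          object InfoWithSentinel { val InvalidOffset = -1L }
          class InfoWithSentinel {
            private val fetchedOffset = new AtomicLong(InfoWithSentinel.InvalidOffset)
            def isInitialized: Boolean = fetchedOffset.get != InfoWithSentinel.InvalidOffset
            def initialize(offset: Long): Unit = fetchedOffset.set(offset)
            def offset: Long = fetchedOffset.get
          }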

          Jun Rao added a comment -

          That's a good point. I overlooked this. Your understanding is correct. We could move the offset initialization logic into AbstractFetcherThread. The following is one way to do this. Not sure if this is the best way.

          1. In AbstractFetcher:
          Change addPartition to pass in initialOffset: Option[Long].
          If initialOffset is none, we call handleOffsetOutOfRange to get the offset. If we hit any exception while doing this, we pass the exception to the caller without adding the partition to partitionMap.

          2. In ConsumerFetcherManager.doWork():
          If we hit any exception when calling addFetcher, we add the partition back to noLeaderPartitionSet (a sketch of this pattern follows the list).

          3. In ConsumerFetcherThread.handleOffsetOutOfRange():
          We need to check if the offset response has any error. If so, we throw an exception to the caller.

          4. In ZookeeperConsumerConnector.addPartitionTopicInfo(): If initial offset doesn't exist in ZK, we pass in none to PartitionTopicInfo.

          5. In PartitionTopicInfo: Make fetchedOffset Option[AtomicLong].
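
          To make step 2 concrete, a minimal sketch of the add-it-back-on-failure pattern (the names follow this discussion; the real leader-finder logic in ConsumerFetcherManager is more involved, so treat this as an assumption-laden illustration):

          import scala.collection.mutable

          // Sketch only: if addFetcher fails (e.g. the leader moved again), return the
          // partition to noLeaderPartitionSet so a later pass can look up the new leader.
          case class TopicAndPartition(topic: String, partition: Int)

          class LeaderFinderSketch(addFetcher: (TopicAndPartition, Int) => Unit) {
            val noLeaderPartitionSet = mutable.Set.empty[TopicAndPartition]

            def assignLeaders(leaders: Map[TopicAndPartition, Int]): Unit = {
              for ((tp, leaderBrokerId) <- leaders) {
                try {
                  addFetcher(tp, leaderBrokerId)
                  noLeaderPartitionSet -= tp  // forget it only once it was added successfully
                } catch {
                  case _: Throwable => noLeaderPartitionSet += tp
                }
              }
            }
          }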

          Maxime Brugidou added a comment -

          I looked at the code in detail and I am stuck, because during the rebalance() operation the ZookeeperConsumerConnector's topicRegistry is updated with PartitionTopicInfo objects that need to store the consumerOffset and fetchOffset. During addPartitionTopicInfo(), the consumer offset is read from ZooKeeper; however, it needs to be initialized if no offsetString is available in ZooKeeper (the first time a consumer starts), and we need to access the broker/leader to get the starting offset (using SimpleConsumer.earliestOrLatestOffset() in addPartitionTopicInfo()).

          I dug in a bit, and we could probably initialize the offset later in the ConsumerFetcherManager? I could help with a patch if I get general directions, because I'm not 100% familiar with the codebase yet.

          Jun Rao added a comment -

          OK, this is actually a real problem. During rebalance, we try to get the leader even though we don't really need it at rebalancing time. The fix seems easy.

          In ZKRebalancerListener.addPartitionTopicInfo(), we don't really need to get the leaderId, which is not used in PartitionTopicInfo. So, we can just get rid of that code. We can also get rid of the code in rebalance() that computes leaderIdForPartitionsMap.

          Maxime Brugidou made changes -
          Link: This issue relates to KAFKA-691
          Maxime Brugidou made changes -
          Attachment: mirror.log
          Attachment: mirror_debug.log
          Maxime Brugidou created issue -

            People

            • Assignee: Maxime Brugidou
            • Reporter: Maxime Brugidou
            • Votes: 1
            • Watchers: 3
