KAFKA-691: Fault tolerance broken with replication factor 1

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels: None

      Description

      In 0.7, if a partition was down we would just send the message elsewhere. This meant that the partitioning was really more of a "stickiness" than a hard guarantee. This made it impossible to depend on it for partitioned, stateful processing.

      In 0.8, when running with replication, this should generally not be a problem, since partitions are now highly available and fail over to other replicas. However, the case of replication factor = 1 no longer really works for most uses, because a dead broker will now produce errors for the partitions it hosts.

      I am not sure of the best fix. Intuitively, I think this is something that should be handled by the Partitioner interface. However, the partitioner currently has no knowledge of which nodes are available. You could use a random partitioner, but it would keep going back to the down node.

      Attachments

      1. KAFKA-691-v2.patch (7 kB, Maxime Brugidou)
      2. KAFKA-691-v1.patch (6 kB, Maxime Brugidou)
      3. kafka-691_extra.patch (3 kB, Jun Rao)


          Activity

          Maxime Brugidou added a comment -

           I think the work-around is not really acceptable for me, since it will consume 3x the resources (because a replication factor of 3 is the minimum acceptable) and it will still make the cluster less available anyway (unless I have only 3 brokers).

           The thing is that 0.7 made the cluster 100% available (for my use case, which accepts data loss) as long as a single broker was alive.

           A way to handle this would be to:
           1. Have a lot of partitions per topic (more than the number of brokers)
           2. Have something that rebalances the partitions and makes sure every broker has at least one partition for each topic (to make every topic "available")
           3. Have a setting in the consumer/producer that says "I don't care about partitioning, just produce/consume wherever you can"

          Jun Rao added a comment -

           One thing we can do is to change the partitioner API so that it takes the number of partitions and, for each partition, an indicator of whether that partition is available. Then we can change the default partitioner to route a message only to the available partitions if a key is not provided.
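
           Roughly, the idea amounts to something like the following sketch (hypothetical code, not the committed patch; the PartitionAndLeader and leaderBrokerIdOpt names are the ones mentioned later in this thread):

             import scala.util.Random

             // Sketch: with no key, pick a random partition among those that currently
             // have a leader; fall back to all partitions if none are available.
             def chooseAvailable(partitions: Seq[PartitionAndLeader]): PartitionAndLeader = {
               val available = partitions.filter(_.leaderBrokerIdOpt.isDefined)
               val candidates = if (available.nonEmpty) available else partitions
               candidates(Random.nextInt(candidates.size))
             }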

          Maxime Brugidou added a comment -

           I agree with Jun's solution; this would solve 3 (1 and 2 can already be done manually – just send a ReassignPartition command when you add a broker).

           I could probably implement this very quickly. I'm just not sure how you get the availability of a partition, but I'll try to figure it out and submit a first patch tomorrow.

          Jay Kreps added a comment -

           That would be awesome. If you don't mind, just give the proposed set of changes on the JIRA first and let's get everyone on board with how it should work, since it is a reasonably important change (or, if you don't mind revising your patch, we can start with that).

          Jun Rao added a comment -

           DefaultEventHandler.getPartitionListForTopic() returns Seq[PartitionAndLeader]. If PartitionAndLeader.leaderBrokerIdOpt is None, the partition is not available.

          There is another tricky issue. If a partition is not available, when do we refresh the metadata to check if the partition becomes available again? Currently, we refresh the metadata if we fail to send the data. However, if we always route the messages to available partitions, we may never fail to send. One possible solution is that if there is at least one partition not available in Seq[PartitionAndLeader], we refresh the metadata if a configurable amount of time has passed (e.g., 10 mins).
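
           A hedged sketch of that rule (the helper name and wiring are hypothetical; only the identifiers mentioned in this thread are real):

             // Refresh the metadata when some partition has no leader and the
             // configured interval has elapsed since the last refresh.
             def shouldRefreshMetadata(partitions: Seq[PartitionAndLeader],
                                       lastRefreshMs: Long,
                                       refreshIntervalMs: Long,
                                       nowMs: Long): Boolean = {
               val someUnavailable = partitions.exists(_.leaderBrokerIdOpt.isEmpty)
               someUnavailable && (nowMs - lastRefreshMs > refreshIntervalMs)
             }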

          Maxime Brugidou added a comment -

          Here is a first draft (v1) patch.

           1. Added the producer property "producer.metadata.refresh.interval.ms", which defaults to 600000 (10 min); a usage sketch follows this list

           2. The metadata is refreshed every 10 min (only if a message is sent), and the set of topics to refresh is tracked in the topicMetadataToRefresh set (cleared after every refresh) - I think the added value of refreshing regardless of partition availability is that it detects new partitions

           3. The good news is that I didn't touch the Partitioner API. I only changed the code to use available partitions if the key is null (as suggested by Jun); it will also throw an UnknownTopicOrPartitionException("No leader for any partition") if no partition is available at all
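
           A minimal usage sketch of the new setting, assuming the 0.8 Scala producer API (the broker list and topic are made up; the property name is the one used in this patch and may differ in later builds):

             import java.util.Properties
             import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

             val props = new Properties()
             props.put("metadata.broker.list", "broker1:9092,broker2:9092") // hypothetical brokers
             props.put("serializer.class", "kafka.serializer.StringEncoder")
             // property added by this patch; defaults to 600000 (10 min)
             props.put("producer.metadata.refresh.interval.ms", "600000")

             val producer = new Producer[String, String](new ProducerConfig(props))
             // no key supplied, so the message is routed to an available partition
             producer.send(new KeyedMessage[String, String]("my-topic", "hello"))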

          Let me know what you think about this patch. I ran a producer with that code successfully and tested with a broker down.

           I now have some concerns about the consumer: the refresh.leader.backoff.ms config could help me (if I increase it to, say, 10 min), BUT the rebalance fails in any case since there is no leader for some partitions.

          I don't have a good workaround yet for that, any help/suggestion appreciated.

          Jun Rao added a comment -

          Thanks for the patch. Overall, the patch is pretty good and is well thought out. Some comments:

          1. DefaultEventHandler:
          1.1 In handle(), I don't think we need to add the if test in the following statement. The reason is that a message could fail to be sent because the leader changes immediately after the previous metadata refresh. Normally, leaders are elected very quickly. So, it makes sense to refresh the metadata again.
             if (topicMetadataToRefresh.nonEmpty)
               Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet))

           1.2 In handle(), it seems that it's better to call the following code before dispatchSerializedData().

             if (topicMetadataRefreshInterval >= 0 &&
                 SystemTime.milliseconds - lastTopicMetadataRefresh > topicMetadataRefreshInterval) {
               Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet))
               topicMetadataToRefresh.clear
               lastTopicMetadataRefresh = SystemTime.milliseconds
             }

          1.3 getPartition(): If none of the partitions is available, we should throw LeaderNotAvailableException, instead of UnknownTopicOrPartitionException.

          2. DefaultPartitioner: Since key is not expected to be null, we should remove the code that deals with null key.

          3. The consumer side logic is fine. The consumer rebalance is only triggered when there are changes in partitions, not when there are changes in the availability of the partition. The rebalance logic doesn't depend on a partition being available. If a partition is not available, ConsumerFetcherManager will keep refreshing metadata. If you have a replication factor of 1, you will need to set a larger refresh.leader.backoff.ms, if a broker is expected to go down for a long time.
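
           For completeness, a hedged sketch of raising refresh.leader.backoff.ms on the consumer side, assuming the 0.8 high-level consumer API (the ZooKeeper address, group id, and value are illustrative):

             import java.util.Properties
             import kafka.consumer.{Consumer, ConsumerConfig}

             val props = new Properties()
             props.put("zookeeper.connect", "zk1:2181") // hypothetical ZooKeeper host
             props.put("group.id", "my-group")
             // back off longer before looking for a new leader, since a broker
             // with replication factor 1 may stay down for a while
             props.put("refresh.leader.backoff.ms", "600000")

             val connector = Consumer.create(new ConsumerConfig(props))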

          Maxime Brugidou added a comment -

          Thanks for your feedback, I updated it (v2) according to your notes (1. and 2.).

           For 3, I believe you are right, except that:
           3.1 It seems (correct me if I'm wrong) that a rebalance happens at consumer initialization, which means a consumer can't start if a broker is down
           3.2 Can a rebalance be triggered when a partition is added or moved? Having a broker down shouldn't prevent me from reassigning partitions or adding partitions.

          Jun Rao added a comment -

          Thanks for patch v2. Committed to 0.8 by renaming lastTopicMetadataRefresh to lastTopicMetadataRefreshTime and removing an unused comment.

          3.1 Rebalance happens during consumer initialization. It only needs the partition data to be in ZK and doesn't require all brokers to be up. Of course, if a broker is not up, the consumer may not be able to consume data from it. ConsumerFetcherManager is responsible for checking if a partition becomes available again.

          3.2 If the partition path changes in ZK, a rebalance will be triggered.

          Maxime Brugidou added a comment -

          Thanks for committing the patch.

           3.1 Are you sure that the rebalance doesn't require all partitions to have a leader? My experience earlier today was that the rebalance would fail and throw a ConsumerRebalanceFailedException after having stopped all fetchers and cleared all queues. If you are sure, then I'll try to reproduce the behavior I encountered and maybe open a separate JIRA?

          Jun Rao added a comment -

           3.1 It shouldn't. However, if you can reproduce this problem, please file a new JIRA.

          Jun Rao added a comment -

          Another potential issue is that for producers that produce many topics (like migrationTool and mirrorMaker), the time-based refreshing may need to get the metadata for many topics. This means that the metadata request is likely to timeout. One solution is to break topics into batches in BrokerPartitionInfo.updateInfo() and issue a metadata request per batch.
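
           A rough sketch of the batching idea (the helper and batch size are hypothetical; updateInfo stands in for BrokerPartitionInfo.updateInfo):

             // Split the topic set into fixed-size batches and issue one metadata
             // request per batch, so no single request covers too many topics.
             def updateInfoInBatches(topics: Set[String],
                                     batchSize: Int,
                                     updateInfo: Set[String] => Unit): Unit =
               topics.grouped(batchSize).foreach(updateInfo)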

          Maxime Brugidou added a comment -

           Should I make another patch? I'll try on Monday.

           1. It would probably require yet another config variable, like "producer.metadata.request.batch.size" or something like that.
           2. Should it be batched for every updateInfo() or just during the metadata refresh? It could help if we do the former, because failing messages from many different topics could probably never go through if the metadata request times out.
           3. Isn't it getting a little convoluted? Maybe I am missing something, but the producer side is getting trickier.
           4. Please note that I also opened KAFKA-693 about the consumer side. I'd love to submit a patch, but the rebalance logic seems complex, so I'd prefer to have some insights first before going in the wrong direction.

          Jun Rao added a comment -

          It would be great if you can provide a patch.

           1, 2, 3. Yes, we will need a new config. We should do batching in updateInfo(). This does make the producer side logic a bit more complicated. We have been thinking about making getMetadata faster. When we get there, we can revisit the batching logic.

          Jay Kreps added a comment -

          Does batching make sense versus just having people increase the timeout?

          Jun Rao added a comment -

          That's a good point. Increasing the timeout will work for most cases. If a broker goes down, the client request will get a socket exception immediately, independent of the timeout. So setting a large timeout doesn't hurt. When the broker host goes down and the client is waiting for a response from the server, I think the client will have to wait until the timeout. If we set a larger timeout, it means that the client has to wait longer before realizing the broker is down. However, since this is a rarer case, I think setting a larger timeout for now is probably good enough.

          Maxime Brugidou added a comment -

           So I'll wait for your feedback first, but I guess that increasing the timeout is good enough, although it's 1500ms by default, which is very short.

          Jun Rao added a comment -

           The last patch introduced a bug. DefaultEventHandler.getPartition() is expected to return the index into the partitionList, instead of the actual partition id. Attaching a patch that fixes the issue.

          Jun Rao added a comment -

           Attaching the right patch (kafka-691_extra.patch).

          Jun Rao added a comment -

           Actually, the current code works, since partitionId is always between 0 and num.partitions - 1 and therefore happens to also be the index into the partitionList. This patch just makes the code a bit easier to understand.
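
           A small illustrative sketch of the distinction (names taken from this thread; the helper itself is hypothetical and assumes at least one partition has a leader):

             import scala.util.Random

             // Return an index into partitionList (what getPartition() is expected to
             // produce), not the partition id stored in the chosen element.
             def chooseIndex(partitionList: Seq[PartitionAndLeader], rnd: Random): Int = {
               val availableIndices = partitionList.zipWithIndex.collect {
                 case (p, idx) if p.leaderBrokerIdOpt.isDefined => idx
               }
               availableIndices(rnd.nextInt(availableIndices.size))
             }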


            People

            • Assignee: Maxime Brugidou
            • Reporter: Jay Kreps
            • Votes: 2
            • Watchers: 5

              Dates

              • Created:
              • Updated:
              • Resolved:
