Kafka / KAFKA-228

Reduce duplicate messages served by the kafka consumer for uncompressed topics

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7
    • Fix Version/s: 0.7.1
    • Component/s: None
    • Labels: None

      Description

      Kafka guarantees at-least-once delivery of messages. The high level consumer provides highly available partitioned consumption of data within the same consumer group. In the event of broker failures or consumer failures within a group, the high level consumer rebalances and redistributes the topic partitions evenly amongst the consumers in the group. With the current design, Kafka introduces duplicates in the consumed data during this rebalancing operation.

      This JIRA improves the rebalancing operation and the consumer iterator design to guarantee zero duplicates while consuming uncompressed topics. There will be a small number of duplicates while serving compressed data, but it will be bounded by the compression batch size.

      1. kafka-228.patch
        23 kB
        Neha Narkhede
      2. kafka-228_v2.patch
        5 kB
        Jun Rao
      3. kafka-228-v3.patch
        21 kB
        Neha Narkhede

          Activity

          Neha Narkhede added a comment -

          The following things caused duplicates in the current rebalancing design in the kafka consumer -

          1. The rebalancing logic consisted of the following steps - release partition ownership, commit offsets, assign partitions, own partitions, restart fetchers. The sequence of these operations caused duplicate messages to be delivered. For example, since partitions were released before stopping the fetchers, some partitions could be owned by another consumer, and hence two consumers in the same group would continue delivering the same messages to the end user. This would change only after the fetchers were stopped and restarted with the new partition ownership decision.
          2. Fetchers were not stopped between releasing partition ownership and the completion of a successful rebalancing operation. So even if the consumer failed to rebalance after n retries, the consumer iterator would continue serving messages based on the old partition ownership decision.
          3. In the current logic, if a rebalancing operation failed, all partition ownership was released and the internal state was reset before starting another rebalancing attempt. Since the internal state was reset before committing offsets, no consumer offsets were written to zookeeper. Due to this, until the rebalancing operation succeeded, another consumer could own some partitions and start consuming data from an older offset. This led to duplicate messages.
          4. The consumer iterator had no knowledge of the rebalancing operation. So even if the fetcher queues were cleared before restarting the fetchers to fetch from the newly assigned partitions, the consumer iterator would continue returning messages from the current data chunk it was iterating over. This led to a few more duplicates.

          The above causes are listed in descending order of the number of duplicates they introduced.

          This patch reduces the duplicate messages served by the Kafka consumer to zero for uncompressed topics. The changes include -

          1. Refactor the rebalancing logic to: stop fetchers, clear the fetcher queues, commit consumer offsets, clear the current data chunk in the consumer iterator, release partition ownership, assign and own partitions, restart fetchers (a rough sketch of this ordering follows below).
          2. Fix the consumer iterator to synchronize the consumption of data with clearing the current data chunk and committing offsets.
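
          For illustration, here is a condensed sketch of the reordered sequence in change 1. The class and method names below (RebalanceSketch, FetcherSketch, and so on) are stand-ins invented for this sketch, not the actual Kafka 0.7 code.

          import java.util.concurrent.BlockingQueue

          trait FetcherSketch { def stopConnections(): Unit; def startConnections(): Unit }
          trait IteratorSketch { def clearCurrentChunk(): Unit }

          class RebalanceSketch(fetcher: FetcherSketch,
                                queues: Iterable[BlockingQueue[AnyRef]],
                                iterators: Iterable[IteratorSketch]) {

            def rebalance(): Unit = {
              fetcher.stopConnections()                  // 1. stop fetchers so no new chunks arrive
              queues.foreach(_.clear())                  // 2. drop fetched-but-unconsumed chunks
              commitOffsets()                            // 3. persist what was actually consumed
              iterators.foreach(_.clearCurrentChunk())   // 4. drop the chunk each iterator is draining
              releasePartitionOwnership()                // 5. only now give up ownership; a new owner
                                                         //    starts from the committed offsets
              assignAndOwnPartitions()                   // 6. claim the new assignment
              fetcher.startConnections()                 //    and resume fetching
            }

            // Placeholders standing in for the real ZookeeperConsumerConnector logic.
            private def commitOffsets(): Unit = ()
            private def releasePartitionOwnership(): Unit = ()
            private def assignAndOwnPartitions(): Unit = ()
          }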

          Jun Rao added a comment -

          Thanks for the patch. Some comments below:

          1. This jira is a duplicate of kafka-198. We need to mark it.
          2. ZookeeperConsumerConnector
          2.1 commitOffsets(): the logging "committing all offsets" should be at trace or debug level
          2.2 The patch commits all offsets for clearing every stream. This is unnecessary. We just need to commit all offsets once, after we have cleared all streams. Also, we don't need to clear all streams; we only need to clear a stream whose fetch queue needs to be cleared. Here is one way of doing that. We keep a reference to the stream in each fetch queue (there is a 1:1 mapping between a stream and its fetch queue). In closeFetchers(), we (1) stop the current fetcher; (2) for each fetch queue to be cleared, clear the queue and clear the corresponding stream; (3) commit all offsets. Then we don't need Fetcher.clearFetcherQueues, and we don't need to pass kafkaMessageStreams into Fetcher.startConnections. Also, ConsumerIterator.clearCurrentChunk doesn't need to take any input parameters.

          3. ConsumerIterator:
          The logic is a bit complicated and I am not sure if it's really necessary. To me, it seems all we need is to make currentDataChunk an AtomicReference. clearCurrentChunk() simply sets the reference to null. This will be the only synchronization that we need (no need for a lock). Because of the ordering in 2.2, this will make sure the next hasNext() call on the stream blocks until the Fetcher is started again.
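
          For illustration, one possible shape of the closeFetchers ordering and the AtomicReference idea described in 2.2 and 3 above. The class names and signatures here are assumptions made for the sketch, not the actual patch.

          import java.util.concurrent.BlockingQueue
          import java.util.concurrent.atomic.AtomicReference

          trait FetcherSketch { def stopConnections(): Unit }

          class StreamSketch {
            // The chunk this stream is currently draining; null means
            // "blocked, waiting on the fetch queue".
            private val currentDataChunk = new AtomicReference[AnyRef](null)
            // No parameters needed: the stream clears its own chunk.
            def clearCurrentChunk(): Unit = currentDataChunk.set(null)
          }

          class ConnectorSketch(fetcher: FetcherSketch,
                                // 1:1 mapping between a fetch queue and its stream
                                queueToStream: Map[BlockingQueue[AnyRef], StreamSketch]) {

            def closeFetchers(queuesToClear: Iterable[BlockingQueue[AnyRef]]): Unit = {
              fetcher.stopConnections()              // (1) stop the current fetcher
              for (q <- queuesToClear) {             // (2) clear each affected queue
                q.clear()
                queueToStream(q).clearCurrentChunk() //     and its corresponding stream
              }
              commitOffsets()                        // (3) commit all offsets once, at the end
            }

            private def commitOffsets(): Unit = () // placeholder
          }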

          Neha Narkhede added a comment -

          Thanks for reviewing the patch! Here are my comments -

          1. I missed the fact that there was an existing bug for this. Marked this as a duplicate of that bug. Technically, this is an improvement, not really a bug fix.

          2.1 This info level logging is just moved from its previous location (rebalance() API) to its new location (inside commitOffsets() API). I think it is important to have this at info level to be able to understand the flow of the rebalancing operation and also debug any future issues with data duplication/data loss. Since this is not done too often, I thought it wouldn't pollute the logs.

          >> Also, we don't need to clear all streams
          2.2 According to causes 1. & 2. in my list above, you need to stop the fetchers, clear queues, clear streams BEFORE releasing the partition ownership, to avoid data duplication. Since you release all the partitions, you need to stop all the fetchers and clear all the queues.

          3. To reduce the duplicates to zero, returning a message through makeNext() needs to be exclusive with committing offsets and clearing the current iterator. In addition, each of those needs to execute atomically. This is why we need a lock. Also, we need to clear the 'current' iterator, not set currentDataChunk to null.
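
          For illustration, a rough sketch of the locking discipline described in point 3: serving a message, clearing the current chunk, and committing offsets all run under the same lock, which is released before the blocking fetch from the queue. The names and the Option-based makeNext are simplifications for the sketch, not the actual v1 patch.

          import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
          import java.util.concurrent.locks.ReentrantLock

          class LockingIteratorSketch(queue: LinkedBlockingQueue[List[String]]) {
            private val lock = new ReentrantLock()
            private var current: Iterator[String] = Iterator.empty

            // Serving a message is exclusive with clearCurrentChunk().
            def makeNext(): Option[String] = {
              lock.lock()
              try { if (current.hasNext) return Some(current.next()) }
              finally lock.unlock()
              // Release the lock before the potentially blocking poll, so a
              // concurrent rebalance is never stuck behind a slow fetch.
              val chunk = queue.poll(200, TimeUnit.MILLISECONDS)
              if (chunk == null) return None
              lock.lock()
              try {
                current = chunk.iterator
                if (current.hasNext) Some(current.next()) else None
              } finally lock.unlock()
            }

            // Called during rebalance: atomically drop the in-flight chunk and
            // commit offsets, so nothing from the old chunk is served afterwards.
            def clearCurrentChunk(commitOffsets: () => Unit): Unit = {
              lock.lock()
              try {
                current = Iterator.empty
                commitOffsets()
              } finally lock.unlock()
            }
          }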

          Jun Rao added a comment -

          2.1. The problem is that commitOffset is called by auto commit too. We don't want to add info level logging for every auto commit.

          2.2. Well, in general, rebalance is selective. Not every rebalance affects all topics. Suppose that there are 2 topics X and Y. X is on brokers 1, 2 and 3 and Y is only on brokers 2 and 3. Consider a consumer group with 2 consumers c1 and c2 that consume both topics X and Y. Initially, if all brokers are alive, after rebalance the assignment can be something like c1 <- X (broker1, broker2), Y (broker2); c2 <- X (broker3), Y (broker3). Suppose now broker1 is down. After rebalance, the assignment can be something like c1 <- X (broker2), Y (broker2); c2 <- X (broker3), Y (broker3). As you can see, topic Y doesn't really need to rebalance since its partition assignment doesn't change. Our current code handles this by not clearing the queue for topic Y in this case. The fetcher will still be restarted; it just picks up from where it left off (a small illustration of this follows the comment).

          3. First of all, it's not clear if the synchronization in the patch gives the exclusive access that you want. This is mainly because the lock is temporarily released in the makeNext() call, which allows other concurrent callers to sneak in. For example, it could be that the makeNext() call gets the next chunk in getNextDataChunk but hasn't updated currentDataChunk yet. At this moment, the fetcher queue is cleared and clearCurrentChunk is called. The latter saves the current offset (which will be used to set the fetch offset after rebalance) and sets currentDataChunk to null. After that, the makeNext() call continues in getNextDataChunk and sets currentDataChunk to a non-null value. This chunk will be fetched again after rebalance and thus introduce duplicates.

          Second, calling commitOffsets inside consumerIterator seems a bit complicated. I am wondering if we can commit offsets outside of consumerIterator.
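
          Returning to the selective clearing in 2.2: deciding which queues to clear boils down to comparing the old and new assignment per topic. A tiny, self-contained illustration follows; the representation of an assignment here is made up for the example.

          object SelectiveClearSketch {
            // topic -> set of owned partitions, e.g. "broker2-0"
            type Assignment = Map[String, Set[String]]

            // Only topics whose partition set actually changed need their fetch
            // queue cleared; the others simply resume from where the fetcher left off.
            def topicsNeedingClear(oldA: Assignment, newA: Assignment): Set[String] =
              (oldA.keySet ++ newA.keySet).filter { topic =>
                oldA.getOrElse(topic, Set.empty[String]) != newA.getOrElse(topic, Set.empty[String])
              }

            def main(args: Array[String]): Unit = {
              // Consumer c1 in the example above: broker1 fails, X's assignment
              // shrinks, Y's stays the same.
              val before = Map("X" -> Set("broker1-0", "broker2-0"), "Y" -> Set("broker2-0"))
              val after  = Map("X" -> Set("broker2-0"), "Y" -> Set("broker2-0"))
              println(topicsNeedingClear(before, after)) // prints Set(X)
            }
          }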

          Neha Narkhede added a comment -

          2.1 Fair enough. I'll move it out of the commitOffsets() API.

          >> Well, in general, rebalance is selective
          2.2. There is one case of rebalancing (which you've listed) where not clearing the queue would help. The code gets slightly more complicated, but I will make the changes and include them in the next patch.

          >> First of all, it's not clear if the synchronization in the patch gives the exclusive access that you want. This is mainly because the lock is temporarily released in the makeNext() call. This allows other concurrent callers to sneak in. For example, it could be that makeNext() call gets the next chunk in getNextDataChunk, but hasn't updated currentDataChunk yet. At this moment, the fetcher queue is cleared and clearCurrentChunk is called. The latter saves the current offset (which will be used to set the fetch offset after rebalance) and sets currentDataChunk to null. After that, the makeNext() call continues in getNextDataChunk and sets currentDataChunk to a non-null value. This chunk will be fetched again after rebalance and thus introduce duplicates.
          3. The purpose of adding the locking is to do our best to safely reduce the number of duplicates served by the consumer iterator. Locking needs to be done in such a way that the lock is always released before entering a potentially blocking operation. The case you've pointed out seems like a very rare corner case, at least from the test runs (using KAFKA-227). For example, over hundreds of iterations of that test, no duplicates were reported. If you try to "fix" this case, you risk a potential deadlock situation, which we must avoid. Given that, this amount of locking seems reasonable to me.

          >> Second, calling commitOffsets inside consumerIterator seems a bit complicated. I am wondering if we can commit offsets outside of consumerIterator.
          3. That protects against the duplication of the last data chunk. The best place to move this, outside of the consumer iterator, is ZookeeperConsumerConnector's closeFetchers(), after clearing the queue and clearing the current iterator. If this change is made, the number of times we write to zookeeper will also be reduced.

          In our offline chat, you mentioned you want to try to refactor the patch to simplify the consumer iterator code by removing the lock altogether and only depending on the atomic references. I was wondering if you'd like to give that a try and upload another patch?

          Jun Rao added a comment -

          Attaching patch v2. It implements ConsumerIterator in an alternative way that doesn't require locking.

          To apply, apply patch v1, revert ConsumerIterator, then apply patch v2.
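
          For illustration, a lock-free sketch along the lines discussed for v2, where the current chunk iterator lives in an AtomicReference and clearCurrentChunk() simply nulls it. This is not the attached patch; the names and the Option-based makeNext are simplifications.

          import java.util.concurrent.atomic.AtomicReference
          import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

          class LockFreeIteratorSketch(queue: LinkedBlockingQueue[List[String]]) {
            private val current = new AtomicReference[Iterator[String]](null)

            def makeNext(): Option[String] = {
              var it = current.get()
              if (it == null || !it.hasNext) {
                // Block (bounded here) until the fetcher, restarted after the
                // rebalance, enqueues a new chunk.
                val chunk = queue.poll(200, TimeUnit.MILLISECONDS)
                if (chunk == null) return None
                it = chunk.iterator
                // Note the window between poll() and set(): if clearCurrentChunk()
                // runs in between, this freshly polled chunk is still served, which
                // is the rare duplicate window discussed in the next comment.
                current.set(it)
              }
              if (it.hasNext) Some(it.next()) else None
            }

            // Called from closeFetchers() during rebalance, after the queue is cleared.
            def clearCurrentChunk(): Unit = current.set(null)
          }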

          Neha Narkhede added a comment -

          kafka-228-v2.patch had a bunch of compilation errors since it didn't use the AtomicReferences correctly. But after fixing that, I like the simplicity of the ConsumerIterator in the patch. I used the new test added as part of KAFKA-227 to test both patches, and it seems like v2 is only slightly worse than v1. Thinking more about it, v2 only risks duplicating an entire data chunk if a new chunk is fetched from the queue and the iterator is set to null before it is pointed at the data in the newly fetched chunk. In that case, the consumer continues to serve the newly fetched data, potentially in parallel with another consumer.

          But since fetching a new data chunk is fairly rare with a large enough fetch size, it is unlikely that we hit this case.

          Neha Narkhede added a comment -

          This patch addresses the comments on patch v1 and includes the changes suggested in v2 -

          1. The info level statement is moved out of the commitOffsets() API
          2. Only certain fetch queues are cleared, not all
          3. Offsets are not committed in the clearCurrentChunk() API, but in closeFetchers() instead.

          Jun Rao added a comment -

          Thanks for the new patch. Overall, looks pretty good. A few extra comments:

          4. ConsumerIterator: Could we make currentDataChunk a local variable in makeNext? This is an existing problem, but it would be good if we can fix it in this patch. Also, currentTopicInfo doesn't have to be atomic since it's never updated concurrently.

          5. It's better to set the KafkaMessageStreams in the constructor of ZKRebalanceListener. This way, even if the ZK listener gets triggered before the ZookeeperConsumerConnector.consume completes, KafkaMessageStreams is available in the rebalance call in the listener.

          6. Fetcher: remove shutdown and keep only stopConnectionsToAllBrokers
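
          As one illustration of the reasoning in point 5 above: if the streams are handed to the listener through its constructor, a ZK watch that fires before consume() has finished wiring things up still sees them during rebalance. The simplified types below are assumptions for the sketch, not the actual ZKRebalanceListener.

          class ZKRebalanceListenerSketch(kafkaMessageStreams: Map[String, List[AnyRef]]) {
            // Because the streams arrive via the constructor, this handler can be
            // triggered at any time, even before consume() completes, and the
            // streams it needs to clear are already available.
            def handleRebalance(): Unit =
              kafkaMessageStreams.values.flatten.foreach(_ => ()) // placeholder: clear each stream
          }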

          Neha Narkhede added a comment -

          4. currentDataChunk is a local variable in makeNext now.

          5. KafkaMessageStreams are set in the constructor of ZKRebalanceListener.

          6. Removed the shutdown() API and changed references to stopConnectionsToAllBrokers

          7. currentDataChunk doesn't have to be an AtomicReference.


            People

            • Assignee: Neha Narkhede
            • Reporter: Neha Narkhede
            • Votes: 0
            • Watchers: 2
