Samza / SAMZA-579

KafkaSystemConsumer drops SSPs on failure

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9.0
    • Fix Version/s: 0.9.0
    • Component/s: kafka
    • Labels:
      None

      Description

      While running SAMZA-394, I discovered a bug in KafkaSystemConsumer that causes it to stop consuming under failure scenarios. This does not cause data loss, but can wedge a container until it's restarted.

      The trigger appears to be when a BrokerProxy fetches from a broker that's still coming up, and hasn't yet claimed ownership for a TopicAndPartition. When the fetch fails, the BrokerProxy abdicate()s the TopicAndPartition, and KafkaSystemConsumer tries to refresh to get the leader. If there is no leader, the KafkaSystemConsumer drops the SSP. This happens in KafkaSystemConsumer.refreshBrokers.
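The drop described above can be illustrated with a minimal sketch (a hedged Java simplification with hypothetical names, not the actual Samza source): when the leader lookup comes back empty, the pair is removed from the tracking map outright and is never retried, so the SSP is silently lost until the container restarts.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Simplified illustration of the bug: a partition whose leader lookup
// fails is removed from the tracking map and never consumed again.
public class DroppedPartitionSketch {
    static Map<String, Long> nextOffsets = new HashMap<>();

    // Stand-in for the Kafka metadata lookup; returns empty when the
    // broker has not yet claimed leadership for the partition.
    static Optional<String> findLeader(String topicAndPartition) {
        return Optional.empty(); // broker still coming up: no leader yet
    }

    // Buggy refresh: on "no leader", the partition is dropped outright
    // instead of being kept around for a later retry.
    static void refreshBrokers(String topicAndPartition) {
        Optional<String> leader = findLeader(topicAndPartition);
        if (!leader.isPresent()) {
            nextOffsets.remove(topicAndPartition); // SSP silently lost
        }
    }

    public static void main(String[] args) {
        nextOffsets.put("topic-0", 42L);
        refreshBrokers("topic-0");
        System.out.println(nextOffsets.containsKey("topic-0")); // prints false
    }
}
```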

      1. SAMZA-579-0.patch
        2 kB
        Chris Riccomini
      2. SAMZA-579-1.patch
        11 kB
        Chris Riccomini
      3. SAMZA-579-2.patch
        14 kB
        Chris Riccomini

          Activity

          criccomini Chris Riccomini added a comment -

          Thanks! Merged and committed.

          closeuris Yan Fang added a comment -

          Looks good! Thank you. +1

          criccomini Chris Riccomini added a comment -

          Attaching updated patch based on Yan Fang's feedback.

          1. Merged nextOffsets and droppedTopicAndPartitions into one map.
          2. Eliminated params from refreshBrokers. Every time refreshBrokers is called, it tries to refresh any pairs in topicPartitionsAndOffsets. Whenever a broker is assigned, the TopicAndPartition is removed from topicPartitionsAndOffsets.
          3. Updated abdicate and refreshDropped to manipulate topicPartitionsAndOffsets as required.
          4. Added a check in refreshBrokers (inside the critical section) to prevent double-listening to a TopicAndPartition.

          For (3), the only places (outside the critical section) where topicPartitionsAndOffsets is manipulated are in start() and abdicate(). Start is safe to manipulate. Abdicate is safe as well. When a BP abdicates, it adds the TP back into the topicPartitionsAndOffsets variable and calls refreshBrokers. The only point where a TP is removed is in the critical section. A TP should never be re-added to topicPartitionsAndOffsets until the one and only BP that owned it has abdicated.
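The invariants in (2)-(4) can be sketched as follows (a hedged Java simplification with illustrative names, not the actual Samza source): a single topicPartitionsAndOffsets map holds every unassigned pair, assignment happens only inside the critical section, and abdicate() re-adds the pair before triggering another refresh.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hedged sketch of the single-map design: pairs live in
// topicPartitionsAndOffsets until a broker is assigned; the only
// removal point is inside the critical section.
public class RefreshSketch {
    final Map<String, Long> topicPartitionsAndOffsets = new HashMap<>();
    final Set<String> assigned = new HashSet<>();
    final Object lock = new Object();

    void start(String tp, long offset) {
        topicPartitionsAndOffsets.put(tp, offset); // safe: refresh not yet running
        refreshBrokers();
    }

    // Parameterless refresh: tries every pair still in the map.
    void refreshBrokers() {
        synchronized (lock) { // critical section
            for (String tp : new HashSet<>(topicPartitionsAndOffsets.keySet())) {
                if (assigned.contains(tp)) {
                    continue; // guard against double-listening
                }
                if (leaderAvailable(tp)) {
                    assigned.add(tp);
                    topicPartitionsAndOffsets.remove(tp); // only removal point
                }
                // no leader: leave the pair in the map for the next refresh
            }
        }
    }

    // Called by the one BrokerProxy that owned the pair.
    void abdicate(String tp, long offset) {
        assigned.remove(tp);
        topicPartitionsAndOffsets.put(tp, offset); // re-add before refreshing
        refreshBrokers();
    }

    boolean leaderAvailable(String tp) {
        return true; // stand-in for the metadata lookup
    }
}
```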

          closeuris Yan Fang added a comment -

          Just one comment on the javadoc, and a little confusion about the synchronization. Thank you.

          criccomini Chris Riccomini added a comment -

          Attaching updated patch. RB still at:

          https://reviews.apache.org/r/31520/

          New patch takes the right approach, and temporarily drops SSPs that are unavailable, while continuing to consume from those that are.

          1. Misc cleanup in join job classes from SAMZA-394.
          2. Added a droppedTopicAndPartitions variable in KafkaSystemConsumer to track dropped TopicAndPartitions.
          3. Add MessageSink.refreshDropped()
          4. Call MessageSink.refreshDropped() periodically from BrokerProxy thread to get dropped partitions.
          5. Added synchronization in refreshBrokers.

          For (5), I noticed that abdicate calls refreshBrokers. This can be invoked from any thread. There is a race condition where a BrokerProxy might get created twice from the same broker (just using a mutable.Map for brokerProxies), or where droppedTopicAndPartitions might get double written, and keep a dropped TP that should have been removed. I synchronized what I believe is the critical section.

          Ran SAMZA-394 tests, which were triggering this problem, and patch fixes this.

          criccomini Chris Riccomini added a comment -

          Before the patch, if the KafkaSystemConsumer drops the failed SSP, will it keep consuming other SSPs?

          Yes, I believe so.

          If yes, do we want to add a timeout or retry interval for the "case None"? Otherwise, if the first SSP fails, the KafkaSystemConsumer goes into an infinite loop and actually does nothing.

          I don't think we ever want to stop consuming an SSP forever. With a timeout, this is what we'd do. I think the ideal way to handle this would be to not block, but continue retrying to refresh the missing SSPs periodically. That way, you begin consuming the "dropped" SSPs when they're available, but you don't block existing SSPs when one becomes unavailable. This is what I was proposing up above. Let me have a look at how invasive this change would be.
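The non-blocking approach proposed here can be sketched as follows (a hedged simplification with illustrative names; the real refreshDropped hook lives on MessageSink): a failed SSP is parked in a dropped set rather than slept on, and each fetch cycle retries the dropped set while the healthy SSPs keep flowing.

```java
import java.util.HashSet;
import java.util.Set;

// Hedged sketch of periodic retry: instead of blocking when a leader is
// missing, park the SSP and re-check it on every fetch cycle.
public class RefreshDroppedSketch {
    final Set<String> dropped = new HashSet<>();
    final Set<String> consuming = new HashSet<>();

    void drop(String ssp) {
        consuming.remove(ssp);
        dropped.add(ssp); // parked, not lost: other SSPs keep consuming
    }

    // Called periodically from the fetch loop (e.g. a BrokerProxy thread).
    void refreshDropped() {
        for (String ssp : new HashSet<>(dropped)) {
            if (leaderAvailable(ssp)) {
                dropped.remove(ssp);
                consuming.add(ssp); // resume as soon as a leader appears
            }
        }
    }

    boolean leaderAvailable(String ssp) {
        return true; // stand-in for the metadata lookup
    }
}
```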

          closeuris Yan Fang added a comment -

          Before the patch, if the KafkaSystemConsumer drops the failed SSP, will it keep consuming other SSPs?

          If yes, do we want to add a timeout or retry interval for the "case None"? Otherwise, if the first SSP fails, the KafkaSystemConsumer goes into an infinite loop and actually does nothing.

          criccomini Chris Riccomini added a comment -

          RB at https://reviews.apache.org/r/31520/

          criccomini Chris Riccomini added a comment -

          Yan Fang, yeah I do. I just wanted to think about it a bit.

          I am not going to pursue the optimization that I commented on above (dropping the SSP temporarily in order to unblock SSPs that have a leader) because all of this code is going to get re-written when the Kafka 0.9 consumer is released. I believe we can live without this optimization in the meantime.

          Running the SAMZA-394 test showed that this patch works.

          closeuris Yan Fang added a comment -

          Do you want to have an RB for it? Though it's simple.

          criccomini Chris Riccomini added a comment -

          Right now, I just call Thread.sleep(10000) in the KafkaSystemConsumer. This is a blocking operation, which will effectively stop all consuming from the BrokerProxy until a leader is available. A better solution might be to not block, but temporarily drop the SSP, and try again in a few seconds. This allows other non-abdicated SSPs to continue being consumed in the meantime.

          criccomini Chris Riccomini added a comment -

          Attaching a patch that fixes the problem. Running with the SAMZA-394 integration test shows the containers no longer wedge.


            People

            • Assignee:
              criccomini Chris Riccomini
              Reporter:
              criccomini Chris Riccomini
            • Votes:
              0
              Watchers:
              2

              Dates

              • Created:
                Updated:
                Resolved:
