Kafka / KAFKA-345

Add a listener to ZookeeperConsumerConnector to get notified on rebalance events

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7, 0.8.0
    • Fix Version/s: None
    • Component/s: core
    • Labels: None

      Description

      A sample use case

      In our scenario we partition events by user ID and apply them to a state machine that modifies the user's actual state, so each event triggers a state transition. To avoid loading a user's state for every event processed, we cache it. But if a user's partition is moved to another consumer and later back to the previous one, the other consumer may have changed that user's state in the meantime, so the original consumer's cache is stale and hell breaks loose. I suspect the same problem occurs in other scenarios too, such as per-user counting.
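A minimal sketch of the kind of listener this use case needs, assuming a hypothetical `RebalanceCallback` interface: `afterRebalance(boolean)` mirrors the `ConsumerListener#afterRebalance` hook mentioned later in this issue, while `beforeRebalance()` and the `UserStateCache` class are assumptions for illustration. The idea is simply to drop cached per-user state whenever partitions may change owners.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical callback shape: afterRebalance(boolean) mirrors the hook named in this
// issue; beforeRebalance() is an assumption for this sketch.
interface RebalanceCallback {
    void beforeRebalance();
    void afterRebalance(boolean succeeded);
}

// Caches per-user state so it is not reloaded for every event (hypothetical class).
class UserStateCache implements RebalanceCallback {
    private final Map<String, Long> stateByUser = new HashMap<>();
    private int loads = 0; // number of reads from the backing store

    long stateFor(String userId) {
        return stateByUser.computeIfAbsent(userId, id -> {
            loads++;
            return loadFromStore(id);
        });
    }

    void update(String userId, long newState) {
        stateByUser.put(userId, newState);
    }

    int loadCount() {
        return loads;
    }

    // Placeholder for reading the authoritative state of a user; always 0 here.
    private long loadFromStore(String userId) {
        return 0L;
    }

    @Override
    public void beforeRebalance() {
        // Our partitions may move to another consumer, which can then change the state
        // of those users, so every cached entry must be treated as potentially stale.
        stateByUser.clear();
    }

    @Override
    public void afterRebalance(boolean succeeded) {
        // Nothing to do: state is reloaded lazily on the next event for each user.
    }
}
```

Clearing the whole cache is deliberately coarse: this hook shape does not say which partitions were lost, so every entry is treated as stale. A listener that received the released partitions could evict only the affected users.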

      Attachments

      1. KAFKA-345_2014-11-15_01:00:55.patch
        25 kB
        Jiangjie Qin
      2. KAFKA-345_2014-11-15_01:19:56.patch
        31 kB
        Jiangjie Qin
      3. KAFKA-345_2014-11-17_17:42:42.patch
        41 kB
        Jiangjie Qin
      4. KAFKA-345.patch
        14 kB
        Jiangjie Qin
      5. KAFKA-345.patch
        11 kB
        Peter Romianowski

          Activity

          Gwen Shapira added a comment -

          Looking at trunk, I think this went in through a different patch. Closing this one.

          Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/28025/diff/
          against branch origin/trunk

          Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/28025/diff/
          against branch origin/trunk

          Jiangjie Qin added a comment -

          Updated reviewboard https://reviews.apache.org/r/28025/diff/
          against branch origin/trunk

          Jiangjie Qin added a comment -

          There are some other reasons for adding the callback to the old consumer as well. First, the patch is backward compatible: if a user does not wire in the callback, there is no impact, so current users will not be affected. Second, the callback is not complicated to add, and it may take some time for the new consumer to be ready for production, so it seems worth making this available for the transitional period. It could also serve as a reference for how the callback might be used in the new consumer.
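The backward-compatibility argument can be sketched as follows, assuming a hypothetical, heavily simplified connector (`OptionalListenerConnector` is not `ZookeeperConsumerConnector`) and a hypothetical `RebalanceListener` interface. The listener field stays `null` unless a user calls `setRebalanceListener`, so existing users take the same code path as before.

```java
// Hypothetical callback interface; both hook names are assumptions for this sketch.
interface RebalanceListener {
    void beforeRebalance();
    void afterRebalance(boolean succeeded);
}

// Heavily simplified stand-in for a consumer connector (not ZookeeperConsumerConnector).
class OptionalListenerConnector {
    private volatile RebalanceListener listener; // stays null unless the user wires one in
    private int rebalances = 0;

    void setRebalanceListener(RebalanceListener listener) {
        this.listener = listener;
    }

    int rebalanceCount() {
        return rebalances;
    }

    void rebalance() {
        // Read the field once so a concurrent set cannot change it mid-rebalance.
        RebalanceListener l = listener;
        if (l != null) {
            l.beforeRebalance();
        }
        rebalances++; // placeholder for releasing and reassigning partitions
        if (l != null) {
            l.afterRebalance(true);
        }
    }
}
```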

          Jiangjie Qin added a comment -

          Neha Narkhede The main reason is that we have recently had many mirror maker issues that lead to a hard kill, which causes data loss, so we want to eliminate data loss in mirror maker as soon as possible. KAFKA-1650 was opened for this. To do that, we need to turn off auto offset commit, which means we are almost guaranteed to get duplicates on consumer rebalance: the new owner of a partition resumes from the last committed offset, so anything processed but not yet committed is consumed again. So we want to add a callback to avoid duplicates on consumer rebalance.
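A minimal sketch of how such a callback could limit duplicates once auto commit is off, assuming a hypothetical `OffsetStore` interface and a hypothetical `beforeRebalance()` hook. Offsets are recorded only after a message has been handled and are flushed just before partitions are given up, so the next owner resumes right after the last handled message. This is an illustration, not the code in the attached patches.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sink for committed offsets, keyed by "topic-partition".
interface OffsetStore {
    void commit(Map<String, Long> offsets);
}

// Tracks, per partition, the next offset to read after the last fully handled message.
class CommitOnRebalance {
    private final Map<String, Long> pending = new HashMap<>();
    private final OffsetStore store;

    CommitOnRebalance(OffsetStore store) {
        this.store = store;
    }

    // Call only after the message is safely handled (for mirror maker: sent to the target cluster).
    void markProcessed(String topicPartition, long offset) {
        pending.put(topicPartition, offset + 1); // committed position = next offset to consume
    }

    // Intended to run from a (hypothetical) hook invoked before partitions are released.
    void beforeRebalance() {
        if (!pending.isEmpty()) {
            store.commit(new HashMap<>(pending));
            pending.clear();
        }
    }
}
```

Duplicates can still occur if the process is killed between handling a message and the next commit; the callback only closes the window around rebalances.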

          Neha Narkhede added a comment -

          Jiangjie Qin Given that we are close to starting development on the new consumer, which exposes such a listener, I'm wondering what the motivation is for adding one to the older consumer at this point?

          Jiangjie Qin added a comment -

          Created reviewboard https://reviews.apache.org/r/28025/diff/
          against branch origin/trunk

          Jun Rao added a comment -

          Peter,

          Regarding #2, naming it ConsumerListener is fine.

          Peter Romianowski added a comment -

          Jun,

          regarding your comments:

          1. Adding it to the Scala version makes sense, of course. Bear with me, I'm just a Java guy. I'll add that.

          2. I'll move the listener. I named it "ConsumerListener" and "setListener" intentionally, because I thought it would be a good extension point for future features that are not necessarily related to rebalancing. That's also why I made it an abstract class rather than an interface. If you still think it should be renamed, I'll do it.

          3. I'll remove the spaces and unused imports.
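The design choice in point 2 can be sketched like this (an illustration under assumptions, not the class from the attached patch): `afterRebalance(boolean)` is the hook mentioned in this issue, `beforeRebalance()` is assumed, and `StatusListener` is a hypothetical user subclass. Because the base class supplies empty bodies, a listener overrides only the callbacks it cares about, and a later release could add another callback with an empty body without breaking existing subclasses.

```java
// Illustrative base class with no-op callbacks; beforeRebalance() is an assumed hook.
abstract class ConsumerListener {
    // Called before the consumer releases its partitions. Does nothing by default.
    public void beforeRebalance() {
    }

    // Called when rebalancing finishes; succeeded is false if it failed. No-op by default.
    public void afterRebalance(boolean succeeded) {
    }
}

// Hypothetical user listener: overrides only the callback it needs.
class StatusListener extends ConsumerListener {
    private final StringBuilder outcomes = new StringBuilder();

    @Override
    public void afterRebalance(boolean succeeded) {
        outcomes.append(succeeded ? "ok;" : "failed;");
    }

    String history() {
        return outcomes.toString();
    }
}
```

Since Java 8, `default` methods on an interface give the same evolvability; before that, an abstract class was the usual way to get it.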

          Jun Rao added a comment -

          Peter, thanks for the patch. Some comments:

          1. It seems that we should add setListener in the Scala version of ConsumerConnector too. Instead of calling it setListener, should we call it setRebalanceListener?

          2. Since ConsumerListener needs to be used in both the Java and Scala versions of ConsumerConnector, should we put it in the consumer package instead of javaapi.consumer? Also, we probably should rename it to ConsumerRebalanceListener.

          3. ZKLoadBalanceTest:
          3.1 testLoadBalance(): There seems to be a mix of spaces and tabs. Maybe that's already in the original code, but could you fix it?
          3.2 remove unused imports

          I am not sure there is an easy way to make a rebalance fail in a unit test. The unit test in the patch seems sufficient to me.

          Peter Romianowski added a comment -

          I tried to add a test to verify that the listener gets called when an error occurs (ConsumerListener#afterRebalance(false)), but I could not find a way to introduce an error into ZookeeperConsumerConnector.ZKRebalancerListener#syncedRebalance.

          Any ideas how to provoke an error during rebalancing?
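One way to make the failure path testable is to let the test supply the step that can fail. The sketch below assumes a hypothetical `InjectableRebalancer` with a pluggable assignment step and a hypothetical `OutcomeListener`; it is a stand-in named after `syncedRebalance`, not the real `ZKRebalancerListener`, and using the idea there would require a comparable seam in that class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener with the afterRebalance(boolean) shape named above.
interface OutcomeListener {
    void afterRebalance(boolean succeeded);
}

// Hypothetical stand-in for a rebalancer whose partition-assignment step is injectable.
class InjectableRebalancer {
    private final Runnable assignPartitions;
    private final OutcomeListener listener;

    InjectableRebalancer(Runnable assignPartitions, OutcomeListener listener) {
        this.assignPartitions = assignPartitions;
        this.listener = listener;
    }

    void syncedRebalance() {
        boolean succeeded = false;
        try {
            assignPartitions.run();
            succeeded = true;
        } catch (RuntimeException e) {
            // A real rebalancer would log and may retry; this sketch only reports the failure.
        } finally {
            listener.afterRebalance(succeeded);
        }
    }
}

// Records every outcome so a test can assert on it.
class RecordingOutcomeListener implements OutcomeListener {
    final List<Boolean> outcomes = new ArrayList<>();

    @Override
    public void afterRebalance(boolean succeeded) {
        outcomes.add(succeeded);
    }
}
```

A test then passes `() -> { throw new IllegalStateException("injected"); }` as the assignment step and asserts that the listener recorded `false`.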

          Peter Romianowski added a comment -

          Added a patch against trunk.


            People

            • Assignee: Jiangjie Qin
            • Reporter: Peter Romianowski
            • Votes: 0
            • Watchers: 7
