KAFKA-345

Add a listener to ZookeeperConsumerConnector to get notified on rebalance events

    Details

    • Type: Improvement
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.7, 0.8.0
    • Fix Version/s: None
    • Component/s: core
    • Labels: None

      Description

      A sample use-case

      In our scenario we partition events by user ID and apply them to a state machine that modifies the user's actual state, so events trigger state transitions. To avoid loading a user's state for every event processed, we cache it. But if a user's partition is moved to another consumer and then back to the previous one, the cache is stale and all hell breaks loose. I guess the same kind of problem occurs in other scenarios too, such as keeping per-user counters.
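
The use case above can be sketched as a cache that drops its entries whenever a rebalance happens. This is only an illustration under stated assumptions: the `ConsumerListener` class below is a local stand-in, not the class from the patch, and only the `afterRebalance(boolean)` callback is taken from this issue. `UserStateCache`, its methods, and `RebalanceCacheDemo` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the listener proposed in the patch. Only
// afterRebalance(boolean) is mentioned in this issue; the shape is assumed.
abstract class ConsumerListener {
    /** Called after a rebalance attempt; success is false if it failed. */
    public void afterRebalance(boolean success) {}
}

/** Per-user state cache that is cleared whenever partitions may have moved. */
class UserStateCache extends ConsumerListener {
    private final Map<String, String> stateByUser = new ConcurrentHashMap<>();

    public void put(String userId, String state) {
        stateByUser.put(userId, state);
    }

    public String get(String userId) {
        return stateByUser.get(userId);
    }

    public int size() {
        return stateByUser.size();
    }

    @Override
    public void afterRebalance(boolean success) {
        // Clear on failure too: partition ownership is unknown in that case.
        stateByUser.clear();
    }
}

class RebalanceCacheDemo {
    public static void main(String[] args) {
        UserStateCache cache = new UserStateCache();
        cache.put("user-1", "ACTIVE");
        // In a real consumer the connector would invoke this callback.
        cache.afterRebalance(true);
        System.out.println("cached entries after rebalance: " + cache.size());
    }
}
```

Clearing the whole cache is the bluntest option. A finer-grained version would evict only users whose partitions were lost, but that needs partition ownership information that this issue does not describe.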

      1. KAFKA-345.patch (11 kB, Peter Romianowski)

          Activity

          Peter Romianowski added a comment -

          Added a patch against trunk.

          Peter Romianowski added a comment -

          I tried to add a test to verify that the listener gets called if an error occurs (ConsumerListener#afterRebalance(false)), but I could not find a way to introduce an error into ZookeeperConsumerConnector.ZKRebalancerListener#syncedRebalance.

          Any ideas how to provoke an error during rebalancing?

          Jun Rao added a comment -

          Peter, thanks for the patch. Some comments:

          1. It seems that we should add setListener in the scala version of ConsumerConnector too. Instead of calling it setListener, should we call it setRebalanceListener?

          2. Since ConsumerListener needs to be used in both the java and scala version of ConsumerConnector, should we put it in the consumer package, instead of javaapi.consumer? Also, we probably should rename it to ConsumerRebalanceListener.

          3. ZKLoadBalanceTest:
          3.1 testLoadBalance(): There seems to be a mix of spaces and tabs. Maybe that's in the original code already, but could you fix that?
          3.2 remove unused imports

          I am not sure there is an easy way to make a rebalance fail in a unit test. The unit test in the patch seems sufficient to me.

          Peter Romianowski added a comment -

          Jun,

          regarding your comments:

          1. Adding it to the Scala version makes sense, of course. Bear with me, I'm just a Java guy. I'll add that.

          2. I'll move the listener. I named it "ConsumerListener" and "setListener" intentionally, because I thought it would be a good extension point for future additions that are not necessarily related to rebalancing. That's also why I made it an abstract class and not an interface. If you still think it should be renamed, I'll do it.

          3. I'll fix the whitespace and remove the unused imports.
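
The reasoning in point 2 (an abstract class as an extension point) can be illustrated with a small sketch. It is not the code from the patch: the `ConsumerListener` below is a local stand-in, `onShutdown()` is a purely hypothetical future callback, and `LoggingListener` and `ExtensionPointDemo` are made-up names. Only `afterRebalance(boolean)` comes from this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shape of the listener; only afterRebalance(boolean) is named
// in this thread.
abstract class ConsumerListener {
    /** No-op by default, so subclasses override only what they need. */
    public void afterRebalance(boolean success) {}

    // Hypothetical later addition. Existing subclasses keep compiling
    // because they inherit this no-op implementation.
    public void onShutdown() {}
}

/** A listener written before onShutdown() existed; it overrides one callback. */
class LoggingListener extends ConsumerListener {
    final List<String> events = new ArrayList<>();

    @Override
    public void afterRebalance(boolean success) {
        events.add("rebalance:" + success);
    }
}

class ExtensionPointDemo {
    public static void main(String[] args) {
        LoggingListener listener = new LoggingListener();
        listener.afterRebalance(false);
        listener.onShutdown(); // inherited no-op, nothing is recorded
        System.out.println(listener.events);
    }
}
```

With a plain interface (before default methods were available in Java), adding a callback like `onShutdown()` would have forced every existing implementation to change, which is the trade-off the comment alludes to.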

          Jun Rao added a comment -

          Peter,

          Regarding #2, naming it ConsumerListener is fine.


            People

            • Assignee: Unassigned
            • Reporter: Peter Romianowski
            • Votes: 0
            • Watchers: 3
