  Kafka / KAFKA-6995

Make config "internal.leave.group.on.close" public

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: consumer, streams

    Description

      We are proposing to make the config "internal.leave.group.on.close" public. The reason is that for heavy-state applications the sticky assignment won't work: each stream worker leaves the group during a rolling restart, so some members may already have left and rejoined while others are still waiting to be restarted. This causes multiple rebalances, because after the ongoing rebalance is done we expect the late members to rejoin and move the group state from `stable` back to `prepareBalance`. To solve this problem, heavy-state applications need this config to avoid updating the member list, so that at most one rebalance is triggered, at the proper time when all members have rejoined during the rolling restart. This should just be a one-line change.

      Code here:

      ```java
      /**
       * <code>internal.leave.group.on.close</code>
       * Whether or not the consumer should leave the group on close. If set to <code>false</code> then a rebalance
       * won't occur until <code>session.timeout.ms</code> expires.
       *
       * <p>
       * Note: this is an internal configuration and could be changed in the future in a backward incompatible way
       */
      static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";
      ```
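
      For reference, a plain consumer can already set this internal flag through its client properties. A minimal sketch (the bootstrap servers and group id are placeholders, and since the config is internal its name may change without notice):

      ```java
      import java.util.Properties;

      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;

      public class HeavyStateConsumerExample {
          public static void main(final String[] args) {
              final Properties props = new Properties();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
              props.put(ConsumerConfig.GROUP_ID_CONFIG, "heavy-state-app");         // placeholder
              // Internal config: keep group membership on close, so a rolling restart does not
              // trigger an immediate rebalance; one happens only after session.timeout.ms expires.
              props.put("internal.leave.group.on.close", "false");

              try (KafkaConsumer<String, String> consumer =
                       new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
                  // subscribe and poll as usual ...
              }
          }
      }
      ```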

          Activity

            bchen225242 Boyang Chen added a comment -

            Original config was added here: https://issues.apache.org/jira/browse/KAFKA-4881

            mjsax Matthias J. Sax added a comment (edited)

            Not sure if I understand the issue. What do you mean by "are left and rejoined while others are still awaiting restart"?

            To solve this problem, heavy state application needs to use this config 

            Use it to do what? Note that the config is set to false by Kafka Streams. I don't understand why exposing the config would help (you could only set it to true, which seems to contradict what the ticket describes). Can you elaborate?

            so that at most one rebalance will be triggered at a proper time when all the members are rejoined during rolling restart.

            Not sure what you imply here? When you do a rolling restart, how do you do it? Do you restart multiple instances at once? Or do you bounce instances one-by-one?

            bchen225242 Boyang Chen added a comment (edited)

            Basically, the leave-group request makes the rebalance faster because we can detect that a consumer is offline sooner. In our case of a rolling restart, it is preferable not to do so, because leaving the group explicitly breaks the existing member list: hosts restarted early leave the group, while hosts restarted late are not yet considered for the rebalance. The incomplete rebalance is then triggered multiple times until the consumer list stabilizes.

            Where does Kafka Streams set this config to false? I didn't find that code; could you elaborate on that, mjsax?


            mjsax Matthias J. Sax added a comment -

            In the current code base it's hard-coded as a consumer config: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L710

            The idea is to reduce the number of rebalances. If the consumer sends a leave-group request (ie, before the change) when Streams is closed, an immediate rebalance is triggered. If the instance is restarted, a second rebalance is triggered. Thus, a single rolling bounce requires two rebalances (ie, a store is migrated away and then migrated back).

            With the change, no leave group request is sent, and thus, when the instance is stopped nothing happens initially. If the restart happens quickly enough (ie, before the consumer times out), there will only be a single rebalance for one rolling bounce. Furthermore, the newly started instance will detect the local store and thus, the store will not be migrated at all in the single rebalance.

            In summary, if you bounce `N` instances one-by-one, you will get `N` rebalances without any task/store migration at all.

            Does this make sense?

            Because the config reduces the number of rebalances, I am still not sure I can follow your explanation. Why do you want to disable it, and why would disabling it help reduce the number of rebalances in your case? You should get more rebalances, ie, 2xN instead of N.

            bchen225242 Boyang Chen added a comment -

            mjsax thanks for the explanation! I realized that the config was set to false once you pointed it out; previously I had misunderstood it. One thing I want to confirm: should we make this config public so that developers have the option to set it to true? The thing is that for a stateless application, it would be ideal to trigger a rebalance when a machine gets killed or goes offline during devOps, because there is no state to transfer and a standby task could take over pretty quickly.

            I will create another ticket to track one new issue I discovered yesterday. Thanks!


            mjsax Matthias J. Sax added a comment -

            You are right that for stateless applications, fail-over would be delayed until session.timeout.ms passes and the group coordinator detects the missing heartbeat. I am still hesitant to make it public, because it's quite advanced. If you really want to modify it, you can still do so as of today by providing a custom `KafkaClientSupplier` to the `KafkaStreams` constructor. There, you can modify the consumer config before the consumer is actually created, and thus overwrite the hard-coded value from "false" to "true" if you need to.

            Not sure what guozhang thinks about making the config public. I am not completely against it, just not 100% convinced about it yet.
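
            For illustration, a minimal sketch of that workaround, assuming a Kafka Streams version where `KafkaClientSupplier` exposes `getAdmin` (older versions use `getAdminClient` instead); the supplier class name is hypothetical, and the internal config name may change without notice:

            ```java
            import java.util.HashMap;
            import java.util.Map;

            import org.apache.kafka.clients.admin.Admin;
            import org.apache.kafka.clients.consumer.Consumer;
            import org.apache.kafka.clients.consumer.KafkaConsumer;
            import org.apache.kafka.clients.producer.KafkaProducer;
            import org.apache.kafka.clients.producer.Producer;
            import org.apache.kafka.common.serialization.ByteArrayDeserializer;
            import org.apache.kafka.common.serialization.ByteArraySerializer;
            import org.apache.kafka.streams.KafkaClientSupplier;

            public class LeaveGroupOnCloseClientSupplier implements KafkaClientSupplier {

                @Override
                public Admin getAdmin(final Map<String, Object> config) {
                    return Admin.create(config);
                }

                @Override
                public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
                    return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
                }

                @Override
                public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
                    // Flip the hard-coded "false" so the main consumer sends a leave-group request on close.
                    // Note: this is an internal config and may change in a backward-incompatible way.
                    final Map<String, Object> copy = new HashMap<>(config);
                    copy.put("internal.leave.group.on.close", true);
                    return new KafkaConsumer<>(copy, new ByteArrayDeserializer(), new ByteArrayDeserializer());
                }

                @Override
                public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) {
                    return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
                }

                @Override
                public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
                    return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
                }
            }
            ```

            It would then be passed to the constructor, e.g. `new KafkaStreams(topology, props, new LeaveGroupOnCloseClientSupplier())`.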

            bchen225242 Boyang Chen added a comment -

            Thanks for the explanation, I understand the concern here. I agree this is an advanced config for new users, but it may be worth exposing because it makes clear to the end user what kind of rebalance behavior will be triggered after a service disruption. I could work on the documentation to make clear what this config means and what the corresponding outcomes of setting it to `true` or `false` are.

            If we have clear documentation on this, it would be ideal to leave this config to the end user.

            guozhang Guozhang Wang added a comment -

            We intentionally made it internal rather than public, since the leave-group request is not considered a public API and we do not want to expose it via a config (mjsax's point is also correct: it is quite implementation-oriented, and we would want to abstract it away from users).

            We have already had some initial discussions about improving our rebalance protocol further in a general way, rather than making this config public to resolve the incremental rebalance scenario.


            mjsax Matthias J. Sax added a comment -

            One idea that just came to mind: we could check whether a topology is stateful or stateless and automatically adjust the config accordingly. "Power users" can still use the `KafkaClientSupplier` workaround if they really need to.
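
            As a rough illustration of that idea (this is not existing Kafka Streams behavior), the topology description could be inspected for processor nodes with attached state stores; the class and method names below are hypothetical, and global state stores are ignored for simplicity:

            ```java
            import org.apache.kafka.streams.Topology;
            import org.apache.kafka.streams.TopologyDescription;

            public final class TopologyStateCheck {
                // Hypothetical helper: true if any processor node in the topology uses a state store.
                public static boolean hasStateStores(final Topology topology) {
                    return topology.describe().subtopologies().stream()
                            .flatMap(subtopology -> subtopology.nodes().stream())
                            .filter(node -> node instanceof TopologyDescription.Processor)
                            .anyMatch(node -> !((TopologyDescription.Processor) node).stores().isEmpty());
                }
            }
            ```

            Streams could then keep the current behavior for stateful topologies and send the leave-group request only for stateless ones.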

            bchen225242 Boyang Chen added a comment -

            mjsax That would be a viable solution. However, as an end user it would be beneficial to know what kind of outcome leaving the group on close introduces. Even if we adjust the config based on the stateful or stateless nature of the job, a job may be stateful but maintain really lightweight state, where I as a user don't feel it is necessary to worry about the state transfer compared with starving the process. In that case, I would like to see a really quick rebalance (faster than the minimum session timeout) instead of waiting until the session timeout.

            bchen225242 Boyang Chen added a comment -

            We are planning to deprecate this config with the introduction of KIP-345. 

            jorgenringen Jørgen added a comment -

            Are there any other ways to proactively leave the group when a Kafka Streams application stops gracefully, other than "internal.leave.group.on.close"? We have a lot of stateless applications where we want to trigger a rebalance immediately if an instance is stopped, to reduce downtime.

            Calling kafka-consumer#close explicitly sends a leave-group request to trigger a rebalance. From the Kafka Definitive Guide:

            When closing a consumer cleanly, the consumer will notify the group coordinator that it is leaving, and the group coordinator will trigger a rebalance immediately, reducing the gap in processing. 

            This would be very convenient for many of our kafka-streams usecases as well. The new default in KIP-735 also makes this a bigger problem (45s session timeout).


            mjsax Matthias J. Sax added a comment -

            No. By default, sending the leave-group request is disabled. The workarounds are described in this ticket.

            If you believe the current behavior should be improved, feel free to file a new ticket (and maybe link back to this one).


            People

              Assignee: bchen225242 Boyang Chen
              Reporter: bchen225242 Boyang Chen
              Votes: 0
              Watchers: 5
