Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: controller

    Description

      Before starting work on delete topic support, uploading a patch first to address some minor hiccups that touch a bunch of files:

      1. Change PartitionOfflineException to PartitionUnavailableException because in the partition state machine we mark a partition offline when its leader is down, whereas the PartitionOfflineException is thrown when all the assigned replicas of the partition are down.
      2. Change PartitionOfflineRate to UnavailablePartitionRate
      3. Remove default leader selector from partition state machine's handleStateChange. We can specify null as default when we don't need to use a leader selector.
      4. Include controller info in the client id of LeaderAndIsrRequest.
      5. Rename controllerContext.allleaders to something more meaningful - partitionLeadershipInfo.
      6. We don't need to put partition in OnlinePartition state in partition state machine initializeLeaderAndIsrForPartition, the state change occurs in handleStateChange.
      7. Add todo in handleStateChanges
      8. Left a comment above ReassignedPartitionLeaderSelector that reassigned replicas are already in the ISR (this is not true for other leader selectors), renamed the vals in the selector.
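
The distinction behind item 1 can be illustrated with a minimal sketch. The type and function names below (`Availability`, `OfflineButElectable`, `classify`) are hypothetical and are not taken from the patch; they only separate "leader is down" from "no assigned replica is alive".

```scala
// Hypothetical sketch, not the Kafka controller code.
object PartitionAvailabilitySketch {
  sealed trait Availability
  case object LeaderAlive extends Availability
  // Leader is down, but some assigned replica is alive, so an election can succeed.
  case object OfflineButElectable extends Availability
  // Every assigned replica is down, or no replicas are assigned at all.
  case object NoReplicaOnline extends Availability

  def classify(leader: Int, assignedReplicas: Seq[Int], liveBrokers: Set[Int]): Availability =
    if (liveBrokers.contains(leader)) LeaderAlive
    else if (assignedReplicas.exists(liveBrokers.contains)) OfflineButElectable
    else NoReplicaOnline
}
```

In this sketch only the last case corresponds to the condition under which the renamed exception would be thrown.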

      Attachments

        1. kafka-813-v1.patch
          40 kB
          Swapnil Ghike
        2. kafka-813-v2.patch
          43 kB
          Swapnil Ghike
        3. kafka-813-v3.patch
          41 kB
          Swapnil Ghike
        4. kafka-813-v4.patch
          39 kB
          Swapnil Ghike

        Activity

          swapnilghike Swapnil Ghike added a comment -

          Unit tests pass.

          junrao Jun Rao added a comment -

          Thanks for the patch. Some comments:

          1. For 1 and 2, it seems that in this patch, we are treating PartitionOfflineException and PartitionUnavailableException the same. Is it better to just pick PartitionOfflineException (and remove PartitionUnavailableException) since it introduces fewer changes?

          2. PartitionStateMachine.handleStateChanges(): Is it a good idea to allow leaderSelector to be null? We will have to check that leaderSelector is not null every time we use it.

          nehanarkhede Neha Narkhede added a comment -

          Thanks for the cleanup patch, Swapnil. Here are some minor suggestions after which we can check it in -

          1. ControllerBrokerRequestBatch
          1.1 Can we explicitly pass in the controller id and the controller client id? It is better to avoid passing the whole object if we can.
          1.2 How about clientId() instead of toString()? That makes the purpose of the API explicit. It is unlikely that anything else would want to call toString() on KafkaController.

          2. PartitionStateMachine
          Let's not set the leaderSelector to null as default. Each invocation of handleStateChanges should pass in the correct leader selector for better code readability.

          swapnilghike Swapnil Ghike added a comment - - edited

          @Neha: 2. If the target state is not OnlinePartition, then we don't need to pass in any leader selector.

          nehanarkhede Neha Narkhede added a comment -

          Correct. Another way to do this is to define a NoOpLeaderSelector and pass that in. The NoOpLeaderSelector can just return the leadership info from partitionLeadershipInfo. That way, at least, the leader selector logic is explicit, which was the purpose of the change.
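
A minimal sketch of the idea, assuming simplified stand-in types: `TopicAndPartition`, `LeaderAndIsr`, the `PartitionLeaderSelector` trait and its `selectLeader` signature are written here for illustration and may differ from the actual Kafka classes. `FirstLiveIsrSelector` is purely hypothetical and exists only for contrast.

```scala
// Hypothetical, simplified types; not the actual Kafka classes.
object LeaderSelectorSketch {
  final case class TopicAndPartition(topic: String, partition: Int)
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  trait PartitionLeaderSelector {
    // Returns the leader and ISR to use for the partition.
    def selectLeader(tp: TopicAndPartition, current: LeaderAndIsr): LeaderAndIsr
  }

  // Performs no election: hands back the leadership info that is already known.
  object NoOpLeaderSelector extends PartitionLeaderSelector {
    def selectLeader(tp: TopicAndPartition, current: LeaderAndIsr): LeaderAndIsr = current
  }

  // For contrast: picks the first ISR member that is still alive (illustrative only).
  final class FirstLiveIsrSelector(liveBrokers: Set[Int]) extends PartitionLeaderSelector {
    def selectLeader(tp: TopicAndPartition, current: LeaderAndIsr): LeaderAndIsr = {
      val liveIsr = current.isr.filter(liveBrokers.contains)
      liveIsr.headOption match {
        case Some(newLeader) => LeaderAndIsr(newLeader, liveIsr)
        case None => throw new IllegalStateException(s"No live ISR replica for $tp")
      }
    }
  }
}
```

With a no-op object like this, a caller always passes some selector, so the choice is visible at the call site and no null check is needed.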

          swapnilghike Swapnil Ghike added a comment -

          Addressing comments:
          junrao:
          1. Renamed the exception to PartitionNoReplicaOnlineException, since
          i. the old name caused confusion with OfflinePartition state of the partition state machine
          ii. this exception is thrown when all the replicas assigned to a topic-partition are down, or there are no replicas assigned to a topic-partition

          2. Created a new NoOpLeaderSelector and set it as the default.

          nehanarkhede:
          1.1. Passing the controllerId and clientId separately to ControllerBrokerRequestBatch
          1.2. Renamed KafkaController.toString to clientId

          2. Created a new NoOpLeaderSelector, thanks for the great suggestion, and set it as the default.
          i. It returns the existing leader and Isr and replica assignment, so the logs will first contain a warning issued by selectLeader() and then the Zk write will log an error.
          ii. I think it is ok to set the no-op leader selector as the default, because we don't need to pass any leader selector when the target state is not OnlinePartition, and in that case there should be little confusion about why no leader selector was passed. Earlier we had cases where the target state was OnlinePartition, but it was not clear which leader selector was used, since we provided OfflinePartitionLeaderSelector as the default to handleStateChanges().

          Additional changes:
          1. Removed OfflinePartitionRate as discussed offline. Instead, created a new gauge OfflinePartitionsCount, which would measure the count of partitions whose individual "leaders" are down. The reason to make this change is that OfflinePartitionsRate would provide information about how many partitions go Offline in a given time window, but does not give information about how many partitions don't come back to Online State and stay Offline since the last time window.

          Marking the jira as blocker because of the metric change.
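
The difference between a rate of offline transitions and a count of currently offline partitions can be sketched as follows. `StateTracker` and its members are hypothetical names, and the sketch is single-threaded and uses no metrics library; it only contrasts the two readings.

```scala
// Hypothetical sketch contrasting a transition counter with a gauge-style count.
object OfflineMetricSketch {
  sealed trait PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  final class StateTracker {
    private var states = Map.empty[String, PartitionState]
    private var offlineTransitions = 0L // what a rate-style metric would be fed with

    def set(partition: String, state: PartitionState): Unit = {
      // Count only real transitions into the offline state.
      if (state == OfflinePartition && !states.get(partition).contains(OfflinePartition))
        offlineTransitions += 1
      states = states.updated(partition, state)
    }

    // Gauge-style reading: how many partitions are offline right now.
    def offlinePartitionsCount: Int = states.values.count(_ == OfflinePartition)

    // Rate-style input: how many times any partition went offline so far.
    def totalOfflineTransitions: Long = offlineTransitions
  }
}
```

A partition that goes offline and recovers still adds to the transition total, whereas the count drops back once it is online again, so only the count shows partitions that stay offline.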

          junrao Jun Rao added a comment -

          Thanks for patch v2. Some more comments.

          20. PartitionStateMachine.initializeLeaderAndIsrForPartition(): why is the following line removed?
          partitionState.put(topicAndPartition, OnlinePartition)

          21. PartitionNoReplicaOnlineException seems long. Is it better to use NoReplicaOnlineException?

          22. KafkaController: To compute the gauge OfflinePartitionsCount, we need to synchronize on controller lock.
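
Comment 22 can be illustrated with a small sketch in which the gauge value is computed under the same lock that guards state updates. The class `ControllerStateSketch`, the `inLock` helper and the use of a `ReentrantLock` are assumptions for illustration; the actual controller may guard its state differently.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical sketch: the gauge read takes the same lock as the writers.
object GaugeLockSketch {
  final class ControllerStateSketch {
    private val controllerLock = new ReentrantLock()
    // Partition name -> whether its leader is currently alive (guarded by controllerLock).
    private var leaderAlive = Map.empty[String, Boolean]

    private def inLock[T](body: => T): T = {
      controllerLock.lock()
      try body finally controllerLock.unlock()
    }

    def setLeaderAlive(partition: String, alive: Boolean): Unit =
      inLock { leaderAlive = leaderAlive.updated(partition, alive) }

    // Called from a metrics thread, so it must not read the map unguarded.
    def offlinePartitionsCount: Int =
      inLock { leaderAlive.values.count(alive => !alive) }
  }
}
```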

          nehanarkhede Neha Narkhede added a comment - - edited

          Thanks for patch v2 -

          1. KafkaController
          1.1 The change to OfflinePartitionsCount looks good. However, there is a distinction between a partition whose leader is down while other replicas are alive, so that leader election will happen, and a partition for which all replicas are dead. In the latter case there can be no leader for the partition, which is a much more dangerous state to be in. I suggest two metrics: OfflinePartitionsCount to indicate the former and UnavailablePartitionsCount to indicate the latter.
          1.2 I wonder why ActiveControllerCount and OfflinePartitionsCount are not part of ControllerStats ?

          2. NoOpLeaderSelector
          Minor code style suggestion - return is not required here.

          3. PartitionStateMachine
          We don't have to define the noOpLeaderSelector in the controller, since it is used only in PartitionStateMachine.handleStateChanges(). Let's move it there. offlineLeaderSelector is defined in the controller because both the controller and the partition state machine access it.

          Nit pick - Can we change PartitionNoReplicaOnlineException to NoReplicaOnlineForPartitionException or simply NoReplicaOnlineException ?

          swapnilghike Swapnil Ghike added a comment -

          A couple of comments/questions before uploading the next patch:

          junrao:
          20. Because initializeLeaderAndIsrForPartition() is called in only one place, handleStateChange(), and handleStateChange() already puts the partition in the Online state.
          21. Ok, renaming to NoReplicaOnlineException.
          22. Ok, also please read below.

          nehanarkhede:
          1.1 I agree that having all the replicas assigned to a partition down is a dangerous situation. I guess that will be reflected if the OfflinePartitionsCount is non-zero for a while, and then we can dig into Zk. An OfflinePartitionsCount that remains non-zero for a while could also indicate another dangerous situation, where the replicas are up but a leader is not being assigned. So I think the clues provided by an OfflinePartitionsCount that remains non-zero for a few minutes subsume the clues given by UnavailablePartitionsCount, and we may not specifically need the latter. Thoughts?

          2. Argh, yes, thanks.

          3. I agree that Controller does not need to know about a useless leader selector. Moving NoOpLeaderSelector object to partition state machine.

          junrao, nehanarkhede: 1.2 I believe the reason was to initialize the gauges when the controller object is created. However, we can move the gauges to ControllerStats and force their initialization, like server.registerStats(). So perhaps it would be good if we could decide which jmx beans need to be force-initialized and at which places in the code. I can then make the relevant changes accordingly.

          nehanarkhede Neha Narkhede added a comment -

          1.1 Makes sense, we can just live with the existing counter for now.
          Also, let's register the controller stats at startup. That way, they at least show a value of 0 instead of NaN.

          swapnilghike Swapnil Ghike added a comment -

          Patch v3 takes care of the comments made above.

          junrao Jun Rao added a comment -

          Thanks for patch v3. Looks good. Just one more comment:

          30. KafkaController: Is it really useful to include port in clientId?

          nehanarkhede Neha Narkhede added a comment -

          +1 on patch v3

          swapnilghike Swapnil Ghike added a comment -

          30. port has been included to maintain consistency with other such clientIds. I agree that it's not specifically useful since the hostname gives enough information for log debugging etc.

          junrao Jun Rao added a comment -

          Ok. That's fine then. One more comment:

          31. PartitionStateMachine.electLeaderForPartition(): In the following code, is there any value in wrapping a NoReplicaOnlineException over another NoReplicaOnlineException?
          case noReplicaOnlineEx: NoReplicaOnlineException =>
            val failMsg = "All replicas %s for partition %s are dead. Marking this partition offline."
              .format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(","), topicAndPartition)
            throw new NoReplicaOnlineException(failMsg, noReplicaOnlineEx)

          swapnilghike Swapnil Ghike added a comment -

          Thanks for pointing that out. Attached patch v4. When the leader goes down, the state change log will show the OfflinePartition state change. So yes, there is no need to wrap it in another NoReplicaOnlineException.
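
The point of comment 31 can be sketched as follows: an exception of the same type is allowed to propagate as-is, and only unrelated failures are wrapped. The exception class, `selectLeader` and `electLeader` below are simplified stand-ins written for this illustration, not the code in patch v4.

```scala
import scala.util.control.NonFatal

// Hypothetical sketch of propagating an exception instead of re-wrapping it.
object RethrowSketch {
  final class NoReplicaOnlineException(msg: String, cause: Throwable = null)
    extends RuntimeException(msg, cause)

  // Picks the first assigned replica that is alive, or fails with a descriptive message.
  def selectLeader(assigned: Seq[Int], live: Set[Int]): Int =
    assigned.find(live.contains).getOrElse(
      throw new NoReplicaOnlineException(
        s"All replicas ${assigned.mkString(",")} are dead"))

  def electLeader(assigned: Seq[Int], live: Set[Int]): Int =
    try selectLeader(assigned, live)
    catch {
      // Already carries the right type and message: rethrow unchanged.
      case e: NoReplicaOnlineException => throw e
      // Unrelated failures get context added by wrapping.
      case NonFatal(t) => throw new IllegalStateException("Leader election failed", t)
    }
}
```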

          junrao Jun Rao added a comment -

          Thanks for patch v4. +1 and committed to 0.8.


          People

            swapnilghike Swapnil Ghike
            swapnilghike Swapnil Ghike
            Votes: 0
            Watchers: 3

            Dates

              Created:
              Updated:
              Resolved: