  1. Kafka
  2. KAFKA-17241 KIP-853 follow-ups
  3. KAFKA-16927

Handle expanding leader endpoints


Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 3.9.0
    • Component/s: kraft
    • Labels: None

    Description

      A restarted inactive controller fails to start if the active leader has more endpoints than the latest voter set. The easiest way to reproduce this is with the following configuration:

      cat kafka.properties | grep controller.quorum
      controller.quorum.voters=0@controller-0:1234,1@controller-1:1234,2@controller-2:1234
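      Each entry in this property has the form id@host:port, so the static voter set carries one endpoint per voter. The sketch below parses the value to make that visible. StaticVoters is a hypothetical helper written for illustration and is not Kafka's own parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; this is not Kafka's own parser.
final class StaticVoters {
    // Parses "id@host:port,id@host:port,..." into voter id -> "host:port".
    static Map<Integer, String> parse(String voters) {
        Map<Integer, String> result = new LinkedHashMap<>();
        for (String entry : voters.split(",")) {
            String[] parts = entry.trim().split("@", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected id@host:port but got: " + entry);
            }
            result.put(Integer.parseInt(parts[0]), parts[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> voters = parse(
            "0@controller-0:1234,1@controller-1:1234,2@controller-2:1234");
        // Each voter id maps to a single host:port pair.
        voters.forEach((id, endpoint) -> System.out.println(id + " -> " + endpoint));
    }
}
```

      A replica that seeds its view of the leader from this value therefore knows a single endpoint for the leader, even if the leader itself listens on more.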

      This is what is executed in the QuorumState loading code:

                    if (leaderEndpoints.isEmpty()) {
                        ...
                    } else {
                        initialState = new FollowerState(
                            time,
                            election.epoch(),
                            election.leaderId(),
                            leaderEndpoints,
                            voters.voterIds(),
                            Optional.empty(),
                            fetchTimeoutMs,
                            logContext
                        );
                    }

      If the leader has two endpoints, it sends a BEGIN_QUORUM_EPOCH request that includes the following:

       "leaderEndpoints":[{"name":"CONTROLLER_PLAINTEXT","host":"controller-0","port":1234},{"name":"CONTROLLER","host":"controller-0","port":4321}]

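      The following code in KafkaRaftClient.maybeTransition doesn't handle that correctly. The replica is already a follower of this leader in this epoch, but because the request lists more endpoints than the replica knows about, the second condition is true and it calls transitionToFollower again: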

                } else if (
                        leaderId.isPresent() &&
                        (!quorum.hasLeader() || leaderEndpoints.size() > quorum.leaderEndpoints().size())
                ) {
                    // The request or response indicates the leader of the current epoch
                    // which are currently unknown or the replica has discovered more endpoints
                    transitionToFollower(epoch, leaderId.getAsInt(), leaderEndpoints, currentTimeMs);
                }
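      The interaction can be modeled in isolation. The sketch below is a simplified stand-in, not Kafka code: FollowerModel, onLeaderDiscovered and ExpandingEndpointsDemo are hypothetical names, and the guard shown is only one possible way to handle the case, not necessarily the fix that was merged.

```java
import java.util.Map;

// Simplified stand-in for the follower state; all names here are hypothetical.
final class FollowerModel {
    final int epoch;
    final int leaderId;
    Map<String, String> leaderEndpoints; // listener name -> host:port

    FollowerModel(int epoch, int leaderId, Map<String, String> leaderEndpoints) {
        this.epoch = epoch;
        this.leaderId = leaderId;
        this.leaderEndpoints = leaderEndpoints;
    }

    // Models the failure in the report: re-entering the follower state for the
    // same epoch and leader is rejected.
    FollowerModel transitionToFollower(int newEpoch, int newLeaderId, Map<String, String> endpoints) {
        if (newEpoch == epoch && newLeaderId == leaderId) {
            throw new IllegalStateException(
                "Cannot transition to Follower with leader " + newLeaderId + " and epoch " + newEpoch);
        }
        return new FollowerModel(newEpoch, newLeaderId, endpoints);
    }

    // One possible guard (an assumption, not the actual fix): when only the
    // endpoint set grew, update it in place instead of transitioning.
    FollowerModel onLeaderDiscovered(int newEpoch, int newLeaderId, Map<String, String> endpoints) {
        if (newEpoch == epoch && newLeaderId == leaderId) {
            if (endpoints.size() > leaderEndpoints.size()) {
                leaderEndpoints = endpoints;
            }
            return this;
        }
        return transitionToFollower(newEpoch, newLeaderId, endpoints);
    }
}

final class ExpandingEndpointsDemo {
    public static void main(String[] args) {
        // What the replica knows at startup versus what the leader advertises.
        Map<String, String> known = Map.of("CONTROLLER_PLAINTEXT", "controller-0:1234");
        Map<String, String> advertised = Map.of(
            "CONTROLLER_PLAINTEXT", "controller-0:1234",
            "CONTROLLER", "controller-0:4321");

        FollowerModel follower = new FollowerModel(3, 0, known);
        try {
            follower.transitionToFollower(3, 0, advertised);
        } catch (IllegalStateException e) {
            System.out.println("Unguarded path fails: " + e.getMessage());
        }
        FollowerModel same = follower.onLeaderDiscovered(3, 0, advertised);
        System.out.println("Guarded path keeps state, endpoints=" + same.leaderEndpoints.size());
    }
}
```

      The unguarded call throws for the same epoch and leader, which is the shape of the IllegalStateException in the test failure; the guarded path keeps the existing state and only records the larger endpoint set.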

      After adding a test for this, the test fails with the following:

      Gradle Test Run :raft:test > Gradle Test Executor 44 > KafkaRaftClientTest > testHandleBeginQuorumRequestMoreEndpoints() FAILED
          java.lang.AssertionError: Assertion failed with an exception
              at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:453)
              at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
              at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
              at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)
              at org.apache.kafka.raft.RaftClientTestContext.pollUntil(RaftClientTestContext.java:617)
              at org.apache.kafka.raft.RaftClientTestContext.pollUntilResponse(RaftClientTestContext.java:624)
              at org.apache.kafka.raft.KafkaRaftClientTest.testHandleBeginQuorumRequestMoreEndpoints(KafkaRaftClientTest.java:993)
      
              Caused by:
              java.lang.IllegalStateException: Cannot transition to Follower with leader 829 and epoch 3 from state FollowerState(fetchTimeoutMs=50000, epoch=3, leader=829, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10819}), voters=[828, 829], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty)
                  at org.apache.kafka.raft.QuorumState.transitionToFollower(QuorumState.java:480)
                  at org.apache.kafka.raft.KafkaRaftClient.transitionToFollower(KafkaRaftClient.java:732)
                  at org.apache.kafka.raft.KafkaRaftClient.maybeTransition(KafkaRaftClient.java:2434)
                  at org.apache.kafka.raft.KafkaRaftClient.handleBeginQuorumEpochRequest(KafkaRaftClient.java:1018)
                  at org.apache.kafka.raft.KafkaRaftClient.handleRequest(KafkaRaftClient.java:2565)
                  at org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:2613)
                  at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:3314)
                  at org.apache.kafka.raft.RaftClientTestContext.lambda$pollUntil$1(RaftClientTestContext.java:618)
                  at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
                  at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
                  ... 6 more
      
      1 test completed, 1 failed 

            People

              jsancio José Armando García Sancio
              alyssahuang Alyssa Huang