Description
An inactive controller fails to start after a restart if the active leader has more endpoints than the latest voter set. The easiest way to reproduce this is with the following configuration:
cat kafka.properties | grep controller.quorum
controller.quorum.voters=0@controller-0:1234,1@controller-1:1234,2@controller-2:1234
This is what is executed in the QuorumState loading code:
if (leaderEndpoints.isEmpty()) {
    ...
} else {
    initialState = new FollowerState(
        time,
        election.epoch(),
        election.leaderId(),
        leaderEndpoints,
        voters.voterIds(),
        Optional.empty(),
        fetchTimeoutMs,
        logContext
    );
}
If the leader has two endpoints, it will send the following BEGIN_QUORUM_EPOCH request:
"leaderEndpoints":[{"name":"CONTROLLER_PLAINTEXT","host":"controller-0","port":1234},{"name":"CONTROLLER","host":"controller-0","port":4321}]
The following code doesn't handle that correctly, because the replica is already a follower of that leader in the same epoch:
} else if (
    leaderId.isPresent() &&
    (!quorum.hasLeader() || leaderEndpoints.size() > quorum.leaderEndpoints().size())
) {
    // The request or response indicates the leader of the current epoch
    // which are currently unknown or the replica has discovered more endpoints
    transitionToFollower(epoch, leaderId.getAsInt(), leaderEndpoints, currentTimeMs);
}
After adding a test for this, the test fails with the following:
Gradle Test Run :raft:test > Gradle Test Executor 44 > KafkaRaftClientTest > testHandleBeginQuorumRequestMoreEndpoints() FAILED
    java.lang.AssertionError: Assertion failed with an exception
        at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:453)
        at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
        at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
        at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)
        at org.apache.kafka.raft.RaftClientTestContext.pollUntil(RaftClientTestContext.java:617)
        at org.apache.kafka.raft.RaftClientTestContext.pollUntilResponse(RaftClientTestContext.java:624)
        at org.apache.kafka.raft.KafkaRaftClientTest.testHandleBeginQuorumRequestMoreEndpoints(KafkaRaftClientTest.java:993)

        Caused by: java.lang.IllegalStateException: Cannot transition to Follower with leader 829 and epoch 3 from state FollowerState(fetchTimeoutMs=50000, epoch=3, leader=829, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10819}), voters=[828, 829], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty)
            at org.apache.kafka.raft.QuorumState.transitionToFollower(QuorumState.java:480)
            at org.apache.kafka.raft.KafkaRaftClient.transitionToFollower(KafkaRaftClient.java:732)
            at org.apache.kafka.raft.KafkaRaftClient.maybeTransition(KafkaRaftClient.java:2434)
            at org.apache.kafka.raft.KafkaRaftClient.handleBeginQuorumEpochRequest(KafkaRaftClient.java:1018)
            at org.apache.kafka.raft.KafkaRaftClient.handleRequest(KafkaRaftClient.java:2565)
            at org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:2613)
            at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:3314)
            at org.apache.kafka.raft.RaftClientTestContext.lambda$pollUntil$1(RaftClientTestContext.java:618)
            at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
            at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
            ... 6 more

1 test completed, 1 failed
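
To make the interaction easier to follow, here is a minimal standalone sketch of the two pieces involved. It is not the Kafka code: the class MoreEndpointsSketch, the Follower type, and both method bodies are hypothetical stand-ins. wantsTransition mirrors the condition quoted above. The guard in transitionToFollower is an assumption inferred only from the exception message, namely that a replica which is already a follower in the same epoch may not transition to follower again.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;

public class MoreEndpointsSketch {
    /** Hypothetical stand-in for the follower state held by the replica. */
    public static final class Follower {
        final int epoch;
        final int leaderId;
        final List<String> leaderEndpoints;

        public Follower(int epoch, int leaderId, List<String> leaderEndpoints) {
            this.epoch = epoch;
            this.leaderId = leaderId;
            this.leaderEndpoints = leaderEndpoints;
        }
    }

    /**
     * Mirrors the quoted condition: attempt a transition when the leader is
     * unknown, or when the message carries more endpoints than are known.
     * A null 'current' models "no known leader".
     */
    public static boolean wantsTransition(
        OptionalInt leaderId,
        Follower current,
        List<String> leaderEndpoints
    ) {
        boolean hasLeader = current != null;
        return leaderId.isPresent()
            && (!hasLeader || leaderEndpoints.size() > current.leaderEndpoints.size());
    }

    /**
     * Assumed guard (inferred from the exception message, not copied from
     * Kafka): a follower cannot become a follower again in the same epoch.
     */
    public static Follower transitionToFollower(
        Follower current,
        int epoch,
        int leaderId,
        List<String> leaderEndpoints
    ) {
        if (current != null && current.epoch == epoch) {
            throw new IllegalStateException(
                "Cannot transition to Follower with leader " + leaderId + " and epoch " + epoch);
        }
        return new Follower(epoch, leaderId, leaderEndpoints);
    }

    public static void main(String[] args) {
        // State loaded at startup: one endpoint, taken from the voter set.
        Follower loaded = new Follower(3, 829, Arrays.asList("LISTENER"));
        // The leader then advertises two endpoints in BEGIN_QUORUM_EPOCH.
        List<String> fromLeader = Arrays.asList("CONTROLLER_PLAINTEXT", "CONTROLLER");

        if (wantsTransition(OptionalInt.of(829), loaded, fromLeader)) {
            try {
                transitionToFollower(loaded, 3, 829, fromLeader);
            } catch (IllegalStateException e) {
                System.out.println("Rejected: " + e.getMessage());
            }
        }
    }
}
```

In this model the two checks disagree: the caller decides a transition is needed because more endpoints were discovered, while the callee rejects any follower-to-follower transition in the same epoch. Under that assumption, a fix has to make the two agree, for example by having the callee accept the transition when the new endpoint set is larger.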