Description
BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1) which in turn calls RemoteLogManager#onLeadershipChange (2), however, the RemoteLogManager is configured after the BrokerMetadataPublisher starts running (3, 4). This is incorrect, we either need to initialise the RemoteLogManager before we start the BrokerMetadataPublisher or we need to skip calls to onLeadershipChange if the RemoteLogManager is not initialised.
(3) https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432
(4) https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515
The way to reproduce the problem is by looking at the following changes
config/kraft/broker.properties | 10 ++++++++++ .../main/java/kafka/log/remote/RemoteLogManager.java | 8 +++++++- core/src/main/scala/kafka/server/ReplicaManager.scala | 6 +++++- 3 files changed, 22 insertions(+), 2 deletions(-)diff --git a/config/kraft/broker.properties b/config/kraft/broker.properties index 2d15997f28..39d126cf87 100644 --- a/config/kraft/broker.properties +++ b/config/kraft/broker.properties @@ -127,3 +127,13 @@ log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 + +remote.log.storage.system.enable=true +remote.log.metadata.manager.listener.name=PLAINTEXT +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar +remote.log.storage.manager.impl.prefix=rsm.config. +remote.log.metadata.manager.impl.prefix=rlmm.config. +rsm.config.dir=/tmp/kafka-remote-storage +rlmm.config.remote.log.metadata.topic.replication.factor=1 +log.retention.check.interval.ms=1000 diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 6555b7c0cd..e84a072abc 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable { // The endpoint for remote log metadata manager to connect to private Optional<EndPoint> endpoint = Optional.empty(); private boolean closed = false; + private boolean up = false; /** * Creates RemoteLogManager instance with the given arguments. @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable { // in connecting to the brokers or remote storages. configureRSM(); configureRLMM(); + up = true; } public RemoteStorageManager storageManager() { @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable { public void onLeadershipChange(Set<Partition> partitionsBecomeLeader, Set<Partition> partitionsBecomeFollower, Map<String, Uuid> topicIds) { - LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); + if (!up) { + LOGGER.error("NullPointerException"); + return; + } + LOGGER.error("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); Map<TopicIdPartition, Integer> leaderPartitionsWithLeaderEpoch = filterPartitions(partitionsBecomeLeader) .collect(Collectors.toMap( diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 35499430d6..bd3f41c3d6 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2688,6 +2688,7 @@ class ReplicaManager(val config: KafkaConfig, */ def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = { // Before taking the lock, compute the local changes + stateChangeLogger.error("ROBIN") val localChanges = delta.localChanges(config.nodeId) val metadataVersion = newImage.features().metadataVersion() @@ -2734,7 +2735,10 @@ class ReplicaManager(val config: KafkaConfig, replicaFetcherManager.shutdownIdleFetcherThreads() replicaAlterLogDirsManager.shutdownIdleFetcherThreads() - remoteLogManager.foreach(rlm => rlm.onLeadershipChange(leaderChangedPartitions.asJava, followerChangedPartitions.asJava, localChanges.topicIds())) + remoteLogManager.foreach(rlm => { + stateChangeLogger.error("JOKER") + rlm.onLeadershipChange(leaderChangedPartitions.asJava, followerChangedPartitions.asJava, localChanges.topicIds()) + }) } if (metadataVersion.isDirectoryAssignmentSupported) {
Attachments
Issue Links
- links to