Kafka / KAFKA-16790

Calls to RemoteLogManager are made before it is configured

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.8.0
    • Fix Version/s: 3.8.0, 3.7.1
    • Component/s: kraft
    • Labels: None

    Description

      BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1), which in turn calls RemoteLogManager#onLeadershipChange (2). However, the RemoteLogManager is configured only after the BrokerMetadataPublisher starts running (3, 4). This ordering is incorrect: we need either to initialise the RemoteLogManager before we start the BrokerMetadataPublisher, or to skip calls to onLeadershipChange while the RemoteLogManager is not initialised.

      (1) https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151

      (2) https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737

      (3) https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432

      (4) https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515
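
The second option in the description, skipping calls while the manager is not initialised, can be illustrated with a minimal, self-contained sketch. Everything in it is hypothetical: `GuardedRemoteLogManagerSketch`, its `configured` flag, and the simplified `onLeadershipChange` signature are stand-ins and not the Kafka classes or the fix that was eventually merged. The flag is `volatile` on the assumption that startup and metadata publishing may run on different threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for RemoteLogManager; NOT the Kafka class.
class GuardedRemoteLogManagerSketch {
    // Assumption: written by the startup thread, read by the publisher thread.
    private volatile boolean configured = false;
    private final List<String> handled = new ArrayList<>();

    // Stand-in for the step that configures the storage and metadata managers.
    void startup() {
        configured = true;
    }

    // Returns true if the change was processed, false if it was skipped
    // because the manager has not been configured yet.
    boolean onLeadershipChange(Set<String> leaders, Set<String> followers) {
        if (!configured) {
            return false;
        }
        handled.add("leaders=" + leaders + ", followers=" + followers);
        return true;
    }

    int handledCount() {
        return handled.size();
    }

    public static void main(String[] args) {
        GuardedRemoteLogManagerSketch rlm = new GuardedRemoteLogManagerSketch();
        // A metadata update arrives before the manager is configured.
        System.out.println("before startup processed=" + rlm.onLeadershipChange(Set.of("t-0"), Set.of()));
        rlm.startup();
        // The same update after configuration is processed.
        System.out.println("after startup processed=" + rlm.onLeadershipChange(Set.of("t-0"), Set.of()));
    }
}
```

In this sketch a skipped change is simply dropped. Whether early changes would need to be replayed once the manager is configured is a design question the sketch does not answer.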

      To reproduce the problem, apply the following changes:

       config/kraft/broker.properties                         | 10 ++++++++++
       .../main/java/kafka/log/remote/RemoteLogManager.java   |  8 +++++++-
       core/src/main/scala/kafka/server/ReplicaManager.scala  |  6 +++++-
       3 files changed, 22 insertions(+), 2 deletions(-)

      diff --git a/config/kraft/broker.properties b/config/kraft/broker.properties
      index 2d15997f28..39d126cf87 100644
      --- a/config/kraft/broker.properties
      +++ b/config/kraft/broker.properties
      @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
       # The interval at which log segments are checked to see if they can be deleted according
       # to the retention policies
       log.retention.check.interval.ms=300000
      +
      +remote.log.storage.system.enable=true
      +remote.log.metadata.manager.listener.name=PLAINTEXT
      +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
      +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
      +remote.log.storage.manager.impl.prefix=rsm.config.
      +remote.log.metadata.manager.impl.prefix=rlmm.config.
      +rsm.config.dir=/tmp/kafka-remote-storage
      +rlmm.config.remote.log.metadata.topic.replication.factor=1
      +log.retention.check.interval.ms=1000
      diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
      index 6555b7c0cd..e84a072abc 100644
      --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
      +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
      @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
           // The endpoint for remote log metadata manager to connect to
           private Optional<EndPoint> endpoint = Optional.empty();
           private boolean closed = false;
      +    private boolean up = false;
       
           /**
            * Creates RemoteLogManager instance with the given arguments.
      @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
               // in connecting to the brokers or remote storages.
               configureRSM();
               configureRLMM();
      +        up = true;
           }
       
           public RemoteStorageManager storageManager() {
      @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
           public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
                                          Set<Partition> partitionsBecomeFollower,
                                          Map<String, Uuid> topicIds) {
      -        LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
      +        if (!up) {
      +            LOGGER.error("NullPointerException");
      +            return;
      +        }
      +        LOGGER.error("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
       
               Map<TopicIdPartition, Integer> leaderPartitionsWithLeaderEpoch = filterPartitions(partitionsBecomeLeader)
                       .collect(Collectors.toMap(
      diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
      index 35499430d6..bd3f41c3d6 100644
      --- a/core/src/main/scala/kafka/server/ReplicaManager.scala
      +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
      @@ -2688,6 +2688,7 @@ class ReplicaManager(val config: KafkaConfig,
          */
         def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = {
           // Before taking the lock, compute the local changes
      +    stateChangeLogger.error("ROBIN")
           val localChanges = delta.localChanges(config.nodeId)
           val metadataVersion = newImage.features().metadataVersion()
       
      @@ -2734,7 +2735,10 @@ class ReplicaManager(val config: KafkaConfig,
               replicaFetcherManager.shutdownIdleFetcherThreads()
               replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
       
      -        remoteLogManager.foreach(rlm => rlm.onLeadershipChange(leaderChangedPartitions.asJava, followerChangedPartitions.asJava, localChanges.topicIds()))
      +        remoteLogManager.foreach(rlm => {
      +          stateChangeLogger.error("JOKER")
      +          rlm.onLeadershipChange(leaderChangedPartitions.asJava, followerChangedPartitions.asJava, localChanges.topicIds())
      +        })
             }
       
             if (metadataVersion.isDirectoryAssignmentSupported) { 
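
For comparison with the guard added in the diff above, the other remedy named in the description, initialising the manager before the publisher starts, can be sketched as below. The classes `RemoteLogManagerStub` and `PublisherStub` are hypothetical stand-ins, and the `IllegalStateException` is an illustrative way to make a too-early call visible; none of this is taken from the Kafka code base.

```java
// Hypothetical model of the two startup orderings; NOT Kafka code.
class StartupOrderSketch {

    // Stand-in for RemoteLogManager: rejects calls made before startup().
    static final class RemoteLogManagerStub {
        private boolean configured = false;

        void startup() {
            configured = true;
        }

        void onLeadershipChange() {
            if (!configured) {
                throw new IllegalStateException("onLeadershipChange called before startup");
            }
        }
    }

    // Stand-in for the metadata publisher: delivers an update as soon as it starts.
    static final class PublisherStub {
        private final RemoteLogManagerStub rlm;

        PublisherStub(RemoteLogManagerStub rlm) {
            this.rlm = rlm;
        }

        void start() {
            rlm.onLeadershipChange();
        }
    }

    // Runs one startup sequence and reports whether every call reached a configured manager.
    static boolean run(boolean managerFirst) {
        RemoteLogManagerStub rlm = new RemoteLogManagerStub();
        PublisherStub publisher = new PublisherStub(rlm);
        try {
            if (managerFirst) {
                rlm.startup();      // configure first
                publisher.start();  // then let updates flow
            } else {
                publisher.start();  // ordering described in the report
                rlm.startup();
            }
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("publisher first ok=" + run(false));
        System.out.println("manager first ok=" + run(true));
    }
}
```

Reordering removes the window entirely, whereas a guard only makes the early calls harmless.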

            People

              Assignee: Muralidhar Basani (muralibasani)
              Reporter: Christo Lolov (christo_lolov)