  1. Kafka
  2. KAFKA-15495

Partition truncated when the only ISR member restarts with an empty disk

Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1, 2.8.2, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 3.3.2, 3.5.0, 3.4.1, 3.6.0, 3.5.1, 3.5.2, 3.6.1

    Description

      Assume a topic-partition has a single replica, the leader, in the ISR, and that this replica then goes offline. When the replica restarts, its log defines the contents of the partition, which is correct behavior. Now assume instead that the replica suffers a disk failure, and that we replace the failed disk with a new, empty disk and format it with the storage tool so that it carries the correct cluster ID. When we restart the broker, the topic-partition has no data in it, and any other replicas that exist truncate their logs to match, which results in data loss. The step-by-step demo below reproduces this using KRaft (the issue affects ZK-based implementations as well, but we supply only a KRaft-based reproduction case here).

      Note that implementing Eligible Leader Replicas (https://issues.apache.org/jira/browse/KAFKA-15332) will resolve this issue.

      STEPS TO REPRODUCE:

      Create a single-broker cluster with a single controller. The standard files under config/kraft work well:

      bin/kafka-storage.sh random-uuid
      J8qXRwI-Qyi2G0guFTiuYw

      #ensure we start clean
      /bin/rm -rf /tmp/kraft-broker-logs /tmp/kraft-controller-logs

      bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/controller.properties
      bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker.properties

      bin/kafka-server-start.sh config/kraft/controller.properties
      bin/kafka-server-start.sh config/kraft/broker.properties

      bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo1 --partitions 1 --replication-factor 1

      #create the __consumer_offsets topic by running a consumer
      bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo1 --from-beginning
      ^C

      #confirm that __consumer_offsets topic partitions are all created and on broker with node id 2
      bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe

      Now create 2 more brokers, with node IDs 11 and 12

      cat config/kraft/broker.properties | sed 's/node.id=2/node.id=11/' | sed 's/localhost:9092/localhost:9011/g' | sed 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs11#' > config/kraft/broker11.properties
      cat config/kraft/broker.properties | sed 's/node.id=2/node.id=12/' | sed 's/localhost:9092/localhost:9012/g' | sed 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs12#' > config/kraft/broker12.properties
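
The two sed pipelines above differ only in node ID, listener port, and log directory. A minimal sketch of a reusable helper follows. The name make_broker_config is hypothetical, and the sketch assumes the base file uses localhost:9092 for its listeners, as the stock config/kraft/broker.properties does in this walkthrough. Unlike the commands above, it rewrites whatever node.id and log.dirs values the base file contains rather than matching the stock values.

```shell
# Hypothetical helper (not part of Kafka): derive a broker config from a base file.
# $1 = base config file, $2 = node id, $3 = listener port, $4 = log directory
# Assumes the log directory path does not contain the '#' character.
make_broker_config() {
  sed -e "s/^node\.id=.*/node.id=$2/" \
      -e "s/localhost:9092/localhost:$3/g" \
      -e "s#^log\.dirs=.*#log.dirs=$4#" \
      "$1"
}
```

For example, `make_broker_config config/kraft/broker.properties 11 9011 /tmp/kraft-broker-logs11 > config/kraft/broker11.properties` should produce a file equivalent to the one generated for node 11.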

      #ensure we start clean
      /bin/rm -rf /tmp/kraft-broker-logs11 /tmp/kraft-broker-logs12

      bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker11.properties
      bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker12.properties

      bin/kafka-server-start.sh config/kraft/broker11.properties
      bin/kafka-server-start.sh config/kraft/broker12.properties

      #create a topic with a single partition replicated on two brokers
      bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo2 --partitions 1 --replication-factor 2

      #reassign partitions onto brokers with Node IDs 11 and 12
      echo '{"partitions":[
      {"topic": "foo2","partition": 0,"replicas": [11,12]}
      ], "version":1}' > /tmp/reassign.json

      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/reassign.json --execute
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/reassign.json --verify
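
When repeating this reproduction with other topics or broker IDs, the reassignment file can be generated instead of hand-written. The following is a sketch only; reassignment_json is a hypothetical helper name, and it covers just the single-partition case used in this report.

```shell
# Hypothetical helper (not part of Kafka): print a reassignment document
# for one partition, in the same shape as /tmp/reassign.json above.
# $1 = topic, $2 = partition number, $3 = comma-separated replica IDs
reassignment_json() {
  printf '{"partitions":[{"topic":"%s","partition":%s,"replicas":[%s]}],"version":1}\n' "$1" "$2" "$3"
}
```

For example, `reassignment_json foo2 0 11,12 > /tmp/reassign.json` writes a one-line equivalent of the file used in this step.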

      #make preferred leader 11 the actual leader if it is not already
      bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --all-topic-partitions --election-type preferred

      #Confirm both brokers are in ISR and 11 is the leader
      bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
      Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
      Topic: foo2 Partition: 0 Leader: 11 Replicas: 11,12 Isr: 12,11

      #Emit some messages to the topic
      bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
      1
      2
      3
      4
      5
      ^C

      #confirm we see the messages
      bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 --from-beginning
      1
      2
      3
      4
      5
      ^C

      #Again confirm both brokers are in ISR, leader is 11
      bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
      Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
      Topic: foo2 Partition: 0 Leader: 11 Replicas: 11,12 Isr: 12,11

      #kill non-leader broker (node 12), then confirm it has dropped out of the ISR
      bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
      Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
      Topic: foo2 Partition: 0 Leader: 11 Replicas: 11,12 Isr: 11

      #kill leader broker (node 11), then confirm the partition has no leader
      bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
      Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
      Topic: foo2 Partition: 0 Leader: none Replicas: 11,12 Isr: 11

      #Note that bringing the non-leader broker 12 back up at this point has no effect: the partition is offline with no leader, only node 11 is in the ISR, and the partition cannot come back online until node 11 returns
      bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
      Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
      Topic: foo2 Partition: 0 Leader: none Replicas: 11,12 Isr: 11
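
The state shown in this output, a single ISR member and no leader, is the precondition for the data loss described in this report. The sketch below flags such partitions from --describe output. The function name single_isr_partitions is hypothetical, and the parsing assumes the "Leader: ... Replicas: ... Isr: ..." layout printed in this report; other Kafka versions may print additional columns.

```shell
# Hypothetical helper (not part of Kafka): read `kafka-topics.sh --describe`
# output on stdin and print every partition whose ISR has exactly one member.
single_isr_partitions() {
  awk '
    # Only per-partition lines carry both "Partition:" and "Isr:".
    /Partition:/ && /Isr:/ {
      topic = ""; part = ""; leader = ""; isr = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Topic:")     topic  = $(i + 1)
        if ($i == "Partition:") part   = $(i + 1)
        if ($i == "Leader:")    leader = $(i + 1)
        if ($i == "Isr:")       isr    = $(i + 1)
      }
      n = split(isr, members, ",")
      if (n == 1) print topic "-" part, "leader=" leader, "isr=" isr
    }
  '
}
```

A possible invocation is `bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe | single_isr_partitions`. Any partition it prints depends entirely on the log of its one remaining ISR member.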

      #erase and reformat the leader broker’s disk, then restart the leader with that empty disk. (Note that the follower broker remains untouched/unchanged: either it was not started, or it was started and is waiting for 11 to come back)
      /bin/rm -rf /tmp/kraft-broker-logs11
      bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker11.properties
      bin/kafka-server-start.sh config/kraft/broker11.properties

      #if node 12 was running it will emit log messages indicating truncation
      #INFO [ReplicaFetcher replicaId=12, leaderId=11, fetcherId=0] Truncating partition foo2-0 with TruncationState(offset=0, completed=true) due to leader epoch and offset EpochEndOffset(errorCode=0, partition=0, leaderEpoch=3, endOffset=0) (kafka.server.ReplicaFetcherThread)

      #Leader broker is the leader again (the output below shows node 12 in the ISR if it had been running, and omits it otherwise)
      bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
      Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
      Topic: foo2 Partition: 0 Leader: 11 Replicas: 11,12 Isr: 11

      #read from topic-partition: it is now empty
      bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 --from-beginning

      #produce a message to it; the message will appear on the console consumer
      bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
      1
      ^C

      #restart the follower broker if it had not already been restarted; it will emit a log message indicating the log was truncated:
      bin/kafka-server-start.sh config/kraft/broker12.properties

      #WARN [UnifiedLog partition=foo2-0, dir=/tmp/kraft-broker-logs12] Non-monotonic update of high watermark from (offset=5, segment=[0:165]) to (offset=0, segment=[0:0]) (kafka.log.UnifiedLog)
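
To check a follower's log for this symptom without reading it by eye, the WARN line can be reduced to the partition name and the old and new high watermark. This is a sketch that assumes the exact message format quoted in this report; hw_regressions is a hypothetical name, and the message wording may differ between Kafka versions.

```shell
# Hypothetical helper (not part of Kafka): read broker log lines on stdin and
# print "<partition> <old high watermark> <new high watermark>" for each
# "Non-monotonic update of high watermark" warning. Other lines are ignored.
hw_regressions() {
  sed -n 's/.*\[UnifiedLog partition=\([^,]*\),.*Non-monotonic update of high watermark from (offset=\([0-9]*\),.*) to (offset=\([0-9]*\),.*/\1 \2 \3/p'
}
```

Feeding it the broker's server log (the log location depends on the installation) prints one line per affected partition; for the warning quoted in this report the fields are foo2-0, 5, and 0.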

      #follower is back in the ISR
      bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
      Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
      Topic: foo2 Partition: 0 Leader: 11 Replicas: 11,12 Isr: 11,12

      #rerun the consumer to show again that the data is gone
      bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 --from-beginning

            People

              Assignee: Unassigned
              Reporter: Ron Dagostino (rndgstn)