Kafka / KAFKA-17062

RemoteLogManager - RemoteStorageException causes data loss


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 3.8.0, 3.7.1, 3.9.0
    • Fix Version/s: 3.9.0
    • Component/s: Tiered-Storage
    • Important

    Description

      When Tiered Storage is configured, retention.bytes defines the limit for the total amount of data stored on the local filesystem and in remote storage combined. However, a failure while offloading to remote storage can cause segments to be dropped before the retention limit is reached.

      What happens

      Assume a topic configured with retention.bytes=4294967296 (4 GiB) and local.retention.bytes=1073741824 (1 GiB, equal to segment.bytes). We would expect Kafka to keep up to 3 segments (3 GiB) in remote storage and 1 segment locally, possibly keeping more locally if remote storage is offline. That is, the RemoteLogMetadataManager (RLMM) should contain segments in the following RemoteLogSegmentStates:

      • Segment 3 (COPY_SEGMENT_FINISHED)
      • Segment 2 (COPY_SEGMENT_FINISHED)
      • Segment 1 (COPY_SEGMENT_FINISHED)

      Let's assume remote storage starts failing when segment 4 rolls. At the first iteration of an RLMTask, the following happens:

      • copyLogSegmentsToRemote is called first:
        • The RLMM becomes aware of Segment 4 and adds it to the metadata:
          • Segment 4 (COPY_SEGMENT_STARTED)
          • Segment 3 (COPY_SEGMENT_FINISHED),
          • Segment 2 (COPY_SEGMENT_FINISHED),
          • Segment 1 (COPY_SEGMENT_FINISHED)
        • An exception is raised during the copy operation (copyLogSegmentData in RemoteStorageManager). It is caught and logged with the error message “Error occurred while copying log segments of partition”, and no further copy is attempted during this RLMTask iteration.
        • From then on, this Segment 4 entry never moves to COPY_SEGMENT_FINISHED. It eventually transitions to DELETE_SEGMENT_STARTED and is cleaned up when the associated segment is deleted.
      • cleanupExpiredRemoteLogSegments is then called:
        • The retention size is computed in buildRetentionSizeData as the sum of all segment sizes, regardless of their state, so the computed size of the topic is 1 GiB (local) + 4 GiB (remote) = 5 GiB, which exceeds retention.bytes (4 GiB).
        • Segment 1, being the oldest, is dropped.
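      The size accounting above can be illustrated with a minimal Java sketch. It does not use Kafka's classes: the State enum, the Entry record and both size methods are hypothetical stand-ins, assuming 1 GiB segments and 1 GiB of local data as in this scenario. sizeCountingAllStates mirrors the behaviour described for buildRetentionSizeData; sizeCountingFinishedOnly is shown only for comparison and is not necessarily how the issue was fixed.

```java
import java.util.List;

public class RetentionSizeSketch {

    static final long GIB = 1L << 30;

    // Hypothetical stand-in for two of the RemoteLogSegmentState values named in this issue.
    enum State { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED }

    // Hypothetical, simplified RLMM metadata entry (not Kafka's RemoteLogSegmentMetadata).
    record Entry(int segment, State state, long sizeBytes) {}

    // RLMM content after the first failed copy of Segment 4, newest first as listed above.
    static List<Entry> rlmmAfterFailedCopy() {
        return List.of(
            new Entry(4, State.COPY_SEGMENT_STARTED, GIB),
            new Entry(3, State.COPY_SEGMENT_FINISHED, GIB),
            new Entry(2, State.COPY_SEGMENT_FINISHED, GIB),
            new Entry(1, State.COPY_SEGMENT_FINISHED, GIB));
    }

    // Behaviour described in this issue: every entry is counted, whatever its state.
    static long sizeCountingAllStates(long localBytes, List<Entry> rlmm) {
        long total = localBytes;
        for (Entry e : rlmm) {
            total += e.sizeBytes();
        }
        return total;
    }

    // Alternative for comparison: only copies that actually completed are counted.
    static long sizeCountingFinishedOnly(long localBytes, List<Entry> rlmm) {
        long total = localBytes;
        for (Entry e : rlmm) {
            if (e.state() == State.COPY_SEGMENT_FINISHED) {
                total += e.sizeBytes();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        long retentionBytes = 4 * GIB;
        List<Entry> rlmm = rlmmAfterFailedCopy();
        long all = sizeCountingAllStates(GIB, rlmm);
        long finished = sizeCountingFinishedOnly(GIB, rlmm);
        System.out.println("all states:    " + all / GIB + " GiB, exceeds retention.bytes: " + (all > retentionBytes));
        System.out.println("finished only: " + finished / GIB + " GiB, exceeds retention.bytes: " + (finished > retentionBytes));
    }
}
```

      Counting every entry gives 5 GiB, which breaches the 4 GiB limit and triggers the deletion of Segment 1; counting only completed copies gives 4 GiB, which does not.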

      At the next iteration, after remote.log.manager.task.interval.ms (default: 30s), the same thing happens. The RLMM now holds two Segment 4 entries in COPY_SEGMENT_STARTED state, each with a different RemoteLogSegmentId, and Segment 2 is dropped. After one more iteration, the same happens to Segment 3.

      After the next iteration, the RLMM contains 4 entries for Segment 4, all in COPY_SEGMENT_STARTED state. Segment 4 is now the oldest and is marked for deletion, which increases the log start offset (LSO) at the same time and causes the UnifiedLog to delete the local and remote data for Segment 4, including its metadata.

      Under these circumstances, Kafka can quickly delete segments that were not meant to be deleted, causing data loss.
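      The cascade described above can be reproduced with a small, self-contained Java simulation. It is a model under stated assumptions, not Kafka code: the RLMM is represented as a hypothetical deque of segment numbers (oldest first), every segment is 1 GiB, local data stays at 1 GiB, and each iteration removes the oldest entries until the state-agnostic total fits within retention.bytes. The simulation stops at the point where an entry for Segment 4 itself is deleted; the subsequent log start offset increase and UnifiedLog cleanup are not modelled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class RlmTaskSimulation {

    static final long GIB = 1L << 30;

    /**
     * Simulates RLMTask iterations in which every copy of Segment 4 fails.
     * Returns the segment numbers deleted by size-based retention, in order.
     * Assumes 1 GiB segments and a constant amount of local data.
     */
    static List<Integer> simulate(int iterations, long retentionBytes, long localBytes) {
        // Hypothetical RLMM model: one segment number per metadata entry, oldest first.
        // Segments 1-3 start in COPY_SEGMENT_FINISHED.
        Deque<Integer> rlmm = new ArrayDeque<>(List.of(1, 2, 3));
        List<Integer> deleted = new ArrayList<>();

        for (int i = 0; i < iterations; i++) {
            // copyLogSegmentsToRemote: a new COPY_SEGMENT_STARTED entry (with a fresh
            // RemoteLogSegmentId) is recorded for Segment 4, then the copy fails.
            rlmm.addLast(4);

            // cleanupExpiredRemoteLogSegments: every entry counts regardless of state,
            // and the oldest entries are removed until the total fits again.
            while (!rlmm.isEmpty() && localBytes + rlmm.size() * GIB > retentionBytes) {
                deleted.add(rlmm.removeFirst());
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        System.out.println(simulate(4, 4 * GIB, GIB)); // [1, 2, 3, 4]
    }
}
```

      Each failed copy adds exactly 1 GiB of phantom size, so exactly one real segment is evicted per iteration until only Segment 4 entries remain.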

      Steps to reproduce the problem:

      1. Enable tiered storage

      mkdir -p /tmp/tieredStorage/kafka-tiered-storage/
      cat <<EOF >> config/kraft/server.properties
      remote.log.storage.system.enable=True
      remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
      remote.log.manager.task.interval.ms=5000
      remote.log.metadata.manager.listener.name=PLAINTEXT
      rlmm.config.remote.log.metadata.topic.replication.factor=1
      rsm.config.dir=/tmp/tieredStorage
      EOF
      

      2. Start a Kafka server with the following classpath. This is needed so that the test class LocalTieredStorage can be used as the RemoteStorageManager implementation.

      export CLASSPATH="$(pwd)/storage/build/libs/*:$(pwd)/clients/build/libs/*"
      export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
      bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
      bin/kafka-server-start.sh config/kraft/server.properties
      

      3. In a separate shell, create the topic and produce enough records to fill the remote log

      bin/kafka-topics.sh --create --topic bug-ts --bootstrap-server localhost:9092 \
         --config retention.bytes=1000000000 --config segment.bytes=100000000 \
         --config remote.storage.enable=true --config local.retention.bytes=1
      bin/kafka-producer-perf-test.sh --topic bug-ts --num-records=1000000 \
         --throughput -1 --record-size 1000 \
         --producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
      

      4. In a separate shell, watch the remote log directory content

      watch -n 1 ls -R /tmp/tieredStorage/kafka-tiered-storage/
      

      5. Once all segments have been offloaded to remote storage (i.e. when the server stops logging; this should take around 2 minutes), stop the Kafka server
      6. Edit LocalTieredStorage#L309, in copyLogSegmentData(), so that it throws a RemoteStorageException, preventing any new segment from being stored remotely.
      7. Rebuild Kafka

       ./gradlew testJar
      

      8. Restart the Kafka server

      bin/kafka-server-start.sh config/kraft/server.properties
      

      9. Send enough data to roll one segment

      bin/kafka-producer-perf-test.sh \
        --topic bug-ts --num-records=10000 --throughput -1 --record-size 10000 \
        --producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
      

      All data in the remote directory starts getting deleted, whereas we would expect only that no more writes reach remote storage.
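      The record counts in steps 3 and 9 can be checked against the topic configuration with a short calculation, sketched here in Java. It only multiplies --num-records by --record-size and assumes the payload dominates; per-record and per-batch overhead (and any compression) are ignored. On that basis, step 3 writes about retention.bytes worth of payload (ten 100 MB segments), while step 9 writes one segment.bytes worth, which with record overhead added should exceed segment.bytes and force at least one roll.

```java
public class ReproSizing {

    // Producer payload only: per-record and per-batch overhead (and any
    // compression) are ignored, so on-disk sizes are somewhat larger.
    static long payloadBytes(long numRecords, long recordSize) {
        return numRecords * recordSize;
    }

    public static void main(String[] args) {
        long segmentBytes = 100_000_000L;     // segment.bytes from step 3
        long retentionBytes = 1_000_000_000L; // retention.bytes from step 3

        long step3 = payloadBytes(1_000_000L, 1_000L); // --num-records, --record-size in step 3
        long step9 = payloadBytes(10_000L, 10_000L);   // --num-records, --record-size in step 9

        System.out.println("step 3: " + step3 / segmentBytes + " segments of payload, reaches retention.bytes: "
                + (step3 >= retentionBytes));
        System.out.println("step 9: " + step9 / segmentBytes + " segment(s) of payload");
    }
}
```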


People

    • Luke Chen (showuon)
    • Guillaume Mallet (guillaumemallet)