Details
Bug
Status: Resolved
Critical
Resolution: Fixed
3.8.0, 3.7.1, 3.9.0
Important
Description
When Tiered Storage is configured, retention.bytes defines the limit for the total amount of data stored on the local filesystem and in remote storage. However, a failure while offloading to remote storage can cause segments to be dropped before the retention limit is reached.
What happens
Assume a topic configured with retention.bytes=4294967296 (4GB) and local.retention.bytes=1073741824 (1GB, equal to segment.bytes). We would expect Kafka to keep up to 3 segments (3GB) in the remote store and 1 segment locally (the local segment), and possibly more locally if the remote storage is offline. That is, the RemoteLogMetadataManager (RLMM) would hold segments in the following RemoteLogSegmentStates:
- Segment 3 (COPY_SEGMENT_FINISHED)
- Segment 2 (COPY_SEGMENT_FINISHED)
- Segment 1 (COPY_SEGMENT_FINISHED)
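The expected split above follows from simple arithmetic on the three size settings. The sketch below only reproduces that arithmetic; the function name `expected_segments` is hypothetical, and the model assumes every segment is exactly segment.bytes in size and that local and remote segments do not overlap.

```python
GB = 1024 ** 3


def expected_segments(retention_bytes, local_retention_bytes, segment_bytes):
    """Return (local, remote) segment counts implied by the size limits.

    Simplified model (an assumption, not Kafka code): segments are all
    exactly segment_bytes long and each one is counted in a single tier.
    """
    total = retention_bytes // segment_bytes
    local = local_retention_bytes // segment_bytes
    return local, total - local


if __name__ == "__main__":
    local, remote = expected_segments(4 * GB, 1 * GB, 1 * GB)
    print(f"local segments: {local}, remote segments: {remote}")
```

With the values from this scenario, the function yields 1 local segment and 3 remote segments, matching the list above.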
Let's assume the RLMM starts failing when segment 4 rolls. At the first iteration of an RLMTask we will have:
- copyLogSegmentsToRemote is called first
  - RLMM becomes aware of Segment 4 and adds it to the metadata:
    - Segment 4 (COPY_SEGMENT_STARTED)
    - Segment 3 (COPY_SEGMENT_FINISHED)
    - Segment 2 (COPY_SEGMENT_FINISHED)
    - Segment 1 (COPY_SEGMENT_FINISHED)
  - An exception is raised during the copy operation (copyLogSegmentData in RemoteStorageManager), which is caught with the error message “Error occurred while copying log segments of partition”, and no further copy is attempted for the duration of this RLMTask.
  - From that point on, this segment entry never moves to COPY_SEGMENT_FINISHED; it eventually transitions to DELETE_SEGMENT_STARTED and is cleaned up when the associated segment is deleted.
- cleanupExpiredRemoteLogSegments is then called
  - Retention size is computed in buildRetentionSizeData as the sum of all segment sizes regardless of their state, so the computed size of the topic is 1GB (local) + 4GB (remote) = 5GB, which exceeds retention.bytes.
  - Segment 1, being the oldest, is dropped.
At the second iteration, after remote.log.manager.task.interval.ms (default: 30s), the same happens. The RLMM now has two entries for Segment 4 in COPY_SEGMENT_STARTED state, each with a different RemoteLogSegmentId, and Segment 2 is dropped. The same happens to Segment 3 after another iteration.
At that point, the RLMM is composed of 4 copies of Segment 4 in COPY_SEGMENT_STARTED state. Segment 4 is then marked for deletion, which advances the log start offset (LSO) and causes the UnifiedLog to delete the local and remote data for Segment 4, including its metadata.
Under those circumstances, Kafka can quickly delete segments that were not meant for deletion, causing data loss.
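The sequence described in this section can be replayed with a small model. This is only a sketch of the bookkeeping, not Kafka code: the names `RemoteSegment`, `run_task_iteration` and `simulate` are hypothetical, every segment is assumed to be exactly 1GB, one segment is assumed to stay local, and the integer `uid` stands in for RemoteLogSegmentId. The one behaviour it copies from the report is that the retention size counts every metadata entry regardless of its state.

```python
from dataclasses import dataclass
from itertools import count

GB = 1024 ** 3
RETENTION_BYTES = 4 * GB   # retention.bytes
SEGMENT_BYTES = 1 * GB     # segment.bytes (assumed exact size of every segment)
LOCAL_BYTES = 1 * GB       # one segment assumed to be held locally


@dataclass
class RemoteSegment:
    number: int   # log segment number from the scenario (1..4)
    state: str    # RemoteLogSegmentState name
    uid: int      # stand-in for RemoteLogSegmentId


def run_task_iteration(metadata, ids, failing_segment):
    """One simplified RLMTask pass; returns segment numbers selected for deletion."""
    # copyLogSegmentsToRemote: the metadata entry is added first, then the
    # copy fails, so the entry stays in COPY_SEGMENT_STARTED.
    metadata.append(RemoteSegment(failing_segment, "COPY_SEGMENT_STARTED", next(ids)))

    # cleanupExpiredRemoteLogSegments: size is summed regardless of state.
    dropped = []
    total = LOCAL_BYTES + len(metadata) * SEGMENT_BYTES
    while total > RETENTION_BYTES and metadata:
        oldest = min(metadata, key=lambda s: (s.number, s.uid))
        metadata.remove(oldest)
        dropped.append(oldest.number)
        total -= SEGMENT_BYTES
    return dropped


def simulate(iterations):
    """Start from segments 1-3 fully copied, then fail every copy of segment 4."""
    ids = count(1)
    metadata = [RemoteSegment(n, "COPY_SEGMENT_FINISHED", next(ids)) for n in (1, 2, 3)]
    history = [run_task_iteration(metadata, ids, failing_segment=4) for _ in range(iterations)]
    return history, metadata


if __name__ == "__main__":
    history, metadata = simulate(4)
    for i, dropped in enumerate(history, start=1):
        print(f"iteration {i}: selected for deletion -> {dropped}")
    print("remaining:", [(s.number, s.state) for s in metadata])
```

In this model the first three iterations select Segments 1, 2 and 3 in turn, and the fourth selects Segment 4 itself. The model removes a single metadata entry per selection, so it does not reproduce the full cleanup of Segment 4 performed by the UnifiedLog.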
Steps to reproduce the problem:
1. Enable tiered storage
mkdir -p /tmp/tieredStorage/kafka-tiered-storage/
cat <<EOF >> config/kraft/server.properties
remote.log.storage.system.enable=True
remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
remote.log.manager.task.interval.ms=5000
remote.log.metadata.manager.listener.name=PLAINTEXT
rlmm.config.remote.log.metadata.topic.replication.factor=1
rsm.config.dir=/tmp/tieredStorage
EOF
2. Start a Kafka server with the following classpath. This is needed so we can use test class LocalTieredStorage as an implementation of RemoteStorageManager.
export CLASSPATH="$(pwd)/storage/build/libs/*:$(pwd)/clients/build/libs/*"
export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
bin/kafka-server-start.sh config/kraft/server.properties
3. In a separate shell, create the topic and produce enough records to fill the remote log
bin/kafka-topics.sh --create --topic bug-ts --bootstrap-server localhost:9092 \
--config retention.bytes=1000000000 --config segment.bytes=100000000 \
--config remote.storage.enable=true --config local.retention.bytes=1
bin/kafka-producer-perf-test.sh --topic bug-ts --num-records=1000000 \
--throughput -1 --record-size 1000 \
--producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
4. In a separate shell, watch the remote log directory content
watch -n 1 ls -R /tmp/tieredStorage/kafka-tiered-storage/
5. Once all logs are sent to the remote storage (when the server log output stops, which should take around 2 minutes), stop the Kafka server
6. Edit the file LocalTieredStorage#L309 in copyLogSegmentData() in order to throw a RemoteStorageException and disable the ability to store new remote segments.
7. Rebuild Kafka
./gradlew testJar
8. Restart the Kafka server
bin/kafka-server-start.sh config/kraft/server.properties
9. Send enough data for one segment rollup
bin/kafka-producer-perf-test.sh \
--topic bug-ts --num-records=10000 --throughput -1 --record-size 10000 \
--producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
All data in the remote directory will start getting deleted, whereas we would expect only that no more writes happen to the remote storage.