Details
- Type: Improvement
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 1.1.1, 2.0.1
Description
When changing a partition's log directory for a follower broker, we move all the data related to that partition to the other log dir (as per KIP-113). On a successful move, we rename the original directory by appending a suffix consisting of a UUID and `-delete` (e.g. `test_log_dir-0` is renamed to `test_log_dir-0.32e77c96939140f9a56a49b75ad8ec8d-delete`).
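For illustration only, the renamed directory has the shape "partition dir" + "." + "UUID with dashes stripped" + "-delete". A quick sketch of how such a name can be derived; `deleteDirName` is a hypothetical helper for this write-up, not Kafka's actual code:

```scala
import java.util.UUID

// Illustrative only: derives a name of the same shape as the renamed directory
// above (partition dir + "." + UUID without dashes + "-delete").
object DeleteDirName {
  def deleteDirName(partitionDirName: String): String = {
    val uuid = UUID.randomUUID().toString.replace("-", "")
    s"$partitionDirName.$uuid-delete"
  }

  def main(args: Array[String]): Unit =
    // e.g. test_log_dir-0.32e77c96939140f9a56a49b75ad8ec8d-delete
    println(deleteDirName("test_log_dir-0"))
}
```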
We copy every log file and initialize a new leader epoch file cache. The problem is that we do not update the associated `Replica` class's leader epoch cache: it still points to the old `LeaderEpochFileCache` instance.
This results in a `FileNotFoundException` when the broker is elected leader for the partition. It has the unintended side effect of marking the log directory as offline, making all partitions in that log directory unavailable on that broker.
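The failure mode is easiest to see with a self-contained sketch. `EpochCacheSketch` and `ReplicaSketch` below are hypothetical stand-ins invented for this description (not Kafka's real `LeaderEpochFileCache` and `Replica`), but they reproduce the same kind of stale file reference:

```scala
import java.io.{File, FileNotFoundException, FileOutputStream}

// Simplified, hypothetical stand-ins used only to show the failure mode.
final class EpochCacheSketch(val checkpointFile: File) {
  // Checkpoint writes go through a ".tmp" file in the same directory. If that
  // directory has been renamed to "...-delete", opening the tmp file fails.
  def assign(epoch: Int, startOffset: Long): Unit = {
    val tmp = new File(checkpointFile.getAbsolutePath + ".tmp")
    val out = new FileOutputStream(tmp) // FileNotFoundException if the dir is gone
    try out.write(s"$epoch $startOffset\n".getBytes("UTF-8")) finally out.close()
    tmp.renameTo(checkpointFile)
  }
}

final class ReplicaSketch(var epochCache: EpochCacheSketch)

object StaleEpochCacheDemo {
  def main(args: Array[String]): Unit = {
    // The moved data now lives under new_dir, but the replica still holds a cache
    // bound to the checkpoint under old_dir, which has been renamed away:
    val replica = new ReplicaSketch(new EpochCacheSketch(
      new File("/logs/old_dir/test_log_dir-0/leader-epoch-checkpoint")))

    try replica.epochCache.assign(epoch = 3, startOffset = 9)
    catch {
      case e: FileNotFoundException =>
        // Same shape as the real stack trace below:
        // .../leader-epoch-checkpoint.tmp (No such file or directory)
        println(s"stale epoch cache reference: $e")
    }
  }
}
```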
Exception and logs
I reproduced this locally by running two brokers. The steps to reproduce:
1. Create a partition replicated across two brokers (A, B), with leader A
2. Move partition leadership to B
3. Alter the log dirs on A (a programmatic sketch of this step follows the list)
4. Move partition leadership back to A
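Step 3 can be driven through the Admin API introduced by KIP-113 (`alterReplicaLogDirs`). A minimal sketch, assuming broker A has id 0, listens on `localhost:9092`, and has `/logs/new_dir` configured as a log directory (those values are assumptions matching the setup above):

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.common.TopicPartitionReplica

object AlterReplicaLogDir {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumed local test cluster
    val admin = AdminClient.create(props)
    try {
      // Ask broker 0 ("A") to move its replica of test_log_dir-0 to the other log dir.
      val replica = new TopicPartitionReplica("test_log_dir", 0, 0)
      admin.alterReplicaLogDirs(Map(replica -> "/logs/new_dir").asJava).all().get()
    } finally admin.close()
  }
}
```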
This results in a log directory structure on broker B similar to this:
├── new_dir
│   ├── cleaner-offset-checkpoint
│   ├── log-start-offset-checkpoint
│   ├── meta.properties
│   ├── recovery-point-offset-checkpoint
│   ├── replication-offset-checkpoint
│   └── test_log_dir-0
│       ├── 00000000000000000000.index
│       ├── 00000000000000000000.log
│       ├── 00000000000000000000.timeindex
│       └── leader-epoch-checkpoint
└── old_dir
    ├── cleaner-offset-checkpoint
    ├── log-start-offset-checkpoint
    ├── meta.properties
    ├── recovery-point-offset-checkpoint
    ├── replication-offset-checkpoint
    └── test_log_dir-0.32e77c96939140f9a56a49b75ad8ec8d-delete
        ├── 00000000000000000000.index
        ├── 00000000000000000000.log
        ├── 00000000000000000000.timeindex
        ├── 00000000000000000009.snapshot
        └── leader-epoch-checkpoint
[2019-03-04 15:36:56,854] INFO [Partition test_log_dir-0 broker=0] test_log_dir-0 starts at Leader Epoch 3 from offset 9. Previous Leader Epoch was: 2 (kafka.cluster.Partition)
[2019-03-04 15:36:56,855] WARN [LeaderEpochCache test_log_dir-0] New epoch entry EpochEntry(epoch=3, startOffset=9) caused truncation of conflicting entries ListBuffer(EpochEntry(epoch=1, startOffset=9)). Cache now contains 2 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2019-03-04 15:36:56,857] ERROR Error while writing to checkpoint file /logs/old_dir/test_log_dir-0/leader-epoch-checkpoint (kafka.server.LogDirFailureChannel)
java.io.FileNotFoundException: /logs/old_dir/test_log_dir-0/leader-epoch-checkpoint.tmp (No such file or directory)
    at java.base/java.io.FileOutputStream.open0(Native Method)
    at java.base/java.io.FileOutputStream.open(FileOutputStream.java:299)
    at java.base/java.io.FileOutputStream.<init>(FileOutputStream.java:238)
    at java.base/java.io.FileOutputStream.<init>(FileOutputStream.java:188)
    at kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:52)
    at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:50)
    at kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:64)
    at kafka.server.epoch.LeaderEpochFileCache.kafka$server$epoch$LeaderEpochFileCache$$flush(LeaderEpochFileCache.scala:219)
    at kafka.server.epoch.LeaderEpochFileCache$$anonfun$assign$1.apply$mcV$sp(LeaderEpochFileCache.scala:62)
    at kafka.server.epoch.LeaderEpochFileCache$$anonfun$assign$1.apply(LeaderEpochFileCache.scala:52)
    at kafka.server.epoch.LeaderEpochFileCache$$anonfun$assign$1.apply(LeaderEpochFileCache.scala:52)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
    at kafka.server.epoch.LeaderEpochFileCache.assign(LeaderEpochFileCache.scala:52)
    at kafka.cluster.Partition$$anonfun$5$$anonfun$apply$8.apply(Partition.scala:395)
    at kafka.cluster.Partition$$anonfun$5$$anonfun$apply$8.apply(Partition.scala:394)
    at scala.Option.foreach(Option.scala:257)
    at kafka.cluster.Partition$$anonfun$5.apply(Partition.scala:394)
    at kafka.cluster.Partition$$anonfun$5.apply(Partition.scala:367)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
    at kafka.cluster.Partition.makeLeader(Partition.scala:367)
    at kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:1162)
    at kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:1160)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
    at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:1160)
    at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1072)
    at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:185)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:110)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
    at java.base/java.lang.Thread.run(Thread.java:844)
[2019-03-04 15:36:56,864] INFO [ReplicaManager broker=0] Stopping serving replicas in dir /logs/old_dir (kafka.server.ReplicaManager)
As you can see from the stack trace, the `LeaderEpochFileCache` referenced by `Replica#epochs` still points to the old `/logs/old_dir/test_log_dir-0/leader-epoch-checkpoint` file.
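In other words, the missing step is to rebind the replica's epoch cache when the future log replaces the current log. A minimal sketch of that rebinding, again with hypothetical types rather than the actual Kafka classes or the eventual patch:

```scala
import java.io.File

// Hypothetical types for illustration only; the real fix lives in Kafka's
// Partition/Replica/Log code and may look different.
final case class EpochCacheRef(checkpointFile: File)

final class ReplicaRef(var epochCache: EpochCacheRef) {
  // When the future log (in the new log dir) replaces the current log, the epoch
  // cache reference must be swapped to the checkpoint inside the new partition
  // directory instead of staying bound to the renamed old one.
  def onFutureLogPromoted(newPartitionDir: File): Unit =
    epochCache = EpochCacheRef(new File(newPartitionDir, "leader-epoch-checkpoint"))
}

object RebindDemo {
  def main(args: Array[String]): Unit = {
    val replica = new ReplicaRef(
      EpochCacheRef(new File("/logs/old_dir/test_log_dir-0/leader-epoch-checkpoint")))
    replica.onFutureLogPromoted(new File("/logs/new_dir/test_log_dir-0"))
    println(replica.epochCache.checkpointFile) // /logs/new_dir/test_log_dir-0/leader-epoch-checkpoint
  }
}
```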
Issue Links
- is related to KAFKA-7897: Invalid use of epoch cache with old message format versions (Resolved)