Details

Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version/s: 0.8.2.0

Description
If a broker shuts down after the log cleaner calls replaceSegments to replace more than one segment, the partition can be left in an uncleanable state. We saw this on a few brokers after a rolling update. The sequence of events we observed:
1) Cleaner cleaned segments with base offsets 0, 1094621529, and 1094831997 into a new segment 0.
2) Cleaner logged "Swapping in cleaned segment 0 for segment(s) 0,1094621529,1094831997 in log xxx-15." and called replaceSegments.
3) 0.cleaned was renamed to 0.swap.
4) Broker shut down before deleting segments 1094621529 and 1094831997.
5) Broker started up and logged "Found log file /mnt/persistent/kafka-logs/xxx-15/00000000000000000000.log.swap from interrupted swap operation, repairing."
6) Cleaner thread died with the exception "kafka.common.InvalidOffsetException: Attempt to append an offset (1094911424) to position 1003 no larger than the last offset appended (1095045873) to /mnt/persistent/kafka-logs/xxx-15/00000000000000000000.index.cleaned."
I think what's happening in #6 is that when the broker started back up and repaired the log, segment 0 ended up containing many messages that were also in segments 1094621529 and 1094831997 (because the new segment 0 was created by cleaning all three). But segments 1094621529 and 1094831997 were still on disk, so the offsets on disk were no longer monotonically increasing, violating the assumption OffsetIndex relies on. We ended up fixing this by deleting segments 1094621529 and 1094831997 manually and then removing the line for this partition from the cleaner-offset-checkpoint file (otherwise it would reference the now-nonexistent segment 1094621529).
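To make the failure mode concrete, here is a minimal toy sketch of the monotonicity check that OffsetIndex.append enforces. This is an illustration only, not Kafka's actual code; the class and object names are made up. Once the repaired segment 0 already contains offsets up to 1095045873, re-cleaning the leftover segments feeds a smaller offset (1094911424) into the index and the cleaner thread dies:

{code}
// Toy illustration (not Kafka's OffsetIndex): an offset index only accepts
// strictly increasing offsets, which is exactly the invariant the leftover
// segments violate after the interrupted swap.
class InvalidOffsetException(msg: String) extends RuntimeException(msg)

class ToyOffsetIndex(val file: String) {
  private var entries = 0
  private var lastOffset = -1L

  def append(offset: Long, position: Int): Unit =
    if (entries == 0 || offset > lastOffset) {
      // the real index would write the (offset, position) entry into a mmap'd file here
      lastOffset = offset
      entries += 1
    } else {
      throw new InvalidOffsetException(
        s"Attempt to append an offset ($offset) to position $position no larger than " +
          s"the last offset appended ($lastOffset) to $file.")
    }
}

object MonotonicityDemo extends App {
  val index = new ToyOffsetIndex("00000000000000000000.index.cleaned")
  index.append(1095045873L, 1002) // an offset already present in the repaired segment 0
  index.append(1094911424L, 1003) // a smaller offset encountered again in a leftover segment: throws
}
{code}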
This can happen even on a clean shutdown, since the async deletes issued by replaceSegments may not have completed before the broker stops.
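For illustration, here is a hedged sketch of the swap sequence as described above (file operations, names, and ordering are reconstructed from this report, not Kafka's actual replaceSegments implementation); the comment marks the window in which a shutdown leaves both the .swap file and the old segments on disk:

{code}
// Illustrative sketch of the swap sequence (not Kafka's actual code).
import java.nio.file.{Files, Paths, StandardCopyOption}

object SwapWindowSketch {
  def replaceSegmentsSketch(dir: String, newBase: String, oldBases: Seq[String]): Unit = {
    val cleaned = Paths.get(dir, newBase + ".log.cleaned")
    val swap    = Paths.get(dir, newBase + ".log.swap")

    // Step 3 above: the .cleaned segment is renamed to .swap.
    Files.move(cleaned, swap, StandardCopyOption.ATOMIC_MOVE)

    // <-- Step 4: a shutdown here leaves the .swap file AND the old segments
    //     on disk. On restart the broker repairs the interrupted swap for
    //     segment 0, but the leftover segments keep their overlapping offsets,
    //     so offsets on disk are no longer monotonically increasing.

    // Only afterwards are the old segments removed (Kafka schedules them for
    // asynchronous deletion), which is why even a clean shutdown can leave
    // them behind.
    for (base <- oldBases; suffix <- Seq(".log", ".index")) {
      val old = Paths.get(dir, base + suffix)
      if (Files.exists(old))
        Files.move(old, Paths.get(dir, base + suffix + ".deleted"), StandardCopyOption.ATOMIC_MOVE)
    }

    // Finally the .swap suffix is dropped and the new segment becomes live.
    Files.move(swap, Paths.get(dir, newBase + ".log"), StandardCopyOption.ATOMIC_MOVE)
  }
}
{code}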
Cleaner logs post-startup:
2015-04-12 15:07:56,533 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Beginning cleaning of log xxx-15.
2015-04-12 15:07:56,533 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Building offset map for xxx-15...
2015-04-12 15:07:56,595 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Building offset map for log xxx-15 for 6 segments in offset range [1094621529, 1095924157).
2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Offset map for log xxx-15 complete.
2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning log xxx-15 (discarding tombstones prior to Sun Apr 12 14:05:37 UTC 2015)...
2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning segment 0 in log xxx-15 (last modified Sun Apr 12 14:05:38 UTC 2015) into 0, retaining deletes.
2015-04-12 15:08:04,283 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning segment 1094621529 in log xxx-15 (last modified Sun Apr 12 13:49:27 UTC 2015) into 0, discarding deletes.
2015-04-12 15:08:05,079 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - Cleaner 0: Cleaning segment 1094831997 in log xxx-15 (last modified Sun Apr 12 14:04:28 UTC 2015) into 0, discarding deletes.
2015-04-12 15:08:05,157 ERROR [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to
kafka.common.InvalidOffsetException: Attempt to append an offset (1094911424) to position 1003 no larger than the last offset appended (1095045873) to /mnt/persistent/kafka-logs/xxx-15/00000000000000000000.index.cleaned.
at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.append(LogSegment.scala:81)
at kafka.log.Cleaner.cleanInto(LogCleaner.scala:427)
at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:358)
at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:354)
at scala.collection.immutable.List.foreach(List.scala:318)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:354)
at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:321)
at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:320)
at scala.collection.immutable.List.foreach(List.scala:318)
at kafka.log.Cleaner.clean(LogCleaner.scala:320)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
2015-04-12 15:08:05,157 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Stopped