Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- 0.8.2.1
- None
- None
Description
We're seeing some strange behaviour in brokers: a replica will sometimes schedule all segments in a partition for deletion, and then immediately start replicating them back, triggering our check for under-replicated topics.
This happens on average a couple of times a week, for different brokers and topics.
We have per-topic retention.ms and retention.bytes configuration; the topics where we've seen this happen are hitting the size limit.
Attachments
- kafka_log.txt (14 kB, Håkon Hitland)
- kafka_log_trace.txt (24 kB, Håkon Hitland)
- Screen Shot 2015-10-10 at 6.54.44 PM.png (690 kB, Chinmay Soman)
Issue Links
- contains: KAFKA-2621 nextOffsetMetadata should be changed after rolling a new log segment (Resolved)
Activity
Thanks for the reply. Checking the logs, we did get the "Error when processing fetch request" error in the leader mentioned in KAFKA-2143, so it could be the same issue.
I don't see anything in our logs about a leader change, so I don't think it is caused by an unclean election, like some of the comments suggest.
What is the partition replication factor?
Also, can you search for "start offset" in the server log of the broker that truncates its log?
We use a replication factor of 3.
The only line with "start offset" that day is the one in the attached log:
[2015-08-24 18:32:32,299] WARN [ReplicaFetcherThread-3-0], Replica 3 for partition [log.event,3] reset its fetch offset from 10200597616 to current leader 0's start offset 10200597616 (kafka.server.ReplicaFetcherThread)
Edit: the error on the leader reads:
[2015-08-24 18:32:32,145] ERROR [Replica Manager on Broker 0]: Error when processing fetch request for partition [log.event,3] offset 10349592111 from follower with correlation id 141609587. Possible cause: Request for offset 10349592111 but we only have log segments in the range 10200597616 to 10349592109. (kafka.server.ReplicaManager)
hakon, the leader changes will be logged in the controller log. Do you see anything there? There is also a JMX metric, kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec,
that tells you whether any unclean leader election has occurred.
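A minimal sketch of reading that metric programmatically, assuming the broker has remote JMX enabled over the standard RMI connector. Only the MBean name and its `Count` attribute come from this thread; `UncleanElectionCheck`, `uncleanElectionCount`, and the `hostPort` placeholder are hypothetical.

```scala
import javax.management.ObjectName
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

object UncleanElectionCheck {
  // The MBean named in the comment above; "Count" is the attribute quoted in the reply.
  val objectName: ObjectName =
    new ObjectName("kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec")

  /** Reads the cumulative unclean-election count from one broker.
    * `hostPort` (e.g. "broker1:9999") is a placeholder; the broker must expose remote JMX. */
  def uncleanElectionCount(hostPort: String): Long = {
    val url = new JMXServiceURL(s"service:jmx:rmi:///jndi/rmi://$hostPort/jmxrmi")
    val connector = JMXConnectorFactory.connect(url)
    try {
      connector.getMBeanServerConnection.getAttribute(objectName, "Count") match {
        case n: java.lang.Number => n.longValue
        case other => throw new IllegalStateException(s"Unexpected Count value: $other")
      }
    } finally connector.close()
  }
}
```

Running this against every broker and seeing 0 everywhere matches what the reporter found by hand.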
"kafka.controller:name=UncleanLeaderElectionsPerSec,type=ControllerStats" has Count = 0 on all brokers.
There is nothing in any controller.log from that day except for one broker checking leader imbalance.
([2015-08-24 00:02:32,650] TRACE [Controller 0]: checking need to trigger partition rebalance (kafka.controller.KafkaController)
[2015-08-24 00:02:32,684] DEBUG [Controller 0]: preferred replicas by broker Map([...]) (kafka.controller.KafkaController)
[2015-08-24 00:02:32,684] DEBUG [Controller 0]: topics not in preferred replica Map() (kafka.controller.KafkaController)
[2015-08-24 00:02:32,685] TRACE [Controller 0]: leader imbalance ratio for broker 2 is 0.000000 (kafka.controller.KafkaController)
, etc.)
Interesting, so there is nothing in broker 0's log about truncating the log for partition [log.event,3]?
It seems that this is reproducible. Would it be possible for you to enable TRACE level logging for the class kafka.log.Log on every broker? This will log the new log end offset after each message append. When this issue happens again, we can verify whether the follower is indeed using a fetch offset that's larger than the leader's log end offset.
I don't think enabling trace logging would be practical in our production environment, unfortunately.
We see the error regularly in production, but I haven't been able to reproduce it locally.
Could you then try the following? In the above situation, go to broker 0's log dir for partition [log.event,3]. Get the name of the last log segment (the .log file). Then run the following:
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files [logsegmentname]
This will print out the offset of each message. In the normal case, those offsets should be monotonically increasing. Could you check whether there are any out-of-sequence offsets in the output, especially close to 10349592109?
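The check described above can be scripted. The sketch below is hypothetical (`OffsetOrderCheck` and its methods are not part of Kafka) and assumes the DumpLogSegments output has been saved as text with one `offset: N position: ...` line per message. It only tests that offsets strictly increase. Gaps are expected without --deep-iteration, since in this Kafka version a compressed wrapper message appears to carry the offset of its last inner message.

```scala
object OffsetOrderCheck {
  /** Extracts N from each "offset: N position: ..." line; other lines are ignored. */
  def offsets(lines: Seq[String]): Seq[Long] =
    lines.filter(_.startsWith("offset: ")).map(_.split(" ")(1).toLong)

  /** Returns every (previous, current) pair where the offset fails to increase. */
  def outOfSequence(offs: Seq[Long]): Seq[(Long, Long)] =
    offs.zip(offs.drop(1)).filter { case (prev, cur) => cur <= prev }
}
```

For example, `OffsetOrderCheck.outOfSequence(OffsetOrderCheck.offsets(scala.io.Source.fromFile(path).getLines().toList))` should return an empty sequence for a healthy segment, where `path` is wherever the dump was saved.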
I don't see any out of sequence offsets.
Here are a couple of recent examples.
If I run with --deep-iteration, all offsets are present and sequential.
The result on the replica is identical to the leader.
—
[2015-09-02 23:43:03,379] ERROR [Replica Manager on Broker 0]: Error when processing fetch request for partition [log.event,3] offset 10591627212 from follower with correlation id 391785394. Possible cause: Request for offset 10591627212 but we only have log segments in the range 10444248800 to 10591627211. (kafka.server.ReplicaManager)
offset: 10591627210 position: 994954613 isvalid: true payloadsize: 674 magic: 0 compresscodec: SnappyCompressionCodec crc: 4144791071
offset: 10591627211 position: 994955313 isvalid: true payloadsize: 1255 magic: 0 compresscodec: SnappyCompressionCodec crc: 1011806998
offset: 10591627213 position: 994956594 isvalid: true payloadsize: 1460 magic: 0 compresscodec: SnappyCompressionCodec crc: 4145284502
offset: 10591627215 position: 994958080 isvalid: true payloadsize: 1719 magic: 0 compresscodec: SnappyCompressionCodec crc: 444418110
[2015-09-03 11:44:02,483] ERROR [Replica Manager on Broker 3]: Error when processing fetch request for partition [log.count,5] offset 69746066284 from follower with correlation id 239821628. Possible cause: Request for offset 69746066284 but we only have log segments in the range 68788206610 to 69746066280. (kafka.server.ReplicaManager)
offset: 69746066278 position: 464897345 isvalid: true payloadsize: 674 magic: 0 compresscodec: SnappyCompressionCodec crc: 3013732329
offset: 69746066279 position: 464898045 isvalid: true payloadsize: 234 magic: 0 compresscodec: SnappyCompressionCodec crc: 3286064200
offset: 69746066283 position: 464898305 isvalid: true payloadsize: 486 magic: 0 compresscodec: SnappyCompressionCodec crc: 747917524
offset: 69746066285 position: 464898817 isvalid: true payloadsize: 342 magic: 0 compresscodec: SnappyCompressionCodec crc: 4283754786
offset: 69746066286 position: 464899185 isvalid: true payloadsize: 233 magic: 0 compresscodec: SnappyCompressionCodec crc: 2129213572
Thanks. Then the log looks normal. The only thing that I can recommend now is to try reproducing the issue locally and apply the trace level logging.
Also, since you are using snappy, you may want to apply the fixes in 0.8.2.2 (https://people.apache.org/~junrao/kafka-0.8.2.2-candidate1/RELEASE_NOTES.html) once it's out. They may not be related to the issue that you are seeing here though.
Attached trace log from leader. Filtered to only lines for the relevant partition.
I was able to enable trace logging on a production server, and have captured logs from the leader when the error happens.
It looks like the attempted read happens right before the log is actually appended. I don't see any other abnormal behaviour.
Looking at the code in question, I think I have an idea of how it might happen:
kafka.log.Log uses a lock to synchronize writes, but not reads.
Assume a write W1 has gotten as far as FileMessageSet.append() and has just executed _size.getAndAdd(written)
Now a concurrent read R1 comes in. In FileMessageSet.read(), it can get a new message set with end = math.min(this.start + position + size, sizeInBytes()). This includes the message that was just written in W1.
The read finishes, and a new read R2 starts. R2 tries to continue from the offset just after W1's messages, but in Log.read() it finds that startOffset is larger than nextOffsetMetadata.messageOffset and throws an exception.
(By the way, Log.read() can potentially read nextOffsetMetadata multiple times, with no guarantee that it hasn't changed. It's not obvious to me that this is correct.)
Finally, W1 updates nextOffsetMetadata in Log.updateLogEndOffset(), too late for R2 which has already triggered a log truncation on the replica.
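The interleaving above can be replayed deterministically with a toy model. This is a sketch under stated assumptions, not Kafka's code: `ToyLog`, `OffsetOutOfRange`, and `RaceReplay` are hypothetical, offsets double as positions in an in-memory buffer, and the two halves of an append are exposed as separate calls so the reads can be placed between them on a single thread.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model, not Kafka's code: `file` stands in for the segment's bytes (offset == index)
// and `nextOffset` for the published Log.nextOffsetMetadata.messageOffset.
final class OffsetOutOfRange(msg: String) extends Exception(msg)

final class ToyLog {
  private val file = ArrayBuffer.empty[Long]
  @volatile private var nextOffset = 0L

  // Append step 1 (like FileMessageSet.append): data becomes visible in the file.
  def writeToFile(count: Int): Unit = synchronized {
    val start = file.size.toLong
    (start until start + count).foreach(file += _)
  }

  // Append step 2 (like Log.updateLogEndOffset): publish the new end offset.
  def publishEndOffset(): Unit = synchronized { nextOffset = file.size.toLong }

  // Lock-free read: validated against nextOffset, but bounded by the file's current size.
  def read(startOffset: Long): Seq[Long] = {
    val next = nextOffset
    if (startOffset == next) Nil
    else if (startOffset > next)
      throw new OffsetOutOfRange(s"Request for offset $startOffset but log end offset is $next")
    else file.slice(startOffset.toInt, file.size).toList
  }
}

object RaceReplay {
  /** Replays W1 / R1 / R2 on one thread; W1 step 2 would only run after R2. */
  def run(): Either[String, Seq[Long]] = {
    val log = new ToyLog
    log.writeToFile(3); log.publishEndOffset() // offsets 0..2 committed, end offset 3
    log.writeToFile(2)                         // W1 step 1: offsets 3..4 in the file, end offset still 3
    val r1 = log.read(1)                       // R1 returns 1..4, including W1's unpublished data
    val r2Offset = r1.last + 1                 // the follower continues from offset 5
    try Right(log.read(r2Offset))              // R2: 5 > 3, rejected as out of range
    catch { case e: OffsetOutOfRange => Left(e.getMessage) }
  }
}
```

`RaceReplay.run()` yields a `Left` carrying the out-of-range message, which is the toy equivalent of the leader's "Error when processing fetch request" line followed by the follower truncating.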
Some possible solutions:
- Synchronize access to nextOffsetMetadata in Log.read()
- Clamp reads in Log.read() to never go beyond the current message offset.
hakon Yes, that's correct.
The log appending does the following two things:
1. Append message to log
2. Update Log.nextOffsetMetadata.messageOffset.
If two follower reads come between steps 1 and 2, there will be an out-of-range exception. I think the fix is to read up to Log.nextOffsetMetadata.messageOffset for replicas instead of up to the max size.
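A minimal sketch of the fix direction described here: bound a replica read by the end offset published when the read starts, rather than by the file's current size. It is a hypothetical toy (`ClampedRead` is not a Kafka class) and works on offsets for simplicity; the merged patch may bound the read differently, for example by byte position.

```scala
// Toy model, not the code from pull request 204: reads are bounded by an end offset
// captured once, so unpublished data already in the file is never returned.
object ClampedRead {
  final class OffsetOutOfRange(msg: String) extends Exception(msg)

  /** `file` holds the offsets physically written (offset == index);
    * `publishedEnd` is the end-offset snapshot taken at the start of the read. */
  def read(file: IndexedSeq[Long], publishedEnd: Long, startOffset: Long): Seq[Long] = {
    if (startOffset > publishedEnd)
      throw new OffsetOutOfRange(s"Request for offset $startOffset but log end offset is $publishedEnd")
    // Upper bound is publishedEnd, not file.size: step 2 of the append may not have happened yet.
    file.slice(startOffset.toInt, publishedEnd.toInt)
  }
}
```

With offsets 0 to 4 in the file but only end offset 3 published, a read from offset 1 returns offsets 1 and 2, the follower's next fetch at offset 3 gets an empty result instead of an error, and once end offset 5 is published a read from 3 returns offsets 3 and 4.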
Are you interested in submitting a patch?
I don't think I can provide a patch at the moment; I would appreciate it if someone more familiar with the code fixed it.
GitHub user becketqin reopened a pull request:
https://github.com/apache/kafka/pull/204
KAFKA-2477: Fix a race condition between log append and fetch that causes OffsetOutOfRangeException.
Tried two fixes. I prefer the second approach because it saves an additional offset search.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/becketqin/kafka KAFKA-2477
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/204.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #204
commit e7610fb69a4007ae661a768635e930355c8caa76
Author: Jiangjie Qin <becket.qin@gmail.com>
Date: 2015-09-11T02:17:12Z
KAFKA-2477: Fix a race condition between log append and fetch that causes OffsetOutOfRangeException
commit 45364d76e756fc6075924b3a07651b7fbbcc391a
Author: Jiangjie Qin <becket.qin@gmail.com>
Date: 2015-09-11T03:06:35Z
A second fix that avoids an addition offset search
junrao I just marked this ticket for 0.9. The patch is small. Will you be able to take a look?
hakon, thanks a lot for the update. This seems like a real issue. Also, the point that you made on "Log.read() can potentially read nextOffsetMetadata multiple times" is also relevant. In Log.read(), we have the following code:
// check if the offset is valid and in range
val next = nextOffsetMetadata.messageOffset
if(startOffset == next)
  return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
This seems wrong. If nextOffsetMetadata changes after the if test, we could return a larger fetchOffsetMetadata in FetchDataInfo than we should. This could affect the computation of things like the ISR. Instead, we should get a reference to nextOffsetMetadata first and use that for both the if test and the return value.
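The difference between reading nextOffsetMetadata twice and reading it once can be shown with a small sketch. `OffsetMetadata`, `FetchResult`, `EmptyReadDemo`, and the `between` hook are hypothetical stand-ins for Kafka's offset metadata, FetchDataInfo, and a concurrent append; the hook simply forces the append to land between the test and the return.

```scala
// Hypothetical stand-ins for the metadata held in nextOffsetMetadata and for FetchDataInfo.
final case class OffsetMetadata(messageOffset: Long, positionInSegment: Int)
final case class FetchResult(fetchOffsetMetadata: OffsetMetadata, messages: Seq[Long])

final class EmptyReadDemo(initial: OffsetMetadata) {
  @volatile var nextOffsetMetadata: OffsetMetadata = initial

  // Pattern from the snippet above: the volatile field is read for the test and read
  // again for the return value, so an append in between leaks into the result.
  def readTwice(startOffset: Long, between: () => Unit): Option[FetchResult] =
    if (startOffset == nextOffsetMetadata.messageOffset) {
      between()
      Some(FetchResult(nextOffsetMetadata, Nil))
    } else None

  // Suggested pattern: take one reference and use it for both the test and the result.
  def readOnce(startOffset: Long, between: () => Unit): Option[FetchResult] = {
    val current = nextOffsetMetadata
    if (startOffset == current.messageOffset) {
      between()
      Some(FetchResult(current, Nil))
    } else None
  }
}
```

If an append moves the end offset from 10 to 12 inside the gap, `readTwice(10L, ...)` reports offset 12 alongside an empty message set, while `readOnce(10L, ...)` consistently reports 10.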
Log.read() also references nextOffsetMetadata again in the last line. I am not sure if the comment is correct. The last message will never be deleted, so it seems that we can never reach the last statement.
// okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
// this can happen when all messages with offset larger than start offsets have been deleted.
// In this case, we will return the empty set with log end offset metadata
FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
becket_qin, do you want to fix nextOffsetMetadata in your patch too?
junrao I'll fix the nextOffsetMetadata in the patch.
Would the following case be what the last line was trying to address?
1. Leader is on broker 1, HW=X, LEO=Y, Y > X
2. A fetch request from follower goes to broker 1 to fetch from offset Z. Assume X < Z < Y.
3. Broker 1 proceeds with fetch request and enters Log.read()
4. Leader migration occurs, and the log on broker 1 is truncated to X.
In this case, because the FetchRequest passed the leader check before the leader migration, no NotLeaderForPartitionException would be thrown. Also, because the read does not grab any lock, the log truncation might occur before the actual message search does.
Issue resolved by pull request 204
https://github.com/apache/kafka/pull/204
Håkon Hitland, the fix is committed in trunk. Do you think you can test this out in a test environment?
We're hitting this issue quite often (every 15-20 minutes), and it is a problem since it's eating up our already scarce disk and network resources. At the moment we're running 0.8.2.0. Is there any plan to backport this patch, given the severity of the issue?
cpsoman Interesting, the only other org I had heard this affecting was seeing it at approximately the same frequency as the original report – maybe once or twice a week. It seemed random, so the events could sometimes clump together (e.g. see two in one day), but the average rate was pretty low. It was an annoyance rather than a serious issue.
The patch does seem to apply trivially to 0.8.2.2.
ewencp I'm attaching a screenshot of the max lag observed for the different brokers, which illustrates the behaviour.
Also here's the pertinent log:
========
[2015-10-10 22:17:17,759] 3171793337 [kafka-request-handler-0] WARN kafka.server.ReplicaManager - [Replica Manager on Broker 70]: Error when processing fetch request for partition [...topic...,62] offset 91963211 from follower with correlation id 176614372. Possible cause: Request for offset 91963211 but we only have log segments in the range 55923986 to 91963210.
[2015-10-10 22:17:17,759] 3171793337 [kafka-request-handler-4] WARN kafka.server.ReplicaManager - [Replica Manager on Broker 70]: Error when processing fetch request for partition [...topic...,62] offset 91963211 from follower with correlation id 152788081. Possible cause: Request for offset 91963211 but we only have log segments in the range 55923986 to 91963210.
[2015-10-10 22:17:20,256] 3171795834 [kafka-scheduler-4] INFO kafka.cluster.Partition - Partition [...topic...,62] on broker 70: Shrinking ISR for partition [hp.event.user.driver_app.experiment,62] from 70,69,71 to 70
========
Btw, this doesn't happen everywhere, but it is definitely seen in our biggest cluster (with far more partitions per node). Maybe it has something to do with scale?
cpsoman I think the likelihood of the issue is related to the scale as you observed, because there would be potentially more threads trying to read/write from the same log segment.
It looks like there have been a few other patches to the files touched by this patch since 0.8.2.0. However, I checked the 0.8.2.0 code, and the code blocks related to this patch are still the same as in the latest trunk. So you should be able to port the fix to 0.8.2.0 easily, although the patch itself may not apply cleanly.
cpsoman Rather than porting the patch to 0.8.2.0, is there any reason not to update to 0.8.2.2 and apply the patch on top of that, where it definitely applies cleanly? It looks like 8 patches, and some of the patches on top of 0.8.2.0 are likely to be useful if you have a large number of partitions or use snappy compression, among other key fixes. Maybe you're not hitting any of the critical fixes in those releases, but since they're low risk, catching up with the latest release and carrying only a minor patch might simplify things.
ewencp Totally agree. It's just that the 0.8.2.0 version we're currently using has been "tampered" with. I see a lot of commits from our previous team here, and I need to be careful not to break anything. At the moment, I've applied the patch on top of our current version and tested it in staging. I'll be rolling it out on our biggest cluster soon to validate whether it works.
Attached example log from a broker