This happens when an existing follower becomes the new leader and the current leader starts following the new leader.
The existing follower closes its fetcher thread and transitions to become the leader.
The current leader truncates its log to the high water mark and starts following the new leader.
The message set received by the old follower during this transition contains only zero bytes. When we try to iterate through this message set, we fail and throw the exception above.
What causes these zero bytes to be present in the message set? It looks like when the old leader truncated its log, it was also trying to send bytes to the follower. These bytes were outside the truncated region, and somehow the bytes after the high water mark all became zeros.
It turns out that JDK 1.6 has a bug in FileChannel.truncate: it truncates the file but does not update the position of the channel. This is worked around in Kafka by explicitly setting the position after the truncate call. However, the simple program below verifies that reading the file channel beyond the truncated region (without resetting the position) is by itself harmless and does not return any bytes.
import java.io.RandomAccessFile
import java.nio.ByteBuffer

// 1. create a channel for a file
val path = "/home/myid/outfile1"
val fileAccess = new RandomAccessFile(path, "rw")
val fc = fileAccess.getChannel
// 2. fill a buffer with non-zero bytes and write it to the channel
val b = ByteBuffer.wrap(Array.fill[Byte](100)(1))
fc.write(b)
var pos = fc.position() // position is 100
var size = fc.size() // size is 100
// 3. truncate the channel
fc.truncate(50)
size = fc.size() // size is 50
pos = fc.position() // position is still 100 (the jdk 1.6 bug: truncate does not move the position)
// 4. transfer the truncated portion to another channel
val path1 = "/home/myid/outfile2"
val f2 = new RandomAccessFile(path1, "rw")
val fc1 = f2.getChannel
val transferred = fc.transferTo(50, 50, fc1) // transferred is 0
Further, if we add step 3' below between steps 3 and 4 above, step 4 now returns a nonzero count of transferred bytes, but the transferred bytes are all zeros.
// 3'. write more bytes (without resetting the position after the truncate)
fc.write(ByteBuffer.wrap(Array.fill[Byte](100)(1)))
pos = fc.position() // position is 200
size = fc.size() // size is 200
The code above shows that appending to a file without resetting the position after a truncate does expose zero bytes to the reader. In Kafka, however, truncate/set-position and append are all synchronized, so we should not hit this issue.
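The workaround mentioned earlier (explicitly setting the position after the truncate) can be verified with a variant of the same experiment. A minimal sketch, using a temporary file instead of the paths above:

```scala
import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.file.Files

val f = File.createTempFile("truncate-fix", ".log")
val fc = new RandomAccessFile(f, "rw").getChannel

fc.write(ByteBuffer.wrap(Array.fill[Byte](100)(1))) // 100 non-zero bytes, position = 100
fc.truncate(50)
fc.position(50) // the explicit reposition applied after every truncate
fc.write(ByteBuffer.wrap(Array.fill[Byte](100)(2))) // append lands at offset 50, not 100

val bytes = Files.readAllBytes(f.toPath)
// the file is 150 bytes with no zero gap after the truncate point
println(s"size=${bytes.length}, has zero gap=${bytes.exists(_ == 0)}")
fc.close()
```

With the reposition in place, the append continues from the new end of file, so no zero-filled hole is ever created.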
This suggests there is a race condition in FileChannelImpl that could somehow cause this. The code snippet below, from the transferTo method of FileChannelImpl, might explain what we see.
long sz = size(); // --> checks the size; size() is synchronized with the other FileChannelImpl methods
if (position > sz)
    return 0; // --> this is what step 4 returned in the first case: the size is smaller than the requested position. However, a truncate can happen after this line.
int icount = (int)Math.min(count, Integer.MAX_VALUE);
if ((sz - position) < icount)
    icount = (int)(sz - position);
// Attempt a direct transfer, if the kernel supports it
if ((n = transferToDirectly(position, icount, target)) >= 0) // --> the size check above may have passed, but by this point the truncate has made the size smaller than the requested
    return n;                                                //     position. transferToDirectly calls transferTo0, which could then read the zero bytes exposed by the truncate.
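The suspected interleaving can be simulated deterministically in a single thread: take the size check as transferTo would, then let the "racing" truncate and append run (emulating the stale position of the jdk 1.6 bug by setting it explicitly), and only then perform the transfer. The transferred bytes are exactly the zeros that the append past the stale position exposed. This is a sketch of the hypothesis, not the actual JDK race; temporary files stand in for the log segments:

```scala
import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.file.Files

val src = File.createTempFile("race-src", ".log")
val dst = File.createTempFile("race-dst", ".log")
val fc = new RandomAccessFile(src, "rw").getChannel
val fc1 = new RandomAccessFile(dst, "rw").getChannel

fc.write(ByteBuffer.wrap(Array.fill[Byte](100)(1))) // 100 non-zero bytes
val sz = fc.size() // transferTo's size check would pass here: sz = 100

// the racing truncate + append sneak in before the actual transfer:
fc.truncate(50)
fc.position(100) // emulate the jdk 1.6 bug: the position stays past the new end
fc.write(ByteBuffer.wrap(Array.fill[Byte](100)(1))) // extends the file, zero-filling offsets 50..99

val transferred = fc.transferTo(50, 50, fc1) // now transfers 50 bytes -- all zeros
val out = Files.readAllBytes(dst.toPath)
println(s"transferred=$transferred, all zeros=${out.forall(_ == 0)}")
```

The transfer that previously returned 0 now succeeds, but everything it hands to the consumer is the zero-filled gap between the truncate point and the stale position.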
A few open questions:
1. Does truncate zero out the bytes synchronously or lazily? If it is lazy, we could also get junk bytes instead of zeros.
2. How do we fix this in Kafka? One possible fix is to have the MessageSet iterator throw an invalid-message exception when it encounters a zero message size or a CRC that does not match the message. The follower can then refetch that offset for the topic partition, or simply fail (at least we would know the cause).
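A sketch of the second idea, assuming a simplified frame layout of [4-byte size][4-byte CRC][payload] (the real Kafka message format differs, and the exception class here is illustrative): a zero-filled region decodes as a zero message size and is rejected before the CRC is even computed.

```scala
import java.nio.ByteBuffer
import java.util.zip.CRC32

class InvalidMessageException(msg: String) extends RuntimeException(msg)

// decode one frame: [4-byte size][4-byte crc][payload of `size` bytes]
def nextMessage(buf: ByteBuffer): Array[Byte] = {
  val size = buf.getInt()
  if (size <= 0 || size > buf.remaining() - 4)
    throw new InvalidMessageException(s"invalid message size $size")
  val expectedCrc = buf.getInt() & 0xFFFFFFFFL
  val payload = new Array[Byte](size)
  buf.get(payload)
  val crc = new CRC32()
  crc.update(payload)
  if (crc.getValue != expectedCrc)
    throw new InvalidMessageException("CRC mismatch")
  payload
}

// encode a frame so we can exercise the decoder
def frame(payload: Array[Byte]): ByteBuffer = {
  val crc = new CRC32()
  crc.update(payload)
  val b = ByteBuffer.allocate(8 + payload.length)
  b.putInt(payload.length).putInt(crc.getValue.toInt).put(payload)
  b.flip()
  b
}

val ok = new String(nextMessage(frame("hello".getBytes("UTF-8"))), "UTF-8")
val zeros = ByteBuffer.wrap(new Array[Byte](16)) // a zero-filled fetch response
val rejected = try { nextMessage(zeros); false } catch { case _: InvalidMessageException => true }
println(s"ok=$ok, rejected=$rejected")
```

On such a failure the follower could refetch the offset or stop, instead of crashing on an opaque iterator exception.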