
IndexOutOfBoundsException while fetching data from leader

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels: None

      Description

      2013-04-09 16:36:50,051] ERROR [ReplicaFetcherThread-0-261], Error due to (kafka.server.ReplicaFetcherThread)
      kafka.common.KafkaException: error processing data for topic firehoseUpdates partititon 14 offset 53531364
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:136)
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:113)
      at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
      at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
      at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:113)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
      at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
      Caused by: java.lang.IndexOutOfBoundsException
      at java.nio.Buffer.checkIndex(Buffer.java:512)
      at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:121)
      at kafka.message.Message.compressionCodec(Message.scala:202)
      at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:174)
      at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:197)
      at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:145)
      at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
      at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
      at scala.collection.IterableLike$class.isEmpty(IterableLike.scala:92)
      at kafka.message.MessageSet.isEmpty(MessageSet.scala:67)
      at scala.collection.TraversableLike$class.lastOption(TraversableLike.scala:512)
      at kafka.message.MessageSet.lastOption(MessageSet.scala:67)

      Attachments

      1. KAFKA-861.patch
        3 kB
        Sriram Subramanian
      2. KAFKA-861-v2.patch
        5 kB
        Sriram Subramanian

        Activity

        Sriram Subramanian created issue -
        Sriram Subramanian added a comment -

        This happens when an existing follower becomes the new leader and the current leader starts following the new leader.

        - The existing follower closes the fetcher thread and transitions to become a leader.
        - The current leader truncates its log to the high water mark and starts following the new leader.

        The message set that is received by the old follower during this transition contains only zero bytes. When we try to iterate through this message set, we fail and throw the exception above.

        What causes these zero bytes to be present in the message set? It looks like when the old leader truncated its log, it was also trying to send bytes to the follower. These bytes were outside the truncated region. Somehow, the bytes after the high watermark all became zeros.

        It turns out that in JDK 1.6 there is a bug in truncateTo that truncates the file but does not update the position of the file. This is fixed in Kafka by explicitly setting the position after the truncate call. However, the simple program below verifies that reading from the file channel beyond the truncated region (without setting the position) is harmless and simply returns no bytes.

        1.
        // create a channel for a file
        import java.io.RandomAccessFile
        import java.nio.ByteBuffer
        import java.util.Random

        val path = "/home/myid/outfile1"
        val fileAccess = new RandomAccessFile(path, "rw")
        val fc = fileAccess.getChannel

        2.
        // create random buffer
        val b = ByteBuffer.allocate(100)
        new Random().nextBytes(b.array())

        // write the buffer to the channel
        fc.write(b)
        var pos = fc.position() // position is 100
        var size = fc.size() // size is 100

        3.
        // truncate the channel
        fc.truncate(50)
        size = fc.size() // size is 50
        pos = fc.position() // position is 100

        4.
        // transfer the truncated portion to a channel
        val path1 = "/home/myid/outfile2"
        val f2 = new RandomAccessFile(path1, "rw")
        val fc1 = f2.getChannel
        val transferred = fc.transferTo(50, 50, fc1) // transferred is 0

        Further, if we add step 3" below after step 3 above, it can be seen that step 4 does transfer a non-zero number of bytes, and those bytes are all zeros.

        3"

        // write more bytes
        b.rewind()
        fc.write(b)
        pos = fc.position() // position is 200
        size = fc.size() // size is 200

        The code above shows that appending to a file without setting the position after truncate does expose the zero bytes to the reader. But in kafka, truncate/set position and append are all synchronized. This means we should not hit the issue above.
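
        As an illustration of that synchronization claim, here is a minimal sketch of a lock-protected file wrapper (hypothetical names, not Kafka's actual FileMessageSet/Log code) in which truncate, the position reset and append all run under the same lock:

        import java.io.RandomAccessFile
        import java.nio.ByteBuffer

        class LockedLogFile(path: String) {
          private val channel = new RandomAccessFile(path, "rw").getChannel
          private val lock = new Object

          // truncate and immediately reset the position, working around the
          // JDK 1.6 behavior described above
          def truncateTo(targetSize: Long): Unit = lock.synchronized {
            channel.truncate(targetSize)
            channel.position(targetSize)
          }

          // appends take the same lock, so they always see the corrected position
          def append(buffer: ByteBuffer): Unit = lock.synchronized {
            channel.write(buffer)
          }
        }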

        This could mean there is a race condition in FileChannelImpl that could somehow cause this. The code snippet below from the transferTo method of FileChannelImpl might explain what we see.

        long sz = size(); // checks size; size() is synchronized with other FileChannelImpl methods
        if (position > sz)
            return 0; // this is what is returned in step 4 above in the first case: the size is smaller
                      // than the requested position. However, truncate can happen after this line.
        int icount = (int)Math.min(count, Integer.MAX_VALUE);
        if ((sz - position) < icount)
            icount = (int)(sz - position);

        long n;

        // Attempt a direct transfer, if the kernel supports it
        if ((n = transferToDirectly(position, icount, target)) >= 0) // the size check above may have passed, but by this
            return n;                                                // point the size is smaller than the requested position;
                                                                     // transferToDirectly calls transferTo0, which can read
                                                                     // the zero bytes written by truncate.

        A few open questions:
        1. Does truncate zero out the bytes synchronously or lazily? If it is lazy, we could also get junk bytes instead of zeros.
        2. How do we fix it in Kafka? One possible fix is to have the MessageSet iterator throw an invalid-message error when it encounters a zero message size, or when the CRC does not match the message (see the sketch below). The follower can then refetch the offset for that topic partition, or just fail (at least we would know the cause).
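
        A hedged sketch of the iterator-side check proposed in (2) above; the names (SizeFieldCheck, nextMessageSize, InvalidMessageException) are illustrative placeholders rather than the exact ByteBufferMessageSet code:

        import java.nio.ByteBuffer

        class InvalidMessageException(msg: String) extends RuntimeException(msg)

        object SizeFieldCheck {
          // read the 4-byte size field without advancing the buffer, and reject
          // anything that cannot be a real message
          def nextMessageSize(buffer: ByteBuffer): Int = {
            if (buffer.remaining() < 4)
              throw new InvalidMessageException("buffer too small to hold a message size field")
            val size = buffer.getInt(buffer.position())
            // a zeroed-out region of the log shows up here as size == 0
            if (size <= 0)
              throw new InvalidMessageException(s"invalid message size $size")
            size
          }
        }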

        Sriram Subramanian added a comment -

        Fixes the size = 0 case in the iterator. The fetcher thread logs the error, does not update the offset, and retries that topic partition again in the next loop.
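
        For context, a minimal sketch of the retry behavior described above (placeholder names, not the actual AbstractFetcherThread code from the patch):

        class InvalidMessageException(msg: String) extends RuntimeException(msg)

        object FetcherRetrySketch {
          // returns the next fetch offset for the partition
          def processPartitionData(topicPartition: String,
                                   fetchOffset: Long,
                                   messageSizes: Seq[Int]): Long = {
            try {
              // a zero (or negative) size field means the fetched bytes are corrupt or zeroed out
              if (messageSizes.exists(_ <= 0))
                throw new InvalidMessageException(s"zero-sized message at offset $fetchOffset")
              // ... append the fetched messages to the local log here ...
              fetchOffset + messageSizes.size // advance only after a successful append
            } catch {
              case e: InvalidMessageException =>
                // likely caused by the leader truncating its log concurrently with the fetch;
                // keep the old offset so the next loop refetches the same data
                println(s"Invalid message set for $topicPartition at offset $fetchOffset, will retry: ${e.getMessage}")
                fetchOffset
            }
          }
        }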

        Sriram Subramanian made changes -
        Attachment KAFKA-861.patch [ 12578813 ]
        Jun Rao added a comment -

        Thanks for the patch. A couple of comments:

        1. ByteBufferMessageSet: Instead of checking for size <= 0, it's probably better to check for size < Message.MinHeaderSize.

        2. AbstractFetcher: Could we add a comment when we catch InvalidMessageException to explain the particular problem that we have seen and why refetching the data will likely solve the issue?

        Neha Narkhede added a comment -

        Thanks for the patch. Good catch!

        1. AbstractFetcherThread -
        1.1 Let's change the logging to "[%s,%d]" for printing the topic partition. This is what most of the code is standardized upon right now. It will make it easier to grep through the logs.
        1.2 While you're in there, do you mind fixing this typo - "partititon"

        2. ByteBufferMessageSet: +1 on Jun's suggestion above.

        Sriram Subramanian made changes -
        Attachment KAFKA-861-v2.patch [ 12578846 ]
        Neha Narkhede added a comment -

        +1 on v2

        Jun Rao added a comment -

        Thanks for patch v2. We should do "size < Message.MinHeaderSize" instead of "size <= Message.MinHeaderSize" in ByteBufferMessageSet, since it's possible to have a valid message whose size is exactly Message.MinHeaderSize. Committed to 0.8 with the change.

        Great investigation.
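
        A tiny illustration of the committed boundary condition described above ("<" rather than "<="), because a message can legitimately be exactly the minimum header size; the constant value below is a placeholder, not the real Message.MinHeaderSize:

        object MinSizeCheck {
          val MinHeaderSize = 26 // placeholder value for illustration

          // only sizes strictly below the minimum header size are rejected,
          // so a header-only message of exactly MinHeaderSize bytes is still valid
          def isInvalid(size: Int): Boolean = size < MinHeaderSize
        }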

        Jun Rao made changes -
        Status: Open [ 1 ] → Resolved [ 5 ]
        Resolution: Fixed [ 1 ]

          People

          • Assignee: Sriram Subramanian
          • Reporter: Sriram Subramanian
          • Votes: 0
          • Watchers: 3
