KAFKA-15602: Breaking change in 3.4.0 ByteBufferSerializer



    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
    • Fix Version/s: 3.4.2, 3.5.2, 3.7.0, 3.6.1
    • Component/s: producer
    • Labels: None


      This PR claims to have solved the situation described by KAFKA-4852, namely, to have ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 offsets (or, put another way, to honor the buffer's position() as the start point to consume bytes from). Unfortunately, it failed to actually do this, and instead changed the expectations for how an input ByteBuffer's limit and position should be set before being provided to send() on a producer configured with ByteBufferSerializer. Code that worked with pre-3.4.0 releases now produces 0-length messages instead of the intended messages, effectively introducing a breaking change for existing users of the serializer in the wild.

      Here are a few different inputs and serialized outputs under pre-3.4.0 and 3.4.0+ to summarize the breaking change:

      || buffer argument || 3.3.2 serialized output || 3.4.0+ serialized output ||
      | ByteBuffer.wrap("test".getBytes(UTF_8)) | len=4 val=test | len=4 val=test |
      | ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip() | len=4 val=test | len=0 val= |
      | ByteBuffer.allocate(8).put("test".getBytes(UTF_8)) | len=8 val=test<0><0><0><0> | len=4 val=test |
      | ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); [limit then set equal to position] | len=4 val=test | len=4 val=test |
      | ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3) | len=4 val=test | len=1 val=t |

      Notably, plain wrappers of byte arrays continue to work under both versions due to the special case in the serializer for them. I suspect that this is the dominant use case, which is why this has apparently gone unreported to this point. The wrapped-with-offset case fails under both versions, for different reasons (the expected value would be "est"). As demonstrated here, you can ensure that a manually assembled ByteBuffer will work under both versions by ensuring that your buffers have position == limit == message-length (and an actual desired start position of 0). Clearly, though, behavior has changed dramatically for the second and third cases there, with the 3.3.2 behavior, in my experience, aligning better with naive expectations.
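
      To make the inputs in the table concrete, the following sketch prints the position, limit, remaining count, and capacity of each of the five buffers using only java.nio. The class and method names (BufferStates, describe) are illustrative and do not come from Kafka. The fourth buffer assumes that "position == limit == message-length" is reached by calling limit(position()) after the put.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BufferStates {
    // Formats the fields that decide which bytes a reader of the buffer will see.
    static String describe(ByteBuffer b) {
        return "position=" + b.position() + " limit=" + b.limit()
                + " remaining=" + b.remaining() + " capacity=" + b.capacity();
    }

    public static void main(String[] args) {
        byte[] test = "test".getBytes(StandardCharsets.UTF_8);

        ByteBuffer wrapped = ByteBuffer.wrap(test);

        ByteBuffer flipped = ByteBuffer.allocate(8).put(test);
        flipped.flip();

        ByteBuffer unflipped = ByteBuffer.allocate(8).put(test);

        // Assumed reading of the fourth table row: limit is set to the current position.
        ByteBuffer limited = ByteBuffer.allocate(8).put(test);
        limited.limit(limited.position());

        ByteBuffer offset = ByteBuffer.wrap(test, 1, 3);

        System.out.println("wrap:         " + describe(wrapped));   // position=0 limit=4 remaining=4 capacity=4
        System.out.println("put + flip:   " + describe(flipped));   // position=0 limit=4 remaining=4 capacity=8
        System.out.println("put only:     " + describe(unflipped)); // position=4 limit=8 remaining=4 capacity=8
        System.out.println("limit == pos: " + describe(limited));   // position=4 limit=4 remaining=0 capacity=8
        System.out.println("wrap(1, 3):   " + describe(offset));    // position=1 limit=4 remaining=3 capacity=4
    }
}
```

      Only the plain wrapper has remaining() equal to the length of its backing array, which is presumably what lets a special case recognize it.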

      Previously, the serializer would just rewind() the buffer and respect the limit as the indicator of how much data was in the buffer. So, essentially, the prevailing contract was that the data from position 0 (always!) up to the limit on the buffer would be serialized; it was really just the limit that was honored. So if, per the original issue, you have a byte[] array wrapped with, say, ByteBuffer.wrap(bytes, 3, 5), then that will yield a ByteBuffer with position = 3, indicating the desired start point to read from, but that position is effectively ignored by the serializer due to the rewind().
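
      As a minimal sketch of that contract (a model of the behavior described above, not the Kafka source), the hypothetical helper readFromZeroToLimit below rewinds the buffer and copies everything up to the limit. It shows why an offset wrapper loses its intended start position.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class RewindThenRead {
    // Hypothetical helper modelling "rewind, then read up to the limit".
    // Note that it mutates the caller's buffer position, as rewind() does.
    static byte[] readFromZeroToLimit(ByteBuffer data) {
        data.rewind();                            // position -> 0, limit unchanged
        byte[] out = new byte[data.remaining()];  // remaining == limit after rewind
        data.get(out);                            // copies bytes [0, limit)
        data.rewind();                            // leave the buffer readable again
        return out;
    }

    public static void main(String[] args) {
        byte[] bytes = "01234567".getBytes(StandardCharsets.UTF_8);

        // position=3, limit=8: the caller intends "34567".
        ByteBuffer view = ByteBuffer.wrap(bytes, 3, 5);

        String result = new String(readFromZeroToLimit(view), StandardCharsets.UTF_8);
        System.out.println(result); // prints "01234567": the offset was ignored
    }
}
```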

      So while the serializer didn't work when presenting a ByteBuffer view onto a sub-view of a backing array, it did however follow expected behavior when employing standard patterns to populate ByteBuffers backed by larger-than-necessary arrays and using limit() to identify the end of actual data, consistent with conventional usage of flip() to switch from writing to a buffer to setting it up to be read from (e.g., to be passed into a producer.send() call). E.g.,

      ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
      ... // some sequence of 
      bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH ... 
      bb.flip(); /* logically, this says "I am done writing, let's set this up for reading"; pragmatically, it sets the limit to the current position so that whoever reads the buffer knows when to stop reading, and sets the position to zero so it knows where to start reading from */ 

      Technically, you wouldn't even need to use flip() there, since position is ignored; it would be sufficient to just call bb.limit(bb.position()). Notably, a buffer constructed using any variant of ByteBuffer.wrap() is essentially immediately in read-mode with position indicating the start and limit the end.

      With the change introduced in 3.4.0, however, the contract changes dramatically, and the code just presented produces a 0-byte message. As indicated above, it also continues to fail if you just pass in an offset-specified ByteBuffer.wrap()ped message, i.e., the case described by KAFKA-4852:

      public void testByteBufferSerializerOnOffsetWrappedBytes() {
          final byte[] bytes = "Hello".getBytes(UTF_8);
          try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
              // FAILS: this will yield "H", not "ello"
              serializer.serialize(topic, ByteBuffer.wrap(bytes, 1, bytes.length - 1));
          }
      }

      What happened here?

      The resulting PR, it seems, focussed on a flawed proposed test case in the first comment of KAFKA-4852 that failed against pre-3.4.0 Kafka. I reproduce that here with commented annotations from me: 

      @Test // flawed proposed test case
      public void testByteBufferSerializer() {
          final byte[] bytes = "Hello".getBytes(UTF_8);
          final ByteBuffer buffer = ByteBuffer.allocate(7);
          buffer.put(bytes);
          // buffer.flip();   <-- would make the test work
          try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
              assertArrayEquals(bytes, serializer.serialize(topic, buffer));
          }
      }

      In pre-3.4.0, this would yield a 7-byte serialization of "Hello" followed by 2 0-value bytes. I contend that this was actually expected and correct behavior, as the ByteBuffer had never had its limit set, so the implicit and mildly expected contract was never actually abided by. Had there been a buffer.flip() after the .put(bytes) call, as the calling code should have applied, the test would have succeeded. In short, by trying to make this test case succeed, I think this PR represented nothing more than a misunderstanding of how one should prepare ByteBuffers for reading, but has managed to result in a breaking change. The breaking nature of this was actually briefly noted in PR comments but kind of shrugged off with some test changes and explanatory comments on the class.
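
      One way to account for the 3.4.0+ outputs is to assume that the serializer now calls flip() on the caller's buffer itself, after a special case for buffers that expose their whole backing array. The sketch below models that assumption; it is not copied from Kafka, and the class name, method name, and exact special-case condition are hypothetical. Under this model the un-flipped buffer from the flawed test yields "Hello", a correctly flipped buffer yields nothing, and an offset wrapper yields only its skipped prefix.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class InternalFlipModel {
    // Hypothetical model of the 3.4.0+ behavior; an assumption, not Kafka's code.
    static byte[] serialize(ByteBuffer data) {
        if (data.hasArray()) {
            byte[] arr = data.array();
            // Assumed special case: the buffer covers its entire backing array.
            if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
                return arr;
            }
        }
        data.flip();                              // limit -> old position, position -> 0
        byte[] out = new byte[data.remaining()];
        data.get(out);                            // copies bytes [0, old position)
        return out;
    }

    public static void main(String[] args) {
        byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8);

        ByteBuffer unflipped = ByteBuffer.allocate(7).put(hello);
        System.out.println(serialize(unflipped).length);  // prints 5

        ByteBuffer flipped = ByteBuffer.allocate(7).put(hello);
        flipped.flip();                                   // caller prepares for reading
        System.out.println(serialize(flipped).length);    // prints 0

        ByteBuffer offset = ByteBuffer.wrap(hello, 1, 4); // caller intends "ello"
        System.out.println(new String(serialize(offset), StandardCharsets.UTF_8)); // prints "H"
    }
}
```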

      Obviously a correction to restore 3.3.2 behavior would represent another breaking change for users that are running on 3.4+, unless they were also somewhat surprisingly configuring buffers for position() == limit() before passing them to send. Arguably, it would also be a breaking change (though possibly not one of great consequence) if either version was changed to correctly handle the wrapped-with-offset case as represented in the original ticket.

      I do not have much experience contending with a situation like this, but at the risk of jumping to a solution here, I wonder if the only way to really move forward safely and unambiguously here is to remove ByteBufferSerializer as it stands and replace it with a differently named substitute that handles both the plain-wrapped special case and just serializes content from position() to limit(), forcing an evaluation by users when upgrading as to whether the provided byte buffer is correctly configured or not. Of course, a change like that would have to be released at an appropriate version level, too, so I don't know exactly what the desired interim behavior would be (deprecation?). I believe I would be eager to contribute to a fix, but obviously I would need guidance from maintainers regarding the correct path forward semantically.
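
      For illustration only, the position()-to-limit() semantics proposed above could look roughly like the sketch below. The class and method names are hypothetical, and this is not a proposal for the actual replacement API. It reads through a duplicate() so that the caller's position and limit are left untouched.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class PositionToLimit {
    // Hypothetical helper: copies exactly the bytes in [position, limit).
    static byte[] copyRemaining(ByteBuffer data) {
        ByteBuffer view = data.duplicate();       // shares content, independent position/limit
        byte[] out = new byte[view.remaining()];
        view.get(out);                            // advances only the duplicate
        return out;
    }

    public static void main(String[] args) {
        byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8);

        // Offset wrapper: the KAFKA-4852 case.
        ByteBuffer offset = ByteBuffer.wrap(hello, 1, 4);
        System.out.println(new String(copyRemaining(offset), StandardCharsets.UTF_8)); // prints "ello"

        // Conventional put-then-flip usage.
        ByteBuffer flipped = ByteBuffer.allocate(7).put(hello);
        flipped.flip();
        System.out.println(new String(copyRemaining(flipped), StandardCharsets.UTF_8)); // prints "Hello"

        // Plain wrapper: position=0 and limit=length, so no special case is needed here.
        System.out.println(new String(copyRemaining(ByteBuffer.wrap(hello)), StandardCharsets.UTF_8)); // prints "Hello"
    }
}
```

      In this sketch the plain-wrapped case falls out of the general rule at the cost of a copy; a special case that returns the backing array directly would only be an optimization. An un-flipped buffer would serialize whatever lies between its write position and its limit, which is the kind of misconfiguration users would need to check for when upgrading.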





              mjsax Matthias J. Sax
              luke.kirby Luke Kirby