Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: core

    Description

      Since the FetchResponse already has errorcode, there is no need to keep errorcode in ByteBufferMessageSet any more.

      Attachments

        1. kafka-458-v1.patch (6 kB, Swapnil Ghike)
        2. kafka-458-v2.patch (17 kB, Swapnil Ghike)
        3. kafka-458-v3.patch (31 kB, Swapnil Ghike)
        4. kafka-458-v4.patch (31 kB, Swapnil Ghike)

        Activity

          swapnilghike Swapnil Ghike added a comment -

          There was a method to calculate hashcode in ByteBufferMessageSet, but it was never used. So I have removed it. Also removed the method hasError in KafkaETLContext.java as it was not used anymore.

          junrao Jun Rao added a comment -

          Thanks for patch v1. You only fixed the ByteBufferMessageSet in javaapi. There is another ByteBufferMessageSet in kafka.message that needs to be fixed. Also, in ByteBufferMessageSet, we should get rid of those get* methods and use the val directly.

          swapnilghike Swapnil Ghike added a comment -

          Changes from last patch:
          1. Removed errorcode from message/ByteBufferMessageSet.
          2. Modified 4 unit tests to check for errorcode in FetchResponse.data.partitionDataArray(j).error

          junrao Jun Rao added a comment -

          Thanks for patch v2. Some comments:

          1. ByteBufferMessageSet: To be consistent between the java and the scala version, we should have a public api serialized() to get the serialized byteBuffer and make buffer a private field. Also, the javaapi version needs to define hashCode since it may be used in a map.

          2. ByteBufferMessageSet, KafkaETLContext: remove unused imports.

          jkreps Jay Kreps added a comment -

          Jun, what would this serialized api be for? ByteBufferMessageSet already has a buffer field which has the serialized ByteBuffer. Other message sets can be copied to a ByteBufferMessageSet if desired, do we need a new api in the interface? I recommend we just add the @BeanProperty to the existing buffer field.

          jkreps Jay Kreps added a comment -

          Swapnil, this patch looks fantastic, very thorough clean up. One issue: in ByteBufferMessageSet you leave equals() overridden but I think hashCode is no longer overridden (basically the equals uses the initial offset but the hash code doesn't). This can cause some subtle issues with use of collections, and is probably a bug.
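
          To illustrate the collection issue raised here, the following is a minimal sketch and not the Kafka code: BrokenSet, FixedSet and HashContractDemo are hypothetical stand-ins. BrokenSet overrides equals() only, so two equal instances usually hash differently and a hash-based lookup can miss. FixedSet hashes the same fields that equals() compares.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

// Hypothetical stand-in for a message set: equals is overridden,
// hashCode is inherited from AnyRef and is identity-based.
class BrokenSet(val buffer: ByteBuffer, val initialOffset: Long) {
  override def equals(other: Any): Boolean = other match {
    case that: BrokenSet =>
      buffer == that.buffer && initialOffset == that.initialOffset
    case _ => false
  }
}

// Same equality, but hashCode is derived from the fields equals compares.
class FixedSet(val buffer: ByteBuffer, val initialOffset: Long) {
  override def equals(other: Any): Boolean = other match {
    case that: FixedSet =>
      buffer == that.buffer && initialOffset == that.initialOffset
    case _ => false
  }
  override def hashCode: Int = 31 * buffer.hashCode + initialOffset.hashCode
}

object HashContractDemo {
  // A fresh buffer with identical content on every call.
  private def bytes: ByteBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3))

  val brokenA = new BrokenSet(bytes, 0L)
  val brokenB = new BrokenSet(bytes, 0L)
  val fixedA = new FixedSet(bytes, 0L)
  val fixedB = new FixedSet(bytes, 0L)

  def main(args: Array[String]): Unit = {
    // Equal by equals(), yet the hash-set lookup is not reliable.
    println(brokenA == brokenB)
    println(mutable.HashSet(brokenA).contains(brokenB))
    // Equal by equals() and found by the hash-set lookup.
    println(mutable.HashSet(fixedA).contains(fixedB))
  }
}
```

          The lookup on the broken class depends on identity hash codes, so it will almost always fail but is not strictly guaranteed to.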

          jkreps Jay Kreps added a comment -

          Also, I think KafkaApis.readMessageSet should no longer catch the exception and return an Either[Short,MessageSet]. This was a bit of a hack and makes the code in KafkaApis.readMessageSets a little hard to read (i.e. the references to left and right would be clearer as a try/catch, since that is what they are).
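
          The two styles being compared can be sketched as follows. This is an illustration under assumed names (ReadStyles, Chunk, PartitionResult, readLog and the error code values are all hypothetical), not the actual KafkaApis code. In the first style the single read turns the exception into Left(code); in the second it simply throws and the caller maps the failure to an error code with try/catch.

```scala
// Hypothetical names throughout; this is not the KafkaApis implementation.
object ReadStyles {
  val NoError: Short = 0
  val UnknownCode: Short = -1

  final case class Chunk(payload: String)
  final case class PartitionResult(error: Short, chunk: Option[Chunk])

  // Stand-in for a log read that may fail.
  private def readLog(partition: Int): Chunk =
    if (partition < 0) throw new IllegalArgumentException("bad partition " + partition)
    else Chunk("data-" + partition)

  // Style 1: the single read swallows the exception and encodes it as Left(code).
  def readOneEither(partition: Int): Either[Short, Chunk] =
    try Right(readLog(partition))
    catch { case _: IllegalArgumentException => Left(UnknownCode) }

  def readAllEither(partitions: Seq[Int]): Seq[PartitionResult] =
    partitions.map { p =>
      readOneEither(p) match {
        case Left(code)   => PartitionResult(code, None)
        case Right(chunk) => PartitionResult(NoError, Some(chunk))
      }
    }

  // Style 2: the single read just throws; the caller handles it with try/catch.
  def readOne(partition: Int): Chunk = readLog(partition)

  def readAll(partitions: Seq[Int]): Seq[PartitionResult] =
    partitions.map { p =>
      try PartitionResult(NoError, Some(readOne(p)))
      catch { case _: IllegalArgumentException => PartitionResult(UnknownCode, None) }
    }
}
```

          Both variants produce the same per-partition results; the second keeps the error handling in one visible place.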

          junrao Jun Rao added a comment -

          Jay, we just need a consistent api to get the byteBuffer in the scala and the java version of ByteBufferMessageSet. @BeanProperty probably works too.
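
          As a rough sketch of the @BeanProperty option mentioned in this exchange: the annotation makes the Scala compiler generate a Java-style getter next to the Scala accessor, so Java callers can use getBuffer() while Scala callers use buffer. BufferHolder and BeanPropertyDemo are hypothetical names, and the import path assumes Scala 2.10 or later.

```scala
import java.nio.ByteBuffer
import scala.beans.BeanProperty

// Hypothetical class, not Kafka's ByteBufferMessageSet.
// @BeanProperty adds a getBuffer() method alongside the Scala accessor `buffer`.
class BufferHolder(@BeanProperty val buffer: ByteBuffer)

object BeanPropertyDemo {
  val holder = new BufferHolder(ByteBuffer.allocate(8))

  def main(args: Array[String]): Unit = {
    // Both accessors return the same underlying ByteBuffer instance.
    println(holder.buffer eq holder.getBuffer)
  }
}
```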

          swapnilghike Swapnil Ghike added a comment -

          2.1 As discussed with Jun, since any use of javaapi/ByteBufferMessageSet must happen through the iterator, I have removed the getSerialized() method and made the buffer field private. The buffer field in the non-javaapi version of ByteBufferMessageSet is a public val. The getSerialized() method is also removed from the non-javaapi version of ByteBufferMessageSet, MessageSet and FileMessageSet (where it merely threw an UnsupportedOperationException). This introduces a need for a typecast in ProducerRequest.writeTo(buffer), which could look like another hack introduced to get rid of the UnsupportedOperationException hack in FileMessageSet. This could be taken care of with KAFKA-289.

          2.2 Removed unused imports.

          2.3 The hashCode method is now overridden. Jay, thanks for catching that!

          2.4 Just to be sure, did you mean that the behaviour achieved by Either should be achieved by throwing an exception in KafkaApis.readMessageSet and catching it in KafkaApis.readMessageSets? That's what I have done in this patch.

          2.5 Removed testValidBytes() in javaapi/ByteBufferMessageSetTest because it was merely running the test on underlying.validBytes and there is already another test that does the same thing on the non-javaapi version of ByteBufferMessageSet. Moved testValidBytesWithCompression() from javaapi/ByteBufferMessageSetTest to the other ByteBufferMessageSetTest.
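
          The typecast mentioned in 2.1 might look roughly like the sketch below. This is only an assumption about its shape: Msgs, BufferMsgs, FileMsgs and WriteToSketch are hypothetical types, not the classes in the patch. Once the serialized-buffer accessor is gone from the base type, the writer has to narrow to the buffer-backed subtype before it can copy bytes.

```scala
import java.nio.ByteBuffer

// Hypothetical hierarchy; the base type no longer exposes a serialized buffer.
trait Msgs { def sizeInBytes: Int }

class BufferMsgs(val buffer: ByteBuffer) extends Msgs {
  def sizeInBytes: Int = buffer.limit()
}

class FileMsgs(val path: String, val sizeInBytes: Int) extends Msgs

object WriteToSketch {
  // Narrow to the buffer-backed subtype; anything else cannot be written here.
  def writeTo(messages: Msgs, out: ByteBuffer): Unit = messages match {
    case m: BufferMsgs =>
      // duplicate() leaves the source buffer's position untouched.
      out.put(m.buffer.duplicate())
    case other =>
      throw new IllegalArgumentException(
        "cannot serialize " + other.getClass.getName)
  }
}
```

          The pattern match makes the unsupported case explicit at the call site instead of hiding it behind a method that throws on some subtypes.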

          junrao Jun Rao added a comment -

          Thanks for patch v3. Some comments:

          1. Could you rebase?

          2. javaapi.ByteBufferMessageSet.hash(): should we just call the underlying hashcode?

          3. KafkaApis.readMessageSet(fetchRequest): Could you fix the indentation of try/catch?

          4. PartitionTopicInfo.enqueueError can be removed since it's no longer used.
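
          Point 2 above, delegating to the underlying hash code, can be sketched like this. ScalaSideSet and JavaSideSet are hypothetical stand-ins for the wrapped and wrapping message sets, not the classes in the patch. The wrapper forwards both equals and hashCode, so the two stay consistent by construction.

```scala
// Hypothetical stand-in for the wrapped (Scala-side) message set.
class ScalaSideSet(val items: List[String]) {
  override def equals(other: Any): Boolean = other match {
    case that: ScalaSideSet => items == that.items
    case _ => false
  }
  override def hashCode: Int = items.hashCode
}

// Hypothetical stand-in for the javaapi wrapper: both methods delegate.
class JavaSideSet(private val underlying: ScalaSideSet) {
  override def equals(other: Any): Boolean = other match {
    case that: JavaSideSet => underlying == that.underlying
    case _ => false
  }
  override def hashCode: Int = underlying.hashCode
}

object DelegationDemo {
  val a = new JavaSideSet(new ScalaSideSet(List("m1", "m2")))
  val b = new JavaSideSet(new ScalaSideSet(List("m1", "m2")))

  def main(args: Array[String]): Unit = {
    // Equal wrappers share a hash code, so map lookups behave as expected.
    println(Map(a -> "found").get(b))
  }
}
```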

          swapnilghike Swapnil Ghike added a comment -

          Modified the patch.

          junrao Jun Rao added a comment -

          Thanks for patch v4. Committed to 0.8.


          People

            swapnilghike Swapnil Ghike
            junrao Jun Rao
            Votes: 0
            Watchers: 3

            Dates

              Created:
              Updated:
              Resolved:

              Time Tracking

                Estimated: 72h
                Remaining: 72h
                Logged: Not Specified