KAFKA-946

Kafka Hadoop Consumer fails when verifying message checksum

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: contrib
    • Labels:
      None

      Description

      The code tries to verify the message checksum, but fails because the checksum is recomputed over different data. In KafkaETLContext:

      protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
          if (_messageIt != null && _messageIt.hasNext()) {
              MessageAndOffset messageAndOffset = _messageIt.next();
              ByteBuffer buf = messageAndOffset.message().payload();
              int origSize = buf.remaining();
              byte[] bytes = new byte[origSize];
              buf.get(bytes, buf.position(), origSize);
              value.set(bytes, 0, origSize);
              key.set(_index, _offset, messageAndOffset.message().checksum());
              _offset = messageAndOffset.nextOffset(); // increase offset
              _count++; // increase count
              return true;
          } else {
              return false;
          }
      }

      Note that the message payload is used and the message checksum is included in the key. Then, in SimpleKafkaETLMapper:

      @Override
      public void map(KafkaETLKey key, BytesWritable val,
              OutputCollector<LongWritable, Text> collector,
              Reporter reporter) throws IOException {

          byte[] bytes = KafkaETLUtils.getBytes(val);

          // check the checksum of message
          Message message = new Message(bytes);
          long checksum = key.getChecksum();
          if (checksum != message.checksum())
              throw new IOException("Invalid message checksum "
                      + message.checksum() + ". Expected " + key + ".");

      The Message object is initialized with only the payload bytes, so a new checksum is calculated over the payload alone. The problem is that the original message checksum also covers the key, so checksum verification fails...
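      The mismatch can be reproduced with plain CRC32, which is the checksum Kafka messages use. A minimal, self-contained sketch (the two-byte header here is illustrative, not Kafka's exact wire layout):

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class ChecksumMismatchDemo {
    static long crc32(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data);
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] header = {1, 0};                  // illustrative header bytes
        byte[] payload = "hello".getBytes();

        // The stored checksum is computed over the whole message: header + payload.
        ByteBuffer full = ByteBuffer.allocate(header.length + payload.length);
        full.put(header).put(payload);
        long storedChecksum = crc32(full.array());

        // What SimpleKafkaETLMapper effectively does: recompute over payload only.
        long recomputed = crc32(payload);

        System.out.println(storedChecksum == recomputed); // prints false
    }
}
```

      Because the two checksums are computed over different byte ranges, the comparison in the mapper can never succeed.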

        Activity

        Sam Meder created issue -
        Sam Meder made changes -
        Field Original Value New Value
        Attachment hadoop_consumer.patch [ 12588428 ]
        Sam Meder added a comment -

        Attached a patch that simply passes the full message buffer instead of just the payload...

        Sam Meder made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Jay Kreps added a comment -

        Ack, well that's broken.

        Richard--is this a reasonable thing to do?
        Jun--any concern with this going in 0.8?

        A meta question is how to handle the poor state of maintenance of this Hadoop code. Started a discussion on that on the mailing list...

        Jay Kreps added a comment -

        From Richard:

        Sorry, it took a while to remember the context.
        In a Kafka Message, the checksum is created on the whole message: header and payload included.

        The contrib code passes only the Message payload to the mapper, and not the whole buffer. I believe the reason for this is that we wanted to pass just the message data (not any of the kafka special bits) for the mapper to handle. The Message that is created in SimpleKafkaETLMapper is then created using only the payload bytes, which is incorrect. It can be argued that passing just the payload is desirable. For instance, mappers can decode the byte buffer directly into Avro without stripping away the header or dealing with kafka Messages at all.

        Also, changing the KafkaETLContext code could affect a lot of users. This is definitely not a backwards compatible change. It can also be argued that the BytesWritable only contains the payload, and that checksumming of the message should've occurred well before the Mapper gets the message.

        However, I think that Sam's fix still has merit. It would be good for the KafkaETLContext to pass the Message buffer instead of the payload, and the RecordReader could strip away the kafka bits before giving the payload to the Mapper. Perhaps put in a config switch to either get just the payload or the whole kafka message buffer?

        Additional thoughts:
        I assume there are plenty of users of this code. If anyone uses the KafkaETLContext directly, they'll find that the patch's changes break their stuff. However, for those who are using KafkaETLContext through the KafkaETLRecordReader (as they should), there is a way to make it backwards compatible.

        The checksumming and payload stripping code could go into the RecordReader rather than the KafkaETLContext.

        If the scope of these changes is too big, I'd just fix the SimpleKafkaETLMapper to not parse the payload bytes.
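        Richard's proposal can be sketched independently of the contrib classes: verify the checksum over the full message buffer, then hand only the payload downstream. This is a hypothetical helper, assuming a CRC32 checksum computed over header plus payload (the real Kafka wire format differs); the names here are illustrative, not the actual patch:

```java
import java.io.IOException;
import java.util.zip.CRC32;

public class PayloadStripper {
    /**
     * Verifies expectedChecksum against the whole message buffer, then
     * returns only the payload (everything after headerLength bytes).
     * Mirrors the idea of checksumming in the RecordReader and passing a
     * stripped payload to the Mapper.
     */
    static byte[] verifyAndStrip(byte[] fullMessage, int headerLength,
                                 long expectedChecksum) throws IOException {
        CRC32 crc = new CRC32();
        crc.update(fullMessage);
        if (crc.getValue() != expectedChecksum) {
            throw new IOException("Invalid message checksum " + crc.getValue()
                    + ". Expected " + expectedChecksum + ".");
        }
        // Strip the kafka bits before handing the payload downstream.
        byte[] payload = new byte[fullMessage.length - headerLength];
        System.arraycopy(fullMessage, headerLength, payload, 0, payload.length);
        return payload;
    }
}
```

        The config switch Richard mentions would simply decide whether to return the stripped payload or the untouched fullMessage.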

        Jun Rao made changes -
        Fix Version/s 0.8 [ 12317244 ]
        Jun Rao added a comment -

        Sorry for not looking at this earlier. I think we can include the fix in 0.8 since it's simple enough. The patch doesn't apply cleanly to the 0.8 branch though. Could you rebase?

        Sam Meder made changes -
        Attachment hadoop_consumer_1.patch [ 12604734 ]
        Sam Meder made changes -
        Attachment hadoop_consumer.patch [ 12588428 ]
        Sam Meder added a comment -

        Rebased patch attached.

        Jun Rao added a comment -

        Thanks for the rebased patch. +1 and committed to 0.8.

        Jun Rao made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Assignee Sam Meder [ smeder ]
        Resolution Fixed [ 1 ]
        Jun Rao made changes -
        Status Resolved [ 5 ] Closed [ 6 ]

          People

          • Assignee:
            Sam Meder
            Reporter:
            Sam Meder
          • Votes:
            0
            Watchers:
            4

            Dates

            • Created:
              Updated:
              Resolved:
