Description
The code tries to verify the message checksum, but verification fails because the checksum it recomputes is not calculated over the same bytes as the original one. In KafkaETLContext:
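    protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
        if (_messageIt != null && _messageIt.hasNext()) {
            // ... (body not reproduced here): the next message's payload is
            // copied into value, and the message's checksum is set on key
            return true;
        }
        else return false;
    }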
Note that only the message payload is copied into the value, while the message checksum is stored in the key. Then, in SimpleKafkaETLMapper:
    @Override
    public void map(KafkaETLKey key, BytesWritable val,
                    OutputCollector<LongWritable, Text> collector,
                    Reporter reporter) throws IOException {
        byte[] bytes = KafkaETLUtils.getBytes(val);

        // check the checksum of the message: the new Message is built from
        // the payload bytes only, so its checksum is recomputed from them
        Message message = new Message(bytes);
        long checksum = key.getChecksum();
        if (checksum != message.checksum())
            throw new IOException("Invalid message checksum "
                    + message.checksum() + ". Expected " + key + ".");
        // ... (rest of map() not reproduced here)
    }
Here the Message object is initialized with the payload bytes only, so its checksum is recalculated over the payload alone. The original message checksum, however, also covers the message key, so the two values differ and checksum verification fails.
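A minimal, self-contained sketch of the mismatch follows. It does not use Kafka classes and does not reproduce Kafka's actual message format; it assumes a simplified record whose stored checksum is a CRC32 over the key bytes followed by the payload bytes. The class and helper names (ChecksumMismatchDemo, checksumWithKey, checksumPayloadOnly) are hypothetical, chosen only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/**
 * Illustrative sketch only (hypothetical names, simplified layout): the
 * "stored" checksum is assumed to be CRC32(key bytes + payload bytes).
 * This is NOT Kafka's real message format.
 */
public class ChecksumMismatchDemo {

    /** Hypothetical helper: CRC32 over the key followed by the payload. */
    static long checksumWithKey(byte[] key, byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(key, 0, key.length);
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    /** Hypothetical helper: CRC32 over the payload only. */
    static long checksumPayloadOnly(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        // Checksum computed when the keyed record was written; plays the
        // role of the checksum that get() stores in the KafkaETLKey.
        long stored = checksumWithKey(key, payload);

        // Checksum recomputed from the payload alone; plays the role of
        // new Message(bytes).checksum() in the mapper.
        long recomputed = checksumPayloadOnly(payload);

        System.out.println("stored     = " + stored);
        System.out.println("recomputed = " + recomputed);
        System.out.println("match      = " + (stored == recomputed));
    }
}
```

With a non-empty key the two values differ, mirroring the failure in the mapper. With an empty key the two helpers return the same value in this simplified model, since updating a CRC32 with zero bytes leaves it unchanged.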