diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java index a381ccd..6eea635 100644 --- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java +++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java @@ -17,6 +17,7 @@ package kafka.bridge.hadoop; import java.io.IOException; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import kafka.javaapi.producer.Producer; @@ -62,7 +63,9 @@ public class KafkaRecordWriter extends RecordWriter if (value instanceof byte[]) valBytes = (byte[]) value; else if (value instanceof BytesWritable) - valBytes = ((BytesWritable) value).getBytes(); + // BytesWritable.getBytes returns its internal buffer, so .length would refer to its capacity, not the + // intended size of the byte array contained. We need to use BytesWritable.getLength for the true size. + valBytes = Arrays.copyOf(((BytesWritable) value).getBytes(), ((BytesWritable) value).getLength()); else throw new IllegalArgumentException("KafkaRecordWriter expects byte array value to publish");