diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
index af9c650..405a898 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
@@ -18,6 +18,7 @@ package kafka.bridge.hadoop;
 
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import kafka.javaapi.producer.Producer;
@@ -56,7 +57,10 @@ public class KafkaRecordWriter<W extends BytesWritable> extends RecordWriter<Nul
   @Override
   public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException
   {
-    Message msg = new Message(value.getBytes());
+    // BytesWritable.getBytes returns its internal buffer, so .length would refer to its capacity, not the
+    // intended size of the byte array contained.  We need to use BytesWritable.getLength for the true size.
+    Message msg = new Message(Arrays.copyOf(value.getBytes(), value.getLength()));
+
     msgList.add(new ProducerData<Integer, Message>(this.topic, msg));
     totalSize += msg.size();
 
