diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
index 709a609..417b4b3 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
@@ -44,7 +44,7 @@ public class KafkaOutputFormat<K, V> extends OutputFormat<K, V>
    *  We set the default to a million bytes so that the server will not reject the batch of messages
    *  with a MessageSizeTooLargeException. The actual size will be smaller after compression.
    */
-  public static final int KAFKA_QUEUE_SIZE = 1000000;
+  public static final int KAFKA_QUEUE_BYTES = 1000000;
 
   public static final String KAFKA_CONFIG_PREFIX = "kafka.output";
   private static final Map<String, String> kafkaConfigMap;
@@ -119,8 +119,7 @@ public class KafkaOutputFormat<K, V> extends OutputFormat<K, V>
     }
 
     // KafkaOutputFormat specific parameters
-    final int queueSize = job.getInt(KAFKA_CONFIG_PREFIX + ".queue.size", KAFKA_QUEUE_SIZE);
-    job.setInt(KAFKA_CONFIG_PREFIX + ".queue.size", queueSize);
+    final int queueBytes = job.getInt(KAFKA_CONFIG_PREFIX + ".queue.bytes", KAFKA_QUEUE_BYTES);
 
     if (uri.getScheme().equals("kafka")) {
       // using the direct broker list
@@ -140,6 +139,6 @@ public class KafkaOutputFormat<K, V> extends OutputFormat<K, V>
       throw new KafkaException("missing scheme from kafka uri (must be kafka://)");
 
     Producer<Object, byte[]> producer = new Producer<Object, byte[]>(new ProducerConfig(props));
-    return new KafkaRecordWriter<K, V>(producer, topic, queueSize);
+    return new KafkaRecordWriter<K, V>(producer, topic, queueBytes);
   }
 }
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
index 6eea635..1704eb3 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
@@ -32,14 +32,14 @@ public class KafkaRecordWriter<K,V> extends RecordWriter<K,V>
   protected String topic;
 
   protected List<KeyedMessage<Object, byte[]>> msgList = new LinkedList<KeyedMessage<Object, byte[]>>();
-  protected int totalSize = 0;
-  protected int queueSize;
+  protected int totalBytes = 0;
+  protected int queueBytes;
 
-  public KafkaRecordWriter(Producer<Object, byte[]> producer, String topic, int queueSize)
+  public KafkaRecordWriter(Producer<Object, byte[]> producer, String topic, int queueBytes)
   {
     this.producer = producer;
     this.topic = topic;
-    this.queueSize = queueSize;
+    this.queueBytes = queueBytes;
   }
 
   protected void sendMsgList() throws IOException
@@ -52,7 +52,7 @@ public class KafkaRecordWriter<K,V> extends RecordWriter<K,V>
         throw new IOException(e);           // all Kafka exceptions become IOExceptions
       }
       msgList.clear();
-      totalSize = 0;
+      totalBytes = 0;
     }
   }
 
@@ -69,12 +69,15 @@ public class KafkaRecordWriter<K,V> extends RecordWriter<K,V>
     else
       throw new IllegalArgumentException("KafkaRecordWriter expects byte array value to publish");
 
-    msgList.add(new KeyedMessage<Object, byte[]>(this.topic, key, valBytes));
-    totalSize += valBytes.length;
+    totalBytes += valBytes.length;
 
     // MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch
-    if (totalSize > queueSize || msgList.size() >= Short.MAX_VALUE)
+    // If the new message is going to make the message list tip over 1 million bytes, send the
+    // message list now.
+    if (totalBytes > queueBytes || msgList.size() >= Short.MAX_VALUE)
       sendMsgList();
+
+    msgList.add(new KeyedMessage<Object, byte[]>(this.topic, key, valBytes));
   }
 
   @Override
