From c84832d83ba029176ff760244749e29c54828e89 Mon Sep 17 00:00:00 2001
From: Ben Kirwin <ben@kirw.in>
Date: Sat, 13 Jun 2015 18:32:43 -0400
Subject: [PATCH 1/6] Use -1 as sentinel value for normal publish

---
 .../java/org/apache/kafka/clients/producer/internals/RecordBatch.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 06182db..8cee0d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -61,7 +61,7 @@ public final class RecordBatch {
         if (!this.records.hasRoomFor(key, value)) {
             return null;
         } else {
-            this.records.append(0L, key, value);
+            this.records.append(-1L, key, value);
             this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
             if (callback != null)
-- 
2.4.4


From 9ef700bce517e1f8f23efe1528382058da3bc767 Mon Sep 17 00:00:00 2001
From: Ben Kirwin <ben@kirw.in>
Date: Sat, 13 Jun 2015 23:38:40 -0400
Subject: [PATCH 2/6] Set all offsets in a message set to -1 when unspecified

---
 core/src/main/scala/kafka/message/ByteBufferMessageSet.scala | 12 ++++++------
 core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala  |  2 +-
 .../scala/unit/kafka/message/ByteBufferMessageSetTest.scala  |  8 ++++----
 3 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 5a32de8..1968f5e 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -27,13 +27,13 @@ import java.util.concurrent.atomic.AtomicLong
 
 object ByteBufferMessageSet {
 
-  private def create(offsetCounter: AtomicLong, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = {
+  private def create(offsetCounter: () => Long, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = {
     if(messages.size == 0) {
       MessageSet.Empty.buffer
     } else if(compressionCodec == NoCompressionCodec) {
       val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
       for(message <- messages)
-        writeMessage(buffer, message, offsetCounter.getAndIncrement)
+        writeMessage(buffer, message, offsetCounter())
       buffer.rewind()
       buffer
     } else {
@@ -43,7 +43,7 @@ object ByteBufferMessageSet {
         val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
         try {
           for (message <- messages) {
-            offset = offsetCounter.getAndIncrement
+            offset = offsetCounter()
             output.writeLong(offset)
             output.writeInt(message.size)
             output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
@@ -125,15 +125,15 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
   private var shallowValidByteCount = -1
 
   def this(compressionCodec: CompressionCodec, messages: Message*) {
-    this(ByteBufferMessageSet.create(new AtomicLong(0), compressionCodec, messages:_*))
+    this(ByteBufferMessageSet.create(() => -1, compressionCodec, messages:_*))
   }
 
   def this(compressionCodec: CompressionCodec, offsetCounter: AtomicLong, messages: Message*) {
-    this(ByteBufferMessageSet.create(offsetCounter, compressionCodec, messages:_*))
+    this(ByteBufferMessageSet.create(() => offsetCounter.getAndIncrement(), compressionCodec, messages:_*))
   }
 
   def this(messages: Message*) {
-    this(NoCompressionCodec, new AtomicLong(0), messages: _*)
+    this(NoCompressionCodec, messages: _*)
   }
 
   def getBuffer = buffer
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index 02cf668..829b9c3 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -31,7 +31,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   
   def createMessageSet(messages: Seq[Message]): FileMessageSet = {
     val set = new FileMessageSet(tempFile())
-    set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*))
+    set.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(0L), messages: _*))
     set.flush()
     set
   }
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 07bc317..9cc7de3 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -145,12 +145,12 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     val compressedMessages = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
                                                       messages = messages.map(_.message).toBuffer:_*)
     // check uncompressed offsets 
-    checkOffsets(messages, 0)
-    var offset = 1234567
+    checkOffsets(messages, -1)
+    val offset = 1234567
     checkOffsets(messages.validateMessagesAndAssignOffsets(new AtomicLong(offset), NoCompressionCodec, NoCompressionCodec), offset)
 
     // check compressed messages
-    checkOffsets(compressedMessages, 0)
+    checkOffsets(compressedMessages, -1)
     checkOffsets(compressedMessages.validateMessagesAndAssignOffsets(new AtomicLong(offset), DefaultCompressionCodec, DefaultCompressionCodec), offset)
   }
   
@@ -159,7 +159,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     var offset = baseOffset
     for(entry <- messages) {
       assertEquals("Unexpected offset in message set iterator", offset, entry.offset)
-      offset += 1
+      if (offset != -1) offset += 1
     }
   }
 
-- 
2.4.4


From 5a270bacd9030ea453b23ac12b92e6a89d91f0c1 Mon Sep 17 00:00:00 2001
From: Ben Kirwin <ben@kirw.in>
Date: Sun, 12 Jul 2015 17:02:47 -0400
Subject: [PATCH 3/6] Allow producer to specify expected offset

---
 .../kafka/clients/producer/KafkaProducer.java      |  2 +-
 .../kafka/clients/producer/ProducerRecord.java     | 31 +++++++++++++++++++---
 .../producer/internals/RecordAccumulator.java      |  8 +++---
 .../clients/producer/internals/RecordBatch.java    |  4 +--
 .../producer/internals/RecordAccumulatorTest.java  | 26 +++++++++---------
 .../clients/producer/internals/SenderTest.java     |  6 ++---
 6 files changed, 51 insertions(+), 26 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 03b8dd2..cf05a2b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -385,7 +385,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback);
+            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, record.offset(), serializedKey, serializedValue, callback);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
index 75cd51e..782f92b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -14,7 +14,7 @@ package org.apache.kafka.clients.producer;
 
 /**
  * A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional
- * partition number, and an optional key and value.
+ * partition number, an optional offset, and an optional key and value.
  * <p>
  * If a valid partition number is specified that partition will be used when sending the record. If no partition is
  * specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is
@@ -24,6 +24,7 @@ public final class ProducerRecord<K, V> {
 
     private final String topic;
     private final Integer partition;
+    private final long offset;
     private final K key;
     private final V value;
 
@@ -32,19 +33,33 @@ public final class ProducerRecord<K, V> {
      * 
      * @param topic The topic the record will be appended to
      * @param partition The partition to which the record should be sent
+     * @param offset The expected offset for the record, or -1 to leave unspecified
      * @param key The key that will be included in the record
      * @param value The record contents
      */
-    public ProducerRecord(String topic, Integer partition, K key, V value) {
+    public ProducerRecord(String topic, Integer partition, long offset, K key, V value) {
         if (topic == null)
             throw new IllegalArgumentException("Topic cannot be null");
         this.topic = topic;
         this.partition = partition;
+        this.offset = offset;
         this.key = key;
         this.value = value;
     }
 
     /**
+     * Creates a record to be sent to a specified topic and partition
+     * 
+     * @param topic The topic the record will be appended to
+     * @param partition The partition to which the record should be sent
+     * @param key The key that will be included in the record
+     * @param value The record contents
+     */
+    public ProducerRecord(String topic, Integer partition, K key, V value) {
+        this(topic, partition, -1, key, value);
+    }
+
+    /**
      * Create a record to be sent to Kafka
      * 
      * @param topic The topic the record will be appended to
@@ -93,11 +108,18 @@ public final class ProducerRecord<K, V> {
         return partition;
     }
 
+    /**
+     * The expected offset for this record (or -1 if no expected offset is specified)
+     */
+    public long offset() {
+        return offset;
+    }
+
     @Override
     public String toString() {
         String key = this.key == null ? "null" : this.key.toString();
         String value = this.value == null ? "null" : this.value.toString();
-        return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value;
+        return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", offset=" + offset + ", key=" + key + ", value=" + value;
     }
 
     @Override
@@ -115,6 +137,8 @@ public final class ProducerRecord<K, V> {
             return false;
         else if (topic != null ? !topic.equals(that.topic) : that.topic != null) 
             return false;
+        else if (offset != that.offset)
+            return false;
         else if (value != null ? !value.equals(that.value) : that.value != null) 
             return false;
 
@@ -125,6 +149,7 @@ public final class ProducerRecord<K, V> {
     public int hashCode() {
         int result = topic != null ? topic.hashCode() : 0;
         result = 31 * result + (partition != null ? partition.hashCode() : 0);
+        result = 31 * result + (int) offset;
         result = 31 * result + (key != null ? key.hashCode() : 0);
         result = 31 * result + (value != null ? value.hashCode() : 0);
         return result;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index a152bd7..0d5adc0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -154,7 +154,7 @@ public final class RecordAccumulator {
      * @param value The value for the record
      * @param callback The user-supplied callback to execute when the request is complete
      */
-    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
+    public RecordAppendResult append(TopicPartition tp, long offset, byte[] key, byte[] value, Callback callback) throws InterruptedException {
         // We keep track of the number of appending thread to make sure we do not miss batches in
         // abortIncompleteBatches().
         appendsInProgress.incrementAndGet();
@@ -166,7 +166,7 @@ public final class RecordAccumulator {
             synchronized (dq) {
                 RecordBatch last = dq.peekLast();
                 if (last != null) {
-                    FutureRecordMetadata future = last.tryAppend(key, value, callback);
+                    FutureRecordMetadata future = last.tryAppend(offset, key, value, callback);
                     if (future != null)
                         return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                 }
@@ -182,7 +182,7 @@ public final class RecordAccumulator {
                     throw new IllegalStateException("Cannot send after the producer is closed.");
                 RecordBatch last = dq.peekLast();
                 if (last != null) {
-                    FutureRecordMetadata future = last.tryAppend(key, value, callback);
+                    FutureRecordMetadata future = last.tryAppend(offset, key, value, callback);
                     if (future != null) {
                         // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                         free.deallocate(buffer);
@@ -191,7 +191,7 @@ public final class RecordAccumulator {
                 }
                 MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                 RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
-                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
+                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(offset, key, value, callback));
 
                 dq.addLast(batch);
                 incomplete.add(batch);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 8cee0d8..e464e7e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -57,11 +57,11 @@ public final class RecordBatch {
      * 
      * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
      */
-    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) {
+    public FutureRecordMetadata tryAppend(long offset, byte[] key, byte[] value, Callback callback) {
         if (!this.records.hasRoomFor(key, value)) {
             return null;
         } else {
-            this.records.append(-1L, key, value);
+            this.records.append(offset, key, value);
             this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
             if (callback != null)
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 5b2e4ff..8c485b1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -70,10 +70,10 @@ public class RecordAccumulatorTest {
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time,  metricTags);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, key, value, null);
+            accum.append(tp1, -1L, key, value, null);
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
-        accum.append(tp1, key, value, null);
+        accum.append(tp1, -1L, key, value, null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
@@ -92,7 +92,7 @@ public class RecordAccumulatorTest {
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
         RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, new byte[2 * batchSize], null);
+        accum.append(tp1, -1, key, new byte[2 * batchSize], null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
 
@@ -100,7 +100,7 @@ public class RecordAccumulatorTest {
     public void testLinger() throws Exception {
         long lingerMs = 10L;
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, value, null);
+        accum.append(tp1, -1L, key, value, null);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -122,7 +122,7 @@ public class RecordAccumulatorTest {
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
             for (int i = 0; i < appends; i++)
-                accum.append(tp, key, value, null);
+                accum.append(tp, -1L, key, value, null);
         }
         assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
@@ -143,7 +143,7 @@ public class RecordAccumulatorTest {
                 public void run() {
                     for (int i = 0; i < msgs; i++) {
                         try {
-                            accum.append(new TopicPartition(topic, i % numParts), key, value, null);
+                            accum.append(new TopicPartition(topic, i % numParts), -1L, key, value, null);
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
@@ -183,7 +183,7 @@ public class RecordAccumulatorTest {
 
         // Partition on node1 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp1, key, value, null);
+            accum.append(tp1, -1L, key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
@@ -192,14 +192,14 @@ public class RecordAccumulatorTest {
 
         // Add partition on node2 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp3, key, value, null);
+            accum.append(tp3, -1L, key, value, null);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);
 
         // Add data for another partition on node1, enough to make data sendable immediately
         for (int i = 0; i < appends + 1; i++)
-            accum.append(tp2, key, value, null);
+            accum.append(tp2, -1L, key, value, null);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
@@ -214,7 +214,7 @@ public class RecordAccumulatorTest {
         final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, false, metrics, time, metricTags);
 
         long now = time.milliseconds();
-        accum.append(tp1, key, value, null);
+        accum.append(tp1, -1L, key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         Map<Integer, List<RecordBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
@@ -226,7 +226,7 @@ public class RecordAccumulatorTest {
         accum.reenqueue(batches.get(0).get(0), now);
 
         // Put message for partition 1 into accumulator
-        accum.append(tp2, key, value, null);
+        accum.append(tp2, -1L, key, value, null);
         result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
 
@@ -250,7 +250,7 @@ public class RecordAccumulatorTest {
         long lingerMs = Long.MAX_VALUE;
         final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), key, value, null);
+            accum.append(new TopicPartition(topic, i % 3), -1L, key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         
@@ -281,7 +281,7 @@ public class RecordAccumulatorTest {
             }
         }
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback());
+            accum.append(new TopicPartition(topic, i % 3), -1L, key, value, new TestCallback());
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 8b1805d..07e737e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -72,7 +72,7 @@ public class SenderTest {
     @Test
     public void testSimple() throws Exception {
         long offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, -1L, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -99,7 +99,7 @@ public class SenderTest {
                                    time,
                                    "clientId");
         // do a successful retry
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, -1L, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals(1, client.inFlightRequestCount());
@@ -116,7 +116,7 @@ public class SenderTest {
         assertEquals(offset, future.get().offset());
 
         // do an unsuccessful retry
-        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
+        future = accumulator.append(tp, -1L, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // send produce request
         for (int i = 0; i < maxRetries + 1; i++) {
             client.disconnect(client.requests().peek().request().destination());
-- 
2.4.4


From 236885c894a9e96559b73cbeef91636d2cd96c7f Mon Sep 17 00:00:00 2001
From: Ben Kirwin <ben@kirw.in>
Date: Sat, 13 Jun 2015 21:55:30 -0400
Subject: [PATCH 4/6] Add new error code and exceptions for offset-check
 failure

---
 .../errors/ExpectedOffsetMismatchException.java    | 38 ++++++++++++++++++++++
 .../org/apache/kafka/common/protocol/Errors.java   |  4 ++-
 .../src/main/scala/kafka/common/ErrorMapping.scala |  7 +++-
 .../common/ExpectedOffsetMismatchException.scala   | 22 +++++++++++++
 4 files changed, 69 insertions(+), 2 deletions(-)
 create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/ExpectedOffsetMismatchException.java
 create mode 100644 core/src/main/scala/kafka/common/ExpectedOffsetMismatchException.scala

diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ExpectedOffsetMismatchException.java b/clients/src/main/java/org/apache/kafka/common/errors/ExpectedOffsetMismatchException.java
new file mode 100644
index 0000000..f4d5d06
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ExpectedOffsetMismatchException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * The expected offset didn't match the actual offset.
+ */
+public class ExpectedOffsetMismatchException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExpectedOffsetMismatchException() {
+        super("Expected offset doesn't match current offset!");
+    }
+
+    public ExpectedOffsetMismatchException(String message) {
+        super(message);
+    }
+
+    public ExpectedOffsetMismatchException(Throwable throwable) {
+        super(throwable);
+    }
+
+    public ExpectedOffsetMismatchException(String message, Throwable throwable) {
+        super(message, throwable);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 4c0ecc3..3ee9ceb 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -81,7 +81,9 @@ public enum Errors {
     COMMITTING_PARTITIONS_NOT_ASSIGNED(27,
             new ApiException("Some of the committing partitions are not assigned the committer")),
     INVALID_COMMIT_OFFSET_SIZE(28,
-            new ApiException("The committing offset data size is not valid"));
+            new ApiException("The committing offset data size is not valid")),
+    EXPECTED_OFFSET_MISMATCH(29,
+            new ExpectedOffsetMismatchException("Expected offset does not match assigned offset."));
 
     private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
     private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index c75c685..40b5123 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -19,6 +19,9 @@ package kafka.common
 
 import kafka.message.InvalidMessageException
 import java.nio.ByteBuffer
+
+import org.apache.kafka.common.errors.ExpectedOffsetMismatchException
+
 import scala.Predef._
 
 /**
@@ -51,6 +54,7 @@ object ErrorMapping {
   val NotEnoughReplicasAfterAppendCode: Short = 20
   // 21: InvalidRequiredAcks
   // 22: IllegalConsumerGeneration
+  val ExpectedOffsetMismatchCode: Short = 29
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](
@@ -72,7 +76,8 @@ object ErrorMapping {
       classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode,
       classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode,
       classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode,
-      classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode
+      classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode,
+      classOf[ExpectedOffsetMismatchException].asInstanceOf[Class[Throwable]] -> ExpectedOffsetMismatchCode
     ).withDefaultValue(UnknownCode)
 
   /* invert the mapping */
diff --git a/core/src/main/scala/kafka/common/ExpectedOffsetMismatchException.scala b/core/src/main/scala/kafka/common/ExpectedOffsetMismatchException.scala
new file mode 100644
index 0000000..28e4173
--- /dev/null
+++ b/core/src/main/scala/kafka/common/ExpectedOffsetMismatchException.scala
@@ -0,0 +1,22 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.common
+
+class ExpectedOffsetMismatchException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
-- 
2.4.4


From aca65ab780e134036226bfc608297cd8613ae97b Mon Sep 17 00:00:00 2001
From: Ben Kirwin <ben@kirw.in>
Date: Sun, 12 Jul 2015 17:04:48 -0400
Subject: [PATCH 5/6] Add check-offset configuration option

---
 core/src/main/scala/kafka/log/LogConfig.scala          | 4 ++++
 core/src/test/scala/unit/kafka/log/LogConfigTest.scala | 1 +
 2 files changed, 5 insertions(+)

diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index fc41132..237b624 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -44,6 +44,7 @@ object Defaults {
   val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas
   val CompressionType = kafka.server.Defaults.CompressionType
   val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable
+  val CheckExpectedOffsets = false
 }
 
 case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props) {
@@ -66,6 +67,7 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi
   val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
   val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase
   val preallocate = getBoolean(LogConfig.PreAllocateEnableProp)
+  val checkExpectedOffsets = getBoolean(LogConfig.CheckExpectedOffsetsProp)
 
   def randomSegmentJitter: Long =
     if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
@@ -98,6 +100,7 @@ object LogConfig {
   val MinInSyncReplicasProp = "min.insync.replicas"
   val CompressionTypeProp = "compression.type"
   val PreAllocateEnableProp = "preallocate"
+  val CheckExpectedOffsetsProp = "check.expected.offsets"
 
   val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log"
   val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled"
@@ -155,6 +158,7 @@ object LogConfig {
       .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc)
       .define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable,
         MEDIUM, PreAllocateEnableDoc)
+      .define(CheckExpectedOffsetsProp, BOOLEAN, Defaults.CheckExpectedOffsets, LOW, "If true, the broker rejects any appended message whose expected offset (any value other than the -1 sentinel) does not match the offset the log would assign to it")
   }
 
   def apply(): LogConfig = LogConfig(new Properties())
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 19dcb47..358e371 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -46,6 +46,7 @@ class LogConfigTest extends JUnit3Suite {
         case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString)
         case LogConfig.RetentionMsProp => expected.setProperty(name, nextLong().toString)
         case LogConfig.PreAllocateEnableProp => expected.setProperty(name, randFrom("true", "false"))
+        case LogConfig.CheckExpectedOffsetsProp => expected.setProperty(name, nextBoolean().toString)
         case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
       }
     })
-- 
2.4.4


From de30fc2be8d66fc725df2a5441f8cf1b99b03e25 Mon Sep 17 00:00:00 2001
From: Ben Kirwin <ben@kirw.in>
Date: Sun, 12 Jul 2015 17:12:41 -0400
Subject: [PATCH 6/6] Expected offset check on server side

---
 core/src/main/scala/kafka/log/Log.scala            |  2 +-
 .../scala/kafka/message/ByteBufferMessageSet.scala | 17 +++++--
 .../main/scala/kafka/server/ReplicaManager.scala   |  2 +
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 38 +++++++++++++++-
 .../kafka/message/ByteBufferMessageSetTest.scala   | 53 ++++++++++++++++++++++
 5 files changed, 107 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e5e8007..56ae302 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -324,7 +324,7 @@ class Log(val dir: File,
           // assign offsets to the message set
           val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
           try {
-            validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact)
+            validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact, config.checkExpectedOffsets)
           } catch {
             case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
           }
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 1968f5e..ad6f45d 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -18,7 +18,7 @@
 package kafka.message
 
 import kafka.utils.{IteratorTemplate, Logging}
-import kafka.common.KafkaException
+import kafka.common.{ExpectedOffsetMismatchException, KafkaException}
 
 import java.nio.ByteBuffer
 import java.nio.channels._
@@ -232,14 +232,19 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
   private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: AtomicLong,
                                                       sourceCodec: CompressionCodec,
                                                       targetCodec: CompressionCodec,
-                                                      compactedTopic: Boolean = false): ByteBufferMessageSet = {
+                                                      compactedTopic: Boolean = false,
+                                                      checkExpectedOffsets: Boolean = false): ByteBufferMessageSet = {
     if(sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
       // do in-place validation and offset assignment
       var messagePosition = 0
       buffer.mark()
       while(messagePosition < sizeInBytes - MessageSet.LogOverhead) {
+        val expectedOffset = buffer.getLong(messagePosition)
+        val assignedOffset = offsetCounter.getAndIncrement()
+        if (checkExpectedOffsets && expectedOffset != -1 && expectedOffset != assignedOffset)
+          throw new ExpectedOffsetMismatchException("Expected offset " + expectedOffset + " did not match assigned offset " + assignedOffset)
         buffer.position(messagePosition)
-        buffer.putLong(offsetCounter.getAndIncrement())
+        buffer.putLong(assignedOffset)
         val messageSize = buffer.getInt()
         val positionAfterKeySize = buffer.position + Message.KeySizeOffset + Message.KeySizeLength
         if (compactedTopic && positionAfterKeySize < sizeInBytes) {
@@ -255,6 +260,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
       buffer.reset()
       this
     } else {
+      var assignedOffset = offsetCounter.get()
       // We need to deep-iterate over the message-set if any of these are true:
       // (i) messages are compressed
       // (ii) the topic is configured with a target compression codec so we need to recompress regardless of original codec
@@ -262,6 +268,11 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
         if (compactedTopic && !messageAndOffset.message.hasKey)
           throw new InvalidMessageException("Compacted topic cannot accept message without key.")
 
+        if (checkExpectedOffsets && messageAndOffset.offset != -1 && messageAndOffset.offset != assignedOffset)
+          throw new ExpectedOffsetMismatchException("Expected offset " + messageAndOffset.offset + " did not match assigned offset " + assignedOffset)
+
+        assignedOffset += 1
+
         messageAndOffset.message
       })
       new ByteBufferMessageSet(compressionCodec = targetCodec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 795220e..f7d4c05 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -410,6 +410,8 @@ class ReplicaManager(val config: KafkaConfig,
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle)))
           case imse : InvalidMessageSizeException =>
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
+          case e: ExpectedOffsetMismatchException =>
+            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
           case t: Throwable =>
             BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
             BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 9e26190..bb73f8c 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -24,7 +24,7 @@ import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.message._
-import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException, MessageSetSizeTooLargeException}
+import kafka.common.{ExpectedOffsetMismatchException, MessageSizeTooLargeException, OffsetOutOfRangeException, MessageSetSizeTooLargeException}
 import kafka.utils._
 import kafka.server.KafkaConfig
 
@@ -154,6 +154,42 @@ class LogTest extends JUnitSuite {
   }
 
   /**
+   * Test that appending handles expected / mismatched offsets correctly.
+   */
+  @Test
+  def testAppendMismatchedOffsets() {
+
+    val logProps = new Properties()
+    logProps.put(LogConfig.CheckExpectedOffsetsProp, true: java.lang.Boolean)
+
+    // create a log
+    val log = new Log(logDir,
+      LogConfig(logProps),
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      time = time)
+
+    // append with no expected offset should succeed
+    val first = new ByteBufferMessageSet(NoCompressionCodec, new Message("test0".getBytes()))
+
+    log.append(first)
+
+    // append with matching expected offset should succeed
+    val second = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(1), new Message("test1".getBytes()))
+
+    log.append(second)
+
+    // append with mismatching expected offset should complain
+    try {
+      val third = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(7L), new Message("test2".getBytes()))
+      log.append(third)
+      fail("Appending a mismatched offset ought to fail!")
+    } catch {
+      case _: ExpectedOffsetMismatchException => {}
+    }
+  }
+
+  /**
    * This test case appends a bunch of messages and checks that we can read them all back using sequential offsets.
    */
   @Test
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 9cc7de3..a8d0f86 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -20,6 +20,7 @@ package kafka.message
 import java.nio._
 import java.util.concurrent.atomic.AtomicLong
 import junit.framework.Assert._
+import kafka.common.ExpectedOffsetMismatchException
 import org.junit.Test
 import kafka.utils.TestUtils
 
@@ -154,6 +155,58 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     checkOffsets(compressedMessages.validateMessagesAndAssignOffsets(new AtomicLong(offset), DefaultCompressionCodec, DefaultCompressionCodec), offset)
   }
   
+  @Test
+  def testOffsetAssignmentWithExpectedValues(): Unit = {
+
+    // test without expected offsets
+    {
+      val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("test0".getBytes))
+
+      val validated = messages.validateMessagesAndAssignOffsets(
+        new AtomicLong(0L),
+        sourceCodec = NoCompressionCodec,
+        targetCodec = NoCompressionCodec,
+        checkExpectedOffsets = true
+      )
+
+      checkOffsets(validated, 0)
+    }
+
+    // test with expected offsets that start at 0
+    {
+      val messages = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(0), new Message("test0".getBytes))
+
+      val validated = messages.validateMessagesAndAssignOffsets(
+        new AtomicLong(0L),
+        sourceCodec = NoCompressionCodec,
+        targetCodec = NoCompressionCodec,
+        checkExpectedOffsets = true
+      )
+
+      checkOffsets(validated, 0)
+    }
+
+    // test with expected offsets that don't match
+    {
+      val messages = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(7), new Message("test0".getBytes))
+
+      try {
+        messages.validateMessagesAndAssignOffsets(
+          new AtomicLong(0L),
+          sourceCodec = NoCompressionCodec,
+          targetCodec = NoCompressionCodec,
+          checkExpectedOffsets = true
+        )
+        fail("Validation should fail when offsets don't match!")
+      } catch {
+        case _: ExpectedOffsetMismatchException => {}
+      }
+    }
+
+
+
+  }
+
   /* check that offsets are assigned based on byte offset from the given base offset */
   def checkOffsets(messages: ByteBufferMessageSet, baseOffset: Long) {
     var offset = baseOffset
-- 
2.4.4

