diff --git clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3fc2a19..867f6a1 100644 --- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -435,7 +435,7 @@ public class KafkaProducer implements Producer { ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); - RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs); + RecordAccumulator.RecordAppendResult result = accumulator.append(tp, record.offset(), serializedKey, serializedValue, callback, remainingWaitMs); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); diff --git clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java index 75cd51e..782f92b 100644 --- clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java +++ clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java @@ -14,7 +14,7 @@ package org.apache.kafka.clients.producer; /** * A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional - * partition number, and an optional key and value. + * partition number, an optional offset, and an optional key and value. *
<p>
* If a valid partition number is specified that partition will be used when sending the record. If no partition is * specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is @@ -24,6 +24,7 @@ public final class ProducerRecord { private final String topic; private final Integer partition; + private final long offset; private final K key; private final V value; @@ -32,19 +33,33 @@ public final class ProducerRecord { * * @param topic The topic the record will be appended to * @param partition The partition to which the record should be sent + * @param offset The expected offset for the record, or -1 to leave unspecified * @param key The key that will be included in the record * @param value The record contents */ - public ProducerRecord(String topic, Integer partition, K key, V value) { + public ProducerRecord(String topic, Integer partition, long offset, K key, V value) { if (topic == null) throw new IllegalArgumentException("Topic cannot be null"); this.topic = topic; this.partition = partition; + this.offset = offset; this.key = key; this.value = value; } /** + * Creates a record to be sent to a specified topic and partition + * + * @param topic The topic the record will be appended to + * @param partition The partition to which the record should be sent + * @param key The key that will be included in the record + * @param value The record contents + */ + public ProducerRecord(String topic, Integer partition, K key, V value) { + this(topic, partition, -1, key, value); + } + + /** * Create a record to be sent to Kafka * * @param topic The topic the record will be appended to @@ -93,11 +108,18 @@ public final class ProducerRecord { return partition; } + /** + * The expected offset for this record (or -1 if no expected offset + */ + public long offset() { + return offset; + } + @Override public String toString() { String key = this.key == null ? "null" : this.key.toString(); String value = this.value == null ? "null" : this.value.toString(); - return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value; + return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", offset=" + offset + ", key=" + key + ", value=" + value; } @Override @@ -115,6 +137,8 @@ public final class ProducerRecord { return false; else if (topic != null ? !topic.equals(that.topic) : that.topic != null) return false; + else if (offset != that.offset) + return false; else if (value != null ? !value.equals(that.value) : that.value != null) return false; @@ -125,6 +149,7 @@ public final class ProducerRecord { public int hashCode() { int result = topic != null ? topic.hashCode() : 0; result = 31 * result + (partition != null ? partition.hashCode() : 0); + result = 31 * result + (int) offset; result = 31 * result + (key != null ? key.hashCode() : 0); result = 31 * result + (value != null ? 
value.hashCode() : 0); return result; diff --git clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 4b394f9..ef12a3e 100644 --- clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -151,7 +151,7 @@ public final class RecordAccumulator { * @param callback The user-supplied callback to execute when the request is complete * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available */ - public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException { + public RecordAppendResult append(TopicPartition tp, long offset, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException { // We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); @@ -163,7 +163,7 @@ public final class RecordAccumulator { synchronized (dq) { RecordBatch last = dq.peekLast(); if (last != null) { - FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); + FutureRecordMetadata future = last.tryAppend(offset, key, value, callback, time.milliseconds()); if (future != null) return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } @@ -179,7 +179,7 @@ public final class RecordAccumulator { throw new IllegalStateException("Cannot send after the producer is closed."); RecordBatch last = dq.peekLast(); if (last != null) { - FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); + FutureRecordMetadata future = last.tryAppend(offset, key, value, callback, time.milliseconds()); if (future != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... free.deallocate(buffer); @@ -188,7 +188,7 @@ public final class RecordAccumulator { } MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); - FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds())); + FutureRecordMetadata future = Utils.notNull(batch.tryAppend(offset, key, value, callback, time.milliseconds())); dq.addLast(batch); incomplete.add(batch); diff --git clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 3f18582..11e6335 100644 --- clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -62,11 +62,11 @@ public final class RecordBatch { * * @return The RecordSend corresponding to this record or null if there isn't sufficient room. 
*/ - public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback, long now) { + public FutureRecordMetadata tryAppend(long offset, byte[] key, byte[] value, Callback callback, long now) { if (!this.records.hasRoomFor(key, value)) { return null; } else { - this.records.append(0L, key, value); + this.records.append(offset, key, value); this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value)); this.lastAppendTime = now; FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); diff --git clients/src/main/java/org/apache/kafka/common/protocol/Errors.java clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 2667bc8..55b6d01 100644 --- clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -48,6 +48,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.errors.ExpectedOffsetMismatchException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,7 +122,9 @@ public enum Errors { GROUP_AUTHORIZATION_FAILED(30, new GroupAuthorizationException("Group authorization failed.")), CLUSTER_AUTHORIZATION_FAILED(31, - new ClusterAuthorizationException("Cluster authorization failed.")); + new ClusterAuthorizationException("Cluster authorization failed.")), + EXPECTED_OFFSET_MISMATCH(32, + new ExpectedOffsetMismatchException("Expected offset does not match assigned offset.")); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 723e450..3e63bc1 100644 --- clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -77,10 +77,10 @@ public class RecordAccumulatorTest { RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time); int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { - accum.append(tp1, key, value, null, maxBlockTimeMs); + accum.append(tp1, -1L, key, value, null, maxBlockTimeMs); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); } - accum.append(tp1, key, value, null, maxBlockTimeMs); + accum.append(tp1, -1L, key, value, null, maxBlockTimeMs); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); List batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); assertEquals(1, batches.size()); @@ -99,7 +99,7 @@ public class RecordAccumulatorTest { public void testAppendLarge() throws Exception { int batchSize = 512; RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, metrics, time); - accum.append(tp1, key, new byte[2 * batchSize], null, maxBlockTimeMs); + accum.append(tp1, -1L, key, new byte[2 * batchSize], null, maxBlockTimeMs); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), 
accum.ready(cluster, time.milliseconds()).readyNodes); } @@ -107,7 +107,7 @@ public class RecordAccumulatorTest { public void testLinger() throws Exception { long lingerMs = 10L; RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time); - accum.append(tp1, key, value, null, maxBlockTimeMs); + accum.append(tp1, -1L, key, value, null, maxBlockTimeMs); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -129,7 +129,7 @@ public class RecordAccumulatorTest { List partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { for (int i = 0; i < appends; i++) - accum.append(tp, key, value, null, maxBlockTimeMs); + accum.append(tp, -1L, key, value, null, maxBlockTimeMs); } assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -150,7 +150,7 @@ public class RecordAccumulatorTest { public void run() { for (int i = 0; i < msgs; i++) { try { - accum.append(new TopicPartition(topic, i % numParts), key, value, null, maxBlockTimeMs); + accum.append(new TopicPartition(topic, i % numParts), -1L, key, value, null, maxBlockTimeMs); } catch (Exception e) { e.printStackTrace(); } @@ -189,7 +189,7 @@ public class RecordAccumulatorTest { // Partition on node1 only for (int i = 0; i < appends; i++) - accum.append(tp1, key, value, null, maxBlockTimeMs); + accum.append(tp1, -1L, key, value, null, maxBlockTimeMs); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs); @@ -198,14 +198,14 @@ public class RecordAccumulatorTest { // Add partition on node2 only for (int i = 0; i < appends; i++) - accum.append(tp3, key, value, null, maxBlockTimeMs); + accum.append(tp3, -1L, key, value, null, maxBlockTimeMs); result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs); // Add data for another partition on node1, enough to make data sendable immediately for (int i = 0; i < appends + 1; i++) - accum.append(tp2, key, value, null, maxBlockTimeMs); + accum.append(tp2, -1L, key, value, null, maxBlockTimeMs); result = accum.ready(cluster, time.milliseconds()); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); // Note this can actually be < linger time because it may use delays from partitions that aren't sendable @@ -220,7 +220,7 @@ public class RecordAccumulatorTest { final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time); long now = time.milliseconds(); - accum.append(tp1, key, value, null, maxBlockTimeMs); + accum.append(tp1, -1L, key, value, null, maxBlockTimeMs); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); Map> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); @@ -232,7 +232,7 @@ public 
class RecordAccumulatorTest { accum.reenqueue(batches.get(0).get(0), now); // Put message for partition 1 into accumulator - accum.append(tp2, key, value, null, maxBlockTimeMs); + accum.append(tp2, -1L, key, value, null, maxBlockTimeMs); result = accum.ready(cluster, now + lingerMs + 1); assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); @@ -256,7 +256,7 @@ public class RecordAccumulatorTest { long lingerMs = Long.MAX_VALUE; final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, metrics, time); for (int i = 0; i < 100; i++) - accum.append(new TopicPartition(topic, i % 3), key, value, null, maxBlockTimeMs); + accum.append(new TopicPartition(topic, i % 3), -1L, key, value, null, maxBlockTimeMs); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); @@ -287,7 +287,7 @@ public class RecordAccumulatorTest { } } for (int i = 0; i < 100; i++) - accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback(), maxBlockTimeMs); + accum.append(new TopicPartition(topic, i % 3), -1L, key, value, new TestCallback(), maxBlockTimeMs); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); @@ -304,12 +304,12 @@ public class RecordAccumulatorTest { RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time); int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { - accum.append(tp1, key, value, null, maxBlockTimeMs); + accum.append(tp1, -1L, key, value, null, maxBlockTimeMs); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); } time.sleep(2000); accum.ready(cluster, now); - accum.append(tp1, key, value, null, 0); + accum.append(tp1, -1L, key, value, null, 0); Set readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); Cluster cluster = new Cluster(new ArrayList(), new ArrayList(), Collections.emptySet()); diff --git clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 14a839b..401988e 100644 --- clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -93,7 +93,7 @@ public class SenderTest { @Test public void testSimple() throws Exception { long offset = 0; - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; + Future future = accumulator.append(tp, -1L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); @@ -112,7 +112,7 @@ public class SenderTest { public void testQuotaMetrics() throws Exception { final long offset = 0; for (int i = 1; i <= 3; i++) { - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; + Future future = accumulator.append(tp, -1L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; 
sender.run(time.milliseconds()); // send produce request client.respond(produceResponse(tp, offset, Errors.NONE.code(), 100 * i)); sender.run(time.milliseconds()); @@ -141,7 +141,7 @@ public class SenderTest { "clientId", REQUEST_TIMEOUT); // do a successful retry - Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; + Future future = accumulator.append(tp, -1L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request String id = client.requests().peek().request().destination(); @@ -162,7 +162,7 @@ public class SenderTest { assertEquals(offset, future.get().offset()); // do an unsuccessful retry - future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; + future = accumulator.append(tp, -1L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // send produce request for (int i = 0; i < maxRetries + 1; i++) { client.disconnect(client.requests().peek().request().destination()); diff --git core/src/main/scala/kafka/common/ErrorMapping.scala core/src/main/scala/kafka/common/ErrorMapping.scala index e20b88c..8b85562 100644 --- core/src/main/scala/kafka/common/ErrorMapping.scala +++ core/src/main/scala/kafka/common/ErrorMapping.scala @@ -18,9 +18,8 @@ package kafka.common import java.nio.ByteBuffer - import kafka.message.InvalidMessageException - +import org.apache.kafka.common.errors.ExpectedOffsetMismatchException import scala.Predef._ /** @@ -62,6 +61,7 @@ object ErrorMapping { val TopicAuthorizationCode: Short = 29 val GroupAuthorizationCode: Short = 30 val ClusterAuthorizationCode: Short = 31 + val ExpectedOffsetMismatchCode: Short = 32 private val exceptionToCode = Map[Class[Throwable], Short]( @@ -84,6 +84,7 @@ object ErrorMapping { classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode, classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode, classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode, + classOf[ExpectedOffsetMismatchException].asInstanceOf[Class[Throwable]] -> ExpectedOffsetMismatchCode, classOf[TopicAuthorizationException].asInstanceOf[Class[Throwable]] -> TopicAuthorizationCode, classOf[GroupAuthorizationException].asInstanceOf[Class[Throwable]] -> GroupAuthorizationCode, classOf[ClusterAuthorizationException].asInstanceOf[Class[Throwable]] -> ClusterAuthorizationCode diff --git core/src/main/scala/kafka/log/Log.scala core/src/main/scala/kafka/log/Log.scala index 32c194d..9ae9f54 100644 --- core/src/main/scala/kafka/log/Log.scala +++ core/src/main/scala/kafka/log/Log.scala @@ -327,7 +327,7 @@ class Log(val dir: File, val offset = new AtomicLong(nextOffsetMetadata.messageOffset) try { validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config - .compact) + .compact, config.checkExpectedOffsets) } catch { case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e) } diff --git core/src/main/scala/kafka/log/LogConfig.scala core/src/main/scala/kafka/log/LogConfig.scala index 7fc7e33..6246502 100755 --- core/src/main/scala/kafka/log/LogConfig.scala +++ core/src/main/scala/kafka/log/LogConfig.scala @@ -44,6 +44,7 @@ object Defaults { val 
MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas val CompressionType = kafka.server.Defaults.CompressionType val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable + val CheckExpectedOffsets = false } case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) { @@ -69,6 +70,7 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp) val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase val preallocate = getBoolean(LogConfig.PreAllocateEnableProp) + val checkExpectedOffsets = getBoolean(LogConfig.CheckExpectedOffsetsProp) def randomSegmentJitter: Long = if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs) @@ -101,6 +103,7 @@ object LogConfig { val MinInSyncReplicasProp = "min.insync.replicas" val CompressionTypeProp = "compression.type" val PreAllocateEnableProp = "preallocate" + val CheckExpectedOffsetsProp = "check.expected.offsets" val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log" val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled" @@ -158,6 +161,7 @@ object LogConfig { .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc) .define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable, MEDIUM, PreAllocateEnableDoc) + .define(CheckExpectedOffsetsProp, BOOLEAN, Defaults.CheckExpectedOffsets, LOW, "???") } def apply(): LogConfig = LogConfig(new Properties()) diff --git core/src/main/scala/kafka/message/ByteBufferMessageSet.scala core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 5a32de8..ad6f45d 100644 --- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -18,7 +18,7 @@ package kafka.message import kafka.utils.{IteratorTemplate, Logging} -import kafka.common.KafkaException +import kafka.common.{ExpectedOffsetMismatchException, KafkaException} import java.nio.ByteBuffer import java.nio.channels._ @@ -27,13 +27,13 @@ import java.util.concurrent.atomic.AtomicLong object ByteBufferMessageSet { - private def create(offsetCounter: AtomicLong, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = { + private def create(offsetCounter: () => Long, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = { if(messages.size == 0) { MessageSet.Empty.buffer } else if(compressionCodec == NoCompressionCodec) { val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) for(message <- messages) - writeMessage(buffer, message, offsetCounter.getAndIncrement) + writeMessage(buffer, message, offsetCounter()) buffer.rewind() buffer } else { @@ -43,7 +43,7 @@ object ByteBufferMessageSet { val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream)) try { for (message <- messages) { - offset = offsetCounter.getAndIncrement + offset = offsetCounter() output.writeLong(offset) output.writeInt(message.size) output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) @@ -125,15 +125,15 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi private var shallowValidByteCount = -1 def this(compressionCodec: CompressionCodec, messages: Message*) { - this(ByteBufferMessageSet.create(new AtomicLong(0), 
compressionCodec, messages:_*)) + this(ByteBufferMessageSet.create(() => -1, compressionCodec, messages:_*)) } def this(compressionCodec: CompressionCodec, offsetCounter: AtomicLong, messages: Message*) { - this(ByteBufferMessageSet.create(offsetCounter, compressionCodec, messages:_*)) + this(ByteBufferMessageSet.create(() => offsetCounter.getAndIncrement(), compressionCodec, messages:_*)) } def this(messages: Message*) { - this(NoCompressionCodec, new AtomicLong(0), messages: _*) + this(NoCompressionCodec, messages: _*) } def getBuffer = buffer @@ -232,14 +232,19 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: AtomicLong, sourceCodec: CompressionCodec, targetCodec: CompressionCodec, - compactedTopic: Boolean = false): ByteBufferMessageSet = { + compactedTopic: Boolean = false, + checkExpectedOffsets: Boolean = false): ByteBufferMessageSet = { if(sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) { // do in-place validation and offset assignment var messagePosition = 0 buffer.mark() while(messagePosition < sizeInBytes - MessageSet.LogOverhead) { + val expectedOffset = buffer.getLong(messagePosition) + val assignedOffset = offsetCounter.getAndIncrement() + if (checkExpectedOffsets && expectedOffset != -1 && expectedOffset != assignedOffset) + throw new ExpectedOffsetMismatchException("Expected offset " + expectedOffset + " did not match assigned offset " + assignedOffset) buffer.position(messagePosition) - buffer.putLong(offsetCounter.getAndIncrement()) + buffer.putLong(assignedOffset) val messageSize = buffer.getInt() val positionAfterKeySize = buffer.position + Message.KeySizeOffset + Message.KeySizeLength if (compactedTopic && positionAfterKeySize < sizeInBytes) { @@ -255,6 +260,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi buffer.reset() this } else { + var assignedOffset = offsetCounter.get() // We need to deep-iterate over the message-set if any of these are true: // (i) messages are compressed // (ii) the topic is configured with a target compression codec so we need to recompress regardless of original codec @@ -262,6 +268,11 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi if (compactedTopic && !messageAndOffset.message.hasKey) throw new InvalidMessageException("Compacted topic cannot accept message without key.") + if (checkExpectedOffsets && messageAndOffset.offset != -1 && messageAndOffset.offset != assignedOffset) + throw new ExpectedOffsetMismatchException("Expected offset " + messageAndOffset.offset + " did not match assigned offset " + assignedOffset) + + assignedOffset += 1 + messageAndOffset.message }) new ByteBufferMessageSet(compressionCodec = targetCodec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*) diff --git core/src/main/scala/kafka/server/ReplicaManager.scala core/src/main/scala/kafka/server/ReplicaManager.scala index 5b1276e..0b2521b 100644 --- core/src/main/scala/kafka/server/ReplicaManager.scala +++ core/src/main/scala/kafka/server/ReplicaManager.scala @@ -440,6 +440,8 @@ class ReplicaManager(val config: KafkaConfig, (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle))) case imse: CorruptRecordException => (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse))) + case e: ExpectedOffsetMismatchException => + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) 
case t: Throwable => BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() diff --git core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala index 95085f4..a9a66dc 100644 --- core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala +++ core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala @@ -31,7 +31,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases { def createMessageSet(messages: Seq[Message]): FileMessageSet = { val set = new FileMessageSet(tempFile()) - set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*)) + set.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(0L), messages: _*)) set.flush() set } diff --git core/src/test/scala/unit/kafka/log/LogConfigTest.scala core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 51cd62c..398aa2b 100644 --- core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -61,6 +61,7 @@ class LogConfigTest { case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar"); case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2") case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1") + case LogConfig.CheckExpectedOffsetsProp => assertPropertyInvalid(name, "not_a_boolean") case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1") } }) diff --git core/src/test/scala/unit/kafka/log/LogTest.scala core/src/test/scala/unit/kafka/log/LogTest.scala index 47908e7..139659d 100755 --- core/src/test/scala/unit/kafka/log/LogTest.scala +++ core/src/test/scala/unit/kafka/log/LogTest.scala @@ -20,7 +20,7 @@ package kafka.log import java.io._ import java.util.Properties import java.util.concurrent.atomic._ -import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, CorruptRecordException} +import org.apache.kafka.common.errors.{ExpectedOffsetMismatchException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, CorruptRecordException} import org.junit.Assert._ import org.scalatest.junit.JUnitSuite import org.junit.{After, Before, Test} @@ -154,6 +154,42 @@ class LogTest extends JUnitSuite { } /** + * Test that appending handles expected / mismatched offsets correctly. 
+ */ + @Test + def testAppendMismatchedOffsets() { + + val logProps = new Properties() + logProps.put(LogConfig.CheckExpectedOffsetsProp, true: java.lang.Boolean) + + // create a log + val log = new Log(logDir, + LogConfig(logProps), + recoveryPoint = 0L, + scheduler = time.scheduler, + time = time) + + // append with no expected offset should succeed + val first = new ByteBufferMessageSet(NoCompressionCodec, new Message("test0".getBytes())) + + log.append(first) + + // append with matching expected offset should succeed + val second = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(1), new Message("test1".getBytes())) + + log.append(second) + + // append with mismatching expected offset should complain + try { + val third = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(7L), new Message("test2".getBytes())) + log.append(third) + fail("Appending a mismatched offset ought to fail!") + } catch { + case _: ExpectedOffsetMismatchException => {} + } + } + + /** * This test case appends a bunch of messages and checks that we can read them all back using sequential offsets. */ @Test diff --git core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala index 511060e..f815a15 100644 --- core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala +++ core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala @@ -20,6 +20,7 @@ package kafka.message import java.nio._ import java.util.concurrent.atomic.AtomicLong import org.junit.Assert._ +import kafka.common.ExpectedOffsetMismatchException import org.junit.Test import kafka.utils.TestUtils @@ -145,21 +146,73 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { val compressedMessages = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = messages.map(_.message).toBuffer:_*) // check uncompressed offsets - checkOffsets(messages, 0) - var offset = 1234567 + checkOffsets(messages, -1) + val offset = 1234567 checkOffsets(messages.validateMessagesAndAssignOffsets(new AtomicLong(offset), NoCompressionCodec, NoCompressionCodec), offset) // check compressed messages - checkOffsets(compressedMessages, 0) + checkOffsets(compressedMessages, -1) checkOffsets(compressedMessages.validateMessagesAndAssignOffsets(new AtomicLong(offset), DefaultCompressionCodec, DefaultCompressionCodec), offset) } + @Test + def testOffsetAssignmentWithExpectedValues(): Unit = { + + // test without expected offsets + { + val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("test0".getBytes)) + + val validated = messages.validateMessagesAndAssignOffsets( + new AtomicLong(0L), + sourceCodec = NoCompressionCodec, + targetCodec = NoCompressionCodec, + checkExpectedOffsets = true + ) + + checkOffsets(validated, 0) + } + + // test with expected offsets that start at 0 + { + val messages = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(0), new Message("test0".getBytes)) + + val validated = messages.validateMessagesAndAssignOffsets( + new AtomicLong(0L), + sourceCodec = NoCompressionCodec, + targetCodec = NoCompressionCodec, + checkExpectedOffsets = true + ) + + checkOffsets(validated, 0) + } + + // test with expected offsets that don't match + { + val messages = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(7), new Message("test0".getBytes)) + + try { + messages.validateMessagesAndAssignOffsets( + new AtomicLong(0L), + sourceCodec = NoCompressionCodec, + targetCodec = 
NoCompressionCodec, + checkExpectedOffsets = true + ) + fail("Validation should fail when offsets don't match!") + } catch { + case _: ExpectedOffsetMismatchException => {} + } + } + + + + } + /* check that offsets are assigned based on byte offset from the given base offset */ def checkOffsets(messages: ByteBufferMessageSet, baseOffset: Long) { var offset = baseOffset for(entry <- messages) { assertEquals("Unexpected offset in message set iterator", offset, entry.offset) - offset += 1 + if (offset != -1) offset += 1 } }
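How a client would use the new expected-offset constructor, as a minimal sketch (not part of the patch): the topic name, partition, and offset values below are made up, and it assumes the five-argument ProducerRecord(topic, partition, offset, key, value) constructor and the -1 "unspecified" sentinel introduced above.

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class ExpectedOffsetProducerExample {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "all");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            try {
                // The client expects the next offset in partition 0 of "events" to be 42.
                // If the topic has check.expected.offsets=true and the broker assigns a
                // different offset, the send completes exceptionally instead of silently
                // appending at an unexpected position.
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("events", 0, 42L, "key", "value");
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Appended at offset " + metadata.offset());
            } catch (ExecutionException e) {
                // With this patch, the cause would be an ExpectedOffsetMismatchException
                // when the expected offset is stale.
                System.err.println("Send failed: " + e.getCause());
            } finally {
                producer.close();
            }
        }
    }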
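The broker-side check added to ByteBufferMessageSet.validateMessagesAndAssignOffsets is the core of the feature; the following standalone Java sketch restates that logic outside the Scala code so the -1 sentinel semantics are explicit. The PendingRecord type and assignOffsets method are stand-ins invented for illustration, not broker classes.

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    public class ExpectedOffsetCheckSketch {

        // Stand-in for the server-side exception added by the patch.
        static class ExpectedOffsetMismatchException extends RuntimeException {
            ExpectedOffsetMismatchException(String message) { super(message); }
        }

        // Stand-in for a message carrying the offset written by the producer.
        static class PendingRecord {
            final long expectedOffset; // -1 means "no expectation", matching the patch
            PendingRecord(long expectedOffset) { this.expectedOffset = expectedOffset; }
        }

        // Mirrors the in-place validation loop: each record receives the next offset
        // from the counter, and when checking is enabled a non-negative expected
        // offset must equal the offset actually assigned.
        static void assignOffsets(List<PendingRecord> records,
                                  AtomicLong offsetCounter,
                                  boolean checkExpectedOffsets) {
            for (PendingRecord record : records) {
                long assignedOffset = offsetCounter.getAndIncrement();
                if (checkExpectedOffsets
                        && record.expectedOffset != -1
                        && record.expectedOffset != assignedOffset) {
                    throw new ExpectedOffsetMismatchException(
                            "Expected offset " + record.expectedOffset
                                    + " did not match assigned offset " + assignedOffset);
                }
                // The real code then overwrites the offset field in the message buffer
                // with assignedOffset before moving to the next entry.
            }
        }

        public static void main(String[] args) {
            // Succeeds: expectations -1 (none) and 1 line up with assigned offsets 0 and 1.
            assignOffsets(Arrays.asList(new PendingRecord(-1), new PendingRecord(1)),
                    new AtomicLong(0), true);
            // Fails: expectation 7 does not match assigned offset 0.
            try {
                assignOffsets(Arrays.asList(new PendingRecord(7)), new AtomicLong(0), true);
            } catch (ExpectedOffsetMismatchException e) {
                System.out.println(e.getMessage());
            }
        }
    }

This also makes the compressed path easier to follow: there the patch walks the deep iterator with a local assignedOffset counter and performs the same comparison before recompressing.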
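On the wire the mismatch is reported through the new error code; with the patch applied, the standard Errors helpers in the Java client resolve it like any existing code. A small check of the round trip (code 32 is the value assigned in Errors.java above):

    import org.apache.kafka.common.protocol.Errors;

    public class ExpectedOffsetErrorCode {
        public static void main(String[] args) {
            // Resolves to EXPECTED_OFFSET_MISMATCH once the enum constant exists.
            Errors error = Errors.forCode((short) 32);
            System.out.println(error.name());                   // EXPECTED_OFFSET_MISMATCH
            System.out.println(error.code());                    // 32
            System.out.println(error.exception().getMessage());  // "Expected offset does not match assigned offset."
        }
    }

ErrorMapping.scala carries the matching code 32 on the old Scala request path so both pipelines report the same value.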