diff --git a/build.gradle b/build.gradle
index 84fa0d6..7f1caa2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -191,7 +191,7 @@ project(':core') {
   }
 
   tasks.create(name: "releaseTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
-    into "kafka_${baseScalaVersion}-${version}"
+    into "."
     compression = Compression.GZIP
     from(project.file("../bin")) { into "bin/" }
     from(project.file("../config")) { into "config/" }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index bedd2a9..1ac6943 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -102,6 +102,7 @@ public class KafkaProducer implements Producer {
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
                                                  this.totalMemorySize,
                                                  config.getLong(ProducerConfig.LINGER_MS_CONFIG),
+                                                 config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                                                  config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
                                                  metrics,
                                                  new SystemTime());
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index d8e35e7..307659c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -136,6 +136,12 @@ public class ProducerConfig extends AbstractConfig {
     public static final String MAX_RETRIES_CONFIG = "request.retries";
 
     /**
+     * The amount of time to wait before attempting to resend a produce request to a given topic partition. This avoids
+     * repeated sending-and-failing in a tight loop.
+     */
+    public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
+
+    /**
      * Should we register the Kafka metrics as JMX mbeans?
      */
     public static final String ENABLE_JMX_CONFIG = "enable.jmx";
@@ -160,7 +166,8 @@
                                 .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah")
                                 .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah")
                                 .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "")
-                                .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "");
+                                .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "")
+                                .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 500L, atLeast(0L), "blah blah");
     }
 
     ProducerConfig(Map props) {
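Note (reviewer sketch, not part of the patch): with the retry settings defined above, a caller of the new producer can opt into bounded retries plus a backoff between attempts. The config keys and the KafkaProducer(Properties) constructor below are taken from this patch's own test code; the broker address, topic name, and concrete values are illustrative only.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class RetryConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BROKER_LIST_CONFIG, "localhost:9092");  // illustrative broker address
            props.put(ProducerConfig.MAX_RETRIES_CONFIG, "3");               // "request.retries", defaults to 0 (no retries)
            props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");       // "retry.backoff.ms", defaults to 500 ms per the define() above

            KafkaProducer producer = new KafkaProducer(props);
            producer.send(new ProducerRecord("test-topic", null, "key".getBytes(), "value".getBytes()));
            producer.close();
        }
    }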
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 6990274..26887dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -51,9 +51,10 @@ public final class RecordAccumulator {
     private int drainIndex;
     private final int batchSize;
     private final long lingerMs;
-    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
+    private final long retryBackoffMs;
     private final BufferPool free;
     private final Time time;
+    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
 
     /**
      * Create a new record accumulator
@@ -63,16 +64,19 @@
      * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
      *            sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
      *            latency for potentially better throughput due to more batching (and hence fewer, larger requests).
+     * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
+     *            exhausting all retries in a short period of time.
      * @param blockOnBufferFull If true block when we are out of memory; if false throw an exception when we are out of
      *            memory
      * @param metrics The metrics
      * @param time The time instance to use
      */
-    public RecordAccumulator(int batchSize, long totalSize, long lingerMs, boolean blockOnBufferFull, Metrics metrics, Time time) {
+    public RecordAccumulator(int batchSize, long totalSize, long lingerMs, long retryBackoffMs, boolean blockOnBufferFull, Metrics metrics, Time time) {
         this.drainIndex = 0;
         this.closed = false;
         this.batchSize = batchSize;
         this.lingerMs = lingerMs;
+        this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
         this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull);
         this.time = time;
@@ -155,6 +159,7 @@ public final class RecordAccumulator {
      */
     public void reenqueue(RecordBatch batch, long now) {
         batch.attempts++;
+        batch.lastAttempt = now;
         Deque<RecordBatch> deque = dequeFor(batch.topicPartition);
         synchronized (deque) {
             deque.addFirst(batch);
@@ -181,6 +186,8 @@
             synchronized (deque) {
                 RecordBatch batch = deque.peekFirst();
                 if (batch != null) {
+                    boolean allowed = batch.attempts == 0 || batch.lastAttempt + retryBackoffMs <= now;
+                    if (!allowed) continue;
                     boolean full = deque.size() > 1 || !batch.records.buffer().hasRemaining();
                     boolean expired = now - batch.created >= lingerMs;
                     if (full | expired | exhausted | closed)
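Note (reviewer sketch, not part of the patch): the eligibility check added to ready() above holds a previously failed batch back until its backoff window has elapsed, so a failing partition cannot burn through all of request.retries in a tight loop. A minimal standalone illustration of that arithmetic, using the 500 ms default:

    public class BackoffGateExample {
        public static void main(String[] args) {
            long retryBackoffMs = 500L;  // default of retry.backoff.ms introduced by this patch
            int attempts = 1;            // the batch has already failed once
            long lastAttempt = 10000L;   // time of the failed attempt, in ms
            long now = 10300L;           // only 300 ms have elapsed since the failure

            // Same predicate as the line added to RecordAccumulator.ready()
            boolean allowed = attempts == 0 || lastAttempt + retryBackoffMs <= now;
            System.out.println(allowed); // false: the batch stays parked until now >= 10500
        }
    }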
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index c7fbf3c..038a05a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -34,6 +34,7 @@ public final class RecordBatch {
     public int recordCount = 0;
     public volatile int attempts = 0;
     public final long created;
+    public long lastAttempt;
     public final MemoryRecords records;
     public final TopicPartition topicPartition;
     private final ProduceRequestResult produceFuture;
@@ -41,6 +42,7 @@
 
     public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
         this.created = now;
+        this.lastAttempt = now;
         this.records = records;
         this.topicPartition = tp;
         this.produceFuture = new ProduceRequestResult();
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7942623..37603c3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.ResponseHeader;
@@ -142,7 +143,7 @@
      * The main run loop for the sender thread
      */
     public void run() {
-        log.trace("Starting Kafka producer I/O thread.");
+        log.debug("Starting Kafka producer I/O thread.");
 
         // main loop, runs until close is called
         while (running) {
@@ -153,7 +154,7 @@
             }
         }
 
-        log.trace("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
+        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
 
         // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
@@ -170,7 +171,7 @@
         // close all the connections
         this.selector.close();
 
-        log.trace("Shutdown of Kafka producer I/O thread has completed.");
+        log.debug("Shutdown of Kafka producer I/O thread has completed.");
     }
 
     /**
@@ -216,8 +217,8 @@
 
         // handle responses, connections, and disconnections
         handleSends(this.selector.completedSends());
-        handleResponses(this.selector.completedReceives(), now);
-        handleDisconnects(this.selector.disconnected(), now);
+        handleResponses(this.selector.completedReceives(), time.milliseconds());
+        handleDisconnects(this.selector.disconnected(), time.milliseconds());
         handleConnects(this.selector.connected());
 
         return ready.size();
@@ -348,15 +349,28 @@
             nodeStates.disconnected(node);
             log.debug("Node {} disconnected.", node);
             for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
-                if (request.batches != null) {
-                    for (RecordBatch batch : request.batches.values()) {
-                        if (canRetry(batch, Errors.NETWORK_EXCEPTION)) {
-                            this.accumulator.reenqueue(batch, now);
-                        } else {
-                            batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
-                            this.accumulator.deallocate(batch);
-                        }
-                    }
-                }
+                ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey());
+                switch (requestKey) {
+                    case PRODUCE:
+                        for (RecordBatch batch : request.batches.values()) {
+                            if (canRetry(batch, Errors.NETWORK_EXCEPTION)) {
+                                log.warn("Destination node disconnected for topic-partition {}, retrying ({} attempts left).",
+                                         batch.topicPartition, this.retries - batch.attempts - 1);
+                                this.accumulator.reenqueue(batch, now);
+                            } else {
+                                batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
+                                this.accumulator.deallocate(batch);
+                            }
+                        }
+                        break;
+                    case METADATA:
+                        metadataFetchInProgress = false;
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unexpected api key id: " + requestKey.id);
+                }
             }
         }
@@ -409,18 +423,18 @@
             correlate(req.request.header(), header);
             if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) {
                 log.trace("Received produce response from node {} with correlation id {}", source, req.request.header().correlationId());
-                handleProduceResponse(req, body, now);
+                handleProduceResponse(req, req.request.header(), body, now);
             } else if (req.request.header().apiKey() == ApiKeys.METADATA.id) {
                 log.trace("Received metadata response response from node {} with correlation id {}", source, req.request.header()
-                                                                                                                 .correlationId());
-                handleMetadataResponse(body, now);
+                                                                                                                 .correlationId());
+                handleMetadataResponse(req.request.header(), body, now);
             } else {
                 throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey());
             }
         }
     }
 
-    private void handleMetadataResponse(Struct body, long now) {
+    private void handleMetadataResponse(RequestHeader header, Struct body, long now) {
         this.metadataFetchInProgress = false;
         MetadataResponse response = new MetadataResponse(body);
         Cluster cluster = response.cluster();
@@ -429,35 +443,30 @@
         if (cluster.nodes().size() > 0)
             this.metadata.update(cluster, now);
         else
-            log.trace("Ignoring empty metadata response.");
+            log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
     }
 
     /**
      * Handle a produce response
      */
-    private void handleProduceResponse(InFlightRequest request, Struct response, long now) {
-        for (Object topicResponse : (Object[]) response.get("responses")) {
-            Struct topicRespStruct = (Struct) topicResponse;
-            String topic = (String) topicRespStruct.get("topic");
-            for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) {
-                Struct partRespStruct = (Struct) partResponse;
-                int partition = (Integer) partRespStruct.get("partition");
-                short errorCode = (Short) partRespStruct.get("error_code");
-
-                // if we got an error we may need to refresh our metadata
-                Errors error = Errors.forCode(errorCode);
+    private void handleProduceResponse(InFlightRequest request, RequestHeader header, Struct body, long now) {
+        ProduceResponse pr = new ProduceResponse(body);
+        for (Map<TopicPartition, ProduceResponse.PartitionResponse> responses : pr.responses().values()) {
+            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : responses.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                ProduceResponse.PartitionResponse response = entry.getValue();
+                Errors error = Errors.forCode(response.errorCode);
                 if (error.exception() instanceof InvalidMetadataException)
                     metadata.forceUpdate();
-
-                long offset = (Long) partRespStruct.get("base_offset");
-                RecordBatch batch = request.batches.get(new TopicPartition(topic, partition));
+                RecordBatch batch = request.batches.get(tp);
                 if (canRetry(batch, error)) {
                     // retry
-                    log.warn("Got error for topic-partition {}, retrying. Error: {}", topic, partition, error);
+                    log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
+                             header.correlationId(), batch.topicPartition, this.retries - batch.attempts - 1, error);
                     this.accumulator.reenqueue(batch, now);
                 } else {
                     // tell the user the result of their request
-                    batch.done(offset, error.exception());
+                    batch.done(response.baseOffset, error.exception());
                     this.accumulator.deallocate(batch);
                 }
             }
IllegalStateException("Unexpected response type: " + req.request.header().apiKey()); } } } - private void handleMetadataResponse(Struct body, long now) { + private void handleMetadataResponse(RequestHeader header, Struct body, long now) { this.metadataFetchInProgress = false; MetadataResponse response = new MetadataResponse(body); Cluster cluster = response.cluster(); @@ -429,35 +443,30 @@ public class Sender implements Runnable { if (cluster.nodes().size() > 0) this.metadata.update(cluster, now); else - log.trace("Ignoring empty metadata response."); + log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId()); } /** * Handle a produce response */ - private void handleProduceResponse(InFlightRequest request, Struct response, long now) { - for (Object topicResponse : (Object[]) response.get("responses")) { - Struct topicRespStruct = (Struct) topicResponse; - String topic = (String) topicRespStruct.get("topic"); - for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) { - Struct partRespStruct = (Struct) partResponse; - int partition = (Integer) partRespStruct.get("partition"); - short errorCode = (Short) partRespStruct.get("error_code"); - - // if we got an error we may need to refresh our metadata - Errors error = Errors.forCode(errorCode); + private void handleProduceResponse(InFlightRequest request, RequestHeader header, Struct body, long now) { + ProduceResponse pr = new ProduceResponse(body); + for (Map responses : pr.responses().values()) { + for (Map.Entry entry : responses.entrySet()) { + TopicPartition tp = entry.getKey(); + ProduceResponse.PartitionResponse response = entry.getValue(); + Errors error = Errors.forCode(response.errorCode); if (error.exception() instanceof InvalidMetadataException) metadata.forceUpdate(); - - long offset = (Long) partRespStruct.get("base_offset"); - RecordBatch batch = request.batches.get(new TopicPartition(topic, partition)); + RecordBatch batch = request.batches.get(tp); if (canRetry(batch, error)) { // retry - log.warn("Got error for topic-partition {}, retrying. Error: {}", topic, partition, error); + log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). 
Error: {}", + header.correlationId(), batch.topicPartition, this.retries - batch.attempts - 1, error); this.accumulator.reenqueue(batch, now); } else { // tell the user the result of their request - batch.done(offset, error.exception()); + batch.done(response.baseOffset, error.exception()); this.accumulator.deallocate(batch); } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 21a2592..992791d 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -16,6 +16,10 @@ */ package org.apache.kafka.common.protocol; + +import java.util.ArrayList; +import java.util.List; + /** * Identifiers for all the Kafka APIs */ @@ -29,11 +33,13 @@ public enum ApiKeys { OFFSET_COMMIT(6, "offset_commit"), OFFSET_FETCH(7, "offset_fetch"); + private static List codeToType = new ArrayList(); public static int MAX_API_KEY = 0; static { for (ApiKeys key : ApiKeys.values()) { MAX_API_KEY = Math.max(MAX_API_KEY, key.id); + codeToType.add(key); } } @@ -48,4 +54,7 @@ public enum ApiKeys { this.name = name; } + public static ApiKeys forId(int id) { + return codeToType.get(id); + } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index 91b9d64..f35bd87 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -1,3 +1,15 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ package org.apache.kafka.common.requests; import java.util.List; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 73b7006..2652c32 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -1,3 +1,15 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
new file mode 100644
index 0000000..6ac2e53
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ProduceResponse {
+    public class PartitionResponse {
+        public int partitionId;
+        public short errorCode;
+        public long baseOffset;
+
+        public PartitionResponse(int partitionId, short errorCode, long baseOffset) {
+            this.partitionId = partitionId;
+            this.errorCode = errorCode;
+            this.baseOffset = baseOffset;
+        }
+    }
+
+    private final Map<String, Map<TopicPartition, PartitionResponse>> responses;
+
+    public ProduceResponse(Struct struct) {
+        responses = new HashMap<String, Map<TopicPartition, PartitionResponse>>();
+        for (Object topicResponse : (Object[]) struct.get("responses")) {
+            Struct topicRespStruct = (Struct) topicResponse;
+            String topic = (String) topicRespStruct.get("topic");
+            Map<TopicPartition, PartitionResponse> topicResponses = new HashMap<TopicPartition, PartitionResponse>();
+            for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) {
+                Struct partRespStruct = (Struct) partResponse;
+                int partition = (Integer) partRespStruct.get("partition");
+                short errorCode = (Short) partRespStruct.get("error_code");
+                long offset = (Long) partRespStruct.get("base_offset");
+                TopicPartition tp = new TopicPartition(topic, partition);
+                topicResponses.put(tp, new PartitionResponse(partition, errorCode, offset));
+            }
+            responses.put(topic, topicResponses);
+        }
+    }
+
+    public Map<String, Map<TopicPartition, PartitionResponse>> responses() {
+        return this.responses;
+    }
+}
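Note (reviewer sketch, not part of the patch): ProduceResponse groups partition results by topic name and then by TopicPartition. A caller walks the nested map the same way Sender.handleProduceResponse does above; the helper class and method names here are hypothetical.

    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.ProduceResponse;

    public class ProduceResponseDump {
        // Prints the error code and base offset for every partition in an already-parsed response.
        static void dump(ProduceResponse pr) {
            for (Map.Entry<String, Map<TopicPartition, ProduceResponse.PartitionResponse>> topicEntry : pr.responses().entrySet()) {
                for (ProduceResponse.PartitionResponse p : topicEntry.getValue().values()) {
                    System.out.printf("%s-%d: error=%d baseOffset=%d%n",
                                      topicEntry.getKey(), p.partitionId, p.errorCode, p.baseOffset);
                }
            }
        }
    }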
diff --git a/config/log4j.properties b/config/log4j.properties
index baa698b..1ab8507 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -73,6 +73,8 @@ log4j.additivity.kafka.controller=false
 
 log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
 log4j.additivity.kafka.log.LogCleaner=false
+log4j.logger.kafka.log.Cleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.Cleaner=false
 
 log4j.logger.state.change.logger=TRACE, stateChangeAppender
 log4j.additivity.state.change.logger=false
diff --git a/config/server.properties b/config/server.properties
index c9e923a..2ffe0eb 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -40,7 +40,7 @@ port=9092
 num.network.threads=2
 
 # The number of threads doing disk I/O
-num.io.threads=8
+num.io.threads=2
 
 # The send buffer (SO_SNDBUF) used by the socket server
 socket.send.buffer.bytes=1048576
@@ -100,10 +100,6 @@ log.segment.bytes=536870912
 # to the retention policies
 log.retention.check.interval.ms=60000
 
-# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
-# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
-log.cleaner.enable=false
-
 ############################# Zookeeper #############################
 
 # Zookeeper connection string (see zookeeper docs for details).
@@ -115,3 +111,6 @@ zookeeper.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
 zookeeper.connection.timeout.ms=1000000
+
+
+log.cleanup.policy=delete
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index b58cdcd..4deff9d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -337,7 +337,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     inLock(controllerContext.controllerLock) {
       if (config.autoLeaderRebalanceEnable)
         autoRebalanceScheduler.shutdown()
-      deleteTopicManager.shutdown()
+      if (deleteTopicManager != null)
+        deleteTopicManager.shutdown()
       Utils.unregisterMBean(KafkaController.MBeanName)
       partitionStateMachine.shutdown()
       replicaStateMachine.shutdown()
@@ -647,8 +648,8 @@
       if(controllerContext.controllerChannelManager != null) {
         controllerContext.controllerChannelManager.shutdown()
         controllerContext.controllerChannelManager = null
-        info("Controller shutdown complete")
       }
+      info("Controller shutdown complete")
     }
   }
diff --git a/core/src/main/scala/kafka/log/CleanerConfig.scala b/core/src/main/scala/kafka/log/CleanerConfig.scala
index ade8386..fa946ad 100644
--- a/core/src/main/scala/kafka/log/CleanerConfig.scala
+++ b/core/src/main/scala/kafka/log/CleanerConfig.scala
@@ -35,7 +35,7 @@ case class CleanerConfig(val numThreads: Int = 1,
                          val ioBufferSize: Int = 1024*1024,
                          val maxMessageSize: Int = 32*1024*1024,
                          val maxIoBytesPerSecond: Double = Double.MaxValue,
-                         val backOffMs: Long = 15 * 1000,
+                         val backOffMs: Long = 60 * 1000,
                          val enableCleaner: Boolean = true,
                          val hashAlgorithm: String = "MD5") {
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 312204c..6404647 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -131,9 +131,6 @@ class LogCleaner(val config: CleanerConfig,
    */
   private class CleanerThread(threadId: Int)
     extends ShutdownableThread(name = "kafka-log-cleaner-thread-" + threadId, isInterruptible = false) {
-
-    override val loggerName = classOf[LogCleaner].getName
-
     if(config.dedupeBufferSize / config.numThreads > Int.MaxValue)
       warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...")
 
@@ -188,7 +185,7 @@ class LogCleaner(val config: CleanerConfig,
     def logStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
       def mb(bytes: Double) = bytes / (1024*1024)
       val message =
-        "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
+        "%n\tLog cleaner %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
         "\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead),
                                                                                 stats.elapsedSecs,
                                                                                 mb(stats.bytesRead/stats.elapsedSecs)) +
@@ -225,8 +222,6 @@ private[log] class Cleaner(val id: Int,
                            throttler: Throttler,
                            time: Time,
                            checkDone: (TopicAndPartition) => Unit) extends Logging {
-
-  override val loggerName = classOf[LogCleaner].getName
 
   this.logIdent = "Cleaner " + id + ": "
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 43e5c1f..1612c8d 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -40,9 +40,6 @@ private[log] case object LogCleaningPaused extends LogCleaningState
  * requested to be resumed.
  */
 private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging {
-
-  override val loggerName = classOf[LogCleaner].getName
-
   /* the offset checkpoints holding the last cleaned point for each log */
   private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
 
@@ -68,7 +65,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   def grabFilthiestLog(): Option[LogToClean] = {
     inLock(lock) {
       val lastClean = allCleanerCheckpoints()
-      val cleanableLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
+      val cleanableLogs = logs.filter(l => l._2.config.dedupe)           // skip any logs marked for delete rather than dedupe
                               .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
                               .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0))) // create a LogToClean instance for each
       val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0)        // must have some bytes
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 18c86fe..0b32aee 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -34,7 +34,7 @@ import kafka.common._
  * @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
  * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
  * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
- * @param compact Should old segments in this log be deleted or deduplicated?
+ * @param dedupe Should old segments in this log be deleted or deduplicated?
  */
 case class LogConfig(val segmentSize: Int = 1024*1024,
                      val segmentMs: Long = Long.MaxValue,
@@ -48,7 +48,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
                      val fileDeleteDelayMs: Long = 60*1000,
                      val deleteRetentionMs: Long = 24 * 60 * 60 * 1000L,
                      val minCleanableRatio: Double = 0.5,
-                     val compact: Boolean = false) {
+                     val dedupe: Boolean = false) {
 
   def toProps: Properties = {
     val props = new Properties()
@@ -65,7 +65,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
     props.put(DeleteRetentionMsProp, deleteRetentionMs.toString)
     props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
     props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
-    props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
+    props.put(CleanupPolicyProp, if(dedupe) "dedupe" else "delete")
     props
   }
 
@@ -117,7 +117,7 @@ object LogConfig {
                   fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt,
                   deleteRetentionMs = props.getProperty(DeleteRetentionMsProp).toLong,
                   minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble,
-                  compact = props.getProperty(CleanupPolicyProp).trim.toLowerCase != "delete")
+                  dedupe = props.getProperty(CleanupPolicyProp).trim.toLowerCase == "dedupe")
   }
 
   /**
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index bcd2bb7..80b38f3 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -162,7 +162,7 @@ class LogManager(val logDirs: Array[File],
    * Close all the logs
    */
   def shutdown() {
-    debug("Shutting down.")
+    info("Shutting down.")
     try {
       // stop the cleaner first
       if(cleaner != null)
@@ -179,7 +179,7 @@ class LogManager(val logDirs: Array[File],
       // regardless of whether the close succeeded, we need to unlock the data directories
       dirLocks.foreach(_.destroy())
     }
-    debug("Shutdown complete.")
+    info("Shutdown complete.")
   }
 
   /**
@@ -351,7 +351,7 @@ class LogManager(val logDirs: Array[File],
     debug("Beginning log cleanup...")
     var total = 0
     val startMs = time.milliseconds
-    for(log <- allLogs; if !log.config.compact) {
+    for(log <- allLogs; if !log.config.dedupe) {
       debug("Garbage collecting '" + log.name + "'")
       total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
     }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b871843..04a5d39 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -137,7 +137,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val logCleanerDedupeBufferLoadFactor = props.getDouble("log.cleaner.io.buffer.load.factor", 0.9d)
 
   /* the amount of time to sleep when there are no logs to clean */
-  val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 15*1000, (0L, Long.MaxValue))
+  val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 30*1000, (0L, Long.MaxValue))
 
   /* the minimum ratio of dirty log to total log for a log to eligible for cleaning */
   val logCleanerMinCleanRatio = props.getDouble("log.cleaner.min.cleanable.ratio", 0.5)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index feb2093..c606b50 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -262,7 +262,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                          deleteRetentionMs = config.logCleanerDeleteRetentionMs,
                                          fileDeleteDelayMs = config.logDeleteDelayMs,
                                          minCleanableRatio = config.logCleanerMinCleanRatio,
-                                         compact = config.logCleanupPolicy.trim.toLowerCase == "compact")
+                                         dedupe = config.logCleanupPolicy.trim.toLowerCase == "dedupe")
     val defaultProps = defaultLogConfig.toProps
     val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
     // read the log configurations from zookeeper
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 4b7c544..9c92e29 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -26,7 +26,7 @@ import java.lang.Integer
 import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
 
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.{ShutdownableThread, Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.consumer.SimpleConsumer
@@ -267,18 +267,77 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
   def testBrokerFailure() {
     // create topic
     val leaders = TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
-    val leader = leaders(0)
+    val partition = 0
+    var leader = leaders(partition)
     assertTrue("Leader of partition 0 of the topic should exist", leader.isDefined)
 
-    val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
-    assertEquals("Returned metadata should have offset 0", producer3.send(record).get.offset, 0L)
+    val scheduler = new ProducerScheduler()
+    scheduler.start
+
+    // rolling bounce brokers
+    for (i <- 0 until 5) {
+      server1.shutdown()
+      server1.awaitShutdown()
+      server1.startup
+
+      Thread.sleep(2000)
+
+      server2.shutdown()
+      server2.awaitShutdown()
+      server2.startup
+
+      Thread.sleep(2000)
 
-    // shutdown broker
-    val serverToShutdown = if(leader.get == server1.config.brokerId) server1 else server2
-    serverToShutdown.shutdown()
-    serverToShutdown.awaitShutdown()
+      assertTrue(scheduler.failed == false)
+    }
+
+    scheduler.shutdown
+    leader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition, 500)
+
+    val fetchResponse = if(leader.get == server1.config.brokerId) {
+      consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
+    } else {
+      consumer2.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
+    }
 
-    // send the message again, it should still succeed due-to retry
-    assertEquals("Returned metadata should have offset 1", producer3.send(record).get.offset, 1L)
+    val messages = fetchResponse.iterator.toList.map(_.message)
+    val uniqueMessages = messages.toSet
+    val uniqueMessageSize = uniqueMessages.size
+
+    assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize)
+  }
+
+  private class ProducerScheduler extends ShutdownableThread("daemon-producer", false)
+  {
+    val numRecords = 1000
+    var sent = 0
+    var failed = false
+
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, (-1).toString)
+    producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString)
+    producerProps.put(ProducerConfig.MAX_RETRIES_CONFIG, 10.toString)
+    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString)
+
+    // do not need to close producer when shutdown
+    val producer = new KafkaProducer(producerProps)
+
+    override def doWork(): Unit = {
+      val responses =
+        for (i <- sent+1 to sent+numRecords)
+          yield producer.send(new ProducerRecord(topic1, null, null, i.toString.getBytes))
+      val futures = responses.toList
+
+      // make sure all of them end up in the same partition with increasing offset values
+      try {
+        futures.map(_.get)
+        sent += numRecords
+      } catch {
+        case e : Exception => failed = true
+      }
+
+      Thread.sleep(500)
+    }
+  }
 }
\ No newline at end of file
diff --git a/core/src/test/scala/other/kafka/TestLogCleaning.scala b/core/src/test/scala/other/kafka/TestLogCleaning.scala
index d20d132..22b16e5 100644
--- a/core/src/test/scala/other/kafka/TestLogCleaning.scala
+++ b/core/src/test/scala/other/kafka/TestLogCleaning.scala
@@ -243,11 +243,11 @@ object TestLogCleaning {
                    percentDeletes: Int): File = {
     val producerProps = new Properties
     producerProps.setProperty("producer.type", "async")
-    producerProps.setProperty("metadata.broker.list", brokerUrl)
+    producerProps.setProperty("broker.list", brokerUrl)
     producerProps.setProperty("serializer.class", classOf[StringEncoder].getName)
     producerProps.setProperty("key.serializer.class", classOf[StringEncoder].getName)
     producerProps.setProperty("queue.enqueue.timeout.ms", "-1")
-    producerProps.setProperty("batch.num.messages", 1000.toString)
+    producerProps.setProperty("batch.size", 1000.toString)
     val producer = new Producer[String, String](new ProducerConfig(producerProps))
     val rand = new Random(1)
     val keyCount = (messages / dups).toInt
@@ -275,9 +275,8 @@ object TestLogCleaning {
   def makeConsumer(zkUrl: String, topics: Array[String]): ZookeeperConsumerConnector = {
     val consumerProps = new Properties
     consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
-    consumerProps.setProperty("zookeeper.connect", zkUrl)
-    consumerProps.setProperty("consumer.timeout.ms", (20*1000).toString)
-    consumerProps.setProperty("auto.offset.reset", "smallest")
+    consumerProps.setProperty("zk.connect", zkUrl)
+    consumerProps.setProperty("consumer.timeout.ms", (10*1000).toString)
     new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
   }
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index d10e4f4..51cd94b 100644
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -33,7 +33,7 @@ import kafka.message._
 class CleanerTest extends JUnitSuite {
 
   val dir = TestUtils.tempDir()
-  val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, compact=true)
+  val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, dedupe=true)
   val time = new MockTime()
   val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
 
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 9aeb69d..1de3ef0 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -101,7 +101,7 @@ class LogCleanerIntegrationTest extends JUnitSuite {
       val dir = new File(logDir, "log-" + i)
       dir.mkdirs()
       val log = new Log(dir = dir,
-                        LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, compact = true),
+                        LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, dedupe = true),
                         recoveryPoint = 0L,
                         scheduler = time.scheduler,
                         time = time)