diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 933de9d..2cf7d30 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -28,7 +28,7 @@ import kafka.cluster.Broker import kafka.common._ import kafka.log.LogConfig import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} -import kafka.server.{ZookeeperLeaderElector, KafkaConfig} +import kafka.server._ import kafka.utils.ZkUtils._ import kafka.utils._ import kafka.utils.Utils._ @@ -39,6 +39,13 @@ import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.ReentrantLock import scala.Some import kafka.common.TopicAndPartition +import scala.Some +import kafka.common.TopicAndPartition +import scala.None +import kafka.controller.ReassignedPartitionsContext +import kafka.controller.PartitionAndReplica +import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.server.BrokerState class ControllerContext(val zkClient: ZkClient, val zkSessionTimeout: Int) { @@ -154,7 +161,7 @@ object KafkaController extends Logging { } } -class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean { +class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup with KafkaControllerMBean { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true private val stateChangeLogger = KafkaController.stateChangeLogger @@ -316,6 +323,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) Utils.registerMBean(this, KafkaController.MBeanName) info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) + brokerState.newState(RunningAsController) maybeTriggerPartitionReassignment() maybeTriggerPreferredReplicaElection() /* send partition leadership info to all live brokers */ @@ -349,6 +357,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg controllerContext.controllerChannelManager.shutdown() controllerContext.controllerChannelManager = null } + brokerState.newState(RunningAsBroker) } } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index b7bc5ff..1d30a69 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -175,6 +175,7 @@ class Log(val dir: File, return } + // okay we need to actually recover this log val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator while(unflushed.hasNext) { diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index ac67f08..1e77b68 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import kafka.utils._ import scala.collection._ import kafka.common.{TopicAndPartition, KafkaException} -import kafka.server.OffsetCheckpoint +import kafka.server.{RecoveringFromUncleanShutdown, BrokerStates, BrokerState, OffsetCheckpoint} /** * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@@ -43,6 +43,7 @@ class LogManager(val logDirs: Array[File], val flushCheckpointMs: Long, val retentionCheckMs: Long, scheduler: Scheduler, + val brokerState: BrokerState, private val time: Time) extends Logging { val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" @@ -110,6 +111,9 @@ class LogManager(val logDirs: Array[File], val cleanShutDownFile = new File(dir, Log.CleanShutdownFile) if(cleanShutDownFile.exists()) info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath)) + else + brokerState.newState(RecoveringFromUncleanShutdown) + for(dir <- subDirs) { if(dir.isDirectory) { info("Loading log '" + dir.getName + "'") diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala deleted file mode 100644 index b020793..0000000 --- a/core/src/main/scala/kafka/producer/BaseProducer.scala +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.producer - -import java.util.Properties - -// A base producer used whenever we need to have options for both old and new producers; -// this class will be removed once we fully rolled out 0.9 -trait BaseProducer { - def send(topic: String, key: Array[Byte], value: Array[Byte]) - def close() -} - -class NewShinyProducer(producerProps: Properties) extends BaseProducer { - import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} - import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback - - // decide whether to send synchronously based on producer properties - val sync = producerProps.getProperty("producer.type", "async").equals("sync") - - val producer = new KafkaProducer(producerProps) - - override def send(topic: String, key: Array[Byte], value: Array[Byte]) { - val record = new ProducerRecord(topic, key, value) - if(sync) { - this.producer.send(record).get() - } else { - this.producer.send(record, - new ErrorLoggingCallback(topic, key, value, false)) - } - } - - override def close() { - this.producer.close() - } -} - -class OldProducer(producerProps: Properties) extends BaseProducer { - import kafka.producer.{KeyedMessage, ProducerConfig} - - // default to byte array partitioner - if (producerProps.getProperty("partitioner.class") == null) - producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName) - val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps)) - - override def send(topic: String, key: Array[Byte], value: Array[Byte]) { - this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, value)) - } - - override def close() { - this.producer.close() - } -} - diff --git 
a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala index 6a3b02e..988e437 100644 --- a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala +++ b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala @@ -20,7 +20,7 @@ package kafka.producer import kafka.utils._ -class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner { +private class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner { def partition(key: Any, numPartitions: Int): Int = { Utils.abs(java.util.Arrays.hashCode(key.asInstanceOf[Array[Byte]])) % numPartitions } diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index a2af988..b19ab49 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -17,17 +17,16 @@ package kafka.producer +import joptsimple._ +import java.util.Properties +import java.io._ import kafka.common._ import kafka.message._ import kafka.serializer._ -import kafka.utils.CommandLineUtils - -import java.util.Properties -import java.io._ - -import joptsimple._ +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import kafka.utils.{CommandLineUtils, Utils} -object ConsoleProducer { +object ConsoleProducer { def main(args: Array[String]) { @@ -40,46 +39,8 @@ object ConsoleProducer { try { val producer = - if(config.useNewProducer) { - import org.apache.kafka.clients.producer.ProducerConfig - - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec) - props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString) - props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString) - props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString) - props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString) - props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString) - props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString) - props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString) - props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString) - if(config.queueEnqueueTimeoutMs != -1) - props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false") - props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString) - props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString) - props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer") - - new NewShinyProducer(props) - } else { - props.put("metadata.broker.list", config.brokerList) - props.put("compression.codec", config.compressionCodec) - props.put("producer.type", if(config.sync) "sync" else "async") - props.put("batch.num.messages", config.batchSize.toString) - props.put("message.send.max.retries", config.messageSendMaxRetries.toString) - props.put("retry.backoff.ms", config.retryBackoffMs.toString) - props.put("queue.buffering.max.ms", config.sendTimeout.toString) - props.put("queue.buffering.max.messages", config.queueSize.toString) - props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString) - props.put("request.required.acks", config.requestRequiredAcks.toString) - props.put("request.timeout.ms", config.requestTimeoutMs.toString) - 
props.put("key.serializer.class", config.keyEncoderClass) - props.put("serializer.class", config.valueEncoderClass) - props.put("send.buffer.bytes", config.socketBuffer.toString) - props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString) - props.put("client.id", "console-producer") - - new OldProducer(props) - } + if(config.useNewProducer) new NewShinyProducer(config) + else new OldProducer(config) Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { @@ -296,4 +257,72 @@ object ConsoleProducer { } } } + + trait Producer { + def send(topic: String, key: Array[Byte], bytes: Array[Byte]) + def close() + } + + class NewShinyProducer(producerConfig: ProducerConfig) extends Producer { + import org.apache.kafka.clients.producer.ProducerConfig + val props = new Properties() + props.putAll(producerConfig.cmdLineProps) + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerConfig.brokerList) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, producerConfig.compressionCodec) + props.put(ProducerConfig.SEND_BUFFER_CONFIG, producerConfig.socketBuffer.toString) + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, producerConfig.retryBackoffMs.toString) + props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, producerConfig.metadataExpiryMs.toString) + props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, producerConfig.metadataFetchTimeoutMs.toString) + props.put(ProducerConfig.ACKS_CONFIG, producerConfig.requestRequiredAcks.toString) + props.put(ProducerConfig.TIMEOUT_CONFIG, producerConfig.requestTimeoutMs.toString) + props.put(ProducerConfig.RETRIES_CONFIG, producerConfig.messageSendMaxRetries.toString) + props.put(ProducerConfig.LINGER_MS_CONFIG, producerConfig.sendTimeout.toString) + if(producerConfig.queueEnqueueTimeoutMs != -1) + props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false") + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerConfig.maxMemoryBytes.toString) + props.put(ProducerConfig.BATCH_SIZE_CONFIG, producerConfig.maxPartitionMemoryBytes.toString) + props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer") + val producer = new KafkaProducer(props) + + def send(topic: String, key: Array[Byte], bytes: Array[Byte]) { + val response = this.producer.send(new ProducerRecord(topic, key, bytes)) + if(producerConfig.sync) { + response.get() + } + } + + def close() { + this.producer.close() + } + } + + class OldProducer(producerConfig: ConsoleProducer.ProducerConfig) extends Producer { + val props = new Properties() + props.putAll(producerConfig.cmdLineProps) + props.put("metadata.broker.list", producerConfig.brokerList) + props.put("compression.codec", producerConfig.compressionCodec) + props.put("producer.type", if(producerConfig.sync) "sync" else "async") + props.put("batch.num.messages", producerConfig.batchSize.toString) + props.put("message.send.max.retries", producerConfig.messageSendMaxRetries.toString) + props.put("retry.backoff.ms", producerConfig.retryBackoffMs.toString) + props.put("queue.buffering.max.ms", producerConfig.sendTimeout.toString) + props.put("queue.buffering.max.messages", producerConfig.queueSize.toString) + props.put("queue.enqueue.timeout.ms", producerConfig.queueEnqueueTimeoutMs.toString) + props.put("request.required.acks", producerConfig.requestRequiredAcks.toString) + props.put("request.timeout.ms", producerConfig.requestTimeoutMs.toString) + props.put("key.serializer.class", producerConfig.keyEncoderClass) + props.put("serializer.class", producerConfig.valueEncoderClass) + 
props.put("send.buffer.bytes", producerConfig.socketBuffer.toString) + props.put("topic.metadata.refresh.interval.ms", producerConfig.metadataExpiryMs.toString) + props.put("client.id", "console-producer") + val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new kafka.producer.ProducerConfig(props)) + + def send(topic: String, key: Array[Byte], bytes: Array[Byte]) { + this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, bytes)) + } + + def close() { + this.producer.close() + } + } } diff --git a/core/src/main/scala/kafka/server/BrokerStates.scala b/core/src/main/scala/kafka/server/BrokerStates.scala new file mode 100644 index 0000000..e6ee77e --- /dev/null +++ b/core/src/main/scala/kafka/server/BrokerStates.scala @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +/** + * Broker states are the possible state that a kafka broker can be in. + * A broker should be only in one state at a time. + * The expected state transition with the following defined states is: + * + * +-----------+ + * |Not Running| + * +-----+-----+ + * | + * v + * +-----+-----+ + * |Starting +--+ + * +-----+-----+ | +----+------------+ + * | +>+RecoveringFrom | + * v |UncleanShutdown | + * +----------+ +-----+-----+ +-------+---------+ + * |RunningAs | |RunningAs | | + * |Controller+<--->+Broker +<-----------+ + * +----------+ +-----+-----+ + * | | + * | v + * | +-----+------------+ + * |-----> |PendingControlled | + * |Shutdown | + * +-----+------------+ + * | + * v + * +-----+----------+ + * |BrokerShutting | + * |Down | + * +-----+----------+ + * | + * v + * +-----+-----+ + * |Not Running| + * +-----------+ + * + * Custom states is also allowed for cases where there are custom kafka states for different scenarios. 
+ */ +sealed trait BrokerStates { def state: Byte } +case object NotRunning extends BrokerStates { val state: Byte = 0 } +case object Starting extends BrokerStates { val state: Byte = 1 } +case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 } +case object RunningAsBroker extends BrokerStates { val state: Byte = 3 } +case object RunningAsController extends BrokerStates { val state: Byte = 4 } +case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 } +case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 } + + +case class BrokerState() { + @volatile var currentState: Byte = NotRunning.state + + def newState(newState: BrokerStates) { + this.newState(newState.state) + } + + // Allowing undefined custom state + def newState(newState: Byte) { + currentState = newState + } +} diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index c208f83..c22e51e 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -31,16 +31,19 @@ import kafka.cluster.Broker import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest} import kafka.common.ErrorMapping import kafka.network.{Receive, BlockingChannel, SocketServer} +import kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required * to start up and shutdown a single Kafka node. */ -class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging { +class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup { this.logIdent = "[Kafka Server " + config.brokerId + "], " private var isShuttingDown = new AtomicBoolean(false) private var shutdownLatch = new CountDownLatch(1) private var startupComplete = new AtomicBoolean(false) + val brokerState: BrokerState = new BrokerState val correlationId: AtomicInteger = new AtomicInteger(0) var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null @@ -54,12 +57,20 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val kafkaScheduler = new KafkaScheduler(config.backgroundThreads) var zkClient: ZkClient = null + newGauge( + "BrokerState", + new Gauge[Int] { + def value = brokerState.currentState + } + ) + /** * Start up API for bringing up a single instance of the Kafka server. 
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers */ def startup() { info("starting") + brokerState.newState(Starting) isShuttingDown = new AtomicBoolean(false) shutdownLatch = new CountDownLatch(1) @@ -70,7 +81,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg zkClient = initZk() /* start log manager */ - logManager = createLogManager(zkClient) + logManager = createLogManager(zkClient, brokerState) logManager.startup() socketServer = new SocketServer(config.brokerId, @@ -88,11 +99,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg /* start offset manager */ offsetManager = createOffsetManager() - kafkaController = new KafkaController(config, zkClient) + kafkaController = new KafkaController(config, zkClient, brokerState) /* start processing requests */ apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) + brokerState.newState(RunningAsBroker) Mx4jLoader.maybeLoad() @@ -143,6 +155,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var prevController : Broker = null var shutdownSuceeded : Boolean = false try { + brokerState.newState(PendingControlledShutdown) while (!shutdownSuceeded && remainingRetries > 0) { remainingRetries = remainingRetries - 1 @@ -177,7 +190,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg // send the controlled shutdown request val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId) channel.send(request) + response = channel.receive() + val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer) if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null && shutdownResponse.partitionsRemaining.size == 0) { @@ -223,6 +238,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val canShutdown = isShuttingDown.compareAndSet(false, true) if (canShutdown) { Utils.swallow(controlledShutdown()) + brokerState.newState(BrokerShuttingDown) if(kafkaHealthcheck != null) Utils.swallow(kafkaHealthcheck.shutdown()) if(socketServer != null) @@ -243,6 +259,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg if(zkClient != null) Utils.swallow(zkClient.close()) + brokerState.newState(NotRunning) shutdownLatch.countDown() startupComplete.set(false) info("shut down completed") @@ -256,7 +273,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg def getLogManager(): LogManager = logManager - private def createLogManager(zkClient: ZkClient): LogManager = { + private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, segmentMs = 60L * 60L * 1000L * config.logRollHours, flushInterval = config.logFlushIntervalMessages, @@ -289,6 +306,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs, retentionCheckMs = config.logCleanupIntervalMs, scheduler = kafkaScheduler, + brokerState = brokerState, time = time) } diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala 
b/core/src/main/scala/kafka/server/KafkaServerStartable.scala index acda52b..5e35380 100644 --- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala +++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala @@ -52,6 +52,10 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { } } + def setServerState(newState: Byte) { + server.brokerState.newState(newState) + } + def awaitShutdown() = server.awaitShutdown diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 12fa797..e4d1a86 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -17,23 +17,21 @@ package kafka.tools +import joptsimple.OptionParser import kafka.utils.{Utils, CommandLineUtils, Logging} +import kafka.producer.{KeyedMessage, ProducerConfig, Producer} +import scala.collection.JavaConversions._ +import java.util.concurrent.CountDownLatch import kafka.consumer._ import kafka.serializer._ -import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer} -import org.apache.kafka.clients.producer.ProducerRecord - -import scala.collection.mutable.ListBuffer -import scala.collection.JavaConversions._ - -import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch} - -import joptsimple.OptionParser +import collection.mutable.ListBuffer +import kafka.tools.KafkaMigrationTool.{ProducerThread, ProducerDataChannel} +import kafka.javaapi object MirrorMaker extends Logging { private var connectors: Seq[ZookeeperConsumerConnector] = null - private var consumerThreads: Seq[ConsumerThread] = null + private var consumerThreads: Seq[MirrorMakerThread] = null private var producerThreads: ListBuffer[ProducerThread] = null def main(args: Array[String]) { @@ -54,9 +52,6 @@ object MirrorMaker extends Logging { .describedAs("config file") .ofType(classOf[String]) - val useNewProducerOpt = parser.accepts("new.producer", - "Use the new producer implementation.") - val numProducersOpt = parser.accepts("num.producers", "Number of producer instances") .withRequiredArg() @@ -75,7 +70,7 @@ object MirrorMaker extends Logging { .withRequiredArg() .describedAs("Queue size in terms of number of messages") .ofType(classOf[java.lang.Integer]) - .defaultsTo(10000) + .defaultsTo(10000); val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to mirror.") @@ -104,35 +99,24 @@ object MirrorMaker extends Logging { System.exit(1) } - val numProducers = options.valueOf(numProducersOpt).intValue() - val numStreams = options.valueOf(numStreamsOpt).intValue() + val numStreams = options.valueOf(numStreamsOpt) val bufferSize = options.valueOf(bufferSizeOpt).intValue() - val useNewProducer = options.has(useNewProducerOpt) - val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) - - // create data channel - val mirrorDataChannel = new ArrayBlockingQueue[ProducerRecord](bufferSize) - - // create producer threads - val producers = (1 to numProducers).map(_ => { - if (useNewProducer) - new NewShinyProducer(producerProps) - else - new OldProducer(producerProps) - }) - - producerThreads = new ListBuffer[ProducerThread]() - var producerIndex: Int = 1 - for(producer <- producers) { - val producerThread = new ProducerThread(mirrorDataChannel, producer, producerIndex) - producerThreads += producerThread - producerIndex += 1 - } + val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => { + val props = 
Utils.loadProps(options.valueOf(producerConfigOpt)) + val config = props.getProperty("partitioner.class") match { + case null => + new ProducerConfig(props) { + override val partitionerClass = "kafka.producer.ByteArrayPartitioner" + } + case pClass : String => + new ProducerConfig(props) + } + new Producer[Array[Byte], Array[Byte]](config) + }) - // create consumer streams connectors = options.valuesOf(consumerConfigOpt).toList - .map(cfg => new ConsumerConfig(Utils.loadProps(cfg))) + .map(cfg => new ConsumerConfig(Utils.loadProps(cfg.toString))) .map(new ZookeeperConsumerConnector(_)) val filterSpec = if (options.has(whitelistOpt)) @@ -142,13 +126,18 @@ object MirrorMaker extends Logging { var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = Nil try { - streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams, new DefaultDecoder(), new DefaultDecoder())).flatten + streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())).flatten } catch { case t: Throwable => fatal("Unable to create stream - shutting down mirror maker.") connectors.foreach(_.shutdown) } - consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, producers, streamAndIndex._2)) + + val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize); + + consumerThreads = streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, producers, streamAndIndex._2)) + + producerThreads = new ListBuffer[ProducerThread]() Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { @@ -156,6 +145,15 @@ object MirrorMaker extends Logging { } }) + // create producer threads + var i: Int = 1 + for(producer <- producers) { + val producerThread: KafkaMigrationTool.ProducerThread = new KafkaMigrationTool.ProducerThread(producerDataChannel, + new javaapi.producer.Producer[Array[Byte], Array[Byte]](producer), i) + producerThreads += producerThread + i += 1 + } + consumerThreads.foreach(_.start) producerThreads.foreach(_.start) @@ -174,14 +172,14 @@ object MirrorMaker extends Logging { info("Kafka mirror maker shutdown successfully") } - class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]], - mirrorDataChannel: BlockingQueue[ProducerRecord], - producers: Seq[BaseProducer], + class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]], + producerDataChannel: ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]], + producers: Seq[Producer[Array[Byte], Array[Byte]]], threadId: Int) extends Thread with Logging { private val shutdownLatch = new CountDownLatch(1) - private val threadName = "mirrormaker-consumer-" + threadId + private val threadName = "mirrormaker-" + threadId this.logIdent = "[%s] ".format(threadName) this.setName(threadName) @@ -194,13 +192,14 @@ object MirrorMaker extends Logging { // Otherwise use a pre-assigned producer to send the message if (msgAndMetadata.key == null) { trace("Send the non-keyed message the producer channel.") - val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.message) - mirrorDataChannel.put(data) + val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message) + producerDataChannel.sendRequest(pd) } else { val producerId = Utils.abs(java.util.Arrays.hashCode(msgAndMetadata.key)) % producers.size() trace("Send message with key %s to producer 
%d.".format(java.util.Arrays.toString(msgAndMetadata.key), producerId)) val producer = producers(producerId) - producer.send(msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message) + val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message) + producer.send(pd) } } } catch { @@ -220,62 +219,5 @@ object MirrorMaker extends Logging { } } } - - class ProducerThread (val dataChannel: BlockingQueue[ProducerRecord], - val producer: BaseProducer, - val threadId: Int) extends Thread { - val threadName = "mirrormaker-producer-" + threadId - val logger = org.apache.log4j.Logger.getLogger(classOf[KafkaMigrationTool.ProducerThread].getName) - val shutdownComplete: CountDownLatch = new CountDownLatch(1) - - private final val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes) - - setName(threadName) - - override def run { - try { - while (true) { - val data: ProducerRecord = dataChannel.take - logger.trace("Sending message with value size %d".format(data.value().size)) - - if(data eq shutdownMessage) { - logger.info("Producer thread " + threadName + " finished running") - return - } - producer.send(data.topic(), data.key(), data.value()) - } - } catch { - case t: Throwable => { - logger.fatal("Producer thread failure due to ", t) - } - } finally { - shutdownComplete.countDown - } - } - - def shutdown { - try { - logger.info("Producer thread " + threadName + " shutting down") - dataChannel.put(shutdownMessage) - } - catch { - case ie: InterruptedException => { - logger.warn("Interrupt during shutdown of ProducerThread", ie) - } - } - } - - def awaitShutdown { - try { - shutdownComplete.await - producer.close - logger.info("Producer thread " + threadName + " shutdown complete") - } catch { - case ie: InterruptedException => { - logger.warn("Interrupt during shutdown of ProducerThread") - } - } - } - } } diff --git a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala new file mode 100644 index 0000000..a969a22 --- /dev/null +++ b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.tools.newproducer + +import joptsimple.OptionParser +import kafka.utils.{Utils, CommandLineUtils, Logging} +import java.util.concurrent.CountDownLatch +import kafka.consumer._ +import collection.mutable.ListBuffer +import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer} +import java.util.concurrent.atomic.AtomicInteger +import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback + + +object MirrorMaker extends Logging { + + private var connector: ZookeeperConsumerConnector = null + private var mirroringThreads: Seq[MirrorMakerThread] = null + private var producerChannel: ProducerDataChannel = null + + def main(args: Array[String]) { + info ("Starting mirror maker") + val parser = new OptionParser + + val consumerConfigOpt = parser.accepts("consumer.config", + "Consumer config file to consume from a source cluster.") + .withRequiredArg() + .describedAs("config file") + .ofType(classOf[String]) + + val producerConfigOpt = parser.accepts("producer.config", + "Embedded producer config file for target cluster.") + .withRequiredArg() + .describedAs("config file") + .ofType(classOf[String]) + + val numStreamsOpt = parser.accepts("num.streams", + "Number of mirroring streams.") + .withRequiredArg() + .describedAs("Number of threads") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + + val whitelistOpt = parser.accepts("whitelist", + "Whitelist of topics to mirror.") + .withRequiredArg() + .describedAs("Java regex (String)") + .ofType(classOf[String]) + + val blacklistOpt = parser.accepts("blacklist", + "Blacklist of topics to mirror.") + .withRequiredArg() + .describedAs("Java regex (String)") + .ofType(classOf[String]) + + val helpOpt = parser.accepts("help", "Print this message.") + val options = parser.parse(args : _*) + if (options.has(helpOpt)) { + parser.printHelpOn(System.out) + System.exit(0) + } + CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt) + if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) { + fatal("Exactly one of whitelist or blacklist is required.") + System.exit(1) + } + val filterSpec = if (options.has(whitelistOpt)) + new Whitelist(options.valueOf(whitelistOpt)) + else + new Blacklist(options.valueOf(blacklistOpt)) + val producerConfig = options.valueOf(producerConfigOpt) + val producerProps = Utils.loadProps(producerConfig) + producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") + val consumerConfig = options.valueOf(consumerConfigOpt) + val numStreams = options.valueOf(numStreamsOpt) + producerChannel = new ProducerDataChannel() + connector = new ZookeeperConsumerConnector(new ConsumerConfig(Utils.loadProps(consumerConfig))) + var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null + try { + streams = connector.createMessageStreamsByFilter(filterSpec, numStreams.intValue()) + debug("%d consumer streams created".format(streams.size)) + } catch { + case t: Throwable => + fatal("Unable to create stream - shutting down mirror maker.") + connector.shutdown() + System.exit(1) + } + val streamIndex = new AtomicInteger() + streams.foreach(stream => producerChannel.addProducer(new KafkaProducer(producerProps))) + mirroringThreads = streams.map(stream => new MirrorMakerThread(stream, streamIndex.getAndIncrement)) + Runtime.getRuntime.addShutdownHook(new Thread() { + override def run() { + cleanShutdown() + } + }) + // start the mirroring threads + mirroringThreads.foreach(_.start) + // in case the consumer threads hit a 
timeout/other exception + mirroringThreads.foreach(_.awaitShutdown) + cleanShutdown() + } + + def cleanShutdown() { + if (connector != null) connector.shutdown() + if (mirroringThreads != null) mirroringThreads.foreach(_.awaitShutdown) + if (producerChannel != null) producerChannel.close() + info("Kafka mirror maker shutdown successfully") + } + + class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]], + threadId: Int) + extends Thread with Logging { + + private val shutdownLatch = new CountDownLatch(1) + private val threadName = "mirrormaker-" + threadId + this.logIdent = "[%s] ".format(threadName) + + this.setName(threadName) + + override def run() { + info("Starting mirror maker thread " + threadName) + try { + for (msgAndMetadata <- stream) { + producerChannel.send(new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.key(), msgAndMetadata.message())) + } + } catch { + case e: Throwable => + fatal("Stream unexpectedly exited.", e) + } finally { + shutdownLatch.countDown() + info("Stopped thread.") + } + } + + def awaitShutdown() { + try { + shutdownLatch.await() + } catch { + case e: InterruptedException => fatal("Shutdown of thread %s interrupted. This might leak data!".format(threadName)) + } + } + } + + class ProducerDataChannel extends Logging { + val producers = new ListBuffer[KafkaProducer] + var producerIndex = new AtomicInteger(0) + + def addProducer(producer: KafkaProducer) { + producers += producer + } + + def send(producerRecord: ProducerRecord) { + if(producerRecord.key() != null) { + val producerId = Utils.abs(java.util.Arrays.hashCode(producerRecord.key())) % producers.size + trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(producerRecord.key()), producerId)) + val producer = producers(producerId) + producer.send(producerRecord, + new ErrorLoggingCallback(producerRecord.topic(), producerRecord.key(), producerRecord.value(), false)) + } else { + val producerId = producerIndex.getAndSet((producerIndex.get() + 1) % producers.size) + producers(producerId).send(producerRecord, + new ErrorLoggingCallback(producerRecord.topic(), producerRecord.key(), producerRecord.value(), false)) + trace("Sent message to producer " + producerId) + } + } + + def close() { + producers.foreach(_.close()) + } + } +} + diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index be1a1ee..0da2a7d 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -21,10 +21,9 @@ import java.io._ import junit.framework.Assert._ import org.junit.Test import org.scalatest.junit.JUnit3Suite -import kafka.server.KafkaConfig +import kafka.server.{BrokerState, KafkaConfig, OffsetCheckpoint} import kafka.common._ import kafka.utils._ -import kafka.server.OffsetCheckpoint class LogManagerTest extends JUnit3Suite { @@ -49,7 +48,8 @@ class LogManagerTest extends JUnit3Suite { flushCheckpointMs = 100000L, retentionCheckMs = 1000L, scheduler = time.scheduler, - time = time) + time = time, + brokerState = new BrokerState()) logManager.startup logDir = logManager.logDirs(0) } @@ -125,7 +125,18 @@ class LogManagerTest extends JUnit3Suite { logManager.shutdown() val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L) - logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 100000L, 1000L, time.scheduler, time) + logManager = new LogManager( + logDirs = 
Array(logDir), + topicConfigs = Map(), + defaultConfig = config, + cleanerConfig = cleanerConfig, + flushCheckMs = 1000L, + flushCheckpointMs = 100000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + brokerState = new BrokerState(), + time = time + ) logManager.startup // create a log @@ -165,7 +176,18 @@ class LogManagerTest extends JUnit3Suite { def testTimeBasedFlush() { logManager.shutdown() val config = logConfig.copy(flushMs = 1000) - logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time) + logManager = new LogManager( + logDirs = Array(logDir), + topicConfigs = Map(), + defaultConfig = config, + cleanerConfig = cleanerConfig, + flushCheckMs = 1000L, + flushCheckpointMs = 10000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + brokerState = new BrokerState(), + time = time + ) logManager.startup val log = logManager.createLog(TopicAndPartition(name, 0), config) val lastFlush = log.lastFlushTime @@ -187,7 +209,18 @@ class LogManagerTest extends JUnit3Suite { TestUtils.tempDir(), TestUtils.tempDir()) logManager.shutdown() - logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time) + logManager = new LogManager( + logDirs = dirs, + topicConfigs = Map(), + defaultConfig = logConfig, + cleanerConfig = cleanerConfig, + flushCheckMs = 1000L, + flushCheckpointMs = 10000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + brokerState = new BrokerState(), + time = time + ) // verify that logs are always assigned to the least loaded partition for(partition <- 0 until 20) { @@ -204,7 +237,18 @@ class LogManagerTest extends JUnit3Suite { @Test def testTwoLogManagersUsingSameDirFails() { try { - new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time) + new LogManager( + logDirs = Array(logDir), + topicConfigs = Map(), + defaultConfig = logConfig, + cleanerConfig = cleanerConfig, + flushCheckMs = 1000L, + flushCheckpointMs = 10000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + brokerState = new BrokerState(), + time = time + ) fail("Should not be able to create a second log manager instance with the same data directory") } catch { case e: KafkaException => // this is good @@ -234,7 +278,8 @@ class LogManagerTest extends JUnit3Suite { flushCheckpointMs = 100000L, retentionCheckMs = 1000L, scheduler = time.scheduler, - time = time) + time = time, + brokerState = new BrokerState()) logManager.startup verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager) } @@ -256,7 +301,8 @@ class LogManagerTest extends JUnit3Suite { flushCheckpointMs = 100000L, retentionCheckMs = 1000L, scheduler = time.scheduler, - time = time) + time = time, + brokerState = new BrokerState()) logManager.startup verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager) } diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index a78f7cf..558a5d6 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -40,6 +40,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { flushCheckpointMs = 10000L, retentionCheckMs = 30000, scheduler = new KafkaScheduler(1), + brokerState = new BrokerState(), time = new MockTime)) @After diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 41ebc7a..518d416 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -69,6 +69,7 @@ class ReplicaManagerTest extends JUnit3Suite { flushCheckpointMs = 100000L, retentionCheckMs = 1000L, scheduler = time.scheduler, + brokerState = new BrokerState(), time = time) } diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index 00fa90b..1490bdb 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -17,20 +17,19 @@ package kafka.perf -import kafka.metrics.KafkaMetricsReporter -import kafka.producer.{OldProducer, NewShinyProducer} -import kafka.utils.{VerifiableProperties, Logging} +import java.util.concurrent.{ CountDownLatch, Executors } +import java.util.concurrent.atomic.AtomicLong +import kafka.producer._ +import org.apache.log4j.Logger import kafka.message.CompressionCodec +import java.text.SimpleDateFormat import kafka.serializer._ - -import java.util.concurrent.{CountDownLatch, Executors} -import java.util.concurrent.atomic.AtomicLong import java.util._ -import java.text.SimpleDateFormat -import java.math.BigInteger -import scala.collection.immutable.List - -import org.apache.log4j.Logger +import collection.immutable.List +import kafka.utils.{VerifiableProperties, Logging, Utils} +import kafka.metrics.KafkaMetricsReporter +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback /** * Load test for the producer @@ -171,6 +170,67 @@ object ProducerPerformance extends Logging { val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue() } + trait Producer { + def send(topic: String, partition: Long, bytes: Array[Byte]) + def close() + } + + class OldRustyProducer(config: ProducerPerfConfig) extends Producer { + val props = new Properties() + props.put("metadata.broker.list", config.brokerList) + props.put("compression.codec", config.compressionCodec.codec.toString) + props.put("send.buffer.bytes", (64 * 1024).toString) + if (!config.isSync) { + props.put("producer.type", "async") + props.put("batch.num.messages", config.batchSize.toString) + props.put("queue.enqueue.timeout.ms", "-1") + } + props.put("client.id", "perf-test") + props.put("request.required.acks", config.producerRequestRequiredAcks.toString) + props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) + props.put("message.send.max.retries", config.producerNumRetries.toString) + props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString) + props.put("serializer.class", classOf[DefaultEncoder].getName.toString) + props.put("key.serializer.class", classOf[NullEncoder[Long]].getName.toString) + val producer = new kafka.producer.Producer[Long, Array[Byte]](new ProducerConfig(props)) + + def send(topic: String, partition: Long, bytes: Array[Byte]) { + this.producer.send(new KeyedMessage[Long, Array[Byte]](topic, partition, bytes)) + } + + def close() { + this.producer.close() + } + } + + class NewShinyProducer(config: ProducerPerfConfig) extends Producer { + import org.apache.kafka.clients.producer.ProducerConfig + val props = new Properties() + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) + 
props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) + props.put(ProducerConfig.CLIENT_ID_CONFIG, "perf-test") + props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString) + props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString) + props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString) + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name) + val producer = new KafkaProducer(props) + + def send(topic: String, partition: Long, bytes: Array[Byte]) { + val part = partition % this.producer.partitionsFor(topic).size + if (config.isSync) { + this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes)).get() + } else { + this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes), + new ErrorLoggingCallback(topic, null, bytes, if (config.seqIdMode) true else false)) + } + } + + def close() { + this.producer.close() + } + } + class ProducerThread(val threadId: Int, val config: ProducerPerfConfig, val totalBytesSent: AtomicLong, @@ -181,37 +241,11 @@ object ProducerPerformance extends Logging { val messagesPerThread = config.numMessages / config.numThreads debug("Messages per thread = " + messagesPerThread) - val props = new Properties() val producer = - if (config.useNewProducer) { - import org.apache.kafka.clients.producer.ProducerConfig - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) - props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) - props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance") - props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString) - props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString) - props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString) - props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString) - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name) - new NewShinyProducer(props) - } else { - props.put("metadata.broker.list", config.brokerList) - props.put("compression.codec", config.compressionCodec.codec.toString) - props.put("send.buffer.bytes", (64 * 1024).toString) - if (!config.isSync) { - props.put("producer.type", "async") - props.put("batch.num.messages", config.batchSize.toString) - props.put("queue.enqueue.timeout.ms", "-1") - } - props.put("client.id", "producer-performance") - props.put("request.required.acks", config.producerRequestRequiredAcks.toString) - props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) - props.put("message.send.max.retries", config.producerNumRetries.toString) - props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString) - props.put("serializer.class", classOf[DefaultEncoder].getName) - props.put("key.serializer.class", classOf[NullEncoder[Long]].getName) - new OldProducer(props) - } + if (config.useNewProducer) + new NewShinyProducer(config) + else + new OldRustyProducer(config) // generate the sequential message ID private val SEP = ":" // message field separator @@ -254,16 +288,15 @@ object ProducerPerformance extends Logging { override def run { var bytesSent = 0L var nSends = 0 - var i: Long = 0L + var j: Long = 0L var message: Array[Byte] = null - while (i < messagesPerThread) { + while (j < messagesPerThread) { try { config.topics.foreach( topic => { - message = 
generateProducerData(topic, i) - producer.send(topic, BigInteger.valueOf(i).toByteArray, message) - bytesSent += message.size + message = generateProducerData(topic, j) + producer.send(topic, j, message) nSends += 1 if (config.messageSendGapMs > 0) Thread.sleep(config.messageSendGapMs) @@ -271,7 +304,7 @@ object ProducerPerformance extends Logging { } catch { case e: Throwable => error("Error when sending message " + new String(message), e) } - i += 1 + j += 1 } try { producer.close() diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index 6917ea1..423b512 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -747,7 +747,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): cmdList = ["ssh " + hostname, "'JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, - kafkaHome + "/bin/kafka-run-class.sh kafka.tools.MirrorMaker", + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.newproducer.MirrorMaker", "--consumer.config " + configPathName + "/" + mmConsumerConfigFile, "--producer.config " + configPathName + "/" + mmProducerConfigFile, "--whitelist=\".*\" >> ",
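
The BrokerState machinery introduced by this patch is deliberately just a @volatile byte: the "BrokerState" gauge registered in KafkaServer reports whatever code was last written, and KafkaServerStartable.setServerState lets an embedding application write codes of its own, which is the custom-state escape hatch the BrokerStates scaladoc describes. Below is a minimal sketch of how a consumer of that byte might decode it; the BrokerStateDemo object, the describe helper and the custom code 42 are hypothetical, while BrokerState, the predefined case objects and their byte values come from the patch itself.

    import kafka.server._

    // Hypothetical helper: decodes the byte exposed by the new "BrokerState" gauge.
    // Only BrokerState, the case objects and their byte codes come from the patch;
    // everything else here is illustrative.
    object BrokerStateDemo {
      val knownStates: Map[Byte, String] = Map(
        NotRunning.state -> "NotRunning",
        Starting.state -> "Starting",
        RecoveringFromUncleanShutdown.state -> "RecoveringFromUncleanShutdown",
        RunningAsBroker.state -> "RunningAsBroker",
        RunningAsController.state -> "RunningAsController",
        PendingControlledShutdown.state -> "PendingControlledShutdown",
        BrokerShuttingDown.state -> "BrokerShuttingDown")

      def describe(state: Byte): String =
        knownStates.getOrElse(state, "CustomState(" + state + ")")

      def main(args: Array[String]) {
        val brokerState = new BrokerState()
        brokerState.newState(Starting)      // one of the predefined transitions
        brokerState.newState(42.toByte)     // an application-defined custom code
        println(describe(brokerState.currentState))  // prints "CustomState(42)"
      }
    }

Because currentState is a plain byte rather than one of the sealed case objects, unknown codes pass straight through without any extra registration, which is what makes setServerState in KafkaServerStartable usable for application-defined states.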