# ---- file: bin/kafka-tx-consumer-test.sh ----
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Launcher for the transactional end-to-end test consumer (KAFKA-1569).
if [ -z "$KAFKA_HEAP_OPTS" ]; then
  export KAFKA_HEAP_OPTS="-Xmx512M"
fi
# Quote "$(dirname "$0")" and "$@" so installation paths and user arguments
# containing spaces are passed through intact (the unquoted forms word-split).
exec "$(dirname "$0")/kafka-run-class.sh" kafka.tools.TransactionalConsumerTest "$@"

# ---- file: bin/kafka-tx-producer-test.sh ----
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Launcher for the transactional end-to-end test producer (KAFKA-1569).
if [ -z "$KAFKA_HEAP_OPTS" ]; then
  export KAFKA_HEAP_OPTS="-Xmx512M"
fi
# Quote "$(dirname "$0")" and "$@" — see consumer script rationale.
exec "$(dirname "$0")/kafka-run-class.sh" kafka.tools.TransactionalProducerTest "$@"
/**
 * End-to-end correctness test for transactional consumption (consumer side).
 *
 * Fetches `num-messages` messages from a single topic partition using
 * transactional fetch requests and writes each message payload as one line
 * to /tmp/out_file.dat, so the output can be diffed against the data file
 * sent by TransactionalProducerTest.
 */
object TransactionalConsumerTest extends Logging {

  def main(args: Array[String]) {

    val config = new TransactionalConsumerConf(args)

    // A single worker thread does all the fetching; main() just waits on it.
    val executor = Executors.newFixedThreadPool(1)
    val done = new CountDownLatch(1)

    executor.execute(new TransactionalConsumerThread(config, done))

    done.await()

    println("Done. Diff the files to check correctness")

    System.exit(0)
  }

  /**
   * Worker that fetches messages transactionally and writes each payload as
   * one line of the output file, until `totalMessages` have been received.
   */
  class TransactionalConsumerThread(val config: TransactionalConsumerConf,
                                    val done: CountDownLatch) extends Runnable {

    // 30s socket timeout; receive buffer sized to hold two full fetches.
    val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId)

    override def run {

      val topic = config.topic
      val partition = config.partition
      var rcvMessages = 0

      // Option-wrapped so the finally block can close it safely even when
      // opening the output file itself failed (was: bw.get.close(), which
      // threw NoSuchElementException and masked the original error).
      var bw: Option[BufferedWriter] = None

      try {
        bw = Some(new BufferedWriter(new FileWriter(new File("/tmp/out_file.dat"))))

        val decoder = Charset.forName("UTF-8").newDecoder()

        // Keep fetching until the expected number of messages is written out.
        while (rcvMessages < config.totalMessages) {

          val request = new FetchRequestBuilder()
            .clientId(config.clientId)
            .addFetch(topic, partition, 0, config.fetchSize)
            .build()
          // NOTE(review): the fetch offset is fixed at 0 and lastConsumedOffset
          // is never fed back into the next request — presumably fetchTx
          // resolves the committed position server-side; confirm, otherwise
          // this re-reads the head of the log on every iteration.
          val fetchTxResponse = consumer.fetchTx(request)
          val (messages, lastConsumedOffset) = fetchTxResponse((config.topic, config.partition))

          for (message <- messages) {
            val data = decoder.decode(message.payload).toString
            bw.get.write(data)
            bw.get.newLine()
            rcvMessages += 1
          }
        }
      } finally {
        // Guarded close (bw may still be None) and release the socket.
        bw.foreach(_.close())
        consumer.close()
        // Count down in finally so main() cannot hang if this thread dies.
        done.countDown()
      }
      println("Finished receiving file")
    }
  }

  /** Command-line configuration for the consumer test. */
  class TransactionalConsumerConf(args: Array[String]) {
    val parser = new OptionParser
    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
                       .withRequiredArg
                       .describedAs("kafka://hostname:port")
                       .ofType(classOf[String])
    val topicOpt = parser.accepts("topic", "REQUIRED: A single topic to produce to")
                         .withRequiredArg
                         .describedAs("topic_name")
                         .ofType(classOf[String])
    val partitionOpt = parser.accepts("partition", "The topic partition to consume from.")
                             .withRequiredArg
                             .describedAs("partition")
                             .ofType(classOf[java.lang.Integer])
                             .defaultsTo(0)
    val clientIdOpt = parser.accepts("clientId", "The ID of this client.")
                            .withRequiredArg
                            .describedAs("clientId")
                            .ofType(classOf[String])
                            .defaultsTo("TxConsumerClient")
    val totalMessagesOpt = parser.accepts("num-messages", "The number of messages expected to receive")
                                 .withRequiredArg()
                                 .describedAs("total messages")
                                 .ofType(classOf[java.lang.Integer])
                                 .defaultsTo(100)
    val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size to use for consumption.")
                             .withRequiredArg
                             .describedAs("bytes")
                             .ofType(classOf[java.lang.Integer])
                             .defaultsTo(1024*1024)

    val options = parser.parse(args: _*)
    // "server" is documented as REQUIRED, so enforce it along with "topic"
    // (it was previously missing from the required-args check).
    CommandLineUtils.checkRequiredArgs(parser, options, urlOpt, topicOpt)

    val url = new URI(options.valueOf(urlOpt))
    val topic = options.valueOf(topicOpt)
    val partition = options.valueOf(partitionOpt).intValue
    val clientId = options.valueOf(clientIdOpt).toString
    val totalMessages = options.valueOf(totalMessagesOpt).intValue
    val fetchSize = options.valueOf(fetchSizeOpt).intValue
  }
}
/**
 * End-to-end correctness test for transactional production (producer side).
 *
 * Generates a data file of `num-messages` numbered lines, then sends every
 * line as one message to a single topic partition, grouped into transactions
 * of `total-msg-per-tx` messages. Throughput statistics are printed on
 * completion; the data file can be diffed against the consumer's output.
 */
object TransactionalProducerTest extends Logging {

  def main(args: Array[String]) {

    val config = new TransactionalProducerConf(args)

    val totalBytesSent = new AtomicLong(0)
    val totalMessagesSent = new AtomicLong(0)
    val executor = Executors.newFixedThreadPool(1)
    val done = new CountDownLatch(1)
    val startMs = System.currentTimeMillis

    val rand = new java.util.Random

    val file = generateDataFile(config)

    println("Generated data file")

    executor.execute(new TransactionalProducerThread(config, totalBytesSent, totalMessagesSent, done, file, rand))

    done.await()

    // Report: codec, message size, total MB, MB/s, message count, msgs/s.
    val endMs = System.currentTimeMillis
    val elapsedSecs = (endMs - startMs) / 1000.0
    val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024)
    println(("%d, %d, %.2f, %.4f, %d, %.4f").format(
      config.compressionCodec.codec, config.messageSize, totalMBSent,
      totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs))
    System.exit(0)
  }

  /**
   * Writes `num-messages` lines of the form "<seq> AAA..." (padded so each
   * line is `message-size-bytes` chars) to /tmp/<datafile-name>.
   *
   * @return the generated file, or None if it was never created
   */
  def generateDataFile(conf: TransactionalProducerConf): Option[File] = {
    val fileName = conf.dataFileName
    val numMessages = conf.totalMessages
    val messageSize = conf.messageSize

    var inFile: Option[java.io.File] = None
    var bw: Option[java.io.BufferedWriter] = None

    try {
      inFile = Some(new File("/tmp/" + fileName))
      bw = Some(new BufferedWriter(new FileWriter(inFile.get)))
      // The filler payload is identical for every line, so build it once
      // instead of re-concatenating char by char per message (was O(n^2)
      // in total work across the file).
      val payload = "A" * (messageSize - 1)
      for (i <- 0 until numMessages) {
        bw.get.write("%d %s".format(i, payload))
        bw.get.newLine()
      }
    } finally {
      // Guarded close: bw is None if opening the file failed.
      bw.foreach(_.close())
    }
    inFile
  }

  /**
   * Worker that streams the generated file to Kafka, committing a transaction
   * every `numMessagesPerTx` messages, and updates the shared send counters.
   */
  class TransactionalProducerThread(val config: TransactionalProducerConf,
                                    val totalBytesSent: AtomicLong,
                                    val totalMessagesSent: AtomicLong,
                                    val done: CountDownLatch,
                                    val file: Option[java.io.File],
                                    val rand: Random) extends Runnable {

    val props = new Properties()

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
    props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "tx-producer-performance")
    props.put(ProducerConfig.CLIENT_TX_GROUP_CONFIG, 0.toString)
    props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString)
    props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString)
    props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString)
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString)
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name)
    val producer = new NewTxProducer(props)

    override def run {

      val numMsgPerTx = config.numMessagesPerTx
      // Initialized to the batch size so the very first iteration opens a tx.
      var msgSentInCurrentTx = numMsgPerTx
      val topic = config.topic

      val charset = Charset.forName("UTF-8")

      // Option-wrapped so the finally block can close it safely even when
      // opening the input file failed (was: br.get.close(), which threw
      // NoSuchElementException and masked the original error).
      var br: Option[java.io.BufferedReader] = None

      try {
        br = Some(new BufferedReader(new FileReader(file.get)))

        var firstLoop = true
        var message = br.get.readLine()
        // Send every line of the file as one message.
        while (message != null) {
          if (msgSentInCurrentTx == numMsgPerTx) {
            // Commit the previous transaction before opening the next one;
            // on the first batch there is nothing to commit yet.
            if (firstLoop)
              firstLoop = false
            else
              producer.commit(null)
            producer.begin()
            msgSentInCurrentTx = 0
          }

          // getBytes handles multi-byte characters correctly (the previous
          // CharBuffer/array-copy version assumed 1 byte per char and would
          // truncate non-ASCII payloads).
          val valueArray = message.getBytes(charset)

          // All messages go to the same partition of the configured topic
          // (constant key 0), matching what the consumer test reads back.
          producer.send(topic, BigInteger.valueOf(0).toByteArray, valueArray)
          // Update the shared counters so main() reports real throughput
          // (they were previously never incremented, so stats printed 0).
          totalBytesSent.addAndGet(valueArray.length)
          totalMessagesSent.incrementAndGet()
          msgSentInCurrentTx += 1
          message = br.get.readLine()
        }
      } finally {
        // Commit the trailing (possibly partial) transaction and clean up.
        producer.commit(null)
        br.foreach(_.close())
        // Count down in finally so main() cannot hang if this thread dies.
        done.countDown()
      }
      println("Finished sending file")
    }
  }

  /** Command-line configuration for the producer test. */
  class TransactionalProducerConf(args: Array[String]) {
    val parser = new OptionParser
    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port for bootstrap.")
                              .withRequiredArg
                              .describedAs("hostname:port,..,hostname:port")
                              .ofType(classOf[String])
    val topicOpt = parser.accepts("topic", "REQUIRED: A single topic to produce to")
                         .withRequiredArg
                         .describedAs("topic_name")
                         .ofType(classOf[String])
    val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
                                            .withRequiredArg()
                                            .ofType(classOf[java.lang.Integer])
                                            .defaultsTo(3000)
    val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The producer retries number")
                                      .withRequiredArg()
                                      .ofType(classOf[java.lang.Integer])
                                      .defaultsTo(3)
    val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds")
                                          .withRequiredArg()
                                          .ofType(classOf[java.lang.Integer])
                                          .defaultsTo(100)
    val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
                                                        "to complete")
                                               .withRequiredArg()
                                               .ofType(classOf[java.lang.Integer])
                                               .defaultsTo(-1)
    // NOTE(review): parsed but never read anywhere in this tool; either wire
    // it into the send loop or drop the option.
    val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for specified time between two sends")
                                    .withRequiredArg()
                                    .describedAs("message send time gap")
                                    .ofType(classOf[java.lang.Integer])
                                    .defaultsTo(0)
    val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
                                    .withRequiredArg
                                    .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2")
                                    .ofType(classOf[java.lang.Integer])
                                    .defaultsTo(0)
    val totalMessagesOpt = parser.accepts("num-messages", "Total number of messages to send")
                                 .withRequiredArg()
                                 .describedAs("total number of msgs")
                                 .ofType(classOf[java.lang.Integer])
                                 .defaultsTo(100)
    val messageSizeOpt = parser.accepts("message-size-bytes", "Total size of message payload in bytes")
                               .withRequiredArg()
                               .ofType(classOf[java.lang.Integer])
                               .defaultsTo(100)
    val dataFileNameOpt = parser.accepts("datafile-name", "If provided, this is the name of the data file")
                                .withRequiredArg()
                                .ofType(classOf[java.lang.String])
                                .defaultsTo("in_datafile.dat")
    val numMessagesPerTxOpt = parser.accepts("total-msg-per-tx", "The number of messages per transaction")
                                    .withRequiredArg()
                                    .ofType(classOf[java.lang.Integer])
                                    .defaultsTo(10)

    val options = parser.parse(args: _*)
    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt)

    val topic = options.valueOf(topicOpt)
    val brokerList = options.valueOf(brokerListOpt)
    val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue)
    val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue()
    val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
    val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue()
    val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue()
    val totalMessages = options.valueOf(totalMessagesOpt).intValue()
    val messageSize = options.valueOf(messageSizeOpt).intValue()
    val dataFileName = options.valueOf(dataFileNameOpt)
    val numMessagesPerTx = options.valueOf(numMessagesPerTxOpt).intValue()
  }
}