ZookeeperConsumerConnector.scala
1) releasePartitionOwnership - the issue was between the trunk work on KAFKA-286 to fix the rebalance issue and the branch work on KAFKA-239 for making the zk structures logical
2) addPartitionTopicInfo

SyncProducer.scala
1) 1239902:HEAD

system_test/broker_failure/bin/run-test.sh
<<<<<<< .working
readonly num_iterations=5
=======
readonly num_iterations=1
>>>>>>> .merge-right.r1343118
kept working

core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
>> just imports

core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
>> just imports

core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
>> just imports

core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
>> just imports

core/src/test/scala/unit/kafka/producer/ProducerTest.scala
>> just imports

core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
>> just imports

core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
>> just imports

core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
<<<<<<< .working
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
=======
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
>>>>>>> .merge-right.r1343118
kept trunk

core/src/main/scala/kafka/utils/Utils.scala
1) imports
2) blank lines from trunk with branch changes for getNextRandom
3) 3 functions
<<<<<<< .working
  def stringMapToJsonString(jsonDataMap: Map[String, String]): String = {
    val builder = new StringBuilder
    builder.append("{ ")
    var numElements = 0
    for ((key, value) <- jsonDataMap) {
      if (numElements > 0)
        builder.append(",")
      builder.append("\"" + key + "\": ")
      builder.append("\"" + value + "\"")
      numElements += 1
    }
    builder.append(" }")
    builder.toString
  }
=======
  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
    for(arg <- required) {
      if(!options.has(arg)) {
        error("Missing required argument \"" + arg + "\"")
        parser.printHelpOn(System.err)
        System.exit(1)
      }
    }
  }

  /**
   * Create a circular (looping) iterator over a collection.
   * @param coll An iterable over the underlying collection.
   * @return A circular iterator over the collection.
   */
  def circularIterator[T](coll: Iterable[T]) = {
    val stream: Stream[T] =
      for (forever <- Stream.continually(1); t <- coll)
        yield t
    stream.iterator
  }
>>>>>>> .merge-right.r1343118
kept all 3 functions (so both branch and trunk)
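For reference, a minimal self-contained sketch of how the kept circularIterator behaves (the function body is copied from the trunk side above; the sample list and the take(7) bound are just illustration, nothing from the codebase):

  object CircularIteratorSketch extends App {
    // copied from the trunk side kept above: cycles over the collection forever
    def circularIterator[T](coll: Iterable[T]) = {
      val stream: Stream[T] = for (forever <- Stream.continually(1); t <- coll) yield t
      stream.iterator
    }

    // the iterator is infinite, so bound it with take() before materializing
    val it = circularIterator(List("a", "b", "c"))
    println(it.take(7).mkString(","))  // a,b,c,a,b,c,a
  }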
core/src/main/scala/kafka/utils/Logging.scala
>> last brace and blank space

core/src/main/scala/kafka/producer/SyncProducer.scala
1) trunk has a SyncProducer object, working did not
<<<<<<< .working
=======
object SyncProducer {
  val RequestKey: Short = 0
  val randomGenerator = new Random
}
>>>>>>> .merge-right.r1343118
kept branch
2) within verifyRequest
<<<<<<< .working
      if(requestTypeId == RequestKeys.Produce) {
        val request = ProducerRequest.readFrom(buffer)
        trace(request.toString)
=======
      if (requestTypeId == RequestKeys.MultiProduce) {
        try {
          val request = MultiProducerRequest.readFrom(buffer)
          for (produce <- request.produces) {
            try {
              for (messageAndOffset <- produce.messages)
                if (!messageAndOffset.message.isValid)
                  throw new InvalidMessageException("Message for topic " + produce.topic + " is invalid")
            } catch {
              case e: Throwable => error("error iterating messages ", e)
            }
          }
        } catch {
          case e: Throwable => error("error verifying sendbuffer ", e)
        }
>>>>>>> .merge-right.r1343118
kept branch
3) send message
<<<<<<< .working
  def send(producerRequest: ProducerRequest): ProducerResponse = {
    for( topicData <- producerRequest.data ) {
      for( partitionData <- topicData.partitionData ) {
        verifyMessageSize(partitionData.messages)
        val setSize = partitionData.messages.sizeInBytes.asInstanceOf[Int]
        trace("Got message set with " + setSize + " bytes to send")
      }
    }
    val response = doSend(producerRequest)
    ProducerResponse.deserializeResponse(response._1.buffer)
=======
  def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
    messages.verifyMessageSize(config.maxMessageSize)
    val setSize = messages.sizeInBytes.asInstanceOf[Int]
    trace("Got message set with " + setSize + " bytes to send")
    send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages)))
>>>>>>> .merge-right.r1343118
  }
<<<<<<< .working
  def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
    val response = doSend(request)
    TopicMetadataRequest.deserializeTopicsMetadataResponse(response._1.buffer)
=======
  def multiSend(produces: Array[ProducerRequest]) {
    for (request <- produces)
      request.messages.verifyMessageSize(config.maxMessageSize)
    val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes)
    trace("Got multi message sets with " + setSize + " bytes to send")
    send(new BoundedByteBufferSend(new MultiProducerRequest(produces)))
>>>>>>> .merge-right.r1343118
kept branch
4) verifyMessageSize did not exist in trunk (line blank on trunk); kept branch
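As a quick orientation to the send API kept from the branch, this is a sketch of what a caller looks like after the merge. It mirrors the SyncProducerTest code quoted later in these notes; the broker port, topic name, and use of the TestUtils helper are assumptions for illustration, and it needs a running broker:

  import java.util.Properties
  import kafka.producer.{SyncProducer, SyncProducerConfig}
  import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
  import kafka.utils.TestUtils

  val props = new Properties()
  props.put("host", "localhost")
  props.put("port", "9092")              // assumed broker port
  props.put("buffer.size", "102400")
  props.put("connect.timeout.ms", "300")
  props.put("reconnect.interval", "500")
  val producer = new SyncProducer(new SyncProducerConfig(props))

  val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes))
  // the kept branch API takes a ProducerRequest and returns a ProducerResponse
  // (per-partition error codes and offsets) instead of the old (topic, partition, messages) form
  val request  = TestUtils.produceRequest("test-topic", 0, messages)
  val response = producer.send(request)
  println(response.errors.mkString(","))
  println(response.offsets.mkString(","))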
core/src/main/scala/kafka/producer/Producer.scala
1) imports
<<<<<<< .working
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
import kafka.serializer.Encoder
import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
import org.I0Itec.zkclient.ZkClient
import kafka.common.{QueueFullException, InvalidConfigException}
=======
import java.util.Properties
import kafka.cluster.{Partition, Broker}
import java.util.concurrent.atomic.AtomicBoolean
import kafka.common.{NoBrokersForPartitionException, InvalidPartitionException}
import kafka.api.ProducerRequest
>>>>>>> .merge-right.r1343118
kept branch
2) within the class itself
<<<<<<< .working
  if(!Utils.propertyExists(config.zkConnect))
    throw new InvalidConfigException("zk.connect property must be specified in the producer")
  if (config.batchSize > config.queueSize)
    throw new InvalidConfigException("Batch size can't be larger than queue size.")

  private val queue = new LinkedBlockingQueue[ProducerData[K,V]](config.queueSize)
  private var sync: Boolean = true
  private var producerSendThread: ProducerSendThread[K,V] = null
  config.producerType match {
    case "sync" =>
    case "async" =>
      sync = false
      val asyncProducerID = Utils.getNextRandomInt
      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID,
        queue, eventHandler, config.queueTime, config.batchSize)
      producerSendThread.start
    case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
=======
  private val random = new java.util.Random
  // check if zookeeper based auto partition discovery is enabled
  private val zkEnabled = Utils.propertyExists(config.zkConnect)
  if(brokerPartitionInfo == null) {
    zkEnabled match {
      case true =>
        val zkProps = new Properties()
        zkProps.put("zk.connect", config.zkConnect)
        zkProps.put("zk.sessiontimeout.ms", config.zkSessionTimeoutMs.toString)
        zkProps.put("zk.connectiontimeout.ms", config.zkConnectionTimeoutMs.toString)
        zkProps.put("zk.synctime.ms", config.zkSyncTimeMs.toString)
        brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), producerCbk)
      case false =>
        brokerPartitionInfo = new ConfigBrokerPartitionInfo(config)
    }
>>>>>>> .merge-right.r1343118
kept branch

core/src/main/scala/kafka/producer/ProducerConfig.scala
1) fixes from deprecated use
<<<<<<< .working
  if(brokerList != null)
    throw new InvalidConfigException("broker.list is deprecated. Use zk.connect instead")
=======
  if(Utils.propertyExists(brokerList) && Utils.getString(props, "partitioner.class", null) != null)
    throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set")
>>>>>>> .merge-right.r1343118
kept branch
2) more configs
<<<<<<< .working
  if(zkConnect == null)
    throw new InvalidConfigException("zk.connect property is required")
=======
  if(Utils.propertyExists(brokerList) && Utils.propertyExists(zkConnect))
    throw new InvalidConfigException("only one of broker.list and zk.connect can be specified")
>>>>>>> .merge-right.r1343118
kept branch

core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
1) trunk trying to put send back in
<<<<<<< .working
  def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = {
    events.map(e => new ProducerData[K,Message](e.getTopic, e.getKey, e.getData.map(m => encoder.toMessage(m))))
  }

  def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]] = {
    val ret = new HashMap[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]
    for (event <- events) {
      val topicPartitionsList = getPartitionListForTopic(event)
      val totalNumPartitions = topicPartitionsList.length
      val partitionIndex = getPartition(event.getKey, totalNumPartitions)
      val brokerPartition = topicPartitionsList(partitionIndex)
      val leaderBrokerId = brokerPartition.leader match {
        case Some(leader) => leader.brokerId
        case None => -1 // postpone the failure until the send operation, so that requests for other brokers are handled correctly
      }
      var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
      ret.get(leaderBrokerId) match {
        case Some(element) =>
          dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
        case None =>
          dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
          ret.put(leaderBrokerId, dataPerBroker)
      }
      val topicAndPartition = (event.getTopic, brokerPartition.partId)
      var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
      dataPerBroker.get(topicAndPartition) match {
        case Some(element) =>
          dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]]
        case None =>
          dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]]
          dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
      }
      dataPerTopicPartition.append(event)
=======
  private def send(messagesPerTopic: Map[(String, Int), ByteBufferMessageSet], syncProducer: SyncProducer) {
    if(messagesPerTopic.size > 0) {
      val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
      val maxAttempts = config.numRetries + 1
      var attemptsRemaining = maxAttempts
      var sent = false
      while (attemptsRemaining > 0 && !sent) {
        attemptsRemaining -= 1
        try {
          syncProducer.multiSend(requests)
          trace("kafka producer sent messages for topics %s to broker %s:%d (on attempt %d)"
            .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port, maxAttempts - attemptsRemaining))
          sent = true
        } catch {
          case e =>
            warn("Error sending messages, %d attempts remaining".format(attemptsRemaining))
            if (attemptsRemaining == 0)
              throw e
        }
      }
>>>>>>> .merge-right.r1343118
kept branch
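To make the shape of the kept partitionAndCollate easier to follow, here is a simplified, self-contained sketch of the same grouping. The Event case class and the leader/partition lookups are hypothetical stand-ins for ProducerData and the broker-partition metadata; this is not the real DefaultEventHandler code:

  object CollateSketch extends App {
    case class Event(topic: String, key: String, payload: String)

    // group events first by the leader broker id of their (topic, partition),
    // then by the (topic, partition) pair itself -- the same
    // Map[Int, Map[(String, Int), Seq[...]]] shape that partitionAndCollate returns
    def collate(events: Seq[Event],
                numPartitions: Int,
                leaderFor: ((String, Int)) => Int): Map[Int, Map[(String, Int), Seq[Event]]] = {
      def partitionOf(e: Event) = math.abs(e.key.hashCode) % numPartitions
      events
        .groupBy(e => leaderFor((e.topic, partitionOf(e))))   // -1 means "no leader yet", failure deferred to send
        .map { case (broker, evs) => broker -> evs.groupBy(e => (e.topic, partitionOf(e))) }
    }

    // toy leader assignment: two topics, two partitions
    val leaders = Map(("t1", 0) -> 1, ("t1", 1) -> 2, ("t2", 0) -> 1).withDefaultValue(-1)
    println(collate(Seq(Event("t1", "a", "x"), Event("t2", "b", "y")), 2, leaders))
  }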
core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
1) processEvents conflict between changes from KAFKA-300 (on branch) and KAFKA-326 (on trunk)
<<<<<<< .working
        if(currentQueueItem.getKey == null)
          trace("Dequeued item for topic %s, no partition key, data: %s"
            .format(currentQueueItem.getTopic, currentQueueItem.getData.toString))
        else
=======
        trace("Dequeued item for topic %s and partition %d"
          .format(currentQueueItem.getTopic, currentQueueItem.getPartition))
        // handle the dequeued current item
        if(cbkHandler != null)
          events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
        else {
          if (currentQueueItem != null)
>>>>>>> .merge-right.r1343118
kept branch, cbkHandler is no longer code on the branch

core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
>> just imports

core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
1) makeNext conflict from KAFKA-277
<<<<<<< .working
          val isInnerDone = innerDone()
          isInnerDone match {
            case true => makeNextOuter
            case false => {
              val messageAndOffset = innerIter.next
              if (!innerIter.hasNext)
                currValidBytes += 4 + lastMessageSize
              new MessageAndOffset(messageAndOffset.message, currValidBytes)
=======
          if(isShallow){
            makeNextOuter
          }
          else{
            val isInnerDone = innerDone()
            debug("makeNext() in internalIterator: innerDone = " + isInnerDone)
            isInnerDone match {
              case true => makeNextOuter
              case false => {
                val messageAndOffset = innerIter.next
                if (!innerIter.hasNext)
                  currValidBytes += 4 + lastMessageSize
                new MessageAndOffset(messageAndOffset.message, currValidBytes)
              }
>>>>>>> .merge-right.r1343118
kept trunk
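For context on what the isShallow branch kept from trunk buys: a shallow iterator stops at the outer (possibly compressed) wrapper messages, while the deep path descends into the inner messages. A toy, self-contained illustration with hypothetical Wrapper/Inner types (not the real ByteBufferMessageSet):

  object ShallowVsDeepSketch extends App {
    case class Inner(value: String)
    case class Wrapper(inner: Seq[Inner])                    // stands in for a compressed wrapper message

    def messageIterator(wrappers: Seq[Wrapper], isShallow: Boolean): Iterator[Any] =
      if (isShallow) wrappers.iterator                       // analogous to makeNextOuter only
      else wrappers.iterator.flatMap(_.inner.iterator)       // descend into each wrapper's messages

    val set = Seq(Wrapper(Seq(Inner("a"), Inner("b"))), Wrapper(Seq(Inner("c"))))
    println(messageIterator(set, isShallow = true).size)     // 2 outer wrappers
    println(messageIterator(set, isShallow = false).size)    // 3 inner messages
  }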
core/src/main/scala/kafka/server/KafkaServer.scala
1) case upper
<<<<<<< .working
  val CleanShutdownFile = ".kafka_cleanshutdown"
  private var isShuttingDown = new AtomicBoolean(false)
  private var shutdownLatch = new CountDownLatch(1)
=======
  val CLEAN_SHUTDOWN_FILE = ".kafka_cleanshutdown"
  private var isShuttingDown = new AtomicBoolean(false)
  private var shutdownLatch = new CountDownLatch(1)
>>>>>>> .merge-right.r1343118
kept branch

core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
>> this file was removed and core/src/main/scala/kafka/server/KafkaRequestHandler.scala was created. There were changes to KafkaRequestHandlers.scala (KAFKA-272), so I manually modified KafkaRequestHandler to carry those changes too so the app can compile; however, I am not sure about the changes in here since so much has changed.

core/src/main/scala/kafka/consumer/SimpleConsumer.scala
1) SimpleConsumer - KAFKA-305 getting overwritten by the trunk merge because of a single-line update to tweak the performance of a single fetch request, which is N/A
<<<<<<< .working
  private def disconnect() = {
    if(blockingChannel.isConnected) {
      debug("Disconnecting from " + host + ":" + port)
      blockingChannel.disconnect()
    }
=======
    val channel = SocketChannel.open
    debug("Connected to " + address + " for fetching.")
    channel.configureBlocking(true)
    channel.socket.setReceiveBufferSize(bufferSize)
    channel.socket.setSoTimeout(soTimeout)
    channel.socket.setKeepAlive(true)
    channel.socket.setTcpNoDelay(true)
    channel.connect(address)
    trace("requested receive buffer size=" + bufferSize + " actual receive buffer size= " + channel.socket.getReceiveBufferSize)
    trace("soTimeout=" + soTimeout + " actual soTimeout= " + channel.socket.getSoTimeout)
    channel
>>>>>>> .merge-right.r1343118
kept branch

core/src/main/scala/kafka/consumer/FetcherRunnable.scala
1) svn merge, phhh
<<<<<<< .working
=======
    val response = simpleConsumer.multifetch(fetches : _*)
    trace("recevied response from fetch request: " + fetches.toString)
>>>>>>> .merge-right.r1343118
kept trunk

core/src/main/scala/kafka/consumer/TopicCount.scala
1) KAFKA-249 merge went funky, keeping trunk
<<<<<<< .working
  def constructTopicCount(consumerIdSting: String, jsonString : String) : TopicCount = {
    var topMap : Map[String,Int] = null
    try {
      JSON.parseFull(jsonString) match {
        case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
        case None => throw new RuntimeException("error constructing TopicCount : " + jsonString)
      }
    } catch {
      case e =>
        error("error parsing consumer json string " + jsonString, e)
        throw e
    }
=======
private[kafka] trait TopicCount {

  def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
>>>>>>> .merge-right.r1343118
<<<<<<< .working
    new TopicCount(consumerIdSting, topMap)
  }
}

private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {

  def getConsumerThreadIdsPerTopic(): Map[String, Set[String]] = {
=======
  def dbString: String

  protected def makeConsumerThreadIdsPerTopic(consumerIdString: String, topicCountMap: Map[String, Int]) = {
>>>>>>> .merge-right.r1343118

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
1) KAFKA-300 (Int for Partition) conflicting with KAFKA-249's topicThreadIdAndQueues
<<<<<<< .working
  private val topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
  // queues : (topic,consumerThreadId) -> queue
  private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
=======
  private var topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
  // topicThreadIdAndQueues : (topic,consumerThreadId) -> queue
  private val topicThreadIdAndQueues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
>>>>>>> .merge-right.r1343118
keeping the new topicRegistry and changing over to topicThreadIdAndQueues to preserve KAFKA-300 and KAFKA-249 together
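Roughly, the resolved declarations would then read as below. This is a sketch of the intended resolution (reusing the types already in scope in that file), not a verified excerpt of the committed code:

  private val topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
  // topicThreadIdAndQueues : (topic,consumerThreadId) -> queue
  private val topicThreadIdAndQueues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]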
2) KAFKA-249 merge issues: the right changes were made but conflicted with working
<<<<<<< .working
    var consumerUuid : String = null
    config.consumerId match {
      case Some(consumerId) => // for testing only
        consumerUuid = consumerId
      case None => // generate unique consumerId automatically
        val uuid = UUID.randomUUID()
        consumerUuid = "%s-%d-%s".format(
          InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
          uuid.getMostSignificantBits().toHexString.substring(0,8))
    }
    val consumerIdString = config.groupId + "_" + consumerUuid
    val topicCount = new TopicCount(consumerIdString, topicCountMap)
=======
    val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic
>>>>>>> .merge-right.r1343118
kept trunk, removed working
3) KAFKA-239 merge futzed
<<<<<<< .working
  // this API is used by unit tests only
  def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry

  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
=======
  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
>>>>>>> .merge-right.r1343118
keeping branch
4) KAFKA-265 not getting integrated into the ZKRebalancerListener class right
<<<<<<< .working
    private val dirs = new ZKGroupDirs(group)
    private var oldPartitionsPerTopicMap: mutable.Map[String, Seq[String]] = new mutable.HashMap[String, Seq[String]]()
    private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
=======
    private var isWatcherTriggered = false
    private val lock = new ReentrantLock
    private val cond = lock.newCondition()
    private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
      override def run() {
        info("starting watcher executor thread for consumer " + consumerIdString)
        var doRebalance = false
        while (!isShuttingDown.get) {
          try {
            lock.lock()
            try {
              if (!isWatcherTriggered)
                cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag
            } finally {
              doRebalance = isWatcherTriggered
              isWatcherTriggered = false
              lock.unlock()
            }
            if (doRebalance)
              syncedRebalance
          } catch {
            case t => error("error during syncedRebalance", t)
          }
        }
        info("stopping watcher executor thread for consumer " + consumerIdString)
      }
    }
    watcherExecutorThread.start()
>>>>>>> .merge-right.r1343118
removing private val dirs from the branch, keeping everything else from the branch plus everything from trunk
5) KAFKA-286 introduced deletePartitionOwnershipFromZK for the rebalancing defect, and KAFKA-239 refactored code to re-write the ZK data structures
<<<<<<< .working
      for ((topic, infos) <- topicRegistry) {
        for(partition <- infos.keys) {
          val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition.toString)
          deletePath(zkClient, partitionOwnerPath)
          debug("Consumer " + consumerIdString + " releasing " + partitionOwnerPath)
        }
=======
      for ((topic, infos) <- localTopicRegistry) {
        for(partition <- infos.keys)
          deletePartitionOwnershipFromZK(topic, partition.toString)
        localTopicRegistry.remove(topic)
>>>>>>> .merge-right.r1343118
keeping trunk so that the defect stays fixed; KAFKA-239 may need another pass after the commit
6) KAFKA-262 removed this function, svn merge phhhh
<<<<<<< .working
  private def getRelevantTopicMap(myTopicThreadIdsMap: Map[String, Set[String]],
                                  newPartMap: Map[String, Seq[String]],
                                  oldPartMap: Map[String, Seq[String]],
                                  newConsumerMap: Map[String,List[String]],
                                  oldConsumerMap: Map[String,List[String]]): Map[String, Set[String]] = {
    var relevantTopicThreadIdsMap = new mutable.HashMap[String, Set[String]]()
    for ( (topic, consumerThreadIdSet) <- myTopicThreadIdsMap )
      if ( oldPartMap.get(topic) != newPartMap.get(topic) || oldConsumerMap.get(topic) != newConsumerMap.get(topic))
        relevantTopicThreadIdsMap += (topic -> consumerThreadIdSet)
    relevantTopicThreadIdsMap
  }
=======
>>>>>>> .merge-right.r1343118
keeping trunk
7) conflict between KAFKA-300 changing Partition to Int and KAFKA-262 fixing an issue on the changed line
<<<<<<< .working
      for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
        topicRegistry.remove(topic)
        topicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
=======
      var currentTopicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
>>>>>>> .merge-right.r1343118
so, keeping the trunk but modifying it to have the proper Int for partition
8) space issue
<<<<<<< .working
      } else
=======
      }else {
>>>>>>> .merge-right.r1343118
keeping branch
9) conflict between KAFKA-300 changing Partition to Int and KAFKA-262 fixing an issue on the changed line
<<<<<<< .working
  private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partition: String,
=======
  private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]],
                                    topicDirs: ZKGroupTopicDirs, partitionString: String,
>>>>>>> .merge-right.r1343118
so, keeping the trunk but modifying it to have the proper Int for partition (see the sketch after item 10)
10) svn merge phhh
<<<<<<< .working
    val queue = queues.get((topic, consumerThreadId))
=======
    val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
>>>>>>> .merge-right.r1343118
keeping trunk
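For items 7 and 9, the "trunk but with Int" modification would look roughly like this; again a sketch of the intended resolution against the quoted lines, not a verified excerpt, with the rest of the addPartitionTopicInfo parameter list left as it was:

  // item 7: trunk's local registry, re-keyed by Int partition ids per KAFKA-300
  var currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]

  // item 9: trunk's signature, with the registry parameter re-keyed by Int as well
  private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
                                    topicDirs: ZKGroupTopicDirs, partitionString: String, /* ... */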
perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
>> just imports

contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
>> just imports

examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
>> just imports

***** v3 updates from the svn commit: r1343255 via KAFKA-49 *****

G core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
<<<<<<< .working
  @Test
  def testProduceCorrectlyReceivesResponse() {
    val server = servers.head
    val props = new Properties()
    props.put("host", "localhost")
    props.put("port", server.socketServer.port.toString)
    props.put("buffer.size", "102400")
    props.put("connect.timeout.ms", "300")
    props.put("reconnect.interval", "500")
    props.put("max.message.size", "100")
    val producer = new SyncProducer(new SyncProducerConfig(props))
    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))

    // #1 - test that we get an error when partition does not belong to broker in response
    val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1)
    val response = producer.send(request)

    Assert.assertNotNull(response)
    Assert.assertEquals(request.correlationId, response.correlationId)
    Assert.assertEquals(response.errors.length, response.offsets.length)
    Assert.assertEquals(3, response.errors.length)
    response.errors.foreach(Assert.assertEquals(ErrorMapping.NoLeaderForPartitionCode.toShort, _))
    response.offsets.foreach(Assert.assertEquals(-1L, _))

    // #2 - test that we get correct offsets when partition is owned by broker
    CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
    TestUtils.waitUntilLeaderIsElected(zkClient, "topic1", 0, 500)
    CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
    TestUtils.waitUntilLeaderIsElected(zkClient, "topic3", 0, 500)
    Thread.sleep(500)

    val response2 = producer.send(request)
    Assert.assertNotNull(response2)
    Assert.assertEquals(request.correlationId, response2.correlationId)
    Assert.assertEquals(response2.errors.length, response2.offsets.length)
    Assert.assertEquals(3, response2.errors.length)

    // the first and last message should have been accepted by broker
    Assert.assertEquals(0, response2.errors(0))
    Assert.assertEquals(0, response2.errors(2))
    Assert.assertEquals(messages.sizeInBytes, response2.offsets(0))
    Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))

    // the middle message should have been rejected because broker doesn't lead partition
    Assert.assertEquals(ErrorMapping.NoLeaderForPartitionCode.toShort, response2.errors(1))
    Assert.assertEquals(-1, response2.offsets(1))
  }

  @Test
  def testProducerCanTimeout() {
    val timeoutMs = 500

    val server = servers.head
    val props = new Properties()
    props.put("host", "localhost")
    props.put("port", server.socketServer.port.toString)
    props.put("buffer.size", "102400")
    props.put("socket.timeout.ms", String.valueOf(timeoutMs))
    val producer = new SyncProducer(new SyncProducerConfig(props))

    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
    val request = TestUtils.produceRequest("topic1", 0, messages)

    // stop IO threads and request handling, but leave networking operational
    // any requests should be accepted and queue up, but not handled
    server.requestHandlerPool.shutdown()

    val t1 = SystemTime.milliseconds
    try {
      val response2 = producer.send(request)
      Assert.fail("Should have received timeout exception since request handling is stopped.")
    } catch {
      case e: SocketTimeoutException => /* success */
      case e => Assert.fail("Unexpected exception when expecting timeout: " + e)
    }
    val t2 = SystemTime.milliseconds

    // make sure we don't wait fewer than timeoutMs for a response
    Assert.assertTrue((t2-t1) >= timeoutMs)
  }
}
=======
  @Test
  def testCompressedMessageSizeTooLarge() {
    val props = new Properties()
    props.put("host", "localhost")
    props.put("port", server.socketServer.port.toString)
    props.put("buffer.size", "102400")
    props.put("connect.timeout.ms", "300")
    props.put("reconnect.interval", "500")
    props.put("max.message.size", "100")
    val producer = new SyncProducer(new SyncProducerConfig(props))
    val messages = new Array[Message](10)
    import Array.fill
    var a = 0
    for( a <- 0 to 9){
      val bytes = fill(20){a.asInstanceOf[Byte]}
      messages(a) = new Message(bytes)
    }
    var failed = false
    /** After compression, the compressed message has size 118 **/
    try {
      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = messages: _*))
    }catch {
      case e: MessageSizeTooLargeException => failed = true
    }
    Assert.assertTrue(failed)
  }
}
>>>>>>> .merge-right.r1343118
kept both

G core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
<<<<<<< .working
    props.put("batch.size", "1")
=======
    props.put("serializer.class", "kafka.producer.StringSerializer")
    props.put("zk.connect", TestZKUtils.zookeeperConnect)
    val config = new AsyncProducerConfig(props)
>>>>>>> .merge-right.r1343118
kept both