1239902:HEAD system_test/broker_failure/bin/run-test.sh
<<<<<<< .working
readonly num_iterations=5
=======
readonly num_iterations=1
>>>>>>> .merge-right.r1343118
kept working

core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala >> just imports
core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala >> just imports
core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala >> just imports
core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala >> just imports
core/src/test/scala/unit/kafka/producer/ProducerTest.scala >> just imports
core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala >> just imports
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala >> just imports

core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
<<<<<<< .working
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
=======
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
>>>>>>> .merge-right.r1343118
kept trunk

core/src/main/scala/kafka/utils/Utils.scala
1) imports
2) blank lines from trunk with branch changes for the getNextRandom
3) 3 functions
<<<<<<< .working
  def stringMapToJsonString(jsonDataMap: Map[String, String]): String = {
    val builder = new StringBuilder
    builder.append("{ ")
    var numElements = 0
    for ( (key, value) <- jsonDataMap) {
      if (numElements > 0)
        builder.append(",")
      builder.append("\"" + key + "\": ")
      builder.append("\"" + value + "\"")
      numElements += 1
    }
    builder.append(" }")
    builder.toString
  }
=======
  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
    for(arg <- required) {
      if(!options.has(arg)) {
        error("Missing required argument \"" + arg + "\"")
        parser.printHelpOn(System.err)
        System.exit(1)
      }
    }
  }

  /**
   * Create a circular (looping) iterator over a collection.
   * @param coll An iterable over the underlying collection.
   * @return A circular iterator over the collection.
   */
  def circularIterator[T](coll: Iterable[T]) = {
    val stream: Stream[T] = for (forever <- Stream.continually(1); t <- coll) yield t
    stream.iterator
  }
>>>>>>> .merge-right.r1343118
kept all 3 functions (so both branch and trunk)
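As a side note on circularIterator (the trunk function quoted above): a minimal standalone sketch of how that looping iterator behaves. It reuses the same implementation; brokerIds is just an invented example collection, not something from the merged code.

  object CircularIteratorSketch extends App {
    // Same shape as the trunk function above: an endless Stream over the collection,
    // exposed as an iterator that wraps around forever.
    def circularIterator[T](coll: Iterable[T]): Iterator[T] = {
      val stream: Stream[T] = for (forever <- Stream.continually(1); t <- coll) yield t
      stream.iterator
    }

    val brokerIds = List(0, 1, 2)          // example data only
    val it = circularIterator(brokerIds)
    println(it.take(7).toList)             // List(0, 1, 2, 0, 1, 2, 0)
  }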
core/src/main/scala/kafka/utils/Logging.scala << last brace and blank space

core/src/main/scala/kafka/producer/SyncProducer.scala
1) trunk has a SyncProducer Object, working did not
<<<<<<< .working
=======
object SyncProducer {
  val RequestKey: Short = 0
  val randomGenerator = new Random
}
>>>>>>> .merge-right.r1343118
kept branch
2) within verifyRequest
<<<<<<< .working
    if(requestTypeId == RequestKeys.Produce) {
      val request = ProducerRequest.readFrom(buffer)
      trace(request.toString)
=======
    if (requestTypeId == RequestKeys.MultiProduce) {
      try {
        val request = MultiProducerRequest.readFrom(buffer)
        for (produce <- request.produces) {
          try {
            for (messageAndOffset <- produce.messages)
              if (!messageAndOffset.message.isValid)
                throw new InvalidMessageException("Message for topic " + produce.topic + " is invalid")
          } catch {
            case e: Throwable => error("error iterating messages ", e)
          }
        }
      } catch {
        case e: Throwable => error("error verifying sendbuffer ", e)
      }
>>>>>>> .merge-right.r1343118
kept branch
3) send message
<<<<<<< .working
  def send(producerRequest: ProducerRequest): ProducerResponse = {
    for( topicData <- producerRequest.data ) {
      for( partitionData <- topicData.partitionData ) {
        verifyMessageSize(partitionData.messages)
        val setSize = partitionData.messages.sizeInBytes.asInstanceOf[Int]
        trace("Got message set with " + setSize + " bytes to send")
      }
    }
    val response = doSend(producerRequest)
    ProducerResponse.deserializeResponse(response._1.buffer)
=======
  def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
    messages.verifyMessageSize(config.maxMessageSize)
    val setSize = messages.sizeInBytes.asInstanceOf[Int]
    trace("Got message set with " + setSize + " bytes to send")
    send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages)))
>>>>>>> .merge-right.r1343118
  }
<<<<<<< .working
  def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
    val response = doSend(request)
    TopicMetadataRequest.deserializeTopicsMetadataResponse(response._1.buffer)
=======
  def multiSend(produces: Array[ProducerRequest]) {
    for (request <- produces)
      request.messages.verifyMessageSize(config.maxMessageSize)
    val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes)
    trace("Got multi message sets with " + setSize + " bytes to send")
    send(new BoundedByteBufferSend(new MultiProducerRequest(produces)))
>>>>>>> .merge-right.r1343118
kept branch
4) verifyMessageSize did not exist in trunk; line blank on trunk
kept branch
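For context on the size checks above: both sides verify message sets against a configured maximum before writing to the socket, and multiSend totals the bytes with a foldLeft for the trace line. A rough, self-contained sketch of that pattern; MessageSetLike, maxMessageSize and the sizes used here are invented for illustration, not the merged API.

  object SizeCheckSketch extends App {
    // Stand-in for a message set that only knows its serialized size.
    case class MessageSetLike(sizeInBytes: Long)

    def verifyMessageSize(set: MessageSetLike, maxMessageSize: Int): Unit =
      if (set.sizeInBytes > maxMessageSize)
        throw new IllegalArgumentException(
          "message set of " + set.sizeInBytes + " bytes exceeds max of " + maxMessageSize)

    val maxMessageSize = 1000000                       // placeholder limit
    val sets = Array(MessageSetLike(1024), MessageSetLike(2048))
    sets.foreach(s => verifyMessageSize(s, maxMessageSize))
    // same foldLeft shape as multiSend's byte accounting
    val totalBytes = sets.foldLeft(0L)(_ + _.sizeInBytes)
    println("Got message sets with " + totalBytes + " bytes to send")
  }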
InvalidConfigException("Batch size can't be larger than queue size.") private val queue = new LinkedBlockingQueue[ProducerData[K,V]](config.queueSize) private var sync: Boolean = true private var producerSendThread: ProducerSendThread[K,V] = null config.producerType match { case "sync" => case "async" => sync = false val asyncProducerID = Utils.getNextRandomInt producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID, queue, eventHandler, config.queueTime, config.batchSize) producerSendThread.start case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async") ======= private val random = new java.util.Random // check if zookeeper based auto partition discovery is enabled private val zkEnabled = Utils.propertyExists(config.zkConnect) if(brokerPartitionInfo == null) { zkEnabled match { case true => val zkProps = new Properties() zkProps.put("zk.connect", config.zkConnect) zkProps.put("zk.sessiontimeout.ms", config.zkSessionTimeoutMs.toString) zkProps.put("zk.connectiontimeout.ms", config.zkConnectionTimeoutMs.toString) zkProps.put("zk.synctime.ms", config.zkSyncTimeMs.toString) brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), producerCbk) case false => brokerPartitionInfo = new ConfigBrokerPartitionInfo(config) } >>>>>>> .merge-right.r1343118 kept branch core/src/main/scala/kafka/producer/ProducerConfig.scala 1) fixes from deprecated use <<<<<<< .working if(brokerList != null) throw new InvalidConfigException("broker.list is deprecated. Use zk.connect instead") ======= if(Utils.propertyExists(brokerList) && Utils.getString(props, "partitioner.class", null) != null) throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set") >>>>>>> .merge-right.r1343118 kept branch 2) more configs <<<<<<< .working if(zkConnect == null) throw new InvalidConfigException("zk.connect property is required") ======= if(Utils.propertyExists(brokerList) && Utils.propertyExists(zkConnect)) throw new InvalidConfigException("only one of broker.list and zk.connect can be specified") >>>>>>> .merge-right.r1343118 kept branch core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala 1) trunk trying to put send back in <<<<<<< .working def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = { events.map(e => new ProducerData[K,Message](e.getTopic, e.getKey, e.getData.map(m => encoder.toMessage(m)))) } def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]] = { val ret = new HashMap[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]] for (event <- events) { val topicPartitionsList = getPartitionListForTopic(event) val totalNumPartitions = topicPartitionsList.length val partitionIndex = getPartition(event.getKey, totalNumPartitions) val brokerPartition = topicPartitionsList(partitionIndex) val leaderBrokerId = brokerPartition.leader match { case Some(leader) => leader.brokerId case None => -1 // postpone the failure until the send operation, so that requests for other brokers are handled correctly } var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null ret.get(leaderBrokerId) match { case Some(element) => dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]] case None => dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]] ret.put(leaderBrokerId, dataPerBroker) } val topicAndPartition = (event.getTopic, brokerPartition.partId) var 
core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
1) trunk trying to put send back in
<<<<<<< .working
  def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = {
    events.map(e => new ProducerData[K,Message](e.getTopic, e.getKey, e.getData.map(m => encoder.toMessage(m))))
  }

  def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]] = {
    val ret = new HashMap[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]
    for (event <- events) {
      val topicPartitionsList = getPartitionListForTopic(event)
      val totalNumPartitions = topicPartitionsList.length
      val partitionIndex = getPartition(event.getKey, totalNumPartitions)
      val brokerPartition = topicPartitionsList(partitionIndex)
      val leaderBrokerId = brokerPartition.leader match {
        case Some(leader) => leader.brokerId
        case None => -1 // postpone the failure until the send operation, so that requests for other brokers are handled correctly
      }
      var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
      ret.get(leaderBrokerId) match {
        case Some(element) =>
          dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
        case None =>
          dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
          ret.put(leaderBrokerId, dataPerBroker)
      }
      val topicAndPartition = (event.getTopic, brokerPartition.partId)
      var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
      dataPerBroker.get(topicAndPartition) match {
        case Some(element) =>
          dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]]
        case None =>
          dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]]
          dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
      }
      dataPerTopicPartition.append(event)
=======
  private def send(messagesPerTopic: Map[(String, Int), ByteBufferMessageSet], syncProducer: SyncProducer) {
    if(messagesPerTopic.size > 0) {
      val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
      val maxAttempts = config.numRetries + 1
      var attemptsRemaining = maxAttempts
      var sent = false
      while (attemptsRemaining > 0 && !sent) {
        attemptsRemaining -= 1
        try {
          syncProducer.multiSend(requests)
          trace("kafka producer sent messages for topics %s to broker %s:%d (on attempt %d)"
            .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port, maxAttempts - attemptsRemaining))
          sent = true
        } catch {
          case e =>
            warn("Error sending messages, %d attempts remaining".format(attemptsRemaining))
            if (attemptsRemaining == 0)
              throw e
        }
      }
>>>>>>> .merge-right.r1343118
kept branch
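The trunk side above is a bounded retry loop: config.numRetries + 1 attempts, rethrowing once attempts run out. A stripped-down, standalone sketch of the same shape; sendWithRetries, sendOnce and the failure simulation are invented for illustration.

  object RetrySketch extends App {
    // Retries sendOnce up to numRetries + 1 times; rethrows on the final failure.
    def sendWithRetries(numRetries: Int)(sendOnce: () => Unit): Unit = {
      val maxAttempts = numRetries + 1
      var attemptsRemaining = maxAttempts
      var sent = false
      while (attemptsRemaining > 0 && !sent) {
        attemptsRemaining -= 1
        try {
          sendOnce()
          sent = true
        } catch {
          case e: Exception =>
            println("Error sending, %d attempts remaining".format(attemptsRemaining))
            if (attemptsRemaining == 0) throw e
        }
      }
    }

    var calls = 0
    sendWithRetries(numRetries = 2) { () =>
      calls += 1
      if (calls < 2) throw new RuntimeException("transient failure")
      println("sent on attempt " + calls)
    }
  }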
core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
1) processEvents conflict between changes from KAFKA-300 (on branch) and KAFKA-326 (on trunk)
<<<<<<< .working
          if(currentQueueItem.getKey == null)
            trace("Dequeued item for topic %s, no partition key, data: %s"
              .format(currentQueueItem.getTopic, currentQueueItem.getData.toString))
          else
=======
          trace("Dequeued item for topic %s and partition %d"
            .format(currentQueueItem.getTopic, currentQueueItem.getPartition))
          // handle the dequeued current item
          if(cbkHandler != null)
            events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
          else {
            if (currentQueueItem != null)
>>>>>>> .merge-right.r1343118
kept branch; cbkHandler is no longer present on the branch

core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala >> just imports

core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
1) makeNext conflict from KAFKA-277
<<<<<<< .working
        val isInnerDone = innerDone()
        isInnerDone match {
          case true => makeNextOuter
          case false => {
            val messageAndOffset = innerIter.next
            if (!innerIter.hasNext)
              currValidBytes += 4 + lastMessageSize
            new MessageAndOffset(messageAndOffset.message, currValidBytes)
=======
        if(isShallow){
          makeNextOuter
        }
        else{
          val isInnerDone = innerDone()
          debug("makeNext() in internalIterator: innerDone = " + isInnerDone)
          isInnerDone match {
            case true => makeNextOuter
            case false => {
              val messageAndOffset = innerIter.next
              if (!innerIter.hasNext)
                currValidBytes += 4 + lastMessageSize
              new MessageAndOffset(messageAndOffset.message, currValidBytes)
            }
>>>>>>> .merge-right.r1343118
kept trunk

core/src/main/scala/kafka/server/KafkaServer.scala
1) case upper
<<<<<<< .working
  val CleanShutdownFile = ".kafka_cleanshutdown"
  private var isShuttingDown = new AtomicBoolean(false)
  private var shutdownLatch = new CountDownLatch(1)
=======
  val CLEAN_SHUTDOWN_FILE = ".kafka_cleanshutdown"
  private var isShuttingDown = new AtomicBoolean(false)
  private var shutdownLatch = new CountDownLatch(1)
>>>>>>> .merge-right.r1343118
kept branch

core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
>> this file was removed and core/src/main/scala/kafka/server/KafkaRequestHandler.scala was created.
There were changes to KafkaRequestHandlers.scala (KAFKA-272), so I manually applied those changes to
KafkaRequestHandler as well so the app can compile; however, I am not sure about the changes in here,
since so much has changed.

core/src/main/scala/kafka/consumer/SimpleConsumer.scala
1) SimpleConsumer - KAFKA-305 was getting overwritten by the trunk merge because of a single-line
update that tweaks performance of a single fetch request, which is N/A here
<<<<<<< .working
  private def disconnect() = {
    if(blockingChannel.isConnected) {
      debug("Disconnecting from " + host + ":" + port)
      blockingChannel.disconnect()
    }
=======
    val channel = SocketChannel.open
    debug("Connected to " + address + " for fetching.")
    channel.configureBlocking(true)
    channel.socket.setReceiveBufferSize(bufferSize)
    channel.socket.setSoTimeout(soTimeout)
    channel.socket.setKeepAlive(true)
    channel.socket.setTcpNoDelay(true)
    channel.connect(address)
    trace("requested receive buffer size=" + bufferSize + " actual receive buffer size= " + channel.socket.getReceiveBufferSize)
    trace("soTimeout=" + soTimeout + " actual soTimeout= " + channel.socket.getSoTimeout)
    channel
>>>>>>> .merge-right.r1343118
kept branch
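The trunk side of that conflict is the older raw-SocketChannel connect path (the branch keeps its BlockingChannel instead). A small self-contained sketch of that connect-and-configure pattern; BlockingConnectSketch and its parameters are illustrative, not Kafka's defaults.

  import java.net.InetSocketAddress
  import java.nio.channels.SocketChannel

  object BlockingConnectSketch {
    // Opens a blocking channel and applies the same socket options as the quoted trunk code.
    def connect(host: String, port: Int, bufferSize: Int, soTimeoutMs: Int): SocketChannel = {
      val channel = SocketChannel.open
      channel.configureBlocking(true)
      channel.socket.setReceiveBufferSize(bufferSize)
      channel.socket.setSoTimeout(soTimeoutMs)
      channel.socket.setKeepAlive(true)
      channel.socket.setTcpNoDelay(true)      // disable Nagle's algorithm
      channel.connect(new InetSocketAddress(host, port))
      channel
    }
  }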
core/src/main/scala/kafka/consumer/FetcherRunnable.scala
1) svn merge, phhh
<<<<<<< .working
=======
    val response = simpleConsumer.multifetch(fetches : _*)
    trace("recevied response from fetch request: " + fetches.toString)
>>>>>>> .merge-right.r1343118
kept trunk

core/src/main/scala/kafka/consumer/TopicCount.scala
1) KAFKA-249 merge went funky, keeping trunk
<<<<<<< .working
  def constructTopicCount(consumerIdSting: String, jsonString : String) : TopicCount = {
    var topMap : Map[String,Int] = null
    try {
      JSON.parseFull(jsonString) match {
        case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
        case None => throw new RuntimeException("error constructing TopicCount : " + jsonString)
      }
    } catch {
      case e =>
        error("error parsing consumer json string " + jsonString, e)
        throw e
    }
=======
private[kafka] trait TopicCount {
  def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
>>>>>>> .merge-right.r1343118
<<<<<<< .working
    new TopicCount(consumerIdSting, topMap)
  }
}

private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {
  def getConsumerThreadIdsPerTopic(): Map[String, Set[String]] = {
=======
  def dbString: String

  protected def makeConsumerThreadIdsPerTopic(consumerIdString: String, topicCountMap: Map[String, Int]) = {
>>>>>>> .merge-right.r1343118

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
1) KAFKA-300 Int for Partition conflicting with KAFKA-249 with topicThreadIdAndQueues
<<<<<<< .working
  private val topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
  // queues : (topic,consumerThreadId) -> queue
  private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
=======
  private var topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
  // topicThreadIdAndQueues : (topic,consumerThreadId) -> queue
  private val topicThreadIdAndQueues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
>>>>>>> .merge-right.r1343118
keeping the new topicRegistry and changing over to topicThreadIdAndQueues, to preserve KAFKA-300 and KAFKA-249 together
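Given that resolution, the merged declarations presumably combine the branch's Int partition key with trunk's topicThreadIdAndQueues name. A self-contained sketch of that shape, with PartitionTopicInfoStub, FetchedDataChunkStub and HashMaps standing in for Kafka's Pool, purely to illustrate the intended key types.

  import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
  import scala.collection.mutable

  object MergedRegistrySketch extends App {
    // Stand-ins for kafka.consumer.PartitionTopicInfo / FetchedDataChunk.
    case class PartitionTopicInfoStub(topic: String, partition: Int)
    case class FetchedDataChunkStub(bytes: Int)

    // topic -> (partition id as Int, per KAFKA-300) -> partition info
    val topicRegistry = new mutable.HashMap[String, mutable.HashMap[Int, PartitionTopicInfoStub]]
    // (topic, consumerThreadId) -> queue, under the KAFKA-249 name
    val topicThreadIdAndQueues = new mutable.HashMap[(String, String), BlockingQueue[FetchedDataChunkStub]]

    val infos = new mutable.HashMap[Int, PartitionTopicInfoStub]
    infos.put(0, PartitionTopicInfoStub("topic1", 0))
    topicRegistry.put("topic1", infos)
    topicThreadIdAndQueues.put(("topic1", "consumer1-0"), new LinkedBlockingQueue[FetchedDataChunkStub])
    println(topicRegistry)
  }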
2) KAFKA-249 merge issues: the right changes were made but conflicted with working
<<<<<<< .working
    var consumerUuid : String = null
    config.consumerId match {
      case Some(consumerId) => // for testing only
        consumerUuid = consumerId
      case None => // generate unique consumerId automatically
        val uuid = UUID.randomUUID()
        consumerUuid = "%s-%d-%s".format(
          InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
          uuid.getMostSignificantBits().toHexString.substring(0,8) )
    }
    val consumerIdString = config.groupId + "_" + consumerUuid
    val topicCount = new TopicCount(consumerIdString, topicCountMap)
=======
    val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic
>>>>>>> .merge-right.r1343118
kept trunk, removed working
3) KAFKA-239 merge futzed
<<<<<<< .working
  // this API is used by unit tests only
  def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry

  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
=======
  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
>>>>>>> .merge-right.r1343118
keeping branch
4) KAFKA-265 not getting integrated into the ZKRebalancerListener class right
<<<<<<< .working
    private val dirs = new ZKGroupDirs(group)
    private var oldPartitionsPerTopicMap: mutable.Map[String, Seq[String]] = new mutable.HashMap[String, Seq[String]]()
    private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
=======
    private var isWatcherTriggered = false
    private val lock = new ReentrantLock
    private val cond = lock.newCondition()
    private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
      override def run() {
        info("starting watcher executor thread for consumer " + consumerIdString)
        var doRebalance = false
        while (!isShuttingDown.get) {
          try {
            lock.lock()
            try {
              if (!isWatcherTriggered)
                cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag
            } finally {
              doRebalance = isWatcherTriggered
              isWatcherTriggered = false
              lock.unlock()
            }
            if (doRebalance)
              syncedRebalance
          } catch {
            case t => error("error during syncedRebalance", t)
          }
        }
        info("stopping watcher executor thread for consumer " + consumerIdString)
      }
    }
    watcherExecutorThread.start()
>>>>>>> .merge-right.r1343118
removing private val dirs from the branch; keeping everything else from the branch plus everything from trunk
5) KAFKA-286 introduced deletePartitionOwnershipFromZK for a rebalancing defect, and KAFKA-239 refactored code to re-write ZK data structures
<<<<<<< .working
      for ((topic, infos) <- topicRegistry) {
        for(partition <- infos.keys) {
          val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition.toString)
          deletePath(zkClient, partitionOwnerPath)
          debug("Consumer " + consumerIdString + " releasing " + partitionOwnerPath)
        }
=======
      for ((topic, infos) <- localTopicRegistry) {
        for(partition <- infos.keys)
          deletePartitionOwnershipFromZK(topic, partition.toString)
        localTopicRegistry.remove(topic)
>>>>>>> .merge-right.r1343118
keeping trunk so that the defect stays fixed; KAFKA-239 may need another pass after commit
6) KAFKA-262 removed this function, svn merge phhhh
<<<<<<< .working
  private def getRelevantTopicMap(myTopicThreadIdsMap: Map[String, Set[String]],
                                  newPartMap: Map[String, Seq[String]],
                                  oldPartMap: Map[String, Seq[String]],
                                  newConsumerMap: Map[String,List[String]],
                                  oldConsumerMap: Map[String,List[String]]): Map[String, Set[String]] = {
    var relevantTopicThreadIdsMap = new mutable.HashMap[String, Set[String]]()
    for ( (topic, consumerThreadIdSet) <- myTopicThreadIdsMap )
      if ( oldPartMap.get(topic) != newPartMap.get(topic) || oldConsumerMap.get(topic) != newConsumerMap.get(topic))
        relevantTopicThreadIdsMap += (topic -> consumerThreadIdSet)
    relevantTopicThreadIdsMap
  }
=======
>>>>>>> .merge-right.r1343118
keeping trunk
7) conflict between KAFKA-300 changing Partition to Int and KAFKA-262 fixing an issue on the changed line
<<<<<<< .working
      for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
        topicRegistry.remove(topic)
        topicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
=======
      var currentTopicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
>>>>>>> .merge-right.r1343118
so, keeping the trunk but modifying it to have the proper Int for partition
8) space issue
<<<<<<< .working
        } else
=======
        }else {
>>>>>>> .merge-right.r1343118
keeping trunk; it needs the extra brace
9) conflict between KAFKA-300 changing Partition to Int and KAFKA-262 fixing an issue on the changed line
<<<<<<< .working
  private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partition: String,
=======
  private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]],
                                    topicDirs: ZKGroupTopicDirs, partitionString: String,
>>>>>>> .merge-right.r1343118
so, keeping the trunk but modifying it to have the proper Int for partition
10) svn merge phhh
<<<<<<< .working
      val queue = queues.get((topic, consumerThreadId))
=======
      val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
>>>>>>> .merge-right.r1343118
keeping trunk

perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala >> just imports
contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java >> just imports
examples/src/main/java/kafka/examples/SimpleConsumerDemo.java >> just imports