Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels:
      None

      Description

      Currently, the only place that ZK is actually used is in BrokerPartitionInfo. We use ZK to get a list of brokers for sending TopicMetadataRequests. Instead, we can provide a list of brokers in the producer config directly. That way, the producer client is no longer dependent on ZK.
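
      For illustration, here is a minimal sketch of a producer setup once the ZK dependency is dropped. The broker.list key and its host:port format come from the review below; the concrete broker addresses and the object name are made up for the example.

          import java.util.Properties

          // Sketch only: the producer is bootstrapped from a static broker list
          // instead of a zk.connect string.
          object NoZkProducerConfigSketch {
            def main(args: Array[String]): Unit = {
              val props = new Properties()
              // A subset of the cluster (or a VIP) is enough for bootstrapping metadata.
              props.put("broker.list", "broker1:9092,broker2:9092")
              // zk.connect is no longer required on the producer side.
              println(props.getProperty("broker.list"))
            }
          }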

      1. kafka_369_v9.diff
        87 kB
        Yang Ye
      2. kafka_369_v8.diff
        84 kB
        Yang Ye
      3. kafka_369_v7.diff
        84 kB
        Yang Ye
      4. kafka_369_v6.diff
        82 kB
        Yang Ye
      5. kafka_369_v5.diff
        69 kB
        Yang Ye
      6. kafka_369_v4.diff
        69 kB
        Yang Ye
      7. kafka_369_v3.diff
        69 kB
        Yang Ye
      8. kafka_369_v2.diff
        62 kB
        Yang Ye
      9. kafka_369_v1.diff
        40 kB
        Yang Ye

        Activity

        Jun Rao created issue -
        Yang Ye made changes -
        Field Original Value New Value
        Assignee Yang Ye [ yeyangever ]
        Yang Ye added a comment -

        1. Added a function getBrokerListStrFromConfigs() in class TestUtils, which builds the broker list string from a sequence of KafkaConfigs (see the sketch below).

        2. Removed every use of zk.connect on the producer side.

        3. Added three wait-until-topic-is-registered-in-ZooKeeper blocks in ProducerTest to eliminate potential transient failures.
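
        A rough sketch of what such a helper could look like (the actual TestUtils signature may differ; the KafkaConfig fields hostName and port are assumptions based on this discussion):

            import kafka.server.KafkaConfig

            // Sketch only: join each broker's host and port into the "host1:port1,host2:port2"
            // form expected by the producer's broker.list property.
            object TestUtilsSketch {
              def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String =
                configs.map(c => c.hostName + ":" + c.port).mkString(",")
            }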

        Yang Ye made changes -
        Attachment kafka_369_v1.diff [ 12538881 ]
        Jun Rao added a comment -

        Thanks for patch v1. Some comments:

        1. ProducerConfig:
        1.1 change broker.list's format to host:port; Also, change the comment and explain this is just for bootstrapping and it just needs to be a subset of all brokers in the cluster.
        1.2 We should make broker.list a required property by not providing a default value. If we do the following, an exception will be thrown if the property is not specified. There is no need to test for null any more.
        Utils.getString(props, "broker.list")
        1.3 There is no need for ProducerConfig to extend ZKConfig.

        2. Producer: There is no need to check the existence of brokerList since it will be handled in ProducerConfig.

        3. BrokerPartitionInfo:
        3.1 It's better to rename it to something like MetaDataCache
        3.2 This class shouldn't depend on ProducerPool, which is supposed to be used for caching SyncProducer objects for sending produce requests. Instead, this class only needs to know broker.list. The reason is that broker.list is just for bootstrapping and we can pass in a VIP instead of a real host. Each time we need to send a getMetaData request, we can just pick a host from broker.list, instantiate a SyncProducer, send the request, get the response, and close the SyncProducer. Since getMetaData is going to be used rarely, there is no need to cache the connection. It also avoids the timeout problem with VIP. For new brokers that we get in updateInfo(), we can instantiate a new SyncProducer and add it to ProducerPool. (See the sketch after this list.)

        4. ProducerPool
        4.1 If we do the above in BrokerPartitionInfo, we can get rid of the following methods.
        def addProducers(config: ProducerConfig)
        def getAnyProducer: SyncProducer

        5. KafkaConfig: The default value of hostName should be InetAddress.getLocalHost.getHostAddress. We can then get rid of the hostName null check in KafkaZookeeper.registerBrokerInZk().

        6. KafkaLog4jAppender: get rid of the commented-out lines related to ZkConnect; get rid of host and port

        7. KafkaLog4jAppenderTest: testZkConnectLog4jAppends should be renamed since there is no ZK connect any more.

        8. ProducerTest:
        8.1 all method of testZK* should be renamed properly.
        8.2 Quite a few places have the following checks. Can't we just combine them using a check for leaderIsLive?
        assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
        AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
        zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
        TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)

        9. Utils.getAllBrokersFromBrokerList(): There is no need to get the broker id from broker.list. We can just assign some sequential ids.
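
        To make comment 3.2 concrete, here is a hedged sketch of the bootstrap pattern it describes: pick a broker from broker.list, fetch metadata over a short-lived connection, and fall back to the next broker on failure. The fetchMetadata parameter stands in for the real SyncProducer/TopicMetadataRequest round trip; none of the names below are the actual Kafka API.

            // Sketch only.
            object MetadataBootstrapSketch {
              def fetchFromAnyBroker[T](brokers: Seq[(String, Int)],
                                        fetchMetadata: (String, Int) => T): T = {
                var lastError: Throwable = null
                for ((host, port) <- brokers) {
                  try {
                    // open a short-lived connection, send the request, read the response, close it
                    return fetchMetadata(host, port)
                  } catch {
                    case t: Throwable => lastError = t // socket-level failure: try the next bootstrap broker
                  }
                }
                throw new RuntimeException("fetching metadata from all bootstrap brokers failed", lastError)
              }
            }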

        Yang Ye added a comment -

        In AsyncProducerTest.testFailedSendRetryLogic(), we may see the following error messages; they don't affect the success of the test. It's because, in the handle() function of DefaultEventHandler,

        if (outstandingProduceRequests.size > 0) {
          // back off and update the topic metadata cache before attempting another send operation
          Thread.sleep(config.producerRetryBackoffMs)
          // get topics of the outstanding produce requests and refresh metadata for those
          Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic)))
          // ...
        }

        it will refresh the cached metadata, but the broker is not up.

        This doesn't affect the correctness of the test, but maybe we should try to eliminate the error messages.

        [2012-08-03 19:11:44,182] ERROR Connection attempt to 127.0.0.1:52955 failed, next attempt in 100 ms (kafka.producer.SyncProducer:99)
        java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect(Native Method)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:161)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:182)
        at kafka.producer.SyncProducer.doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:116)
        at kafka.producer.BrokerPartitionInfo$$anonfun$updateInfo$1.apply(BrokerPartitionInfo.scala:86)
        at kafka.producer.BrokerPartitionInfo$$anonfun$updateInfo$1.apply(BrokerPartitionInfo.scala:81)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:44)
        at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:42)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:81)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:53)
        at kafka.utils.Utils$.swallow(Utils.scala:429)
        at kafka.utils.Logging$class.swallowError(Logging.scala:102)
        at kafka.utils.Utils$.swallowError(Utils.scala:40)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:53)
        at kafka.producer.AsyncProducerTest.testFailedSendRetryLogic(AsyncProducerTest.scala:438)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at junit.framework.TestCase.runTest(TestCase.java:164)
        at junit.framework.TestCase.runBare(TestCase.java:130)
        at junit.framework.TestResult$1.protect(TestResult.java:110)
        at junit.framework.TestResult.runProtected(TestResult.java:128)
        at junit.framework.TestResult.run(TestResult.java:113)
        at junit.framework.TestCase.run(TestCase.java:120)
        at junit.framework.TestSuite.runTest(TestSuite.java:228)
        at junit.framework.TestSuite.run(TestSuite.java:223)
        at org.junit.internal.runners.OldTestClassRunner.run(OldTestClassRunner.java:35)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:121)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:71)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:199)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:62)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

        Yang Ye made changes -
        Attachment kafka_369_v2.diff [ 12539135 ]
        Jun Rao added a comment -

        Thanks for patch v2. Some more comments:

        20. ProducerConfig:
        20.1 change the comment and explain this is just for bootstrapping and it just needs to be a subset of all brokers in the cluster.
        20.2 use Utils.getString(props, "broker.list"). It will make sure the property exists.

        21. Producer: get rid of the following lines since ProducerConfig makes sure that brokerlist exists.
        if(!Utils.propertyExists(config.brokerList))
        throw new InvalidConfigException("broker.list property must be specified in the producer")

        22. BrokerPartitionInfo:
        22.1 Why is the following line there in the constructor?
        error("The broker list is : " + brokerList)
        22.2 updateInfo:
        22.2.1 we should close the producer even when there are exceptions
        22.2.2 if we get any socket-level exception when getting metadata, we should retry getting the metadata using the next broker in the list.
        22.2.3 should call producerPool.updateProducer after the error checking

        23. ProducerPool:
        23.1 remove unused import
        23.2 updateProducer: need to sync on the lock (see the sketch after this list).
        23.3 updateProducer: newBroker.+= get rid of the dot

        24. KafkaZookeeper:
        24.1 registerBrokerInZk: The following line should be val hostName = config.hostName.
        val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName

        25. ProducerConfig,Utils: remove unused import

        26. DefaultEventHandler:
        26.1 constructor: get rid of the comment "this api is for testing"
        26.2 send(): get rid of the following line:
        error("pool broker id: " + brokerId)

        Yang Ye added a comment -

        Basically fixed all the issues raised in the comments. I didn't use the "warmUp" function because it gets back "LeaderNotAvailable" metadata info from the server. We may open a new JIRA to look into this problem and add the batched warm-up.

        Yang Ye made changes -
        Attachment kafka_369_v3.diff [ 12539389 ]
        Yang Ye added a comment -


        Changed the error handling of the "broker.list" config and refactored the updateInfo() function.

        Yang Ye made changes -
        Attachment kafka_369_v4.diff [ 12539404 ]
        Yang Ye added a comment -

        I am thinking about the exception handling in the updateInfo() function. The topic-level error code may be NoError while there may still be a partition-level error code. In this case, it's not appropriate for us to put the (topic, topicMetadata) pair into the cache. Of course we can put in a "trimmed" topicMetadata, which only contains partitionMetadata whose error code is NoError, but this is kind of hacky.

        Also, it's not bad to just throw the exception to the client, because this function is called on demand, when the client wants to send messages to a topic. If some topic is erroneous, the send needs to fail.
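
        As a rough illustration of the trade-off being discussed (the types below are stand-ins, not the actual TopicMetadata classes):

            object MetadataErrorSketch {
              // Stand-in types for the purposes of this sketch.
              case class PartitionMeta(partitionId: Int, errorCode: Short)
              case class TopicMeta(topic: String, errorCode: Short, partitions: Seq[PartitionMeta])

              val NoError: Short = 0

              // A topic can report NoError at the topic level while individual partitions carry errors.
              def isFullyHealthy(tm: TopicMeta): Boolean =
                tm.errorCode == NoError && tm.partitions.forall(_.errorCode == NoError)

              // Option A (the "hacky" trimmed cache entry): keep only partitions without errors.
              def trimmed(tm: TopicMeta): TopicMeta =
                tm.copy(partitions = tm.partitions.filter(_.errorCode == NoError))

              // Option B: skip caching and let the on-demand send surface the error to the client.
            }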

        Yang Ye added a comment -


        Handled the error code in the TopicMetadataResponse.

        Yang Ye made changes -
        Attachment kafka_369_v5.diff [ 12539718 ]
        Jun Rao added a comment -

        Thanks for patch v5. A few more comments:

        50. There are a bunch of places where zk.connect is still used in ProducerConfig, could you fix them?
        ProducerPerformance, ReplayLogProducer, ConsoleProducer, TestEndToEndLatency, AsyncProducerTest, KafkaOutputFormat.java, Producer.java

        51. Single host multi broker system test seems to hang:
        2012-08-08 09:08:22 =======================================
        2012-08-08 09:08:22 Iteration 1 of 3
        2012-08-08 09:08:22 =======================================

        2012-08-08 09:08:22 looking up leader
        2012-08-08 09:08:22 found the log line: [2012-08-08 09:08:17,434] INFO Controller 1, at initializing leaders for new partitions, the leaderAndISR request sent to broker 3 is LeaderAndISRRequest(1,,false,1000,Map((mytest,0) -> { "ISR": "1,2,3","leader": "1","leaderEpoch": "0" } )) (kafka.server.KafkaController)
        bin/run-test.sh: line 151: return: [2012-08-08: numeric argument required
        2012-08-08 09:08:22 current leader's broker id : 255
        2012-08-08 09:08:22 stopping server: 255
        2012-08-08 09:08:22 sleeping for 10s

        52. remove unused imports

        53. ProducerPool.updateProducer: use a HashSet instead of ListBuffer for storing newBrokers.

        54. The following unit test seems to fail occasionally for me. Is that related to this patch?
        [error] Test Failed: testMultiProduceResend(kafka.integration.LazyInitProducerTest)
        kafka.common.KafkaException: fetching broker partition metadata for topics [List(test1)] from broker [ArrayBuffer(id:0,creatorId:192.168.1.111-1344443455425,host:192.168.1.111,port:61693)] failed
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:109)
        at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:140)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:101)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:100)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:65)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:48)
        at kafka.producer.Producer.send(Producer.scala:65)
        at kafka.integration.LazyInitProducerTest.testMultiProduceResend(LazyInitProducerTest.scala:163)

        55. Could you change the comment for broker.list in ProducerConfig to the following?

        /** This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
         *  The socket connections for sending the actual data will be established based on the broker information
         *  returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or
         *  a VIP pointing to a subset of brokers.
         */
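
        Putting 1.2, 20.2 and 55 together, the broker.list definition in ProducerConfig could end up looking roughly like this; the sketch uses a plain Properties lookup in place of the Utils.getString helper named in the review, and the object name is made up.

            import java.util.Properties

            object BrokerListConfigSketch {
              /** This is for bootstrapping and the producer will only use it for getting metadata
               *  (topics, partitions and replicas). The socket connections for sending the actual data
               *  will be established based on the broker information returned in the metadata.
               *  The format is host1:port1,host2:port2, and the list can be a subset of brokers or
               *  a VIP pointing to a subset of brokers.
               */
              def brokerList(props: Properties): String = {
                // Fail fast when the required property is absent, as Utils.getString would.
                val v = props.getProperty("broker.list")
                if (v == null) throw new IllegalArgumentException("Missing required property broker.list")
                v
              }
            }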
        Yang Ye added a comment -


        Basically acted on the last review comments. The system test still seems to hang; I'll look into it and maybe file another JIRA.

        Yang Ye made changes -
        Attachment kafka_369_v6.diff [ 12539953 ]
        Yang Ye added a comment -

        Didn't check this in last time due to a JIRA failure.

        Yang Ye made changes -
        Attachment kafka_369_v7.diff [ 12540822 ]
        Yang Ye added a comment -

        Didn't check this in last time due to a JIRA failure.

        Yang Ye made changes -
        Attachment kafka_369_v8.diff [ 12540823 ]
        Yang Ye added a comment -

        Major changes from v8:

        Changed the system test to specify the broker list of all three brokers.

        Yang Ye made changes -
        Attachment kafka_369_v9.diff [ 12540824 ]
        Yang Ye added a comment -

        Ran the system test quite a few times after the v9 patch; all runs passed.

        Jun Rao added a comment -

        Thanks for the patch. Committed v9 to 0.8.

        Jun Rao made changes -
        Status Open [ 1 ] Resolved [ 5 ]
        Fix Version/s 0.7.2 [ 12322475 ]
        Resolution Fixed [ 1 ]
        Jun Rao made changes -
        Fix Version/s 0.8 [ 12317244 ]
        Fix Version/s 0.7.2 [ 12322475 ]
        Otis Gospodnetic added a comment -

        I'm completely late on this and don't have the full context, so my apologies if this is a naive question, but why would you want to have the addresses of brokers in the producer's config rather than have the producer get that from ZooKeeper? I thought using ZK for cluster-wide config sharing was a good thing, but there must be some reason why you chose not to do that here. Can anyone shed some light on this... 14 months after the commit?

        Jay Kreps added a comment -

        I guess all distributed systems have a problem of bootstrapping knowledge about the cluster.

        In 0.7 the client had a hard-coded list of ZooKeeper hosts to connect to, and from those it could discover the state of the Kafka cluster.

        There are a number of problems with embedding zookeeper in clients at large scale:
        1. Makes language support hard
        2. GC in clients causes havoc
        3. Massive zookeeper load

        Instead we use the brokers themselves for cluster discovery. As before, you need a couple of hardcoded URLs to bootstrap metadata from. These can be any three random Kafka brokers (or else use DNS or a VIP); this is by no means intended to be a complete list of your brokers, and the producer is not limited in any way to producing to the brokers it uses to get cluster metadata. These hosts are used only to find out the state of the cluster the first time. This isn't worse than the ZooKeeper situation (you still hardcode a couple of URLs), but it gets rid of all the other ZK issues.

        Neha Narkhede added a comment -

        In addition to the above, the cluster metadata approach is more scalable, since you get all the metadata for several topics in one RPC round trip vs. reading several paths per topic from ZooKeeper.

        Otis Gospodnetic added a comment -

        Interesting. Sounds like brokers act as "portals into the whole Kafka cluster", so to speak. I wasn't aware of these ZK issues. Just for my edification: are those issues specific to the ZK client that was used, or to how it was used in the Kafka producer?

        Neha Narkhede added a comment -

        Otis Gospodnetic: those ZK issues exist with any ZooKeeper client.

        Otis Gospodnetic added a comment -

        Thanks Neha Narkhede. Wouldn't all apps that talk to ZK have these issues then? Hadoop, HBase, SolrCloud, etc. Weird that I've never seen these issues reported anywhere else... (honest comment, not trying to say anything bad here)


          People

          • Assignee:
            Yang Ye
            Reporter:
            Jun Rao
          • Votes:
            0
            Watchers:
            5

            Dates

            • Created:
              Updated:
              Resolved:

              Time Tracking

              Estimated:
              Original Estimate - 252h
              252h
              Remaining:
              Remaining Estimate - 252h
              252h
              Logged:
              Time Spent - Not Specified
              Not Specified

                Development