KAFKA-253: Refactor the async producer to have only one queue instead of one queue per broker in a Kafka cluster

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels: None

      Description

      Today, the async producer is associated with a particular broker instance, just like the SyncProducer. The Producer maintains a producer pool of sync/async producers, one per broker. Since the producer pool creates one async producer per broker, we have multiple producer queues for one Producer instance.

      With replication, a topic partition will be logical. This requires refactoring the AsyncProducer to be broker agnostic. As a side effect of this refactoring, we should also ensure that we have only one queue per Producer instance.
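      To make the target design concrete, here is a minimal, hypothetical sketch (Scala, not the actual patch; all names are made up for illustration) of a Producer that owns a single bounded queue and defers broker selection to a handler at dequeue time:

        import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
        import scala.collection.mutable.ArrayBuffer

        // Hypothetical record type; the real patch uses ProducerData.
        case class Record(topic: String, message: String)

        // One queue per Producer instance; no broker is chosen at enqueue time.
        class SingleQueueProducer(dispatch: Seq[Record] => Unit,
                                  queueSize: Int = 10000,
                                  batchSize: Int = 200) {
          private val queue = new LinkedBlockingQueue[Record](queueSize)

          // send() only enqueues, so the producer itself stays broker-agnostic.
          def send(record: Record): Unit =
            if (!queue.offer(record))
              throw new RuntimeException("producer queue is full")

          // A single background thread drains the queue and hands batches to the
          // dispatch function, which partitions and picks live brokers at that point.
          private val sendThread = new Thread(new Runnable {
            def run(): Unit = {
              val batch = new ArrayBuffer[Record]
              while (!Thread.currentThread().isInterrupted) {
                val r = queue.poll(100, TimeUnit.MILLISECONDS)
                if (r != null) batch += r
                if (batch.size >= batchSize || (r == null && batch.nonEmpty)) {
                  dispatch(batch.toList)
                  batch.clear()
                }
              }
            }
          })
          sendThread.setDaemon(true)
          sendThread.start()
        }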

  Attachments

  1. kafka-253_v5.patch
        176 kB
        Jun Rao
      2. kafka-253_v4.patch
        177 kB
        Jun Rao
      3. kafka-253_v3.patch
        175 kB
        Jun Rao
      4. kafka-253_v2.patch
        175 kB
        Jun Rao
      5. kafka-253.patch
        166 kB
        Jun Rao


          Activity

          Jun Rao added a comment -

          No. Since most of the development happens on 0.8, we are only fixing blocker issues in 0.7.

          Jason Rosenberg added a comment -

          Was this change merged to the 0.7 branch?

          Jun Rao added a comment -

          Thanks for the review. Committed to 0.8 branch. Will close the jira once it's committed to 0.7 branch.

          Neha Narkhede added a comment -

          Jun, could you please merge v5 to the 0.8 branch, so that work on KAFKA-239 is unblocked?

          Neha Narkhede added a comment -

          ProducerMethodsTest needs to be removed. Other than that, v5 looks good.

          Jun Rao added a comment -

          Attach patch v5. Minor changes in DefaultEventHandler to correctly handle retries recursively.

          Jun Rao added a comment -

          Attach patch v4. Made the following changes:
          1. Renamed AsyncProducerConfigShared to AsyncProducerConfig.
          2. Added producer retry backoff time.
          3. Only retry on the top level of send, i.e., no recursive retries (this ran the risk that send may never finish).
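          As an illustration of point 3 (retry only at the top level of send, never recursively), the control flow amounts to something like the following sketch; maxRetries and backoffMs stand in for whatever the actual configuration values are:

            // Hypothetical sketch: a bounded top-level retry with a backoff between
            // attempts, so a send can never loop forever.
            def sendWithRetries(attempt: () => Unit, maxRetries: Int, backoffMs: Long): Unit = {
              var lastError: Throwable = null
              var tries = 0
              while (tries <= maxRetries) {
                try {
                  attempt()
                  return                      // success, stop retrying
                } catch {
                  case e: Exception =>
                    lastError = e
                    tries += 1
                    if (tries <= maxRetries) Thread.sleep(backoffMs)
                }
              }
              throw lastError                 // bounded: give up after maxRetries
            }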

          Jun Rao added a comment -

          Attached v3 patch by rebasing to trunk.

          Jun Rao added a comment -

          Uploaded patch v2.
          1. removed
          2. made it a case class
          3.1 changed.
          3.2 removed the #events stat from AsyncProducerStats; added #events per topic at the Producer level
          4. improved by doing serialization before partitioning and collating
          5.1 made the test use mocks and moved it to AsyncProducerTest
          5.2 made testJavaProducer use mocks and moved it to AsyncProducerTest; I don't feel testPartitionedSendToNewBrokerInExistingTopic adds more coverage since there are separate tests for the partitioner and for NewBrokerInExistingTopic. Since partitioning and broker discovery are independent, the existing test cases should be enough. Ditto for testPartitionedSendToNewTopic.
          5.3 will open another jira to track this.

          Neha Narkhede added a comment - edited

          That is a pretty useful refactoring patch! I have a few questions:

          1. ProducerMethodsTest is empty

          2. ProducerData can be a case class instead. You will get toString for free, in addition to equals(), which is useful for unit testing. That said, I'm not sure if there are any obvious downsides to using case classes.

          3. Producer:

          3.1 How about throwing an InvalidConfigException when both zk.connect and broker.list are specified? It is probably a misconfiguration that should be corrected. We have gotten bitten by this several times at LinkedIn; the user could easily miss the warning we issue saying that we will just use zk.connect.

          3.2 AsyncProducerStats: Do we want to record an event as sent even before it has successfully entered the producer queue? I'm thinking that one stat counts the number of events successfully entering the producer queue and another counts the number of events dropped due to a full queue. But maybe the user is interested in per-topic event counts, and those should be available for the sync producer as well. So maybe we can get rid of numEvents from AsyncProducerStats for now?

          4. DefaultEventHandler
          In the producer send retry logic inside the handle() API, what happens if a broker fails after partitionAndCollate() and right before send()? Simply retrying the send to the same broker is not useful; we should probably repartition and retry the send, which will at least attempt to select another live broker.

          5. ProducerTest

          5.1 testBrokerListAndAsync: It is better to write as many true unit tests as possible. I don't see an advantage in actually instantiating a zookeeper consumer when EasyMock can be used to verify the producer functionality. For example, it seems that here, you want to test the async producer with the broker.list config. Simply verifying the send() API of the SyncProducer should suffice. Unit tests can assume that data doesn't get corrupted over the socket.

          5.2 testJavaProducer: Same as above. Use mocks wherever possible.
          testPartitionedSendToNewTopic, testPartitionedSendToNewBrokerInExistingTopic: Any reason for deleting these tests? They test valid code paths for the ZK producer, and we added them to make sure new topics and new brokers in existing topics are handled correctly with the ZK producer.

          5.3 Rest of the tests: It would be a good idea to also clean up this test suite to avoid bringing up a server and 2 SimpleConsumers. We don't know of a good way to mock out the zk part of it, so we can choose to just bring up a local zk instance; to indicate the existence of a new broker, we can call registerBrokerInZK instead of actually bringing up a broker.

          Though, if you want, this can go in a separate jira.
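          As a small illustration of point 2 (the field names below are simplified and not necessarily the exact ProducerData signature):

            // A case class gives equals/hashCode/toString for free, which is what
            // makes value-based assertions in unit tests convenient.
            case class ProducerData[K, V](topic: String, key: K, data: Seq[V])

            val a = ProducerData("test-topic", "k1", Seq("m1", "m2"))
            val b = ProducerData("test-topic", "k1", Seq("m1", "m2"))
            assert(a == b)   // structural equality from the generated equals()
            println(a)       // prints ProducerData(test-topic,k1,List(m1, m2))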

          Jun Rao added a comment -

          The patch can go into 0.7 too.

          Jun Rao added a comment -

          Attach a patch.

          The current Producer logic has a number of problems:
          1. There are two pluggable handlers, EventHandler and CallbackHandler. This makes the logic a bit complicated. Furthermore, we need to maintain a Scala and a Java version and convert between the two.
          2. The logic for collating, serialization, compression, etc. lives in the pluggable EventHandler, which means that if you plug in your own handler, a lot of the code in DefaultEventHandler has to be duplicated.
          3. The partition assignment is done at enqueue time. It's better to do this at dequeue time to minimize the impact of failed brokers.
          4. The event handler is only applicable to the async producer. If we want to add auditing in the handler, it would be useful for both the sync and async producers.

          The new design made the following changes:
          a. Get rid of both pluggable handlers.
          b. DefaultEventHandler now does partitioning, serialization, collating, compression, etc and is used by both sync and async producer.
          c. The data flow now is simpler:
          c1. async: Producer => queue => ProducerSendThread => DefaultEventHandler
          c2. sync: Producer => DefaultEventHandler
          d. Keep partitioner and encoder pluggable.

          Some detailed changes:
          e. BrokerPartition logic is moved to DefaultEventHandler
          f. ProducerPool is simplified to be just a cache for sync producers.
          g. Remove most tests for javaapi.producer and only keep one test in ProducerTest to verify the wrapper code.
          h. Move tests in ProducerTest that don't need the full stack to AsyncProducerTest and remove some tests that don't seem to improve code coverage.

          As for application-specific logic, one option is to do that outside of the Producer. We can track that as part of KAFKA-260.
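          To illustrate point f, a rough, hypothetical sketch of what "ProducerPool as just a cache of sync producers" amounts to (SyncProducerLike is a stand-in interface; the real pool also deals with configs and shutdown):

            import scala.collection.mutable

            // Stand-in for kafka.producer.SyncProducer, for illustration only.
            trait SyncProducerLike {
              def send(topic: String, bytes: Array[Byte]): Unit
              def close(): Unit
            }

            // A cache of sync producers keyed by broker id, created lazily.
            class SyncProducerCache(create: Int => SyncProducerLike) {
              private val producers = mutable.Map[Int, SyncProducerLike]()

              // Return the cached producer for this broker, creating it on first use.
              def getProducer(brokerId: Int): SyncProducerLike = synchronized {
                producers.getOrElseUpdate(brokerId, create(brokerId))
              }

              def close(): Unit = synchronized {
                producers.values.foreach(_.close())
                producers.clear()
              }
            }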

          Neha Narkhede added a comment -

          Here are some details on a potential refactoring solution -

          1. Get rid of the AsyncProducer
          2. In the send(ProducerData) API of the Producer, simply add new ProducerData to an internal queue
          3. In the DefaultEventHandler,

          3.1 it should have access to the Partitioner, custom or default
          3.2 Add a partition API that will tag each ProducerData object with a broker id-partition id (kafka.cluster.Partition)
          3.3 the collate API will group events per (topic, Partition) pair
          3.4 the serialize API will convert the data to a ByteBufferMessageSet => (topic, Partition, BBME)
          3.5 The send() API will use a SyncProducer to send each (topic, Partition, BBME) triple to the Partition.broker_id.

          The above will ensure that there is a single queue in the Producer and that the Producer is broker agnostic. This makes it much easier to resolve KAFKA-239, since that needs to make partitions logical.
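          To make the proposed flow concrete, here is a hedged sketch of steps 3.2-3.5; the types and signatures below are illustrative only, not the actual APIs:

            // Hypothetical shapes: tag -> collate -> serialize -> send per broker.
            case class Partition(brokerId: Int, partitionId: Int)
            case class Event(topic: String, message: String)

            // 3.2: tag each event with a broker id / partition id
            def partition(events: Seq[Event], partitioner: Event => Partition): Seq[(Partition, Event)] =
              events.map(e => (partitioner(e), e))

            // 3.3: group events per (topic, Partition) pair
            def collate(tagged: Seq[(Partition, Event)]): Map[(String, Partition), Seq[Event]] =
              tagged.groupBy { case (p, e) => (e.topic, p) }
                    .map { case (key, group) => (key, group.map(_._2)) }

            // 3.4: serialize each group (bytes stand in for a ByteBufferMessageSet)
            def serialize(batches: Map[(String, Partition), Seq[Event]]): Map[(String, Partition), Array[Byte]] =
              batches.map { case (key, evs) => (key, evs.map(_.message).mkString("\n").getBytes("UTF-8")) }

            // 3.5: send each (topic, Partition, bytes) triple to Partition.brokerId
            def send(serialized: Map[(String, Partition), Array[Byte]],
                     sendToBroker: (Int, String, Array[Byte]) => Unit): Unit =
              serialized.foreach { case ((topic, p), bytes) => sendToBroker(p.brokerId, topic, bytes) }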


            People

            • Assignee: Jun Rao
            • Reporter: Neha Narkhede
            • Votes: 0
            • Watchers: 2


                Time Tracking

                • Original Estimate: 168h
                • Remaining Estimate: 168h
                • Time Spent: Not Specified
