SAMZA-227

Upgrade KafkaSystemProducer to new API

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.6.0
    • Fix Version/s: 0.9.0
    • Component/s: kafka

      Description

      Kafka upgraded its producer API in 0.8.1/0.8.2 to a new Java-based API. It's much improved in both performance and features. We should upgrade KafkaSystemProducer to use it.

      Initially, we should keep the old producer as well, and add a config to switch back and forth. Once we've run it for a while, and things look stable, we can deprecate and delete the old producer.

      1. rb29899.patch
        69 kB
        Navina Ramesh
      2. rb29899-1.patch
        68 kB
        Navina Ramesh

          Activity

          nehanarkhede Neha Narkhede added a comment - - edited

          I think we can upgrade as early as 0.8.1.1. I can take a stab at this.

          criccomini Chris Riccomini added a comment -

          Great! I've added you as a contributor, and assigned you the JIRA.

          navina Navina Ramesh added a comment -

          An initial patch using the new Kafka producer API is available here: https://reviews.apache.org/r/29899/diff/#

          Unit tests still need to be rewritten. This is a work in progress.

          criccomini Chris Riccomini added a comment -

          Navina Ramesh, looks good. Please:

          1. Address comments from Guozhang Wang and myself on the ticket.
          2. Attach patch to JIRA.
          3. Open a follow-up SAMZA JIRA to upgrade to the final Kafka 0.8.2 release when it's available.
          4. Open a KAFKA JIRA to track missing functionality in MockProducer.
          5. Open SAMZA JIRAs to track KAFKA-1794, KAFKA-1865, and the KAFKA MockProducer JIRA (4, above).

          I think we should commit this, but hold the next Samza release until Kafka 0.8.2 has been officially released. Moving to the final release will involve some code changes, since the RC we're using doesn't have serialization in the Kafka producer API, but 0.8.2 final will.

          navina Navina Ramesh added a comment - - edited

          Attached code changes for Samza using the new Kafka producer API.

          criccomini Chris Riccomini added a comment -

          When running the latest RB with bin/check-all.sh, I see quite a few of these:

          38986 [kafka-producer-network-thread] WARN org.apache.kafka.common.network.Selector - Error in I/O with localhost/127.0.0.1
          java.net.ConnectException: Connection refused
          	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
          	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
          	at org.apache.kafka.common.network.Selector.poll(Selector.java:232)
          	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
          	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
          	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
          	at java.lang.Thread.run(Thread.java:745)
          
          criccomini Chris Riccomini added a comment -

          Seems to come from samza-kafka/build/reports/tests/classes/org.apache.samza.system.kafka.TestKafkaSystemProducer.html

          criccomini Chris Riccomini added a comment -

          Also, "[ant:javac] Note: /Users/criccomi/Code/samza/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java uses unchecked or unsafe operations."

          navina Navina Ramesh added a comment -

          Looks like the producer was not closed in some of the tests (in TestKafkaSystemAdmin and TestStatefulTask). This orphaned the I/O thread, which then flooded the logs with warnings. I also fixed the unchecked operation in MockKafkaProducer.java.

          Updated the diff at https://reviews.apache.org/r/29899/diff/# and attached the new patch.
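
          For readers unfamiliar with this failure mode, the following is a minimal, self-contained sketch of why an unclosed client keeps logging after a test ends. `BackgroundSender` is a hypothetical stand-in that only mimics a client owning a background I/O thread; it is not the Kafka producer and not the code in the patch. With the real producer, the equivalent cleanup would be its `close()` method.

```java
/** Hypothetical stand-in for a client that owns a background I/O thread. */
final class BackgroundSender implements AutoCloseable {
    private volatile boolean running = true;
    private final Thread ioThread;

    BackgroundSender() {
        ioThread = new Thread(() -> {
            while (running) {
                try {
                    Thread.sleep(10); // pretend to poll the network
                } catch (InterruptedException e) {
                    return; // close() interrupts the thread so it stops promptly
                }
            }
        }, "sketch-io-thread");
        ioThread.start();
    }

    boolean isIoThreadAlive() {
        return ioThread.isAlive();
    }

    /** Stops the I/O thread and waits for it to exit. */
    @Override
    public void close() {
        running = false;
        ioThread.interrupt();
        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        BackgroundSender sender = new BackgroundSender();
        try {
            System.out.println("I/O thread alive during test: " + sender.isIoThreadAlive());
        } finally {
            sender.close(); // without this, the thread outlives the test
        }
        System.out.println("I/O thread alive after close: " + sender.isIoThreadAlive());
    }
}
```

          Putting the close call in a `finally` block (or using try-with-resources) ensures the thread is stopped even when an assertion in the test body fails.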

          criccomini Chris Riccomini added a comment -

          +1. Merged and committed. Ran all tests locally. Logs are clean. Integration test is clean as well.

          jkreps Jay Kreps added a comment -

          It would be good to benchmark Samza vs the raw Kafka producer using the test harness in 0.8.2 (example commands are linked here: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines).

          The most important optimization in the producer is batching of messages. For small messages this is the difference between about 20k messages/sec and 1M messages/sec.

          This patch is likely good in an all-out performance test. However, there may be some issues, and it may prove to add more load to the Kafka cluster than the old implementation.

          The reason is that I think the old implementation did its own batching and hence was pretty aggressive about not sending messages until there were multiple things to send.

          But the new implementation does no batching of its own and just relies on the producer to batch. The producer will batch whenever it can't send out requests fast enough. In practice this is kind of a race between the I/O thread that is sending requests and the Samza thread that is generating messages. As the Samza thread generates more and more load, the I/O thread will fall behind, at which point batching kicks in and makes each send a lot more efficient.

          However, an issue here is that a slightly slower Samza job might generate a ton of messages, just not enough to overwhelm the I/O thread, and hence each will be sent immediately as its own request. There is nothing inherently wrong with this, and it is the only way to get 0 latency, but it will definitely add Kafka load.

          The setting that controls this is linger.ms. linger.ms=5 says: look, just because you got one message, don't send it right away; wait up to 5 ms for more messages to be written. Even linger.ms=1 will dramatically drop the request count at only a 1 ms latency hit.
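
          The effect of lingering on request count can be illustrated with a toy model. This is a simplified sketch, not Kafka's actual batching logic: it assumes the I/O thread is never the bottleneck, ignores batch size limits, and uses a hypothetical `LingerModel` class with times in microseconds.

```java
/** Toy model of linger-based batching; not Kafka's actual implementation. */
final class LingerModel {
    /**
     * Counts the requests needed for messages arriving at the given times
     * (microseconds, ascending). A request opens when its first message
     * arrives and is sent lingerMicros later, carrying every message that
     * arrived in the meantime.
     */
    static int countRequests(long[] arrivalsMicros, long lingerMicros) {
        int requests = 0;
        int i = 0;
        while (i < arrivalsMicros.length) {
            long sendAt = arrivalsMicros[i] + lingerMicros;
            requests++;
            i++;
            // With no linger the message goes out alone; otherwise absorb
            // everything that arrives before the batch is sent.
            while (lingerMicros > 0
                    && i < arrivalsMicros.length
                    && arrivalsMicros[i] <= sendAt) {
                i++;
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        long[] arrivals = new long[10];
        for (int k = 0; k < arrivals.length; k++) {
            arrivals[k] = k * 200L; // one message every 0.2 ms
        }
        System.out.println("linger 0 ms: " + countRequests(arrivals, 0) + " requests");
        System.out.println("linger 1 ms: " + countRequests(arrivals, 1000) + " requests");
    }
}
```

          In this model, ten messages spaced 0.2 ms apart produce ten requests with no linger and two requests with a 1 ms linger.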

          The problem is that with linger.ms > 0 there is no way to immediately flush the client; you always wait that long. Hence if you set linger.ms=1, then when you want to commit you will likely wait a millisecond while things sit in the producer queue, which is silly.

          To fix this we need to have a flush() call in the producer that immediately makes any queued message ready for sending (irrespective of linger.ms) and blocks on completion of all these requests.

          If we do this we can set the linger.ms to something greater than 0 and all will be great.

          jkreps Jay Kreps added a comment -

          I posted a patch that implements flush in the producer on KAFKA-1865. It might be worth setting linger.ms=1 even before that is used, to avoid issuing too many requests, even though that will add a millisecond of useless latency to commit.
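
          As a rough illustration of the suggested setting, the sketch below builds a producer configuration with linger.ms=1 using only `java.util.Properties`. The class and method names and the broker address are assumptions made for the example; `bootstrap.servers` and `linger.ms` are configuration keys of the new Kafka producer. In a Samza job these values would presumably come from the job's configuration rather than being built by hand.

```java
import java.util.Properties;

/** Hypothetical helper that assembles producer settings; names are illustrative. */
final class ProducerLingerSketch {
    /** Returns producer properties that wait up to lingerMs for more messages. */
    static Properties withLinger(String bootstrapServers, int lingerMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("linger.ms", Integer.toString(lingerMs));
        return props;
    }

    public static void main(String[] args) {
        // localhost:9092 is an assumed broker address for the example.
        Properties props = withLinger("localhost:9092", 1);
        System.out.println("linger.ms = " + props.getProperty("linger.ms"));
    }
}
```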

          criccomini Chris Riccomini added a comment -

          Fiddling with linger.ms as part of SAMZA-6.

          criccomini Chris Riccomini added a comment -

          Jay Kreps, see SAMZA-548.

            People

            • Assignee:
              navina Navina Ramesh
              Reporter:
              criccomini Chris Riccomini
            • Votes:
              0
              Watchers:
              6
