KAFKA-736

Add an option to the 0.8 producer to mimic 0.7 producer behavior

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: producer

      Description

      I profiled a producer throughput benchmark between a producer and a remote broker. It turns out that the background send thread spends ~97% of its time waiting to read the acknowledgement from the broker.

      I propose we change the current behavior of request.required.acks=0 to mean no acknowledgement from the broker. This will mimic the 0.7 producer behavior and will enable tuning the producer for very high throughput.
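      A minimal config sketch of the proposed fire-and-forget mode, assuming the 0.8 Scala producer API (Producer, KeyedMessage, ProducerConfig); the broker address, serializer, and topic name below are placeholders, not values from this ticket:

        import java.util.Properties
        import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

        object AcksZeroExample {
          def main(args: Array[String]): Unit = {
            val props = new Properties()
            props.put("metadata.broker.list", "localhost:9092")              // placeholder broker
            props.put("serializer.class", "kafka.serializer.StringEncoder")
            // 0 = fire-and-forget: don't wait for a broker acknowledgement (0.7-like behavior)
            props.put("request.required.acks", "0")

            val producer = new Producer[String, String](new ProducerConfig(props))
            producer.send(new KeyedMessage[String, String]("test-topic", "hello"))
            producer.close()
          }
        }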

      Attachments

      1. kafka-736-v4.patch
        48 kB
        Neha Narkhede
      2. kafka-736-v3-producer-latency-20threads-acks1.out
        2 kB
        Neha Narkhede
      3. kafka-736-draft-producer-latency-20threads-acks1.out
        2 kB
        Neha Narkhede
      4. kafka-736-v3.patch
        72 kB
        Neha Narkhede
      5. check-message-ordering.py
        2 kB
        Neha Narkhede
      6. kafka-736-v2.patch
        47 kB
        Neha Narkhede
      7. kafka-736-v1.patch
        32 kB
        Neha Narkhede
      8. kafka-736-draft.patch
        38 kB
        Neha Narkhede

          Activity

          Neha Narkhede added a comment -

          This is a draft patch that changes the behavior of required.request.acks=0 to not wait on a response from the broker. Since the producer can send batched requests without waiting for a network roundtrip, its throughput is very high and matches that of the 0.7 producer. I haven't run full-fledged performance tests to get a detailed report, but I've seen a single producer's throughput increase from 11 MB/s to 45 MB/s with the same config. Initially, I thought that without any changes to the socket server, it would not read more than one request from a producer on the same connection. That's because after reading a request completely, we set the interest ops to READ only after the response is written to the socket. Since we basically got rid of the response, my thinking was that the producer would keep writing to the socket and its socket buffer would eventually fill up, since the broker is not reading from that socket anymore. But this is not how the socket server behaves, which works in favor of this feature. When the socket server accepts a connection, it registers READ interest for that channel. Even after we read a request completely, if there are more requests waiting on that socket, then since the interest ops on that socket have not been changed, the server continues to select that key for the READ operation.

          But, the current socket server design will reorder pipelined requests. All the requests sent to the broker end up in a common request queue. Let's say there are two requests, R1 followed by R2, from the same socket in the request queue. Two different io threads can handle those requests, and the response for R2 can get written before R1 on the socket. For ordering to work correctly, we need to maintain stickiness between the requests from one key and the corresponding io/request handler thread. One way of solving this problem is to replace the common request queue with a per-io-thread request queue. The network thread maps a key to an io thread when it accepts a new connection and maintains this mapping until the connection is closed and the key is invalid. One of the problems with this design is that if one client sends requests at a very high rate, the corresponding io thread's request queue will fill up and the respective network thread will block. But thinking about this, the current one-request-queue approach suffers from the same drawback.

          This draft patch is meant for design review; I would like to save the following improvements for the v1 patch, depending on which way we decide to go -
          1. Add more unit tests for required.num.acks=0
          2. Cache the key->io thread mapping instead of recomputing on each request

          Neha Narkhede added a comment -

          Another approach is to reset the interest ops to not select READ or WRITE after a request is completely read. This is a smaller change to the socket server, but will have an impact on performance. Once an io thread has finished processing a request from a key, it will not have the next pipelined request already available in the request queue. My guess is that the producer will see larger dips in throughput, which happen when the socket buffer fills up and the producer has to wait for the server to dequeue more requests from the socket.
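          A minimal sketch of this approach, assuming plain java.nio and illustrative helper names (this is not the actual SocketServer code):

            import java.nio.channels.SelectionKey

            object InterestOpsSketch {
              // After a request has been fully read, clear the READ and WRITE interest bits so the
              // selector stops selecting this key until the current request has been handled.
              // Note: clear bits with & ~(...); assigning ~(OP_READ & OP_WRITE) directly sets
              // unsupported bits and makes interestOps() throw IllegalArgumentException.
              def muteAfterCompleteRead(key: SelectionKey): Unit =
                key.interestOps(key.interestOps & ~(SelectionKey.OP_READ | SelectionKey.OP_WRITE))

              // Once the response (if any) has been processed, re-enable reads so the next
              // pipelined request on this connection can be picked up.
              def unmute(key: SelectionKey): Unit =
                key.interestOps(key.interestOps | SelectionKey.OP_READ)
            }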

          Neha Narkhede added a comment -

          Went with the simpler approach of resetting the interest ops on the producer socket key to ~(READ & WRITE) right after the read is completed. Changes include -

          1. SyncProducer

           • If required.request.acks=0, don't read the response (see the sketch after this list)
          • Change the doSend() API to take in requestAcks. This is used by the topic metadata send as well as the produce request send. Forced the num acks for topic metadata to be 1 since no other value makes sense

          2. SocketServer

          • Added some tracing mainly to make it easier to trace what the socket server is doing
          • Reset the interest ops on the selection key to not do writes or reads after a read is complete. This prevents the server from reading more than one request from the same producer
           • In processNewResponses(), if the send buffer in the producer response is null, turn the read interest bit on, since this is a fake producer response for a produce request that had required.request.acks=0. For other produce requests, it will turn on the write interest bit like it does today. The reason I put the change here rather than in the write() API is that we don't want to waste any time before setting the READ interest bit, to reduce the delay in reading the next pipelined produce request

          3. KafkaApis

          • Sends a fake producer response while handling a producer request with required.request.acks=0

          4. Unit testing
          The reason I changed required.request.acks to 1 in most unit tests is to avoid having unit tests that are timing-dependent. We went through the exercise of cleaning up our unit test suite to not include time-dependent unit tests in the past, so I haven't added any more in this patch. We should add system tests though, and I will file a JIRA to get that fixed. Let me know if you think we should still add unit tests for required.request.acks=0
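          As referenced in item 1 above, a rough sketch of the client-side behavior, with illustrative names and signatures rather than the actual SyncProducer code (shown with a boolean flag for whether to read a response):

            import java.nio.ByteBuffer

            object AckAwareSend {
              // Write the serialized request, and only block on a response read when one is expected.
              def doSend(request: ByteBuffer,
                         readResponse: Boolean,
                         write: ByteBuffer => Unit,
                         read: () => ByteBuffer): Option[ByteBuffer] = {
                write(request)
                if (readResponse) Some(read())   // acks >= 1: wait for the broker's response
                else None                        // acks == 0: fire-and-forget, skip the read
              }
            }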

          Jay Kreps added a comment -

          This is basically good, but the comments and naming are all just carrying through specifics of producer behavior into the network layer which is a no-no. Instead, think of this as a general feature you are implementing:
          1. server enqueues one request at a time
          2. null as the response send indicates no response

          This is a nice feature for our socket server to have.
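          A sketch of that contract with illustrative types (not the actual SocketServer/RequestChannel code): an absent response send means "no response", so the server simply resumes reading from the connection:

            import java.nio.channels.SelectionKey

            object ResponseHandlingSketch {
              final case class Response(key: SelectionKey, responseSend: Option[Array[Byte]])

              def processNewResponse(response: Response): Unit = response.responseSend match {
                case None =>
                  // One-way request: nothing to write back, just resume reading from this connection.
                  response.key.interestOps(response.key.interestOps | SelectionKey.OP_READ)
                case Some(_) =>
                  // Normal request: register for write; the selector loop will flush the send.
                  response.key.interestOps(response.key.interestOps | SelectionKey.OP_WRITE)
              }
            }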

          Specifically:
          1. RequestChannel.getFakeProducerResponse? Let's not add this as a public method on the thing that handles request queueing for our network server. That doesn't seem like part of the contract of a request queue. Can you remove that and the getShutdownReceive producer request that Victor seems to have hacked in? The socket server's idea of a request is just a byte buffer. There shouldn't be any notion of producers or anything like that.
          2. SocketServer.processNewResponses() the actual logic here is good, using a null send is a very logical way to say "no response". Please remove the comment about producers and num.acks and just describe the feature you have implemented: null responseSend means no response to send. This is just part of the contract of socket server.
          3. SocketServer.processNewResponses() fix misformatted else statement.
          4. SyncProducer.send--this is an API change; is that going to break anything? Can we just have it return null?
          5. SyncProducer.doSend--this sends a generic request, you can't add numacks since acks are specific to ProducerRequest.
          6. Not sure I get why we need to override required.request.acks. If we want to override it, is there a generic place to do that instead of in each test?
          7. It would be good to add a unit test for one-way requests in the socket server.
          8. It would be good to add a unit test for the producer num.acks=0 feature.

          We also should do a quick perf test on your machine to assess the impact of only reading one request at a time (if any).

          Neha Narkhede added a comment -

          Thanks for the review, these are good points. I see your point about separating Kafka logic from the socket server; I guess writing some real code and having it reviewed is the best way to drill it into my head.

          1. RequestChannel.getFakeProducerResponse is a bad name, I think. Since it is a general API for getting an empty response, I renamed it getEmptyResponse. You are right, it has nothing to do with a producer response. I'm not too sure about getShutdownReceive right now; I will clean that up in a follow-up patch, right after this.
          2. SocketServer.processNewResponses()
          Good point, changed the comment to reflect that
          3. SocketServer.processNewResponses()
          done
          4. SyncProducer.send-
          Good point, this will also avoid creating hordes of Option objects.
          5. SyncProducer.doSend-
          True, this is meant to be generic, though currently it is a private API inside SyncProducer. Changed it to take a boolean readResponse.
          6. Sure. Included a util in TestUtils and changed tests to use that instead of creating a properties object repeatedly
          7. Added a unit test for testing ordering of pipelined requests in SocketServerTest
          8. Added a unit test to PrimitiveApiTest for testing pipelined producer requests and ordering of the consumed data

          More changes in the patch -

           • Fixed a bug from the v1 patch in the socket server. Basically, the way I was resetting the interest ops was illegal. Fixed that to only reset the READ interest bit

          I think I was right about the performance concern, but the degradation is still far worse than I had imagined -

          replication factor = 1
          no compression
          required.request.acks=0

          producer-threads   batch-size   draft-patch (MB/s)   v1/v2 patch (MB/s)
          1                  1            19.31                6.49
          2                  1            21.70                0.01
          1                  50           77.84                58.07
          2                  50           83.72                0.33
          1                  100          75.18                45.80
          2                  100          83.53                0.63

          Jay Kreps added a comment -

          1. It would be good to remove getShutdownReceive() too. I think that method is totally unnecessary.
          2. It would be good to remove getEmptyResponse and just have KafkaApis enqueue the request object with the send set to null. The Scala style isn't to start with getXXX, and in any case I don't think this is an operation that our request channel would do.
          3. Yes, just changing the parameter name for doSend was all I had in mind.
          4. I'm not sure I understand the perf numbers; I will swing by.

          ben fleis added a comment -

          I tried the v2 patch linked above this morning against v0.8 HEAD. It appears (after full-throttle testing for over an hour) to have eliminated the problem originally reported in KAFKA-706.

          Jay Kreps added a comment -

          Reviewing the draft patch in case we end up going that route. It might be better to do the assignment to the queue inside the request channel rather than in the socket server read method. key.hashCode % numQueues should be a fine way to go. I think the selector keys have no hashCode implementation, so this effectively uses the object memory address, which should be plenty fast.
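          A sketch of that suggestion with illustrative names (not the actual RequestChannel): the queue index is derived from key.hashCode, so a connection sticks to the same I/O thread's queue for its lifetime:

            import java.nio.channels.SelectionKey
            import java.util.concurrent.ArrayBlockingQueue

            class MultiQueueRequestChannel[R](numQueues: Int, queueCapacity: Int) {
              private val queues = Vector.fill(numQueues)(new ArrayBlockingQueue[R](queueCapacity))

              // key.hashCode is effectively the identity hash; the mask keeps the index non-negative.
              private def queueIdFor(key: SelectionKey): Int =
                (key.hashCode & 0x7fffffff) % numQueues

              // Called by network threads; blocks if that I/O thread's queue is full.
              def sendRequest(key: SelectionKey, request: R): Unit =
                queues(queueIdFor(key)).put(request)

              // Called by I/O thread i; blocks until a request is available for it.
              def receiveRequest(ioThreadId: Int): R = queues(ioThreadId).take()
            }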

          ben fleis added a comment -

          Went back and tested the 'draft' patch, and it failed in the same way that 706 was previously failing. I have not looked at the patches at all, merely applied them blindly. Perhaps I made an error when applying/testing the v2 patch, although I believe I ran the same steps... YMMV. In any case, the draft patch has reliably failed three times in a row, and as with 706, tcpdump confirms the lost messages.

          Jay Kreps added a comment -

          It would be nice to have an "ordering" torture test. There are actually lots of ways ordering can get messed up (producer client code, network layer, broker, synchronization) and this would help check that invariant. This can't really be done in a unit test very well since it depends on load. This would be a command line tool that produced N messages from M threads. Each message would be of the form T-i where T is the thread number and i is a sequential counter incremented by the producer. When all N messages had been produced the tool would start a consumer and consume all the messages and keep a map of threadId => last_seen. The last_seen value should increase sequentially for each thread with no gaps.
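          A sketch of the checking side of such a tool (illustrative, not an actual Kafka tool), assuming each consumed line has the form T-i and that the consumer output file is passed as the first argument:

            import scala.collection.mutable
            import scala.io.Source

            object OrderingCheck {
              def main(args: Array[String]): Unit = {
                val lastSeen = mutable.Map[Int, Long]()   // threadId -> last counter seen
                var violations = 0
                for (line <- Source.fromFile(args(0)).getLines()) {
                  val Array(t, i) = line.trim.split("-", 2)
                  val (thread, counter) = (t.toInt, i.toLong)
                  lastSeen.get(thread).foreach { prev =>
                    if (counter != prev + 1) {            // gap or reordering for this thread
                      violations += 1
                      println(s"thread $thread: expected ${prev + 1}, saw $counter")
                    }
                  }
                  lastSeen(thread) = counter
                }
                println(if (violations == 0) "all messages ordered" else s"$violations ordering violations")
              }
            }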

          ben fleis added a comment -

          This is more-or-less what my node command line tool does, among other things. Not threaded, but event driven. But spawns both sides, uses custom correlation_ids for debugging, etc. Unfortunately, I can't release it, but if there's something to learn from it, it's available to you.

          Neha Narkhede added a comment -

          We have the producer performance tool, which can take an option to produce messages with an id per thread and a configurable number of threads. We can then run the console consumer and the script attached here to see if messages are ordered per thread. Please cross-check this script, I'm a Python noob!

          ./bin/kafka-producer-perf-test.sh --broker-list localhost:9092 --initial-message-id 1 --messages 100000 --threads 10 --topics sequential

          bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer --zookeeper localhost:2181 --topic sequential --consumer-timeout-ms 10000 --from-beginning > sequential-output &

          python -B check-message-ordering.py -f sequential-output

          Output from script -

          Checking ordering for thread 1
          All 20000 messages are ordered
          Checking ordering for thread 0
          All 21000 messages are ordered

          So far, using this test I couldn't reproduce the reordering problem on the draft patch, but I only ran a short test.

          Neha Narkhede added a comment -

          Benchmarked the draft and v2 patches for producer throughput; here are the results -

          Message size is 1K in all the tests

          batch size 1, producer threads 1

          kafka-736-v2 - 13 MB/s

          kafka-736-draft - 30 MB/s

          batch size 100, producer threads 1

          kafka-736-v2 - 48.4 MB/s

          kafka-736-draft - 61.5 MB/s

          batch size 100, producer threads 20

          kafka-736-v2 - 11.6 MB/s

          kafka-736-draft - 81.6 MB/s

          I looked into the cause of this performance degradation on the v2 patch. What's happening is that setting the selection key's interest bits to READ in processNewResponses is not reflected in the following select() operation for all but the first network thread (id 0). I tried the producer performance test with a varying number of producer threads and network threads on the server, and I consistently see this result. Due to this, all the producer connections handled by network threads with ids > 0 see very low throughput, since the next request is not read until 300 ms after the previous request is finished processing. I also confirmed that the producer had sent a lot of data on those low-throughput connections; the server was just reading it 300 ms later. I read up a little bit about concurrency and selection keys and found this -

          "Generally, SelectionKey objects are thread-safe, but it's important to know that operations that modify the interest set are synchronized by Selector objects. This could cause calls to the interestOps( ) method to block for an indeterminate amount of time. The specific locking policy used by a selector, such as whether the locks are held throughout the selection process, is implementation-dependent.

          Overall, it seems like Java NIO doesn't behave the way we want with respect to having the updated interest bits take effect in the next select operation. This makes the v2 approach even trickier to reason about.
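          For reference, the usual java.nio pattern for making an interest-ops change from another thread visible promptly is to wake the selector out of its select() call (a sketch with illustrative names, not the SocketServer code):

            import java.nio.channels.{SelectionKey, Selector}

            object SelectorNudge {
              def enableReadAndWake(key: SelectionKey, selector: Selector): Unit = {
                key.interestOps(key.interestOps | SelectionKey.OP_READ)
                // Without the wakeup, the change may not be observed until the current
                // select() call times out (300 ms in the case described above).
                selector.wakeup()
              }
            }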

          Neha Narkhede added a comment -

          Found a bug in the v2 patch that caused the multi-producer throughput degradation. The empty response was created with processor id 0. When the response is enqueued, we wake up this processor id instead of the request's processor id. So processor 0 always woke up on time, but not the rest of the processors, which led to their select operations blocking for the entire 300 ms.
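          A sketch of that fix with illustrative types (not the actual RequestChannel): the empty response carries the processor id of the request it answers, so the wakeup goes to that request's network thread instead of always to processor 0:

            object EmptyResponseSketch {
              final case class Request(processorId: Int, payload: Array[Byte])
              final case class Response(processorId: Int, request: Request, responseSend: Option[Array[Byte]])

              def emptyResponseFor(request: Request): Response =
                Response(request.processorId, request, responseSend = None)   // not a hard-coded 0
            }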

          The new performance numbers make sense now, and the v2/v3 design shows ~16% throughput degradation compared to the draft patch approach, which can be explained.

          Message size is 1K in all the tests

          batch size 1, producer threads 1

          kafka-736-v3 - 13 MB/s

          kafka-736-draft - 30 MB/s

          batch size 100, producer threads 1

          kafka-736-v2 - 48.4 MB/s

          kafka-736-draft - 61.5 MB/s

          batch size 100, producer threads 20

          kafka-736-v2 - 70 MB/s

          kafka-736-draft - 81.6 MB/s

          Jun Rao added a comment -

          Thanks for the results. The case with 20 producer threads is pretty interesting. How many partitions are there in the topic? If there is only 1 partition, all I/O threads will need to synchronize on the same log during append. So this will serialize all I/O threads and therefore the reading of the next produce request. If this is the case, I'd suggest that we try with more partitions in the topic, with something like 20 network threads.

          Neha Narkhede added a comment - edited

          In all the tests above, the number of partitions is 1. I think that to compare the two patches, we don't need to worry about changing the number of partitions, simply because there is no change in the Log layer in either patch. But just out of curiosity, I can give that a try. Here are the results -

          Number of producer threads 20
          Number of partitions 10
          Batch size 100
          Message size 1K

          v3 patch

          2013-02-03 18:34:59:834, 2013-02-03 18:35:01:327, 0, 1000, 100, 95.37, 61.8764, 100000, 66979.2364

          draft patch

          2013-02-03 18:34:05:757, 2013-02-03 18:34:07:132, 0, 1000, 100, 95.37, 70.3581, 100000, 72727.2727

          Still a ~13% performance degradation

          Jay Kreps added a comment -

          This makes sense, nice catch.

          I think to make an informed decision we really need to also test the acks>0 case with, say, 20 threads. The reason is that the assumption is that the multi-queue design will have better throughput but worse latency. If multi-queue has equally good latency, then I think the decision is very easy. If latency is worse, then I suppose it depends on how much worse? Producer perf test measures latency too, right?

          Neha Narkhede added a comment -

          Producer performance does not measure latency directly, but we expose a JMX bean that measures latency and request rate. I ran a 20-thread producer performance test with acks=1, sent 5,000,000 messages, and the following are the latency numbers (avg and max). The full distribution of latency over the period of the test is attached here.

          v3 avg = 0.8452, max = 593.7651
          draft avg = 0.8059, max = 537.3038

          Jun Rao added a comment -

          Because of the problem identified in KAFKA-706, only the v3 patch will work. The v3 patch needs to be rebased though. Also, I'm not sure why it includes changes related to the Hadoop bridge. Is this patch mixed with another one?

          Neha Narkhede added a comment -

          Rebased

          Jun Rao added a comment -

          Thanks for patch v4. Looks good. A few minor comments. Once they are addressed, the patch can be checked in.

          40. SocketServerTest.testPipelinedRequestOrdering(): id, send, id2, send2 are not referenced

          41. SyncProducerTest.testProducerCanTimeout(): Should we remove the println?

          42. PrimitiveApiTest: testPipelinedProduceRequests seems to fail. I suspect that it's timing related. Also, produceList is not referenced.

          Sriram Subramanian added a comment -

          +1

          Neha Narkhede added a comment -

          Thanks for the reviews. I checked in v4 after addressing your latest comments.


            People

            • Assignee: Neha Narkhede
            • Reporter: Neha Narkhede
            • Votes: 1
            • Watchers: 6


                Time Tracking

                • Original Estimate: 24h
                • Remaining Estimate: 24h
                • Time Spent: Not Specified
