KAFKA-267: Enhance ProducerPerformance to generate unique random Long value for payload

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels: None

      Description

      This is achieved by:
      1. Adding a new class UniqueRandom to shuffle a range of numbers (sketched below).
      2. Adding an optional new argument "start-index" to specify the starting number of the range to be shuffled. If this argument is omitted, it defaults to 1, so the change is backward compatible with the existing argument options.
      3. The ending number of the range is the starting number + number of messages - 1.
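
      A minimal Scala sketch of the idea behind such a UniqueRandom class, assuming a Fisher-Yates shuffle over the id range; the class shape and method names here are illustrative assumptions, not the attached patch:

          import scala.util.Random

          // Illustrative only: emit every id in [startIndex, startIndex + numMessages - 1]
          // exactly once, in random order, via a Fisher-Yates shuffle.
          class UniqueRandom(startIndex: Long, numMessages: Int) {
            private val ids: Array[Long] = Array.tabulate(numMessages)(i => startIndex + i)
            private val rand = new Random()
            for (i <- ids.length - 1 to 1 by -1) {
              val j = rand.nextInt(i + 1)
              val tmp = ids(i); ids(i) = ids(j); ids(j) = tmp
            }
            private var pos = 0

            // Next unique id; callers are expected to ask for at most numMessages ids.
            def next(): Long = {
              val id = ids(pos)
              pos += 1
              id
            }
          }

      With the default start-index of 1 and, say, 10 messages, the ids 1 through 10 would each be produced exactly once, in shuffled order.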

      Other ProducerPerformance enhancements:
      1. producing to multiple topics
      2. supporting multiple instances of ProducerPerformance (and distinguishing them)
      3. allowing a configurable wait after sending a request

      Attachments
      1. kafka_267_additional.diff (1 kB, Yang Ye)
      2. kafka-267-v7.patch (32 kB, Yang Ye)
      3. kafka-267-v6.patch (27 kB, Yang Ye)
      4. kafka-267-v5.patch (22 kB, Yang Ye)
      5. kafka-267-v4.patch (19 kB, Yang Ye)
      6. kafka-267-v3.patch (19 kB, Yang Ye)
      7. kafka-267-v2.patch (20 kB, Yang Ye)
      8. kafka-267-v1.patch (8 kB, John Fung)

        Activity

        John Fung created issue -
        John Fung made changes -
        Attachment kafka-267-v1.patch [ 12513765 ]
        John Fung made changes -
        Summary: Enhance ProducerPerformance to generate unique random checksum → Enhance ProducerPerformance to generate unique random Long value for payload
        Description: edited
        John Fung made changes -
        Description: edited (added the optional "start-index" argument details)
        John Fung made changes -
        Status: Open [ 1 ] → Patch Available [ 10002 ]
        Yang Ye made changes -
        Description: edited
        Yang Ye made changes -
        Description: edited (added the "Other ProducerPerformance" items)
        Yang Ye made changes -
        Assignee: John Fung [ jfung ] → Yang Ye [ yeyangever ]
        Yang Ye added a comment -

        Refactored a lot of code:
        1. support multiple topics
        2. support variably sized messages
        3. support sleeping after sending a message
        4. refactor a lot of code to make it readable
        5. can be used both for producing test data and for performance testing. For the former usage, one has to give the --initial-message-id argument.

        Yang Ye made changes -
        Attachment kafka-267-v2.patch [ 12547864 ]
        Jun Rao added a comment -

        Can't find a revision against which I can apply the patch cleanly. Could you rebase?

        Yang Ye added a comment -


        Rebased on kafka 42.

        Yang Ye made changes -
        Attachment kafka-267-v3.patch [ 12548677 ]
        Jun Rao added a comment -

        Thanks for patch v3. The code is still pretty messy, mostly for historical reasons. For example, the fixed/variable length option is mixed up with seqIdMode and batch size, etc. I suggest that we do the following: (1) Get rid of the seqId mode and always generate sequential ids in the message header. If the user doesn't specify the starting index, the seqId will start from 0. (2) Always pad a message with random bytes, whether the message is fixed length or variable length. (3) Don't batch messages into sets in sync mode. Instead, send one message at a time in sync mode. (4) The send gap should probably be added after sending a batch of messages, not after each message. If we do all this, the send thread can just have one simple loop like the following:

        while (j < messagesPerThread) {
          for each topic {
            msgString = // get the message for a seq id of a given size
            msg = create Message
            producer.send(msg)
            if (config.messageSendGapMs > 0 && batch size is reached)
              Thread.sleep(config.messageSendGapMs)
          }
        }

        Also, we need to patch the system tests since the command line options have changed.

        Yang Ye added a comment -

        Created a function "generateProducerData" for common use. It can generate data for both sync and async mode, fixed and variable length, sequential id mode and non-sequential id mode.

        The main structure of the fun() function of ProducerThread shrinks to:

        For each batch:
          For each topic:
            generateProducerData
            send
            sleep if required
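
        A rough Scala sketch of that loop structure; the names here (generateProducerData, messageSendGapMs, the send signature) are assumptions pieced together from this thread, not the committed patch:

            // Hypothetical shape of the ProducerThread send loop; `send` stands in for the
            // real producer call and `generateProducerData` for the helper described above.
            def runSendLoop(topics: Seq[String],
                            batches: Int,
                            messageSendGapMs: Long,
                            generateProducerData: String => Seq[String],
                            send: (String, Seq[String]) => Unit): Unit = {
              for (_ <- 0 until batches) {                // For each batch:
                for (topic <- topics) {                   //   For each topic:
                  val data = generateProducerData(topic)  //     generate this batch's messages
                  send(topic, data)                       //     send
                  if (messageSendGapMs > 0)               //     sleep if required
                    Thread.sleep(messageSendGapMs)
                }
              }
            }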

        Yang Ye made changes -
        Attachment kafka-267-v4.patch [ 12549794 ]
        Jun Rao added a comment -

        Thanks for patch v4. It's better, but still messy. Some more comments:

        40. I suggest that we combine initialMessageIdOpt and varyMessageSizeOpt into one option, something like message-mode. It would take 3 mutually exclusive choices: sequence, fix-length, and variable-length.

        41. I suggest that batchSizeOpt be only used in async mode. In sync mode, we always send a single message at a time without batching. This will simplify the way that we generate messages. When generating the data, we just need to generate one message at a time, depending on message-mode.

        42. The following 2 methods should be moved out of ProducerThread and put in a util class.
        generateProducerData
        generateMessageWithSeqId

        43. UniqueRandom: Why do we need this class? Can't we just use Random.nextBytes()?

        44. The following chunk of code is identical to that in Kafka. Could we create a util function to share the code?
        val metricsConfig = new KafkaMetricsConfig(verifiableProps)
        metricsConfig.reporters.foreach(reporterType => {
          val reporter = Utils.createObject[KafkaMetricsReporter](reporterType)
          reporter.init(verifiableProps)
          if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
            Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
        })

        45. There are a few long lines like the following. Let's put .format on a separate line.
        println(("%s, %d, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(formattedReportTime, config.compressionCodec.codec,
        threadId, config.messageSize, config.batchSize, (bytesSent*1.0)/(1024 * 1024), mbPerSec, nSends, numMessagesPerSec))
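
        For point 44 above, a sketch of the kind of shared util function being suggested; it simply wraps the quoted chunk, and the object name and import paths are assumptions (the individual calls are the ones shown in the comment):

            import kafka.metrics.{KafkaMetricsConfig, KafkaMetricsReporter, KafkaMetricsReporterMBean}
            import kafka.utils.{Utils, VerifiableProperties}

            // Hypothetical shared helper so that ProducerPerformance and the broker start
            // metrics reporters through the same code path.
            object KafkaMetricsReporterStarter {
              def startReporters(verifiableProps: VerifiableProperties): Unit = {
                val metricsConfig = new KafkaMetricsConfig(verifiableProps)
                metricsConfig.reporters.foreach { reporterType =>
                  val reporter = Utils.createObject[KafkaMetricsReporter](reporterType)
                  reporter.init(verifiableProps)
                  if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
                    Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
                }
              }
            }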

        Yang Ye added a comment -

        Changes since v4:

        1. removed the "UniqueRandom" class and its related usages

        2. further simplified the code of the generateProducerData() function

        Yang Ye made changes -
        Attachment kafka-267-v5.patch [ 12550541 ]
        Jun Rao added a comment -

        Thanks for patch v5. Looks good. Some minor comments:

        50. In the following line in generateProducerData(), we should allocate a byte array and pass it to the constructor of Message. ByteBuffer.array() gives you the backing array and it may or may not be the size that you allocated.
        new Message(ByteBuffer.allocate(if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize)).array())

        51. Could you break all long lines into multiple lines?

        52. Could you expose producer.num.retries and producer.retry.backoff.ms to the command line and default them to the same value as in ProducerConfig?
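
        A minimal sketch of the change point 50 asks for, sizing the byte array explicitly instead of going through ByteBuffer.allocate(...).array(); the isFixSize and messageSize names are taken from the quoted line, and the kafka.message.Message import path is an assumption:

            import scala.util.Random
            import kafka.message.Message  // assumed path; the Array[Byte] constructor is the one used in the quoted line

            // Hypothetical helper: build a message whose payload is exactly the intended size.
            def buildMessage(isFixSize: Boolean, messageSize: Int, rand: Random): Message = {
              val size = if (isFixSize) messageSize else 1 + rand.nextInt(messageSize)
              new Message(new Array[Byte](size))
            }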

        Neha Narkhede added a comment (edited) -

        Thanks for patch v5, we are pretty close to checking this in. A few more review comments -

        1. Fix the description of the topics argument; it should just say "produce to". Let's remove the "consume from" part. Also, if it is a csv list, let's add that to the description as well.
        2. Fix the description of batch-size. Right now, it just says "size". Same for threads; it just says "count".
        3. Let's describe the supported compression codecs in the description for compression-codec.
        4. Fix typo in the description for metrics-dir.
        5. I think it is valuable to default to async. That is the most common production usage; I doubt we will frequently test one-message-at-a-time.
        6. Should message-send-gap be renamed to message-send-gap-ms?
        7. Now that we have the metrics csv reporter, will we use ProducerPerformance to calculate the producer throughput in MB/s and messages/sec at all? I guess we can just enable csv reporting and rely on our metrics to measure throughput/latency correctly. In that case, we don't need to compute the various metrics and can also get rid of the println and the showDetailedStats parameter. With this change, we can view ProducerPerformance as a workload generator only. Not sure if I'm missing something here though.
        8. If the changes specified in #7 are implemented, let's change the system test script to dump the producer metrics for ProducerPerformance through the new metrics-dir option.
        9. generateProducerData is a bit unreadable. Recommend you separately compute the messageId and messageSize and pass them into generateMessageWithSeqId.

        Yang Ye added a comment -

        Changes in v6:

        1. ProducerPerformance further simplified; long lines split into two

        2. the showDetailedStats option is removed from ProducerPerformance

        3. ProducerPerformance uses async mode by default

        4. the number of messages sent in replication_test case 1 and test 0001 is set to 100,000, and the max log file size is changed to 1024000000, so that only a few log segments are created and the log checksum checking can be sped up

        Yang Ye made changes -
        Attachment kafka-267-v6.patch [ 12550736 ]
        Yang Ye added a comment -


        Moved the logging of successfully sent messages from ProducerPerformance to SyncProducer.

        Yang Ye made changes -
        Attachment kafka-267-v7.patch [ 12550928 ]
        Jun Rao added a comment -

        Thanks for patch v7. Not sure if printing messages in syncProducer is a good idea. In general, messages sent in syncProducer are not necessarily strings and may not be printable. ProducerPerformance, on the other hand, always sends string messages, which are printable. So, I suggest that we keep the message printing in ProducerPerformance.

        Neha Narkhede added a comment -

        Keeping the printing in ProducerPerformance would work, but we need a consistent way of counting the messages that were successfully sent by the async producer during system testing. Right now, ProducerPerformance prints messages assuming they will get sent, but only SyncProducer really knows whether the messages were sent successfully. Thoughts?

        Jun Rao added a comment -

        What we can do is, in system tests, tune #retries and the backoff time in ProducerPerformance according to the failure scenario so that we expect no data loss on the producer side. Both knobs are exposed in ProducerPerformance now.

        Yang Ye added a comment -

        V6 should be the right one; please review this one.

        Jun Rao added a comment -

        +1 on patch v6. Committed to 0.8.

        Jun Rao made changes -
        Status: Patch Available [ 10002 ] → Resolved [ 5 ]
        Fix Version/s: 0.8 [ 12317244 ]
        Resolution: Fixed [ 1 ]
        Yang Ye added a comment -

        This is a follow-up patch to simulate a more realistic data load. Previously we generated sequential messages padded with a series of "x" characters, which leads to a very good compression ratio that is not typical of real data. So we want to generate a random payload instead.

        For comparison: sending 400 messages with batch size 200, the data size was 48800 bytes before this patch and 65600 bytes after.
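
        A rough sketch of the padding change described above; the helper name and the exact message layout are assumptions, not the attached diff:

            import scala.util.Random

            // Hypothetical illustration: pad a sequential message id up to the target size.
            // Padding with a constant character compresses extremely well; random printable
            // characters give a payload that is closer to real traffic.
            def padMessage(seqId: Long, messageSize: Int, randomPayload: Boolean, rand: Random): String = {
              val header = seqId.toString + ":"
              val padLen = math.max(0, messageSize - header.length)
              val padding =
                if (randomPayload) Iterator.fill(padLen)(('a' + rand.nextInt(26)).toChar).mkString
                else "x" * padLen
              header + padding
            }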

        Yang Ye made changes -
        Attachment kafka_267_additional.diff [ 12551687 ]

          People

          • Assignee: Yang Ye
          • Reporter: John Fung
          • Votes: 0
          • Watchers: 3
