KAFKA-1865

Add a flush() call to the new producer API

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.0.0
    • Component/s: None
    • Labels:
      None

      Description

      The postconditions of this would be that any record enqueued prior to flush() would have completed being sent (either successfully or not).

      An open question is whether you can continue sending new records while this call is executing (on other threads).

      We should only do this if it doesn't add inefficiencies for people who don't use it.

      1. KAFKA-1865_2015-02-21_15:36:54.patch
        24 kB
        Jay Kreps
      2. KAFKA-1865_2015-02-22_16:26:46.patch
        45 kB
        Jay Kreps
      3. KAFKA-1865_2015-02-23_18:29:16.patch
        118 kB
        Jay Kreps
      4. KAFKA-1865_2015-02-25_17:15:26.patch
        118 kB
        Jay Kreps
      5. KAFKA-1865_2015-02-26_10:37:16.patch
        118 kB
        Jay Kreps
      6. KAFKA-1865.patch
        13 kB
        Jay Kreps

          Activity

          jkreps Jay Kreps added a comment -

          A key aspect of this that isn't obvious is that flush() has to disable linger.

          That is, say I have linger.ms=3000 and I do

          for (int i = 0; i < 1000; i++)
              producer.send(new ProducerRecord("topic", Integer.toString(i)));
          producer.flush();

          The flush call isn't as simple as just blocking until the record accumulator drains, since that would mean waiting an extra 3 seconds, during which of course no other records will be written. So flush should trigger an immediate send, just as close and memory exhaustion do in the record accumulator.
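
A minimal sketch of the point above, not the code from the patch. `ToyAccumulator` and all of its methods are hypothetical names invented for illustration; the sketch assumes a single batch and models only the decision "may this batch be sent now?". A counter of in-progress flushes is one possible way to let a flush override linger.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for the producer's record accumulator.
// It tracks one batch and decides whether that batch may be sent yet.
class ToyAccumulator {
    private final long lingerMs;
    private final int batchSize;
    // Number of flush() calls currently waiting (assumption: a counter, so
    // several flushes may overlap).
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);
    private int pendingRecords = 0;
    private long batchCreatedMs = -1;

    ToyAccumulator(long lingerMs, int batchSize) {
        this.lingerMs = lingerMs;
        this.batchSize = batchSize;
    }

    // Enqueue one record; the first record starts the linger clock.
    synchronized void append(long nowMs) {
        if (pendingRecords == 0) batchCreatedMs = nowMs;
        pendingRecords++;
    }

    void beginFlush() { flushesInProgress.incrementAndGet(); }

    void endFlush() { flushesInProgress.decrementAndGet(); }

    // A batch is sendable if it is full, if linger has expired,
    // or if any flush is in progress (which bypasses linger).
    synchronized boolean ready(long nowMs) {
        if (pendingRecords == 0) return false;
        boolean full = pendingRecords >= batchSize;
        boolean lingerExpired = nowMs - batchCreatedMs >= lingerMs;
        boolean flushing = flushesInProgress.get() > 0;
        return full || lingerExpired || flushing;
    }

    public static void main(String[] args) {
        ToyAccumulator acc = new ToyAccumulator(3000, 1000);
        acc.append(0);
        System.out.println(acc.ready(10)); // false: linger has not expired
        acc.beginFlush();
        System.out.println(acc.ready(10)); // true: flush overrides linger
        acc.endFlush();
    }
}
```

Without the `flushing` term, a flush issued 10 ms after the first record would sit idle until the 3000 ms linger ran out.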

          jkreps Jay Kreps added a comment -

          Created reviewboard https://reviews.apache.org/r/30763/diff/
          against branch trunk

          jkreps Jay Kreps added a comment -

          Updated reviewboard https://reviews.apache.org/r/30763/diff/
          against branch trunk

          jkreps Jay Kreps added a comment -

          Actually this approach is fundamentally flawed. This patch waits on the completion of all requests in the record accumulator but not any requests that have already been sent. To be correct we need to wait on both.

          I think the right approach is to keep a set of all the ProduceRequestResults that are currently incomplete. We would add to this set as soon as a new batch is created and remove it once it is completed. Then flush would just wait on all these.

          I'll redo this but not tonight.
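
A rough sketch of the incomplete-set idea described above, under stated assumptions. `ToyBatchResult` and `IncompleteTracker` are hypothetical names, not Kafka classes, and the latch-based result is only a stand-in for whatever the real result object does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a per-batch result: a latch released on completion.
class ToyBatchResult {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile boolean failed = false;

    void complete(boolean success) {
        failed = !success;
        done.countDown();
    }

    void await() throws InterruptedException { done.await(); }

    boolean isDone() { return done.getCount() == 0; }

    boolean failed() { return failed; }
}

// Tracks every batch that has been created but not yet completed,
// whether it is still queued or already sent and awaiting a response.
class IncompleteTracker {
    private final Set<ToyBatchResult> incomplete = ConcurrentHashMap.newKeySet();

    // Called as soon as a new batch is created.
    ToyBatchResult register() {
        ToyBatchResult result = new ToyBatchResult();
        incomplete.add(result);
        return result;
    }

    // Called when the batch finishes, successfully or not.
    // The latch is released before removal, so a concurrent snapshot that
    // still sees the result will find it already done.
    void complete(ToyBatchResult result, boolean success) {
        result.complete(success);
        incomplete.remove(result);
    }

    // The flush-style wait: snapshot the current set, then block on each entry.
    void awaitAll() throws InterruptedException {
        List<ToyBatchResult> snapshot = new ArrayList<>(incomplete);
        for (ToyBatchResult result : snapshot) result.await();
    }

    int size() { return incomplete.size(); }

    public static void main(String[] args) throws InterruptedException {
        IncompleteTracker tracker = new IncompleteTracker();
        ToyBatchResult queued = tracker.register();
        ToyBatchResult inFlight = tracker.register();
        Thread sender = new Thread(() -> {
            tracker.complete(queued, true);
            tracker.complete(inFlight, false); // a failure still counts as complete
        });
        sender.start();
        tracker.awaitAll();
        sender.join();
        System.out.println(tracker.size()); // 0
    }
}
```

Because `awaitAll()` works on a snapshot, batches registered after it starts are not waited on. That is one possible answer to the open question in the description about sending from other threads during a flush, and it is an assumption of this sketch rather than a statement about the patch.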

          becket_qin Jiangjie Qin added a comment -

          Yes you are right, Jay. This approach sounds good. Thanks.

          jkreps Jay Kreps added a comment -

          Updated reviewboard https://reviews.apache.org/r/30763/diff/
          against branch trunk

          jkreps Jay Kreps added a comment -

          Uploaded a new patch that tracks all incomplete RecordBatch instances in the RecordAccumulator and has flush block on them.

          I was having trouble with test hangs, but I'm not sure if they are related to this patch or not so I haven't yet validated the tests.

          I also improved the producer javadoc while in there since I was adding docs for flush.

          guozhang Guozhang Wang added a comment -

          Does it hang on ConsumerTest? Maybe we can disable it for now while I work on fixing the test.

          nehanarkhede Neha Narkhede added a comment -

          +1

          jkreps Jay Kreps added a comment -

          Updated reviewboard https://reviews.apache.org/r/30763/diff/
          against branch trunk

          jkreps Jay Kreps added a comment -

          Updated reviewboard https://reviews.apache.org/r/30763/diff/
          against branch trunk

          jkreps Jay Kreps added a comment -

          Updated reviewboard https://reviews.apache.org/r/30763/diff/
          against branch trunk

          junrao Jun Rao added a comment -

          The KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API) still says one flush at a time. We will need to change that.


            People

            • Assignee:
              jkreps Jay Kreps
              Reporter:
              jkreps Jay Kreps