Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-1835

Kafka new producer needs options to make blocking behavior explicit

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 0.8.2.0, 0.9.0.0, 0.10.1.0
    • Fix Version/s: 0.9.0.0
    • Component/s: clients
    • Labels:
      None

      Description

      The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path.

      Discussing on the mailing list, the most viable option is to have the following settings:
      pre.initialize.topics=x,y,z
      pre.initialize.timeout=x

      This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time.

      There is the question of what to do when initialization fails. There are a couple of options that I'd like available:

      • Fail creation of the client
      • Fail all sends until the meta is available

      Open to input on how the above option should be expressed.

      It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. For instance, the producer could accept and queue messages(note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point.

      I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool).

        Attachments

        1. KAFKA-1835-New-producer--blocking_v0.patch
          17 kB
          Paul Pearcy
        2. KAFKA-1835.patch
          16 kB
          Paul Pearcy

          Issue Links

            Activity

              People

              • Assignee:
                Unassigned
                Reporter:
                ppearcy Paul Pearcy
                Reviewer:
                Jun Rao
              • Votes:
                2 Vote for this issue
                Watchers:
                17 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved:

                  Time Tracking

                  Estimated:
                  Original Estimate - 504h
                  504h
                  Remaining:
                  Remaining Estimate - 504h
                  504h
                  Logged:
                  Time Spent - Not Specified
                  Not Specified