Kafka / KAFKA-17599

Update Consumer Subscription with Current Assignment Before Partition Revocation


Details

    Description

      Summary:
      Ensure that the Kafka consumer subscription object is updated with the current partition assignment before partition revocation, so that the sink connector's close() method has access to the current, updated partition assignment.
      Description:
      In scenarios where Kafka Connect sink tasks handle partition revocation events, it is critical to update the consumer subscription object with the current partition assignment before the revocation process begins. This keeps partition state consistent during rebalancing and ensures that partition ownership is clearly and accurately reflected in the consumer group.
      Current Behaviour:
      When partitions are revoked during a rebalance, the current assignment is not explicitly updated in the consumer subscription object before revocation. As a result, close() operates on the old assignment state, and open() may not be called at all for this change.
      Race Conditions:
      This also leads to some rare race conditions in sink connectors: depending on the order of calls, reading the currently assigned partitions sometimes returns the updated values and sometimes the old ones.
      Consider this:
      The put() call runs in a loop, and some of the work in put() uses "context.assignment" to access the current assignment. Suppose the task is assigned partitions [0, 1], partition [2] is being added, and partition [0] is being removed. If the calls happen in this order:
      1. close() is called for partition [0].
      2. put() is called with a batch of records.
      3. open() is called for partition [2].
      In this case, reading context.assignment inside put() returns [0, 1], even though partition [0] has already been closed.
      But if the calls happen in this order:
      1. close() is called for partition [0].
      2. open() is called for partition [2].
      3. put() is called with a batch of records.
      In this case, reading context.assignment inside put() returns [1, 2].
      Depending purely on call ordering, put() therefore sees either a stale or an up-to-date assignment, which makes connector behaviour inconsistent.
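      The two orderings above can be replayed in a small, self-contained simulation. This is a hypothetical model, not Kafka Connect code: partitions are plain integers, the local `assignment` set stands in for what context.assignment would return, and the class and method names are invented for illustration. It models the current behaviour as described in this issue: close() does not touch the assignment, and the new assignment [1, 2] only becomes visible when open() runs.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical simulation of the close/put/open orderings described in the issue.
// It does NOT use the Kafka Connect API; "assignment" stands in for context.assignment.
public class AssignmentRace {

    // Replays one ordering of calls and returns the assignment that put() observed.
    static Set<Integer> assignmentSeenByPut(List<String> calls) {
        Set<Integer> assignment = new TreeSet<>(Set.of(0, 1)); // initial assignment [0, 1]
        Set<Integer> seenByPut = null;
        for (String call : calls) {
            switch (call) {
                case "close":
                    // Current behaviour (as described): partition 0 is closed,
                    // but the assignment is not updated beforehand.
                    break;
                case "open":
                    // The new assignment [1, 2] only becomes visible here.
                    assignment = new TreeSet<>(Set.of(1, 2));
                    break;
                case "put":
                    // Snapshot of what put() would read at this point.
                    seenByPut = new TreeSet<>(assignment);
                    break;
                default:
                    throw new IllegalArgumentException("unknown call: " + call);
            }
        }
        return seenByPut;
    }

    public static void main(String[] args) {
        System.out.println(assignmentSeenByPut(List.of("close", "put", "open"))); // [0, 1]
        System.out.println(assignmentSeenByPut(List.of("close", "open", "put"))); // [1, 2]
    }
}
```

      The same rebalance yields two different views inside put(), which is exactly the inconsistency a connector cannot reason about.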

      Proposed Behavior:
      Before partition revocation occurs, update the consumer subscription object with the current partition assignment to ensure consistent state tracking and smoother transitions during rebalancing.
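      A minimal sketch of the proposed behaviour, under the assumption that "current partition assignment" means the post-rebalance assignment [1, 2] from the example above. Like the previous sketch, it is a hypothetical model rather than Kafka code, and all names are invented for illustration. The only change is that the updated assignment is published before close() is invoked, so close() and put() observe the same assignment regardless of call ordering.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the proposed ordering: update the assignment BEFORE revocation.
// Not Kafka code; assumes "current assignment" means the post-rebalance set [1, 2].
public class AssignmentBeforeRevocation {

    // Replays one ordering and records the assignment visible during each call.
    static Map<String, Set<Integer>> replay(List<String> calls) {
        Set<Integer> newAssignment = Set.of(1, 2);
        Set<Integer> assignment = new TreeSet<>(Set.of(0, 1)); // initial assignment [0, 1]
        Map<String, Set<Integer>> seen = new LinkedHashMap<>();
        for (String call : calls) {
            if (call.equals("close")) {
                // Proposed: publish the updated assignment before revoking partition 0.
                assignment = new TreeSet<>(newAssignment);
            } else if (!call.equals("put") && !call.equals("open")) {
                throw new IllegalArgumentException("unknown call: " + call);
            }
            // open() and put() need no update: the assignment is already current.
            seen.put(call, new TreeSet<>(assignment));
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(replay(List.of("close", "put", "open"))); // {close=[1, 2], put=[1, 2], open=[1, 2]}
        System.out.println(replay(List.of("close", "open", "put"))); // {close=[1, 2], open=[1, 2], put=[1, 2]}
    }
}
```

      Because the assignment is fixed before any revocation callback runs, the result no longer depends on whether put() lands before or after open().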
       

            People

              kumarpritam8634 Pritam Kumar
              kumarpritam863 Pritam Kumar