  1. Kafka
  2. KAFKA-16665

Fail to get partition's position from within onPartitionsAssigned callback in new consumer


Details

    Description

      Calling consumer.position(tp) from within the onPartitionsAssigned callback fails with a TimeoutException in the new consumer. We should be able to retrieve the position of newly assigned partitions there, as the legacy consumer allows. This call is used within Kafka itself (e.g. Connect's WorkerSinkTask).

      The new consumer fails because partitions that are assigned but still awaiting the onPartitionsAssigned callback are excluded from the list of partitions whose positions should be initialized. We should allow these partitions to initialize their positions without allowing data to be fetched from them (fetching is already prevented by the isFetchable flag in the subscription state).

      Note that a partition's position can be updated from two places: a call to consumer.position or a call to consumer.poll. Both attempt to `updateFetchPositions` if there is no valid position yet. Even once a valid position has been set by one of those calls, the partition remains non-fetchable until the onPartitionsAssigned callback completes (a partition is fetchable only if it has a valid position AND is not awaiting the callback).
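
      The intended behaviour can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: `SubscriptionModel`, `PartitionState`, `partitionsNeedingPosition`, `updatePosition` and `callbackCompleted` are hypothetical names invented for illustration and are not the classes or methods of the Kafka client. The point it shows is that a partition awaiting the callback is still eligible for position initialization, while fetchability additionally requires the callback to have completed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of the behaviour described in this issue.
// It is NOT Kafka's actual subscription state implementation.
class SubscriptionModel {

    /** Per-partition flags tracked by this sketch. */
    static final class PartitionState {
        boolean hasValidPosition = false;
        // True from assignment until onPartitionsAssigned has completed.
        boolean awaitingAssignedCallback = true;
        long position = -1L;
    }

    private final Map<String, PartitionState> assigned = new LinkedHashMap<>();

    /** Registers a newly assigned partition; its callback has not run yet. */
    void assign(String partition) {
        assigned.put(partition, new PartitionState());
    }

    /**
     * Partitions whose position should be initialized. Partitions awaiting
     * the callback are deliberately NOT filtered out here.
     */
    List<String> partitionsNeedingPosition() {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, PartitionState> entry : assigned.entrySet()) {
            if (!entry.getValue().hasValidPosition) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    /** Stands in for the effect of a position update via position() or poll(). */
    void updatePosition(String partition, long offset) {
        PartitionState state = assigned.get(partition);
        if (state == null) {
            throw new IllegalStateException("Not assigned: " + partition);
        }
        state.position = offset;
        state.hasValidPosition = true;
    }

    /** Marks the onPartitionsAssigned callback as completed for a partition. */
    void callbackCompleted(String partition) {
        PartitionState state = assigned.get(partition);
        if (state == null) {
            throw new IllegalStateException("Not assigned: " + partition);
        }
        state.awaitingAssignedCallback = false;
    }

    /** Fetchable = valid position AND not awaiting the callback. */
    boolean isFetchable(String partition) {
        PartitionState state = assigned.get(partition);
        return state != null
                && state.hasValidPosition
                && !state.awaitingAssignedCallback;
    }

    public static void main(String[] args) {
        SubscriptionModel model = new SubscriptionModel();
        model.assign("topic-0");
        System.out.println(model.isFetchable("topic-0")); // false: no position yet
        model.updatePosition("topic-0", 42L);
        System.out.println(model.isFetchable("topic-0")); // false: callback pending
        model.callbackCompleted("topic-0");
        System.out.println(model.isFetchable("topic-0")); // true
    }
}
```

      In this sketch the initialization filter and the fetchability check are kept separate, so a position can be set during the callback without any data being fetched before the callback completes.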

            People

              lianetm Lianet Magrans
              lianetm Lianet Magrans
              Lucas Brutschy Lucas Brutschy
