  1. Apache NiFi
  2. NIFI-3189

ConsumeKafka 0.9 and 0.10 can cause consumer rebalance when backpressure is engaged

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.2.0
    • Component/s: None
    • Labels:
      None

      Description

      ConsumeKafka processors can report rebalance issues when backpressure is engaged on the output connection and is later released. This happens because we do nothing with those consumers for a period of time; the Kafka client detects this and initiates a rebalance. We should ensure that, even when we cannot send more data due to backpressure, we at least have some sort of keep-alive behavior with the Kafka client. If that isn't an option, we should at least document the situation.

          Activity

          ijokarumawak Koji Kawamura added a comment - - edited

          I ran an experiment to see whether the Kafka consumer's pause/resume API can be used to retain the consumer connection. The experimental code is in my remote branch here:
          https://github.com/ijokarumawak/nifi/commit/28ba134771ec7a7e810924f655662a29662ba9bf

          It seems to work as expected. After back-pressure is engaged, ConsumeKafka is not triggered for a while. When downstream recovers from back-pressure, ConsumeKafka resumes consuming messages with the same consumer instance, without any message loss or warning messages.

          If this is the right direction, I will add more synchronization and start/stop processor handling, and make it work properly with onTrigger so that it only tries to retain the connection when there has been no polling activity for some period of time.

          Any feedback would be appreciated. Thanks!

          githubbot ASF GitHub Bot added a comment -

          GitHub user ijokarumawak opened a pull request:

          https://github.com/apache/nifi/pull/1527

          NIFI-3189: ConsumeKafka & Back-pressure. ConsumeKafka_0_10

          While downstream connections are full, ConsumeKafka is not scheduled to
          run onTrigger, so it does not call poll, which is what tells the
          Kafka server that this client is alive. After a while in that
          state, the Kafka server rebalances the client. When the downstream
          flow returns to normal and ConsumeKafka is scheduled again, the client
          no longer has a valid connection, and a warning message is logged.

          This PR uses the Kafka consumer pause/resume API to tell the Kafka server
          that the client does not want to consume any messages but is still alive.
          A separate internal thread periodically checks when onTrigger was last
          executed. If it has been a while since the last onTrigger (poll), the
          thread pauses the consumer and calls poll to send a heartbeat so that
          the connection is kept.
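The keep-alive idea described above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `PausableConsumer` is a hypothetical stand-in for the small part of the Kafka consumer API involved (the real `pause`/`resume` methods take the partitions to pause), and `ConsumerKeepAlive`, `retainIfIdle`, and `RecordingConsumer` are names invented for this example. It assumes, as the description says, that a poll on a paused consumer fetches nothing but still lets the client signal liveness.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the subset of the Kafka consumer API used in this sketch. */
interface PausableConsumer {
    void pause();                  // stop fetching records
    void resume();                 // allow fetching again
    int poll(long timeoutMillis);  // returns the number of records fetched
}

/** Remembers when poll last ran and performs a paused poll once the consumer has been idle too long. */
class ConsumerKeepAlive {
    private final PausableConsumer consumer;
    private final long idleThresholdMillis;
    private long lastPollMillis;

    ConsumerKeepAlive(PausableConsumer consumer, long idleThresholdMillis, long nowMillis) {
        this.consumer = consumer;
        this.idleThresholdMillis = idleThresholdMillis;
        this.lastPollMillis = nowMillis;
    }

    /** Called by the normal processing path; synchronized because the consumer is used by one thread at a time. */
    synchronized int onTrigger(long nowMillis) {
        lastPollMillis = nowMillis;
        return consumer.poll(0L);
    }

    /** Called periodically by the background thread; returns true if a paused poll was performed. */
    synchronized boolean retainIfIdle(long nowMillis) {
        if (nowMillis - lastPollMillis < idleThresholdMillis) {
            return false; // onTrigger polled recently, nothing to do
        }
        consumer.pause();
        try {
            consumer.poll(0L); // no records are expected while paused
        } finally {
            consumer.resume(); // always leave the consumer ready for the next onTrigger
        }
        lastPollMillis = nowMillis;
        return true;
    }
}

/** Test double that records the order of calls. */
class RecordingConsumer implements PausableConsumer {
    final List<String> calls = new ArrayList<>();
    @Override public void pause() { calls.add("pause"); }
    @Override public void resume() { calls.add("resume"); }
    @Override public int poll(long timeoutMillis) { calls.add("poll"); return 0; }
}
```

In a processor, `retainIfIdle(System.currentTimeMillis())` would presumably be driven by a scheduled executor that is started and stopped together with the processor.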

          Thank you for submitting a contribution to Apache NiFi.

          In order to streamline the review of the contribution we ask you
          to ensure the following steps have been taken:

          For all changes:
          • [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
          • [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
          • [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
          • [x] Is your initial contribution a single, squashed commit?

          For code changes:
          • [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
          • [x] Have you written or updated unit tests to verify your changes?
          • [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
          • [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
          • [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
          • [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

          For documentation related changes:
          • [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

          Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/ijokarumawak/nifi nifi-3189

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/nifi/pull/1527.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1527


          commit 79bc337a8812af41502c5a4fc046b183b72d7ee3
          Author: Koji Kawamura <ijokarumawak@apache.org>
          Date: 2017-02-20T10:08:23Z

          NIFI-3189: ConsumeKafka & Back-pressure.

          githubbot ASF GitHub Bot added a comment -

          Github user ijokarumawak commented on the issue:

          https://github.com/apache/nifi/pull/1527

          Dear reviewers,

          This PR contains a handful of debug-level log messages that make it easier to understand how the additional thread works. To see those log messages, modify `conf/logback.xml` as follows:

          ```xml
          <logger name="org.apache.nifi" level="INFO"/>
          <logger name="org.apache.nifi.processors" level="WARN"/>
          <!-- Add this line -->
          <logger name="org.apache.nifi.processors.kafka.pubsub" level="DEBUG" />
          <logger name="org.apache.nifi.processors.standard.LogAttribute" level="INFO"/>
          <logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />
          ```

          This PR targets ConsumeKafka_0_10. Once it is proven to work properly, I'll create a similar PR for ConsumeKafka (0.9).

          Thanks a lot in advance!

          ijokarumawak Koji Kawamura added a comment -

          For ConsumeKafka_0_10.

          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on a diff in the pull request:

          https://github.com/apache/nifi/pull/1527#discussion_r102721436

          --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java ---
          @@ -126,6 +128,25 @@ public ConsumerLease obtainConsumer(final ProcessSession session)

          { return lease; }

          + public void retainConsumers() {
          + pooledLeases.forEach(lease -> {
          --- End diff --

          Is there any issue if another thread calls obtainConsumer while retainConsumers is in the middle of looping over the consumers?
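One way to make the interleaving in this question harmless is sketched below. It is only an illustration under assumptions, not the change that was made in the PR: `Lease`, `LeasePool`, and `retainAll` are hypothetical names, and the pool is modeled as a `BlockingQueue`. The retaining thread drains the idle leases out of the queue before touching them, so a concurrent `obtain` can never receive a lease that is in the middle of a keep-alive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical lease wrapping one consumer; keepAlive() stands for the paused poll. */
class Lease {
    final int id;
    int keepAlives = 0;
    Lease(int id) { this.id = id; }
    void keepAlive() { keepAlives++; }
}

/** Hypothetical pool: idle leases wait in a thread-safe queue. */
class LeasePool {
    private final BlockingQueue<Lease> pooled = new LinkedBlockingQueue<>();
    private final AtomicInteger ids = new AtomicInteger();

    /** Hands out an idle lease if one is available, otherwise creates a new one. */
    Lease obtain() {
        Lease lease = pooled.poll();
        return lease != null ? lease : new Lease(ids.getAndIncrement());
    }

    /** Returns a lease to the pool once the caller is done with it. */
    void release(Lease lease) {
        pooled.offer(lease);
    }

    /** Removes all idle leases from the queue, keeps each alive, then puts it back. */
    int retainAll() {
        List<Lease> idle = new ArrayList<>();
        pooled.drainTo(idle); // after this, obtain() cannot see these leases
        for (Lease lease : idle) {
            try {
                lease.keepAlive();
            } finally {
                pooled.offer(lease);
            }
        }
        return idle.size();
    }
}
```

The trade-off in this design is that an `obtain` call arriving while `retainAll` is running creates a new lease instead of waiting, so the number of consumers can temporarily grow.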

          githubbot ASF GitHub Bot added a comment -

          Github user ijokarumawak commented on the issue:

          https://github.com/apache/nifi/pull/1527

          @bbende Thanks for reviewing. I've incorporated your comment. While doing so, I found that the Kafka client library has supported heartbeats from a background thread since 0.10.1.0; see KAFKA-3888 (https://issues.apache.org/jira/browse/KAFKA-3888).

          Maybe we should test the latest Kafka client, which may solve this issue automatically without any modification on the NiFi side, although I haven't looked at the new client code yet. If it does, this PR will be purely for ConsumeKafka (0.9).

          githubbot ASF GitHub Bot added a comment -

          Github user ijokarumawak commented on the issue:

          https://github.com/apache/nifi/pull/1527

          @bbende It turned out that updating the Kafka client is sufficient for 0.10. I confirmed that ConsumeKafka_0_10 can rejoin the consumer group when it resumes after back-pressure has been engaged for more than 5 minutes. For 0.9, I've added manual connection-retaining logic using a paused poll.

          • For ConsumeKafka_0_10, use the latest client library

          The above issue has been addressed by KIP-62.
          The latest Kafka consumer poll checks whether the client instance is still valid, and rejoins the group if not, before consuming messages.

          • For ConsumeKafka (0.9), added manual retention logic using pause/resume

          Kafka client 0.9 doesn't have a background heartbeat thread, so a similar mechanism is added manually.
          It uses the Kafka consumer pause/resume API to tell the Kafka server that the client has stopped consuming messages but is still alive.
          Another internal thread performs a paused poll periodically, based on the time elapsed since the last onTrigger (poll) was executed.
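For the 0.10 side, the consumer settings that KIP-62 separates can be sketched as plain properties. The property names `session.timeout.ms`, `heartbeat.interval.ms`, and `max.poll.interval.ms` are standard Kafka consumer settings (the last one was introduced with KIP-62 in 0.10.1.0); the class name, method name, and the values below are illustrative assumptions, not what NiFi configures.

```java
import java.util.Properties;

/** Illustrative consumer settings for a KIP-62 capable (0.10.1.0 or later) Kafka client. */
class Kip62ConsumerConfig {
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Liveness of the process: heartbeats are sent from a background thread,
        // so a long gap between poll calls no longer expires the session by itself.
        props.setProperty("session.timeout.ms", "10000");
        props.setProperty("heartbeat.interval.ms", "3000");
        // Liveness of processing: the maximum allowed gap between two poll calls.
        // If it is exceeded the consumer leaves the group; a later poll rejoins it.
        props.setProperty("max.poll.interval.ms", "300000");
        return props;
    }
}
```

With settings like these, a processor held back by back-pressure for longer than `max.poll.interval.ms` still drops out of the group, but, as noted in the comment above, the next poll rejoins before consuming messages.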

          joewitt Joseph Witt added a comment -

          Koji Kawamura, updating the fix version to 1.2.0. Also, can you take a quick look at the latest Kafka client that is out? Perhaps we should go straight to it.

          bende Bryan Bende added a comment -

          I can review this... Joseph Witt I believe this PR and NIFI-3528 both bump the 0.10 kafka processors to the 0.10.2.0 client which looks like the latest.

          joewitt Joseph Witt added a comment -

          rgr that Bryan Bende sounds good

          githubbot ASF GitHub Bot added a comment -

          Github user bbende commented on the issue:

          https://github.com/apache/nifi/pull/1527

          +1 I recreated the scenario with 0.9 and 0.10 by setting back-pressure to 1 on the queues after ConsumeKafka and ConsumeKafka_0_10, and verified that after this patch they have no problem resuming after sitting with back-pressure for long periods of time. Will merge to master.

          jira-bot ASF subversion and git services added a comment -

          Commit fd92999dafc040940011c87bb2ee2c8edf5f96a2 in nifi's branch refs/heads/master from Koji Kawamura
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=fd92999 ]

          NIFI-3189: ConsumeKafka 0.9 and 0.10 with downstream backpressure

          Currently, the NiFi Kafka consumer processors have the following issue.

          While downstream connections are full, ConsumeKafka is not scheduled to run onTrigger.
          It stops executing poll, which is what tells the Kafka server that this client is alive.
          Thus, after a while in that situation, the Kafka server rebalances the client.
          When downstream connections return to normal, ConsumeKafka is scheduled again,
          but the client is no longer part of a consumer group.

          If this happens, the Kafka client succeeds in polling messages when the ConsumeKafka processor resumes, but fails to commit offsets.
          The received messages are already committed into the NiFi flow, but since the consumer offset is not updated, they will be consumed again, producing duplicates.

          In order to address the above issue:

          • For ConsumeKafka_0_10, use the latest client library

          The above issue has been addressed by KIP-62.
          The latest Kafka consumer poll checks whether the client instance is still valid, and rejoins the group if not, before consuming messages.

          • For ConsumeKafka (0.9), added manual retention logic using pause/resume

          Kafka client 0.9 doesn't have a background heartbeat thread, so a similar mechanism is added manually.
          It uses the Kafka consumer pause/resume API to tell the Kafka server that the client has stopped consuming messages but is still alive.
          Another internal thread performs a paused poll periodically, based on the time elapsed since the last onTrigger (poll) was executed.

          This closes #1527.

          Signed-off-by: Bryan Bende <bbende@apache.org>
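The duplicate delivery described in this commit message can be illustrated with a small simulation. It is a toy model under stated assumptions, not Kafka or NiFi code: `OffsetCommitSimulation` is a hypothetical class, the topic is a simple list, and a commit is assumed to be rejected whenever the client is no longer a group member, after which reading restarts from the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of "poll succeeds, commit fails": delivered records are re-read after a failed commit. */
class OffsetCommitSimulation {
    private final List<String> log;       // records of one partition, in order
    private int committedOffset = 0;      // last offset acknowledged by the broker
    final List<String> delivered = new ArrayList<>(); // what reached the downstream flow

    OffsetCommitSimulation(List<String> log) {
        this.log = log;
    }

    /**
     * Reads up to batchSize records starting at the committed offset and delivers them downstream.
     * The offset only advances if the client is still a group member; returns whether the commit succeeded.
     */
    boolean consume(int batchSize, boolean stillGroupMember) {
        int end = Math.min(log.size(), committedOffset + batchSize);
        delivered.addAll(log.subList(committedOffset, end)); // already in the flow at this point
        if (!stillGroupMember) {
            return false; // commit rejected: the same records will be read again
        }
        committedOffset = end;
        return true;
    }
}
```

For a log of `a, b, c`, calling `consume(2, false)` and then `consume(2, true)` delivers `a, b, a, b`: the first batch reaches the flow twice because its offset was never committed.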

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/nifi/pull/1527


            People

            • Assignee:
              ijokarumawak Koji Kawamura
              Reporter:
              joewitt Joseph Witt