Apache NiFi / NIFI-2751

When a processor pulls a batch of FlowFiles, it keeps pulling from the same inbound connection instead of round-robin'ing

    Details

      Description

      When a Processor calls ProcessSession.get(int) or ProcessSession.get(FlowFileFilter), the FlowFiles only come from the first inbound connection, unless that connection is empty or doesn't have enough FlowFiles to complete the get() call. It should instead round-robin between the incoming connections, just as ProcessSession.get() does.
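
The requested behavior can be sketched roughly as follows. This is a minimal illustration, not the NiFi implementation: plain queues of strings stand in for inbound connections and FlowFiles, and the class name `RoundRobinBatchSketch` and method `pollBatch` are hypothetical names invented for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: queues of strings stand in for NiFi connections and FlowFiles.
class RoundRobinBatchSketch {

    // Index of the queue to try next; kept between calls so successive batches rotate too.
    private int nextIndex = 0;

    // Pulls up to maxResults items, taking one item per queue per turn.
    List<String> pollBatch(final List<Queue<String>> queues, final int maxResults) {
        final List<String> batch = new ArrayList<>();
        final int numQueues = queues.size();
        if (numQueues == 0) {
            return batch; // nothing to poll, and this avoids a modulo by zero below
        }

        int consecutiveEmpty = 0;
        // Stop when the batch is full or every queue was found empty in a row.
        while (batch.size() < maxResults && consecutiveEmpty < numQueues) {
            final Queue<String> queue = queues.get(nextIndex % numQueues);
            nextIndex = (nextIndex + 1) % numQueues;

            final String item = queue.poll();
            if (item == null) {
                consecutiveEmpty++;
            } else {
                consecutiveEmpty = 0;
                batch.add(item);
            }
        }
        return batch;
    }

    public static void main(final String[] args) {
        final List<Queue<String>> queues = new ArrayList<>();
        queues.add(new ArrayDeque<>(Arrays.asList("a1", "a2", "a3")));
        queues.add(new ArrayDeque<>(Arrays.asList("b1")));
        queues.add(new ArrayDeque<>(Arrays.asList("c1", "c2")));

        // Prints [a1, b1, c1, a2]
        System.out.println(new RoundRobinBatchSketch().pollBatch(queues, 4));
    }
}
```

With the behavior described in this issue, the same call would instead return a1, a2, a3 from the first queue before touching the others.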

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/nifi/pull/1211

          githubbot ASF GitHub Bot added a comment -

          Github user markap14 commented on the issue:

          https://github.com/apache/nifi/pull/1211

          All looks good. +1 merged to master. Thanks for addressing this, @pvillard31!

          jira-bot ASF subversion and git services added a comment -

          Commit 3c694b641e3a85a55e0b09e7020d5d3cda75ac10 in nifi's branch refs/heads/master from Mark Payne
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=3c694b6 ]

          NIFI-2751: This closes #1211.

          jira-bot ASF subversion and git services added a comment -

          Commit 45bc3e054d3c241e6abf79fc1dfe4bf22ae90a6f in nifi's branch refs/heads/master from Pierre Villard
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=45bc3e0 ]

          NIFI-2751 - Pull batch in round-robin manner - fix for / by 0

          ijokarumawak Koji Kawamura added a comment -

          I was trying to review the PR, but couldn't reproduce the ArithmeticException. The stack trace Mark posted came from BinFiles, which is a superclass of the MergeContent processor, and since MergeContent requires input connections, I couldn't set up a flow as Pierre mentioned above.

          The only possibility I can think of is that MergeContent is scheduled but, before it polls the incoming flow files, another thread adds more flow files to a downstream relationship and that connection becomes full:

          ProcessContext.pollFromSelfLoopsOnly
              private boolean pollFromSelfLoopsOnly() {
                  if (isTriggerWhenAnyDestinationAvailable()) {
                      // we can pull from any incoming connection, as long as at least one downstream connection
                      // is available for each relationship.
                      // I.e., we can poll only from self if no relationships are available
                      return !Connectables.anyRelationshipAvailable(connectable);
                  } else {
                      for (final Connection connection : connectable.getConnections()) {
                          // A downstream connection is full. We are only allowed to pull from self-loops.
                          if (connection.getFlowFileQueue().isFull()) { // <<<< HERE?
                              return true;
                          }
                      }
                  }
          
                  return false;
              }
          

          So I tried running MergeContent with Concurrent Tasks set higher (4, or even 32) and let the 'merged' connection fill up, but no ArithmeticException was thrown.
          The change looks good to me, but since I couldn't reproduce the issue, I can't give it a +1. I'd like Mark Payne to confirm the fix.

          pvillard Pierre Villard added a comment -

          Never mind. I guess that when input is allowed but not required, we can end up in such a situation.

          pvillard Pierre Villard added a comment -

          Mark Payne, I just submitted a PR to address this case. I don't clearly see why we would try to pull a batch of flow files without any incoming connections, though. I guess this is a special situation, no? Anyway, it's always better to prevent a division by zero!
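
For readers following the thread, the failure mode under discussion can be illustrated with a small sketch. It assumes the round-robin index is computed with a modulo over the number of pollable connections, which is what the "/ by zero" message in the stack trace suggests; the class and method names below are hypothetical and this is not the code from the PR.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: strings stand in for connections; this is not NiFi's actual code.
class EmptyConnectionGuardSketch {

    // Unguarded selection: throws ArithmeticException ("/ by zero") when the list is empty.
    // The counter is assumed to be non-negative.
    static String pickUnguarded(final List<String> connections, final int counter) {
        return connections.get(counter % connections.size());
    }

    // Guarded selection: returns null when there is nothing to poll from.
    static String pickGuarded(final List<String> connections, final int counter) {
        if (connections.isEmpty()) {
            return null;
        }
        return connections.get(counter % connections.size());
    }

    public static void main(final String[] args) {
        final List<String> none = Collections.emptyList();

        System.out.println(pickGuarded(none, 7)); // prints null

        try {
            pickUnguarded(none, 7);
        } catch (final ArithmeticException e) {
            System.out.println(e.getMessage()); // prints / by zero
        }
    }
}
```

Checking for an empty list before the modulo keeps the common path unchanged and turns the no-connection case into an ordinary "nothing to do" result.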

          githubbot ASF GitHub Bot added a comment -

          GitHub user pvillard31 opened a pull request:

          https://github.com/apache/nifi/pull/1211

          NIFI-2751 - Pull batch in round-robin manner - fix for / by 0

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/pvillard31/nifi NIFI-2751

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/nifi/pull/1211.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1211


          commit 72e09eab333ee0a14936d77b76a5c5efca5a1395
          Author: Pierre Villard <pierre.villard.fr@gmail.com>
          Date: 2016-11-12T11:10:43Z

          NIFI-2751 - Pull batch in round-robin manner - fix for / by 0


          markap14 Mark Payne added a comment -

          Re-opening and setting to a blocker, as I'm seeing an Exception getting thrown now from StandardProcessSession on occasion:

          2016-11-11 15:41:18,087 WARN [Timer-Driven Process Thread-10] o.a.n.c.t.ContinuallyRunProcessorTask
          java.lang.ArithmeticException: / by zero
                  at org.apache.nifi.controller.repository.StandardProcessSession.get(StandardProcessSession.java:1446) ~[nifi-framework-core-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
                  at org.apache.nifi.processor.util.bin.BinFiles.binFlowFiles(BinFiles.java:266) ~[nifi-processor-utils-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
                  at org.apache.nifi.processor.util.bin.BinFiles.onTrigger(BinFiles.java:178) ~[nifi-processor-utils-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
                  at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064) ~[nifi-framework-core-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
                  at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
                  at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
                  at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.0-SNAPSHOT.jar:1.1.0-SNAPSHOT]
                  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_60]
                  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_60]
                  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_60]
                  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_60]
                  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_60]
                  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_60]
                  at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
          
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/nifi/pull/1111

          jira-bot ASF subversion and git services added a comment -

          Commit 6aefc0b9102bc83456ff537ade22946715bdcd30 in nifi's branch refs/heads/master from Pierre Villard
          [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=6aefc0b ]

          NIFI-2751 NIFI-2848 Get batch of flow files in a round-robin manner

          This closes #1111

          githubbot ASF GitHub Bot added a comment -

          Github user mattyb149 commented on the issue:

          https://github.com/apache/nifi/pull/1111

          +1 LGTM, tried with and without the fix, verified that the flow files are drained in a fair manner. Merging to master

          githubbot ASF GitHub Bot added a comment -

          Github user gresockj commented on the issue:

          https://github.com/apache/nifi/pull/1111

          I tested this patch on a NiFi 1.0.0 instance and it resolves NIFI-2848!

          Thanks Pierre.

          jgresock Joseph Gresock added a comment -

          GitHub is down, presumably due to the cyber attack, otherwise I'd comment there.

          Yes, I tested this patch on a NiFi 1.0.0 instance and it resolves NIFI-2848!

          Thanks Pierre.

          githubbot ASF GitHub Bot added a comment -

          Github user gresockj commented on the issue:

          https://github.com/apache/nifi/pull/1111

          Yes, I will take a look today

          githubbot ASF GitHub Bot added a comment -

          Github user trixpan commented on the issue:

          https://github.com/apache/nifi/pull/1111

          @gresockj can you confirm that this has fixed NIFI-2848 (as per your template)?

          githubbot ASF GitHub Bot added a comment -

          GitHub user pvillard31 opened a pull request:

          https://github.com/apache/nifi/pull/1111

          NIFI-2751 NIFI-2848 Get batch of flow files in a round-robin manner

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/pvillard31/nifi NIFI-2848

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/nifi/pull/1111.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1111


          commit 7035a185ad5b4c2a4872042451cd83ac8fa98551
          Author: Pierre Villard <pierre.villard.fr@gmail.com>
          Date: 2016-10-06T20:06:45Z

          NIFI-2751 NIFI-2848 Get batch of flow files in a round-robin manner



            People

            • Assignee: Pierre Villard (pvillard)
            • Reporter: Mark Payne (markap14)
            • Votes: 0
            • Watchers: 7