AsyncWaitOperator does not handle exceptions properly

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Labels: None

      Description

      A user reported that the AsyncWaitOperator does not handle exceptions properly. The following code snippet completes the element 3 with an exception, yet the job does not fail.

      public void test() throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Integer> withTimestamps = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));

          AsyncDataStream.unorderedWait(withTimestamps,
              (AsyncFunction<Integer, String>) (input, collector) -> {
                  if (input == 3) {
                      // Complete this element exceptionally; the job is expected to fail here.
                      collector.collect(new RuntimeException("Test"));
                      return;
                  }
                  collector.collect(Collections.singleton("Ok"));
              }, 10, TimeUnit.MILLISECONDS)
              .returns(String.class)
              .print();

          // Reported behavior: the job does not fail.
          env.execute("unit-test");
      }

          Activity

          till.rohrmann Till Rohrmann added a comment -

          Fixed via 93758082273618d9fdbb3a9b3ed916a4b637760f

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3814

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3814

          Merging this PR.

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3814

          FLINK-6435 [async] React to exceptionally completed StreamElementQueueEntry

          The AsyncWaitOperator should not only react to orderly completed
          StreamElementQueueEntries but also to those completed with a user exception
          or those which timed out.

          This PR fixes the problem by calling the onComplete function passed to
          StreamElementQueueEntry#onComplete also in the exceptional case.
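
The idea behind the fix can be illustrated with a minimal sketch. This is not Flink's code: it uses the JDK's `java.util.concurrent.CompletableFuture` as a stand-in for a queue entry, and the class and method names (`CompletionCallbackSketch`, `successOnly`, `anyCompletion`) are hypothetical. It only shows the general difference between a callback that runs on normal completion alone and one that also runs when the entry completes with an exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CompletionCallbackSketch {

    /** Registers a callback that runs only when the entry completes normally. */
    static List<String> successOnly(CompletableFuture<String> entry) {
        List<String> log = new ArrayList<>();
        // thenAccept is skipped if the future completes exceptionally.
        entry.thenAccept(value -> log.add("completed: " + value));
        return log;
    }

    /** Registers a callback that runs on both normal and exceptional completion. */
    static List<String> anyCompletion(CompletableFuture<String> entry) {
        List<String> log = new ArrayList<>();
        // whenComplete receives either a value or the failure cause.
        entry.whenComplete((value, error) -> {
            if (error != null) {
                log.add("failed: " + error.getMessage());
            } else {
                log.add("completed: " + value);
            }
        });
        return log;
    }

    public static void main(String[] args) {
        // Stand-in for a queue entry that is completed with a user exception.
        CompletableFuture<String> first = new CompletableFuture<>();
        List<String> firstLog = successOnly(first);
        first.completeExceptionally(new RuntimeException("Test"));
        System.out.println(firstLog); // prints [] (the failure goes unnoticed)

        CompletableFuture<String> second = new CompletableFuture<>();
        List<String> secondLog = anyCompletion(second);
        second.completeExceptionally(new RuntimeException("Test"));
        System.out.println(secondLog); // prints [failed: Test]
    }
}
```

With the first registration style the consumer never learns that the entry failed, which matches the symptom described in this issue; the second style gives it the chance to propagate the failure.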

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink fixAsyncWaitOperator

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3814.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3814


          commit 18ede824016e8714dd1f82f7ac4574eeeee85f70
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-03T12:40:46Z

          FLINK-6435 [async] React to exceptionally completed StreamElementQueueEntry

          The AsyncWaitOperator should not only react to orderly completed
          StreamElementQueueEntries but also to those completed with a user exception
          or those which timed out.

          This PR fixes the problem by calling the onComplete function passed to
          StreamElementQueueEntry#onComplete also in the exceptional case.



            People

            • Assignee: Till Rohrmann (till.rohrmann)
            • Reporter: Till Rohrmann (till.rohrmann)
            • Votes: 0
            • Watchers: 3
