Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-6435

AsyncWaitOperator does not handle exceptions properly

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Labels:
      None

      Description

      A user reported that the AsyncWaitOperator does not handle exceptions properly. The following code snipped does not make the job fail.

      public void test() throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Integer> withTimestamps = env.fromCollection(Arrays.asList(1,2,3,4,5));
      
          AsyncDataStream.unorderedWait(withTimestamps, 
              (AsyncFunction<Integer, String>) (input, collector) -> {
                  if (input == 3){
                      collector.collect(new RuntimeException("Test"));
                      return;
                  }
                  collector.collect(Collections.singleton("Ok"));
              }, 10, TimeUnit.MILLISECONDS)
              .returns(String.class)
              .print();
      
          env.execute("unit-test");
      }
      

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                till.rohrmann Till Rohrmann
                Reporter:
                till.rohrmann Till Rohrmann
              • Votes:
                0 Vote for this issue
                Watchers:
                3 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: