Flink / FLINK-5031

Consecutive DataStream.split() ignored

Details

    Description

      The output of the following program

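      import java.util.Collections;

      import org.apache.flink.api.common.functions.MapFunction; // only used by the commented-out workaround
      import org.apache.flink.streaming.api.collector.selector.OutputSelector;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.datastream.SplitStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      // Enclosing class name chosen for illustration.
      public class ConsecutiveSplitTest {

      	// Routes each value to "Less" or "GreaterEqual" relative to a threshold.
      	static final class ThresholdSelector implements OutputSelector<Long> {
      		private final long threshold;

      		public ThresholdSelector(long threshold) {
      			this.threshold = threshold;
      		}

      		@Override
      		public Iterable<String> select(Long value) {
      			if (value < threshold) {
      				return Collections.singletonList("Less");
      			} else {
      				return Collections.singletonList("GreaterEqual");
      			}
      		}
      	}

      	public static void main(String[] args) throws Exception {
      		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      		env.setParallelism(1);

      		// generateSequence(1, 11) emits 1..11 inclusive.
      		SplitStream<Long> split1 = env.generateSequence(1, 11)
      			.split(new ThresholdSelector(6));
      		// stream11 should be [1,2,3,4,5]
      		DataStream<Long> stream11 = split1.select("Less");
      		SplitStream<Long> split2 = stream11
      			// Uncommenting this identity map makes the program produce the expected result.
      //			.map(new MapFunction<Long, Long>() {
      //				@Override
      //				public Long map(Long value) throws Exception {
      //					return value;
      //				}
      //			})
      			.split(new ThresholdSelector(3));
      		DataStream<Long> stream21 = split2.select("Less");
      		// stream21 should be [1,2]
      		stream21.print();

      		env.execute();
      	}
      }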
      

      should be 1, 2; however, it is 1, 2, 3, 4, 5. It seems that the second split() operation is ignored.

      The program is evaluated correctly if the identity MapFunction (commented out in the program) is added between select("Less") and the second split().
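
      To pin down the expected result without a Flink cluster, here is a minimal plain-Java model in which each split()/select() pair acts as a filter applied to the output of the previous select(). SplitSemanticsModel, splitSelect and sequence are hypothetical helpers, not Flink API; the model only describes the behavior the report expects, not how Flink actually implements split().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical plain-Java model (no Flink dependency) of the split/select
 * semantics the report expects. None of these names are Flink APIs.
 */
public class SplitSemanticsModel {

    // Mirrors ThresholdSelector from the report: names each value "Less" or "GreaterEqual".
    static Function<Long, Iterable<String>> thresholdSelector(long threshold) {
        return value -> value < threshold
                ? Collections.singletonList("Less")
                : Collections.singletonList("GreaterEqual");
    }

    // Models split(selector).select(name): keep the values whose selector output contains name.
    static List<Long> splitSelect(List<Long> input,
                                  Function<Long, Iterable<String>> selector,
                                  String name) {
        List<Long> out = new ArrayList<>();
        for (Long value : input) {
            for (String n : selector.apply(value)) {
                if (n.equals(name)) {
                    out.add(value);
                    break;
                }
            }
        }
        return out;
    }

    // Models env.generateSequence(from, to), which is inclusive on both ends.
    static List<Long> sequence(long from, long to) {
        List<Long> out = new ArrayList<>();
        for (long i = from; i <= to; i++) {
            out.add(i);
        }
        return out;
    }

    public static void main(String[] args) {
        // First split: threshold 6 on 1..11.
        List<Long> stream11 = splitSelect(sequence(1, 11), thresholdSelector(6), "Less");
        // Second split: threshold 3 applied to the output of the first select.
        List<Long> stream21 = splitSelect(stream11, thresholdSelector(3), "Less");
        System.out.println(stream11);
        System.out.println(stream21);
    }
}
```

      Running main prints [1, 2, 3, 4, 5] and then [1, 2], which is what the report expects for stream11 and stream21. According to the report, Flink instead emits 1 to 5 for stream21 unless the identity MapFunction is inserted before the second split().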


            People

              Renkai Ge (RenkaiGe)
              Fabian Hueske (fhueske)
              Votes: 3
              Watchers: 8

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 10m