  1. Flink
  2. FLINK-5031

Consecutive DataStream.split() ignored

    Details

      Description

      The output of the following program

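      import java.util.Collections;

      import org.apache.flink.api.common.functions.MapFunction; // only needed if the map below is uncommented
      import org.apache.flink.streaming.api.collector.selector.OutputSelector;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.datastream.SplitStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      // The enclosing class name is illustrative; the report only shows the members.
      public class ConsecutiveSplitIssue {

      	// Tags each value as "Less" or "GreaterEqual" relative to the threshold.
      	static final class ThresholdSelector implements OutputSelector<Long> {
      		long threshold;

      		public ThresholdSelector(long threshold) {
      			this.threshold = threshold;
      		}

      		@Override
      		public Iterable<String> select(Long value) {
      			if (value < threshold) {
      				return Collections.singletonList("Less");
      			} else {
      				return Collections.singletonList("GreaterEqual");
      			}
      		}
      	}

      	public static void main(String[] args) throws Exception {
      		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      		env.setParallelism(1);

      		SplitStream<Long> split1 = env.generateSequence(1, 11)
      			.split(new ThresholdSelector(6));
      		// stream11 should be [1,2,3,4,5]
      		DataStream<Long> stream11 = split1.select("Less");
      		SplitStream<Long> split2 = stream11
      //			.map(new MapFunction<Long, Long>() {
      //				@Override
      //				public Long map(Long value) throws Exception {
      //					return value;
      //				}
      //			})
      			.split(new ThresholdSelector(3));
      		// stream21 should be [1,2]
      		DataStream<Long> stream21 = split2.select("Less");
      		stream21.print();

      		env.execute();
      	}
      }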
      

      should be 1, 2; however, it is 1, 2, 3, 4, 5. It seems that the second split operation is ignored.

      The program is evaluated correctly if the identity MapFunction (commented out in the code above) is added between the two split operations.
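
      To make the expected result concrete, here is a minimal plain-Java sketch of the intended semantics. It is not Flink code and says nothing about the cause of the bug. The class name ExpectedSplitSemantics and the helper splitAndSelect are hypothetical and exist only for illustration. The sketch assumes that split(...).select(name) behaves like keeping exactly the elements whose selector output contains that name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical model of the expected behavior; it does not use any Flink API.
public class ExpectedSplitSemantics {

    // Same selection rule as ThresholdSelector in the report, without Flink types.
    static Iterable<String> select(long value, long threshold) {
        return Collections.singletonList(value < threshold ? "Less" : "GreaterEqual");
    }

    // Models split(threshold).select(name): keep the elements tagged with `name`.
    static List<Long> splitAndSelect(List<Long> input, long threshold, String name) {
        List<Long> out = new ArrayList<>();
        for (long value : input) {
            for (String tag : select(value, threshold)) {
                if (tag.equals(name)) {
                    out.add(value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> source = LongStream.rangeClosed(1, 11).boxed().collect(Collectors.toList());
        List<Long> stream11 = splitAndSelect(source, 6, "Less");
        List<Long> stream21 = splitAndSelect(stream11, 3, "Less");
        System.out.println(stream11); // [1, 2, 3, 4, 5]
        System.out.println(stream21); // [1, 2]
    }
}
```

      Under this model the second selection only sees the output of the first, so the final result is 1, 2. The reported output 1, 2, 3, 4, 5 matches stream11, that is, the result before the second split is applied.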

              People

              • Assignee: Renkai Ge (RenkaiGe)
                Reporter: Fabian Hueske (fhueske)
              • Votes: 3
                Watchers: 8

                  Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 10m