Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Invalid
- Affects Version/s: 1.1.3, 1.2.0
- Fix Version/s: None
Description
The output of the following program
static final class ThresholdSelector implements OutputSelector<Long> {
    long threshold;

    public ThresholdSelector(long threshold) {
        this.threshold = threshold;
    }

    @Override
    public Iterable<String> select(Long value) {
        if (value < threshold) {
            return Collections.singletonList("Less");
        } else {
            return Collections.singletonList("GreaterEqual");
        }
    }
}

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    SplitStream<Long> split1 = env.generateSequence(1, 11)
        .split(new ThresholdSelector(6));

    // stream11 should be [1,2,3,4,5]
    DataStream<Long> stream11 = split1.select("Less");

    SplitStream<Long> split2 = stream11
//      .map(new MapFunction<Long, Long>() {
//          @Override
//          public Long map(Long value) throws Exception {
//              return value;
//          }
//      })
        .split(new ThresholdSelector(3));

    DataStream<Long> stream21 = split2.select("Less");
    // stream21 should be [1,2]
    stream21.print();

    env.execute();
}
should be 1, 2; however, it is 1, 2, 3, 4, 5. It seems that the second split operation is ignored.
The program is evaluated correctly if the identity MapFunction (commented out above) is added to the program.
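To make the expected result concrete, here is a minimal plain-Java sketch of the semantics the report assumes: each split/select pair behaves like an independent filter applied to the output of the previous one. It does not use Flink at all. The class name ExpectedSplitSemantics and the helper splitAndSelect are hypothetical, introduced only for illustration, and the sketch says nothing about why Flink produced the other output.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class ExpectedSplitSemantics {

    // Stand-in for ThresholdSelector.select, without the Flink interface.
    static Iterable<String> select(long value, long threshold) {
        return value < threshold
                ? Collections.singletonList("Less")
                : Collections.singletonList("GreaterEqual");
    }

    // Hypothetical model of split(new ThresholdSelector(threshold)).select(name):
    // keep every element whose selector output contains the requested name.
    static List<Long> splitAndSelect(List<Long> input, long threshold, String name) {
        List<Long> out = new ArrayList<>();
        for (long value : input) {
            for (String tag : select(value, threshold)) {
                if (tag.equals(name)) {
                    out.add(value);
                    break;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same input as env.generateSequence(1, 11)
        List<Long> source = LongStream.rangeClosed(1, 11)
                .boxed()
                .collect(Collectors.toList());

        List<Long> stream11 = splitAndSelect(source, 6, "Less");
        List<Long> stream21 = splitAndSelect(stream11, 3, "Less");

        System.out.println(stream11); // [1, 2, 3, 4, 5]
        System.out.println(stream21); // [1, 2]
    }
}
```

Under this model the second selection only ever sees the elements that passed the first one, which is why the reporter expects 1, 2 rather than 1, 2, 3, 4, 5.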
Issue Links
- is duplicated by: FLINK-11084 Incorrect ouput after two consecutive split and select (Closed)
- relates to: FLINK-19083 Remove deprecated DataStream#split (Closed)
- links to