Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Fixed
- 1.16.2, 1.18.0, 1.17.1
- None
Description
Take the following test as an example (add it to MultipleInputITCase):
@Test
public void testMultipleInputDoesNotChainedWithSource() throws Exception {
    testJobVertexName(false);
}

@Test
public void testMultipleInputChainedWithSource() throws Exception {
    testJobVertexName(true);
}

public void testJobVertexName(boolean chain) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    TestListResultSink<Long> resultSink = new TestListResultSink<>();

    DataStream<Long> source1 = env.fromSequence(0L, 3L).name("source1");
    DataStream<Long> source2 = env.fromElements(4L, 6L).name("source2");
    DataStream<Long> source3 = env.fromElements(7L, 9L).name("source3");

    KeyedMultipleInputTransformation<Long> transform =
            new KeyedMultipleInputTransformation<>(
                    "MultipleInput",
                    new KeyedSumMultipleInputOperatorFactory(),
                    BasicTypeInfo.LONG_TYPE_INFO,
                    1,
                    BasicTypeInfo.LONG_TYPE_INFO);
    if (chain) {
        transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
    }

    KeySelector<Long, Long> keySelector = (KeySelector<Long, Long>) value -> value % 3;

    env.addOperator(
            transform
                    .addInput(source1.getTransformation(), keySelector)
                    .addInput(source2.getTransformation(), keySelector)
                    .addInput(source3.getTransformation(), keySelector));

    new MultipleConnectedStreams(env)
            .transform(transform)
            .rebalance()
            .addSink(resultSink)
            .name("sink");

    env.execute();
}
When we run testMultipleInputDoesNotChainedWithSource, all job vertex names are as expected.
When we run testMultipleInputChainedWithSource (where MultipleInput is chained with source1), the job vertex names are wrong: every job vertex name contains "Source: source1".
I think it's a bug.
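The symptom above could be turned into an automated check rather than a visual inspection. The following is only a minimal sketch in plain Java: the class VertexNameCheck, the method countContaining, and the sample names in main are hypothetical and are not taken from the actual job output. In a real test the names would presumably be collected from the job graph (for example via env.getStreamGraph().getJobGraph().getVertices() and JobVertex#getName(), assuming those accessors are available in the Flink version under test).

```java
import java.util.List;

public class VertexNameCheck {

    /** Counts how many vertex names contain the given label as a substring. */
    public static long countContaining(List<String> vertexNames, String label) {
        return vertexNames.stream().filter(name -> name.contains(label)).count();
    }

    public static void main(String[] args) {
        // Hypothetical vertex names, for illustration only.
        List<String> names = List.of(
                "Source: source1 -> MultipleInput",
                "Source: source2",
                "Source: source3",
                "Sink: sink");

        // With correct naming, only the vertex that actually chains source1
        // should mention it, so the count is expected to be 1.
        long hits = countContaining(names, "Source: source1");
        System.out.println("vertices mentioning source1: " + hits);
    }
}
```

With the behavior described in this issue, such a count would equal the total number of job vertices instead of 1, which makes the regression easy to assert on.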