Flink / FLINK-32680

Job vertex names get messed up when a source vertex is chained with a MultipleInput vertex in the job graph


Details

    Description

      Take the following test (add it to MultipleInputITCase) as an example:

          @Test
          public void testMultipleInputDoesNotChainedWithSource() throws Exception {
              testJobVertexName(false);
          }
          
          @Test
          public void testMultipleInputChainedWithSource() throws Exception {
              testJobVertexName(true);
          }
      
          // Builds source1..source3 -> MultipleInput -> rebalance -> sink and executes the job.
          // chain == true lets the MultipleInput operator be chained with its sources
          // (in this job, only source1 ends up in the same job vertex as MultipleInput).
          private void testJobVertexName(boolean chain) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(1);
      
              TestListResultSink<Long> resultSink = new TestListResultSink<>();
      
              DataStream<Long> source1 = env.fromSequence(0L, 3L).name("source1");
              DataStream<Long> source2 = env.fromElements(4L, 6L).name("source2");
              DataStream<Long> source3 = env.fromElements(7L, 9L).name("source3");
      
              // Keyed three-input operator: output type Long, parallelism 1, state key type Long.
              KeyedMultipleInputTransformation<Long> transform =
                      new KeyedMultipleInputTransformation<>(
                              "MultipleInput",
                              new KeyedSumMultipleInputOperatorFactory(),
                              BasicTypeInfo.LONG_TYPE_INFO,
                              1,
                              BasicTypeInfo.LONG_TYPE_INFO);
              if (chain) {
                  // Allow the MultipleInput operator to be chained with its source inputs.
                  transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
              }
              // All three inputs are keyed by value % 3.
              KeySelector<Long, Long> keySelector = value -> value % 3;
      
              env.addOperator(
                      transform
                              .addInput(source1.getTransformation(), keySelector)
                              .addInput(source2.getTransformation(), keySelector)
                              .addInput(source3.getTransformation(), keySelector));
      
              // rebalance() prevents the sink from being chained to MultipleInput,
              // so the sink gets its own job vertex.
              new MultipleConnectedStreams(env)
                      .transform(transform)
                      .rebalance()
                      .addSink(resultSink)
                      .name("sink");
      
              env.execute();
          }

       

      When we run testMultipleInputDoesNotChainedWithSource, all job vertex names are normal.

      When we run testMultipleInputChainedWithSource (MultipleInput is chained with source1), the job vertex names get messed up: every job vertex name contains "Source: source1".

       

      I think it's a bug.
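      The expected behavior can also be expressed as an assertion instead of comparing screenshots. source1 belongs to exactly one job vertex in both variants: its own vertex, or the vertex it shares with MultipleInput. So "Source: source1" should appear in exactly one job vertex name. The minimal sketch below keeps only that Flink-independent check, and the class VertexNameCheck and its methods are hypothetical. In MultipleInputITCase, the names could presumably be collected by iterating env.getStreamGraph().getJobGraph().getVertices() and calling getName() on each vertex, in place of env.execute(). That wiring is an assumption and is not part of the original report.

```java
import java.util.List;

/**
 * Hypothetical, Flink-independent check for the invariant described in this issue:
 * a source's display name should appear in exactly one job vertex name.
 */
public final class VertexNameCheck {

    private VertexNameCheck() {}

    /** Counts how many vertex names contain the given fragment (case-sensitive). */
    public static long countContaining(List<String> vertexNames, String fragment) {
        return vertexNames.stream().filter(name -> name.contains(fragment)).count();
    }

    /**
     * Throws if the fragment is missing from every vertex name, or if it leaked into
     * more than one of them (the symptom reported for the chained case).
     */
    public static void assertAppearsInExactlyOneVertex(List<String> vertexNames, String fragment) {
        long count = countContaining(vertexNames, fragment);
        if (count != 1) {
            throw new AssertionError(
                    "Expected '" + fragment + "' in exactly one job vertex name, but found it in "
                            + count + ": " + vertexNames);
        }
    }

    public static void main(String[] args) {
        // Placeholder names for illustration only; they are NOT Flink's actual naming output.
        List<String> names =
                List.of("Source: source1 (chained with MultipleInput)", "source2 vertex", "source3 vertex", "sink vertex");
        assertAppearsInExactlyOneVertex(names, "Source: source1");
        System.out.println("ok");
    }
}
```

      With the names from the chained run described above, where every vertex name contains "Source: source1", this check would fail with a message listing all of them.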

          People

            JunRuiLi Junrui Li
            wanglijie Lijie Wang