Flink / FLINK-6552

Side outputs don't allow differing output types

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.3.0, 1.4.0
    • Fix Version/s: 1.3.0
    • Component/s: DataStream API
    • Labels:
      None

      Description

      When calling

      SingleOutputStreamOperator#getSideOutput(OutputTag<X>)

      multiple times with the output tags having different types you get the following exception:

       "Trying to add a side input for the same id with a different type. This is not allowed." 
      

      This error message is ambiguous: it could mean either that you cannot add two side outputs with the same name but different types, or that two side outputs with different types cannot be retrieved from a single operator.

      Furthermore, the error message contains the concept of node IDs (I guess?), which users aren't exposed to. This is confusing; the message should be reworded in terms of operators.

      Lastly, I find this limitation rather odd. An operator can have multiple side outputs, and a side output can have a different type than the main output. Yet it is not possible to have multiple side outputs with different types.

          Activity

          aljoscha Aljoscha Krettek added a comment -

          Absolutely right, the message should be "Trying to add a side output for the same side-output id with a different type. This is not allowed." (Notice how it also says "side input" in the original message).

          It should be possible to have multiple side outputs with different types. Where did this not work? I'm also wondering why you see this error message; it should already be caught by this: https://github.com/apache/flink/blob/922352ac35f3753334e834632e3e361fbd36336e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L447-L447

          Zentol Chesnay Schepler added a comment -

          Here's a minimal self-contained example that causes the exception to be thrown:

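          // Imports needed by this snippet (package names as of Flink 1.3).
          import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.functions.ProcessFunction;
          import org.apache.flink.util.Collector;
          import org.apache.flink.util.OutputTag;

          	// Two side-output tags with different ids and different element types.
          	private static final OutputTag<Integer> intTag = new OutputTag<Integer>("int") {};
          	private static final OutputTag<Long> longTag = new OutputTag<Long>("long") {};
          
          	public static void main(String[] args) throws Exception {
          		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          		SingleOutputStreamOperator<String> out =
          			env.generateSequence(0, 100).process(new MyProcessFunction());
          
          		// Requesting both side outputs from the same operator triggers the exception.
          		out.getSideOutput(intTag).print();
          		out.getSideOutput(longTag).print();
          
          		env.execute();
          	}
          
          	public static class MyProcessFunction extends ProcessFunction<Long, String> {
          
          		@Override
          		public void processElement(Long value, Context ctx, Collector<String> out) throws Exception {
          			// Intentionally empty: no elements need to be emitted to reproduce the error.
          		}
          	}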
          
          aljoscha Aljoscha Krettek added a comment -

          Congratulations, you found a bug! 😅

          I have a PR coming up in a sec. Thanks a lot for looking into this!

          githubbot ASF GitHub Bot added a comment -

          GitHub user aljoscha opened a pull request:

          https://github.com/apache/flink/pull/3885

          FLINK-6552 Allow differing types for side outputs

          R: @zentol

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/aljoscha/flink jira-6552-fix-side-output-types

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3885.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3885


          commit aef59811512c60bf4111ddac076ecbf8166ec947
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2017-05-12T12:40:44Z

          FLINK-6552 Allow differing types for side outputs


          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3885

          You're right. 😓 I'm fixing and then merging.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3885#discussion_r116488176

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---
          @@ -318,9 +318,10 @@ public void addVirtualSideOutputNode(Integer originalId, Integer virtualId, Outp
             continue;
           }

          - if (!tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) {
          -   throw new IllegalArgumentException("Trying to add a side input for the same id " +
          -     "with a different type. This is not allowed.");
          + if (tag.f1.getId().equals(outputTag.getId()) &&
          +     !tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) {
          +   throw new IllegalArgumentException("Trying to add a side output for the same" +
          +     "side-output id with a different type. This is not allowed.");
          --- End diff --

          Could also include the tag name for good measure.
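The effect of the changed condition can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: `TagSketch` and `SideOutputCheckSketch` are hypothetical stand-ins, not Flink classes, the type is reduced to a plain string, and the model covers a single operator only. The message text that includes the tag id is likewise an illustration of the suggestion above, not the wording that was merged.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a side-output tag: an id plus a type name. Not Flink's OutputTag. */
final class TagSketch {
    final String id;
    final String typeName;

    TagSketch(String id, String typeName) {
        this.id = id;
        this.typeName = typeName;
    }
}

/** Hypothetical model of the validation done when a side output is requested from one operator. */
final class SideOutputCheckSketch {
    private final List<TagSketch> registered = new ArrayList<>();
    private final boolean compareIds;

    /** compareIds = false models the old condition, true models the condition from the diff. */
    SideOutputCheckSketch(boolean compareIds) {
        this.compareIds = compareIds;
    }

    void register(TagSketch candidate) {
        for (TagSketch existing : registered) {
            boolean sameId = existing.id.equals(candidate.id);
            boolean sameType = existing.typeName.equals(candidate.typeName);
            // Old condition: any type mismatch is rejected, even for unrelated tags.
            // New condition: a type mismatch is rejected only when the ids are equal.
            boolean conflict = compareIds ? (sameId && !sameType) : !sameType;
            if (conflict) {
                throw new IllegalArgumentException(
                    "Trying to add a side output for the same side-output id '" + candidate.id
                        + "' with a different type. This is not allowed.");
            }
        }
        registered.add(candidate);
    }

    /** Returns true if both tags can be registered one after the other. */
    static boolean accepts(boolean compareIds, TagSketch first, TagSketch second) {
        SideOutputCheckSketch check = new SideOutputCheckSketch(compareIds);
        check.register(first);
        try {
            check.register(second);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        TagSketch intTag = new TagSketch("int", "Integer");
        TagSketch longTag = new TagSketch("long", "Long");
        TagSketch clashingTag = new TagSketch("int", "Long");

        System.out.println("old check, different ids:  " + accepts(false, intTag, longTag));
        System.out.println("new check, different ids:  " + accepts(true, intTag, longTag));
        System.out.println("new check, same id clash:  " + accepts(true, intTag, clashingTag));
    }
}
```

In this model the old condition rejects the "int" and "long" tags from the reproduction example, while the new condition accepts them and still rejects two tags that share an id but differ in type.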

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3885

          Merged

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha closed the pull request at:

          https://github.com/apache/flink/pull/3885

          aljoscha Aljoscha Krettek added a comment -

          Fixed on release-1.3 in
          446d651c11a6dd0f78c3b39870d192a09acbe38f

          Fixed on master in
          4651a1690ac8d5784071eae1fad8ce179385cdaa

          githubbot ASF GitHub Bot added a comment -

          Github user tedyu commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3885#discussion_r116611287

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---
          @@ -318,9 +318,10 @@ public void addVirtualSideOutputNode(Integer originalId, Integer virtualId, Outp
             continue;
           }

          - if (!tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) {
          -   throw new IllegalArgumentException("Trying to add a side input for the same id " +
          -     "with a different type. This is not allowed.");
          + if (tag.f1.getId().equals(outputTag.getId()) &&
          +     !tag.f1.getTypeInfo().equals(outputTag.getTypeInfo())) {
          +   throw new IllegalArgumentException("Trying to add a side output for the same" +
          --- End diff --

          minor: missing space after "same"
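To make the point concrete, here is a minimal sketch of how the two string literals from the diff concatenate. `MessageConcatSketch` is a hypothetical class written for illustration; the variant with the added space is an assumption about the obvious fix, since the follow-up commit itself is not shown in this thread.

```java
/** Hypothetical illustration of the missing-space issue; not code from the Flink repository. */
final class MessageConcatSketch {

    /** The two literals exactly as they appear in the reviewed diff. */
    static String asInDiff() {
        return "Trying to add a side output for the same" +
            "side-output id with a different type. This is not allowed.";
    }

    /** Assumed fix: a trailing space at the end of the first literal. */
    static String withSpace() {
        return "Trying to add a side output for the same " +
            "side-output id with a different type. This is not allowed.";
    }

    public static void main(String[] args) {
        System.out.println(asInDiff());   // the words run together as "sameside-output"
        System.out.println(withSpace());  // reads "same side-output"
    }
}
```

Java joins adjacent literals with `+` exactly as written, so any separating whitespace has to be part of one of the literals.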

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3885

          Thanks @tedyu and @zentol! I pushed another commit with these minor fixes.


            People

            • Assignee: Aljoscha Krettek (aljoscha)
            • Reporter: Chesnay Schepler (Zentol)