Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-6246

Fix generic type of OutputTag in operator Output

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: DataStream API
    • Labels:
      None

      Description

      The current signature is

      <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record)
      

      which can be improved to

      <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record)
      

      This is probably leftover from an intermediate stage of development.

        Issue Links

          Activity

          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user aljoscha opened a pull request:

          https://github.com/apache/flink/pull/3662

          FLINK-6246 Fix generic type of OutputTag in operator Output<Paste>

          R: @dawidwys

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/aljoscha/flink jira-6246-fix-output-tag-param

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3662.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3662


          commit 3cfc5b18c27312de0e3e95a56cd78b46f80cf928
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2017-04-03T14:10:14Z

          FLINK-6246 Fix generic type of OutputTag in operator Output<Paste>


          Show
          githubbot ASF GitHub Bot added a comment - GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/3662 FLINK-6246 Fix generic type of OutputTag in operator Output<Paste> R: @dawidwys You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-6246-fix-output-tag-param Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3662.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3662 commit 3cfc5b18c27312de0e3e95a56cd78b46f80cf928 Author: Aljoscha Krettek <aljoscha.krettek@gmail.com> Date: 2017-04-03T14:10:14Z FLINK-6246 Fix generic type of OutputTag in operator Output<Paste>
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3662#discussion_r109428555

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java —
          @@ -619,20 +619,20 @@ public void collect(StreamRecord<T> record) {
          }

          @Override

          • public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
            + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            for (int i = 0; i < outputs.length - 1; i++) {
            Output<StreamRecord<T>> output = outputs[i];

          // due to side outputs, StreamRecords of varying types can pass through the broadcasting
          // collector so we need to cast
          @SuppressWarnings(

          {"unchecked", "rawtypes"}

          )
          — End diff –

          Suppression not needed anymore. Also the comment is not adequate, as we do not cast anything.

          Show
          githubbot ASF GitHub Bot added a comment - Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/3662#discussion_r109428555 — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java — @@ -619,20 +619,20 @@ public void collect(StreamRecord<T> record) { } @Override public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { for (int i = 0; i < outputs.length - 1; i++) { Output<StreamRecord<T>> output = outputs [i] ; // due to side outputs, StreamRecords of varying types can pass through the broadcasting // collector so we need to cast @SuppressWarnings( {"unchecked", "rawtypes"} ) — End diff – Suppression not needed anymore. Also the comment is not adequate, as we do not cast anything.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3662#discussion_r109428611

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java —
          @@ -619,20 +619,20 @@ public void collect(StreamRecord<T> record) {
          }

          @Override

          • public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
            + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            for (int i = 0; i < outputs.length - 1; i++) {
            Output<StreamRecord<T>> output = outputs[i];

          // due to side outputs, StreamRecords of varying types can pass through the broadcasting
          // collector so we need to cast
          @SuppressWarnings(

          {"unchecked", "rawtypes"})
          - StreamRecord<T> shallowCopy = (StreamRecord<T>) record.copy(record.getValue());
          + StreamRecord<X> shallowCopy = record.copy(record.getValue());
          output.collect(outputTag, shallowCopy);
          }

          // don't copy for the last output
          @SuppressWarnings({"unchecked", "rawtypes"}

          )
          — End diff –

          Remove suppression.

          Show
          githubbot ASF GitHub Bot added a comment - Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/3662#discussion_r109428611 — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java — @@ -619,20 +619,20 @@ public void collect(StreamRecord<T> record) { } @Override public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { for (int i = 0; i < outputs.length - 1; i++) { Output<StreamRecord<T>> output = outputs [i] ; // due to side outputs, StreamRecords of varying types can pass through the broadcasting // collector so we need to cast @SuppressWarnings( {"unchecked", "rawtypes"}) - StreamRecord<T> shallowCopy = (StreamRecord<T>) record.copy(record.getValue()); + StreamRecord<X> shallowCopy = record.copy(record.getValue()); output.collect(outputTag, shallowCopy); } // don't copy for the last output @SuppressWarnings({"unchecked", "rawtypes"} ) — End diff – Remove suppression.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3662#discussion_r109428703

          — Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java —
          @@ -107,7 +107,8 @@ public static void main(String[] args) throws Exception {
          }

          // execute the program

          • env.execute("Streaming Iteration Example");
            + System.out.println(env.getExecutionPlan());
              • End diff –

          I think this change should be reverted.

          Show
          githubbot ASF GitHub Bot added a comment - Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/3662#discussion_r109428703 — Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java — @@ -107,7 +107,8 @@ public static void main(String[] args) throws Exception { } // execute the program env.execute("Streaming Iteration Example"); + System.out.println(env.getExecutionPlan()); End diff – I think this change should be reverted.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3662#discussion_r109608264

          — Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java —
          @@ -107,7 +107,8 @@ public static void main(String[] args) throws Exception {
          }

          // execute the program

          • env.execute("Streaming Iteration Example");
            + System.out.println(env.getExecutionPlan());
              • End diff –

          Oh no, I was playing around with something. 😱

          Show
          githubbot ASF GitHub Bot added a comment - Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3662#discussion_r109608264 — Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java — @@ -107,7 +107,8 @@ public static void main(String[] args) throws Exception { } // execute the program env.execute("Streaming Iteration Example"); + System.out.println(env.getExecutionPlan()); End diff – Oh no, I was playing around with something. 😱
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3662

          @dawidwys thanks for reviewing so quickly. I pushed a commit to address your comments.

          Show
          githubbot ASF GitHub Bot added a comment - Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3662 @dawidwys thanks for reviewing so quickly. I pushed a commit to address your comments.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user dawidwys commented on the issue:

          https://github.com/apache/flink/pull/3662

          Now, I think it is good to merge

          Show
          githubbot ASF GitHub Bot added a comment - Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/3662 Now, I think it is good to merge
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3662

          merging.

          Show
          githubbot ASF GitHub Bot added a comment - Github user zentol commented on the issue: https://github.com/apache/flink/pull/3662 merging.
          Hide
          Zentol Chesnay Schepler added a comment -

          1.3: 9bdbe6071f1946391598709bfa637fd76a8c7396

          Show
          Zentol Chesnay Schepler added a comment - 1.3: 9bdbe6071f1946391598709bfa637fd76a8c7396
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3662

          Show
          githubbot ASF GitHub Bot added a comment - Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3662

            People

            • Assignee:
              aljoscha Aljoscha Krettek
              Reporter:
              aljoscha Aljoscha Krettek
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development