        Activity

        danbress Dan Bress added a comment -

        I would like to see WindowedStream make use of this feature for late events. I could see the WindowedStream emitting the late events to a side channel so that I can do with them what I please.

        aljoscha Aljoscha Krettek added a comment -

        Yup, this was one of the motivations for this feature.

        foxss Chen Qin added a comment -

        Ongoing implementation reflecting feedback; the window stream implementation is still missing:
        https://github.com/apache/flink/compare/master...chenqin:flip

        githubbot ASF GitHub Bot added a comment -

        GitHub user chenqin opened a pull request:

        https://github.com/apache/flink/pull/2982

        FLINK-4460 Side Outputs in Flink

        [FLIP-13](https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+Side+Outputs+in+Flink): expose side outputs with `OutputTag<T>`.

        For user functions that take a `Collector<OUT> collector` parameter:

        • a utility class is offered: `CollectorWrapper<OUT> wrapper = new CollectorWrapper<OUT>(collector);` can emit side-output elements via `wrapper.collect(OutputTag<SIDEOUT> tag, sideout)`, and the resulting side-output `DataStream<SIDEOUT>` can be retrieved via `getSideOutput(OutputTag<SIDEOUT> tag)` on the `SingleOutputStreamOperator`.
        • `OutputTag`s of the same type can carry different values; `getSideOutput` only exposes elements whose tag matches in both type and value.
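        The routing idea in the two points above (a wrapper that multiplexes one main output plus tagged side outputs, with tag equality deciding what `getSideOutput` sees) can be sketched with a small, Flink-free model; all class and method names below are stand-ins for illustration, not the PR's actual code:

        ```java
        import java.util.ArrayList;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;

        // Toy model of the CollectorWrapper/getSideOutput routing idea; not Flink code.
        public class TagRoutingDemo {

            // Stand-in for Flink's OutputTag<T>: identified here by a string id.
            public static final class Tag<T> {
                final String id;
                public Tag(String id) { this.id = id; }
                @Override public boolean equals(Object o) {
                    return o instanceof Tag && ((Tag<?>) o).id.equals(id);
                }
                @Override public int hashCode() { return id.hashCode(); }
            }

            private final List<Object> mainOutput = new ArrayList<>();
            private final Map<Tag<?>, List<Object>> sideOutputs = new HashMap<>();

            // Untagged collect goes to the main output, as before.
            public void collect(Object value) { mainOutput.add(value); }

            // Tagged collect routes to the bucket for that exact tag value.
            public void collect(Tag<?> tag, Object value) {
                sideOutputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
            }

            // Only elements emitted under an equal tag are visible here.
            public List<Object> getSideOutput(Tag<?> tag) {
                return sideOutputs.getOrDefault(tag, new ArrayList<>());
            }

            public List<Object> getMainOutput() { return mainOutput; }
        }
        ```

        Two tags of the same generic type but different values land in different buckets, which mirrors the "matches in both type and value" rule above.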

        Late-arriving events are emitted to the side output if:

        • the time characteristic is set to event time
        • `isLate` returns true for all assigned windows
        • the event timestamp is no later than `currentWatermark + allowedLateness`
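        The window conditions above boil down to a simple predicate. As a rough, hypothetical sketch (method and parameter names invented here, not the operator code from the PR):

        ```java
        // Hypothetical sketch of the late-event check described above;
        // not the actual WindowOperator code from the PR.
        public class LatenessCheck {

            /**
             * An element is routed to the late-data side output when every window
             * it was assigned to is already behind the watermark, but the element
             * is still within the allowed-lateness bound.
             */
            public static boolean isLateButWithinBound(
                    long elementTimestamp,
                    long maxWindowEnd,      // latest end among the assigned windows
                    long currentWatermark,
                    long allowedLateness) {
                boolean allWindowsLate = maxWindowEnd <= currentWatermark;
                boolean withinLateness =
                        elementTimestamp <= currentWatermark + allowedLateness;
                return allWindowsLate && withinLateness;
            }
        }
        ```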

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/chenqin/flink flip

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/2982.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #2982


        commit de674f19fcbe9955cb4208ef0938fe5b0f7adc90
        Author: Chen Qin <qinnchen@fgmail.com>
        Date: 2016-10-21T19:38:04Z

        allow mutpile output stream

        commit 3d91e6c69dbfbcb2c73dcc37ac2d8ed637a374eb
        Author: Chen Qin <cq@uber.com>
        Date: 2016-11-29T21:24:09Z

        Merge branch 'master' into flip

        commit 977b2d7fc54e1f9663a5ceb8a62ed2af5a955ca6
        Author: Chen Qin <cq@uber.com>
        Date: 2016-12-01T22:19:56Z

        allow mutiple OutputTag with same type
        implement windowopeator late arriving events
        add unit/integration tests


        githubbot ASF GitHub Bot added a comment -

        Github user chenqin commented on the issue:

        https://github.com/apache/flink/pull/2982

        cc @aljoscha

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the issue:

        https://github.com/apache/flink/pull/2982

        @chenqin I had a quick look at the implementation and it looks quite good. I'll look at it in more detail once the 1.2 release is out and then I'll also have more thorough comments.

        These are some quick comments off the top of my head:

        • I think we can extend `Collector` with a `collect(OutputTag, T)` method. Then we wouldn't need the extra `RichCollector` and `CollectorWrapper` to work around that.
        • For `WindowedStream` I would like to have something like this:

          ```
          OutputTag<T> lateElementsOutput = ...;
          DataStream<T> input = ...;
          SingleOutputStreamOperator<O> windowed = input
              .keyBy(...)
              .window(...)
              .apply(Function, lateElementsOutput);

          DataStream<T> lateElements = windowed.getSideOutput(lateElementsOutput);
          ```

        or maybe something else if we find a better idea. With `WindowedStream.tooLateElements()` this would instantiate an extra `WindowOperator` just for getting late elements, while another window operator would be responsible for processing the actual elements. That seems wasteful.

        What do you think?

        githubbot ASF GitHub Bot added a comment -

        Github user chenqin commented on the issue:

        https://github.com/apache/flink/pull/2982

        @aljoscha Thanks for your time. We can chat more after 1.2 release!

        I think it makes sense to extend `Collector`, even though we may not be able to remove `collect(T obj)` due to API compatibility concerns in 1.x. Per @fhueske's comments in the [FLIP-13 email thread](http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLIP-13-Side-Outputs-in-Flink-td14204.html).

        The only point I would like to add: there seems to be a decent amount of refactoring needed to replace the underlying output chains with `collect(tag, element)`, yet it looks like a reasonable investment moving forward (multiple inputs / multiple outputs).

        The `tooLateEvents()` method was added for the user's convenience; it should be fine to remove if it doesn't add much benefit. `LateArrivingTag` shares its type with the input (which is already fixed once the input type is defined), so adding a late-arriving tag within the `apply` method seems redundant. In fact, without any changes to this diff, users can already access late-arriving events the following way:

        ```
        OutputTag<T> lateElementsOutput = new LateArrivingOutputTag<T>();
        DataStream<T> input = ...;
        SingleOutputStreamOperator<O> windowed = input
            .keyBy(...)
            .window(...)
            .apply(Function);

        DataStream<T> lateElements = windowed.getSideOutput(lateElementsOutput);
        ```

        Thanks,
        Chen

        githubbot ASF GitHub Bot added a comment -

        Github user chenqin commented on the issue:

        https://github.com/apache/flink/pull/2982

        @aljoscha,

        The diff has been updated and merged to support `apply(Function, lateElementsOutput);`.
        The `Collector` refactor does not seem a viable option, as stated above.

        It seems Travis CI timed out here; is there a way to increase the timeout setting?
        https://travis-ci.org/chenqin/flink/builds/201329246

        Chen

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the issue:

        https://github.com/apache/flink/pull/2982

        @chenqin

        I rebased and consolidated your PR a bit and played around with the APIs.

        Some of the changes:

        • Separation into internal changes/window operator changes and user-function changes. I have a prototype commit that exposes side outputs using `ProcessFunction`.
        • I removed `CollectorWrapper`/`RichCollector` in favour of the `ProcessFunction` approach
        • The internal implementation now doesn't store the `OutputTag` on the `StreamRecord` but instead adds an additional method on `Output` that should be used for emitting data to a side output. Side outputs now also work with chaining.
        • `WindowedStream` is changed to add a `sideOutputLateData()` method that is used to specify that late data side output is required. This is more general than putting it into the method signature of `apply()` because it will simply work for all different window types.

        I quickly talked to @StephanEwen and we agreed that we need to further think about how we want to expose side outputs for user-defined functions. Especially, we have to think about what this addition means for the `split()`/`select()` pattern. I will also do another change where `sideOutputLateData()` is not required but will instead be added when the late-data stream is requested.
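        The `sideOutputLateData()` behaviour described above (elements at or behind the watermark bypass the window and land in the late-data side output) can be illustrated with a tiny, Flink-free simulation; the class and field names are invented for illustration only:

        ```java
        import java.util.ArrayList;
        import java.util.List;

        // Toy simulation of the sideOutputLateData() routing discussed above;
        // not Flink code. Elements at or behind the current watermark go to the
        // late-data list instead of the main window input.
        public class LateDataRoutingDemo {
            public final List<Long> mainInput = new ArrayList<>();
            public final List<Long> lateData = new ArrayList<>();
            private long watermark = Long.MIN_VALUE;

            // Watermarks only ever advance.
            public void advanceWatermark(long wm) {
                watermark = Math.max(watermark, wm);
            }

            public void processElement(long timestamp) {
                if (timestamp <= watermark) {
                    lateData.add(timestamp);   // would reach getSideOutput(lateTag)
                } else {
                    mainInput.add(timestamp);  // would reach the window operator
                }
            }
        }
        ```

        Because the tagging happens inside the one window operator, no second `WindowOperator` is needed just to observe late elements.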

        What do you think?

        githubbot ASF GitHub Bot added a comment -

        Github user chenqin commented on the issue:

        https://github.com/apache/flink/pull/2982

        @aljoscha Looks good to me 👍

        I briefly looked at your git branch; a minor comment would be to add Javadoc to `sideOutputLateData` so users get a better idea of what they opt into with the late-arriving event stream.

        Currently, whether an event is late is decided by comparing the watermark and the event time. Do you think there is a need to let users pass some kind of `Evaluator` so they can side-output arbitrary elements?

        ```
        window.sideOutput(OutputTag<IN>, Evaluator)

        interface Evaluator { MergedWindows, key, watermark }
        ```

        • Regarding `split`/`select`, I think there is a chance to consolidate `select` and build upon `OutputTag`, but that might be out of this PR's scope.
        • Regarding `WindowedStream`, I am a bit confused about what happens if I use `allowedLateness` and `sideOutputLateData` at the same time.

        Thanks,
        Chen

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the issue:

        https://github.com/apache/flink/pull/2982

        Thanks for looking at that! I'll open a new discussion thread on the mailing list to discuss side outputs and split/select and how we're going to proceed with that.

        Regarding your other questions: I think we might add such an `Evaluator` interface in the future but for now I would like to keep it simple and see if that works for people. And yes, a user would have to use `allowedLateness` and `sideOutputLateData` at the same time if they want to use late data, or they can go with the default allowed lateness of zero and also get the late data as a side output.

        githubbot ASF GitHub Bot added a comment -

        GitHub user aljoscha opened a pull request:

        https://github.com/apache/flink/pull/3438

        FLINK-4460 Allow ProcessFunction on non-keyed streams

        This is in preparation for side outputs, which will only work on `ProcessFunction`. We still want side outputs on non-keyed streams so we have to make `ProcessFunction` available there.

        See this ML thread for reference: https://lists.apache.org/thread.html/f3fe7d68986877994ad6b66173f40e72fc454420720a74ea5a834cc2@%3Cdev.flink.apache.org%3E

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/aljoscha/flink jira-4460-process-for-everyone

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/3438.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #3438


        commit 38f33f2399598b521a7e34e8dea1d236f5672042
        Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
        Date: 2017-03-01T09:57:12Z

        FLINK-4460 Make ProcessFunction abstract, add default onTime() method

        This is in preparation of allowing ProcessFunction on DataStream because
        we will use it to allow side outputs from the ProcessFunction Context.

        commit f0dd2c0d81a847cfa4f3d241ce874db6807caee2
        Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
        Date: 2017-03-01T10:41:02Z

        FLINK-4660 Allow ProcessFunction on DataStream

        Introduce new ProcessOperator for this. Rename the pre-existing
        ProcessOperator to KeyedProcessOperator.

        commit 29ca9b4b794522b35d84d9f19edc5b0bb9f64912
        Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
        Date: 2017-03-01T10:33:03Z

        FLINK-4460 Make CoProcessFunction abstract, add default onTime() method

        This is in preparation of allowing CoProcessFunction on a non-keyed
        connected stream. we will use it to allow side outputs from the
        ProcessFunction Context.

        commit a26accf8feebaf9d3566055c0d1eb3006986bfd6
        Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
        Date: 2017-03-01T11:02:34Z

        FLINK-4660 Allow CoProcessFunction on non-keyed ConnectedStreams

        Introduce new CoProcessOperator for this. Rename the pre-existing
        CoProcessOperator to KeyedCoProcessOperator.


        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the issue:

        https://github.com/apache/flink/pull/3438

        R: @uce or @rmetzger for review, please

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the issue:

        https://github.com/apache/flink/pull/2982

        @chenqin Would you like your commits to be attributed to "Chen Qin <qinnchen@gmail.com>" or "Chen Qin <cq@uber.com>"? I see both in your set of commits.

        I'm finally putting everything together and hopefully merging soon since the mailing list discussion seems to favour our approach. 😄

        githubbot ASF GitHub Bot added a comment -

        Github user chenqin commented on the issue:

        https://github.com/apache/flink/pull/2982

        @aljoscha
        Nice! Let's do "Chen Qin qinnchen@gmail.com"

        githubbot ASF GitHub Bot added a comment -

        Github user rmetzger commented on the issue:

        https://github.com/apache/flink/pull/3438

        I looked over the changes and didn't find anything critical. The only thing that made me think was the boxed `Long` type for `timestamp()`. I assume you chose this approach to signal timestamp unavailability using `null`. The Java documentation does not recommend relying on autoboxing in performance-critical code: http://docs.oracle.com/javase/1.5.0/docs/guide/language/autoboxing.html

        Tests and the Scala API are done. I assume that we don't need to explicitly mention support for the process function on non-keyed streams.

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the issue:

        https://github.com/apache/flink/pull/3438

        @rmetzger Yes, it's unfortunate that in our model not all elements always have a timestamp. The other alternative is throwing an exception when trying to access a non-existing timestamp.

        githubbot ASF GitHub Bot added a comment -

        Github user rmetzger commented on the issue:

        https://github.com/apache/flink/pull/3438

        In addition to throwing an exception, we should also expose `element.hasTimestamp()` to offer our users a clean way of checking for timestamps.
        Let's see what @uce or other reviewers think about this.
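        A `hasTimestamp()` accessor, as opposed to a nullable boxed `Long`, could look roughly like this; this is a hypothetical sketch of the suggestion, not the actual Flink API:

        ```java
        // Hypothetical sketch of the hasTimestamp() idea from the comment above;
        // uses a primitive long sentinel so the hot path avoids autoboxing.
        public class TimestampedElement {
            private static final long NO_TIMESTAMP = Long.MIN_VALUE;
            private final long timestamp;

            public TimestampedElement(long timestamp) { this.timestamp = timestamp; }

            public static TimestampedElement withoutTimestamp() {
                return new TimestampedElement(NO_TIMESTAMP);
            }

            public boolean hasTimestamp() { return timestamp != NO_TIMESTAMP; }

            // Throws instead of returning null, per the discussion above.
            public long getTimestamp() {
                if (!hasTimestamp()) {
                    throw new IllegalStateException("Element has no timestamp");
                }
                return timestamp;
            }
        }
        ```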

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the issue:

        https://github.com/apache/flink/pull/3438

        I think the discussion of timestamps and additional interfaces is orthogonal to this PR: `KeyedProcessOperator` is a renaming of the pre-existing `ProcessOperator` and the new `ProcessOperator` is a simplification that does away with timers. The interface for timestamps exists in the current code base, if we want to change that we should open other Jira issues.

        githubbot ASF GitHub Bot added a comment -

        Github user wenlong88 commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3438#discussion_r104334699

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java —
        @@ -19,30 +19,35 @@
        package org.apache.flink.streaming.api.functions;

        import org.apache.flink.annotation.PublicEvolving;
        -import org.apache.flink.api.common.functions.Function;
        +import org.apache.flink.api.common.functions.AbstractRichFunction;
        import org.apache.flink.streaming.api.TimeDomain;
        import org.apache.flink.streaming.api.TimerService;
        import org.apache.flink.util.Collector;

        /**

        • A function that processes elements of a stream.
          *
        • * <p>The function will be called for every element in the input stream and can produce
        • * zero or more output. The function can also query the time and set timers. When
        • * reacting to the firing of set timers the function can emit yet more elements.
+ * <p>For every element in the input stream {@link #processElement(Object, Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. Implementations can also
+ * query the time and set timers through the provided {@link Context}. For firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
+ * zero or more elements as output and register further timers.
  *
- * <p>The function will be called for every element in the input stream and can produce
- * zero or more output elements. Contrary to the
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
- * the time (both event and processing) and set timers, through the provided {@link Context}.
- * When reacting to the firing of set timers the function can directly emit a result, and/or
- * register a timer that will trigger an action in the future.
+ * <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
+ * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}.
+ *
+ * <p><b>NOTE:</b> A {@code ProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and
+ * teardown methods can be implemented. See
+ * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
  *
  * @param <I> Type of the input elements.
  * @param <O> Type of the output elements.
  */
 @PublicEvolving
-public interface ProcessFunction<I, O> extends Function {
+public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
— End diff –

hi, changing from `interface` to `class` is incompatible on the user side. Can't ProcessFunction just extend RichFunction?
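To make the incompatibility concrete, here is a minimal, self-contained sketch with hypothetical stand-in types (not Flink's real classes): user code that said `implements ProcessFunction` stops compiling once the type becomes an abstract class and must be changed to `extends`.

```java
// Hypothetical stand-in for the new abstract-class form of ProcessFunction
// (illustration only, not Flink's actual class).
abstract class ProcessFunctionV2<I, O> {
    public abstract O processElement(I value);
}

public class Compat {
    // Old user code, "class MyFn implements ProcessFunction<Integer, String>",
    // would no longer compile against the abstract-class version.
    // The new form requires "extends":
    static class MyFn extends ProcessFunctionV2<Integer, String> {
        @Override
        public String processElement(Integer value) {
            return "value=" + value;
        }
    }

    public static void main(String[] args) {
        System.out.println(new MyFn().processElement(7)); // prints "value=7"
    }
}
```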

        githubbot ASF GitHub Bot added a comment -

        Github user uce commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3438#discussion_r104377392

— Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java —
@@ -19,30 +19,35 @@
 package org.apache.flink.streaming.api.functions;

 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.util.Collector;

 /**
  * A function that processes elements of a stream.
  *
- * <p>The function will be called for every element in the input stream and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * <p>For every element in the input stream {@link #processElement(Object, Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. Implementations can also
+ * query the time and set timers through the provided {@link Context}. For firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
+ * zero or more elements as output and register further timers.
  *
- * <p>The function will be called for every element in the input stream and can produce
- * zero or more output elements. Contrary to the
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
- * the time (both event and processing) and set timers, through the provided {@link Context}.
- * When reacting to the firing of set timers the function can directly emit a result, and/or
- * register a timer that will trigger an action in the future.
+ * <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
+ * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}.
+ *
+ * <p><b>NOTE:</b> A {@code ProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and
— End diff –

        typo: as => is?

        githubbot ASF GitHub Bot added a comment -

        Github user uce commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3438#discussion_r104378368

— Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java —
@@ -19,30 +19,35 @@
 package org.apache.flink.streaming.api.functions;

 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.util.Collector;

 /**
  * A function that processes elements of a stream.
  *
- * <p>The function will be called for every element in the input stream and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * <p>For every element in the input stream {@link #processElement(Object, Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. Implementations can also
+ * query the time and set timers through the provided {@link Context}. For firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
+ * zero or more elements as output and register further timers.
  *
- * <p>The function will be called for every element in the input stream and can produce
- * zero or more output elements. Contrary to the
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
- * the time (both event and processing) and set timers, through the provided {@link Context}.
- * When reacting to the firing of set timers the function can directly emit a result, and/or
- * register a timer that will trigger an action in the future.
+ * <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
+ * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}.
+ *
+ * <p><b>NOTE:</b> A {@code ProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and
+ * teardown methods can be implemented. See
+ * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
  *
  * @param <I> Type of the input elements.
  * @param <O> Type of the output elements.
  */
 @PublicEvolving
-public interface ProcessFunction<I, O> extends Function {
+public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
— End diff –

        Missing serialVersionUID
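On the serialVersionUID point, a minimal sketch of what the fix looks like, with an illustrative stand-in class rather than the actual Flink code: a serializable abstract function class should declare an explicit serialVersionUID so that serialized user functions stay compatible across recompilations.

```java
import java.io.Serializable;

// Illustrative stand-in, not Flink's actual ProcessFunction.
abstract class MyProcessFunction<I, O> implements Serializable {
    // Explicit, stable UID instead of a compiler-generated one, so that
    // recompiling the class does not break deserialization of old instances.
    private static final long serialVersionUID = 1L;

    public abstract O processElement(I value);
}
```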

        githubbot ASF GitHub Bot added a comment -

        Github user uce commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3438#discussion_r104377167

— Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java —
@@ -556,6 +558,60 @@ public ExecutionConfig getExecutionConfig() {
 }

 /**
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the input streams and can produce zero
+ * or more output elements.
+ *
+ * @param processFunction The {@link ProcessFunction} that is called for each element
+ *                        in the stream.
+ *
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
+
+   TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
+     processFunction,
+     ProcessFunction.class,
+     false,
+     true,
+     getType(),
+     Utils.getCallLocationName(),
+     true);
+
+   return process(processFunction, outType);
+ }
+
+ /**
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the input streams and can produce zero
+ * or more output elements.
+ *
+ * @param processFunction The {@link ProcessFunction} that is called for each element
+ *                        in the stream.
+ * @param outputType {@link TypeInformation} for the result type of the function.
+ *
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ @Internal
+ public <R> SingleOutputStreamOperator<R> process(
— End diff –

        Is this internal method only exposed as `public` for the Scala API? If yes, I'm wondering if it makes sense to call `transform` manually in the Scala `DataStream` API.

        githubbot ASF GitHub Bot added a comment -

        Github user uce commented on the issue:

        https://github.com/apache/flink/pull/3438

        @rmetzger @aljoscha I would agree with Aljoscha that your point is independent of this PR. Is there an issue for 2.0 to track this?

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3438#discussion_r104383800

— Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java —
@@ -556,6 +558,60 @@ public ExecutionConfig getExecutionConfig() {
 }

 /**
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the input streams and can produce zero
+ * or more output elements.
+ *
+ * @param processFunction The {@link ProcessFunction} that is called for each element
+ *                        in the stream.
+ *
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
+
+   TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
+     processFunction,
+     ProcessFunction.class,
+     false,
+     true,
+     getType(),
+     Utils.getCallLocationName(),
+     true);
+
+   return process(processFunction, outType);
+ }
+
+ /**
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the input streams and can produce zero
+ * or more output elements.
+ *
+ * @param processFunction The {@link ProcessFunction} that is called for each element
+ *                        in the stream.
+ * @param outputType {@link TypeInformation} for the result type of the function.
+ *
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ @Internal
+ public <R> SingleOutputStreamOperator<R> process(
— End diff –

        Yes, it's exposed for that. The pattern, so far, is for methods to also expose a public method that takes a `TypeInformation` because we get the `TypeInformation` from the context bound in the Scala API.

        Calling `transform()` manually is an option but if we do that we would basically not base the Scala API on the Java API anymore and we would have code that instantiates the Stream Operators in both the Java and Scala API. For example, right now we have the code for instantiating a flat map operator in `(Java)DataStream` while `(Scala)DataStream.flatMap()` calls that method.

        What do you think?
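The two-method pattern described above can be sketched in plain Java with a hypothetical `TypedStream` type standing in for `DataStream` (not Flink's real API): a public convenience overload infers the result type and delegates to an explicit overload, and a Scala facade, which already has type information from an implicit/context bound, would call the explicit overload directly.

```java
import java.util.function.Function;

// Hypothetical stand-in for DataStream, illustrating the two-overload pattern.
public class TypedStream<T> {

    // Public convenience overload: infers the "type information"
    // (modeled here as a plain label) and delegates to the explicit overload.
    public <R> String process(Function<T, R> fn) {
        return process(fn, "inferred");
    }

    // Explicit overload: a Scala wrapper would call this directly,
    // passing type information obtained from its context bound.
    public <R> String process(Function<T, R> fn, String typeInfo) {
        return "process with typeInfo=" + typeInfo;
    }

    public static void main(String[] args) {
        TypedStream<Integer> s = new TypedStream<>();
        System.out.println(s.process(x -> x + 1));             // typeInfo=inferred
        System.out.println(s.process(x -> x + 1, "explicit")); // typeInfo=explicit
    }
}
```

Both overloads stay in one place, so the operator-construction logic is not duplicated between the Java and Scala APIs.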

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3438#discussion_r104384206

— Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java —
@@ -19,30 +19,35 @@
 package org.apache.flink.streaming.api.functions;

 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.util.Collector;

 /**
  * A function that processes elements of a stream.
  *
- * <p>The function will be called for every element in the input stream and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * <p>For every element in the input stream {@link #processElement(Object, Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. Implementations can also
+ * query the time and set timers through the provided {@link Context}. For firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
+ * zero or more elements as output and register further timers.
  *
- * <p>The function will be called for every element in the input stream and can produce
- * zero or more output elements. Contrary to the
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
- * the time (both event and processing) and set timers, through the provided {@link Context}.
- * When reacting to the firing of set timers the function can directly emit a result, and/or
- * register a timer that will trigger an action in the future.
+ * <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
+ * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}.
+ *
+ * <p><b>NOTE:</b> A {@code ProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and
— End diff –

        fixing

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the issue:

        https://github.com/apache/flink/pull/3438

@uce There is no issue for 2.0 to track this because I don't think there is consensus about always having timestamps.

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the issue:

        https://github.com/apache/flink/pull/3438

        @uce There is some documentation that says that `ProcessFunction` is only available on keyed streams. I'll change that.

        githubbot ASF GitHub Bot added a comment -

        Github user uce commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3438#discussion_r104399614

— Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java —
@@ -556,6 +558,60 @@ public ExecutionConfig getExecutionConfig() {
 }

 /**
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the input streams and can produce zero
+ * or more output elements.
+ *
+ * @param processFunction The {@link ProcessFunction} that is called for each element
+ *                        in the stream.
+ *
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
+
+   TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
+     processFunction,
+     ProcessFunction.class,
+     false,
+     true,
+     getType(),
+     Utils.getCallLocationName(),
+     true);
+
+   return process(processFunction, outType);
+ }
+
+ /**
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the input streams and can produce zero
+ * or more output elements.
+ *
+ * @param processFunction The {@link ProcessFunction} that is called for each element
+ *                        in the stream.
+ * @param outputType {@link TypeInformation} for the result type of the function.
+ *
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ @Internal
+ public <R> SingleOutputStreamOperator<R> process(
— End diff –

        Makes sense to keep it like that. The benefits to base the Scala API on top of the Java API instead of duplicating it are very persuasive, too. 😄 So +1 to keep it as is. 👍 I was just wondering whether users would be confused by this.

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3438#discussion_r104446345

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java —
        @@ -19,30 +19,35 @@
        package org.apache.flink.streaming.api.functions;

        import org.apache.flink.annotation.PublicEvolving;
        -import org.apache.flink.api.common.functions.Function;
        +import org.apache.flink.api.common.functions.AbstractRichFunction;
        import org.apache.flink.streaming.api.TimeDomain;
        import org.apache.flink.streaming.api.TimerService;
        import org.apache.flink.util.Collector;

        /**
         * A function that processes elements of a stream.
         *
        - * <p>The function will be called for every element in the input stream and can produce
        - * zero or more output. The function can also query the time and set timers. When
        - * reacting to the firing of set timers the function can emit yet more elements.
        + * <p>For every element in the input stream {@link #processElement(Object, Context, Collector)}
        + * is invoked. This can produce zero or more elements as output. Implementations can also
        + * query the time and set timers through the provided {@link Context}. For firing timers
        + * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
        + * zero or more elements as output and register further timers.
         *
        - * <p>The function will be called for every element in the input stream and can produce
        - * zero or more output elements. Contrary to the
        - * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
        - * the time (both event and processing) and set timers, through the provided {@link Context}.
        - * When reacting to the firing of set timers the function can directly emit a result, and/or
        - * register a timer that will trigger an action in the future.
        + * <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
        + * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}.
        + *
        + * <p><b>NOTE:</b> A {@code ProcessFunction} is always a
        + * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
        + * {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and
        + * teardown methods can be implemented. See
        + * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
        + * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
         *
         * @param <I> Type of the input elements.
         * @param <O> Type of the output elements.
         */
        @PublicEvolving
        -public interface ProcessFunction<I, O> extends Function {
        +public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
        — End diff –

        Hi @wenlong88, in the ML discussion (https://lists.apache.org/thread.html/f3fe7d68986877994ad6b66173f40e72fc454420720a74ea5a834cc2@%3Cdev.flink.apache.org%3E) we decided to make `ProcessFunction` available on non-keyed streams as well, to allow using side outputs there. This requires making the `onTimer()` method non-abstract, otherwise every user would always have to implement it. We marked `ProcessFunction` as `@PublicEvolving` exactly for such cases; it's still a very young API and we didn't know exactly what was going to be needed in the end.

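        Since `ProcessFunction` is an abstract class whose `onTimer()` has a default (empty) implementation, a non-keyed use only has to implement `processElement()`. A minimal sketch, assuming the Flink 1.3+ `flink-streaming-java` API; the stream contents and class name are illustrative, not taken from the PR:

        ```java
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.streaming.api.functions.ProcessFunction;
        import org.apache.flink.util.Collector;

        public class NonKeyedProcessExample {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                // On a non-keyed stream only processElement() must be implemented;
                // onTimer() keeps its default empty body, and keyed state/timers
                // are not available here.
                env.fromElements("a", "bb", "ccc")
                    .process(new ProcessFunction<String, Integer>() {
                        @Override
                        public void processElement(String value, Context ctx, Collector<Integer> out) {
                            out.collect(value.length()); // zero or more outputs per input element
                        }
                    })
                    .print();

                env.execute("non-keyed process example");
            }
        }
        ```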
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user asfgit closed the pull request at:

        https://github.com/apache/flink/pull/3438

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the issue:

        https://github.com/apache/flink/pull/3438

        Merged

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user wenlong88 commented on the issue:

        https://github.com/apache/flink/pull/3438

        thanks for the explanation. I had this concern because we just suggested to our users that they use ProcessFunction to implement their jobs, so they will need to change their code as well when we sync the commit. After all, it is really nice to have timers in more scenarios.

        Hide
        githubbot ASF GitHub Bot added a comment -

        GitHub user aljoscha opened a pull request:

        https://github.com/apache/flink/pull/3484

        FLINK-4460 Side Outputs in Flink

        This is a refinement of #2982 by @chenqin.

        I changed the API a bit, added support for side outputs to `ProcessFunction`, enabled side outputs to work with chaining, added proper Scala API and a Scala API test and added documentation.

        R: @uce @kl0u and @chenqin for review, please

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/aljoscha/flink finish-pr-2982-side-outputs-cp

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/3484.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #3484


        commit 1746c7e5981942dfb0e57954f8241a400b699120
        Author: Chen Qin <qinnchen@gmail.com>
        Date: 2016-10-21T19:38:04Z

        FLINK-4460 Add support for side outputs

        This does not yet allow users to emit to side outputs in user functions.
        Only operators (StreamOperator) can emit to side outputs. A side output
        can be retrieved on a SingleOutputStreamOperator.

        commit 362fcb38abf525e704e97505186923ae77dc167b
        Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
        Date: 2016-10-21T19:38:04Z

        FLINK-4460 Add side outputs for ProcessFunction

        commit 3ff4f8284691e9cbfba6f4c23b4e7fe1c584df50
        Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
        Date: 2017-02-16T13:09:26Z

        FLINK-4460 Make chaining work with side outputs

        commit 5a48cdfb67a1008bf4c7cba8ff76b2982db55a40
        Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
        Date: 2017-02-16T13:41:10Z

        FLINK-4460 Expose OutputTag constructor that takes TypeInformation

        commit 935ff90dadad380db293265cc49d072d764cf510
        Author: Chen Qin <qinnchen@gmail.com>
        Date: 2017-03-01T14:36:17Z

        FLINK-4460 Provide late-data output for window operations

        We use side outputs to emit dropped late data.

        commit 3ae8a673c6c29fe2e110f5745ddf72deae71aafd
        Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
        Date: 2017-03-07T10:06:31Z

        FLINK-4460 Add proper side output API for Scala API

        The Scala side output API uses context bounds to get a TypeInformation
        for an OutputTag. This also adds a SideOutputITCase for the Scala API.

        commit 66182b79931016a04286279f5aba63af30442b02
        Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
        Date: 2017-03-07T11:06:01Z

        FLINK-4460 Add documentation for side outputs


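        The late-data commit above wires side outputs into window operations. Roughly, usage looks like the following sketch, assuming the `WindowedStream#sideOutputLateData(OutputTag)` API added by this PR; `Event`, `Result`, and the stream/tag names are illustrative placeholders:

        ```java
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
        import org.apache.flink.streaming.api.windowing.time.Time;
        import org.apache.flink.util.OutputTag;

        // Anonymous subclass so the OutputTag captures its element type.
        final OutputTag<Event> lateTag = new OutputTag<Event>("late-data") {};

        SingleOutputStreamOperator<Result> windowed = events
            .keyBy(e -> e.key)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .sideOutputLateData(lateTag)      // dropped late elements are emitted here
            .reduce((a, b) -> a.merge(b));    // placeholder window function

        // Elements that arrived too late for their window:
        DataStream<Event> late = windowed.getSideOutput(lateTag);
        ```

        Previously such late elements were silently dropped by the window operator; the side output makes them observable to the user.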
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user chenqin commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r104847832

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java —
        @@ -439,6 +450,7 @@ public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
        && (edge.getPartitioner() instanceof ForwardPartitioner)
        && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
        + && edge.getOutputTag() == null // disable chaining for side outputs
        — End diff –

        I remember you mentioned that in the latest version side outputs work with chaining, or not?

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user chenqin commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r104847733

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java —
        @@ -60,6 +60,7 @@
        import java.util.List;
        import java.util.Map;
        import java.util.Map.Entry;
        +import com.google.common.collect.Iterables;
        — End diff –

        Do you think introducing this dependency is a good idea or a bad one? Up to you.

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user chenqin commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r104847407

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java —
        @@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
                downStreamVertexID,
                typeNumber,
                null,
        -       new ArrayList<String>());
        +       new ArrayList<String>(), null);
        }

        private void addEdgeInternal(Integer upStreamVertexID,
                Integer downStreamVertexID,
                int typeNumber,
                StreamPartitioner<?> partitioner,
        -       List<String> outputNames) {
        -
        +       List<String> outputNames,
        +       OutputTag outputTag) {

        -       if (virtualSelectNodes.containsKey(upStreamVertexID)) {
        +       if (virtualOutputNodes.containsKey(upStreamVertexID)) {
        +               int virtualId = upStreamVertexID;
        +               upStreamVertexID = virtualOutputNodes.get(virtualId).f0;
        +               if (outputTag == null) {
        +                       // selections that happen downstream override earlier selections
        — End diff –

        may consider call out this behavior in `getSideOutput` comments

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user chenqin commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r104846393

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java —
        @@ -416,4 +418,35 @@ private boolean canBeParallel()

        {
                transformation.setSlotSharingGroup(slotSharingGroup);
                return this;
        }

        +
        + /**
        + * Gets the {@link DataStream} that contains the elements that are emitted from an operation
        + * into the side output with the given {@link OutputTag}.
        + *
        + * <p>Example:
        + * <pre>{@code
        + * static final OutputTag<X> sideOutputTag = new OutputTag<X>("side-output") {};
        + *
        + * public void flatMap(X value, Collector<String> out) throws Exception {
        — End diff –

        Comment seems out of date; I think we already decided to get rid of CollectorWrapper.

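        In place of the outdated `CollectorWrapper` example, the merged design lets a `ProcessFunction` emit to a side output through its `Context`, with the side stream retrieved via `getSideOutput`. A sketch assuming the API merged from this PR; the input stream, tag name, and parsing logic are illustrative:

        ```java
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.functions.ProcessFunction;
        import org.apache.flink.util.Collector;
        import org.apache.flink.util.OutputTag;

        // Anonymous subclass so Flink can extract the side output's element type.
        final OutputTag<String> rejectedTag = new OutputTag<String>("rejected") {};

        SingleOutputStreamOperator<Integer> parsed = input
            .process(new ProcessFunction<String, Integer>() {
                @Override
                public void processElement(String value, Context ctx, Collector<Integer> out) {
                    try {
                        out.collect(Integer.parseInt(value)); // main output
                    } catch (NumberFormatException e) {
                        ctx.output(rejectedTag, value);       // side output
                    }
                }
            });

        // Stream of elements that failed to parse:
        DataStream<String> rejected = parsed.getSideOutput(rejectedTag);
        ```

        Note that `getSideOutput` only matches elements emitted under an `OutputTag` that is equal to the one passed in (same id and type).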
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user chenqin commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r104847005

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java —
        @@ -85,6 +86,7 @@
        private Set<Integer> sources;
        private Set<Integer> sinks;
        private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
        + private Map<Integer, Tuple2<Integer, OutputTag>> virtualOutputNodes;
        — End diff –

        We might consider using `addVirtualSideOutputNode` and `virtualSideOutputNodes`, unless we want to refactor and move away from the current assumption `<IN>operator<OUT>` to `<<tag1,IN1>...<tagX,INX> operator <<taga,OUTa>...<tagx,OUTX>`

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r104880412

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java —
        @@ -416,4 +418,35 @@ private boolean canBeParallel()

        {
                transformation.setSlotSharingGroup(slotSharingGroup);
                return this;
        }

        +
        + /**
        + * Gets the {@link DataStream} that contains the elements that are emitted from an operation
        + * into the side output with the given {@link OutputTag}.
        + *
        + * <p>Example:
        + * <pre>{@code
        + * static final OutputTag<X> sideOutputTag = new OutputTag<X>("side-output") {};
        + *
        + * public void flatMap(X value, Collector<String> out) throws Exception {
        — End diff –

        Fixing

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r104880615

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java —
        @@ -85,6 +86,7 @@
        private Set<Integer> sources;
        private Set<Integer> sinks;
        private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
        + private Map<Integer, Tuple2<Integer, OutputTag>> virtualOutputNodes;
        — End diff –

        The method is already called `addVirtualSideOutputNode()`. I'm adjusting the name of the field. Thanks!

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r104881269

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java —
        @@ -60,6 +60,7 @@
        import java.util.List;
        import java.util.Map;
        import java.util.Map.Entry;
        +import com.google.common.collect.Iterables;
        — End diff –

        You're right, I'm changing this to simply have two loops.

        I think you introduced this in the first place, though. 😉

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r104895136

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java —
        @@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
                downStreamVertexID,
                typeNumber,
                null,
        -       new ArrayList<String>());
        +       new ArrayList<String>(), null);
        }

        private void addEdgeInternal(Integer upStreamVertexID,
                Integer downStreamVertexID,
                int typeNumber,
                StreamPartitioner<?> partitioner,
        -       List<String> outputNames) {
        -
        +       List<String> outputNames,
        +       OutputTag outputTag) {

        -       if (virtualSelectNodes.containsKey(upStreamVertexID)) {
        +       if (virtualOutputNodes.containsKey(upStreamVertexID)) {
        +               int virtualId = upStreamVertexID;
        +               upStreamVertexID = virtualOutputNodes.get(virtualId).f0;
        +               if (outputTag == null) {
        +                       // selections that happen downstream override earlier selections
        — End diff –

        I think the comment is a leftover from copying this code from split/select. For side outputs it can't happen that you have multiple "selects" after one another. Will remove the comment. What do you think?

        githubbot ASF GitHub Bot added a comment -

        Github user chenqin commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r104995971

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java —
@@ -333,32 +356,41 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
 			downStreamVertexID,
 			typeNumber,
 			null,
-			new ArrayList<String>());
+			new ArrayList<String>(), null);
 	}

 	private void addEdgeInternal(Integer upStreamVertexID,
 			Integer downStreamVertexID,
 			int typeNumber,
 			StreamPartitioner<?> partitioner,
-			List<String> outputNames) {
-
+			List<String> outputNames,
+			OutputTag outputTag) {
-		if (virtualSelectNodes.containsKey(upStreamVertexID)) {
+		if (virtualOutputNodes.containsKey(upStreamVertexID)) {
+			int virtualId = upStreamVertexID;
+			upStreamVertexID = virtualOutputNodes.get(virtualId).f0;
+			if (outputTag == null) {
+				// selections that happen downstream override earlier selections
— End diff –

        sounds good to me!

        githubbot ASF GitHub Bot added a comment -

        Github user chenqin commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r104996349

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java —
        @@ -85,6 +86,7 @@
        private Set<Integer> sources;
        private Set<Integer> sinks;
        private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
        + private Map<Integer, Tuple2<Integer, OutputTag>> virtualOutputNodes;
        — End diff –

        sounds good

        githubbot ASF GitHub Bot added a comment -

        Github user chenqin commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r104997566

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java —
        @@ -60,6 +60,7 @@
        import java.util.List;
        import java.util.Map;
        import java.util.Map.Entry;
        +import com.google.common.collect.Iterables;
        — End diff –

        That sounds right, good catch!
        Thanks for fixing!

        githubbot ASF GitHub Bot added a comment -

        Github user uce commented on the issue:

        https://github.com/apache/flink/pull/3484

        Unfortunately, I won't have time to look over this PR this week. Thanks for pinging me @aljoscha @chenqin.

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105225561

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java —
        @@ -419,6 +435,14 @@ public void merge(W mergeResult,
        registerCleanupTimer(window);
        }
        }
        +
        + // side output input event if
        + // element not handled by any window
        + // late arriving tag has been set
        + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) {
        — End diff –

        @chenqin and @aljoscha I am starting to review the PR and I was wondering when is this new `isLate()` check needed? At least for the out-of-box window assigners, this seems to be a redundant check.

        githubbot ASF GitHub Bot added a comment -

        Github user chenqin commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105235710

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java —
        @@ -419,6 +435,14 @@ public void merge(W mergeResult,
        registerCleanupTimer(window);
        }
        }
        +
        + // side output input event if
        + // element not handled by any window
        + // late arriving tag has been set
        + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) {
        — End diff –

        Thanks @kl0u Good catch!

I put `isLate` there with the intention of filtering out events `dropped for other reasons` that I may not be aware of; otherwise lateArrivingEvents would really contain both `late arriving` and `dropped` events.

        @aljoscha If that is redundant check, we might just remove `isLate`.
        What do you think?

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105368360

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java —
        @@ -419,6 +435,14 @@ public void merge(W mergeResult,
        registerCleanupTimer(window);
        }
        }
        +
        + // side output input event if
        + // element not handled by any window
        + // late arriving tag has been set
        + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) {
        — End diff –

        I thought about this again. I think it doesn't hurt to have it because it catches the case when a `WindowAssigner` doesn't assign any windows. In that case an element is also "skipped" but it is not necessarily considered late. What do you think?
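For readers following along, the condition being debated can be sketched standalone (illustrative names, not the operator's actual fields): an element is considered late once its timestamp plus the allowed lateness is no longer ahead of the current watermark.

```java
public class LatenessCheck {

    // Sketch of the check guarding the late-data side output: an element is
    // late once currentWatermark >= elementTimestamp + allowedLateness,
    // matching the "current timestamp + allowed lateness no less than
    // element timestamp" comment in the diff above.
    static boolean isLate(long elementTimestamp, long allowedLateness, long currentWatermark) {
        return elementTimestamp + allowedLateness <= currentWatermark;
    }

    public static void main(String[] args) {
        // watermark 2000 has passed 1000 + 500 = 1500: late
        System.out.println(isLate(1000, 500, 2000)); // true
        // 1800 + 500 = 2300 is still ahead of watermark 2000: not late
        System.out.println(isLate(1800, 500, 2000)); // false
    }
}
```

An element skipped because the `WindowAssigner` assigned no windows at all would fail this check, which is exactly why the extra guard is not redundant for "weird" assigners.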

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105372676

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java —
        @@ -419,6 +435,14 @@ public void merge(W mergeResult,
        registerCleanupTimer(window);
        }
        }
        +
        + // side output input event if
        + // element not handled by any window
        + // late arriving tag has been set
        + // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        + if(isSkippedElement && lateDataOutputTag != null && isLate(element)) {
        — End diff –

        I just added a test for the behaviour with a "weird" `WindowAssigner`.

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105397096

        — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java —
@@ -46,7 +46,15 @@
 	public TypeHint() {
 		this.typeInfo = TypeExtractor.createTypeInfo(this, TypeHint.class, getClass(), 0);
 	}
+
+	/**
+	 * Creates a hint for the generic type in the class signature.
+	 */
+	public TypeHint(Class<?> baseClass, Object instance, int genericParameterPos) {
+		this.typeInfo = TypeExtractor.createTypeInfo(instance, baseClass, instance.getClass(), genericParameterPos);
+	}
+
+
— End diff –

        Remove one of the 2 empty lines.

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105397424

        — Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java —
        @@ -0,0 +1,115 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.util;
        +
        +import static java.util.Objects.requireNonNull;
        +
        +import java.io.IOException;
        +import java.io.ObjectInputStream;
        +import java.io.Serializable;
        +
        +import org.apache.flink.annotation.PublicEvolving;
        +import org.apache.flink.api.common.functions.InvalidTypesException;
        +import org.apache.flink.api.common.typeinfo.TypeHint;
        +import org.apache.flink.api.common.typeinfo.TypeInformation;
        +
        +
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * <p>An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){});
+ * }</pre>
+ *
+ * @param <T> the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag<T> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String id;
+
+	private transient TypeInformation<T> typeInfo;
+
+	/**
+	 * Creates a new named {@code OutputTag} with the given id.
+	 *
+	 * @param id The id of the created {@code OutputTag}.
+	 */
+	public OutputTag(String id) {
+		Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+		this.id = requireNonNull(id);
+
+		try {
        — End diff –

        No need for line breaking:
        `TypeHint<T> typeHint = new TypeHint<T>(OutputTag.class, this, 0) {};`
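As an aside, the "anonymous inner class" requirement from the javadoc above can be illustrated with a small self-contained sketch (a simplified stand-in, not Flink's actual `OutputTag` or `TypeExtractor`): the generic parameter only survives erasure if the tag is instantiated as an anonymous subclass, because only a subclass's generic superclass signature retains the type argument.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Simplified stand-in for OutputTag: demonstrates why the tag must be an
// anonymous inner class. Only the subclass's generic superclass signature
// keeps the type argument after erasure.
abstract class Tag<T> {
    final String id;

    Tag(String id) {
        if (id == null) {
            throw new NullPointerException("Tag id cannot be null.");
        }
        this.id = id;
    }

    // Reads T reflectively, in the spirit of TypeExtractor.createTypeInfo.
    Type capturedType() {
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                "Tag must be created as an anonymous subclass, e.g. new Tag<String>(\"id\") {}");
        }
        return ((ParameterizedType) superType).getActualTypeArguments()[0];
    }
}

public class TagDemo {
    public static void main(String[] args) {
        Tag<String> lateData = new Tag<String>("late-data") {}; // note the trailing {}
        System.out.println(lateData.capturedType()); // class java.lang.String
    }
}
```

Without the trailing `{}` the class would be abstract and, even for a concrete class, `getGenericSuperclass()` would not expose the type argument.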

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105399939

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java —
        @@ -72,6 +76,11 @@ public RecordWriterOutput(

        — End diff –

        The bodies of the following two `collect(...)` methods are identical modulo the check for the output tag.
        It makes sense to put the common code in the same `private` method and call that method from each one of these (after doing the necessary checks).

As a side comment, should we just `return` if the wrong method is called, or throw an exception?

This "duplicate code" pattern appears several times in the code.

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105397306

        — Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java —
        @@ -0,0 +1,115 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.util;
        +
        +import static java.util.Objects.requireNonNull;
        +
        +import java.io.IOException;
        +import java.io.ObjectInputStream;
        +import java.io.Serializable;
        +
        +import org.apache.flink.annotation.PublicEvolving;
        +import org.apache.flink.api.common.functions.InvalidTypesException;
        +import org.apache.flink.api.common.typeinfo.TypeHint;
        +import org.apache.flink.api.common.typeinfo.TypeInformation;
        +
        +
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * <p>An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){});
+ * }</pre>
+ *
+ * @param <T> the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag<T> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String id;
+
+	private transient TypeInformation<T> typeInfo;
+
+	/**
+	 * Creates a new named {@code OutputTag} with the given id.
+	 *
+	 * @param id The id of the created {@code OutputTag}.
+	 */
+	public OutputTag(String id) {
+		Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
        — End diff –

        We do not need both lines with the checks. We can just have:

        `this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");`

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105401808

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java —
        @@ -441,26 +491,55 @@ public void close() {
        private static final class CopyingChainingOutput<T> extends ChainingOutput<T> {

        private final TypeSerializer<T> serializer;

        • +
          public CopyingChainingOutput(
          OneInputStreamOperator<T, ?> operator,
          TypeSerializer<T> serializer,
          + OutputTag<T> outputTag,
          StreamStatusProvider streamStatusProvider)

          { - super(operator, streamStatusProvider); + super(operator, streamStatusProvider, outputTag); this.serializer = serializer; }

        — End diff –

        Again the two `collect()` have duplicate code (after the casting).

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105398836

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java —
        @@ -300,6 +303,36 @@ private StreamGraph generateInternal(List<StreamTransformation<?>> transformatio
        }

        /**
        + * Transforms a {@code SideOutputTransformation}.
        + *
        + * <p>
        + * For this we create a virtual node in the {@code StreamGraph} that holds the side-output
        + * {@link org.apache.flink.util.OutputTag}.
        + *
        + * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
        + */
        + private <T> Collection<Integer> transformSideOutput(SideOutputTransformation<T> sideOutput) {
        + StreamTransformation<T> input = sideOutput.getInput();
        + Collection<Integer> resultIds = transform(input);
        +
        +
        + // the recursive transform might have already transformed this
        + if (alreadyTransformed.containsKey(sideOutput)) {
        +     return alreadyTransformed.get(sideOutput);
        + }

        +
        + List<Integer> virtualResultIds = new ArrayList<>();
        +
        + for (int inputId : resultIds) {
        +     int virtualId = StreamTransformation.getNewNodeId();
        +     streamGraph.addVirtualSideOutputNode(inputId, virtualId, sideOutput.getOutputTag());
        +     virtualResultIds.add(virtualId);
        + }

        + return virtualResultIds;
        + }
        +
        +
        — End diff –

        Leave only one empty line.

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105402746

        — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java —
        @@ -1528,14 +1572,16 @@ public void testDropDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exce
        stateDesc,
        new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
        PurgingTrigger.of(EventTimeTrigger.create()),

        - LATENESS);
        + LATENESS,
        + lateOutputTag);
        — End diff –

        wrong alignment

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105399457

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java —
        @@ -0,0 +1,72 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.streaming.api.transformations;
        +
        +import static java.util.Objects.requireNonNull;
        +
        +import com.google.common.collect.Lists;
        +import org.apache.flink.util.OutputTag;
        +import org.apache.flink.streaming.api.operators.ChainingStrategy;
        +
        +import java.util.Collection;
        +import java.util.List;
        +
        +
        +/**
        + * This transformation represents a selection of a side output of an upstream operation with a
        + * given {@link OutputTag}.
        + *
        + * <p>This does not create a physical operation, it only affects how upstream operations are
        + * connected to downstream operations.
        + *
        + * @param <T> The type of the elements that result from this {@code SideOutputTransformation}
        + */
        +public class SideOutputTransformation<T> extends StreamTransformation<T> {
        + private final StreamTransformation<T> input;
        — End diff –

        Leave a blank line here.

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105401503

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java —
        @@ -387,14 +403,25 @@ public int getChainLength() {

        protected final StreamStatusProvider streamStatusProvider;

        - public ChainingOutput(OneInputStreamOperator<T, ?> operator, StreamStatusProvider streamStatusProvider) {
        + protected final OutputTag<T> outputTag;
        +
        + public ChainingOutput(
        +     OneInputStreamOperator<T, ?> operator,
        +     StreamStatusProvider streamStatusProvider,
        +     OutputTag<T> outputTag) {
              this.operator = operator;
              this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
              this.streamStatusProvider = streamStatusProvider;
        +     this.outputTag = outputTag;
          }

        @Override
        — End diff –

        Again the two `collect()` methods have much identical code. We can put that common code in a separate private method and calls this instead of repeating the code.
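The deduplication the reviewer asks for can be sketched as follows. This is a hypothetical, Flink-independent illustration (the classes `Record`, `Tag`, and `ChainingOutputSketch` are stand-ins, not Flink's actual types): both `collect()` overloads delegate to one private `pushToOperator()` helper, so the counting and forwarding logic is written exactly once.

```java
// Hypothetical sketch of the suggested refactoring: both collect() overloads
// funnel into a single private pushToOperator() helper.
final class Record<T> {
    final T value;
    Record(T value) { this.value = value; }
}

final class Tag<T> {
    final String id;
    Tag(String id) { this.id = id; }
}

final class ChainingOutputSketch<T> {
    private final Tag<T> outputTag; // null means this output carries the main stream
    private long numRecordsIn;
    private Object lastForwarded;

    ChainingOutputSketch(Tag<T> outputTag) { this.outputTag = outputTag; }

    // main-output path: only forwards when this output is not bound to a side output
    void collect(Record<T> record) {
        if (outputTag != null) {
            return;
        }
        pushToOperator(record);
    }

    // side-output path: only forwards records tagged for this output
    <X> void collect(Tag<X> tag, Record<X> record) {
        if (outputTag == null || !outputTag.id.equals(tag.id)) {
            return;
        }
        pushToOperator(record);
    }

    // the formerly duplicated body, now in one place
    private void pushToOperator(Record<?> record) {
        numRecordsIn++;
        lastForwarded = record.value;
    }

    long getNumRecordsIn() { return numRecordsIn; }
    Object getLastForwarded() { return lastForwarded; }
}
```

The same pattern applies to both `ChainingOutput` and `CopyingChainingOutput`: only the cast/copy step differs per overload, and everything after it can live in the shared helper.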

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105402616

        — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java —
        @@ -1267,14 +1291,16 @@ public void testLateness() throws Exception {
        stateDesc,
        new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
        PurgingTrigger.of(EventTimeTrigger.create()),

        - LATENESS);
        + LATENESS,
        + lateOutputTag);
        — End diff –

        wrong alignment

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105403355

        — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java —
        @@ -40,6 +41,12 @@ public void collect(StreamRecord<T> record) {
        }

        @Override
        + public <X> void collect(
        — End diff –

        The signature fits in one line

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105402830

        — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java —
        @@ -1618,14 +1664,16 @@ public void testDropDueToLatenessSessionZeroLateness() throws Exception {
        stateDesc,
        new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
        EventTimeTrigger.create(),

        - LATENESS);
        + LATENESS,
        — End diff –

        wrong alignment

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105398702

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java —
        @@ -333,32 +356,40 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
        downStreamVertexID,
        typeNumber,
        null,

        - new ArrayList<String>());
        + new ArrayList<String>(), null);

        }

        private void addEdgeInternal(Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,

        - List<String> outputNames) {
        -
        + List<String> outputNames,
        + OutputTag outputTag) {
        - if (virtualSelectNodes.containsKey(upStreamVertexID)) {
        + if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        +     int virtualId = upStreamVertexID;
        +     upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        +     if (outputTag == null) {
        +         outputTag = virtualSideOutputNodes.get(virtualId).f1;
        +     }
        +     addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
        + }

          else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
          int virtualId = upStreamVertexID;
          upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
          if (outputNames.isEmpty()) {
              // selections that happen downstream override earlier selections
              outputNames = virtualSelectNodes.get(virtualId).f1;
          }
        - addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
        + addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
          } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
          int virtualId = upStreamVertexID;
          upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
          if (partitioner == null) { partitioner = virtualPartitionNodes.get(virtualId).f1; }
        - addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
        +
        — End diff –

        Remove line for uniformity with the `if() ...` above.
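The recursion being reviewed in `addEdgeInternal()` can be illustrated with a simplified, Flink-independent sketch (assumed names; the real code walks three kinds of virtual nodes): an edge whose upstream end is a virtual side-output node is rewritten to start at the physical upstream node, and the edge inherits the virtual node's tag unless one was already set closer to the edge.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of resolving virtual side-output nodes when adding an edge.
final class VirtualNodeResolver {
    private final Map<Integer, Integer> virtualToUpstream = new HashMap<>();
    private final Map<Integer, String> virtualToTag = new HashMap<>();

    void addVirtualSideOutputNode(int upstreamId, int virtualId, String outputTag) {
        virtualToUpstream.put(virtualId, upstreamId);
        virtualToTag.put(virtualId, outputTag);
    }

    // Returns a description of the physical edge that would be added.
    String addEdge(int upstreamId, int downstreamId, String outputTag) {
        if (virtualToUpstream.containsKey(upstreamId)) {
            int physical = virtualToUpstream.get(upstreamId);
            // a tag set closest to the edge wins; otherwise inherit the virtual node's tag
            String tag = (outputTag == null) ? virtualToTag.get(upstreamId) : outputTag;
            // recurse: several virtual nodes may be chained before the physical node
            return addEdge(physical, downstreamId, tag);
        }
        return upstreamId + "->" + downstreamId + ":" + (outputTag == null ? "main" : outputTag);
    }
}
```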

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105402678

        — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java —
        @@ -1393,14 +1428,16 @@ public void testDropDueToLatenessTumbling() throws Exception {
        stateDesc,
        new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
        EventTimeTrigger.create(),

        - LATENESS);
        + LATENESS,
        + lateOutputTag);
        — End diff –

        wrong alignment

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105402262

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java —
        @@ -539,5 +625,26 @@ public void collect(StreamRecord<T> record) {
            // don't copy for the last output
            outputs[outputs.length - 1].collect(record);
        }

        +
        + @Override
        + public <X> void collect(
        + OutputTag<?> outputTag, StreamRecord<X> record) {
        + for (int i = 0; i < outputs.length - 1; i++) {
        + Output<StreamRecord<T>> output = outputs[i];
        +
        + // due to side outputs, StreamRecords of varying types can pass through the broadcasting
        + // collector so we need to cast
        + @SuppressWarnings({"unchecked", "rawtypes"})
        + StreamRecord<T> shallowCopy = (StreamRecord<T>) record.copy(record.getValue());
        + output.collect(shallowCopy);
        + }
        +
        — End diff –

        Duplicate comment with the one inside the loop. Also the "// don't copy for the last output" is not right because we actually create a copy before.
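The copy discipline under discussion can be sketched independently of Flink (the `CopyingBroadcaster` name and the `int[]`-record shape are illustrative assumptions): when broadcasting to N downstream outputs, the first N-1 receive a defensive copy so a mutating consumer cannot corrupt its siblings, while the last output can be handed the original record, saving one copy per element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch of broadcasting with copies for all but the last output.
final class CopyingBroadcaster {
    private final List<Consumer<int[]>> outputs = new ArrayList<>();

    void addOutput(Consumer<int[]> out) {
        outputs.add(out);
    }

    void collect(int[] record) {
        for (int i = 0; i < outputs.size() - 1; i++) {
            // copy for every output except the last one
            outputs.get(i).accept(record.clone());
        }
        if (!outputs.isEmpty()) {
            // the last output may safely take ownership of the original
            outputs.get(outputs.size() - 1).accept(record);
        }
    }
}
```

This is why the "don't copy for the last output" comment only holds on the main-output path; on the side-output path quoted above, every output gets a copy, which is exactly the inconsistency the reviewer points out.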

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105402899

        — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java —
        @@ -1702,15 +1754,16 @@ public void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exce
        stateDesc,
        new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
        PurgingTrigger.of(EventTimeTrigger.create()),

        - LATENESS);
        + LATENESS,
        — End diff –

        wrong alignment

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105397483

        — Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java —
        @@ -0,0 +1,115 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.util;
        +
        +import static java.util.Objects.requireNonNull;
        +
        +import java.io.IOException;
        +import java.io.ObjectInputStream;
        +import java.io.Serializable;
        +
        +import org.apache.flink.annotation.PublicEvolving;
        +import org.apache.flink.api.common.functions.InvalidTypesException;
        +import org.apache.flink.api.common.typeinfo.TypeHint;
        +import org.apache.flink.api.common.typeinfo.TypeInformation;
        +
        +
        +/**
        + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
        + * of an operator.
        + *
        + * <p>An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
        + * a {@link TypeInformation} for the generic type parameter.
        + *
        + * <p>Example:
        + * <pre>{@code
        + * OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){};
        + * }</pre>
        + *
        + * @param <T> the type of elements in the side-output stream.
        + */
        +@PublicEvolving
        +public class OutputTag<T> implements Serializable {
        +
        + private static final long serialVersionUID = 1L;
        +
        + private final String id;
        +
        + private transient TypeInformation<T> typeInfo;
        +
        + /**
        + * Creates a new named {@code OutputTag} with the given id.
        + *
        + * @param id The id of the created {@code OutputTag}.
        + */
        + public OutputTag(String id) {
        + Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
        + this.id = requireNonNull(id);
        +
        + try {
        + TypeHint<T> typeHint =
        + new TypeHint<T>(OutputTag.class, this, 0) {};
        + this.typeInfo = typeHint.getTypeInfo();
        + } catch (InvalidTypesException e) {
        +     throw new InvalidTypesException("Could not determine TypeInformation for generic " +
        +         "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
        + }
        + }
        +
        + /**
        + * Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
        + *
        + * @param id The id of the created {@code OutputTag}.
        + * @param typeInfo The {@code TypeInformation} for the side output.
        + */
        + public OutputTag(String id, TypeInformation<T> typeInfo) {
        + this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
        + this.typeInfo =
        — End diff –

        No need for line breaking.
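The reason the quoted constructor requires an anonymous inner class can be demonstrated without Flink at all. The sketch below uses a stand-in class named `TypeToken` (not a Flink type) to show the super-type-token trick that `TypeHint` relies on: only a subclass's generic superclass retains the actual type argument at runtime.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Minimal demonstration of why an OutputTag-like class must be subclassed anonymously.
class TypeToken<T> {
    Type captured() {
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            // a plain `new TypeToken<String>()` erases T and lands here
            throw new IllegalStateException(
                "Type erased. Did you forget to make this an anonymous inner class?");
        }
        return ((ParameterizedType) superType).getActualTypeArguments()[0];
    }
}
```

`new TypeToken<String>() {}.captured()` yields `String.class`, while the non-anonymous `new TypeToken<String>().captured()` throws, mirroring the `InvalidTypesException` path in the `OutputTag(String id)` constructor above.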

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105400890

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java —
        @@ -326,33 +327,48 @@ public int getChainLength() {
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,

        - List<StreamOperator<?>> allOperators)
        + List<StreamOperator<?>> allOperators,
        + OutputTag<IN> outputTag)
          {
          // create the output that the operator writes to first. this may recursively create more operators
          Output<StreamRecord<OUT>> output = createOutputCollector(
          containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);

        // now create the operator and give it the output collector to write its output to
        OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
        +
        chainedOperator.setup(containingTask, operatorConfig, output);

        allOperators.add(chainedOperator);

        if (containingTask.getExecutionConfig().isObjectReuseEnabled())

        { - return new ChainingOutput<>(chainedOperator, this); + return new ChainingOutput<>(chainedOperator, this, outputTag); }

        else

        { TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader); - return new CopyingChainingOutput<>(chainedOperator, inSerializer, this); + return new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this); }

        }

        private <T> RecordWriterOutput<T> createStreamOutput(
        StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
        Environment taskEnvironment,

        • String taskName)
        • {
        • TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
          + String taskName) {
          + OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
          +
          + TypeSerializer outSerializer = null;
          +
            • End diff –

        Too much line breaking in the following `if () else ()`. We could easily reduce it or even replace it with:
        ```
        TypeSerializer outSerializer = (edge.getOutputTag() != null) ?
        upStreamConfig.getTypeSerializerSideOut(edge.getOutputTag(), taskEnvironment.getUserClassLoader()) :
        upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
        ```

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105398138

        — Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java —
        @@ -17,13 +17,19 @@

        package org.apache.flink.streaming.examples.windowing;

        — End diff –

        Unused imports.

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105403240

        — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java —
        @@ -53,5 +54,11 @@ public void collect(StreamRecord<T> record) {
        }

        @Override
        — End diff –

        The signature can go on the same line

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105401947

— Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java —

```
@@ -539,5 +625,26 @@ public void collect(StreamRecord<T> record) {
 			// don't copy for the last output
 			outputs[outputs.length - 1].collect(record);
 		}
+
```

— End diff –

        The arguments can go on the same line as the method name.

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105397933

— Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java —

```
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * <p>An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){});
+ * }</pre>
+ *
+ * @param <T> the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag<T> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String id;
+
+	private transient TypeInformation<T> typeInfo;
+
+	/**
+	 * Creates a new named {@code OutputTag} with the given id.
+	 *
+	 * @param id The id of the created {@code OutputTag}.
+	 */
+	public OutputTag(String id) {
+		Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+		this.id = requireNonNull(id);
+
+		try {
+			TypeHint<T> typeHint = new TypeHint<T>(OutputTag.class, this, 0) {};
+			this.typeInfo = typeHint.getTypeInfo();
+		} catch (InvalidTypesException e) {
+			throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+					"OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+		}
+	}
+
+	/**
+	 * Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
+	 *
+	 * @param id The id of the created {@code OutputTag}.
+	 * @param typeInfo The {@code TypeInformation} for the side output.
+	 */
+	public OutputTag(String id, TypeInformation<T> typeInfo) {
+		this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+		this.typeInfo = Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
+	}
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		typeInfo = null;
+	}
+
+	public String getId() {
+		return id;
+	}
+
+	public TypeInformation<T> getTypeInfo() {
+		return typeInfo;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof OutputTag
```

— End diff –

        Two points:
        1) We cannot have `this.id == null` or `((OutputTag) obj).id == null` because we check at the constructor, so this method can be simplified.
        2) We never check for uniqueness of the `outputTag.id`. We should do it at translation. This is also a correctness issue, as it may result in undesired side-output "collisions".
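        For illustration, the simplification suggested in point 1 could look like the following minimal sketch (a standalone toy class, not the actual Flink `OutputTag`; `Objects.requireNonNull` stands in for Flink's `Preconditions` here):

```java
import java.io.Serializable;
import java.util.Objects;

// Toy sketch of the suggested simplification: since the constructor rejects a
// null id, equals() can compare ids directly without any null guards.
class OutputTagSketch<T> implements Serializable {

    private final String id;

    OutputTagSketch(String id) {
        this.id = Objects.requireNonNull(id, "OutputTag id cannot be null.");
    }

    @Override
    public boolean equals(Object obj) {
        // id is guaranteed non-null here, so no null checks are needed.
        return obj instanceof OutputTagSketch
                && ((OutputTagSketch<?>) obj).id.equals(this.id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}
```

        Note that, as in the class under review, two tags with the same id but different generic types still compare equal in this sketch, which is part of why point 2 (uniqueness checking) matters.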

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105409533

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java —
        @@ -72,6 +76,11 @@ public RecordWriterOutput(

        — End diff –

        We can't throw, because that would crash the program. This is a good catch, though! I will remove the duplication.

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105435806

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java —
        @@ -72,6 +76,11 @@ public RecordWriterOutput(

        — End diff –

        I can remove the duplication in `RecordWriterOutput`, `ChainingOutput` and `CopyingChainingOutput`, basically those `Outputs` that don't forward to other outputs but instead push into the operator or into the network. For the other `Outputs`, removing the duplication is not possible, because inside the respective `output()` method they call `output()` of another `Output`. They call it either with an `OutputTag` or without, so the method body is not actually a duplicate.

        I did find another bug, though, where `CopyingBroadcastingOutputCollector` in `OperatorChain` was not calling the correct `collect()` method on the downstream `Outputs`. 😃

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105436193

— Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java —

```
@@ -387,14 +403,25 @@ public int getChainLength() {

 		protected final StreamStatusProvider streamStatusProvider;

-		public ChainingOutput(OneInputStreamOperator<T, ?> operator, StreamStatusProvider streamStatusProvider) {
+		protected final OutputTag<T> outputTag;
+
+		public ChainingOutput(
+				OneInputStreamOperator<T, ?> operator,
+				StreamStatusProvider streamStatusProvider,
+				OutputTag<T> outputTag) {
 			this.operator = operator;
 			this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
 			this.streamStatusProvider = streamStatusProvider;
+			this.outputTag = outputTag;
 		}

 		@Override
```

— End diff –

        This I'm fixing, as I mentioned above.

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105436408

— Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java —

```
@@ -539,5 +625,26 @@ public void collect(StreamRecord<T> record) {
 			// don't copy for the last output
 			outputs[outputs.length - 1].collect(record);
 		}
+
```

— End diff –

        fixed.

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the issue:

        https://github.com/apache/flink/pull/3484

        Thanks @kl0u for the (already) quite thorough review! I'll push a commit with fixes.

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105441456

— Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java —

```
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+
+/**
+ * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
+ * of an operator.
+ *
+ * <p>An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
+ * a {@link TypeInformation} for the generic type parameter.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){});
+ * }</pre>
+ *
+ * @param <T> the type of elements in the side-output stream.
+ */
+@PublicEvolving
+public class OutputTag<T> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String id;
+
+	private transient TypeInformation<T> typeInfo;
+
+	/**
+	 * Creates a new named {@code OutputTag} with the given id.
+	 *
+	 * @param id The id of the created {@code OutputTag}.
+	 */
+	public OutputTag(String id) {
+		Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+		this.id = requireNonNull(id);
+
+		try {
+			TypeHint<T> typeHint = new TypeHint<T>(OutputTag.class, this, 0) {};
+			this.typeInfo = typeHint.getTypeInfo();
+		} catch (InvalidTypesException e) {
+			throw new InvalidTypesException("Could not determine TypeInformation for generic " +
+					"OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
+		}
+	}
+
+	/**
+	 * Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
+	 *
+	 * @param id The id of the created {@code OutputTag}.
+	 * @param typeInfo The {@code TypeInformation} for the side output.
+	 */
+	public OutputTag(String id, TypeInformation<T> typeInfo) {
+		this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
+		this.typeInfo = Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
+	}
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		typeInfo = null;
+	}
+
+	public String getId() {
+		return id;
+	}
+
+	public TypeInformation<T> getTypeInfo() {
+		return typeInfo;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof OutputTag
```

— End diff –

        I would have liked to include the `TypeInformation` in the check, but we can't do that because it's transient. I'll try to figure something out for checking that side outputs are unique; it's not as easy as it seems.
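        One conceivable shape for such a check (purely hypothetical, not what this PR implements): remember, at translation time, which type each side-output id was first registered with, and flag an id that reappears with a different type:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical translation-time registry: remembers which type name each
// side-output id was first registered with, and flags conflicting re-use.
class SideOutputRegistry {

    private final Map<String, String> idToTypeName = new HashMap<>();

    /**
     * Returns true if the id is new, or re-registered with the same type;
     * false if the id is already taken by a different type (a "collision").
     */
    boolean register(String id, String typeName) {
        String existing = idToTypeName.putIfAbsent(id, typeName);
        return existing == null || existing.equals(typeName);
    }
}
```

        The translation step could then fail fast (or at least warn) whenever `register` returns false for an operator's side outputs.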

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r105441679

        — Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java —
        @@ -0,0 +1,115 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.util;
        +
        +import static java.util.Objects.requireNonNull;
        +
        +import java.io.IOException;
        +import java.io.ObjectInputStream;
        +import java.io.Serializable;
        +
        +import org.apache.flink.annotation.PublicEvolving;
        +import org.apache.flink.api.common.functions.InvalidTypesException;
        +import org.apache.flink.api.common.typeinfo.TypeHint;
        +import org.apache.flink.api.common.typeinfo.TypeInformation;
        +
        +
        +/**
        + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
        + * of an operator.
        + *
        + * <p>An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
        + * a {@link TypeInformation} for the generic type parameter.
        + *
        + * <p>Example:
        + * <pre>{@code
        + * OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){});
        + * }</pre>
        + *
        + * @param <T> the type of elements in the side-output stream.
        + */
        +@PublicEvolving
        +public class OutputTag<T> implements Serializable {
        +
        + private static final long serialVersionUID = 1L;
        +
        + private final String id;
        +
        + private transient TypeInformation<T> typeInfo;
        +
        + /**
        + * Creates a new named {@code OutputTag} with the given id.
        + *
        + * @param id The id of the created {@code OutputTag}.
        + */
        + public OutputTag(String id) {
        + Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
        + this.id = requireNonNull(id);
        +
        + try {
        + TypeHint<T> typeHint =
        + new TypeHint<T>(OutputTag.class, this, 0) {};
        + this.typeInfo = typeHint.getTypeInfo();
        + } catch (InvalidTypesException e) {
        + throw new InvalidTypesException("Could not determine TypeInformation for generic " +
        + "OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
        + }
        + }
        +
        + /**
        + * Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
        + *
        + * @param id The id of the created {@code OutputTag}.
        + * @param typeInfo The {@code TypeInformation} for the side output.
        + */
        + public OutputTag(String id, TypeInformation<T> typeInfo) {
        + this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
        + this.typeInfo =
        + Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
        + }

        +
        + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        + in.defaultReadObject();
        + typeInfo = null;
        + }
        +
        + public String getId() {
        + return id;
        + }
        +
        + public TypeInformation<T> getTypeInfo() {
        + return typeInfo;
        + }
        +
        + @Override
        + public boolean equals(Object obj) {
        + return obj instanceof OutputTag
        — End diff –

        I see. The problem is that if this does not work, then we can have important side effects.
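        The concern above can be made concrete with a small, self-contained sketch (plain Java; the `SimpleTag` class and its `Class<T>` field are hypothetical stand-ins for Flink's `OutputTag` and `TypeInformation<T>`, not Flink code): because the type information is transient, `equals` can only compare ids, so two tags with the same id but different element types are indistinguishable.

        ```java
        import java.util.Objects;

        // Hypothetical stand-in for OutputTag: equality is id-only because the
        // type field is transient and would be null after deserialization.
        final class SimpleTag<T> implements java.io.Serializable {
            private final String id;
            private final transient Class<T> typeInfo; // stands in for TypeInformation<T>

            SimpleTag(String id, Class<T> typeInfo) {
                this.id = Objects.requireNonNull(id, "tag id cannot be null");
                this.typeInfo = typeInfo;
            }

            @Override
            public boolean equals(Object obj) {
                // typeInfo cannot participate in the check: it is transient
                return obj instanceof SimpleTag && ((SimpleTag<?>) obj).id.equals(this.id);
            }

            @Override
            public int hashCode() {
                return id.hashCode();
            }
        }

        public class TagEqualityDemo {
            public static void main(String[] args) {
                SimpleTag<String> a = new SimpleTag<>("late-data", String.class);
                SimpleTag<Long> b = new SimpleTag<>("late-data", Long.class);
                // Same id but different element types still compare equal -- the
                // hazard discussed above, hence the later uniqueness check.
                System.out.println(a.equals(b)); // prints "true"
            }
        }
        ```

        This is exactly why an extra uniqueness check on side-output ids is added later in the review.
        
        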

        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the issue:

        https://github.com/apache/flink/pull/3484

        @kl0u @chenqin I cleaned up the commits, distributed the fixes from the comments to the right commits. I also added more tests/ITCases for: detecting name clashes in side output IDs, side outputs with multiple consumers.

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on the issue:

        https://github.com/apache/flink/pull/3484

        Thanks @aljoscha I will have a look on Monday.

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106138342

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java —
        @@ -416,4 +428,26 @@ private boolean canBeParallel() {
        	transformation.setSlotSharingGroup(slotSharingGroup);
        	return this;
        }

        +
        + /**
        + * Gets the {@link DataStream} that contains the elements that are emitted from an operation
        + * into the side output with the given {@link OutputTag}.
        + *
        + * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
        + */
        + public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag){
        + sideOutputTag = clean(sideOutputTag);
        — End diff –

        I think it is better to not reuse the argument variable but create a new one.

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106138473

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java —
        @@ -416,4 +428,26 @@ private boolean canBeParallel() {
        	transformation.setSlotSharingGroup(slotSharingGroup);
        	return this;
        }

        +
        + /**
        + * Gets the {@link DataStream} that contains the elements that are emitted from an operation
        + * into the side output with the given {@link OutputTag}.
        + *
        + * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
        + */
        + public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag){
        + sideOutputTag = clean(sideOutputTag);
        +
        + TypeInformation<?> type = requestedSideOutputs.get(sideOutputTag);
        + if (type != null && !type.equals(sideOutputTag.getTypeInfo())) {
        + throw new UnsupportedOperationException("A side output with a matching id was " +
        + "already requested with a different type. This is not allowed, side output " +
        + "ids need to be unique.");
        + }
        +
        + requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());
        +
        — End diff –

        The `requireNonNull` should be at the beginning of the method.
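        The uniqueness check quoted above can be sketched as follows (a minimal plain-Java sketch, not Flink's code: `SideOutputRegistry` is a hypothetical name, and `Class<?>` stands in for `TypeInformation`). The registry remembers which type was requested for each side-output id and rejects a second request with a different type.

        ```java
        import java.util.HashMap;
        import java.util.Map;

        // Hypothetical sketch of the getSideOutput uniqueness check: a side-output
        // id may only ever be requested with a single element type.
        public class SideOutputRegistry {
            private final Map<String, Class<?>> requestedSideOutputs = new HashMap<>();

            public void requestSideOutput(String tagId, Class<?> type) {
                Class<?> previous = requestedSideOutputs.get(tagId);
                if (previous != null && !previous.equals(type)) {
                    // mirrors the exception thrown in the diff above
                    throw new UnsupportedOperationException(
                        "A side output with a matching id was already requested with a "
                        + "different type. Side output ids need to be unique.");
                }
                requestedSideOutputs.put(tagId, type);
            }
        }
        ```

        Requesting the same id twice with the same type is fine; only a type mismatch is rejected.
        
        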

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106138237

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java —
        @@ -416,4 +428,26 @@ private boolean canBeParallel() {
        	transformation.setSlotSharingGroup(slotSharingGroup);
        	return this;
        }

        +
        + /**
        + * Gets the {@link DataStream} that contains the elements that are emitted from an operation
        + * into the side output with the given {@link OutputTag}.
        + *
        + * @see org.apache.flink.streaming.api.functions.ProcessFunction.Context#output(OutputTag, Object)
        + */
        — End diff –

        Missing space between the ) and the {

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106139079

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java —
        @@ -567,6 +600,17 @@ protected boolean isLate(W window) {
        }

        /**
        + * Decide if a record is currently late, based on current watermark and allowed lateness.
        + *
        + * @param element The element to check
        + * @return The element for which should be considered when sideoutputs
        + */
        + protected boolean isLate(StreamRecord<IN> element){
        — End diff –

        This is not used any more, right? So it can be deleted.

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106139530

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java —
        @@ -441,26 +486,53 @@ public void close() {
        private static final class CopyingChainingOutput<T> extends ChainingOutput<T> {

        private final TypeSerializer<T> serializer;

        • +
          public CopyingChainingOutput(
          OneInputStreamOperator<T, ?> operator,
          TypeSerializer<T> serializer,
          + OutputTag<T> outputTag,
          StreamStatusProvider streamStatusProvider)

        {
        - super(operator, streamStatusProvider);
        + super(operator, streamStatusProvider, outputTag);
          this.serializer = serializer;
        }

        @Override
        public void collect(StreamRecord<T> record) {
        + if (this.outputTag != null) {
        + // we are only responsible for emitting to the main input
        + return;
        + }
        +
        + pushToOperator(record);
        + }
        +
        + @Override
        + public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
        + if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
        + // we are only responsible for emitting to the side-output specified by our
        + // OutputTag.
        + return;
        + }
        +
        + pushToOperator(record);
        + }
        +
        + @Override
        — End diff –

        This can become `private`, as before.

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106139593

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java —
        @@ -441,26 +486,53 @@ public void close() {
        private static final class CopyingChainingOutput<T> extends ChainingOutput<T> {

        private final TypeSerializer<T> serializer;

        • +
          public CopyingChainingOutput(
          OneInputStreamOperator<T, ?> operator,
          TypeSerializer<T> serializer,
          + OutputTag<T> outputTag,
          StreamStatusProvider streamStatusProvider)

        {
        - super(operator, streamStatusProvider);
        + super(operator, streamStatusProvider, outputTag);
          this.serializer = serializer;
        }

        @Override
        public void collect(StreamRecord<T> record) {
        + if (this.outputTag != null) {
        + // we are only responsible for emitting to the main input
        + return;
        + }
        +
        + pushToOperator(record);
        + }
        +
        + @Override
        + public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
        + if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
        + // we are only responsible for emitting to the side-output specified by our
        + // OutputTag.
        + return;
        + }
        +
        + pushToOperator(record);
        + }
        +
        + @Override
        + protected <X> void pushToOperator(StreamRecord<X> record) {
          try {
        + // we know that the given outputTag matches our OutputTag so the record
        + // must be of the type that our operator (and Serializer) expects.
        + @SuppressWarnings("unchecked")
        + StreamRecord<T> castRecord = (StreamRecord<T>) record;
        +
          numRecordsIn.inc();
        - StreamRecord<T> copy = record.copy(serializer.copy(record.getValue()));
        + StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
          operator.setKeyContextElement1(copy);
        - operator.processElement(copy);
        - } catch (Exception e) {
        + operator.processElement(castRecord);
        — End diff –

        This should be `copy`, not `castRecord`.

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106139449

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java —
        @@ -387,18 +401,49 @@ public int getChainLength() {

        protected final StreamStatusProvider streamStatusProvider;

        - public ChainingOutput(OneInputStreamOperator<T, ?> operator, StreamStatusProvider streamStatusProvider) {
        + protected final OutputTag<T> outputTag;
        +
        + public ChainingOutput(
        + OneInputStreamOperator<T, ?> operator,
        + StreamStatusProvider streamStatusProvider,
        + OutputTag<T> outputTag) {
          this.operator = operator;
          this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
          this.streamStatusProvider = streamStatusProvider;
        + this.outputTag = outputTag;
        }

        @Override
        public void collect(StreamRecord<T> record) {
        + if (this.outputTag != null) {
        + // we are only responsible for emitting to the main input
        + return;
        + }
        +
        + pushToOperator(record);
        + }
        +
        + @Override
        + public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
        + if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
        + // we are only responsible for emitting to the side-output specified by our
        + // OutputTag.
        + return;
        + }
        +
        + pushToOperator(record);
        + }
        +
        — End diff –

        This can become `private` as the copying alternative has its own implementation, right?
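        The tag-matching dispatch rule in the diff above can be sketched in isolation (plain Java; `Chain` and the `String` tag are hypothetical stand-ins for `ChainingOutput` and `OutputTag`, not Flink's classes): a chained output serves either the main output (tag is null) or exactly one side output, and silently drops everything else.

        ```java
        import java.util.ArrayList;
        import java.util.List;

        // Hypothetical model of ChainingOutput's dispatch: one chain per output,
        // identified by a tag (null == main output). "forwarded" records what
        // would have reached the downstream operator.
        public class ChainingDispatchDemo {
            public static final class Chain {
                final String outputTag;            // null means "main output"
                public final List<Object> forwarded = new ArrayList<>();

                public Chain(String outputTag) { this.outputTag = outputTag; }

                // main-output path: a tagged chain ignores main-output records
                public void collect(Object record) {
                    if (outputTag != null) {
                        return;
                    }
                    forwarded.add(record);
                }

                // side-output path: only forward on an exact tag match
                public void collect(String tag, Object record) {
                    if (outputTag == null || !outputTag.equals(tag)) {
                        return;
                    }
                    forwarded.add(record);
                }
            }

            public static void main(String[] args) {
                Chain main = new Chain(null);
                Chain side = new Chain("late-data");
                main.collect("element-1");           // reaches the main chain only
                side.collect("element-1");           // dropped by the side chain
                main.collect("late-data", "late-1"); // dropped by the main chain
                side.collect("late-data", "late-1"); // reaches the side chain
                System.out.println(main.forwarded);  // [element-1]
                System.out.println(side.forwarded);  // [late-1]
            }
        }
        ```
        
        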

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106139311

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java —
        @@ -326,33 +327,46 @@ public int getChainLength() {
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,

        - List<StreamOperator<?>> allOperators)
        + List<StreamOperator<?>> allOperators,
        + OutputTag<IN> outputTag)
        {
          // create the output that the operator writes to first. this may recursively create more operators
          Output<StreamRecord<OUT>> output = createOutputCollector(
            containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);

          // now create the operator and give it the output collector to write its output to
          OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
        +
          chainedOperator.setup(containingTask, operatorConfig, output);

          allOperators.add(chainedOperator);

          if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
        - return new ChainingOutput<>(chainedOperator, this);
        + return new ChainingOutput<>(chainedOperator, this, outputTag);
          } else {
            TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
        - return new CopyingChainingOutput<>(chainedOperator, inSerializer, this);
        + return new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
          }
        }

        private <T> RecordWriterOutput<T> createStreamOutput(
          StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
          Environment taskEnvironment,
        - String taskName)
        - {
        - TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
        + String taskName) {
        + OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
        +
        + TypeSerializer outSerializer = null;
        +
        + if (edge.getOutputTag() != null) {
        + // side output
        + outSerializer = upStreamConfig.getTypeSerializerSideOut(
        + edge.getOutputTag(), taskEnvironment.getUserClassLoader());
        + } else {
        + // main output
        — End diff –

        this can become one line.
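        The serializer-selection branch under discussion reduces to one decision: if the edge carries an `OutputTag`, look up the side-output serializer for that tag, otherwise use the main-output serializer. A minimal sketch of that branch (plain Java; `selectSerializer` and the string return values are hypothetical stand-ins, not Flink's API):

        ```java
        // Hypothetical model of the createStreamOutput branch: the presence of an
        // output tag on the edge decides which serializer is used.
        public class SerializerSelectionDemo {
            public static String selectSerializer(String edgeOutputTag) {
                return (edgeOutputTag != null)
                    ? "side-output-serializer:" + edgeOutputTag  // side-output path
                    : "main-output-serializer";                  // main-output path
            }

            public static void main(String[] args) {
                System.out.println(selectSerializer(null));        // main-output-serializer
                System.out.println(selectSerializer("late-data")); // side-output-serializer:late-data
            }
        }
        ```
        
        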

        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106138062

        — Diff: flink-core/src/main/java/org/apache/flink/util/OutputTag.java —
        @@ -0,0 +1,115 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.util;
        +
        +import static java.util.Objects.requireNonNull;
        +
        +import java.io.IOException;
        +import java.io.ObjectInputStream;
        +import java.io.Serializable;
        +
        +import org.apache.flink.annotation.PublicEvolving;
        +import org.apache.flink.api.common.functions.InvalidTypesException;
        +import org.apache.flink.api.common.typeinfo.TypeHint;
        +import org.apache.flink.api.common.typeinfo.TypeInformation;
        +
        +
        +/**
        + * An {@link OutputTag} is a typed and named tag to use for tagging side outputs
        + * of an operator.
        + *
        + * <p>An {@code OutputTag} must always be an anonymous inner class so that Flink can derive
        + * a {@link TypeInformation} for the generic type parameter.
        + *
        + * <p>Example:
        + * <pre>{@code
        + * OutputTag<Tuple2<String, Long>> info = new OutputTag<Tuple2<String, Long>>("late-data"){};
        + * }</pre>
        + *
        + * @param <T> the type of elements in the side-output stream.
        + */
        +@PublicEvolving
        +public class OutputTag<T> implements Serializable {
        +
        +	private static final long serialVersionUID = 1L;
        +
        +	private final String id;
        +
        +	private transient TypeInformation<T> typeInfo;
        +
        +	/**
        +	 * Creates a new named {@code OutputTag} with the given id.
        +	 *
        +	 * @param id The id of the created {@code OutputTag}.
        +	 */
        +	public OutputTag(String id) {
        +		Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
        +		this.id = requireNonNull(id);
        +
        +		try {
        +			TypeHint<T> typeHint = new TypeHint<T>(OutputTag.class, this, 0) {};
        +			this.typeInfo = typeHint.getTypeInfo();
        +		} catch (InvalidTypesException e) {
        +			throw new InvalidTypesException("Could not determine TypeInformation for generic " +
        +					"OutputTag type. Did you forget to make your OutputTag an anonymous inner class?", e);
        +		}
        +	}
        +
        +	/**
        +	 * Creates a new named {@code OutputTag} with the given id and output {@link TypeInformation}.
        +	 *
        +	 * @param id The id of the created {@code OutputTag}.
        +	 * @param typeInfo The {@code TypeInformation} for the side output.
        +	 */
        +	public OutputTag(String id, TypeInformation<T> typeInfo) {
        +		this.id = Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
        +		this.typeInfo = Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
        +	}
        +
        +	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        +		in.defaultReadObject();
        +		typeInfo = null;
        +	}
        +
        +	public String getId() {
        +		return id;
        +	}
        +
        +	public TypeInformation<T> getTypeInfo() {
        +		return typeInfo;
        +	}
        +
        +	@Override
        +	public boolean equals(Object obj) {
        +		return obj instanceof OutputTag
        — End diff –

        Still the first comment applies: the `equals` can be simplified given that `id != null`.

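        The `equals` simplification kl0u asks for can be sketched in isolation: once the constructor rejects `null` ids, the comparison needs no null guards. A minimal standalone sketch (`TagSketch` is a hypothetical stand-in, not the actual `OutputTag` class):

        ```java
        import java.io.Serializable;
        import java.util.Objects;

        // Hypothetical stand-in for OutputTag, illustrating the suggested equals().
        class TagSketch implements Serializable {

            private static final long serialVersionUID = 1L;

            private final String id;

            TagSketch(String id) {
                // The constructor guarantees a non-null id...
                this.id = Objects.requireNonNull(id, "id cannot be null.");
            }

            // ...so equals() can rely on this.id being non-null and skip all null checks.
            @Override
            public boolean equals(Object obj) {
                return obj instanceof TagSketch && ((TagSketch) obj).id.equals(this.id);
            }

            @Override
            public int hashCode() {
                return id.hashCode();
            }
        }
        ```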
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106138895

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SideOutputTransformation.java —
        @@ -0,0 +1,73 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.flink.streaming.api.transformations;
        +
        +import static java.util.Objects.requireNonNull;
        +
        +import com.google.common.collect.Lists;
        +import org.apache.flink.util.OutputTag;
        +import org.apache.flink.streaming.api.operators.ChainingStrategy;
        +
        +import java.util.Collection;
        +import java.util.List;
        +
        +
        +/**
        + * This transformation represents a selection of a side output of an upstream operation with a
        + * given {@link OutputTag}.
        + *
        + * <p>This does not create a physical operation, it only affects how upstream operations are
        + * connected to downstream operations.
        + *
        + * @param <T> The type of the elements that result from this {@code SideOutputTransformation}
        + */
        — End diff –

        Here we do not check if the `input` is `null` (we do it in the caller method only), but we try to get the parallelism from it. We could pass the parallelism as a separate argument and then, after the `super()` call, check if the input is null.
        This makes the code of the class self-contained, as you do not have to check other classes to see whether the `input` can be `null` or not. What do you think?

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106163521

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java —
        @@ -63,6 +66,7 @@
        private TypeSerializer<?> typeSerializerIn1;
        private TypeSerializer<?> typeSerializerIn2;
        private TypeSerializer<?> typeSerializerOut;
        + private Map<OutputTag<?>, TypeSerializer<?>> typeSerializerMap;
        — End diff –

        This is not used anywhere in the code. Can it be removed, along with the `getTypeSerializerOut()` and `setTypeSerializerOut()`?

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106162749

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java —
        @@ -48,15 +49,25 @@
         	 * output selection).
         	 */
         	private final List<String> selectedNames;
        +
        +	/**
        +	 * The side-output tag (if any) of this {@link StreamEdge}.
        +	 */
        +	private final OutputTag outputTag;
        +
        +	/**
        +	 * The {@link StreamPartitioner} on this {@link StreamEdge}.
        +	 */
         	private StreamPartitioner<?> outputPartitioner;

         	public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
        -			List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
        +			List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag) {
         		this.sourceVertex = sourceVertex;
         		this.targetVertex = targetVertex;
         		this.typeNumber = typeNumber;
         		this.selectedNames = selectedNames;
         		this.outputPartitioner = outputPartitioner;
        +		this.outputTag = outputTag;

        — End diff –

        Does it make sense to add the outputTag also in the `edgeId`?

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106163045

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java —
        @@ -333,32 +373,39 @@ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int ty
         			downStreamVertexID,
         			typeNumber,
         			null,
        -			new ArrayList<String>());
        +			new ArrayList<String>(), null);

        — End diff –

        The `null` should go to the next line for uniformity.

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106162497

        — Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java —
        @@ -0,0 +1,139 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one or more
        + * contributor license agreements. See the NOTICE file distributed with
        + * this work for additional information regarding copyright ownership.
        + * The ASF licenses this file to You under the Apache License, Version 2.0
        + * (the "License"); you may not use this file except in compliance with
        + * the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.streaming.examples.sideoutput;
        +
        +import org.apache.flink.api.java.functions.KeySelector;
        +import org.apache.flink.streaming.api.TimeCharacteristic;
        +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        +import org.apache.flink.streaming.api.functions.ProcessFunction;
        +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
        +import org.apache.flink.streaming.api.windowing.time.Time;
        +import org.apache.flink.util.OutputTag;
        +import org.apache.flink.api.java.tuple.Tuple2;
        +import org.apache.flink.api.java.utils.ParameterTool;
        +import org.apache.flink.examples.java.wordcount.util.WordCountData;
        +import org.apache.flink.streaming.api.datastream.DataStream;
        +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        +import org.apache.flink.util.Collector;
        +
        +/**
        + * An example that illustrates the use of side outputs.
        + *
        + * <p>This is a modified version of {@link org.apache.flink.streaming.examples.windowing.WindowWordCount}
        + * that has a filter in the tokenizer and only emits some words for counting
        + * while emitting the other words to a side output.
        + */
        +public class SideOutputExample {
        +
        +	/**
        +	 * We need to create an {@link OutputTag} so that we can reference it when emitting
        +	 * data to a side output and also to retrieve the side output stream from an operation.
        +	 */
        +	static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected") {};
        — End diff –

        Here we add a side output but we do nothing to show that it works. Probably we can add a prefix "rejected-" to the record and print it, so that the user can see what the side output does.

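        Without pulling in Flink, the tag-to-channel routing that `rejectedWordsTag` relies on can be sketched with plain collections. `SideChannelCollector` below is a hypothetical illustration, not Flink's runtime, which routes records along tagged stream edges instead of buffering them in lists:

        ```java
        import java.util.ArrayList;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;

        // Hypothetical sketch: a main output channel plus named side channels keyed by tag id.
        class SideChannelCollector<OUT> {

            private final List<OUT> mainOutput = new ArrayList<>();
            private final Map<String, List<Object>> sideOutputs = new HashMap<>();

            // Emit to the main output, as a plain Collector would.
            void collect(OUT value) {
                mainOutput.add(value);
            }

            // Emit to a named side channel, creating it on first use.
            void collect(String tagId, Object sideValue) {
                sideOutputs.computeIfAbsent(tagId, k -> new ArrayList<>()).add(sideValue);
            }

            List<OUT> getMainOutput() {
                return mainOutput;
            }

            // Unknown tags yield an empty list rather than null.
            List<Object> getSideOutput(String tagId) {
                return sideOutputs.getOrDefault(tagId, new ArrayList<>());
            }
        }
        ```

        A tokenizer in the spirit of the example class under review would then `collect(word)` for accepted words and `collect("rejected", word)` for the rest.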
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106684096

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java —
        @@ -567,6 +600,17 @@ protected boolean isLate(W window) {
        }

        /**
        + * Decide if a record is currently late, based on current watermark and allowed lateness.
        + *
        + * @param element The element to check
        + * @return The element for which should be considered when sideoutputs
        + */
        + protected boolean isLate(StreamRecord<IN> element){
        — End diff –

        I must have removed the check by accident. I think we agreed to rename this to something more meaningful and keep it, right?

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user kl0u commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106684413

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java —
        @@ -567,6 +600,17 @@ protected boolean isLate(W window) {
        }

        /**
        + * Decide if a record is currently late, based on current watermark and allowed lateness.
        + *
        + * @param element The element to check
        + * @return The element for which should be considered when sideoutputs
        + */
        + protected boolean isLate(StreamRecord<IN> element){
        — End diff –

        That is what I remember as well.

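        The check being renamed here boils down to one comparison against the current watermark. A hedged sketch (class and method names hypothetical, not the final Flink signature):

        ```java
        // Hypothetical sketch of the element-lateness rule discussed above: an element
        // is late once its timestamp plus the allowed lateness no longer exceeds the
        // current watermark, at which point it is a candidate for the side output.
        final class LatenessCheck {

            private LatenessCheck() {}

            static boolean isElementLate(long elementTimestamp, long allowedLateness, long currentWatermark) {
                return elementTimestamp + allowedLateness <= currentWatermark;
            }
        }
        ```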
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3484#discussion_r106685158

        — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java —
        @@ -48,15 +49,25 @@
         	 * output selection).
         	 */
         	private final List<String> selectedNames;
        +
        +	/**
        +	 * The side-output tag (if any) of this {@link StreamEdge}.
        +	 */
        +	private final OutputTag outputTag;
        +
        +	/**
        +	 * The {@link StreamPartitioner} on this {@link StreamEdge}.
        +	 */
         	private StreamPartitioner<?> outputPartitioner;

         	public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
        -			List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
        +			List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag) {
         		this.sourceVertex = sourceVertex;
         		this.targetVertex = targetVertex;
         		this.typeNumber = typeNumber;
         		this.selectedNames = selectedNames;
         		this.outputPartitioner = outputPartitioner;
        +		this.outputTag = outputTag;

        — End diff –

        Not sure what the edge id exactly does and who uses it so I prefer to not touch it, for now.

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the issue:

        https://github.com/apache/flink/pull/3484

        Thanks for reviewing again, @kl0u! I incorporated all your suggestions. I'm now waiting for travis to give the green light and then I'll merge.

        @chenqin A lot of thanks also to you for working on this and pushing it with me! 😃

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha closed the pull request at:

        https://github.com/apache/flink/pull/3484

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user aljoscha commented on the issue:

        https://github.com/apache/flink/pull/2982

        @chenqin I finally merged it. Could you please also close this PR?

        And thanks again for working on this for so long! 👍

        Hide
        aljoscha Aljoscha Krettek added a comment -

        Implemented in
        e134d27589ead89882d94969edeeb171ee4433b1

        And follow-up commits.

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user chenqin commented on the issue:

        https://github.com/apache/flink/pull/2982

        Wow, finally in!

        It was fun, and thanks for your help along every step of the journey!

        Chen

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user chenqin closed the pull request at:

        https://github.com/apache/flink/pull/2982

        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user jgrier commented on the issue:

        https://github.com/apache/flink/pull/2982

        Nice


          People

          • Assignee: foxss Chen Qin
          • Reporter: foxss Chen Qin
          • Votes: 2
          • Watchers: 7

          Dates

          • Created:
          • Updated:
          • Resolved: