FLINK-5017

Introduce StreamStatus stream element to allow for temporarily idle streaming sources

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Streaming
    • Labels:
      None

      Description

      A StreamStatus element informs receiving operators whether or not they should continue to expect watermarks from the sending operator. There are 2 kinds of status, namely IDLE and ACTIVE. Watermark status elements are generated at the sources, and may be propagated through the operators of the topology using Output#emitWatermarkStatus(WatermarkStatus).
      Sources and downstream operators should emit the corresponding status element whenever they switch between the "watermark-idle" and "watermark-active" states.

      A source is considered "watermark-idle" if it will not emit records for an indefinite amount of time. This is the case, for example, for Flink's Kafka Consumer, where sources might initially have no assigned partitions to read from, or no records can be read from the assigned partitions. Once the source detects that it will resume emitting data, it is considered "watermark-active".

      Downstream operators with multiple inputs (e.g. head operators of a OneInputStreamTask or TwoInputStreamTask) should not wait for watermarks from an upstream operator that is "watermark-idle" when deciding whether or not to advance the operator's current watermark. When a downstream operator determines that all upstream operators are "watermark-idle" (i.e. when all input channels have received the watermark idle status element), then the operator is considered to also be "watermark-idle", as it will temporarily be unable to advance its own watermark. This is always the case for operators that only read from a single upstream operator. Once an operator is considered "watermark-idle", it should itself forward its idle status to inform downstream operators. The operator is considered to be back to "watermark-active" as soon as at least one of its upstream operators becomes "watermark-active" again (i.e. when at least one input channel receives the watermark active status element), and should also forward its active status to inform downstream operators.
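
The rule described above can be sketched as a small helper. This is a minimal illustration only, not Flink's implementation: the class `IdleAwareMinWatermark` and its `combine` method are hypothetical names, and per-channel state is passed in as plain arrays for brevity.

```java
import java.util.OptionalLong;

/** Hypothetical helper for illustration; not part of Flink. */
final class IdleAwareMinWatermark {

    /**
     * Returns the minimum watermark over all channels that are currently active,
     * or an empty result if every channel is idle (the operator is then idle itself).
     */
    static OptionalLong combine(long[] channelWatermarks, boolean[] channelIdle) {
        if (channelWatermarks.length != channelIdle.length) {
            throw new IllegalArgumentException("one idle flag per channel is required");
        }
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                // Only active channels take part in the minimum.
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
```

With two channels at watermarks 10 and 3, the combined watermark is 3 while both are active, 10 once the second channel is idle, and absent once both are idle.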

          Activity

          aljoscha Aljoscha Krettek added a comment -

          It might be possible to extend Watermark for this purpose, so that we don't need a bunch of new methods on operators/outputs. I haven't thought about it too deeply, though, so it might not be easily possible.

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          I had considered this, by defining values "-1" and "-2" as special timestamp values that represent idle / active watermark status.
          However, I wasn't quite sure that occupying "-1" and "-2" was a good idea, or whether it would interfere with other parts of the system or affect any future extensions. Otherwise, considering only the purpose stated in this ticket, it should be possible with a bit of finesse; the only concern is that it might hurt code readability (we would basically need to check whether watermarks are actually these two special values in every place where we work with watermarks).

          I think a separate WatermarkStatus class has another readability advantage: we can simply keep a currentWatermarkStatus variable in operators that is set to the last emitted WatermarkStatus, which naturally reflects whether the operator is currently watermark idle or active.

          Either way, let me know what you think. I'm not against extending Watermark, so I'm still open to going for that approach if you think the new methods are excessive.
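
To make the trade-off concrete, here is a rough sketch of both encodings. All names are hypothetical, and the mapping of "-1" to idle and "-2" to active is an assumption made only for illustration.

```java
/** Hypothetical status type, standing in for the proposed separate class. */
enum WatermarkStatusSketch { IDLE, ACTIVE }

/** Encoding 1: reserve special timestamps on the existing watermark element. */
final class SentinelEncoding {
    static final long IDLE_MARKER = -1L;   // assumed mapping, for illustration
    static final long ACTIVE_MARKER = -2L; // assumed mapping, for illustration

    /** Every place that handles watermarks would need a check like this one. */
    static boolean isStatusMarker(long watermarkTimestamp) {
        return watermarkTimestamp == IDLE_MARKER || watermarkTimestamp == ACTIVE_MARKER;
    }
}

/** Encoding 2: a separate status element, remembered by the operator. */
final class OperatorStatusTracker {
    private WatermarkStatusSketch currentWatermarkStatus = WatermarkStatusSketch.ACTIVE;

    /** Records the last emitted status and reports whether it changed. */
    boolean emit(WatermarkStatusSketch status) {
        boolean changed = status != currentWatermarkStatus;
        currentWatermarkStatus = status;
        return changed;
    }

    boolean isIdle() {
        return currentWatermarkStatus == WatermarkStatusSketch.IDLE;
    }
}
```

With the second encoding, the operator's idleness can be read directly from `currentWatermarkStatus`, and ordinary watermark timestamps need no special-casing.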

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          If we want to extend Watermark, to aid with readability a bit, I think it's also fine to have a separate WatermarkStatus enumeration that operators can use.

          aljoscha Aljoscha Krettek added a comment -

          Oh dear, this is turning out to be quite tricky.

          My current thinking is that we can completely bypass operators if we have IDLE as watermark input. For example, consider this chain of operators:

          Kafka Source -> op1 -> op2 -> timestamp extractor

          op1 and op2 can be any operation (map, flatmap, what have you...).

          None of the operations op1, op2 or the timestamp extractor would actually need to be informed of the IDLE watermark. The intermediate operations cannot do anything with it; even the timestamp extractor cannot do anything but forward it. Therefore I'm thinking to just bypass it.

          The only place where it is relevant is in the watermark advancement code in the input processors. If all inputs are IDLE, then the input processor needs to bypass the chain and directly output to all real outgoing connections, which in itself is quite tricky.

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          Hi Aljoscha Krettek,

          To ease discussion a bit, I've just pushed some of my current changes regarding the IDLE watermark forwarding logic to a local branch: https://github.com/tzulitai/flink/tree/FLINK-5017. It's WIP code which I haven't proofread yet, so might be a bit sloppy / exception messages missing etc. / won't build yet as the output & operator interface implementations aren't all updated yet.

          "The only place where it is relevant is in the watermark advancement code in the input processors. If all inputs are IDLE, then the input processor needs to bypass the chain and directly output to all real outgoing connections, which in itself is quite tricky."

          Concerning this: the approach I'm taking right now is that operators that are IDLE are not supposed to receive any records or watermarks at all before they are triggered back to ACTIVE (using WatermarkStatus elements). It's basically the sources' and intermediate timestamp extractors' responsibility to ensure this.
          So, regarding "If all inputs are IDLE, then the input processor needs to bypass the chain and directly output to all real outgoing connections", I'm not really sure if this is necessary. Once the whole input processor is IDLE, an IDLE status will be propagated down the chain, which eventually is forwarded to a real outgoing connection (output of last operator in chain, correct?). No records or watermarks will be propagated down the chain while the input processor remains IDLE. Once the input processor is ACTIVE again, it propagates an ACTIVE down the chain, and records and watermarks are allowed to flow again.

          Do you think the above makes sense? The WIP code implements this approach, at StreamInputProcessor / StreamTwoInputProcessor / AbstractStreamOperator.

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          I think the main difference in my thinking is that the idleness of operators is always calculated and appropriately propagated downstream at each operator, and not simply bypassed. All operators, regardless of whether or not they are just an operator within a chain, need to check if they are toggled from IDLE -> ACTIVE or ACTIVE -> IDLE and appropriately propagate their new status.

          I use two status markers to do this. Also, it should be guaranteed that operators don't receive records or watermarks between an IDLE and an ACTIVE.
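
A toy model of this per-operator propagation might look as follows. `ChainedOpSketch` is a hypothetical class, not Flink's `OperatorChain`; it only shows that each operator tracks its own status, forwards toggles, and treats a watermark arriving while IDLE as a violated guarantee.

```java
import java.util.List;

/** Hypothetical chained operator, for illustration only. */
final class ChainedOpSketch {
    private final String name;
    private final ChainedOpSketch next; // null for the last operator in the chain
    private final List<String> log;     // shared event log, used only to observe the flow
    private boolean idle = false;       // operators start out ACTIVE

    ChainedOpSketch(String name, ChainedOpSketch next, List<String> log) {
        this.name = name;
        this.next = next;
        this.log = log;
    }

    /** Propagates a status only when it actually toggles this operator. */
    void processStatus(boolean nowIdle) {
        if (nowIdle == idle) {
            return; // no toggle, nothing to forward
        }
        idle = nowIdle;
        log.add(name + ":" + (idle ? "IDLE" : "ACTIVE"));
        if (next != null) {
            next.processStatus(nowIdle);
        }
    }

    /** Watermarks are only legal between an ACTIVE and the next IDLE. */
    void processWatermark(long timestamp) {
        if (idle) {
            throw new IllegalStateException(name + " received a watermark while IDLE");
        }
        log.add(name + ":WM(" + timestamp + ")");
        if (next != null) {
            next.processWatermark(timestamp);
        }
    }
}
```

Sending IDLE twice to the head of a two-operator chain logs one IDLE per operator; a following ACTIVE reopens the chain for watermarks.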

          Do you think completely bypassing intermediate operators in chains is absolutely necessary? It will definitely eliminate unnecessary work at intermediate operators, but as you pointed out, might introduce more complexity in the propagation.

          aljoscha Aljoscha Krettek added a comment -

          I think your approach works well with the current way the OperatorChain works. The problem with bypassing the chain is that the chain can have an output that goes to network at any stage and the watermarks/watermark status have to be broadcast on each of those.

          I attached a (poorly drawn) picture to illustrate this.

          I think it's a good approach.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Ah, I see. I didn't realize such cases could exist; I didn't pick this up while looking at the OperatorChain code. Thanks a lot for the illustration! I'll continue with the current approach.

          Btw, do you happen to have any other ideas for the naming of WatermarkStatus? I have a feeling there's a better name out there, because it doesn't only mark watermarks as idle, but also records.

          aljoscha Aljoscha Krettek added a comment -

          Possibly StreamStatus?

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Yes I think that's better than WatermarkStatus, thanks. Will use StreamStatus.

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/2801

          FLINK-5017 [streaming] Introduce StreamStatus to facilitate idle sources

          This PR is the first part of making temporarily idle sources in Flink possible, by adding a new `StreamStatus` element that flows with the records. The second part, allowing source operators to emit `StreamStatus` elements, will be submitted as a separate PR based on this one.

          `StreamStatus` elements are generated at the sources, and affect how operators advance their watermarks with the presence of idle sources.

          Prior to this PR, when advancing watermarks at downstream operators, the new min watermark is found by simply determining if the min watermark across all input channels has advanced. This resulted in
          watermark-stalling downstream operators when there are idle sources. With this change, operators can
          now mark input channels to be idle, and ignore them when advancing their watermark.

          1. Design Choices
              1. `StreamStatus` are only generated at sources

          Sources guarantee that no records or watermarks will be emitted between an `IDLE` and the next `ACTIVE` (this would be implemented in the second part PR). All downstream operators need to process statuses that they receive, and appropriately forward status changes. `StreamStatus` cannot be generated mid-topology.

              2. Using two status markers - `IDLE` and `ACTIVE`

          We need 2 markers, instead of only an `IDLE`, because operators at the `AbstractStreamOperator` level need to clearly know the start and end of idleness. I had considered using only an `IDLE` marker and treating any watermarks / records received afterwards as a sign of becoming active again, but that would prevent us from correctly determining whether or not watermarks generated at timestamp extractors in the middle of topologies should actually be ignored (watermarks generated in the middle of topologies need to be ignored when the operator is actually idle, because they can be generated even if sources are idle and records aren't flowing through).

          Despite the 2 markers, I plan to only have a single new `markAsTemporarilyIdle()` method on `SourceContext`s that serves as the only means for user source functions to express that the source is idle (included in second part PR). `SourceContext` implementations are responsible for controlling how actual `StreamStatus` elements are sent downstream.
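
One way such a context could behave is sketched below. Only the method name `markAsTemporarilyIdle()` comes from the description above; the class `IdleAwareSourceContextSketch`, the `collect` signature, and the rule that the next emitted record implicitly sends `ACTIVE` first are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical source context; it records what would be sent downstream. */
final class IdleAwareSourceContextSketch {
    private final List<String> emitted = new ArrayList<>();
    private boolean idle = false;

    /** The single user-facing call: announce idleness (IDLE is sent at most once). */
    void markAsTemporarilyIdle() {
        if (!idle) {
            idle = true;
            emitted.add("IDLE");
        }
    }

    /** Assumption: emitting a record resumes activity, so ACTIVE goes out first. */
    void collect(String record) {
        if (idle) {
            idle = false;
            emitted.add("ACTIVE");
        }
        emitted.add("RECORD(" + record + ")");
    }

    List<String> emitted() {
        return emitted;
    }
}
```

Under these assumptions, user code never constructs a status element itself, and no record can appear between an `IDLE` and the following `ACTIVE`.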

              3. Consolidate watermark / status forwarding as a `StatusWatermarkValve`

          Since the forwarding logic is rather complex now with this change, all forwarding logic is bundled into a "valve", that `OneInputStreamTask`, `TwoInputStreamTask`, and `AbstractStreamOperator` use to control watermark and status forwarding.

          `StatusWatermarkValve` takes an implementation of `ValveOutputHandler`. Implementations decide what to do when a watermark or status is emitted from the valve. For example, `OneInputStreamTask` and `TwoInputStreamTask` simply forward it to the head operator; `AbstractStreamOperator` needs to advance timers when the valve outputs a new watermark.

          I didn't want to use the `Output` interface, because record elements and latency markers have nothing to do with the valve's control logic; they are always simply forwarded.
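
The valve idea can be approximated with the simplified sketch below. It is not the actual `StatusWatermarkValve`: the class `ValveSketch`, the nested `OutputHandler` interface, and all method names are hypothetical, and details such as re-aligning a channel that resumes with a stale watermark are left out.

```java
import java.util.Arrays;

/** Hypothetical, simplified valve; method names are assumptions. */
final class ValveSketch {

    /** Callback that decides what happens to whatever leaves the valve. */
    interface OutputHandler {
        void handleWatermark(long timestamp);
        void handleIdle(boolean idle);
    }

    private final long[] channelWatermarks;
    private final boolean[] channelIdle; // all channels start active
    private final OutputHandler handler;
    private long lastOutputWatermark = Long.MIN_VALUE;
    private boolean outputIdle = false;

    ValveSketch(int numChannels, OutputHandler handler) {
        this.channelWatermarks = new long[numChannels];
        Arrays.fill(this.channelWatermarks, Long.MIN_VALUE);
        this.channelIdle = new boolean[numChannels];
        this.handler = handler;
    }

    void inputWatermark(int channel, long timestamp) {
        if (channelIdle[channel] || timestamp <= channelWatermarks[channel]) {
            return; // ignore idle channels and non-advancing watermarks
        }
        channelWatermarks[channel] = timestamp;
        advanceIfPossible();
    }

    void inputStatus(int channel, boolean idle) {
        if (channelIdle[channel] == idle) {
            return; // no toggle on this channel
        }
        channelIdle[channel] = idle;
        boolean allIdle = true;
        for (boolean b : channelIdle) {
            allIdle &= b;
        }
        if (allIdle != outputIdle) {
            outputIdle = allIdle; // the valve as a whole toggled
            handler.handleIdle(allIdle);
        }
        advanceIfPossible(); // an idle channel may have been holding the minimum back
    }

    private void advanceIfPossible() {
        boolean anyActive = false;
        long min = Long.MAX_VALUE;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        if (anyActive && min > lastOutputWatermark) {
            lastOutputWatermark = min;
            handler.handleWatermark(min);
        }
    }
}
```

A task-level handler would forward both callbacks to the head operator, while an operator-level handler could advance timers in `handleWatermark` before forwarding.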

              4. Adding a `setup()` life cycle method to `StreamInputProcessor` and `StreamTwoInputProcessor`

          This is mainly to facilitate creating `StatusWatermarkValve`s in the input processors. They need a reference to the head operator when being created. I considered passing it in to input processors as a constructor argument in `init()`, but the `StreamIterationTail` operator forbids doing so, because for that operator, the `headOperator` is created only after `init()`.

          We could also consider moving creation of input processors into the beginning of `run()`, which would avoid the need of a `setup()` method on the input processors, but I wasn't sure if that would break anything.

              5. Block watermarks generated mid-topology at the `AbstractStreamOperator` level when idle

          Two catches for this:

          • `processStreamStatus` must NOT be overridden by concrete implementations. We need to rely on that to correctly block watermarks generated by timestamp extractors that emit watermarks completely bypassing the valve's forwarding logic.
          • The current implementation only works for one-input stream operators that generate watermarks. Since we don't seem to have any two-input stream operators that generate watermarks, this should be fine for now.
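
The blocking behaviour can be pictured with the sketch below. Both classes are hypothetical stand-ins rather than Flink code, and the sketch declares the status method `final`, which is easy here only because no operator interfaces are involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base operator that gates self-generated watermarks on idleness. */
abstract class StatusGatedOperatorSketch {
    private final List<Long> downstreamWatermarks = new ArrayList<>();
    private boolean idle = false;

    /** Final in this sketch so that subclasses cannot bypass the gate. */
    final void processStreamStatus(boolean nowIdle) {
        idle = nowIdle;
    }

    /** Subclasses emit their own watermarks through this method only. */
    protected final void emitWatermark(long timestamp) {
        if (!idle) {
            downstreamWatermarks.add(timestamp); // dropped while the operator is idle
        }
    }

    final List<Long> downstreamWatermarks() {
        return downstreamWatermarks;
    }
}

/** Stand-in for a timestamp extractor that emits watermarks on a timer. */
final class PeriodicExtractorSketch extends StatusGatedOperatorSketch {
    void onTimer(long currentMaxTimestamp) {
        emitWatermark(currentMaxTimestamp);
    }
}
```

A timer that fires while the operator is idle then produces no downstream watermark, even though the extractor itself knows nothing about stream status.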
          2. Testing
          • Add new test in `AbstractStreamOperatorTest` to test that for concrete one-input operators that bypass the valve and directly emit watermarks, the watermarks are blocked if the operator is idle.
          • Unit tests for `StatusWatermarkValve` for complex forwarding cases.
          • Extended `testWatermarkForwarding` tests in both `OneInputStreamTaskTest` and `TwoInputStreamTaskTest` to also test stream status forwarding. They are relatively simple compared to unit test for `StatusWatermarkValve`, just to implicitly ensure that tasks are using valves correctly.

          I plan to add IT tests as second part PR, when source operators can start emitting `StreamStatus`.

          3. Other Remarks

          While working on this task, I got the feeling that we could start differentiating between which processing methods can be overridden by concrete operator implementations, and which can't.

          For example, it would be best if the `AbstractStreamOperator#processStreamStatus()` method could be `final`, to keep stream status processing logic away from concrete implementations and rule out any possibility of overriding it.

          Right now this isn't possible, as we need to tie processing methods to the `OneInputStreamOperator` / `TwoInputStreamOperator` interfaces for a mixin pattern with the `AbstractStreamOperator`.

          What we could probably do is let input processors access the head operator as an `AbstractStreamOperator`, so that the processing methods only visible at the abstract level can be called, and keep only the processing methods we allow to be overridden in `OneInputStreamOperator` / `TwoInputStreamOperator`.

          Processing methods that I think should only be visible at the abstract level are: `processElement()`, `processStreamStatus()`, and `processLatencyMarker()`.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-5017

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2801.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2801


          commit c70d4dca464220ae63f596a474bdd4d957934838
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2016-11-14T02:53:18Z

          FLINK-5017 [streaming] Introduce StreamStatus to facilitate idle sources

          This commit is the first part of making idle streaming sources in Flink possible. It introduces a new
          element, StreamStatus, that flows with other records in streams. StreamStatus elements are generated
          at the sources, and affect how operators advance their watermarks with the presence of idle sources.

          Prior to this commit, when advancing watermarks at downstream operators, the new min watermark is found
          by simply determining if the min watermark across all input channels has advanced. This resulted in
          watermark-stalling downstream operators when there are idle sources. With this change, operators can
          now mark input channels to be idle, and ignore them when advancing their watermark.

          This commit also includes refactoring of previous watermark forwarding logic into a single class,
          StatusWatermarkValve. OneInputStreamTasks, TwoInputStreamTasks, and AbstractStreamOperator use valves
          to help them determine how watermarks and stream statuses are forwarded.


          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2801

          R: @aljoscha, @StephanEwen I think this is in good shape for reviews now. Any feedback is welcome!

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2801

          @tzulitai I won't have much time this week so will review this next week.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2801

          No problem @aljoscha, thank you.

          rmetzger Robert Metzger added a comment -

          I'm wondering whether this is really a blocker issue.
          I would like to get the 1.2 release out asap, and I think it would be okay to delay this for 1.3 (I suspect there are more follow ups on the connectors to make use of this feature).
          What do you think Tzu-Li (Gordon) Tai and Aljoscha Krettek?

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          I agree that this isn't a blocker issue for 1.2 (I originally marked it as a blocker only to indicate that it blocks FLINK-4022).

          The "workaround" in FLINK-4341 can definitely wait as we don't seem to have users complaining about the behaviour, and for FLINK-4022 we can definitely go for a first-version with a similar approach to FLINK-4341 (if we want the Kafka partition discovery feature to make it into FLINK-4022, which I think is nice).

          Like you mentioned, we'd definitely need more follow-ups on the connectors to actually use this new idle element feature, which would definitely need thorough testing, and it might not be a good idea to slow down 1.2 release because of this.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          I'm changing the priority of this issue from Blocker to Major.
          Aljoscha Krettek and I will still try to see if we can get this into 1.2 in time, but it's reasonable that this doesn't block the release process of 1.2.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2801

          Rebased on latest master.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2801

          @aljoscha and I had an offline discussion and decided to let `StreamStatus` bypass the operators, instead of exposing it to them through another `processStreamStatus(..)` method in the operator interface. This change is incorporated in the 1st commit.

          The second commit is some general improvements, including updating the Javadocs to match the new "operator bypass" implementation.

          The third commit adds a test to verify that watermarks generated by operators in a task's operator chain are ignored and not forwarded if the task is `IDLE`.

          @aljoscha this is ready for another review, thanks!
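
The behaviour checked by the third commit's test (watermarks produced inside an operator chain are dropped while the task is `IDLE`) can be pictured with a small sketch. It is not the test or the output class from the pull request: `IdleGuardedOutputSketch`, `setIdle`, and the `forwarded` list are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of an output that suppresses watermarks while the task is idle. */
class IdleGuardedOutputSketch {
    private boolean idle = false;

    /** Watermark timestamps that were actually forwarded downstream. */
    final List<Long> forwarded = new ArrayList<>();

    /** Toggled when the task's stream status changes. */
    void setIdle(boolean idle) {
        this.idle = idle;
    }

    /** Called by chained operators; the watermark is dropped if the task is idle. */
    void emitWatermark(long timestamp) {
        if (!idle) {
            forwarded.add(timestamp);
        }
    }
}
```

In this sketch, a watermark emitted between `setIdle(true)` and `setIdle(false)` never reaches `forwarded`, while those emitted before and after do.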

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2801

          Thanks for your very good work! 👍 Could you please close this PR and the Jira issue? I just merged it.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2801

          @aljoscha thanks! closing this

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai closed the pull request at:

          https://github.com/apache/flink/pull/2801

          tzulitai Tzu-Li (Gordon) Tai added a comment - Resolved for master via http://git-wip-us.apache.org/repos/asf/flink/commit/6630513

            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0
              Watchers:
              7

              Dates

              • Created:
                Updated:
                Resolved:

                Development