Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: DataStream API, Streaming
    • Labels:
      None

      Description

      This task comes after FLINK-5017, which adds the new element StreamStatus to be incorporated with watermark progression logic.

      This task tracks the implementation of source idleness awareness and status toggling in {{SourceFunction.SourceContext}}s.

      The source contexts should work on an "idle interval": the containing SourceStreamTask is considered idle if no new records or watermarks have been collected between two consecutive checks. (The default value is 0; making this value user-configurable is tracked by FLINK-5018.)

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/3347

          FLINK-5716 [streaming] Make StreamSourceContexts aware of source idleness

          This PR is the second part of adding `StreamStatus` to the streaming runtime to facilitate idle sources in Flink.

          It allows the `AutomaticWatermarkContext` for ingestion time and the `ManualWatermarkContext` for event time to detect source idleness. Detection is based on a fixed-interval check: at each check, we test whether the context has collected any records or watermarks from the source UDF since the previous check. If none were collected between two consecutive checks, we toggle the `SourceStreamTask` to `IDLE`. As soon as a record or watermark is collected afterwards, we toggle it back to `ACTIVE`.
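
To make the interval check concrete, here is a minimal, single-threaded sketch of the toggling logic. It is not the code in this PR: `SketchStatus`, `IdleDetectionSketch`, and the `idleOnNextCheck` flag are hypothetical names, and the periodic timer is replaced by explicit calls to `onIntervalCheck()`.

```java
// Hypothetical stand-in for Flink's StreamStatus; not the real class.
enum SketchStatus { ACTIVE, IDLE }

// Simplified, single-threaded model of a fixed-interval idleness check.
class IdleDetectionSketch {
    private SketchStatus status = SketchStatus.ACTIVE;
    // Armed by a check, cleared by any collected record or watermark.
    private boolean idleOnNextCheck = false;

    // Called whenever the source UDF emits a record or a watermark.
    void onCollect() {
        idleOnNextCheck = false;
        status = SketchStatus.ACTIVE;
    }

    // Called once per idle interval (a timer would do this in practice).
    void onIntervalCheck() {
        if (idleOnNextCheck) {
            // Nothing was collected since the previous check.
            status = SketchStatus.IDLE;
        } else {
            idleOnNextCheck = true;
        }
    }

    SketchStatus status() {
        return status;
    }

    public static void main(String[] args) {
        IdleDetectionSketch detector = new IdleDetectionSketch();
        detector.onIntervalCheck();            // first check only arms the flag
        System.out.println(detector.status());
        detector.onIntervalCheck();            // nothing collected in between
        System.out.println(detector.status());
        detector.onCollect();                  // activity resumes
        System.out.println(detector.status());
    }
}
```

Run as written, `main` prints `ACTIVE`, `IDLE`, `ACTIVE`: one quiet interval between two checks is enough to go idle, and a single collected element is enough to become active again.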

          This idleness check is disabled by default. It is not yet configurable with this PR; that will be worked on as a separate issue, FLINK-5018 (https://issues.apache.org/jira/browse/FLINK-5018).

          It also introduces a new user-exposed method to the `SourceFunction.SourceContext` interface:
          ```
          interface SourceFunction.SourceContext {
              ...
              void markAsTemporarilyIdle();
              ...
          }
          ```

          The purpose of this method is to allow the source UDF to proactively mark the source as `IDLE` without waiting for the underlying interval checks. UDFs should make a best effort to call this. For example, Kafka and Kinesis consumer source instances can call this as soon as they determine on startup that they will not have any subscribed partitions.
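
As an illustration of the proactive call, the sketch below shows a source that marks itself idle when it starts with no assigned partitions. To stay self-contained it uses a reduced, hypothetical stand-in (`SketchSourceContext`) instead of the real `SourceFunction.SourceContext`; `RecordingContext` and `PartitionedSourceSketch` are likewise invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, reduced stand-in for SourceFunction.SourceContext<T>.
interface SketchSourceContext<T> {
    void collect(T element);
    void markAsTemporarilyIdle();
}

// Test double that records what the source did with its context.
class RecordingContext implements SketchSourceContext<String> {
    final List<String> collected = new ArrayList<>();
    boolean idle = false;

    @Override
    public void collect(String element) {
        idle = false; // any emitted element makes the source active again
        collected.add(element);
    }

    @Override
    public void markAsTemporarilyIdle() {
        idle = true;
    }
}

// Hypothetical partitioned source, loosely modelled on a Kafka-style consumer.
class PartitionedSourceSketch {
    private final List<String> assignedPartitions;

    PartitionedSourceSketch(List<String> assignedPartitions) {
        this.assignedPartitions = assignedPartitions;
    }

    void run(SketchSourceContext<String> ctx) {
        if (assignedPartitions.isEmpty()) {
            // Nothing to read: report idleness now instead of waiting for a check.
            // A real source would keep running and wait for partitions here.
            ctx.markAsTemporarilyIdle();
            return;
        }
        for (String partition : assignedPartitions) {
            ctx.collect("record-from-" + partition);
        }
    }

    public static void main(String[] args) {
        RecordingContext empty = new RecordingContext();
        new PartitionedSourceSketch(Collections.<String>emptyList()).run(empty);
        System.out.println("idle with no partitions: " + empty.idle);

        RecordingContext busy = new RecordingContext();
        new PartitionedSourceSketch(Arrays.asList("p0", "p1")).run(busy);
        System.out.println("records collected: " + busy.collected.size());
    }
}
```

The point of the design is that the source knows at startup that it has nothing to emit, so it does not need to wait for the interval check to report idleness.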

          Others

          Introduced an internal interface, `StreamStatusMaintainer`:
          ```
          interface StreamStatusMaintainer extends StreamStatusProvider {
              void toggleStreamStatus(StreamStatus streamStatus);
          }
          ```

          The main reason for this is to keep the `StreamSource` operator inaccessible to the `OperatorChain`.
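
The decoupling can be sketched as follows, under stated assumptions: all `Demo*` types are hypothetical stand-ins, the `getStreamStatus()` accessor on the provider is assumed, and which runtime component actually implements the maintainer is not specified by this sketch. The source side holds only the narrow interface and never a reference to the component behind it.

```java
// Hypothetical stand-ins; the real Flink types carry more than this.
enum DemoStreamStatus { ACTIVE, IDLE }

interface DemoStatusProvider {
    DemoStreamStatus getStreamStatus(); // assumed accessor
}

interface DemoStatusMaintainer extends DemoStatusProvider {
    void toggleStreamStatus(DemoStreamStatus streamStatus);
}

// Plays the role of whichever runtime component owns the current status.
class DemoStatusOwner implements DemoStatusMaintainer {
    private DemoStreamStatus current = DemoStreamStatus.ACTIVE;
    int forwardedChanges = 0;

    @Override
    public DemoStreamStatus getStreamStatus() {
        return current;
    }

    @Override
    public void toggleStreamStatus(DemoStreamStatus streamStatus) {
        if (streamStatus != current) { // ignore toggles that change nothing
            current = streamStatus;
            forwardedChanges++;        // stands in for notifying downstream
        }
    }
}

// The source side depends only on the narrow maintainer interface.
class DemoSourceContext {
    private final DemoStatusMaintainer maintainer;

    DemoSourceContext(DemoStatusMaintainer maintainer) {
        this.maintainer = maintainer;
    }

    void markAsTemporarilyIdle() {
        maintainer.toggleStreamStatus(DemoStreamStatus.IDLE);
    }

    void collect(String record) {
        maintainer.toggleStreamStatus(DemoStreamStatus.ACTIVE);
    }

    public static void main(String[] args) {
        DemoStatusOwner owner = new DemoStatusOwner();
        DemoSourceContext ctx = new DemoSourceContext(owner);
        ctx.markAsTemporarilyIdle();
        ctx.markAsTemporarilyIdle(); // second call is a no-op
        ctx.collect("a");
        System.out.println(owner.getStreamStatus() + " after " + owner.forwardedChanges + " changes");
    }
}
```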

          Tests

          Added `StreamSourceContextIdleDetectionTests` to test the proposed functionality. I did not add task-level tests that use `StreamTaskTestHarness` yet because the idle timeout is not yet configurable beyond the `SourceContext` interfaces. I propose to add tests with task harnesses along with FLINK-5018 (https://issues.apache.org/jira/browse/FLINK-5018).

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-5716

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3347.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3347


          commit 66851ad987dedceb74475472d13a267101a95f61
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-02-16T18:43:44Z

          FLINK-5716 [streaming] Make StreamSourceContexts aware of source idleness


          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3347#discussion_r102431062

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java ---
          @@ -247,45 +282,220 @@ public void onProcessingTime(long timestamp) {
             * Streaming topologies can use timestamp assigner functions to override the timestamps
             * assigned here.
             */
          - private static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
          + private static class ManualWatermarkContext<T> extends WatermarkContext<T> {
          -     private final Object lock;
                private final Output<StreamRecord<T>> output;
                private final StreamRecord<T> reuse;
          -     private ManualWatermarkContext(Object checkpointLock, Output<StreamRecord<T>> output) {
          -         this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
          +     private ManualWatermarkContext(
          +             final Output<StreamRecord<T>> output,
          +             final ProcessingTimeService timeService,
          +             final Object checkpointLock,
          +             final StreamStatusMaintainer streamStatusMaintainer,
          +             final long idleTimeout) {
          +
          +         super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);
          +         this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
                    this.reuse = new StreamRecord<>(null);
                }

                @Override
          +     protected void processAndCollect(T element) {
          +         output.collect(reuse.replace(element));
          +     }
          +
          +     @Override
          +     protected void processAndCollectWithTimestamp(T element, long timestamp) {
          +         output.collect(reuse.replace(element, timestamp));
          +     }
          +
          +     @Override
          +     protected void processAndEmitWatermark(Watermark mark) {
          +         output.emitWatermark(mark);
          +     }
          +
          +     @Override
          +     protected boolean allowWatermark(Watermark mark) {
          +         return true;
          +     }
          + }
          +
          + /**
          +  * An asbtract {@link SourceFunction.SourceContext} that should be used as the base for
          --- End diff --

          Typo: `asbtract`

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3347

          @tzulitai Could you please go ahead and merge?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3347

          Thanks @aljoscha for the review! Will fix the typo and merge this to `master`.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3347

          tzulitai Tzu-Li (Gordon) Tai added a comment - Resolved for master via http://git-wip-us.apache.org/repos/asf/flink/commit/b0f0f37

            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0
              Watchers:
              2
