FLINK-4975

Add a limit for how much data may be buffered during checkpoint alignment

    Details

      Description

      During checkpoint alignment, data may be buffered/spilled.

      We should introduce an upper limit for the spilled data volume. After exceeding that limit, the checkpoint alignment should abort and the checkpoint be canceled.

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user StephanEwen opened a pull request:

          https://github.com/apache/flink/pull/2754

          FLINK-4975 [checkpointing] Add a limit for how much data may be buffered in alignment

          In corner cases, checkpoint alignment can take very long and buffer/spill a lot of data. This PR introduces a limit on how much data may be buffered during alignment. If that volume is exceeded, the checkpoint is aborted.

          While such overly large alignments should not occur in a healthy environment, the limit is an important safety net to have.

          This Pull Request consists of three parts:

              1. Introduce Cancellation Barriers

          These Cancellation Barriers are like checkpoint barriers, flowing with the data, but they signal that a checkpoint should be aborted rather than marking the position of that stream in the checkpoint.

          This adds extensive tests to the `BarrierBuffer` and `BarrierTracker` that these Cancellation Barriers are correctly interpreted and interplay well with other situations of alignment starts and cancellations (such as when newer barriers come early).
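The interplay of checkpoint barriers and cancellation barriers described above can be illustrated with a small sketch. It is a simplified model under stated assumptions, not Flink's `BarrierBuffer`: the class `CancellationBarrierSketch`, its methods, and the string notifications are hypothetical, and no data buffering is modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified alignment tracker; not Flink's actual BarrierBuffer. */
class CancellationBarrierSketch {
    private final boolean[] blocked;          // channels that already delivered a barrier
    private long currentCheckpointId = -1L;   // latest checkpoint ID seen so far
    private int numBarriersReceived;
    private boolean aligning;

    /** Records what would be reported to the task, e.g. "trigger 1" or "abort 4". */
    final List<String> notifications = new ArrayList<>();

    CancellationBarrierSketch(int numChannels) {
        this.blocked = new boolean[numChannels];
    }

    void onCheckpointBarrier(long checkpointId, int channel) {
        if (checkpointId > currentCheckpointId) {
            // A newer barrier supersedes any alignment that is still in progress.
            if (aligning) {
                notifications.add("abort " + currentCheckpointId);
            }
            releaseBlocks();
            currentCheckpointId = checkpointId;
            aligning = true;
        } else if (checkpointId < currentCheckpointId || !aligning) {
            return; // barrier of an outdated, completed, or canceled checkpoint
        }

        if (!blocked[channel]) {
            blocked[channel] = true;
            numBarriersReceived++;
        }
        if (numBarriersReceived == blocked.length) {
            // All channels delivered their barrier: the checkpoint can be taken.
            notifications.add("trigger " + checkpointId);
            releaseBlocks();
        }
    }

    void onCancellationBarrier(long checkpointId) {
        boolean newer = checkpointId > currentCheckpointId;
        boolean current = checkpointId == currentCheckpointId && aligning;
        if (newer || current) {
            // Stop waiting for further barriers of this checkpoint.
            notifications.add("abort " + checkpointId);
            currentCheckpointId = checkpointId;
            releaseBlocks();
        }
    }

    private void releaseBlocks() {
        Arrays.fill(blocked, false);
        numBarriersReceived = 0;
        aligning = false;
    }
}
```

With a single channel, feeding barrier 1, barrier 2, cancellation 4, barrier 5, and cancellation 6 yields the notifications trigger 1, trigger 2, abort 4, trigger 5, abort 6, which mirrors the expectations of the single-channel test in this PR.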

              2. Adjust Tasks and Checkpoint Coordinator

          Tasks emit cancellation barriers whenever they cannot start a checkpoint or whenever a checkpoint alignment was canceled. That lets downstream tasks know earlier that they should stop the alignment for that checkpoint, because it will not be able to complete.

          Tasks also explicitly send "decline" messages to the checkpoint coordinator for checkpoints they "skipped" due to alignment being cancelled or superseded.

          Previously the assumptions were:

          • When a Source Task cannot start a checkpoint, a new checkpoint must be triggered immediately, to dissolve any started downstream alignments that otherwise would not be able to complete.
          • Whenever an alignment is aborted by a newer checkpoint barrier coming in, that newer barrier will eventually reach the downstream task and break outdated pending alignments. The cancellation barrier will not break the outdated alignment earlier.
              3. Alignment Size Limit

          When the `BarrierBuffer` has buffered more than a certain number of bytes, it aborts the alignment and signals the Task that the checkpoint was aborted. The Task sends a cancellation barrier for that checkpoint downstream, to signal the downstream tasks that they should not wait for a proper barrier.
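A minimal sketch of the size limit, assuming a byte counter that is checked each time a blocked buffer is stored. The class `BoundedAlignmentSketch`, the `maxBufferedBytes` parameter, and the `cancellationSent` flag are hypothetical names for illustration and are not taken from the PR.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical model of an alignment that aborts once too many bytes are buffered. */
class BoundedAlignmentSketch {
    private final long maxBufferedBytes;   // assumed limit; a negative value disables it
    private final Queue<byte[]> buffered = new ArrayDeque<>();
    private long bytesBuffered;
    private boolean aligning;
    private boolean cancellationSent;

    BoundedAlignmentSketch(long maxBufferedBytes) {
        this.maxBufferedBytes = maxBufferedBytes;
    }

    void beginAlignment() {
        aligning = true;
        bytesBuffered = 0;
    }

    /**
     * Stores data from a blocked channel.
     * Returns false if storing it exceeded the limit and the alignment was aborted.
     */
    boolean bufferBlocked(byte[] data) {
        if (!aligning) {
            throw new IllegalStateException("No alignment in progress");
        }
        buffered.add(data);
        bytesBuffered += data.length;

        if (maxBufferedBytes >= 0 && bytesBuffered > maxBufferedBytes) {
            // Abort: stop aligning and tell downstream not to wait for a proper barrier.
            aligning = false;
            cancellationSent = true;
            return false;
        }
        return true;
    }

    boolean isAligning() { return aligning; }

    boolean wasCancellationSent() { return cancellationSent; }

    /** Buffered data is kept for replay; aborting the alignment does not drop it. */
    int numBufferedForReplay() { return buffered.size(); }
}
```

In this sketch the buffer that crosses the limit is still retained, so no data is lost when the alignment is abandoned; only the checkpoint is given up.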

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StephanEwen/incubator-flink checkpoint_cancellation

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2754.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2754


          commit 9837f551e58c7d7d40b85e3ae2292f14be9d73e4
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2016-10-23T16:41:32Z

          FLINK-4984 [checkpointing] Add Cancellation Barriers as a way to signal aborted checkpoints

          commit 844d15c6bd50f7d4e5449ba6958e404685c6eb59
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2016-11-02T21:34:59Z

          FLINK-4985 [checkpointing] Report canceled / declined checkpoints to the Checkpoint Coordinator

          commit 3b922bb6ec3b798042c265c4d49d4d5dad940759
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2016-11-03T14:28:15Z

          FLINK-4975 [checkpointing] Add a limit for how much data may be buffered in alignment.

          If more data than the defined amount is buffered, the alignment is aborted and the checkpoint canceled.


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2754

          If no one has any objection to this, I would merge it by tomorrow...

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87013382

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---
          @@ -135,19 +138,20 @@ else if (next.getEvent().getClass() == CheckpointBarrier.class) {
                       processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
                   }
               }
          +    else if (next.getEvent().getClass() == CancelCheckpointMarker.class) {
          +        processCancellationBarrier((CancelCheckpointMarker) next.getEvent());
          +    }
               else {
                   if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
          -            numClosedChannels++;
          -            // no chance to complete this checkpoint
          -            releaseBlocks();
          +            processEndOfPartition(next.getChannelIndex());
                   }
                   return next;
               }
           }
           else if (!endOfStream) {
               // end of input stream. stream continues with the buffered data
               endOfStream = true;
          -    releaseBlocks();
          +    releaseBlocksAndResetBarriers();
               return getNextNonBlocked();
          --- End diff ---

          Can't we replace the tail recursive call by staying in the `while` loop?

          The same could then apply to the other tail recursive call after calling `completeBufferedSequence` (replacing the recursive call with a `continue`).
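The suggestion can be illustrated with a small sketch that contrasts the two shapes. The class `TailCallToLoopSketch` and its queues are hypothetical stand-ins for the buffered sequences and the input gate; only the control flow (a recursive tail call versus `continue` inside a `while` loop) is the point.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Queue;

/** Hypothetical reader: drains buffered sequences first, then the live input. */
class TailCallToLoopSketch {
    private final Deque<Queue<String>> queuedSequences = new ArrayDeque<>();
    private final Queue<String> input;
    private Queue<String> current;

    TailCallToLoopSketch(List<List<String>> bufferedSequences, List<String> input) {
        for (List<String> seq : bufferedSequences) {
            queuedSequences.add(new ArrayDeque<>(seq));
        }
        this.current = queuedSequences.poll();
        this.input = new ArrayDeque<>(input);
    }

    /** Recursive shape: after finishing a buffered sequence, call the method again. */
    String nextRecursive() {
        if (current != null) {
            String next = current.poll();
            if (next == null) {
                current = queuedSequences.poll(); // move on to the next sequence, if any
                return nextRecursive();           // tail call
            }
            return next;
        }
        return input.poll();
    }

    /** Loop shape: the same logic, with the tail call replaced by 'continue'. */
    String nextIterative() {
        while (true) {
            if (current != null) {
                String next = current.poll();
                if (next == null) {
                    current = queuedSequences.poll();
                    continue;                     // stay in the loop instead of recursing
                }
                return next;
            }
            return input.poll();
        }
    }
}
```

Both methods return the same elements in the same order; the loop version avoids growing the call stack when many empty sequences are queued, since Java does not eliminate tail calls.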

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87016959

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java ---
          @@ -418,5 +422,16 @@ public void cleanup() throws IOException {
                       throw new IOException("Cannot remove temp file for stream alignment writer");
                   }
               }
          +
          +    /**
          +     * Gets the size of this spilled sequence.
          +     */
          +    public long size() throws IOException {
          +        if (fileChannel.isOpen()) {
          --- End diff ---

          Just saw this while backporting - this should refer to the `size` field.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87017118

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java ---
          @@ -31,44 +34,44 @@

               private static final long serialVersionUID = 2094094662279578953L;

          -    /** The timestamp associated with the checkpoint */
          -    private final long timestamp;
          +    /** The reason why the checkpoint was declined */
          +    private final Throwable reason;
          -    public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
          -        super(job, taskExecutionId, checkpointId);
          -        this.timestamp = timestamp;
          +    public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
          +        this(job, taskExecutionId, checkpointId, null);
               }
          -    // --------------------------------------------------------------------------------------------
          -
          -    public long getTimestamp() {
          -        return timestamp;
          +    public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, Throwable reason) {
          +        super(job, taskExecutionId, checkpointId);
          +
          +        if (reason == null ||
          +            reason.getClass() == CheckpointDeclineOnCancellationBarrierException.class ||
          --- End diff ---

          Noticed during backporting: This misses other known exceptions

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87017313

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
          @@ -525,81 +527,63 @@ else if (!props.forceCheckpoint()) {
               }

               /**
          -     * Receives a {@link DeclineCheckpoint} message and returns whether the
          -     * message was associated with a pending checkpoint.
          +     * Receives a {@link DeclineCheckpoint} message for a pending checkpoint.
                *
                * @param message Checkpoint decline from the task manager
          -     *
          -     * @return Flag indicating whether the declined checkpoint was associated
          -     * with a pending checkpoint.
                */
          -    public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
          +    public void receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
                   if (shutdown || message == null) {
          -            return false;
          +            return;
                   }

                   if (!job.equals(message.getJob())) {
          -            LOG.error("Received DeclineCheckpoint message for wrong job: {}", message);
          -            return false;
          +            throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +
          +                message.getJob() + " while this coordinator handles job " + job);
                   }

                   final long checkpointId = message.getCheckpointId();
          +        final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");

                   PendingCheckpoint checkpoint;

          -        // Flag indicating whether the ack message was for a known pending
          -        // checkpoint.
          -        boolean isPendingCheckpoint;
          -
                   synchronized (lock) {
                       // we need to check inside the lock for being shutdown as well, otherwise we
                       // get races and invalid error log messages
                       if (shutdown) {
          -                return false;
          +                return;
                       }

                       checkpoint = pendingCheckpoints.get(checkpointId);

                       if (checkpoint != null && !checkpoint.isDiscarded()) {
          -                isPendingCheckpoint = true;
          -
          -                LOG.info("Discarding checkpoint " + checkpointId
          -                    + " because of checkpoint decline from task " + message.getTaskExecutionId());
          +                LOG.info("Discarding checkpoint " + checkpointId + " because of checkpoint decline from task " +
          +                    message.getTaskExecutionId() + " : " + reason);

                           pendingCheckpoints.remove(checkpointId);
                           checkpoint.abortDeclined();
                           rememberRecentCheckpointId(checkpointId);

          -                boolean haveMoreRecentPending = false;
          -
          -                for (PendingCheckpoint p : pendingCheckpoints.values()) {
          -                    if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
          -                        haveMoreRecentPending = true;
          -                        break;
          -                    }
          -                }
          -                if (!haveMoreRecentPending && !triggerRequestQueued) {
          -                    LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
          -                    triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), checkpoint.getTargetDirectory(), checkpoint.isPeriodic());
          -                } else if (!haveMoreRecentPending) {
          -                    LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
          -                    triggerQueuedRequests();
          -                }
          -            } else if (checkpoint != null) {
          +                // we don't have to schedule another "dissolving" checkpoint any more because the
          +                // cancellation barriers take care of breaking downstream alignments
          +                // we only need to make sure that suspended queued requests are resumed
          +                triggerQueuedRequests();
          --- End diff ---

          Noticed during backporting: This should only happen if no newer pending checkpoint exists.
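One way to read this remark is to keep the "more recent pending checkpoint" scan from the removed code and use it to guard the call that resumes queued requests. The sketch below is a hypothetical, self-contained model of that idea: `DeclineHandlingSketch`, `PendingStub`, and the counter are assumptions for illustration and not the actual fix in `CheckpointCoordinator`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of decline handling; not Flink's CheckpointCoordinator. */
class DeclineHandlingSketch {

    /** Stand-in for a pending checkpoint, reduced to the fields the check needs. */
    static final class PendingStub {
        final long timestamp;
        boolean discarded;

        PendingStub(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    private final Map<Long, PendingStub> pendingCheckpoints = new LinkedHashMap<>();

    /** Counts how often queued trigger requests would have been resumed. */
    int queuedRequestsTriggered;

    void addPending(long checkpointId, long timestamp) {
        pendingCheckpoints.put(checkpointId, new PendingStub(timestamp));
    }

    void receiveDecline(long checkpointId) {
        PendingStub declined = pendingCheckpoints.get(checkpointId);
        if (declined == null || declined.discarded) {
            return; // unknown or already discarded checkpoint
        }
        pendingCheckpoints.remove(checkpointId);
        declined.discarded = true;

        // Same scan as in the removed code: is a newer checkpoint still pending?
        boolean haveMoreRecentPending = false;
        for (PendingStub p : pendingCheckpoints.values()) {
            if (!p.discarded && p.timestamp >= declined.timestamp) {
                haveMoreRecentPending = true;
                break;
            }
        }

        // Resume queued requests only if no newer pending checkpoint exists.
        if (!haveMoreRecentPending) {
            queuedRequestsTriggered++;
        }
    }
}
```

With pending checkpoints 1 and 2, declining checkpoint 1 leaves the queue untouched because checkpoint 2 is newer and still pending; declining checkpoint 2 afterwards resumes the queued requests.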

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87019713

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---
          @@ -254,8 +357,20 @@ public void cleanup() throws IOException {
                   for (BufferSpiller.SpilledBufferOrEventSequence seq : queuedBuffered) {
                       seq.cleanup();
                   }
          +        queuedBuffered.clear();
               }

          -
          +
          +    private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
          +        currentCheckpointId = checkpointId;
          +        onBarrier(channelIndex);
          +
          +        startOfAlignmentTimestamp = System.nanoTime();
          +
          +        if (LOG.isDebugEnabled()) {
          +            LOG.debug("Starting stream alignment for checkpoint " + checkpointId);
          --- End diff ---

          Does not make a difference since it's guarded but IntelliJ warns because the logging statement does not use a parameterized debug message with `{}`.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87028376

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---
          @@ -254,8 +357,20 @@ public void cleanup() throws IOException {
                   for (BufferSpiller.SpilledBufferOrEventSequence seq : queuedBuffered) {
                       seq.cleanup();
                   }
          +        queuedBuffered.clear();
               }

          -
          +
          +    private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
          +        currentCheckpointId = checkpointId;
          +        onBarrier(channelIndex);
          +
          +        startOfAlignmentTimestamp = System.nanoTime();
          +
          +        if (LOG.isDebugEnabled()) {
          +            LOG.debug("Starting stream alignment for checkpoint " + checkpointId);
          --- End diff ---

          When guarded, this is actually the more efficient pattern, so I'd lean towards ignoring or deactivating that warning.
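The two logging styles under discussion can be compared in a small, dependency-free sketch. It deliberately swaps in `java.util.logging` for the SLF4J-style logger used in the diff, so `isLoggable(Level.FINE)` stands in for `isDebugEnabled()` and `{0}` for `{}`; the class `GuardedLoggingSketch`, the `Tracked` helper, and `renderCount` are hypothetical and exist only to make the string-building work observable.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical comparison of guarded concatenation and parameterized logging. */
class GuardedLoggingSketch {
    static final Logger LOG = Logger.getLogger("alignment-sketch");

    /** Counts how often an argument was actually rendered into a string. */
    static int renderCount;

    /** Argument whose toString() call is observable through renderCount. */
    static final class Tracked {
        private final long id;

        Tracked(long id) {
            this.id = id;
        }

        @Override
        public String toString() {
            renderCount++;
            return Long.toString(id);
        }
    }

    /** Guarded form: the message is only concatenated when the level is enabled. */
    static void logGuarded(Tracked checkpointId) {
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Starting stream alignment for checkpoint " + checkpointId);
        }
    }

    /** Parameterized form: formatting is left to the logging framework. */
    static void logParameterized(Tracked checkpointId) {
        LOG.log(Level.FINE, "Starting stream alignment for checkpoint {0}", checkpointId);
    }
}
```

With the level set above `FINE`, `logGuarded` never touches the argument, so the guard makes the plain concatenation free in the disabled case. Once the level is enabled, the concatenation runs exactly once and no placeholder pattern has to be parsed.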

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87030844

          --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---
          @@ -899,26 +926,480 @@ public void testStartAlignmentWithClosedChannels() {
          }

          @Test
            - public void testEndOfStreamWhileCheckpoint() {
            + public void testEndOfStreamWhileCheckpoint() throws Exception {
            + BufferOrEvent[] sequence = {
            + // one checkpoint
            + createBarrier(1, 0), createBarrier(1, 1), createBarrier(1, 2),
            +
            + // some buffers
            + createBuffer(0), createBuffer(0), createBuffer(2),
            +
            + // start the checkpoint that will be incomplete
            + createBarrier(2, 2), createBarrier(2, 0),
            + createBuffer(0), createBuffer(2), createBuffer(1),
            +
            + // close one after the barrier one before the barrier
            + createEndOfPartition(2), createEndOfPartition(1),
            + createBuffer(0),
            +
            + // final end of stream
            + createEndOfPartition(0)
            + };
            +
            + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
            + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
            +
            + // data after first checkpoint
            + check(sequence[3], buffer.getNextNonBlocked());
            + check(sequence[4], buffer.getNextNonBlocked());
            + check(sequence[5], buffer.getNextNonBlocked());
            + assertEquals(1L, buffer.getCurrentCheckpointId());
            +
            + // alignment of second checkpoint
            + check(sequence[10], buffer.getNextNonBlocked());
            + assertEquals(2L, buffer.getCurrentCheckpointId());
            +
            + // first end-of-partition encountered
            }

            +
            + @Test
            + public void testSingleChannelAbortCheckpoint() throws Exception {
            + BufferOrEvent[] sequence = {
            + createBuffer(0),
            + createBarrier(1, 0),
            + createBuffer(0),
            + createBarrier(2, 0),
            + createCancellationBarrier(4, 0),
            + createBarrier(5, 0),
            + createBuffer(0),
            + createCancellationBarrier(6, 0),
            + createBuffer(0)
            + };
            +
            + MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
            + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
            +
            + StatefulTask toNotify = mock(StatefulTask.class);
            + buffer.registerCheckpointEventHandler(toNotify);
            +
            + check(sequence[0], buffer.getNextNonBlocked());
            + check(sequence[2], buffer.getNextNonBlocked());
            + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
            + assertEquals(0L, buffer.getAlignmentDurationNanos());
            +
            + check(sequence[6], buffer.getNextNonBlocked());
            + assertEquals(5L, buffer.getCurrentCheckpointId());
            + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
            + verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
            + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
            + assertEquals(0L, buffer.getAlignmentDurationNanos());
            +
            + check(sequence[8], buffer.getNextNonBlocked());
            + assertEquals(6L, buffer.getCurrentCheckpointId());
            + verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
            + assertEquals(0L, buffer.getAlignmentDurationNanos());

          + buffer.cleanup();
          + checkNoTempFilesRemain();
          + }
          +
          + @Test
          + public void testMultiChannelAbortCheckpoint() throws Exception {
          + BufferOrEvent[] sequence = {
          + // some buffers and a successful checkpoint
          + /* 0 */ createBuffer(0), createBuffer(2), createBuffer(0),
          + /* 3 */ createBarrier(1, 1), createBarrier(1, 2),
          + /* 5 */ createBuffer(2), createBuffer(1),
          + /* 7 */ createBarrier(1, 0),
          + /* 8 */ createBuffer(0), createBuffer(2),
          +
          + // aborted on last barrier
          + /* 10 */ createBarrier(2, 0), createBarrier(2, 2),
          + /* 12 */ createBuffer(0), createBuffer(2),
          + /* 14 */ createCancellationBarrier(2, 1),
          +
          + // successful checkpoint
          + /* 15 */ createBuffer(2), createBuffer(1),
          + /* 17 */ createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0),
          +
          + // abort on first barrier
          + /* 20 */ createBuffer(0), createBuffer(1),
          + /* 22 */ createCancellationBarrier(4, 1), createBarrier(4, 2),
          + /* 24 */ createBuffer(0),
          + /* 25 */ createBarrier(4, 0),
          +
          + // another successful checkpoint
          + /* 26 */ createBuffer(0), createBuffer(1), createBuffer(2),
          + /* 29 */ createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0),
          + /* 32 */ createBuffer(0), createBuffer(1),
          +
          + // abort multiple cancellations and a barrier after the cancellations
          + /* 34 */ createCancellationBarrier(6, 1), createCancellationBarrier(6, 2),
          + /* 36 */ createBarrier(6, 0),
          +
          + /* 37 */ createBuffer(0)
          + };
          +
          + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
          + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
          +
          + StatefulTask toNotify = mock(StatefulTask.class);
          + buffer.registerCheckpointEventHandler(toNotify);
          +
          + long startTs;
          +
          + // successful first checkpoint, with some aligned buffers
          + check(sequence[0], buffer.getNextNonBlocked());
          + check(sequence[1], buffer.getNextNonBlocked());
          + check(sequence[2], buffer.getNextNonBlocked());
          + startTs = System.nanoTime();
          + check(sequence[5], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
          + validateAlignmentTime(startTs, buffer);
          +
          + check(sequence[6], buffer.getNextNonBlocked());
          + check(sequence[8], buffer.getNextNonBlocked());
          + check(sequence[9], buffer.getNextNonBlocked());
          +
          + // canceled checkpoint on last barrier
          + startTs = System.nanoTime();
          + check(sequence[12], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
          + validateAlignmentTime(startTs, buffer);
          + check(sequence[13], buffer.getNextNonBlocked());
          +
          + // one more successful checkpoint
          + check(sequence[15], buffer.getNextNonBlocked());
          + check(sequence[16], buffer.getNextNonBlocked());
          + startTs = System.nanoTime();
          + check(sequence[20], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
          + validateAlignmentTime(startTs, buffer);
          + check(sequence[21], buffer.getNextNonBlocked());
          +
          + // this checkpoint gets immediately canceled
          + check(sequence[24], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
          + assertEquals(0L, buffer.getAlignmentDurationNanos());
          +
          + // some buffers
          + check(sequence[26], buffer.getNextNonBlocked());
          + check(sequence[27], buffer.getNextNonBlocked());
          + check(sequence[28], buffer.getNextNonBlocked());
          +
          + // a simple successful checkpoint
          + startTs = System.nanoTime();
          + check(sequence[32], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
          + validateAlignmentTime(startTs, buffer);
          + check(sequence[33], buffer.getNextNonBlocked());
          +
          + check(sequence[37], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
          + assertEquals(0L, buffer.getAlignmentDurationNanos());
          +
          + // all done
          + assertNull(buffer.getNextNonBlocked());
          + assertNull(buffer.getNextNonBlocked());
          +
          + buffer.cleanup();
          + checkNoTempFilesRemain();
          + }
          +
          + @Test
          + public void testAbortViaQueuedBarriers() throws Exception {
          + BufferOrEvent[] sequence = {
          + // starting a checkpoint
          + /* 0 */ createBuffer(1),
          + /* 1 */ createBarrier(1, 1), createBarrier(1, 2),
          + /* 3 */ createBuffer(2), createBuffer(0), createBuffer(1),
          +
          + // queued barrier and cancellation barrier
          + /* 6 */ createCancellationBarrier(2, 2),
          + /* 7 */ createBarrier(2, 1),
          +
          + // some intermediate buffers (some queued)
          + /* 8 */ createBuffer(0), createBuffer(1), createBuffer(2),
          +
          + // complete initial checkpoint
          + /* 11 */ createBarrier(1, 0),
          +
          + // some buffers (none queued, since checkpoint is aborted)
          + /* 12 */ createBuffer(2), createBuffer(1), createBuffer(0),
          +
          + // final barrier of aborted checkpoint
          + /* 15 */ createBarrier(1, 2),
          — End diff –

          Isn't this a barrier of the initial checkpoint which has been completed successfully? Just wondering because of the comment.
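
For context on the question above, here is a minimal sketch of the ID-ordering rule that this test appears to exercise: once a newer checkpoint (here, the aborted checkpoint 2) has been seen, a late barrier for an older checkpoint is dropped without any notification. The class `LateBarrierFilter` and its methods are hypothetical and model a single input channel only; this is not Flink's `BarrierBuffer` implementation and it ignores blocking and spilling entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-channel model, NOT Flink's BarrierBuffer.
final class LateBarrierFilter {
    // Highest checkpoint ID seen so far (triggered or aborted).
    private long latestSeenId = -1L;
    private final List<String> notifications = new ArrayList<>();

    // A regular barrier triggers a checkpoint unless it is late.
    void onBarrier(long checkpointId) {
        if (checkpointId <= latestSeenId) {
            return; // late barrier of an older checkpoint: ignore
        }
        latestSeenId = checkpointId;
        notifications.add("trigger " + checkpointId);
    }

    // A cancellation barrier aborts a checkpoint unless it is late.
    void onCancellationBarrier(long checkpointId) {
        if (checkpointId <= latestSeenId) {
            return; // checkpoint already finished or subsumed: ignore
        }
        latestSeenId = checkpointId;
        notifications.add("abort " + checkpointId);
    }

    List<String> notifications() {
        return notifications;
    }
}
```

Under this model, feeding barrier 1, cancellation barrier 2, and then another barrier 1 yields exactly one trigger and one abort, which matches the "no further checkpoint (abort) notifications" assertions in the test.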

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87033063

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java —
          @@ -899,26 +926,480 @@ public void testStartAlignmentWithClosedChannels() {
          }

          @Test

            - public void testEndOfStreamWhileCheckpoint() {
            + public void testEndOfStreamWhileCheckpoint() throws Exception {
            + BufferOrEvent[] sequence = {
            + // one checkpoint
            + createBarrier(1, 0), createBarrier(1, 1), createBarrier(1, 2),
            +
            + // some buffers
            + createBuffer(0), createBuffer(0), createBuffer(2),
            +
            + // start the checkpoint that will be incomplete
            + createBarrier(2, 2), createBarrier(2, 0),
            + createBuffer(0), createBuffer(2), createBuffer(1),
            +
            + // close one after the barrier one before the barrier
            + createEndOfPartition(2), createEndOfPartition(1),
            + createBuffer(0),
            +
            + // final end of stream
            + createEndOfPartition(0)
            + };
            +
            + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
            + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
            +
            + // data after first checkpoint
            + check(sequence[3], buffer.getNextNonBlocked());
            + check(sequence[4], buffer.getNextNonBlocked());
            + check(sequence[5], buffer.getNextNonBlocked());
            + assertEquals(1L, buffer.getCurrentCheckpointId());
            +
            + // alignment of second checkpoint
            + check(sequence[10], buffer.getNextNonBlocked());
            + assertEquals(2L, buffer.getCurrentCheckpointId());
            +
            + // first end-of-partition encountered

            +
            + @Test
            + public void testSingleChannelAbortCheckpoint() throws Exception {
            + BufferOrEvent[] sequence = {
            + createBuffer(0),
            + createBarrier(1, 0),
            + createBuffer(0),
            + createBarrier(2, 0),
            + createCancellationBarrier(4, 0),
            + createBarrier(5, 0),
            + createBuffer(0),
            + createCancellationBarrier(6, 0),
            + createBuffer(0)
            + };
            +
            + MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
            + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
            +
            + StatefulTask toNotify = mock(StatefulTask.class);
            + buffer.registerCheckpointEventHandler(toNotify);
            +
            + check(sequence[0], buffer.getNextNonBlocked());
            + check(sequence[2], buffer.getNextNonBlocked());
            + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
            + assertEquals(0L, buffer.getAlignmentDurationNanos());
            +
            + check(sequence[6], buffer.getNextNonBlocked());
            + assertEquals(5L, buffer.getCurrentCheckpointId());
            + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
            + verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
            + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
            + assertEquals(0L, buffer.getAlignmentDurationNanos());
            +
            + check(sequence[8], buffer.getNextNonBlocked());
            + assertEquals(6L, buffer.getCurrentCheckpointId());
            + verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
            + assertEquals(0L, buffer.getAlignmentDurationNanos());

          + buffer.cleanup();
          + checkNoTempFilesRemain();
          + }
          +
          + @Test
          + public void testMultiChannelAbortCheckpoint() throws Exception {
          + BufferOrEvent[] sequence = {
          + // some buffers and a successful checkpoint
          + /* 0 */ createBuffer(0), createBuffer(2), createBuffer(0),
          + /* 3 */ createBarrier(1, 1), createBarrier(1, 2),
          + /* 5 */ createBuffer(2), createBuffer(1),
          + /* 7 */ createBarrier(1, 0),
          + /* 8 */ createBuffer(0), createBuffer(2),
          +
          + // aborted on last barrier
          + /* 10 */ createBarrier(2, 0), createBarrier(2, 2),
          + /* 12 */ createBuffer(0), createBuffer(2),
          + /* 14 */ createCancellationBarrier(2, 1),
          +
          + // successful checkpoint
          + /* 15 */ createBuffer(2), createBuffer(1),
          + /* 17 */ createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0),
          +
          + // abort on first barrier
          + /* 20 */ createBuffer(0), createBuffer(1),
          + /* 22 */ createCancellationBarrier(4, 1), createBarrier(4, 2),
          + /* 24 */ createBuffer(0),
          + /* 25 */ createBarrier(4, 0),
          +
          + // another successful checkpoint
          + /* 26 */ createBuffer(0), createBuffer(1), createBuffer(2),
          + /* 29 */ createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0),
          + /* 32 */ createBuffer(0), createBuffer(1),
          +
          + // abort multiple cancellations and a barrier after the cancellations
          + /* 34 */ createCancellationBarrier(6, 1), createCancellationBarrier(6, 2),
          + /* 36 */ createBarrier(6, 0),
          +
          + /* 37 */ createBuffer(0)
          + };
          +
          + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
          + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
          +
          + StatefulTask toNotify = mock(StatefulTask.class);
          + buffer.registerCheckpointEventHandler(toNotify);
          +
          + long startTs;
          +
          + // successful first checkpoint, with some aligned buffers
          + check(sequence[0], buffer.getNextNonBlocked());
          + check(sequence[1], buffer.getNextNonBlocked());
          + check(sequence[2], buffer.getNextNonBlocked());
          + startTs = System.nanoTime();
          + check(sequence[5], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
          + validateAlignmentTime(startTs, buffer);
          +
          + check(sequence[6], buffer.getNextNonBlocked());
          + check(sequence[8], buffer.getNextNonBlocked());
          + check(sequence[9], buffer.getNextNonBlocked());
          +
          + // canceled checkpoint on last barrier
          + startTs = System.nanoTime();
          + check(sequence[12], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
          + validateAlignmentTime(startTs, buffer);
          + check(sequence[13], buffer.getNextNonBlocked());
          +
          + // one more successful checkpoint
          + check(sequence[15], buffer.getNextNonBlocked());
          + check(sequence[16], buffer.getNextNonBlocked());
          + startTs = System.nanoTime();
          + check(sequence[20], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
          + validateAlignmentTime(startTs, buffer);
          + check(sequence[21], buffer.getNextNonBlocked());
          +
          + // this checkpoint gets immediately canceled
          + check(sequence[24], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
          + assertEquals(0L, buffer.getAlignmentDurationNanos());
          +
          + // some buffers
          + check(sequence[26], buffer.getNextNonBlocked());
          + check(sequence[27], buffer.getNextNonBlocked());
          + check(sequence[28], buffer.getNextNonBlocked());
          +
          + // a simple successful checkpoint
          + startTs = System.nanoTime();
          + check(sequence[32], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
          + validateAlignmentTime(startTs, buffer);
          + check(sequence[33], buffer.getNextNonBlocked());
          +
          + check(sequence[37], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
          + assertEquals(0L, buffer.getAlignmentDurationNanos());
          +
          + // all done
          + assertNull(buffer.getNextNonBlocked());
          + assertNull(buffer.getNextNonBlocked());
          +
          + buffer.cleanup();
          + checkNoTempFilesRemain();
          + }
          +
          + @Test
          + public void testAbortViaQueuedBarriers() throws Exception {
          + BufferOrEvent[] sequence = {
          + // starting a checkpoint
          + /* 0 */ createBuffer(1),
          + /* 1 */ createBarrier(1, 1), createBarrier(1, 2),
          + /* 3 */ createBuffer(2), createBuffer(0), createBuffer(1),
          +
          + // queued barrier and cancellation barrier
          + /* 6 */ createCancellationBarrier(2, 2),
          + /* 7 */ createBarrier(2, 1),
          +
          + // some intermediate buffers (some queued)
          + /* 8 */ createBuffer(0), createBuffer(1), createBuffer(2),
          +
          + // complete initial checkpoint
          + /* 11 */ createBarrier(1, 0),
          +
          + // some buffers (none queued, since checkpoint is aborted)
          + /* 12 */ createBuffer(2), createBuffer(1), createBuffer(0),
          +
          + // final barrier of aborted checkpoint
          + /* 15 */ createBarrier(1, 2),
          +
          + // some more buffers
          + /* 16 */ createBuffer(0), createBuffer(1), createBuffer(2)
          + };
          +
          + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
          + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
          +
          + StatefulTask toNotify = mock(StatefulTask.class);
          + buffer.registerCheckpointEventHandler(toNotify);
          +
          + long startTs;
          +
          + check(sequence[0], buffer.getNextNonBlocked());
          +
          + // starting first checkpoint
          + startTs = System.nanoTime();
          + check(sequence[4], buffer.getNextNonBlocked());
          + check(sequence[8], buffer.getNextNonBlocked());
          +
          + // finished first checkpoint
          + check(sequence[3], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
          + validateAlignmentTime(startTs, buffer);
          +
          + check(sequence[5], buffer.getNextNonBlocked());
          +
          + // re-read the queued cancellation barriers
          + check(sequence[9], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
          + assertEquals(0L, buffer.getAlignmentDurationNanos());
          +
          + check(sequence[10], buffer.getNextNonBlocked());
          + check(sequence[12], buffer.getNextNonBlocked());
          + check(sequence[13], buffer.getNextNonBlocked());
          + check(sequence[14], buffer.getNextNonBlocked());
          +
          + check(sequence[16], buffer.getNextNonBlocked());
          + check(sequence[17], buffer.getNextNonBlocked());
          + check(sequence[18], buffer.getNextNonBlocked());
          +
          + // no further alignment should have happened
          + assertEquals(0L, buffer.getAlignmentDurationNanos());
          +
          + // no further checkpoint (abort) notifications
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
          +
          + // all done
          + assertNull(buffer.getNextNonBlocked());
          + assertNull(buffer.getNextNonBlocked());
          +
          + buffer.cleanup();
          + checkNoTempFilesRemain();
          + }
          +
          + /**
          + * This tests the case where a replay of queued checkpoint barriers meets
          + * a canceled checkpoint.
          + *
          + * The replayed newer checkpoint barrier must not try to cancel the
          + * already canceled checkpoint.
          + */
          + @Test
          + public void testAbortWhileHavingQueuedBarriers() throws Exception {
          + BufferOrEvent[] sequence = {
          + // starting a checkpoint
          + /* 0 */ createBuffer(1),
          + /* 1 */ createBarrier(1, 1),
          + /* 2 */ createBuffer(2), createBuffer(0), createBuffer(1),
          +
          + // queued barrier and cancellation barrier
          + /* 5 */ createBarrier(2, 1),
          +
          + // some queued buffers
          + /* 6 */ createBuffer(2), createBuffer(1),
          +
          + // cancel the initial checkpoint
          + /* 8 */ createCancellationBarrier(1, 0),
          +
          + // some more buffers
          + /* 9 */ createBuffer(2), createBuffer(1), createBuffer(0),
          +
          + // ignored barrier - already canceled and moved to next checkpoint
          + /* 12 */ createBarrier(1, 2),
          +
          + // some more buffers
          + /* 13 */ createBuffer(0), createBuffer(1), createBuffer(2),
          +
          + // complete next checkpoint regularly
          + /* 16 */ createBarrier(2, 0), createBarrier(2, 2),
          +
          + // some more buffers
          + /* 18 */ createBuffer(0), createBuffer(1), createBuffer(2)
          + };
          +
          + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
          + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
          +
          + StatefulTask toNotify = mock(StatefulTask.class);
          + buffer.registerCheckpointEventHandler(toNotify);
          +
          + long startTs;
          +
          + check(sequence[0], buffer.getNextNonBlocked());
          +
          + // starting first checkpoint
          + startTs = System.nanoTime();
          + check(sequence[2], buffer.getNextNonBlocked());
          + check(sequence[3], buffer.getNextNonBlocked());
          + check(sequence[6], buffer.getNextNonBlocked());
          +
          + // cancelled by cancellation barrier
          + check(sequence[4], buffer.getNextNonBlocked());
          + validateAlignmentTime(startTs, buffer);
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
          +
          + // the next checkpoint alignment starts now
          + startTs = System.nanoTime();
          + check(sequence[9], buffer.getNextNonBlocked());
          + check(sequence[11], buffer.getNextNonBlocked());
          + check(sequence[13], buffer.getNextNonBlocked());
          + check(sequence[15], buffer.getNextNonBlocked());
          +
          + // checkpoint done
          + check(sequence[7], buffer.getNextNonBlocked());
          + validateAlignmentTime(startTs, buffer);
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
          +
          + // queued data
          + check(sequence[10], buffer.getNextNonBlocked());
          + check(sequence[14], buffer.getNextNonBlocked());
          +
          + // trailing data
          + check(sequence[18], buffer.getNextNonBlocked());
          + check(sequence[19], buffer.getNextNonBlocked());
          + check(sequence[20], buffer.getNextNonBlocked());
          +
          + // all done
          + assertNull(buffer.getNextNonBlocked());
          + assertNull(buffer.getNextNonBlocked());
          +
          + buffer.cleanup();
          + checkNoTempFilesRemain();
          +
          + // check overall notifications
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
          + }
          +
          + /**
          + * This tests the case where a cancellation barrier is received for a checkpoint already
          + * canceled due to receiving a newer checkpoint barrier.
          + */
          + @Test
          + public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws Exception {
          + BufferOrEvent[] sequence = {
          + // starting a checkpoint
          + /* 0 */ createBuffer(2),
          + /* 1 */ createBarrier(3, 1), createBarrier(3, 0),
          + /* 3 */ createBuffer(0), createBuffer(1), createBuffer(2),
          +
          + // newer checkpoint barrier cancels/subsumes pending checkpoint
          + /* 6 */ createBarrier(5, 2),
          — End diff –

          It could happen so far, if one source was not ready and the others were.

          I think good design is that every component in itself is as robust as it can be. Chances are good it can then compensate for other unexpected behavior (like an RPC message getting lost).

          I guess that the above philosophy is the reason why the alignment code has so far worked pretty well, even though the problem is non-trivial.
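
As an illustration of the robustness argument above, here is a minimal sketch of an abort handler that tolerates duplicate cancellation signals for the same checkpoint, for example one cancellation barrier per input channel or a repeated message. The class `IdempotentAbortNotifier` is hypothetical and is not part of Flink; it only shows the idempotence idea, under the assumption that downstream code must be notified at most once per aborted checkpoint.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, NOT part of Flink: notifies at most once per checkpoint ID.
final class IdempotentAbortNotifier {
    // IDs of checkpoints for which an abort was already signalled.
    private final Set<Long> abortedIds = new HashSet<>();
    private int notificationsSent = 0;

    // Returns true only for the call that actually causes the notification.
    boolean abort(long checkpointId) {
        if (!abortedIds.add(checkpointId)) {
            return false; // duplicate signal: already aborted, do nothing
        }
        notificationsSent++;
        return true;
    }

    int notificationsSent() {
        return notificationsSent;
    }
}
```

In this sketch the set of aborted IDs grows without bound; a real component would likely prune IDs older than the latest completed checkpoint.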

Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + StatefulTask toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + check(sequence [0] , buffer.getNextNonBlocked()); + + // starting first checkpoint + startTs = System.nanoTime(); + check(sequence [4] , buffer.getNextNonBlocked()); + check(sequence [8] , buffer.getNextNonBlocked()); + + // finished first checkpoint + check(sequence [3] , buffer.getNextNonBlocked()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L))); + validateAlignmentTime(startTs, buffer); + + check(sequence [5] , buffer.getNextNonBlocked()); + + // re-read the queued cancellation barriers + check(sequence [9] , buffer.getNextNonBlocked()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(2L); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + check(sequence [10] , buffer.getNextNonBlocked()); + check(sequence [12] , buffer.getNextNonBlocked()); + check(sequence [13] , buffer.getNextNonBlocked()); + check(sequence [14] , buffer.getNextNonBlocked()); + + check(sequence [16] , buffer.getNextNonBlocked()); + check(sequence [17] , buffer.getNextNonBlocked()); + check(sequence [18] , buffer.getNextNonBlocked()); + + // no further alignment should have happened + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + // no further checkpoint (abort) notifications + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong()); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + } + + /** + * This tests the where a replay of queued checkpoint barriers meets + * a canceled checkpoint. + * + * The replayed newer checkpoint barrier must not try to cancel the + * already canceled checkpoint. 
+ */ + @Test + public void testAbortWhileHavingQueuedBarriers() throws Exception { + BufferOrEvent[] sequence = { + // starting a checkpoint + /* 0 */ createBuffer(1), + /* 1 */ createBarrier(1, 1), + /* 2 */ createBuffer(2), createBuffer(0), createBuffer(1), + + // queued barrier and cancellation barrier + /* 5 */ createBarrier(2, 1), + + // some queued buffers + /* 6 */ createBuffer(2), createBuffer(1), + + // cancel the initial checkpoint + /* 8 */ createCancellationBarrier(1, 0), + + // some more buffers + /* 9 */ createBuffer(2), createBuffer(1), createBuffer(0), + + // ignored barrier - already canceled and moved to next checkpoint + /* 12 */ createBarrier(1, 2), + + // some more buffers + /* 13 */ createBuffer(0), createBuffer(1), createBuffer(2), + + // complete next checkpoint regularly + /* 16 */ createBarrier(2, 0), createBarrier(2, 2), + + // some more buffers + /* 18 */ createBuffer(0), createBuffer(1), createBuffer(2) + } ; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + StatefulTask toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + check(sequence [0] , buffer.getNextNonBlocked()); + + // starting first checkpoint + startTs = System.nanoTime(); + check(sequence [2] , buffer.getNextNonBlocked()); + check(sequence [3] , buffer.getNextNonBlocked()); + check(sequence [6] , buffer.getNextNonBlocked()); + + // cancelled by cancellation barrier + check(sequence [4] , buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify, times(1)).abortCheckpointOnBarrier(1L); + + // the next checkpoint alignment starts now + startTs = System.nanoTime(); + check(sequence [9] , buffer.getNextNonBlocked()); + check(sequence [11] , buffer.getNextNonBlocked()); + check(sequence [13] , buffer.getNextNonBlocked()); + check(sequence [15] , buffer.getNextNonBlocked()); + + // checkpoint 
done + check(sequence [7] , buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L))); + + // queued data + check(sequence [10] , buffer.getNextNonBlocked()); + check(sequence [14] , buffer.getNextNonBlocked()); + + // trailing data + check(sequence [18] , buffer.getNextNonBlocked()); + check(sequence [19] , buffer.getNextNonBlocked()); + check(sequence [20] , buffer.getNextNonBlocked()); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + + // check overall notifications + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong()); + } + + /** + * This tests the where a cancellation barrier is received for a checkpoint already + * canceled due to receiving a newer checkpoint barrier. + */ + @Test + public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws Exception { + BufferOrEvent[] sequence = { + // starting a checkpoint + /* 0 */ createBuffer(2), + /* 1 */ createBarrier(3, 1), createBarrier(3, 0), + /* 3 */ createBuffer(0), createBuffer(1), createBuffer(2), + + // newer checkpoint barrier cancels/subsumes pending checkpoint + /* 6 */ createBarrier(5, 2), — End diff – It could happen so far, if one source was not ready and the others were. I think good design is that every component in itself is as robust as it can be. Chances are good it can then compensate for other unexpected behavior (like an RPC message getting lost). I guess that the above philosophy is the reason why the alignment code has so far worked pretty well, despite the not trivial problem.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87034115

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java —
          @@ -525,81 +527,63 @@ else if (!props.forceCheckpoint()) {
          }

            /**
          - * Receives a {@link DeclineCheckpoint} message and returns whether the
          - * message was associated with a pending checkpoint.
          + * Receives a {@link DeclineCheckpoint} message for a pending checkpoint.
            *
            * @param message Checkpoint decline from the task manager
          - *
          - * @return Flag indicating whether the declined checkpoint was associated
          - * with a pending checkpoint.
            */
          - public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
          + public void receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
            if (shutdown || message == null) {
          - return false;
          + return;
            }

            if (!job.equals(message.getJob())) {
          - LOG.error("Received DeclineCheckpoint message for wrong job: {}", message);
          - return false;
          + throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +
          + message.getJob() + " while this coordinator handles job " + job);
            }

            final long checkpointId = message.getCheckpointId();
          + final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");

            PendingCheckpoint checkpoint;

          - // Flag indicating whether the ack message was for a known pending
          - // checkpoint.
          - boolean isPendingCheckpoint;
          -
            synchronized (lock) {
            // we need to check inside the lock for being shutdown as well, otherwise we
            // get races and invalid error log messages
            if (shutdown) {
          - return false;
          + return;
            }

            checkpoint = pendingCheckpoints.get(checkpointId);

            if (checkpoint != null && !checkpoint.isDiscarded()) {
          - isPendingCheckpoint = true;
          -
          - LOG.info("Discarding checkpoint " + checkpointId
          - + " because of checkpoint decline from task " + message.getTaskExecutionId());
          + LOG.info("Discarding checkpoint " + checkpointId + " because of checkpoint decline from task " +
          --- End diff --

          Parameterized logging statement using `{}` would be better.
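
          To illustrate the suggestion, here is a minimal sketch of what `{}` placeholders buy over string concatenation: the message is only assembled when the log level is enabled. `ParameterizedLoggingSketch`, `SketchLogger` and `format` are hypothetical stand-ins written for this example (they are not Flink or SLF4J classes), and `"task-7"` is a placeholder for `message.getTaskExecutionId()`. With SLF4J the call has the same shape, `LOG.info(template, arg1, arg2)`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an SLF4J-style logger; not Flink or SLF4J code. */
class ParameterizedLoggingSketch {

    /** Replaces each "{}" in the template with the next argument, left to right. */
    static String format(String template, Object... args) {
        StringBuilder sb = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int idx = template.indexOf("{}", from);
            if (idx < 0) {
                break; // more arguments than placeholders: ignore the rest
            }
            sb.append(template, from, idx).append(arg);
            from = idx + 2;
        }
        return sb.append(template.substring(from)).toString();
    }

    /** Minimal logger: the message is only assembled when INFO is enabled. */
    static final class SketchLogger {
        final boolean infoEnabled;
        final List<String> lines = new ArrayList<>();

        SketchLogger(boolean infoEnabled) {
            this.infoEnabled = infoEnabled;
        }

        void info(String template, Object... args) {
            if (infoEnabled) {
                lines.add(format(template, args));
            }
        }
    }

    public static void main(String[] args) {
        SketchLogger log = new SketchLogger(true);
        long checkpointId = 42L;
        String taskId = "task-7"; // placeholder for message.getTaskExecutionId()

        // Same shape as the SLF4J call the review comment asks for.
        log.info("Discarding checkpoint {} because of checkpoint decline from task {}",
                checkpointId, taskId);
        System.out.println(log.lines.get(0));
    }
}
```

          The concatenated variant builds the full string on every call, even when INFO logging is switched off.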

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87035590

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java —
          @@ -0,0 +1,32 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.checkpoint.decline;
          +
          +/**
          + * Exception indicating that a checkpoint was declined because a task was not
          + * ready to perform a checkpoint.
          — End diff –

          JavaDocs and exception message seem to diverge a little bit.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87035779

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java —
          @@ -0,0 +1,32 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.checkpoint.decline;
          +
          +/**
          + * Exception indicating that a checkpoint was declined because a task was not
          + * ready to perform a checkpoint.
          — End diff –

          JavaDocs do not fit class.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87035938

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java —
          @@ -69,7 +69,7 @@
            * @param checkpointId The ID of the checkpoint to be aborted.
          --- End diff --

          New method parameter not added to JavaDocs

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87039138

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java —
          @@ -899,26 +926,480 @@ public void testStartAlignmentWithClosedChannels() {
          }

          @Test

          - public void testEndOfStreamWhileCheckpoint() {
          + public void testEndOfStreamWhileCheckpoint() throws Exception {
          + BufferOrEvent[] sequence = {
          + // one checkpoint
          + createBarrier(1, 0), createBarrier(1, 1), createBarrier(1, 2),
          +
          + // some buffers
          + createBuffer(0), createBuffer(0), createBuffer(2),
          +
          + // start the checkpoint that will be incomplete
          + createBarrier(2, 2), createBarrier(2, 0),
          + createBuffer(0), createBuffer(2), createBuffer(1),
          +
          + // close one after the barrier one before the barrier
          + createEndOfPartition(2), createEndOfPartition(1),
          + createBuffer(0),
          +
          + // final end of stream
          + createEndOfPartition(0)
          + };
          +
          + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
          + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
          +
          + // data after first checkpoint
          + check(sequence[3], buffer.getNextNonBlocked());
          + check(sequence[4], buffer.getNextNonBlocked());
          + check(sequence[5], buffer.getNextNonBlocked());
          + assertEquals(1L, buffer.getCurrentCheckpointId());
          +
          + // alignment of second checkpoint
          + check(sequence[10], buffer.getNextNonBlocked());
          + assertEquals(2L, buffer.getCurrentCheckpointId());
          +
          + // first end-of-partition encountered}
          +
          + @Test
          + public void testSingleChannelAbortCheckpoint() throws Exception {
          + BufferOrEvent[] sequence = {
          + createBuffer(0),
          + createBarrier(1, 0),
          + createBuffer(0),
          + createBarrier(2, 0),
          + createCancellationBarrier(4, 0),
          + createBarrier(5, 0),
          + createBuffer(0),
          + createCancellationBarrier(6, 0),
          + createBuffer(0)
          + };
            +
            + MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
            + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
            +
            + StatefulTask toNotify = mock(StatefulTask.class);
            + buffer.registerCheckpointEventHandler(toNotify);
            +
            + check(sequence[0], buffer.getNextNonBlocked());
            + check(sequence[2], buffer.getNextNonBlocked());
            + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
            + assertEquals(0L, buffer.getAlignmentDurationNanos());
            +
            + check(sequence[6], buffer.getNextNonBlocked());
            + assertEquals(5L, buffer.getCurrentCheckpointId());
            + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
            + verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
            + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
            + assertEquals(0L, buffer.getAlignmentDurationNanos());
            +
            + check(sequence[8], buffer.getNextNonBlocked());
            + assertEquals(6L, buffer.getCurrentCheckpointId());
            + verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
            + assertEquals(0L, buffer.getAlignmentDurationNanos());

          + buffer.cleanup();
          + checkNoTempFilesRemain();
          + }
          +
          + @Test
          + public void testMultiChannelAbortCheckpoint() throws Exception {
          + BufferOrEvent[] sequence = {
          + // some buffers and a successful checkpoint
          + /* 0 */ createBuffer(0), createBuffer(2), createBuffer(0),
          + /* 3 */ createBarrier(1, 1), createBarrier(1, 2),
          + /* 5 */ createBuffer(2), createBuffer(1),
          + /* 7 */ createBarrier(1, 0),
          + /* 8 */ createBuffer(0), createBuffer(2),
          +
          + // aborted on last barrier
          + /* 10 */ createBarrier(2, 0), createBarrier(2, 2),
          + /* 12 */ createBuffer(0), createBuffer(2),
          + /* 14 */ createCancellationBarrier(2, 1),
          +
          + // successful checkpoint
          + /* 15 */ createBuffer(2), createBuffer(1),
          + /* 17 */ createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0),
          +
          + // abort on first barrier
          + /* 20 */ createBuffer(0), createBuffer(1),
          + /* 22 */ createCancellationBarrier(4, 1), createBarrier(4, 2),
          + /* 24 */ createBuffer(0),
          + /* 25 */ createBarrier(4, 0),
          +
          + // another successful checkpoint
          + /* 26 */ createBuffer(0), createBuffer(1), createBuffer(2),
          + /* 29 */ createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0),
          + /* 32 */ createBuffer(0), createBuffer(1),
          +
          + // abort multiple cancellations and a barrier after the cancellations
          + /* 34 */ createCancellationBarrier(6, 1), createCancellationBarrier(6, 2),
          + /* 36 */ createBarrier(6, 0),
          +
          + /* 37 */ createBuffer(0)
          + };
          +
          + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
          + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
          +
          + StatefulTask toNotify = mock(StatefulTask.class);
          + buffer.registerCheckpointEventHandler(toNotify);
          +
          + long startTs;
          +
          + // successful first checkpoint, with some aligned buffers
          + check(sequence[0], buffer.getNextNonBlocked());
          + check(sequence[1], buffer.getNextNonBlocked());
          + check(sequence[2], buffer.getNextNonBlocked());
          + startTs = System.nanoTime();
          + check(sequence[5], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
          + validateAlignmentTime(startTs, buffer);
          +
          + check(sequence[6], buffer.getNextNonBlocked());
          + check(sequence[8], buffer.getNextNonBlocked());
          + check(sequence[9], buffer.getNextNonBlocked());
          +
          + // canceled checkpoint on last barrier
          + startTs = System.nanoTime();
          + check(sequence[12], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
          + validateAlignmentTime(startTs, buffer);
          + check(sequence[13], buffer.getNextNonBlocked());
          +
          + // one more successful checkpoint
          + check(sequence[15], buffer.getNextNonBlocked());
          + check(sequence[16], buffer.getNextNonBlocked());
          + startTs = System.nanoTime();
          + check(sequence[20], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
          + validateAlignmentTime(startTs, buffer);
          + check(sequence[21], buffer.getNextNonBlocked());
          +
          + // this checkpoint gets immediately canceled
          + check(sequence[24], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
          + assertEquals(0L, buffer.getAlignmentDurationNanos());
          +
          + // some buffers
          + check(sequence[26], buffer.getNextNonBlocked());
          + check(sequence[27], buffer.getNextNonBlocked());
          + check(sequence[28], buffer.getNextNonBlocked());
          +
          + // a simple successful checkpoint
          + startTs = System.nanoTime();
          + check(sequence[32], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
          + validateAlignmentTime(startTs, buffer);
          + check(sequence[33], buffer.getNextNonBlocked());
          +
          + check(sequence[37], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
          + assertEquals(0L, buffer.getAlignmentDurationNanos());
          +
          + // all done
          + assertNull(buffer.getNextNonBlocked());
          + assertNull(buffer.getNextNonBlocked());
          +
          + buffer.cleanup();
          + checkNoTempFilesRemain();
          + }
          +
          + @Test
          + public void testAbortViaQueuedBarriers() throws Exception {
          + BufferOrEvent[] sequence = {
          + // starting a checkpoint
          + /* 0 */ createBuffer(1),
          + /* 1 */ createBarrier(1, 1), createBarrier(1, 2),
          + /* 3 */ createBuffer(2), createBuffer(0), createBuffer(1),
          +
          + // queued barrier and cancellation barrier
          + /* 6 */ createCancellationBarrier(2, 2),
          + /* 7 */ createBarrier(2, 1),
          +
          + // some intermediate buffers (some queued)
          + /* 8 */ createBuffer(0), createBuffer(1), createBuffer(2),
          +
          + // complete initial checkpoint
          + /* 11 */ createBarrier(1, 0),
          +
          + // some buffers (none queued, since checkpoint is aborted)
          + /* 12 */ createBuffer(2), createBuffer(1), createBuffer(0),
          +
          + // final barrier of aborted checkpoint
          + /* 15 */ createBarrier(1, 2),
          — End diff –

          Both late and aborted barriers are handled in the same code path, so the test worked despite that.
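
          A simplified sketch of why that holds, under the assumption that the buffer tracks only the highest checkpoint ID seen so far and whether an alignment is in progress. `BarrierPathSketch` and its `Action` enum are hypothetical names for this illustration and do not mirror the actual `BarrierBuffer` implementation. A barrier whose ID is not newer than the current one is dropped in the same branch, whether its checkpoint completed earlier or was aborted.

```java
/** Simplified, hypothetical model of barrier handling; not the actual BarrierBuffer code. */
class BarrierPathSketch {

    enum Action { START_ALIGNMENT, CONTINUE_ALIGNMENT, RESTART_ALIGNMENT, IGNORE }

    private long currentCheckpointId = -1L;
    private int barriersReceived = 0; // 0 means no alignment is in progress

    /** Handles a regular checkpoint barrier. */
    Action onBarrier(long barrierId) {
        if (barriersReceived > 0) {
            if (barrierId == currentCheckpointId) {
                barriersReceived++;
                return Action.CONTINUE_ALIGNMENT;
            }
            if (barrierId > currentCheckpointId) {
                // a newer checkpoint subsumes the pending one
                currentCheckpointId = barrierId;
                barriersReceived = 1;
                return Action.RESTART_ALIGNMENT;
            }
            return Action.IGNORE; // trailing barrier of an older checkpoint
        }
        if (barrierId > currentCheckpointId) {
            currentCheckpointId = barrierId;
            barriersReceived = 1;
            return Action.START_ALIGNMENT;
        }
        // Shared path: the barrier is late (its checkpoint already completed)
        // or belongs to a checkpoint that was aborted. Both are dropped here.
        return Action.IGNORE;
    }

    /** Handles a cancellation barrier: remember the ID and stop any alignment. */
    void onCancellationBarrier(long barrierId) {
        if (barrierId >= currentCheckpointId) {
            currentCheckpointId = barrierId;
            barriersReceived = 0;
        }
    }

    /** Marks the running alignment as complete (all channels delivered a barrier). */
    void completeAlignment() {
        barriersReceived = 0;
    }

    public static void main(String[] args) {
        BarrierPathSketch s = new BarrierPathSketch();
        s.onBarrier(1L);             // checkpoint 1 starts
        s.completeAlignment();       // checkpoint 1 completes
        s.onCancellationBarrier(2L); // checkpoint 2 is aborted
        System.out.println(s.onBarrier(1L)); // late barrier of completed checkpoint 1
        System.out.println(s.onBarrier(2L)); // barrier of aborted checkpoint 2
        System.out.println(s.onBarrier(3L)); // next checkpoint starts normally
    }
}
```

          In this model the first two calls in `main` return `IGNORE` through the same branch, which is why a test using the wrong barrier ID can still pass.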

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87039301

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java —
          @@ -0,0 +1,33 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.checkpoint.decline;
          +
          +/**
          + * Exception indicating that a checkpoint was declined because a task was not
          + * ready to perform a checkpoint.
          — End diff –

          JavaDocs

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/2754

          Really good changes @StephanEwen. With the safeguard against spilling too much data while aligning the checkpoint barriers, Flink will be more robust and deliver a better user experience in large-scale situations.

          I think the code is in really good shape and the changes are well tested. Thus +1 for merging.
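
          For illustration, the safeguard discussed in this comment can be sketched as follows. This is a minimal sketch and not the PR's code: the class `AlignmentLimitSketch` and its methods are hypothetical names, and treating a non-positive limit as "no limit" is an assumption. It only shows the idea of counting the bytes buffered during alignment and signalling an abort once the configured limit is exceeded.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of an alignment size limit; not Flink's BarrierBuffer. */
class AlignmentLimitSketch {

    // Maximum bytes that may be buffered during one alignment.
    // Assumption: a value <= 0 means "no limit".
    private final long maxBufferedBytes;

    // Data received on blocked channels while the alignment is in progress.
    private final List<byte[]> buffered = new ArrayList<>();
    private long bufferedBytes;

    AlignmentLimitSketch(long maxBufferedBytes) {
        this.maxBufferedBytes = maxBufferedBytes;
    }

    /**
     * Buffers data that arrived on a blocked channel.
     *
     * @return true if the limit is now exceeded and the alignment should be aborted
     */
    boolean buffer(byte[] data) {
        buffered.add(data);
        bufferedBytes += data.length;
        return maxBufferedBytes > 0 && bufferedBytes > maxBufferedBytes;
    }

    /** Ends the alignment: hands back the queued data for replay and resets the counter. */
    List<byte[]> release() {
        List<byte[]> queued = new ArrayList<>(buffered);
        buffered.clear();
        bufferedBytes = 0;
        return queued;
    }

    long getBufferedBytes() {
        return bufferedBytes;
    }

    public static void main(String[] args) {
        AlignmentLimitSketch alignment = new AlignmentLimitSketch(10);
        System.out.println(alignment.buffer(new byte[6])); // false: 6 bytes buffered
        System.out.println(alignment.buffer(new byte[6])); // true: 12 bytes exceed the limit
        System.out.println(alignment.release().size());    // 2: nothing is dropped on abort
    }
}
```

          In this sketch the buffered data is returned for replay rather than discarded, since aborting a checkpoint must not lose stream records. In the PR itself, per its description, exceeding the limit aborts the checkpoint.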

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87043110

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java —
          @@ -899,26 +926,480 @@ public void testStartAlignmentWithClosedChannels() {
          }

          @Test

            - public void testEndOfStreamWhileCheckpoint() {
            + public void testEndOfStreamWhileCheckpoint() throws Exception {
            + BufferOrEvent[] sequence = {
            + // one checkpoint
            + createBarrier(1, 0), createBarrier(1, 1), createBarrier(1, 2),
            +
            + // some buffers
            + createBuffer(0), createBuffer(0), createBuffer(2),
            +
            + // start the checkpoint that will be incomplete
            + createBarrier(2, 2), createBarrier(2, 0),
            + createBuffer(0), createBuffer(2), createBuffer(1),
            +
            + // close one after the barrier one before the barrier
            + createEndOfPartition(2), createEndOfPartition(1),
            + createBuffer(0),
            +
            + // final end of stream
            + createEndOfPartition(0)
            + };
            +
            + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
            + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
            +
            + // data after first checkpoint
            + check(sequence[3], buffer.getNextNonBlocked());
            + check(sequence[4], buffer.getNextNonBlocked());
            + check(sequence[5], buffer.getNextNonBlocked());
            + assertEquals(1L, buffer.getCurrentCheckpointId());
            +
            + // alignment of second checkpoint
            + check(sequence[10], buffer.getNextNonBlocked());
            + assertEquals(2L, buffer.getCurrentCheckpointId());
            +
            + // first end-of-partition encountered
            }

            +
            + @Test
            + public void testSingleChannelAbortCheckpoint() throws Exception {
            + BufferOrEvent[] sequence = {
            + createBuffer(0),
            + createBarrier(1, 0),
            + createBuffer(0),
            + createBarrier(2, 0),
            + createCancellationBarrier(4, 0),
            + createBarrier(5, 0),
            + createBuffer(0),
            + createCancellationBarrier(6, 0),
            + createBuffer(0)
            + };
            +
            + MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
            + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
            +
            + StatefulTask toNotify = mock(StatefulTask.class);
            + buffer.registerCheckpointEventHandler(toNotify);
            +
            + check(sequence[0], buffer.getNextNonBlocked());
            + check(sequence[2], buffer.getNextNonBlocked());
            + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
            + assertEquals(0L, buffer.getAlignmentDurationNanos());
            +
            + check(sequence[6], buffer.getNextNonBlocked());
            + assertEquals(5L, buffer.getCurrentCheckpointId());
            + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
            + verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
            + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
            + assertEquals(0L, buffer.getAlignmentDurationNanos());
            +
            + check(sequence[8], buffer.getNextNonBlocked());
            + assertEquals(6L, buffer.getCurrentCheckpointId());
            + verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
            + assertEquals(0L, buffer.getAlignmentDurationNanos());

          + buffer.cleanup();
          + checkNoTempFilesRemain();
          + }
          +
          + @Test
          + public void testMultiChannelAbortCheckpoint() throws Exception {
          + BufferOrEvent[] sequence = {
          + // some buffers and a successful checkpoint
          + /* 0 */ createBuffer(0), createBuffer(2), createBuffer(0),
          + /* 3 */ createBarrier(1, 1), createBarrier(1, 2),
          + /* 5 */ createBuffer(2), createBuffer(1),
          + /* 7 */ createBarrier(1, 0),
          + /* 8 */ createBuffer(0), createBuffer(2),
          +
          + // aborted on last barrier
          + /* 10 */ createBarrier(2, 0), createBarrier(2, 2),
          + /* 12 */ createBuffer(0), createBuffer(2),
          + /* 14 */ createCancellationBarrier(2, 1),
          +
          + // successful checkpoint
          + /* 15 */ createBuffer(2), createBuffer(1),
          + /* 17 */ createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0),
          +
          + // abort on first barrier
          + /* 20 */ createBuffer(0), createBuffer(1),
          + /* 22 */ createCancellationBarrier(4, 1), createBarrier(4, 2),
          + /* 24 */ createBuffer(0),
          + /* 25 */ createBarrier(4, 0),
          +
          + // another successful checkpoint
          + /* 26 */ createBuffer(0), createBuffer(1), createBuffer(2),
          + /* 29 */ createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0),
          + /* 32 */ createBuffer(0), createBuffer(1),
          +
          + // abort multiple cancellations and a barrier after the cancellations
          + /* 34 */ createCancellationBarrier(6, 1), createCancellationBarrier(6, 2),
          + /* 36 */ createBarrier(6, 0),
          +
          + /* 37 */ createBuffer(0)
          + };
          +
          + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
          + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
          +
          + StatefulTask toNotify = mock(StatefulTask.class);
          + buffer.registerCheckpointEventHandler(toNotify);
          +
          + long startTs;
          +
          + // successful first checkpoint, with some aligned buffers
          + check(sequence[0], buffer.getNextNonBlocked());
          + check(sequence[1], buffer.getNextNonBlocked());
          + check(sequence[2], buffer.getNextNonBlocked());
          + startTs = System.nanoTime();
          + check(sequence[5], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
          + validateAlignmentTime(startTs, buffer);
          +
          + check(sequence[6], buffer.getNextNonBlocked());
          + check(sequence[8], buffer.getNextNonBlocked());
          + check(sequence[9], buffer.getNextNonBlocked());
          +
          + // canceled checkpoint on last barrier
          + startTs = System.nanoTime();
          + check(sequence[12], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
          + validateAlignmentTime(startTs, buffer);
          + check(sequence[13], buffer.getNextNonBlocked());
          +
          + // one more successful checkpoint
          + check(sequence[15], buffer.getNextNonBlocked());
          + check(sequence[16], buffer.getNextNonBlocked());
          + startTs = System.nanoTime();
          + check(sequence[20], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
          + validateAlignmentTime(startTs, buffer);
          + check(sequence[21], buffer.getNextNonBlocked());
          +
          + // this checkpoint gets immediately canceled
          + check(sequence[24], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
          + assertEquals(0L, buffer.getAlignmentDurationNanos());
          +
          + // some buffers
          + check(sequence[26], buffer.getNextNonBlocked());
          + check(sequence[27], buffer.getNextNonBlocked());
          + check(sequence[28], buffer.getNextNonBlocked());
          +
          + // a simple successful checkpoint
          + startTs = System.nanoTime();
          + check(sequence[32], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
          + validateAlignmentTime(startTs, buffer);
          + check(sequence[33], buffer.getNextNonBlocked());
          +
          + check(sequence[37], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
          + assertEquals(0L, buffer.getAlignmentDurationNanos());
          +
          + // all done
          + assertNull(buffer.getNextNonBlocked());
          + assertNull(buffer.getNextNonBlocked());
          +
          + buffer.cleanup();
          + checkNoTempFilesRemain();
          + }
          +
          + @Test
          + public void testAbortViaQueuedBarriers() throws Exception {
          + BufferOrEvent[] sequence = {
          + // starting a checkpoint
          + /* 0 */ createBuffer(1),
          + /* 1 */ createBarrier(1, 1), createBarrier(1, 2),
          + /* 3 */ createBuffer(2), createBuffer(0), createBuffer(1),
          +
          + // queued barrier and cancellation barrier
          + /* 6 */ createCancellationBarrier(2, 2),
          + /* 7 */ createBarrier(2, 1),
          +
          + // some intermediate buffers (some queued)
          + /* 8 */ createBuffer(0), createBuffer(1), createBuffer(2),
          +
          + // complete initial checkpoint
          + /* 11 */ createBarrier(1, 0),
          +
          + // some buffers (none queued, since checkpoint is aborted)
          + /* 12 */ createBuffer(2), createBuffer(1), createBuffer(0),
          +
          + // final barrier of aborted checkpoint
          + /* 15 */ createBarrier(1, 2),
          +
          + // some more buffers
          + /* 16 */ createBuffer(0), createBuffer(1), createBuffer(2)
          + };
          +
          + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
          + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
          +
          + StatefulTask toNotify = mock(StatefulTask.class);
          + buffer.registerCheckpointEventHandler(toNotify);
          +
          + long startTs;
          +
          + check(sequence[0], buffer.getNextNonBlocked());
          +
          + // starting first checkpoint
          + startTs = System.nanoTime();
          + check(sequence[4], buffer.getNextNonBlocked());
          + check(sequence[8], buffer.getNextNonBlocked());
          +
          + // finished first checkpoint
          + check(sequence[3], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
          + validateAlignmentTime(startTs, buffer);
          +
          + check(sequence[5], buffer.getNextNonBlocked());
          +
          + // re-read the queued cancellation barriers
          + check(sequence[9], buffer.getNextNonBlocked());
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
          + assertEquals(0L, buffer.getAlignmentDurationNanos());
          +
          + check(sequence[10], buffer.getNextNonBlocked());
          + check(sequence[12], buffer.getNextNonBlocked());
          + check(sequence[13], buffer.getNextNonBlocked());
          + check(sequence[14], buffer.getNextNonBlocked());
          +
          + check(sequence[16], buffer.getNextNonBlocked());
          + check(sequence[17], buffer.getNextNonBlocked());
          + check(sequence[18], buffer.getNextNonBlocked());
          +
          + // no further alignment should have happened
          + assertEquals(0L, buffer.getAlignmentDurationNanos());
          +
          + // no further checkpoint (abort) notifications
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
          +
          + // all done
          + assertNull(buffer.getNextNonBlocked());
          + assertNull(buffer.getNextNonBlocked());
          +
          + buffer.cleanup();
          + checkNoTempFilesRemain();
          + }
          +
          + /**
          + * This tests the where a replay of queued checkpoint barriers meets
          + * a canceled checkpoint.
          + *
          + * The replayed newer checkpoint barrier must not try to cancel the
          + * already canceled checkpoint.
          + */
          + @Test
          + public void testAbortWhileHavingQueuedBarriers() throws Exception {
          + BufferOrEvent[] sequence = {
          + // starting a checkpoint
          + /* 0 */ createBuffer(1),
          + /* 1 */ createBarrier(1, 1),
          + /* 2 */ createBuffer(2), createBuffer(0), createBuffer(1),
          +
          + // queued barrier and cancellation barrier
          + /* 5 */ createBarrier(2, 1),
          +
          + // some queued buffers
          + /* 6 */ createBuffer(2), createBuffer(1),
          +
          + // cancel the initial checkpoint
          + /* 8 */ createCancellationBarrier(1, 0),
          +
          + // some more buffers
          + /* 9 */ createBuffer(2), createBuffer(1), createBuffer(0),
          +
          + // ignored barrier - already canceled and moved to next checkpoint
          + /* 12 */ createBarrier(1, 2),
          +
          + // some more buffers
          + /* 13 */ createBuffer(0), createBuffer(1), createBuffer(2),
          +
          + // complete next checkpoint regularly
          + /* 16 */ createBarrier(2, 0), createBarrier(2, 2),
          +
          + // some more buffers
          + /* 18 */ createBuffer(0), createBuffer(1), createBuffer(2)
          + };
          +
          + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
          + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
          +
          + StatefulTask toNotify = mock(StatefulTask.class);
          + buffer.registerCheckpointEventHandler(toNotify);
          +
          + long startTs;
          +
          + check(sequence[0], buffer.getNextNonBlocked());
          +
          + // starting first checkpoint
          + startTs = System.nanoTime();
          + check(sequence[2], buffer.getNextNonBlocked());
          + check(sequence[3], buffer.getNextNonBlocked());
          + check(sequence[6], buffer.getNextNonBlocked());
          +
          + // cancelled by cancellation barrier
          + check(sequence[4], buffer.getNextNonBlocked());
          + validateAlignmentTime(startTs, buffer);
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
          +
          + // the next checkpoint alignment starts now
          + startTs = System.nanoTime();
          + check(sequence[9], buffer.getNextNonBlocked());
          + check(sequence[11], buffer.getNextNonBlocked());
          + check(sequence[13], buffer.getNextNonBlocked());
          + check(sequence[15], buffer.getNextNonBlocked());
          +
          + // checkpoint done
          + check(sequence[7], buffer.getNextNonBlocked());
          + validateAlignmentTime(startTs, buffer);
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
          +
          + // queued data
          + check(sequence[10], buffer.getNextNonBlocked());
          + check(sequence[14], buffer.getNextNonBlocked());
          +
          + // trailing data
          + check(sequence[18], buffer.getNextNonBlocked());
          + check(sequence[19], buffer.getNextNonBlocked());
          + check(sequence[20], buffer.getNextNonBlocked());
          +
          + // all done
          + assertNull(buffer.getNextNonBlocked());
          + assertNull(buffer.getNextNonBlocked());
          +
          + buffer.cleanup();
          + checkNoTempFilesRemain();
          +
          + // check overall notifications
          + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
          + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
          + }
          +
          + /**
          + * This tests the where a cancellation barrier is received for a checkpoint already
          + * canceled due to receiving a newer checkpoint barrier.
          + */
          + @Test
          + public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws Exception {
          + BufferOrEvent[] sequence = {
          + // starting a checkpoint
          + /* 0 */ createBuffer(2),
          + /* 1 */ createBarrier(3, 1), createBarrier(3, 0),
          + /* 3 */ createBuffer(0), createBuffer(1), createBuffer(2),
          +
          + // newer checkpoint barrier cancels/subsumes pending checkpoint
          + /* 6 */ createBarrier(5, 2),
          — End diff –

          I totally agree.

done + check(sequence [7] , buffer.getNextNonBlocked()); + validateAlignmentTime(startTs, buffer); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L))); + + // queued data + check(sequence [10] , buffer.getNextNonBlocked()); + check(sequence [14] , buffer.getNextNonBlocked()); + + // trailing data + check(sequence [18] , buffer.getNextNonBlocked()); + check(sequence [19] , buffer.getNextNonBlocked()); + check(sequence [20] , buffer.getNextNonBlocked()); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + + // check overall notifications + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong()); + } + + /** + * This tests the where a cancellation barrier is received for a checkpoint already + * canceled due to receiving a newer checkpoint barrier. + */ + @Test + public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws Exception { + BufferOrEvent[] sequence = { + // starting a checkpoint + /* 0 */ createBuffer(2), + /* 1 */ createBarrier(3, 1), createBarrier(3, 0), + /* 3 */ createBuffer(0), createBuffer(1), createBuffer(2), + + // newer checkpoint barrier cancels/subsumes pending checkpoint + /* 6 */ createBarrier(5, 2), — End diff – I totally agree.
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2754#discussion_r87043464

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---
          @@ -135,19 +138,20 @@ else if (next.getEvent().getClass() == CheckpointBarrier.class)
           { processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex()); }
           }
          +else if (next.getEvent().getClass() == CancelCheckpointMarker.class) {
          +    processCancellationBarrier((CancelCheckpointMarker) next.getEvent());
          +}
           else {
               if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
          -        numClosedChannels++;
          -        // no chance to complete this checkpoint
          -        releaseBlocks();
          +        processEndOfPartition(next.getChannelIndex());
               }
               return next;
           }
           }
           else if (!endOfStream) {
               // end of input stream. stream continues with the buffered data
               endOfStream = true;

          -    releaseBlocks();
          +    releaseBlocksAndResetBarriers();
               return getNextNonBlocked();
          --- End diff --

          Sure. The recursion should also have a non-measurable impact, I assume.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/2754

          Btw: Going through the `BarrierBuffer` and the way events are handled was really instructive

          githubbot ASF GitHub Bot added a comment -

          GitHub user StephanEwen opened a pull request:

          https://github.com/apache/flink/pull/2773

          [backport] FLINK-4975 [checkpointing] Add a limit for how much data may be buffered in alignment

            1. This is a backport of #2754 to the 1.1.x branch.
            2. This already incorporates the feedback on #2754

          In corner-case situations, checkpoint alignment can take very long and buffer/spill a lot of data. This PR introduces a limit on how much data may be buffered during alignment. If that volume is exceeded, the checkpoint is aborted.

          While such overly large alignments should not occur in a healthy environment, the limit is an important safety net to have.

          This Pull Request consists of three parts:

              1. Introduce Cancellation Barriers

          These Cancellation Barriers are like checkpoint barriers, flowing with the data, but they signal that a checkpoint should be aborted rather than marking the position of that stream in the checkpoint.

          This adds extensive tests to the `BarrierBuffer` and `BarrierTracker` that these Cancellation Barriers are correctly interpreted and interplay well with other situations of alignment starts and cancellations (such as when newer barriers come early).
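
A minimal sketch of the behaviour described above, as a toy model rather than Flink's actual `BarrierBuffer`: barriers block a channel until all channels have delivered them, a cancellation marker ends the alignment with an abort, and late barriers for an already finished checkpoint are ignored. The class `ToyBarrierAligner` and its methods are hypothetical names chosen for illustration; buffering of data is left out entirely.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy alignment model (hypothetical, not Flink code): tracks barriers per channel. */
final class ToyBarrierAligner {
    private final int numChannels;
    private final Set<Integer> blockedChannels = new HashSet<>();
    private final List<String> notifications = new ArrayList<>();
    private long pendingId = -1L;        // checkpoint currently being aligned, -1 if none
    private long latestFinishedId = -1L; // highest id that was triggered or aborted

    ToyBarrierAligner(int numChannels) {
        this.numChannels = numChannels;
    }

    /** A regular checkpoint barrier arrived on the given channel. */
    void onBarrier(long id, int channel) {
        if (id <= latestFinishedId || id < pendingId) {
            return; // stale barrier of a finished or subsumed checkpoint
        }
        if (id > pendingId) {
            if (pendingId != -1L) {
                // a newer barrier subsumes the pending alignment
                notifications.add("abort " + pendingId);
                finish(pendingId);
            }
            pendingId = id;
        }
        blockedChannels.add(channel);
        if (blockedChannels.size() == numChannels) {
            notifications.add("trigger " + id);
            finish(id);
        }
    }

    /** A cancellation marker arrived; the channel does not matter in this toy model. */
    void onCancelMarker(long id) {
        if (id <= latestFinishedId || id < pendingId) {
            return; // already handled, nothing to cancel
        }
        // aborts the pending alignment (if any) and remembers the id as finished
        notifications.add("abort " + id);
        finish(id);
    }

    List<String> notifications() {
        return notifications;
    }

    private void finish(long id) {
        latestFinishedId = Math.max(latestFinishedId, id);
        pendingId = -1L;
        blockedChannels.clear();
    }
}
```

The point of the marker is visible in `onCancelMarker`: the alignment ends as soon as the marker arrives, instead of waiting for a newer barrier to subsume it.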

              2. Adjust and Checkpoint Coordinator

          Tasks emit cancellation barriers whenever they cannot start a checkpoint or whenever a checkpoint alignment was canceled. That lets downstream tasks know earlier that they should stop the alignment for that checkpoint, because it will not be able to complete.

          Tasks also explicitly send "decline" messages to the checkpoint coordinator for checkpoints they "skipped" due to alignment being cancelled or superseded.
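
The two reactions described above can be sketched as follows, assuming two hypothetical interfaces, `CoordinatorLink` and `DownstreamLink`, that stand in for the real RPC and output plumbing. Only the method name `abortCheckpointOnBarrier` appears in this PR's tests; `ToyTask`, `onCheckpointNotStartable` and everything else here are illustrative.

```java
/** Hypothetical stand-in for the channel to the checkpoint coordinator. */
interface CoordinatorLink {
    void declineCheckpoint(long checkpointId, String reason);
}

/** Hypothetical stand-in for the task's outputs to downstream tasks. */
interface DownstreamLink {
    void broadcastCancelMarker(long checkpointId);
}

/** Toy task (not Flink code) showing what happens when a checkpoint cannot proceed. */
final class ToyTask {
    private final CoordinatorLink coordinator;
    private final DownstreamLink downstream;

    ToyTask(CoordinatorLink coordinator, DownstreamLink downstream) {
        this.coordinator = coordinator;
        this.downstream = downstream;
    }

    /** Called when the alignment for a checkpoint was canceled or superseded. */
    void abortCheckpointOnBarrier(long checkpointId, String reason) {
        // tell the coordinator explicitly that this task skips the checkpoint
        coordinator.declineCheckpoint(checkpointId, reason);
        // tell downstream tasks not to wait for a regular barrier from this task
        downstream.broadcastCancelMarker(checkpointId);
    }

    /** Called when the task cannot start a checkpoint at all (for example a source task). */
    void onCheckpointNotStartable(long checkpointId) {
        abortCheckpointOnBarrier(checkpointId, "task could not start the checkpoint");
    }
}
```

Both paths end in the same pair of messages, so downstream alignments are released without waiting for a later checkpoint.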

          Previously the assumptions were:

          • When a Source Task cannot start a checkpoint, a new checkpoint must be triggered immediately, to dissolve any started downstream alignments that otherwise would not be able to complete.
          • Whenever an alignment is aborted by a newer checkpoint barrier coming in, that newer barrier will eventually reach the downstream task and break outdated pending alignments. The cancellation barrier will now break the outdated alignment earlier.

              3. Alignment Size Limit

          When the `BarrierBuffer` has buffered more than a certain number of bytes, it aborts the alignment and signals the Task that the checkpoint was aborted. The Task sends a cancellation barrier for that checkpoint downstream, to signal the downstream tasks that they should not wait for a proper barrier.

          The maximum alignment size is a config option: `task.checkpoint.alignment.max-size`
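
As a rough illustration of the size check, here is a sketch of a byte counter that reports when the alignment should be aborted. `AlignmentSizeLimiter` is a hypothetical helper, not a class from this PR; in the real code the limit would come from `task.checkpoint.alignment.max-size`. Treating a non-positive limit as "no limit" is an assumption of this sketch.

```java
/** Hypothetical helper (not Flink code): counts bytes buffered during one alignment. */
final class AlignmentSizeLimiter {
    private final long maxBufferedBytes; // assumption: values <= 0 mean "no limit"
    private long bufferedBytes;

    AlignmentSizeLimiter(long maxBufferedBytes) {
        this.maxBufferedBytes = maxBufferedBytes;
    }

    /**
     * Records a buffer that had to be held back during alignment.
     *
     * @return true if the alignment may continue, false if more than the
     *         allowed number of bytes is buffered and the alignment should abort
     */
    boolean addBuffered(int numBytes) {
        bufferedBytes += numBytes;
        if (maxBufferedBytes > 0 && bufferedBytes > maxBufferedBytes) {
            reset(); // the aborted alignment releases its buffered data
            return false;
        }
        return true;
    }

    /** Called when an alignment completes or is aborted. */
    void reset() {
        bufferedBytes = 0L;
    }

    long bufferedBytes() {
        return bufferedBytes;
    }
}
```

When `addBuffered` returns false, the caller would abort the alignment and let the task send the cancellation barrier downstream, as described above.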

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StephanEwen/incubator-flink backport

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2773.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2773


          commit a1f028dee49928ada014632bb27216b36e30250e
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2016-10-23T16:41:32Z

          FLINK-4984 [checkpointing] Add Cancellation Barriers as a way to signal aborted checkpoints

          commit b643edf1ace88b34c9cea5e892c440ad114a46fe
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2016-11-03T14:28:15Z

          FLINK-4975 [checkpointing] Add a limit for how much data may be buffered in alignment.

          If more data than the defined amount is buffered, the alignment is aborted and the checkpoint canceled.

          commit 0d890024299aecd3279d9f033415a206622e0425
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2016-11-08T16:13:19Z

          FLINK-4985 [checkpointing] Report canceled / declined checkpoints to the Checkpoint Coordinator


          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2773

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2754

          Thanks for the thorough review!

          Addressed the feedback, rebased, and will merge as soon as Travis gives a green light...

          StephanEwen Stephan Ewen added a comment -

          Implemented in

          • 1.2.0 via 07ab9f45341f8c49354d9357d9459ee2199b4e1d
          • 1.1.4 via 0962cb6f45607fb21d50030e325e99fc2c37164a
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen closed the pull request at:

          https://github.com/apache/flink/pull/2754


            People

            • Assignee:
              StephanEwen Stephan Ewen
              Reporter:
              StephanEwen Stephan Ewen