Flink / FLINK-13063

AsyncWaitOperator shouldn't be releasing checkpointingLock


Details

    • This changes the default chaining behavior of the AsyncWaitOperator. By default, we now break chains so that the AsyncWaitOperator is never chained after another operator.

    Description

      1.

      For the following setup of chained operators:

      SourceOperator -> FlatMap -> AsyncOperator

      Let's assume that the input buffer of the AsyncOperator is full. We start processing a record from the SourceOperator and pass it to the FlatMap, which fans it out (multiplies it 10 times). The first multiplied record reaches the AsyncOperator and is treated specially (stored in AsyncWaitOperator#pendingStreamElementQueueEntry), and then the AsyncWaitOperator waits on (and thereby releases) the checkpoint lock (in AsyncWaitOperator#addAsyncBufferEntry). If a checkpoint is triggered now, both the SourceOperator and the FlatMap will be checkpointed as if all 10 multiplied records had been processed, which is not true. Only the first one is checkpointed by the AsyncWaitOperator; the remaining 9 are not. So if we ever restore state from this checkpoint, we lose those 9 records.

      2.

      A similar issue (I think a previously known one) can happen if, for example, some operator upstream of the AsyncOperator fires a processing-time timer that emits data. In that case, however, AsyncWaitOperator#pendingStreamElementQueueEntry is overwritten.

      3.

      If an upstream operator has the following pseudocode:

      stateA = true
      output.collect(x)
      stateB = true

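      One would assume that the writes to stateA and stateB are atomic from the perspective of checkpoints. But again, because the AsyncWaitOperator releases the checkpoint lock while output.collect(x) is blocked on a full buffer, they are not: a checkpoint can capture stateA = true while stateB is still unset.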
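Scenario 1 can be reproduced with a small, self-contained model of the chain sharing one checkpoint lock. This is a sketch, not Flink's code: the class, fields and methods (LockReleaseDemo, asyncProcess, bufferFull, pending) are hypothetical stand-ins, and "an async result completes and frees buffer space" is simulated by the checkpoint thread itself. The one real mechanism it relies on is that java.lang.Object#wait() releases the monitor, so a second thread can enter the lock in the middle of the FlatMap's fan-out.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified model of SourceOperator -> FlatMap -> AsyncOperator
// sharing one checkpoint lock. Names are illustrative, not Flink APIs.
class LockReleaseDemo {
    private final Object checkpointLock = new Object();
    private int sourceOffset = 0;        // source records consumed
    private int asyncAccepted = 0;       // records accepted into the async operator's queue
    private Integer pending = null;      // record parked while the queue is full
    private boolean bufferFull = true;   // the async operator's input buffer starts full
    private int[] snapshot;              // {sourceOffset, records held by the async operator}
    private final CountDownLatch aboutToWait = new CountDownLatch(1);

    // Caller must hold checkpointLock.
    private void asyncProcess(int record) throws InterruptedException {
        if (bufferFull) {
            pending = record;            // plays the role of pendingStreamElementQueueEntry
            aboutToWait.countDown();
            while (bufferFull) {
                checkpointLock.wait();   // Object.wait() releases the checkpoint lock
            }
            pending = null;
        }
        asyncAccepted++;
    }

    int[] run() {
        Thread checkpointer = new Thread(() -> {
            try {
                aboutToWait.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            synchronized (checkpointLock) {  // only possible because wait() released the lock
                snapshot = new int[] {sourceOffset, asyncAccepted + (pending != null ? 1 : 0)};
                bufferFull = false;          // simulate an async result completing and freeing space
                checkpointLock.notifyAll();
            }
        });
        checkpointer.start();
        try {
            synchronized (checkpointLock) {
                sourceOffset++;                 // the source emits one record...
                for (int i = 0; i < 10; i++) {  // ...which the FlatMap multiplies 10 times
                    asyncProcess(i);
                }
            }
            checkpointer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return snapshot;
    }

    public static void main(String[] args) {
        int[] s = new LockReleaseDemo().run();
        System.out.println("source offset in checkpoint: " + s[0]);
        System.out.println("records in async operator's checkpointed state: " + s[1]);
        System.out.println("records lost on restore: " + (10 * s[0] - s[1]));
    }
}
```

Running it prints a checkpointed source offset of 1 but only 1 record in the async operator's state, so 9 records would be missing after a restore, matching the description above.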
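Scenario 2 (the overwritten pending entry) can be modelled the same way. This sketch assumes, purely for illustration, that the upstream timer callback runs on its own thread and takes the checkpoint lock before emitting; all names (PendingSlotOverwriteDemo, asyncProcess, pending) are hypothetical. A single field stands in for AsyncWaitOperator#pendingStreamElementQueueEntry, and the timer's record overwrites the record the task thread parked there before waiting.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical model: one pending slot, overwritten while the checkpoint lock is released.
class PendingSlotOverwriteDemo {
    private final Object checkpointLock = new Object();
    private boolean bufferFull = true;   // the async operator's queue starts full
    private String pending;              // single slot for an element that did not fit
    private String checkpointedPending;  // what a checkpoint captured from that slot
    private final CountDownLatch taskWaiting = new CountDownLatch(1);
    private final CountDownLatch timerWaiting = new CountDownLatch(1);

    // Caller must hold checkpointLock.
    private void asyncProcess(String element, CountDownLatch signalWaiting) throws InterruptedException {
        if (bufferFull) {
            pending = element;           // overwrites whatever was already parked here
            signalWaiting.countDown();
            while (bufferFull) {
                checkpointLock.wait();   // releases the checkpoint lock
            }
        }
    }

    String run() {
        Thread timer = new Thread(() -> {  // stands in for an upstream processing-time timer
            try {
                taskWaiting.await();
                synchronized (checkpointLock) {  // acquired only while the task thread waits
                    asyncProcess("timer-record", timerWaiting);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread checkpointer = new Thread(() -> {
            try {
                timerWaiting.await();
                synchronized (checkpointLock) {
                    checkpointedPending = pending;  // snapshot the pending slot
                    bufferFull = false;             // simulate the queue draining
                    checkpointLock.notifyAll();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        timer.start();
        checkpointer.start();
        try {
            synchronized (checkpointLock) {
                asyncProcess("flatmap-record", taskWaiting);
            }
            timer.join();
            checkpointer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return checkpointedPending;
    }

    public static void main(String[] args) {
        System.out.println("pending slot in checkpoint: " + new PendingSlotOverwriteDemo().run());
    }
}
```

The checkpoint sees only "timer-record" in the slot; "flatmap-record", which the upstream operator already considers emitted, is not part of the snapshot.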
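For scenario 3, the stateA / collect / stateB pseudocode can be turned into a runnable sketch. As before, the names (TornStateDemo, collect, bufferFull) are hypothetical and the checkpoint thread also simulates the downstream buffer draining. The upstream update is written inside one synchronized block, yet the checkpoint still observes it half done because the downstream wait() inside collect releases the same lock.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical model: an upstream operator updates two pieces of state around
// output.collect(x), and the downstream async operator waits on the checkpoint lock.
class TornStateDemo {
    private final Object checkpointLock = new Object();
    private boolean stateA = false;
    private boolean stateB = false;
    private boolean bufferFull = true;   // the async operator's buffer starts full
    private boolean[] snapshot;          // {stateA, stateB} as seen by the checkpoint
    private final CountDownLatch aboutToWait = new CountDownLatch(1);

    // Stands in for output.collect(x) reaching a full async operator. Caller holds the lock.
    private void collect(String x) throws InterruptedException {
        aboutToWait.countDown();
        while (bufferFull) {
            checkpointLock.wait();       // releases the lock in the middle of the upstream update
        }
    }

    boolean[] run() {
        Thread checkpointer = new Thread(() -> {
            try {
                aboutToWait.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            synchronized (checkpointLock) {
                snapshot = new boolean[] {stateA, stateB};
                bufferFull = false;      // simulate the async buffer draining
                checkpointLock.notifyAll();
            }
        });
        checkpointer.start();
        try {
            synchronized (checkpointLock) {  // the update the upstream operator expects to be atomic
                stateA = true;
                collect("x");
                stateB = true;
            }
            checkpointer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return snapshot;
    }

    public static void main(String[] args) {
        boolean[] s = new TornStateDemo().run();
        System.out.println("checkpoint saw stateA=" + s[0] + ", stateB=" + s[1]);
    }
}
```

This prints "checkpoint saw stateA=true, stateB=false": holding the lock around both writes is not enough once a callee can release it.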

      CC Aljoscha Krettek, Stephan Ewen, Stefan Richter


          People

            Stefan Richter (srichter)
            Piotr Nowojski (pnowojski)
            Votes: 0
            Watchers: 12


              Time Tracking

                Original Estimate: Not Specified
                Remaining Estimate: 0h
                Time Spent: 20m
