Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-13063

AsyncWaitOperator shouldn't be releasing checkpointingLock

    XMLWordPrintableJSON

    Details

    • Release Note:
      This changes the default chaining behavior of the AsyncWaitOperator. By default, we now break chains so that the AsyncWaitOperator is never chained after another operator.

      Description

      1.

      For the following setup of chained operators:

      SourceOperator -> FlatMap -> AsyncOperator

      Lets assume that input buffer of AsyncOperator is full. We start processing a record from the SourceOperator, we pass it to the FlatMap, which fan it out (multiplies it 10 times). First multiplied record reaches AsyncOperator and is special treated (stored in AsyncWaitOperator#pendingStreamElementQueueEntry ) and then AsyncWaitOperator waits (and releases) on the checkpoint lock (in AsyncWaitOperator#addAsyncBufferEntry . If a checkpoint is triggered now, both SourceOperator and FlatMap will be checkpointed assumed that all of those 10 multiplied records were processed, which is not true. Only the first one is checkpointed by the AsyncWatiOperator. Remaining 9 are not. So if we ever restore state from this checkpoint, we have lost those 9 records.

      2.

      Similar issue (I think previously known) can happen if for example some upstream operator to the AsyncOperator fires a processing time timer, that emits some data. But in that case, AsyncWaitOperator#pendingStreamElementQueueEntry is being overwritten.

      3.

      If upstream operator has the following pseudo code:

      stateA = true
      output.collect(x)
      stateB = true

      one would assume that stateA and stateB access/writes will be atomic from the perspective of the checkpoints. But again, because AsyncWaitOperator releases the checkpoint lock, they will not be.

      CC Aljoscha Krettek [~StephanEwen] Stefan Richter

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                srichter Stefan Richter
                Reporter:
                pnowojski Piotr Nowojski
              • Votes:
                0 Vote for this issue
                Watchers:
                11 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved:

                  Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 20m
                  20m