Flink / FLINK-25664

notifyDataAvailable is not triggered for PipelinedSubpartition if more than one buffer is added while isBlocked == true

Details

    Description

      Currently, the following sequence can occur:

      1. PipelinedSubpartition holds only one aligned checkpoint barrier buffer (isBlocked == false).
      2. CreditBasedSequenceNumberingViewReader polls this buffer, and PipelinedSubpartition becomes blocked (isBlocked == true).
      3. Before the downstream task calls resumeConsumption, two finished buffers are added to this PipelinedSubpartition (there is no limit on adding buffers to a blocked PipelinedSubpartition):
        1. Adding the first finished buffer does not trigger notifyDataAvailable because isBlocked == true.
        2. Adding the second finished buffer does not trigger notifyDataAvailable either, because isBlocked == true and finishedBuffer > 1.
      4. The downstream task calls resumeConsumption, and PipelinedSubpartition is unblocked (isBlocked == false).
      5. OutputFlusher flushes PipelinedSubpartition, but this does not trigger notifyDataAvailable because finishedBuffer > 1.
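The sequence above can be replayed against a toy model. The following is only a minimal sketch of the notification conditions as this report describes them, not Flink's actual implementation: the class ToySubpartition, its counter-based queue, and the method names addFinished, pollBarrier and runScenario are all hypothetical.

```java
/** Toy replay of the scenario above; a simplified model, not Flink's code. */
class BlockedNotifySketch {

    /** Hypothetical stand-in for PipelinedSubpartition that only counts finished buffers. */
    static final class ToySubpartition {
        private int finishedBuffers;   // finished buffers currently queued
        private boolean isBlocked;     // set once the barrier buffer has been polled
        int notifications;             // how often notifyDataAvailable would have fired

        /** add: notifies only if unblocked and this is the sole finished buffer. */
        void addFinished() {
            finishedBuffers++;
            if (!isBlocked && finishedBuffers == 1) {
                notifications++;
            }
        }

        /** The reader polls the barrier buffer, which blocks the subpartition. */
        void pollBarrier() {
            finishedBuffers--;
            isBlocked = true;
        }

        /** Unblocks without notifying, as described in step 4. */
        void resumeConsumption() {
            isBlocked = false;
        }

        /** flush: notifies only if unblocked and exactly one buffer is queued. */
        void flush() {
            if (!isBlocked && finishedBuffers == 1) {
                notifications++;
            }
        }
    }

    /** Runs steps 1 to 5 and returns the number of notifications observed. */
    static int runScenario() {
        ToySubpartition subpartition = new ToySubpartition();
        subpartition.addFinished();        // step 1: barrier buffer, notifies once
        subpartition.pollBarrier();        // step 2: subpartition becomes blocked
        subpartition.addFinished();        // step 3.1: no notification (blocked)
        subpartition.addFinished();        // step 3.2: no notification (blocked)
        subpartition.resumeConsumption();  // step 4: unblocked
        subpartition.flush();              // step 5: no notification (two buffers queued)
        return subpartition.notifications;
    }
}
```

In this model the two buffers added in step 3 never produce a notification of their own, so the only notification recorded is the one for the barrier buffer.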

      In conclusion, there are three cases in which notifyDataAvailable should be triggered:
          case1: there is only one finished buffer (handled by add)
          case2: there is only one unfinished buffer (handled by flush)
          case3: there is more than one finished buffer, added while PipelinedSubpartition was blocked (not handled)
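One conceivable way to cover case3 is to notify when consumption resumes and finished buffers are already queued. The sketch below illustrates that idea on a toy model only; it is an assumption about a possible approach, not the fix adopted in Flink, and the class NotifyCasesSketch with its fields and methods is hypothetical.

```java
/** Toy model of the three notification cases; an illustrative sketch, not Flink's code. */
class NotifyCasesSketch {
    private int finishedBuffers;     // finished buffers currently queued
    private boolean hasUnfinished;   // whether an unfinished buffer is queued
    private boolean isBlocked;       // blocked by an aligned checkpoint barrier
    int notifications;               // how often notifyDataAvailable would have fired

    /** case1: add notifies when unblocked and this is the only finished buffer. */
    void addFinished() {
        finishedBuffers++;
        if (!isBlocked && finishedBuffers == 1) {
            notifications++;
        }
    }

    /** Queues an unfinished buffer; add does not notify for it. */
    void addUnfinished() {
        hasUnfinished = true;
    }

    /** case2: flush notifies when unblocked and only an unfinished buffer is queued. */
    void flush() {
        if (!isBlocked && finishedBuffers == 0 && hasUnfinished) {
            notifications++;
        }
    }

    /** Marks the subpartition as blocked. */
    void block() {
        isBlocked = true;
    }

    /** case3 (hypothetical handling): notify on resume if buffers piled up while blocked. */
    void resumeConsumption() {
        isBlocked = false;
        if (finishedBuffers > 0) {
            notifications++;
        }
    }
}
```

With this variant, two finished buffers added while blocked yield one notification at resumeConsumption, and the subsequent flush adds none.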

      // Test code for this case.
      // Add this test case to org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest
      @Test
      public void testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption()
              throws Exception {
          // Block the subpartition with an aligned checkpoint barrier.
          blockSubpartitionByCheckpoint(1);

          // Add two finished buffers while the subpartition is blocked.
          subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
          subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));

          // No new notification is expected while blocked.
          assertEquals(1, availablityListener.getNumNotifications());
          readView.resumeConsumption();
          subpartition.flush();
          // After resuming and flushing, the queued buffers should be announced.
          assertEquals(2, availablityListener.getNumNotifications());
      }

          People

            Assignee: Unassigned
            Reporter: Cai Liuyang (cailiuyang)