Flink / FLINK-22497

When using DefaultRollingPolicy in StreamingFileSink, part files are finished later than expected


Details

    Description

      I have a question that came up while testing StreamingFileSink:

      DefaultRollingPolicy rolls a part file once it has been open for the rollover interval (60 s by default), but this condition is only checked when the periodic timer registered with the processing-time service (procTimeService) fires. If the interval has not quite been reached at one firing, the roll is postponed to the next firing, 60 s later. Rolling is therefore not timely, and a part file can stay open noticeably longer than the configured maximum duration. For example, with a checkpoint interval of 60 s, the file should be finished at the second checkpoint, but it is only finished at the third.

      See the attached picture (1.png) for details.
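      To make the timing concrete, here is a minimal simulation; it is not Flink code. It assumes the timer fires at exact multiples of a fixed check interval (in StreamingFileSink this should correspond to the bucket check interval, which as far as I know also defaults to 60 s) and models only the rollover-interval condition. The class and method names are hypothetical.

```java
// Hypothetical simulation, not Flink code: a part file whose rollover deadline
// is checked only when a periodic processing-time timer fires.
final class PeriodicRollCheck {

    /**
     * Returns the first timer firing at which a part file created at
     * creationTime satisfies (now - creationTime >= rolloverInterval).
     * Simplifying assumption: the timer fires at checkInterval,
     * 2 * checkInterval, 3 * checkInterval, ... measured from sink start.
     */
    static long rollTime(long creationTime, long rolloverInterval, long checkInterval) {
        long deadline = creationTime + rolloverInterval;
        // Smallest multiple of checkInterval that is >= deadline (ceiling division).
        long firings = (deadline + checkInterval - 1) / checkInterval;
        return Math.max(firings, 1) * checkInterval;
    }

    public static void main(String[] args) {
        long interval = 60_000L; // 60 s, used for both the rollover interval and the timer
        long created = 1_000L;   // part file opened 1 s after sink start
        long rolled = rollTime(created, interval, interval);
        System.out.println("deadline=" + (created + interval) + " ms, rolled at=" + rolled + " ms");
    }
}
```

      For a file opened 1 s after sink start, the 60 s deadline falls at 61 s, but the roll only happens at the 120 s firing, a delay of 59 s. In the worst case the delay approaches one full check interval.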

      If we add rollingPolicy.shouldRollOnProcessingTime to the if condition in Bucket.write, the file is finished at the second checkpoint, as expected:

      void write(IN element, long currentTime) throws IOException {
          // Proposed change: besides the per-element check, also roll as soon as the
          // processing-time condition already holds, instead of waiting for the next timer.
          if (inProgressPart == null
                  || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
                  || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
              if (LOG.isDebugEnabled()) {
                  // Log at debug level to match the isDebugEnabled() guard.
                  LOG.debug(
                          "Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                          subtaskIndex, bucketId, element);
              }
              rollPartFile(currentTime);
          }
          inProgressPart.write(element, currentTime);
      }
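      A similarly simplified model of the proposed change follows. It is hypothetical and is not Flink's Bucket class: the rollover condition is evaluated on every write, so the part file is rolled by the first element that arrives after the deadline. Only the rollover interval is modeled; part size and inactivity are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the proposed Bucket.write change (not Flink classes).
final class WriteTimeRollCheck {

    static final class PartFile {
        final long creationTime;
        final List<String> elements = new ArrayList<>();

        PartFile(long creationTime) {
            this.creationTime = creationTime;
        }
    }

    private final long rolloverInterval;
    private PartFile inProgress;
    // Processing times at which an open part file was closed.
    final List<Long> rollTimes = new ArrayList<>();

    WriteTimeRollCheck(long rolloverInterval) {
        this.rolloverInterval = rolloverInterval;
    }

    // Mirrors the proposed condition: open a new file if there is none, or roll
    // if the file's age has already reached the rollover interval at write time.
    void write(String element, long currentTime) {
        if (inProgress == null || currentTime - inProgress.creationTime >= rolloverInterval) {
            if (inProgress != null) {
                rollTimes.add(currentTime);
            }
            inProgress = new PartFile(currentTime);
        }
        inProgress.elements.add(element);
    }

    public static void main(String[] args) {
        WriteTimeRollCheck bucket = new WriteTimeRollCheck(60_000L);
        bucket.write("a", 1_000L);  // opens a part file created at 1 s
        bucket.write("b", 60_000L); // age 59 s: no roll yet
        bucket.write("c", 61_500L); // age 60.5 s: rolled on this write
        System.out.println("rolled at " + bucket.rollTimes);
    }
}
```

      Here the file created at 1 s is rolled at 61.5 s, when element "c" arrives. One consequence of checking only on write: if no element arrives after the deadline, nothing rolls, and the inactivity interval by definition cannot be detected on write. So the periodic timer would presumably still be needed for idle buckets, with the per-write check complementing it rather than replacing it.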
      

      Could we replace the periodic check with this?

      Is my understanding correct, and would this change be acceptable?
      Thanks!

      Attachments

        1. 1.png (237 kB, uploaded by ChangjiGuo)


            People

            Assignee: Unassigned
            Reporter: ChangjiGuo
            Votes: 0
            Watchers: 1
