FLINK-5083: Race condition in Rolling/Bucketing Sink pending files cleanup

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • 1.1.3, 1.2.0
    • None
    • Component/s: API / DataStream
    • None

    Description

      In both the open and restore methods, there is code that:

      1. gets a recursive listing from baseDir
      2. iterates over the listing and checks each file name against the subtaskIndex and other criteria to find pending or in-progress files, deleting any that match.
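Step 2's name check might look roughly like the sketch below. It assumes a hypothetical naming scheme of `_part-<subtaskIndex>-<counter>` followed by `.pending` or `.in-progress`. The sink's real prefixes and suffixes are configurable, so treat the pattern and the `SubtaskFileFilter` class as placeholders rather than Flink's actual code.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SubtaskFileFilter {
    // Hypothetical naming scheme (an assumption, not Flink's exact defaults):
    // "_part-<subtaskIndex>-<counter>.pending" or "_part-<subtaskIndex>-<counter>.in-progress".
    private static final Pattern LEFTOVER =
        Pattern.compile("_part-(\\d{1,9})-\\d+\\.(pending|in-progress)");

    /** True if fileName is a pending or in-progress file written by the given subtask. */
    public static boolean isOwnLeftover(String fileName, int subtaskIndex) {
        Matcher m = LEFTOVER.matcher(fileName);
        // Parse the index instead of prefix-matching, so "_part-13-..." never matches subtask 1 or 3.
        return m.matches() && Integer.parseInt(m.group(1)) == subtaskIndex;
    }
}
```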

      The problem is that the recursive listing returns the files of all subtasks, not only those of the current one. The race occurs because each #hasNext call during the iteration performs a hidden existence check on the "next" file. If another subtask deleted that file after the listing was created but before the iterator reached it, the check throws an error and the job fails.
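The race can be reproduced deterministically, and tolerated, in a small sketch. For the sake of a runnable example, `java.nio.file` stands in for the Hadoop FileSystem and the `Files.readAttributes` call plays the role of the hidden existence check; Hadoop's iterator is not involved. `TolerantCleanup.deleteListed` is a hypothetical helper that takes a listing captured earlier and treats a missing entry as "already deleted by another subtask" instead of failing.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.List;

public class TolerantCleanup {
    /**
     * Deletes each regular file in a previously captured listing.
     * Returns the number of files this call actually deleted.
     */
    public static int deleteListed(List<Path> listing) throws IOException {
        int deleted = 0;
        for (Path p : listing) {
            try {
                // Per-entry existence check, analogous to the one hidden in #hasNext.
                BasicFileAttributes attrs = Files.readAttributes(p, BasicFileAttributes.class);
                if (attrs.isRegularFile()) {
                    Files.delete(p);
                    deleted++;
                }
            } catch (NoSuchFileException e) {
                // Removed concurrently by another subtask after the listing: skip, don't fail.
            }
        }
        return deleted;
    }
}
```

Swallowing `NoSuchFileException` only removes the failure; every subtask still walks every other subtask's files, which is why the filtered listing proposed below is the more complete fix.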

      Depending on the number of pending files, the failure can recur on every restart, each attempt failing on a different file, until the job runs out of retries.

      One solution would be to use #listStatus instead. The Hadoop FileSystem supports a PathFilter in its #listStatus calls, but not in the recursive #listFiles call. Because the cleanup starts from baseDir, the recursion would then have to be implemented in Flink.
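A recursive listing done on the Flink side, with a filter applied at each directory, could be sketched as follows. Here `Files.newDirectoryStream(Path, DirectoryStream.Filter)` stands in for Hadoop's `FileSystem#listStatus(Path, PathFilter)` so the example runs without a Hadoop dependency; `FilteredRecursiveListing` and `listMatching` are hypothetical names. Subdirectories always pass the filter so the walk can descend, and a directory that disappears mid-walk is skipped rather than treated as an error.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class FilteredRecursiveListing {

    /** Recursively collects files under dir whose file name passes fileFilter. */
    public static List<Path> listMatching(Path dir, Predicate<String> fileFilter) throws IOException {
        List<Path> result = new ArrayList<>();
        collect(dir, fileFilter, result);
        return result;
    }

    private static void collect(Path dir, Predicate<String> fileFilter, List<Path> out) throws IOException {
        // Filter at listing time (the analogue of listStatus with a PathFilter):
        // keep subdirectories so we can recurse, and only files that pass the name check.
        DirectoryStream.Filter<Path> filter =
            p -> Files.isDirectory(p) || fileFilter.test(p.getFileName().toString());
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir, filter)) {
            for (Path entry : entries) {
                if (Files.isDirectory(entry)) {
                    collect(entry, fileFilter, out);
                } else {
                    out.add(entry);
                }
            }
        } catch (NoSuchFileException e) {
            // The directory vanished after its parent was listed: nothing left to clean up here.
        }
    }
}
```

With the subtask name check as `fileFilter`, each subtask only ever sees its own leftovers, so another subtask's deletions cannot surface in its listing.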

      This touches on another issue. Over time the directory tree is bound to grow very large, and re-listing everything from baseDir will get increasingly expensive, especially if the file system is S3. Maybe we could add a Bucketer callback that returns a list of cleanup root directories based on the current file? I'm guessing most people use time-based bucketing, so cleanup only matters for a limited recent window. If so, this would also solve the recursive listing problem described above.
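The proposed Bucketer callback could look roughly like this. `CleanupRootsProvider`, `cleanupRoots` and `lastHours` are hypothetical names, not part of Flink's Bucketer interface. The sketch assumes hourly buckets named with a `yyyy-MM-dd--HH` pattern and assumes that leftovers older than the lookback window no longer need cleaning.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class HourlyCleanupRoots {

    /** Hypothetical Bucketer-side callback: the bucket directories worth scanning for leftovers. */
    public interface CleanupRootsProvider {
        List<String> cleanupRoots(String basePath, LocalDateTime now);
    }

    // Assumed hourly bucket naming, e.g. "2016-11-16--01".
    private static final DateTimeFormatter BUCKET_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd--HH");

    /** Provider covering the current hourly bucket plus the previous lookbackHours buckets. */
    public static CleanupRootsProvider lastHours(int lookbackHours) {
        return (basePath, now) -> {
            List<String> roots = new ArrayList<>();
            for (int h = 0; h <= lookbackHours; h++) {
                roots.add(basePath + "/" + now.minusHours(h).format(BUCKET_FORMAT));
            }
            return roots;
        };
    }
}
```

For example, `lastHours(2).cleanupRoots("/base", LocalDateTime.of(2016, 11, 16, 1, 30))` yields `/base/2016-11-16--01`, `/base/2016-11-16--00` and `/base/2016-11-15--23`, so the cleanup lists three small directories instead of the whole tree under baseDir.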

    People

    • Assignee: Unassigned
    • Reporter: Cliff Resnick (cresny@gmail.com)
    • Votes: 0
    • Watchers: 2
