Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-9749 Rework Bucketing Sink
  3. FLINK-9751

Add a RecoverableWriter to the FileSystem abstraction

    XMLWordPrintableJSON

Details

    • Sub-task
    • Status: Closed
    • Major
    • Resolution: Fixed
    • None
    • 1.6.0
    • Connectors / Common
    • None

    Description

      The core operation of the StreamingFileSink is to append result data to (hidden) "in progress" files and then, when the files should roll over, publish them as visible files. At each checkpoint, the data so far must be persistent in the "in progress" files. On recovery, we resume the "in progress" file at the exact position of the checkpoint, or publish up to the position of that checkpoint.

      In order to support various file systems and object stores, we need an interface that captures these core operations and allows for different implementations (such as file truncate/append on posix, MultiPartUpload on S3, ...)

      Proposed interface:

      /**
       * A handle to an in-progress stream with a defined and persistent amount of data.
       * The handle can be used to recover the stream and publish the result file.
       */
      interface CommitRecoverable { ... }
      
      /**
       * A handle to an in-progress stream with a defined and persistent amount of data.
       * The handle can be used to recover the stream and either publish the result file
       * or keep appending data to the stream.
       */
      interface ResumeRecoverable extends CommitRecoverable { ... }
      
      /**
       * An output stream to a file system that can be recovered at well defined points.
       * The stream initially writes to hidden files or temp files and only creates the
       * target file once it is closed and "committed".
       */
      public abstract class RecoverableFsDataOutputStream extends FSDataOutputStream {
      
          /**
           * Ensures all data so far is persistent (similar to {@link #sync()}) and returns
           * a handle to recover the stream at the current position.
           */
          public abstract ResumeRecoverable persist() throws IOException;
      
          /**
           * Closes the stream, ensuring persistence of all data (similar to {@link #sync()}).
           * This returns a Committer that can be used to publish (make visible) the file
           * that the stream was writing to.
           */
          public abstract Committer closeForCommit() throws IOException;
      
          /**
           * A committer can publish the file of a stream that was closed.
           * The Committer can be recovered via a {@link CommitRecoverable}.
           */
          public interface Committer {
      
              void commit() throws IOException;
      
              CommitRecoverable getRecoverable();
          }
      }
      
      /**
       * The RecoverableWriter creates and recovers RecoverableFsDataOutputStream.
       */
      public interface RecoverableWriter{
      
          RecoverableFsDataOutputStream open(Path path) throws IOException;
      
          RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException;
      
          RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable) throws IOException;
      

      Attachments

        Activity

          People

            sewen Stephan Ewen
            sewen Stephan Ewen
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: