FLINK-6427: BucketingSink does not sync file length in case of cancel

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.3.0, 1.2.1
    • Fix Version/s: 1.3.0
    • Component/s: Streaming Connectors
    • Labels:
      None

      Description

      This is from a discussion on the user mailing list: https://lists.apache.org/thread.html/917dbbbd7c8f48b33e7b470fa9ed382df61cf43cb5deb46becebf4b5@%3Cuser.flink.apache.org%3E

      A possible solution is to add this to StreamWriterBase.hflushOrSync():

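      import java.util.EnumSet;
      import org.apache.hadoop.hdfs.DFSOutputStream;
      import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
      // os is the FSDataOutputStream passed to hflushOrSync(). Besides syncing the data,
      // ask the NameNode to record the current file length (UPDATE_LENGTH).
      if (os.getWrappedStream() instanceof DFSOutputStream) {
          ((DFSOutputStream) os.getWrappedStream()).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
      }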
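The following is a toy, in-memory model of the behavior described in this issue, not Hadoop code: the class `UnderConstructionFile` and its methods are hypothetical, and the model ignores blocks and replication entirely. It assumes, as the description implies, that a plain flush pushes data out without updating the length recorded by the NameNode, and that only a clean close or a sync with `UPDATE_LENGTH` updates it. A cancelled task never reaches `close()`, so in this model only the flag keeps the reported length in step with the length a checkpoint records.

```java
import java.util.EnumSet;

// Hypothetical toy model, not Hadoop's API: compares what a writer has flushed
// with the length the NameNode would report for a file that is still open.
final class UnderConstructionFile {
    enum SyncFlag { UPDATE_LENGTH }

    private long bytesWritten;   // bytes handed to the stream so far
    private long flushedBytes;   // bytes pushed out by the last flush/sync
    private long nameNodeLength; // length the NameNode would report

    void write(int numBytes) {
        bytesWritten += numBytes;
    }

    // Like hflush(): pushes data out, but leaves the NameNode length unchanged.
    void hflush() {
        flushedBytes = bytesWritten;
    }

    // Like hsync(flags): also records the length when UPDATE_LENGTH is given.
    void hsync(EnumSet<SyncFlag> flags) {
        flushedBytes = bytesWritten;
        if (flags.contains(SyncFlag.UPDATE_LENGTH)) {
            nameNodeLength = flushedBytes;
        }
    }

    // A clean close finalizes the length; a cancelled task never calls this.
    void close() {
        hflush();
        nameNodeLength = flushedBytes;
    }

    // What a checkpoint would record as the valid length.
    long flushedLength() {
        return flushedBytes;
    }

    long reportedLength() {
        return nameNodeLength;
    }
}
```

For example, after `write(100)` and `hflush()`, `flushedLength()` is 100 while `reportedLength()` is still 0; a subsequent `hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH))` brings `reportedLength()` up to 100.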

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user juergenthomann closed the pull request at:

          https://github.com/apache/flink/pull/3813

          githubbot ASF GitHub Bot added a comment -

          Github user juergenthomann commented on the issue:

          https://github.com/apache/flink/pull/3813

          Thanks for merging!

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3813

          Merged. 👍 @juergenthomann could you please close this PR?

          Thanks for fixing this and finding and debugging it in the first place!

          aljoscha Aljoscha Krettek added a comment -

          Implemented in 6d0c4c340d0b052d2a97e7e86622707d05f6b6d7

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3813

          No objections from my side. LGTM.

          githubbot ASF GitHub Bot added a comment -

          Github user juergenthomann commented on the issue:

          https://github.com/apache/flink/pull/3813

          @aljoscha I have now tested it three times, and the valid length was always equal to or smaller than the file size reported by the NameNode. So for me it works perfectly fine now.
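The comparison described in this comment can be written down as a small check. This is a sketch, not Flink code: `ValidLengthCheck`, `PartFile` and the example paths are hypothetical, and the sketch assumes that the valid length of each part file and the length the NameNode reports for it have already been collected. A valid length larger than the reported length is the failure mode this issue is about; otherwise, the bytes past the valid length were written after the last checkpoint and should be ignored by readers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: compares the valid length recorded for each part file
// with the file length reported by the NameNode.
final class ValidLengthCheck {
    static final class PartFile {
        final String path;
        final long validLength;    // length recorded by the sink at the last checkpoint
        final long reportedLength; // length reported by the NameNode

        PartFile(String path, long validLength, long reportedLength) {
            this.path = path;
            this.validLength = validLength;
            this.reportedLength = reportedLength;
        }
    }

    // Returns the paths whose valid length exceeds the reported file length,
    // i.e. files where checkpointed data appears to be missing.
    static List<String> violations(List<PartFile> files) {
        List<String> bad = new ArrayList<>();
        for (PartFile f : files) {
            if (f.validLength > f.reportedLength) {
                bad.add(f.path);
            }
        }
        return bad;
    }

    // Number of trailing bytes written after the checkpoint that readers should ignore.
    static long bytesToIgnore(PartFile f) {
        if (f.validLength > f.reportedLength) {
            throw new IllegalStateException(f.path + ": valid length " + f.validLength
                    + " exceeds reported length " + f.reportedLength);
        }
        return f.reportedLength - f.validLength;
    }
}
```

Running `violations` over all part files after a restore gives the same pass/fail answer as the manual test: an empty list means every valid length fits within the reported file size.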

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3813

          @juergenthomann you tested this in your program, correct?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3813

          This looks good to me. @StefanRRichter do you have an opinion on this?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha closed the pull request at:

          https://github.com/apache/flink/pull/3807

          githubbot ASF GitHub Bot added a comment -

          GitHub user juergenthomann opened a pull request:

          https://github.com/apache/flink/pull/3813

          FLINK-6427 Ensure file length is flushed in StreamWriterBase

          StreamWriterBase is used with BucketingSink. With HDFS, it can happen that the NameNode does not know about the current file length if we flush but don't properly close the file. We now specify that we also want to update the file length when syncing a stream.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/juergenthomann/flink bucketing-sink-flush-file-length

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3813.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3813


          commit 83799ce096e54e0590aa1d79b37dec8ff4300e2e
          Author: Jürgen Thomann <juergen.thomann@innogames.com>
          Date: 2017-05-03T11:06:04Z

          FLINK-6427 Ensure file length is flushed in StreamWriterBase


          githubbot ASF GitHub Bot added a comment -

          Github user juergenthomann commented on the issue:

          https://github.com/apache/flink/pull/3807

          @aljoscha I'll create a new pull request in a few minutes. In my case the valid length and the size reported by the NameNode are exactly the same.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3807

          @juergenthomann Cool, if you want I can also close this PR and wait for you to open a fixed (and verified) change.

          githubbot ASF GitHub Bot added a comment -

          Github user juergenthomann commented on the issue:

          https://github.com/apache/flink/pull/3807

          It seems that DFSOutputStream is considered a private class (see its annotation). HdfsDataOutputStream also provides an hsync() that takes flags, is considered public, and is also where the SyncFlag enum is defined.

          I will try it with the small change I mentioned and see whether that solves our problem.
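The change suggested here, checking for the public `HdfsDataOutputStream` type instead of the private `DFSOutputStream`, can be modeled without Hadoop on the classpath. In this sketch, `GenericStream`, `HdfsStream` and `SyncHelper` are hypothetical stand-ins (only the names `hflush`, `hsync` and `SyncFlag.UPDATE_LENGTH` mirror Hadoop), and the type check is made on the stream object itself rather than on a wrapped stream; that placement is an assumption of the sketch, not something this comment states.

```java
import java.util.EnumSet;

// Hypothetical stand-in for FSDataOutputStream: counts plain flushes.
class GenericStream {
    int hflushCalls;

    void hflush() {
        hflushCalls++;
    }
}

// Hypothetical stand-in for the public HdfsDataOutputStream: remembers the
// flags passed to the last hsync call.
class HdfsStream extends GenericStream {
    enum SyncFlag { UPDATE_LENGTH }

    EnumSet<SyncFlag> lastSyncFlags = EnumSet.noneOf(SyncFlag.class);

    void hsync(EnumSet<SyncFlag> flags) {
        lastSyncFlags = EnumSet.copyOf(flags);
    }
}

final class SyncHelper {
    // Always flush; if the stream is the HDFS-specific subtype, also sync
    // and ask for the file length to be updated.
    static void hflushOrSync(GenericStream os) {
        os.hflush();
        if (os instanceof HdfsStream) {
            ((HdfsStream) os).hsync(EnumSet.of(HdfsStream.SyncFlag.UPDATE_LENGTH));
        }
    }
}
```

Any other stream type only gets the plain flush, so the extra call is confined to the HDFS case.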

          githubbot ASF GitHub Bot added a comment -

          GitHub user aljoscha opened a pull request:

          https://github.com/apache/flink/pull/3807

          FLINK-6427 Ensure file length is flushed in StreamWriterBase

          StreamWriterBase is used with BucketingSink. With HDFS, it can happen that the
          NameNode does not update the file length if we flush but don't properly close
          the file. We now specify that we also want to flush the file length when
          flushing.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/aljoscha/flink bucketing-sink-flush-file-length

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3807.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3807


          commit dbc3899d5dbda642fdce0f06e9fece43dbc9585f
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2017-05-02T09:31:40Z

          FLINK-6427 Ensure file length is flushed in StreamWriterBase

          StreamWriterBase is used with BucketingSink. With HDFS, it can happen that the
          NameNode does not update the file length if we flush but don't properly close
          the file. We now specify that we also want to flush the file length when
          flushing.


          aljoscha Aljoscha Krettek added a comment -

          Stefan Richter, Stephan Ewen, what do you think? You worked the most on DFS-related problems during the last release cycle.


            People

            • Assignee: Aljoscha Krettek
            • Reporter: Aljoscha Krettek
            • Votes: 0
            • Watchers: 3
