Flume / FLUME-2007

HDFS Sink should check if file is closed and retry if it is not.

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.4.0
    • Fix Version/s: 1.5.0
    • Component/s: None
    • Labels: None

      Description

      We can use the new API added in HDFS-4525. We will need to use reflection, though, so we can run against a version of HDFS that does not have this API.
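The reflection approach the description proposes might look like the sketch below: probe the FileSystem class for the HDFS-4525 isFileClosed method once, and fall back gracefully when it is absent. The class and method names here are illustrative, not the exact ones in the committed patch.

```java
import java.lang.reflect.Method;

// Sketch: probe for the HDFS-4525 isFileClosed API via reflection so the
// sink can also run against Hadoop versions that predate it.
public class IsFileClosedProbe {

  // Returns the isFileClosed(path) method if the given FileSystem class
  // exposes it, or null on older HDFS versions.
  public static Method findIsFileClosed(Class<?> fileSystemClass, Class<?> pathClass) {
    try {
      return fileSystemClass.getMethod("isFileClosed", pathClass);
    } catch (NoSuchMethodException e) {
      return null; // API not present; caller skips the closed-file check
    }
  }

  // Tiny stand-in with the new API, used only to demonstrate the probe.
  public static class NewApiFs {
    public boolean isFileClosed(String path) {
      return true;
    }
  }
}
```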

      1. FLUME-2007-8.patch
        17 kB
        Theodore michael Malaska
      2. FLUME-2007-7.patch
        16 kB
        Theodore michael Malaska
      3. FLUME-2007-6.patch
        15 kB
        Theodore michael Malaska
      4. FLUME-2007-6.patch
        15 kB
        Theodore michael Malaska
      5. FLUME-2007-5.patch
        18 kB
        Theodore michael Malaska
      6. FLUME-2007-4.patch
        15 kB
        Theodore michael Malaska
      7. FLUME-2007-2.patch
        16 kB
        Theodore michael Malaska
      8. FLUME-2007-1.patch
        15 kB
        Theodore michael Malaska
      9. FLUME-2007-0.patch
        9 kB
        Theodore michael Malaska

        Issue Links

          Activity

          ted.m Theodore michael Malaska added a comment -

          Hey Hari,

          I would love to do this one. Just to confirm. Does this sound like what you are requesting?

          I will call fs.isFileClosed after the close command has been issued. If the file is not closed, I will wait a given time and repeat, up to N times, and then throw an exception.

          Thanks again.
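The retry scheme Ted describes could be sketched roughly as follows; the names are illustrative placeholders, not the actual patch code.

```java
import java.io.IOException;

// Sketch of the retry loop described above: after issuing close(), poll
// isFileClosed up to maxRetries times, sleeping between attempts, and give
// up with an exception if the file never closes.
public class CloseRetryLoop {

  public interface ClosedCheck {
    boolean isFileClosed() throws IOException;
  }

  public static void waitUntilClosed(ClosedCheck check, int maxRetries, long sleepMillis)
      throws IOException, InterruptedException {
    for (int attempt = 0; attempt < maxRetries; attempt++) {
      if (check.isFileClosed()) {
        return; // NameNode reports the file closed; we are done
      }
      Thread.sleep(sleepMillis); // wait before asking again
    }
    throw new IOException("File still open after " + maxRetries + " checks");
  }
}
```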

          Hide
          ted.m Theodore michael Malaska added a comment -

          Version one of the patch.

          Updated AbstractHDFSWriter and its children to use isFileClosed if it is available.

          Also includes unit tests.

          Hide
          ted.m Theodore michael Malaska added a comment -

          This JIRA is ready for review. Do I need to do anything else here?

          Hide
          hshreedharan Hari Shreedharan added a comment -

          I posted a review on reviewboard. Please take a look - mostly minor issues.

          Hide
          ted.m Theodore michael Malaska added a comment -

          Thanks, I will fix those and resubmit.

          Hide
          ted.m Theodore michael Malaska added a comment (edited) -

          Made corrections based on the review.

          Hide
          hshreedharan Hari Shreedharan added a comment -

          Thanks Ted. I have one question. It looks like there is a one-minute sleep between retry attempts. Since this is being called on a thread that is timed out based on the HDFS timeout, that much time between retries would cause issues, right? If you retry once a minute and the timeout is 30 seconds, there would never be a retry. Do you think it might make sense for the time between retries to be 1/nth of the timeout (n being the max retry attempts), so that n retries can be completed within the timeout itself? I think it is fine for that not to be configurable. It might also make sense to put a ceiling on the max retry attempts; a value like 1000 in the config does not make a whole lot of sense.

          Hide
          ted.m Theodore michael Malaska added a comment -

          That is a great idea. I will put that in the next patch.

          Hide
          ted.m Theodore michael Malaska added a comment -

          Hey Hari,

          What is the timeout parameter you are referring to in the last comment? I need to find it and study it.

          This is why I do Jira every once in a while. I learn new things like this.

          Hide
          hshreedharan Hari Shreedharan added a comment -

          Theodore michael Malaska - The parameter I was talking about is "hdfs.callTimeout". We time out every call after that many milliseconds using a thread that is timed out via a future.cancel call.
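The hdfs.callTimeout mechanism Hari describes could be sketched as below: run the call on a worker thread and cancel it through the Future when it outruns the timeout. This mirrors the idea, not Flume's exact implementation.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: bound an HDFS call with a timeout and cancel it when it hangs,
// as future.cancel does in the sink's call path.
public class CallTimeoutSketch {

  public static <T> T callWithTimeout(Callable<T> call, long timeoutMillis) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Future<T> future = pool.submit(call);
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true); // interrupt the hung call
      throw new IOException("Call timed out after " + timeoutMillis + " ms", e);
    } finally {
      pool.shutdownNow();
    }
  }
}
```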

          Hide
          ted.m Theodore michael Malaska added a comment -

          OK, I am free. I will have the next patch shortly.

          Hide
          ted.m Theodore michael Malaska added a comment -

          added timeBetweenCloseRetries = Long.parseLong(conf.get("hdfs.callTimeout", "30000"));

          and timeBetweenCloseRetries = Math.max(timeBetweenCloseRetries/numberOfCloseRetries, 1000);
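The computation in the comment above amounts to spreading the close retries across the configured hdfs.callTimeout, with a one-second floor; a minimal sketch (method and class names are illustrative):

```java
// Sketch of the interval computation: divide the call timeout across the
// retries, but never poll more often than once per second.
public class RetryIntervalSketch {

  public static long timeBetweenCloseRetries(long callTimeoutMillis, int numberOfCloseRetries) {
    // e.g. a 30000 ms timeout with 5 retries yields 6000 ms between attempts
    return Math.max(callTimeoutMillis / numberOfCloseRetries, 1000);
  }
}
```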

          Hide
          hshreedharan Hari Shreedharan added a comment -

          +1. This looks good. I will run tests and commit later today.

          Hide
          hshreedharan Hari Shreedharan added a comment -

          Looks like the final changes caused test failures. I notice that the test itself is using the old name of the parameter and is expecting the time-between-close-retries parameter in the configuration. Maybe the test needs to be updated?

          Hide
          ted.m Theodore michael Malaska added a comment -

          I updated the JIRA, but the review site is down.

          I've attached the patch.

          Hide
          hshreedharan Hari Shreedharan added a comment -

          Thanks Ted. This file is missing the ASF license header:
          flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java

          Also I changed this to:

           if (numberOfCloseRetries > 0) {
             try {
               timeBetweenCloseRetries = context.getLong("hdfs.callTimeout", 10000L);
             } catch (NumberFormatException e) {
               logger.warn("hdfs.callTimeout can not be parsed to a long: "
                   + context.getLong("hdfs.callTimeout"));
             }
             timeBetweenCloseRetries = Math.max(timeBetweenCloseRetries / numberOfCloseRetries, 1000);
           }
          

          and this change in the test:

          context.put("hdfs.closeTries", String.valueOf(numberOfCloseRetriesToAttempt));
          
          Hide
          ted.m Theodore michael Malaska added a comment -

          Updated the test to
          A - Test
          B - Work

          Hide
          hshreedharan Hari Shreedharan added a comment -

          +1. Committing this now. Thanks Ted!

          Hide
          hshreedharan Hari Shreedharan added a comment -

          Patch committed, rev: 5b5470b. Thanks Ted!

          Hide
          hudson Hudson added a comment -

          FAILURE: Integrated in flume-trunk #476 (See https://builds.apache.org/job/flume-trunk/476/)
          FLUME-2007. HDFS Sink should check if file is closed and retry if it is not. (hshreedharan: http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=5b5470bd5d3e94842032009c36788d4ae346674b)

          • flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
          • flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java
          • flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java
          • flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
          • flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
          • flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
          • flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java

            People

            • Assignee:
              ted.m Theodore michael Malaska
            • Reporter:
              hshreedharan Hari Shreedharan
            • Votes: 0
            • Watchers: 5
