Flume: FLUME-2445

Duplicate files in S3 (both temp and final file)

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: v1.5.0
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Labels:
      None

      Description

      I noticed that both the temp file and the final file are created in the S3 bucket by the HDFS sink, as shown below:
      rw-rw-rw 1 9558423 2014-08-18 18:01 s3n://my-bucket/flume/actions/day=16300/hour=17/actions-i-e9b26de6.1408381201580.json.gz
      rw-rw-rw 1 9558423 2014-08-18 18:01 s3n://my-bucket/flume/actions/day=16300/hour=17/actions-i-e9b26de6.1408381201580.json.gz.tmp

      I could not find any errors in the agent log. However, when I restarted the agent the next day, it tried to close and rename the temp file again. Even after the second attempt, both files exist.
      Please find the logs below. The file was uploaded on Aug 18 and the agent was restarted on Aug 19.

      $ grep actions-i-e9b26de6.1408381201580 logs/flume.log
      18 Aug 2014 17:00:01,591 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.open:261) - Creating s3n://my-bucket/flume/actions/day=16300/hour=17/actions-i-e9b26de6.1408381201580.json.gz.tmp
      18 Aug 2014 17:00:02,150 INFO [hdfs-s3sink-actions-call-runner-1] (org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsOutputStream.<init>:182) - OutputStream for key 'flume/actions/day=16300/hour=17/actions-i-e9b26de6.1408381201580.json.gz.tmp' writing to tempfile '/var/lib/hadoop-hdfs/cache/ec2-user/s3/output-1521416101446161225.tmp'
      18 Aug 2014 18:01:02,535 INFO [hdfs-s3sink-actions-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter$5.call:469) - Closing idle bucketWriter s3n://my-bucket/flume/actions/day=16300/hour=17/actions-i-e9b26de6.1408381201580.json.gz.tmp at 1408384862535
      18 Aug 2014 18:01:02,535 INFO [hdfs-s3sink-actions-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.close:409) - Closing s3n://my-bucket/flume/actions/day=16300/hour=17/actions-i-e9b26de6.1408381201580.json.gz.tmp
      18 Aug 2014 18:01:02,535 INFO [hdfs-s3sink-actions-call-runner-7] (org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsOutputStream.close:217) - OutputStream for key 'flume/actions/day=16300/hour=17/actions-i-e9b26de6.1408381201580.json.gz.tmp' closed. Now beginning upload
      18 Aug 2014 18:01:08,043 INFO [hdfs-s3sink-actions-call-runner-7] (org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsOutputStream.close:229) - OutputStream for key 'flume/actions/day=16300/hour=17/actions-i-e9b26de6.1408381201580.json.gz.tmp' upload complete
      18 Aug 2014 18:01:08,165 INFO [hdfs-s3sink-actions-call-runner-8] (org.apache.flume.sink.hdfs.BucketWriter$8.call:669) - Renaming s3n://my-bucket/flume/actions/day=16300/hour=17/actions-i-e9b26de6.1408381201580.json.gz.tmp to s3n://my-bucket/flume/actions/day=16300/hour=17/actions-i-e9b26de6.1408381201580.json.gz

      19 Aug 2014 19:55:37,635 INFO [conf-file-poller-0] (org.apache.flume.sink.hdfs.BucketWriter.close:409) - Closing s3n://my-bucket/flume/actions/day=16300/hour=17/actions-i-e9b26de6.1408381201580.json.gz.tmp
      19 Aug 2014 19:55:37,635 INFO [conf-file-poller-0] (org.apache.flume.sink.hdfs.BucketWriter.close:428) - HDFSWriter is already closed: s3n://my-bucket/flume/actions/day=16300/hour=17/actions-i-e9b26de6.1408381201580.json.gz.tmp
      19 Aug 2014 19:55:38,064 INFO [hdfs-s3sink-actions-call-runner-1] (org.apache.flume.sink.hdfs.BucketWriter$8.call:669) - Renaming s3n://my-bucket/flume/actions/day=16300/hour=17/actions-i-e9b26de6.1408381201580.json.gz.tmp to s3n://my-bucket/flume/actions/day=16300/hour=17/actions-i-e9b26de6.1408381201580.json.gz
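The listing above shows the symptom directly: an object and its .tmp counterpart with the same size. As a minimal sketch for spotting affected files, the hypothetical helper below takes a list of object keys (for example, copied from a bucket listing) and separates .tmp keys whose final object also exists from .tmp keys that were never renamed. It assumes the in-use suffix is ".tmp", as in the logs; the class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for finding in-use files left behind in a bucket listing.
// Assumes the in-use suffix is ".tmp", as in the logs above.
final class LeftoverTmpFinder {
    static final String IN_USE_SUFFIX = ".tmp";

    private LeftoverTmpFinder() {}

    // .tmp keys whose final object also exists (the duplicate case in this issue).
    static List<String> duplicated(List<String> keys) {
        return tmpKeys(keys, true);
    }

    // .tmp keys with no final object (the rename never happened).
    static List<String> orphaned(List<String> keys) {
        return tmpKeys(keys, false);
    }

    private static List<String> tmpKeys(List<String> keys, boolean finalExists) {
        Set<String> all = new HashSet<>(keys);
        List<String> result = new ArrayList<>();
        for (String key : keys) {
            if (!key.endsWith(IN_USE_SUFFIX)) {
                continue;
            }
            // Strip the in-use suffix to get the expected final key.
            String finalKey = key.substring(0, key.length() - IN_USE_SUFFIX.length());
            if (all.contains(finalKey) == finalExists) {
                result.add(key);
            }
        }
        return result;
    }
}
```

Keys reported by duplicated() are candidates for deleting the leftover .tmp object, while keys from orphaned() would need a rename instead.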

        Activity

        Hari Shreedharan added a comment -

        No, take a look at the configure method. The first argument of each context.get* method is the parameter name.

        Ashish Paliwal added a comment -

        You can check configure() in the source. Here are the parameters you can configure:

        awsAccessKeyId
        awsSecretKey
        bucket
        endPoint
        batchSize
        objectPrefix
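A minimal sketch of how a configure() method could read the parameters listed above, with each get* call taking the parameter name as its first argument. To keep it runnable without Flume on the classpath, FakeContext is a hypothetical Map-backed stand-in for org.apache.flume.Context; S3SinkSettings and all default values are invented for illustration and are not taken from the actual S3Sink.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.flume.Context: a read-only view over
// the sink's properties, keyed by parameter name.
class FakeContext {
    private final Map<String, String> params;

    FakeContext(Map<String, String> params) {
        this.params = new HashMap<>(params);
    }

    String getString(String key, String defaultValue) {
        String value = params.get(key);
        return value != null ? value : defaultValue;
    }

    Integer getInteger(String key, Integer defaultValue) {
        String value = params.get(key);
        return value != null ? Integer.valueOf(value.trim()) : defaultValue;
    }
}

// Sketch of a configure() method reading the parameters listed above.
// The defaults are illustrative only.
class S3SinkSettings {
    String awsAccessKeyId;
    String awsSecretKey;
    String bucket;
    String endPoint;
    int batchSize;
    String objectPrefix;

    void configure(FakeContext context) {
        // The first argument of each get* call is the parameter name.
        awsAccessKeyId = context.getString("awsAccessKeyId", null);
        awsSecretKey = context.getString("awsSecretKey", null);
        bucket = context.getString("bucket", null);
        endPoint = context.getString("endPoint", null);
        batchSize = context.getInteger("batchSize", 100);
        objectPrefix = context.getString("objectPrefix", "");
        if (bucket == null) {
            throw new IllegalArgumentException("bucket must be set");
        }
    }
}
```

In an agent configuration file these would appear as, for example, agent.sinks.s3sink.bucket = my-bucket (agent and sink names hypothetical); Flume hands the sink its own properties with that prefix stripped, so configure() sees the key "bucket".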

        Bijith Kumar added a comment -

        Hi Hari,
        Thank you so much. Should the sink configuration be the same, i.e. the HDFS sink?

        Hari Shreedharan added a comment -

        You need to build it, and then drop the compiled jar into Flume's classpath.

        Bijith Kumar added a comment -

        Thanks Hari. However, I didn't quite get it. How would I plug this into Flume?

        Hari Shreedharan added a comment -

        I wrote a sink that pushes to S3: https://github.com/harishreedharan/usingflumecode/blob/master/ch05/src/main/java/usingflume/ch05/S3Sink.java

        It may not handle every possible error, but you should be able to make some minor changes and use it.

        Bijith Kumar added a comment -

        Hi everyone,
        I spent considerable time testing this. In short, there are two key issues:
        1. S3 uploads fail sporadically with the HDFS sink (multiple failures).
        2. There are no retries if the sink to S3 fails for any reason.

        #2 makes this a blocker for anyone using Flume to sink to S3, since an upload to S3 can fail at any time.
        I would like to contribute if it is a reasonable effort. Can anyone guide me?

        Meanwhile, I am going to use the File Roll sink and a custom S3 uploader.
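Issue #2 above is about missing retries. A minimal sketch of what a custom uploader could wrap around each upload, assuming the upload is expressed as a java.util.concurrent.Callable: retry a bounded number of times with exponentially growing delays. The Retry class and its parameters are hypothetical and not part of Flume.

```java
import java.util.concurrent.Callable;

// Sketch of retrying a failing operation (for example, one S3 upload) with
// exponential backoff. Hypothetical helper, not part of Flume.
final class Retry {
    private Retry() {}

    static <T> T withBackoff(Callable<T> action, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Do not retry on interruption; restore the flag and stop.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the last failure
                }
            }
            Thread.sleep(delay); // wait before the next attempt
            delay *= 2;          // double the wait each time
        }
    }
}
```

For example, Retry.withBackoff(() -> upload(file), 5, 1000L) would try at most five times, waiting 1, 2, 4 and 8 seconds between attempts; upload(file) stands for whatever hypothetical upload call the custom uploader makes.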

        Bijith Kumar added a comment -

        Yeah, I checked the code. Thank you.
        Today I saw one more related issue, in which only the .tmp file exists in S3 (instead of both the temp and final files, as described in this ticket).
        For this file, the following log line from BucketWriter.renameBucket() is not printed on the agent side:
        LOG.info("Renaming " + srcPath + " to " + dstPath);

        This makes me wonder whether the issue is with the new thread that is started to execute the rename operation. Could this thread be silently dying for some reason?
        Ref: BucketWriter.renameBucket()
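One generic way a background task can fail without leaving a log line, offered as an illustration of the suspicion above rather than a diagnosis of BucketWriter: in Java, an exception thrown by a task passed to ExecutorService.submit() is stored in the returned Future and only surfaces when Future.get() is called. The failing task in this sketch is a hypothetical stand-in for a rename.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Generic Java pitfall (not Flume code): an exception thrown inside a task
// passed to ExecutorService.submit() is captured in the returned Future.
// Nothing is printed unless someone calls Future.get().
final class SilentTaskFailure {
    private SilentTaskFailure() {}

    // Hypothetical stand-in for a background rename that fails.
    static void failingRename() {
        throw new IllegalStateException("rename failed");
    }

    // Returns the failure message, which is visible only because get() is called.
    static String submitAndInspect() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = pool.submit(SilentTaskFailure::failingRename);
            try {
                future.get();
                return "no error";
            } catch (ExecutionException e) {
                // The original exception is available as the cause.
                return e.getCause().getMessage();
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

By contrast, a task started with execute() that throws typically reaches the worker thread's uncaught-exception handler, which by default prints the stack trace.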

        Hari Shreedharan added a comment -

        No, the way Flume works is that it writes to a temporary file (whose prefix and suffix can be changed) and then renames it when the file is ready to be closed. There is no way to avoid the temporary file.
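The write-then-rename flow described above can be sketched on a local filesystem with java.nio.file. This is an analogue, not Flume's code: the in-use file name is built from a prefix and a suffix (Flume's HDFS sink exposes these as hdfs.inUsePrefix and hdfs.inUseSuffix, with ".tmp" as the default suffix), data is written to that file, and closing it means renaming it to its final name. The TempThenRename class is hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Local-filesystem sketch of the write-then-rename pattern. Hypothetical
// helper; the prefix/suffix mirror the idea of Flume's in-use prefix/suffix.
final class TempThenRename {
    private TempThenRename() {}

    static Path write(Path dir, String fileName, String data,
                      String inUsePrefix, String inUseSuffix) throws IOException {
        Path inUse = dir.resolve(inUsePrefix + fileName + inUseSuffix);
        Path target = dir.resolve(fileName);
        // 1. Write everything to the in-use (temporary) file.
        Files.write(inUse, data.getBytes(StandardCharsets.UTF_8));
        // 2. "Close" by renaming to the final name. On a local filesystem this
        //    move within one directory can be atomic; object stores such as S3
        //    generally do not offer an atomic rename.
        Files.move(inUse, target, StandardCopyOption.ATOMIC_MOVE);
        return target;
    }
}
```

Readers that ignore names ending in the in-use suffix never see a partially written file, which is why the temporary file cannot simply be skipped.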

        Bijith Kumar added a comment -

        Thanks Hari. I will try to dig deeper into this.
        Meanwhile, is there a way to disable the creation of the intermediate .tmp file in Flume? The config only allows me to change the 'inUseSuffix'.

        Hari Shreedharan added a comment -

        Even though the HDFS sink can write to S3 (since it uses the HDFS client API), this path has not really been tested and verified. The S3 connectors for HDFS may or may not work correctly with Flume. For some reason, renames do not seem to work as expected with S3.
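One plausible mechanism, offered as an assumption rather than a diagnosis: S3 has no native rename, so Hadoop's S3 connectors implement rename as a copy to the new key followed by a delete of the old key. If the delete does not take effect, both the .tmp and the final object remain, which matches the listing in the description. The toy in-memory store below (hypothetical, not an S3 client) models only that two-step rename.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory object store (hypothetical, not an S3 client) in which
// rename is two separate steps: copy to the new key, then delete the old key.
// If the delete step fails, both keys remain.
final class ToyObjectStore {
    private final Map<String, byte[]> objects = new HashMap<>();
    private boolean deleteShouldFail;

    void put(String key, byte[] data) {
        objects.put(key, data.clone());
    }

    boolean exists(String key) {
        return objects.containsKey(key);
    }

    // Simulate a one-off failure of the next delete.
    void failNextDelete() {
        deleteShouldFail = true;
    }

    private void delete(String key) {
        if (deleteShouldFail) {
            deleteShouldFail = false;
            throw new IllegalStateException("delete failed for " + key);
        }
        objects.remove(key);
    }

    // Non-atomic rename: copy, then delete.
    void rename(String src, String dst) {
        byte[] data = objects.get(src);
        if (data == null) {
            throw new IllegalArgumentException("no such key: " + src);
        }
        objects.put(dst, data.clone()); // step 1: copy
        delete(src);                    // step 2: delete (may fail)
    }
}
```

Note that the report says a second rename attempt on Aug 19 also left both files, so this model alone does not explain everything.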

        Bijith Kumar added a comment -

        Hi Ashish,
        As I understand it, this is a bug. Please correct me if I am wrong.

        Ashish Paliwal added a comment -

        Please ask questions on the user mailing list: http://flume.apache.org/mailinglists.html

        Bijith Kumar added a comment -

        Anyone please...


          People

          • Assignee:
            Unassigned
            Reporter:
            Bijith Kumar
          • Votes:
            0
            Watchers:
            3

            Dates

            • Created:
              Updated:
