Flink / FLINK-11302

FlinkS3FileSystem uses an incorrect path for temporary files

Details

    Description

      I'm running a Flink job that writes Parquet files to S3.
      For that purpose I use StreamingFileSink (in bulk format), which under the hood uses FlinkS3FileSystem from flink-s3-fs-hadoop-1.7.1.jar.

      When I submit the job to a YARN cluster, I get the following exception:

      java.nio.file.NoSuchFileException: /mnt/e1/yarn/nm/usercache/ec2-user/appcache/application_1544473029846_0487,/mnt/e2/yarn/nm/usercache/ec2-user/appcache/application_1544473029846_0487/.tmp_d7a04184-6fed-4c69-9d2f-f950e06bf5f0
      	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
      	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
      	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
      	at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
      	at java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434)
      	at java.nio.file.Files.newOutputStream(Files.java:216)
      	at org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator.apply(RefCountedTmpFileCreator.java:95)
      	at org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator.apply(RefCountedTmpFileCreator.java:43)
      	at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.openNew(RefCountedBufferingFileStream.java:174)
      	at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.boundedBufferingFileStream(S3RecoverableFsDataOutputStream.java:271)
      	at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.newStream(S3RecoverableFsDataOutputStream.java:236)
      	at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:85)
      	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:221)
      	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
      	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
      	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
      	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
      	....
      

      As the stack trace shows, the target path consists of two paths separated by a comma.

      This happens because AbstractS3FileSystemFactory uses the value of CoreOptions.TMP_DIRS "as is", on the assumption that it contains a single path. In practice the value may contain more than one path, as it does here, where it lists two YARN application cache directories.
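
      The effect can be reproduced outside Flink with a minimal sketch. This is not the code in AbstractS3FileSystemFactory; the class and helper names are hypothetical, and the temporary file name is a placeholder. It only shows that resolving a file name against the raw, comma-separated value yields a single path whose parent directory does not exist:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical reproduction, not Flink code.
class TmpDirReproduction {

    // Treats the whole option value as one directory, which is the
    // assumption the report identifies as the cause of the failure.
    static Path resolveAgainstRawValue(String rawTmpDirs, String fileName) {
        return Paths.get(rawTmpDirs).resolve(fileName);
    }

    public static void main(String[] args) {
        // The two directories taken from the exception message above.
        String raw =
            "/mnt/e1/yarn/nm/usercache/ec2-user/appcache/application_1544473029846_0487,"
                + "/mnt/e2/yarn/nm/usercache/ec2-user/appcache/application_1544473029846_0487";

        // The comma stays inside the resulting path, so no such
        // directory exists and opening a file under it fails.
        System.out.println(resolveAgainstRawValue(raw, ".tmp_example"));
    }
}
```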

      Proposed solution: split the value on comma and "|" and use only the first path.
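
      A minimal sketch of the proposed approach, assuming the separators are exactly the comma and "|" named above. The class and method names are hypothetical, and this is not necessarily the code that will be in the pull request:

```java
import java.util.regex.Pattern;

// Hypothetical helper illustrating the proposal, not Flink code.
class FirstTmpDir {

    // Character class matching either separator from the proposal.
    private static final Pattern SEPARATORS = Pattern.compile("[,|]");

    // Returns the first non-empty entry of a multi-path option value.
    static String firstTmpDir(String rawTmpDirs) {
        for (String candidate : SEPARATORS.split(rawTmpDirs)) {
            String trimmed = candidate.trim();
            if (!trimmed.isEmpty()) {
                return trimmed;
            }
        }
        throw new IllegalArgumentException(
            "No temporary directory configured: '" + rawTmpDirs + "'");
    }

    public static void main(String[] args) {
        String raw =
            "/mnt/e1/yarn/nm/usercache/ec2-user/appcache/application_1544473029846_0487,"
                + "/mnt/e2/yarn/nm/usercache/ec2-user/appcache/application_1544473029846_0487";
        System.out.println(firstTmpDir(raw));
    }
}
```

      Always taking the first entry is the simplest choice, but it sends all temporary files to one disk. Picking an entry at random or in rotation would be an alternative if the load should be spread across the configured directories.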

      A bugfix already exists, and I will create a pull request soon.

            People

              artsem.semianenka Artsem Semianenka
              artsem.semianenka Artsem Semianenka

                Time Tracking

                  Original Estimate: 0.5h
                  Remaining Estimate: 10m
                  Time Spent: 20m