Details
Type: Improvement
Status: Resolved
Priority: Minor
Resolution: Won't Fix
Affects Version/s: 2.7.3
Fix Version/s: None
Component/s: None
Description
NativeS3FileSystem has no retry handling for failed uploads to S3.
If an upload to an S3 bucket fails because of a socket timeout or any other network exception, the upload is not retried and the temporary file is deleted.
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at org.jets3t.service.S3Service.putObject(S3Service.java:2265)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.storeFile(Jets3tNativeFileSystemStore.java:122)
at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.fs.s3native.$Proxy8.storeFile(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsOutputStream.close(NativeS3FileSystem.java:284)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream.close(CBZip2OutputStream.java:737)
at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionOutputStream.close(BZip2Codec.java:336)
at org.apache.flume.sink.hdfs.HDFSCompressedDataStream.close(HDFSCompressedDataStream.java:155)
at org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:312)
at org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:308)
at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)
This can be solved by using asynchronous retry management.
We made the following modifications to NativeS3FileSystem to add retry management. They have been running in our production system without any upload failures:
@@ -36,6 +36,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Preconditions;
@@ -279,9 +280,19 @@
       backupStream.close();
       LOG.info("OutputStream for key '{}' closed. Now beginning upload", key);
 
+      Callable<Void> task = new Callable<Void>() {
+        private final byte[] md5Hash = digest == null ? null : digest.digest();
+        public Void call() throws IOException {
+          store.storeFile(key, backupFile, md5Hash);
+          return null;
+        }
+      };
+      RetriableTask<Void> r = new RetriableTask<Void>(task);
+
       try {
-        byte[] md5Hash = digest == null ? null : digest.digest();
-        store.storeFile(key, backupFile, md5Hash);
+        r.call();
+      } catch (Exception e) {
+        throw new IOException(e);
       } finally {
         if (!backupFile.delete()) {
           LOG.warn("Could not delete temporary s3n file: " + backupFile);
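
The patch relies on a RetriableTask class whose source is not included in this issue. For readers who want to try the change, the following is a minimal sketch of what such a wrapper might look like. Everything in it is an assumption: the three-argument constructor, the default of 3 attempts, the 1-second initial delay, the doubling backoff, and the choice to retry only on IOException are illustrative and may differ from the class actually used. RetriableTaskDemo is likewise a hypothetical driver that simulates two connection resets before a successful upload.

```java
import java.io.IOException;
import java.net.SocketException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical stand-in for the RetriableTask referenced in the patch.
 * Runs the wrapped task in the calling thread and retries it on IOException.
 */
final class RetriableTask<T> implements Callable<T> {
  private final Callable<T> task;
  private final int maxAttempts;      // assumed parameter, not from the patch
  private final long initialDelayMs;  // assumed parameter, not from the patch

  /** Matches the single-argument constructor used in the patch; defaults are assumptions. */
  RetriableTask(Callable<T> task) {
    this(task, 3, 1000L);
  }

  RetriableTask(Callable<T> task, int maxAttempts, long initialDelayMs) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    this.task = task;
    this.maxAttempts = maxAttempts;
    this.initialDelayMs = initialDelayMs;
  }

  @Override
  public T call() throws Exception {
    long delayMs = initialDelayMs;
    for (int attempt = 1; ; attempt++) {
      try {
        return task.call();
      } catch (IOException e) {
        // Covers SocketException ("Connection reset") and socket timeouts.
        if (attempt >= maxAttempts) {
          throw e;  // attempts exhausted: surface the last failure
        }
        Thread.sleep(delayMs);  // back off before the next attempt
        delayMs *= 2;           // double the wait each time
      }
    }
  }
}

/** Hypothetical driver: the simulated upload fails twice, then succeeds. */
final class RetriableTaskDemo {
  public static void main(String[] args) throws Exception {
    final AtomicInteger calls = new AtomicInteger();
    Callable<Void> flakyUpload = new Callable<Void>() {
      @Override
      public Void call() throws IOException {
        if (calls.incrementAndGet() < 3) {
          throw new SocketException("Connection reset");
        }
        return null;
      }
    };
    new RetriableTask<Void>(flakyUpload, 5, 10L).call();
    System.out.println("upload succeeded after " + calls.get() + " attempts");
  }
}
```

Two points about how this interacts with the patch. The patch computes md5Hash once, in the anonymous Callable's field initializer, so every retry reuses the same hash; this matters because MessageDigest.digest() resets the digest after it is called. Also, in this sketch call() blocks the caller until the upload succeeds or the attempts run out, so the finally block in the patch deletes the temporary file only after all retries have finished.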