Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java =================================================================== --- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (revision 1579122) +++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (working copy) @@ -23,9 +23,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -44,6 +46,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.event.ProgressEvent; import com.amazonaws.event.ProgressListener; @@ -99,6 +102,8 @@ private Properties properties; private Date startTime; + + private ThreadPoolExecutor asyncWriteExecuter; /** * Initialize S3Backend. It creates AmazonS3Client and TransferManager from @@ -190,6 +195,16 @@ tmx = new TransferManager(s3service, (ThreadPoolExecutor) Executors.newFixedThreadPool(writeThreads, new NamedThreadFactory("s3-transfer-manager-worker"))); + + int asyncWritePoolSize = 10; + String maxConnsStr = prop.getProperty(S3Constants.S3_MAX_CONNS); + if (maxConnsStr != null) { + asyncWritePoolSize = Integer.parseInt(maxConnsStr) + - writeThreads; + } + + asyncWriteExecuter = (ThreadPoolExecutor) Executors.newFixedThreadPool( + asyncWritePoolSize, new NamedThreadFactory("s3-write-worker")); String renameKeyProp = prop.getProperty(S3Constants.S3_RENAME_KEYS); boolean renameKeyBool = (renameKeyProp == null || "".equals(renameKeyProp)) ? true @@ -233,8 +248,8 @@ throw new IllegalArgumentException( "callback parameter cannot be null in asyncUpload"); } - Thread th = new Thread(new AsyncUploadJob(identifier, file, callback)); - th.start(); + asyncWriteExecuter.execute(new AsyncUploadJob(identifier, file, + callback)); } /** @@ -543,6 +558,7 @@ tmx.abortMultipartUploads(bucket, startTime); tmx.shutdownNow(); s3service.shutdown(); + asyncWriteExecuter.shutdownNow(); LOG.info("S3Backend closed."); } @@ -602,7 +618,7 @@ } if (callback != null) { callback.call(identifier, file, - AsyncUploadCallback.RESULT.SUCCESS); + AsyncUploadCallback.RESULT.SUCCESS, null); } } @@ -613,7 +629,7 @@ file)); // wait for upload to finish if (asyncUpload) { - up.addProgressListener(new S3UploadProgressListener( + up.addProgressListener(new S3UploadProgressListener(up, identifier, file, callback)); if (LOG.isDebugEnabled()) { LOG.debug("added upload progress listener to identifier [" @@ -627,13 +643,13 @@ } if (callback != null) { callback.call(identifier, file, - AsyncUploadCallback.RESULT.SUCCESS); + AsyncUploadCallback.RESULT.SUCCESS, null); } } } catch (Exception e2) { if (!asyncUpload) { callback.call(identifier, file, - AsyncUploadCallback.RESULT.ABORTED); + AsyncUploadCallback.RESULT.ABORTED, null); } throw new DataStoreException("Could not upload " + key, e2); } @@ -801,24 +817,34 @@ private DataIdentifier identifier; private AsyncUploadCallback callback; + + private Upload upload; - public S3UploadProgressListener(DataIdentifier identifier, File file, + public S3UploadProgressListener(Upload upload, DataIdentifier identifier, File file, AsyncUploadCallback callback) { super(); this.identifier = identifier; this.file = file; this.callback = callback; + this.upload = upload; } public void progressChanged(ProgressEvent progressEvent) { switch (progressEvent.getEventCode()) { case ProgressEvent.COMPLETED_EVENT_CODE: callback.call(identifier, file, - AsyncUploadCallback.RESULT.SUCCESS); + AsyncUploadCallback.RESULT.SUCCESS, null); break; case ProgressEvent.FAILED_EVENT_CODE: + Map argsMap = new HashMap(1); + try { + AmazonClientException e = upload.waitForException(); + if ( e != null) { + argsMap.put(AsyncUploadCallback.EXCEPTION_ARG, e); + } + } catch (InterruptedException e) {} callback.call(identifier, file, - AsyncUploadCallback.RESULT.FAILED); + AsyncUploadCallback.RESULT.FAILED, argsMap); break; default: break; Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java =================================================================== --- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java (revision 1579122) +++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java (working copy) @@ -31,6 +31,26 @@ * Amazon aws secret key. */ public static final String SECRET_KEY = "secretKey"; + + /** + * Amazon S3 Http connection timeout. + */ + public static final String S3_CONN_TIMEOUT = "connectionTimeout"; + + /** + * Amazon S3 socket timeout. + */ + public static final String S3_SOCK_TIMEOUT = "socketTimeout"; + + /** + * Amazon S3 maximum connections to be used. + */ + public static final String S3_MAX_CONNS = "maxConnections"; + + /** + * Amazon S3 maximum retries. + */ + public static final String S3_MAX_ERR_RETRY = "maxErrorRetry"; /** * Amazon aws S3 bucket. Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java =================================================================== --- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java (revision 1579122) +++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java (working copy) @@ -62,10 +62,10 @@ AWSCredentials credentials = new BasicAWSCredentials( prop.getProperty(S3Constants.ACCESS_KEY), prop.getProperty(S3Constants.SECRET_KEY)); - int connectionTimeOut = Integer.parseInt(prop.getProperty("connectionTimeout")); - int socketTimeOut = Integer.parseInt(prop.getProperty("socketTimeout")); - int maxConnections = Integer.parseInt(prop.getProperty("maxConnections")); - int maxErrorRetry = Integer.parseInt(prop.getProperty("maxErrorRetry")); + int connectionTimeOut = Integer.parseInt(prop.getProperty(S3Constants.S3_CONN_TIMEOUT)); + int socketTimeOut = Integer.parseInt(prop.getProperty(S3Constants.S3_SOCK_TIMEOUT)); + int maxConnections = Integer.parseInt(prop.getProperty(S3Constants.S3_MAX_CONNS)); + int maxErrorRetry = Integer.parseInt(prop.getProperty(S3Constants.S3_MAX_ERR_RETRY)); ClientConfiguration cc = new ClientConfiguration(); cc.setConnectionTimeout(connectionTimeOut); cc.setSocketTimeout(socketTimeOut); Index: jackrabbit-aws-ext/src/test/resources/log4j.properties =================================================================== --- jackrabbit-aws-ext/src/test/resources/log4j.properties (revision 1579122) +++ jackrabbit-aws-ext/src/test/resources/log4j.properties (working copy) @@ -17,6 +17,8 @@ # this is the log4j configuration for the JCR API tests log4j.rootLogger=INFO, file +log4j.logger.org.apache.jackrabbit.core.data=DEBUG +log4j.logger.org.apache.jackrabbit.aws.ext.ds=DEBUG #log4j.logger.org.apache.jackrabbit.test=DEBUG Index: jackrabbit-aws-ext/src/test/resources/repository_sample.xml =================================================================== --- jackrabbit-aws-ext/src/test/resources/repository_sample.xml (revision 1579122) +++ jackrabbit-aws-ext/src/test/resources/repository_sample.xml (working copy) @@ -43,6 +43,7 @@ +