Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java =================================================================== --- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (revision 1579937) +++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (working copy) @@ -36,6 +36,7 @@ import org.apache.jackrabbit.aws.ext.S3Constants; import org.apache.jackrabbit.aws.ext.Utils; import org.apache.jackrabbit.core.data.AsyncUploadCallback; +import org.apache.jackrabbit.core.data.AsyncUploadResult; import org.apache.jackrabbit.core.data.Backend; import org.apache.jackrabbit.core.data.CachingDataStore; import org.apache.jackrabbit.core.data.DataIdentifier; @@ -44,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.event.ProgressEvent; import com.amazonaws.event.ProgressListener; @@ -99,6 +101,8 @@ private Properties properties; private Date startTime; + + private ThreadPoolExecutor asyncWriteExecuter; /** * Initialize S3Backend. It creates AmazonS3Client and TransferManager from @@ -135,9 +139,7 @@ startTime = new Date(); Thread.currentThread().setContextClassLoader( getClass().getClassLoader()); - if (LOG.isDebugEnabled()) { - LOG.debug("init"); - } + LOG.debug("init"); this.store = store; s3service = Utils.openService(prop); if (bucket == null || "".equals(bucket.trim())) { @@ -157,9 +159,9 @@ s3service.createBucket(bucket, region); endpoint = S3 + DASH + region + DOT + AWSDOTCOM; } - LOG.info("Created bucket: " + bucket + " in " + region); + LOG.info("Created bucket [{}] in [{}] ", bucket, region); } else { - LOG.info("Using bucket: " + bucket); + LOG.info("Using bucket [{}]", bucket); if (DEFAULT_AWS_BUCKET_REGION.equals(region)) { endpoint = S3 + DOT + AWSDOTCOM; } else if (Region.EU_Ireland.toString().equals(region)) { @@ -178,18 +180,27 @@ * redirects it to correct location. */ s3service.setEndpoint(endpoint); - LOG.info("S3 service endpoint: " + endpoint); + LOG.info("S3 service endpoint [{}] ", endpoint); int writeThreads = 10; String writeThreadsStr = prop.getProperty(S3Constants.S3_WRITE_THREADS); if (writeThreadsStr != null) { writeThreads = Integer.parseInt(writeThreadsStr); } - LOG.info("Using thread pool of [" + writeThreads - + "] threads in S3 transfer manager"); + LOG.info("Using thread pool of [{}] threads in S3 transfer manager.", writeThreads); tmx = new TransferManager(s3service, (ThreadPoolExecutor) Executors.newFixedThreadPool(writeThreads, new NamedThreadFactory("s3-transfer-manager-worker"))); + + int asyncWritePoolSize = 10; + String maxConnsStr = prop.getProperty(S3Constants.S3_MAX_CONNS); + if (maxConnsStr != null) { + asyncWritePoolSize = Integer.parseInt(maxConnsStr) + - writeThreads; + } + + asyncWriteExecuter = (ThreadPoolExecutor) Executors.newFixedThreadPool( + asyncWritePoolSize, new NamedThreadFactory("s3-write-worker")); String renameKeyProp = prop.getProperty(S3Constants.S3_RENAME_KEYS); boolean renameKeyBool = (renameKeyProp == null || "".equals(renameKeyProp)) ? true @@ -197,15 +208,10 @@ if (renameKeyBool) { renameKeys(); } - if (LOG.isDebugEnabled()) { - LOG.debug("S3 Backend initialized in [" - + (System.currentTimeMillis() - startTime.getTime()) - + "] ms"); - } + LOG.debug("S3 Backend initialized in [{}] ms", + +(System.currentTimeMillis() - startTime.getTime())); } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.debug(" error ", e); - } + LOG.debug(" error ", e); throw new DataStoreException("Could not initialize S3 from " + prop, e); } finally { @@ -233,8 +239,8 @@ throw new IllegalArgumentException( "callback parameter cannot be null in asyncUpload"); } - Thread th = new Thread(new AsyncUploadJob(identifier, file, callback)); - th.start(); + asyncWriteExecuter.execute(new AsyncUploadJob(identifier, file, + callback)); } /** @@ -251,17 +257,15 @@ ObjectMetadata objectMetaData = s3service.getObjectMetadata(bucket, key); if (objectMetaData != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("exists [" + identifier + "]: [true] took [" - + (System.currentTimeMillis() - start) + "] ms"); - } + LOG.debug("exists [{}]: [true] took [{}] ms.", + identifier, (System.currentTimeMillis() - start) ); return true; } return false; } catch (AmazonServiceException e) { if (e.getStatusCode() == 404) { - LOG.info("exists [" + identifier + "]: [false] took [" - + (System.currentTimeMillis() - start) + "] ms"); + LOG.debug("exists [{}]: [false] took [{}] ms.", + identifier, (System.currentTimeMillis() - start) ); return false; } throw new DataStoreException( @@ -293,11 +297,8 @@ key, bucket, key); copReq.setNewObjectMetadata(objectMetaData); s3service.copyObject(copReq); - if (LOG.isDebugEnabled()) { - LOG.debug("[ " + identifier.toString() - + "] touched took [" - + (System.currentTimeMillis() - start) + "] ms"); - } + LOG.debug("[{}] touched took [{}] ms. ", identifier, + (System.currentTimeMillis() - start)); } } else { retVal = false; @@ -320,11 +321,8 @@ Thread.currentThread().setContextClassLoader(contextClassLoader); } } - - if (LOG.isDebugEnabled()) { - LOG.debug("exists [" + identifier + "]: [" + retVal + "] took [" - + (System.currentTimeMillis() - start) + "] ms"); - } + LOG.debug("exists [{}]: [{}] took [{}] ms.", new Object[] { identifier, + retVal, (System.currentTimeMillis() - start) }); return retVal; } @@ -339,10 +337,8 @@ getClass().getClassLoader()); S3Object object = s3service.getObject(bucket, key); InputStream in = object.getObjectContent(); - if (LOG.isDebugEnabled()) { - LOG.debug("[ " + identifier.toString() + "] read took [" - + (System.currentTimeMillis() - start) + "] ms"); - } + LOG.debug("[{}] read took [{}]ms", identifier, + (System.currentTimeMillis() - start)); return in; } catch (AmazonServiceException e) { throw new DataStoreException("Object not found: " + key, e); @@ -373,11 +369,8 @@ if (!prevObjectListing.isTruncated()) break; prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing); } - if (LOG.isDebugEnabled()) { - LOG.debug("getAllIdentifiers returned size [ " + ids.size() - + "] took [" + (System.currentTimeMillis() - start) - + "] ms"); - } + LOG.debug("getAllIdentifiers returned size [{}] took [{}] ms.", + ids.size(), (System.currentTimeMillis() - start)); return ids.iterator(); } catch (AmazonServiceException e) { throw new DataStoreException("Could not list objects", e); @@ -399,17 +392,16 @@ getClass().getClassLoader()); ObjectMetadata object = s3service.getObjectMetadata(bucket, key); long lastModified = object.getLastModified().getTime(); - if (LOG.isDebugEnabled()) { - LOG.debug("Identifier [" + identifier.toString() - + "] 's lastModified = [" + lastModified + "] took [" - + (System.currentTimeMillis() - start) + "] ms"); - } + LOG.debug( + "Identifier [{}]'s lastModified = [{}] took [{}]ms.", + new Object[] { identifier, lastModified, + (System.currentTimeMillis() - start) }); return lastModified; } catch (AmazonServiceException e) { if (e.getStatusCode() == 404) { - LOG.info("getLastModified:Identifier [" + identifier.toString() - + "] not found. Took [" - + (System.currentTimeMillis() - start) + "]ms"); + LOG.info( + "getLastModified:Identifier [{}] not found. Took [{}] ms.", + identifier, (System.currentTimeMillis() - start)); } throw new DataStoreException(e); } finally { @@ -429,11 +421,9 @@ getClass().getClassLoader()); ObjectMetadata object = s3service.getObjectMetadata(bucket, key); long length = object.getContentLength(); - if (LOG.isDebugEnabled()) { - LOG.debug("Identifier [" + identifier.toString() - + "] 's length = [" + length + "] took [" - + (System.currentTimeMillis() - start) + "] ms"); - } + LOG.debug("Identifier [{}]'s length = [{}] took [{}]ms.", + new Object[] { identifier, length, + (System.currentTimeMillis() - start) }); return length; } catch (AmazonServiceException e) { throw new DataStoreException("Could not length of dataIdentifier " @@ -455,11 +445,8 @@ Thread.currentThread().setContextClassLoader( getClass().getClassLoader()); s3service.deleteObject(bucket, key); - if (LOG.isDebugEnabled()) { - LOG.debug("Identifier [" + identifier.toString() - + "] 's deleted. It took [" - + (System.currentTimeMillis() - start) + "] ms"); - } + LOG.debug("Identifier [{}] deleted. It took [{}]ms.", new Object[] { + identifier, (System.currentTimeMillis() - start) }); } catch (AmazonServiceException e) { throw new DataStoreException( "Could not getLastModified of dataIdentifier " + identifier, e); @@ -489,15 +476,9 @@ DataIdentifier identifier = new DataIdentifier( getIdentifierName(s3ObjSumm.getKey())); long lastModified = s3ObjSumm.getLastModified().getTime(); - if (LOG.isDebugEnabled()) { - LOG.debug("id [" + identifier + "], lastModified [" - + lastModified + "]"); - } + LOG.debug("Identifier [{}]'s lastModified = [{}]", identifier, lastModified); if (!store.isInUse(identifier) && lastModified < min) { - if (LOG.isDebugEnabled()) { - LOG.debug("add id :" + s3ObjSumm.getKey() - + " to delete lists"); - } + LOG.debug("add id [{}] to delete lists", s3ObjSumm.getKey()); deleteList.add(new DeleteObjectsRequest.KeyVersion( s3ObjSumm.getKey())); deleteIdSet.add(identifier); @@ -514,10 +495,8 @@ + dobjs.getDeletedObjects().size() + " out of " + deleteList.size() + " are deleted"); } else { - if (LOG.isDebugEnabled()) { - LOG.debug(deleteList - + " records deleted from datastore"); - } + LOG.debug("[{}] records deleted from datastore", + deleteList); } } if (!prevObjectListing.isTruncated()) { @@ -530,10 +509,10 @@ Thread.currentThread().setContextClassLoader(contextClassLoader); } } - LOG.info("deleteAllOlderThan: min=[" + min + "] exit. Deleted [" - + deleteIdSet + "] records. Number of records deleted [" - + deleteIdSet.size() + "] took [" - + (System.currentTimeMillis() - start) + "] ms"); + LOG.info( + "deleteAllOlderThan: min=[{}] exit. Deleted[{}] records. Number of records deleted [{}] took [{}]ms", + new Object[] { min, deleteIdSet, deleteIdSet.size(), + (System.currentTimeMillis() - start) }); return deleteIdSet; } @@ -543,6 +522,7 @@ tmx.abortMultipartUploads(bucket, startTime); tmx.shutdownNow(); s3service.shutdown(); + asyncWriteExecuter.shutdownNow(); LOG.info("S3Backend closed."); } @@ -588,21 +568,15 @@ throw new DataStoreException("Collision: " + key + " new length: " + file.length() + " old length: " + l); } - if (LOG.isDebugEnabled()) { - LOG.debug(key + " exists, lastmodified =" - + objectMetaData.getLastModified().getTime()); - } + LOG.debug("[{}]'s exists, lastmodified = [{}]", key, + objectMetaData.getLastModified().getTime()); CopyObjectRequest copReq = new CopyObjectRequest(bucket, key, bucket, key); copReq.setNewObjectMetadata(objectMetaData); s3service.copyObject(copReq); - if (LOG.isDebugEnabled()) { - LOG.debug("lastModified of " + identifier.toString() - + " updated successfully"); - } + LOG.debug("lastModified of [{}] updated successfully.", identifier); if (callback != null) { - callback.call(identifier, file, - AsyncUploadCallback.RESULT.SUCCESS); + callback.onSuccess(new AsyncUploadResult(identifier, file)); } } @@ -613,27 +587,22 @@ file)); // wait for upload to finish if (asyncUpload) { - up.addProgressListener(new S3UploadProgressListener( + up.addProgressListener(new S3UploadProgressListener(up, identifier, file, callback)); - if (LOG.isDebugEnabled()) { - LOG.debug("added upload progress listener to identifier [" - + identifier + "]"); - } + LOG.debug( + "added upload progress listener to identifier [{}]", + identifier); } else { up.waitForUploadResult(); - if (LOG.isDebugEnabled()) { - LOG.debug("synchronous upload to identifier [" - + identifier + "] completed."); - } + LOG.debug("synchronous upload to identifier [{}] completed.", identifier); if (callback != null) { - callback.call(identifier, file, - AsyncUploadCallback.RESULT.SUCCESS); + callback.onSuccess(new AsyncUploadResult( + identifier, file)); } } } catch (Exception e2) { if (!asyncUpload) { - callback.call(identifier, file, - AsyncUploadCallback.RESULT.ABORTED); + callback.onAbort(new AsyncUploadResult(identifier, file)); } throw new DataStoreException("Could not upload " + key, e2); } @@ -643,11 +612,10 @@ Thread.currentThread().setContextClassLoader(contextClassLoader); } } - if (LOG.isDebugEnabled()) { - LOG.debug("write [" + identifier + "] length [" + file.length() - + "], in async mode [" + asyncUpload + "] in [" - + (System.currentTimeMillis() - start) + "] ms."); - } + LOG.debug( + "write of [{}], length=[{}], in async mode [{}], in [{}]ms", + new Object[] { identifier, file.length(), asyncUpload, + (System.currentTimeMillis() - start) }); } /** @@ -694,8 +662,8 @@ } catch (InterruptedException ie) { } - LOG.info("Renamed [" + count + "] keys, time taken [" - + ((System.currentTimeMillis() - startTime) / 1000) + "] sec"); + LOG.info("Renamed [{}] keys, time taken [{}]sec", count, + ((System.currentTimeMillis() - startTime) / 1000)); // Delete older keys. if (deleteList.size() > 0) { DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest( @@ -706,9 +674,10 @@ delObjsReq.setKeys(Collections.unmodifiableList(deleteList.subList( startIndex, endIndex))); DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq); - LOG.info("Records[" + dobjs.getDeletedObjects().size() - + "] deleted in datastore from index [" + startIndex - + "] to [" + (endIndex - 1) + "]"); + LOG.info( + "Records[{}] deleted in datastore from index [{}] to [{}]", + new Object[] { dobjs.getDeletedObjects().size(), + startIndex, (endIndex - 1) }); if (endIndex == size) { break; } else { @@ -775,9 +744,7 @@ CopyObjectRequest copReq = new CopyObjectRequest(bucket, oldKey, bucket, newS3Key); s3service.copyObject(copReq); - if (LOG.isDebugEnabled()) { - LOG.debug(oldKey + " renamed to " + newS3Key); - } + LOG.debug("[{}] renamed to [{}] ", oldKey, newS3Key); } finally { if (contextClassLoader != null) { Thread.currentThread().setContextClassLoader( @@ -801,24 +768,34 @@ private DataIdentifier identifier; private AsyncUploadCallback callback; + + private Upload upload; - public S3UploadProgressListener(DataIdentifier identifier, File file, + public S3UploadProgressListener(Upload upload, DataIdentifier identifier, File file, AsyncUploadCallback callback) { super(); this.identifier = identifier; this.file = file; this.callback = callback; + this.upload = upload; } public void progressChanged(ProgressEvent progressEvent) { switch (progressEvent.getEventCode()) { case ProgressEvent.COMPLETED_EVENT_CODE: - callback.call(identifier, file, - AsyncUploadCallback.RESULT.SUCCESS); + callback.onSuccess(new AsyncUploadResult(identifier, file)); break; case ProgressEvent.FAILED_EVENT_CODE: - callback.call(identifier, file, - AsyncUploadCallback.RESULT.FAILED); + AsyncUploadResult result = new AsyncUploadResult( + identifier, file); + try { + AmazonClientException e = upload.waitForException(); + if (e != null) { + result.setException(e); + } + } catch (InterruptedException e) { + } + callback.onFailure(result); break; default: break; Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java =================================================================== --- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java (revision 1579937) +++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java (working copy) @@ -31,6 +31,26 @@ * Amazon aws secret key. */ public static final String SECRET_KEY = "secretKey"; + + /** + * Amazon S3 Http connection timeout. + */ + public static final String S3_CONN_TIMEOUT = "connectionTimeout"; + + /** + * Amazon S3 socket timeout. + */ + public static final String S3_SOCK_TIMEOUT = "socketTimeout"; + + /** + * Amazon S3 maximum connections to be used. + */ + public static final String S3_MAX_CONNS = "maxConnections"; + + /** + * Amazon S3 maximum retries. + */ + public static final String S3_MAX_ERR_RETRY = "maxErrorRetry"; /** * Amazon aws S3 bucket. Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java =================================================================== --- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java (revision 1579937) +++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java (working copy) @@ -62,10 +62,10 @@ AWSCredentials credentials = new BasicAWSCredentials( prop.getProperty(S3Constants.ACCESS_KEY), prop.getProperty(S3Constants.SECRET_KEY)); - int connectionTimeOut = Integer.parseInt(prop.getProperty("connectionTimeout")); - int socketTimeOut = Integer.parseInt(prop.getProperty("socketTimeout")); - int maxConnections = Integer.parseInt(prop.getProperty("maxConnections")); - int maxErrorRetry = Integer.parseInt(prop.getProperty("maxErrorRetry")); + int connectionTimeOut = Integer.parseInt(prop.getProperty(S3Constants.S3_CONN_TIMEOUT)); + int socketTimeOut = Integer.parseInt(prop.getProperty(S3Constants.S3_SOCK_TIMEOUT)); + int maxConnections = Integer.parseInt(prop.getProperty(S3Constants.S3_MAX_CONNS)); + int maxErrorRetry = Integer.parseInt(prop.getProperty(S3Constants.S3_MAX_ERR_RETRY)); ClientConfiguration cc = new ClientConfiguration(); cc.setConnectionTimeout(connectionTimeOut); cc.setSocketTimeout(socketTimeOut); Index: jackrabbit-aws-ext/src/test/resources/log4j.properties =================================================================== --- jackrabbit-aws-ext/src/test/resources/log4j.properties (revision 1579937) +++ jackrabbit-aws-ext/src/test/resources/log4j.properties (working copy) @@ -17,6 +17,8 @@ # this is the log4j configuration for the JCR API tests log4j.rootLogger=INFO, file +log4j.logger.org.apache.jackrabbit.core.data=DEBUG +log4j.logger.org.apache.jackrabbit.aws.ext.ds=DEBUG #log4j.logger.org.apache.jackrabbit.test=DEBUG Index: jackrabbit-aws-ext/src/test/resources/repository_sample.xml =================================================================== --- jackrabbit-aws-ext/src/test/resources/repository_sample.xml (revision 1579937) +++ jackrabbit-aws-ext/src/test/resources/repository_sample.xml (working copy) @@ -43,6 +43,7 @@ +