Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
===================================================================
--- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (revision 1579937)
+++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (working copy)
@@ -36,6 +36,7 @@
import org.apache.jackrabbit.aws.ext.S3Constants;
import org.apache.jackrabbit.aws.ext.Utils;
import org.apache.jackrabbit.core.data.AsyncUploadCallback;
+import org.apache.jackrabbit.core.data.AsyncUploadResult;
import org.apache.jackrabbit.core.data.Backend;
import org.apache.jackrabbit.core.data.CachingDataStore;
import org.apache.jackrabbit.core.data.DataIdentifier;
@@ -44,6 +45,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener;
@@ -99,6 +101,8 @@
private Properties properties;
private Date startTime;
+
+ private ThreadPoolExecutor asyncWriteExecuter;
/**
* Initialize S3Backend. It creates AmazonS3Client and TransferManager from
@@ -135,9 +139,7 @@
startTime = new Date();
Thread.currentThread().setContextClassLoader(
getClass().getClassLoader());
- if (LOG.isDebugEnabled()) {
- LOG.debug("init");
- }
+ LOG.debug("init");
this.store = store;
s3service = Utils.openService(prop);
if (bucket == null || "".equals(bucket.trim())) {
@@ -157,9 +159,9 @@
s3service.createBucket(bucket, region);
endpoint = S3 + DASH + region + DOT + AWSDOTCOM;
}
- LOG.info("Created bucket: " + bucket + " in " + region);
+ LOG.info("Created bucket [{}] in [{}] ", bucket, region);
} else {
- LOG.info("Using bucket: " + bucket);
+ LOG.info("Using bucket [{}]", bucket);
if (DEFAULT_AWS_BUCKET_REGION.equals(region)) {
endpoint = S3 + DOT + AWSDOTCOM;
} else if (Region.EU_Ireland.toString().equals(region)) {
@@ -178,18 +180,27 @@
* redirects it to correct location.
*/
s3service.setEndpoint(endpoint);
- LOG.info("S3 service endpoint: " + endpoint);
+ LOG.info("S3 service endpoint [{}] ", endpoint);
int writeThreads = 10;
String writeThreadsStr = prop.getProperty(S3Constants.S3_WRITE_THREADS);
if (writeThreadsStr != null) {
writeThreads = Integer.parseInt(writeThreadsStr);
}
- LOG.info("Using thread pool of [" + writeThreads
- + "] threads in S3 transfer manager");
+ LOG.info("Using thread pool of [{}] threads in S3 transfer manager.", writeThreads);
tmx = new TransferManager(s3service,
(ThreadPoolExecutor) Executors.newFixedThreadPool(writeThreads,
new NamedThreadFactory("s3-transfer-manager-worker")));
+
+ int asyncWritePoolSize = 10;
+ String maxConnsStr = prop.getProperty(S3Constants.S3_MAX_CONNS);
+ if (maxConnsStr != null) {
+ asyncWritePoolSize = Integer.parseInt(maxConnsStr)
+ - writeThreads;
+ }
+
+ asyncWriteExecuter = (ThreadPoolExecutor) Executors.newFixedThreadPool(
+ asyncWritePoolSize, new NamedThreadFactory("s3-write-worker"));
String renameKeyProp = prop.getProperty(S3Constants.S3_RENAME_KEYS);
boolean renameKeyBool = (renameKeyProp == null || "".equals(renameKeyProp))
? true
@@ -197,15 +208,10 @@
if (renameKeyBool) {
renameKeys();
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("S3 Backend initialized in ["
- + (System.currentTimeMillis() - startTime.getTime())
- + "] ms");
- }
+ LOG.debug("S3 Backend initialized in [{}] ms",
+ +(System.currentTimeMillis() - startTime.getTime()));
} catch (Exception e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(" error ", e);
- }
+ LOG.debug(" error ", e);
throw new DataStoreException("Could not initialize S3 from "
+ prop, e);
} finally {
@@ -233,8 +239,8 @@
throw new IllegalArgumentException(
"callback parameter cannot be null in asyncUpload");
}
- Thread th = new Thread(new AsyncUploadJob(identifier, file, callback));
- th.start();
+ asyncWriteExecuter.execute(new AsyncUploadJob(identifier, file,
+ callback));
}
/**
@@ -251,17 +257,15 @@
ObjectMetadata objectMetaData = s3service.getObjectMetadata(bucket,
key);
if (objectMetaData != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("exists [" + identifier + "]: [true] took ["
- + (System.currentTimeMillis() - start) + "] ms");
- }
+ LOG.debug("exists [{}]: [true] took [{}] ms.",
+ identifier, (System.currentTimeMillis() - start) );
return true;
}
return false;
} catch (AmazonServiceException e) {
if (e.getStatusCode() == 404) {
- LOG.info("exists [" + identifier + "]: [false] took ["
- + (System.currentTimeMillis() - start) + "] ms");
+ LOG.debug("exists [{}]: [false] took [{}] ms.",
+ identifier, (System.currentTimeMillis() - start) );
return false;
}
throw new DataStoreException(
@@ -293,11 +297,8 @@
key, bucket, key);
copReq.setNewObjectMetadata(objectMetaData);
s3service.copyObject(copReq);
- if (LOG.isDebugEnabled()) {
- LOG.debug("[ " + identifier.toString()
- + "] touched took ["
- + (System.currentTimeMillis() - start) + "] ms");
- }
+ LOG.debug("[{}] touched took [{}] ms. ", identifier,
+ (System.currentTimeMillis() - start));
}
} else {
retVal = false;
@@ -320,11 +321,8 @@
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("exists [" + identifier + "]: [" + retVal + "] took ["
- + (System.currentTimeMillis() - start) + "] ms");
- }
+ LOG.debug("exists [{}]: [{}] took [{}] ms.", new Object[] { identifier,
+ retVal, (System.currentTimeMillis() - start) });
return retVal;
}
@@ -339,10 +337,8 @@
getClass().getClassLoader());
S3Object object = s3service.getObject(bucket, key);
InputStream in = object.getObjectContent();
- if (LOG.isDebugEnabled()) {
- LOG.debug("[ " + identifier.toString() + "] read took ["
- + (System.currentTimeMillis() - start) + "] ms");
- }
+ LOG.debug("[{}] read took [{}]ms", identifier,
+ (System.currentTimeMillis() - start));
return in;
} catch (AmazonServiceException e) {
throw new DataStoreException("Object not found: " + key, e);
@@ -373,11 +369,8 @@
if (!prevObjectListing.isTruncated()) break;
prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("getAllIdentifiers returned size [ " + ids.size()
- + "] took [" + (System.currentTimeMillis() - start)
- + "] ms");
- }
+ LOG.debug("getAllIdentifiers returned size [{}] took [{}] ms.",
+ ids.size(), (System.currentTimeMillis() - start));
return ids.iterator();
} catch (AmazonServiceException e) {
throw new DataStoreException("Could not list objects", e);
@@ -399,17 +392,16 @@
getClass().getClassLoader());
ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
long lastModified = object.getLastModified().getTime();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Identifier [" + identifier.toString()
- + "] 's lastModified = [" + lastModified + "] took ["
- + (System.currentTimeMillis() - start) + "] ms");
- }
+ LOG.debug(
+ "Identifier [{}]'s lastModified = [{}] took [{}]ms.",
+ new Object[] { identifier, lastModified,
+ (System.currentTimeMillis() - start) });
return lastModified;
} catch (AmazonServiceException e) {
if (e.getStatusCode() == 404) {
- LOG.info("getLastModified:Identifier [" + identifier.toString()
- + "] not found. Took ["
- + (System.currentTimeMillis() - start) + "]ms");
+ LOG.info(
+ "getLastModified:Identifier [{}] not found. Took [{}] ms.",
+ identifier, (System.currentTimeMillis() - start));
}
throw new DataStoreException(e);
} finally {
@@ -429,11 +421,9 @@
getClass().getClassLoader());
ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
long length = object.getContentLength();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Identifier [" + identifier.toString()
- + "] 's length = [" + length + "] took ["
- + (System.currentTimeMillis() - start) + "] ms");
- }
+ LOG.debug("Identifier [{}]'s length = [{}] took [{}]ms.",
+ new Object[] { identifier, length,
+ (System.currentTimeMillis() - start) });
return length;
} catch (AmazonServiceException e) {
throw new DataStoreException("Could not length of dataIdentifier "
@@ -455,11 +445,8 @@
Thread.currentThread().setContextClassLoader(
getClass().getClassLoader());
s3service.deleteObject(bucket, key);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Identifier [" + identifier.toString()
- + "] 's deleted. It took ["
- + (System.currentTimeMillis() - start) + "] ms");
- }
+ LOG.debug("Identifier [{}] deleted. It took [{}]ms.", new Object[] {
+ identifier, (System.currentTimeMillis() - start) });
} catch (AmazonServiceException e) {
throw new DataStoreException(
"Could not getLastModified of dataIdentifier " + identifier, e);
@@ -489,15 +476,9 @@
DataIdentifier identifier = new DataIdentifier(
getIdentifierName(s3ObjSumm.getKey()));
long lastModified = s3ObjSumm.getLastModified().getTime();
- if (LOG.isDebugEnabled()) {
- LOG.debug("id [" + identifier + "], lastModified ["
- + lastModified + "]");
- }
+ LOG.debug("Identifier [{}]'s lastModified = [{}]", identifier, lastModified);
if (!store.isInUse(identifier) && lastModified < min) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("add id :" + s3ObjSumm.getKey()
- + " to delete lists");
- }
+ LOG.debug("add id [{}] to delete lists", s3ObjSumm.getKey());
deleteList.add(new DeleteObjectsRequest.KeyVersion(
s3ObjSumm.getKey()));
deleteIdSet.add(identifier);
@@ -514,10 +495,8 @@
+ dobjs.getDeletedObjects().size() + " out of "
+ deleteList.size() + " are deleted");
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(deleteList
- + " records deleted from datastore");
- }
+ LOG.debug("[{}] records deleted from datastore",
+ deleteList);
}
}
if (!prevObjectListing.isTruncated()) {
@@ -530,10 +509,10 @@
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
- LOG.info("deleteAllOlderThan: min=[" + min + "] exit. Deleted ["
- + deleteIdSet + "] records. Number of records deleted ["
- + deleteIdSet.size() + "] took ["
- + (System.currentTimeMillis() - start) + "] ms");
+ LOG.info(
+ "deleteAllOlderThan: min=[{}] exit. Deleted[{}] records. Number of records deleted [{}] took [{}]ms",
+ new Object[] { min, deleteIdSet, deleteIdSet.size(),
+ (System.currentTimeMillis() - start) });
return deleteIdSet;
}
@@ -543,6 +522,7 @@
tmx.abortMultipartUploads(bucket, startTime);
tmx.shutdownNow();
s3service.shutdown();
+ asyncWriteExecuter.shutdownNow();
LOG.info("S3Backend closed.");
}
@@ -588,21 +568,15 @@
throw new DataStoreException("Collision: " + key
+ " new length: " + file.length() + " old length: " + l);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug(key + " exists, lastmodified ="
- + objectMetaData.getLastModified().getTime());
- }
+ LOG.debug("[{}]'s exists, lastmodified = [{}]", key,
+ objectMetaData.getLastModified().getTime());
CopyObjectRequest copReq = new CopyObjectRequest(bucket, key,
bucket, key);
copReq.setNewObjectMetadata(objectMetaData);
s3service.copyObject(copReq);
- if (LOG.isDebugEnabled()) {
- LOG.debug("lastModified of " + identifier.toString()
- + " updated successfully");
- }
+ LOG.debug("lastModified of [{}] updated successfully.", identifier);
if (callback != null) {
- callback.call(identifier, file,
- AsyncUploadCallback.RESULT.SUCCESS);
+ callback.onSuccess(new AsyncUploadResult(identifier, file));
}
}
@@ -613,27 +587,22 @@
file));
// wait for upload to finish
if (asyncUpload) {
- up.addProgressListener(new S3UploadProgressListener(
+ up.addProgressListener(new S3UploadProgressListener(up,
identifier, file, callback));
- if (LOG.isDebugEnabled()) {
- LOG.debug("added upload progress listener to identifier ["
- + identifier + "]");
- }
+ LOG.debug(
+ "added upload progress listener to identifier [{}]",
+ identifier);
} else {
up.waitForUploadResult();
- if (LOG.isDebugEnabled()) {
- LOG.debug("synchronous upload to identifier ["
- + identifier + "] completed.");
- }
+ LOG.debug("synchronous upload to identifier [{}] completed.", identifier);
if (callback != null) {
- callback.call(identifier, file,
- AsyncUploadCallback.RESULT.SUCCESS);
+ callback.onSuccess(new AsyncUploadResult(
+ identifier, file));
}
}
} catch (Exception e2) {
if (!asyncUpload) {
- callback.call(identifier, file,
- AsyncUploadCallback.RESULT.ABORTED);
+ callback.onAbort(new AsyncUploadResult(identifier, file));
}
throw new DataStoreException("Could not upload " + key, e2);
}
@@ -643,11 +612,10 @@
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("write [" + identifier + "] length [" + file.length()
- + "], in async mode [" + asyncUpload + "] in ["
- + (System.currentTimeMillis() - start) + "] ms.");
- }
+ LOG.debug(
+ "write of [{}], length=[{}], in async mode [{}], in [{}]ms",
+ new Object[] { identifier, file.length(), asyncUpload,
+ (System.currentTimeMillis() - start) });
}
/**
@@ -694,8 +662,8 @@
} catch (InterruptedException ie) {
}
- LOG.info("Renamed [" + count + "] keys, time taken ["
- + ((System.currentTimeMillis() - startTime) / 1000) + "] sec");
+ LOG.info("Renamed [{}] keys, time taken [{}]sec", count,
+ ((System.currentTimeMillis() - startTime) / 1000));
// Delete older keys.
if (deleteList.size() > 0) {
DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
@@ -706,9 +674,10 @@
delObjsReq.setKeys(Collections.unmodifiableList(deleteList.subList(
startIndex, endIndex)));
DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
- LOG.info("Records[" + dobjs.getDeletedObjects().size()
- + "] deleted in datastore from index [" + startIndex
- + "] to [" + (endIndex - 1) + "]");
+ LOG.info(
+ "Records[{}] deleted in datastore from index [{}] to [{}]",
+ new Object[] { dobjs.getDeletedObjects().size(),
+ startIndex, (endIndex - 1) });
if (endIndex == size) {
break;
} else {
@@ -775,9 +744,7 @@
CopyObjectRequest copReq = new CopyObjectRequest(bucket,
oldKey, bucket, newS3Key);
s3service.copyObject(copReq);
- if (LOG.isDebugEnabled()) {
- LOG.debug(oldKey + " renamed to " + newS3Key);
- }
+ LOG.debug("[{}] renamed to [{}] ", oldKey, newS3Key);
} finally {
if (contextClassLoader != null) {
Thread.currentThread().setContextClassLoader(
@@ -801,24 +768,34 @@
private DataIdentifier identifier;
private AsyncUploadCallback callback;
+
+ private Upload upload;
- public S3UploadProgressListener(DataIdentifier identifier, File file,
+ public S3UploadProgressListener(Upload upload, DataIdentifier identifier, File file,
AsyncUploadCallback callback) {
super();
this.identifier = identifier;
this.file = file;
this.callback = callback;
+ this.upload = upload;
}
public void progressChanged(ProgressEvent progressEvent) {
switch (progressEvent.getEventCode()) {
case ProgressEvent.COMPLETED_EVENT_CODE:
- callback.call(identifier, file,
- AsyncUploadCallback.RESULT.SUCCESS);
+ callback.onSuccess(new AsyncUploadResult(identifier, file));
break;
case ProgressEvent.FAILED_EVENT_CODE:
- callback.call(identifier, file,
- AsyncUploadCallback.RESULT.FAILED);
+ AsyncUploadResult result = new AsyncUploadResult(
+ identifier, file);
+ try {
+ AmazonClientException e = upload.waitForException();
+ if (e != null) {
+ result.setException(e);
+ }
+ } catch (InterruptedException e) {
+ }
+ callback.onFailure(result);
break;
default:
break;
Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java
===================================================================
--- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java (revision 1579937)
+++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java (working copy)
@@ -31,6 +31,26 @@
* Amazon aws secret key.
*/
public static final String SECRET_KEY = "secretKey";
+
+ /**
+ * Amazon S3 HTTP connection timeout.
+ */
+ public static final String S3_CONN_TIMEOUT = "connectionTimeout";
+
+ /**
+ * Amazon S3 socket timeout.
+ */
+ public static final String S3_SOCK_TIMEOUT = "socketTimeout";
+
+ /**
+ * Amazon S3 maximum connections to be used.
+ */
+ public static final String S3_MAX_CONNS = "maxConnections";
+
+ /**
+ * Amazon S3 maximum retries.
+ */
+ public static final String S3_MAX_ERR_RETRY = "maxErrorRetry";
/**
* Amazon aws S3 bucket.
Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java
===================================================================
--- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java (revision 1579937)
+++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java (working copy)
@@ -62,10 +62,10 @@
AWSCredentials credentials = new BasicAWSCredentials(
prop.getProperty(S3Constants.ACCESS_KEY),
prop.getProperty(S3Constants.SECRET_KEY));
- int connectionTimeOut = Integer.parseInt(prop.getProperty("connectionTimeout"));
- int socketTimeOut = Integer.parseInt(prop.getProperty("socketTimeout"));
- int maxConnections = Integer.parseInt(prop.getProperty("maxConnections"));
- int maxErrorRetry = Integer.parseInt(prop.getProperty("maxErrorRetry"));
+ int connectionTimeOut = Integer.parseInt(prop.getProperty(S3Constants.S3_CONN_TIMEOUT));
+ int socketTimeOut = Integer.parseInt(prop.getProperty(S3Constants.S3_SOCK_TIMEOUT));
+ int maxConnections = Integer.parseInt(prop.getProperty(S3Constants.S3_MAX_CONNS));
+ int maxErrorRetry = Integer.parseInt(prop.getProperty(S3Constants.S3_MAX_ERR_RETRY));
ClientConfiguration cc = new ClientConfiguration();
cc.setConnectionTimeout(connectionTimeOut);
cc.setSocketTimeout(socketTimeOut);
Index: jackrabbit-aws-ext/src/test/resources/log4j.properties
===================================================================
--- jackrabbit-aws-ext/src/test/resources/log4j.properties (revision 1579937)
+++ jackrabbit-aws-ext/src/test/resources/log4j.properties (working copy)
@@ -17,6 +17,8 @@
# this is the log4j configuration for the JCR API tests
log4j.rootLogger=INFO, file
+log4j.logger.org.apache.jackrabbit.core.data=DEBUG
+log4j.logger.org.apache.jackrabbit.aws.ext.ds=DEBUG
#log4j.logger.org.apache.jackrabbit.test=DEBUG
Index: jackrabbit-aws-ext/src/test/resources/repository_sample.xml
===================================================================
--- jackrabbit-aws-ext/src/test/resources/repository_sample.xml (revision 1579937)
+++ jackrabbit-aws-ext/src/test/resources/repository_sample.xml (working copy)
@@ -43,6 +43,7 @@
+