Index: jackrabbit-aws-ext/pom.xml
===================================================================
--- jackrabbit-aws-ext/pom.xml (revision 1576578)
+++ jackrabbit-aws-ext/pom.xml (working copy)
@@ -53,12 +53,18 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>jackrabbit-data</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
       <version>1.7.5</version>
     </dependency>
-
-
+
+
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
===================================================================
--- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (revision 1576578)
+++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (working copy)
@@ -20,22 +20,26 @@
import java.io.File;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.jackrabbit.aws.ext.S3Constants;
import org.apache.jackrabbit.aws.ext.Utils;
+import org.apache.jackrabbit.core.data.AsyncUploadCallback;
import org.apache.jackrabbit.core.data.Backend;
import org.apache.jackrabbit.core.data.CachingDataStore;
import org.apache.jackrabbit.core.data.DataIdentifier;
import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +50,8 @@
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.ProgressEvent;
+import com.amazonaws.services.s3.model.ProgressListener;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.Region;
import com.amazonaws.services.s3.model.S3Object;
@@ -58,6 +64,21 @@
*/
public class S3Backend implements Backend {
+ /**
+ * Logger instance.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(S3Backend.class);
+
+ private AmazonS3Client s3service;
+
+ private String bucket;
+
+ private TransferManager tmx;
+
+ private CachingDataStore store;
+
+ private Properties prop = null;
+
private static final String KEY_PREFIX = "dataStore_";
/**
@@ -76,19 +97,8 @@
private static final String DASH = "-";
- /**
- * Logger instance.
- */
- private static final Logger LOG = LoggerFactory.getLogger(S3Backend.class);
+ private Date startTime;
- private AmazonS3Client s3service;
-
- private String bucket;
-
- private TransferManager tmx;
-
- private CachingDataStore store;
-
/**
* Initialize S3Backend. It creates AmazonS3Client and TransferManager from
* aws.properties. It creates S3 bucket if it doesn't pre-exist in S3.
@@ -99,12 +109,20 @@
if (config == null) {
config = Utils.DEFAULT_CONFIG_FILE;
}
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
- Properties prop = Utils.readConfig(config);
- LOG.debug("init");
+ startTime = new Date();
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+ prop = Utils.readConfig(config);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("init");
+ }
this.store = store;
s3service = Utils.openService(prop);
- bucket = prop.getProperty(S3Constants.S3_BUCKET);
+ if (bucket == null || "".equals(bucket.trim())) {
+ bucket = prop.getProperty(S3Constants.S3_BUCKET);
+ }
String region = prop.getProperty(S3Constants.S3_REGION);
String endpoint = null;
if (!s3service.doesBucketExist(bucket)) {
@@ -130,6 +148,10 @@
endpoint = S3 + DASH + region + DOT + AWSDOTCOM;
}
}
+ String propEndPoint = prop.getProperty(S3Constants.S3_END_POINT);
+ if (propEndPoint != null && !"".equals(propEndPoint)) {
+ endpoint = propEndPoint;
+ }
/*
* setting endpoint to remove latency of redirection. If endpoint is
* not set, invocation first goes us standard region, which
@@ -137,12 +159,39 @@
*/
s3service.setEndpoint(endpoint);
LOG.info("S3 service endpoint: " + endpoint);
- tmx = new TransferManager(s3service, createDefaultExecutorService());
- LOG.debug(" done");
+
+ int writeThreads = 10;
+ String writeThreadsStr = prop.getProperty(S3Constants.S3_WRITE_THREADS);
+ if (writeThreadsStr != null) {
+ writeThreads = Integer.parseInt(writeThreadsStr);
+ }
+ LOG.info("Using thread pool of [" + writeThreads
+ + "] threads in S3 transfer manager");
+ tmx = new TransferManager(s3service,
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(writeThreads,
+ new NamedThreadFactory("s3-transfer-manager-worker")));
+ String renameKeyProp = prop.getProperty(S3Constants.S3_RENAME_KEYS);
+ boolean renameKeyBool = (renameKeyProp == null || "".equals(renameKeyProp))
+ ? true
+ : Boolean.parseBoolean(renameKeyProp);
+ if (renameKeyBool) {
+ renameKeys();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("S3 Backend initialized in ["
+ + (System.currentTimeMillis() - startTime.getTime())
+ + "] ms");
+ }
} catch (Exception e) {
- LOG.debug(" error ", e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" error ", e);
+ }
throw new DataStoreException("Could not initialize S3 from "
+ config, e);
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
}
}
@@ -153,128 +202,147 @@
@Override
public void write(DataIdentifier identifier, File file)
throws DataStoreException {
- String key = getKeyName(identifier);
- ObjectMetadata objectMetaData = null;
- long start = System.currentTimeMillis();
- LOG.debug("write {0} length {1}", identifier, file.length());
- try {
- // check if the same record already exists
- try {
- objectMetaData = s3service.getObjectMetadata(bucket, key);
- } catch (AmazonServiceException ase) {
- if (ase.getStatusCode() != 404) {
- throw ase;
- }
- }
- if (objectMetaData != null) {
- long l = objectMetaData.getContentLength();
- if (l != file.length()) {
- throw new DataStoreException("Collision: " + key
- + " new length: " + file.length() + " old length: " + l);
- }
- LOG.debug(key + " exists");
- CopyObjectRequest copReq = new CopyObjectRequest(bucket, key,
- bucket, key);
- copReq.setNewObjectMetadata(objectMetaData);
- s3service.copyObject(copReq);
- LOG.debug("lastModified of " + identifier.toString()
- + " updated successfully");
- LOG.debug(" updated");
- }
- } catch (AmazonServiceException e) {
- LOG.debug(" does not exist", e);
- // not found - create it
- }
- if (objectMetaData == null) {
- LOG.debug(" creating");
- try {
- // start multipart parallel upload using amazon sdk
- Upload up = tmx.upload(new PutObjectRequest(bucket, key, file));
- // wait for upload to finish
- up.waitForUploadResult();
- LOG.debug(" done");
- } catch (Exception e2) {
- LOG.debug(" could not upload", e2);
- throw new DataStoreException("Could not upload " + key, e2);
- }
- }
- LOG.debug(" ms: {0}", System.currentTimeMillis() - start);
+ this.write(identifier, file, false, null);
}
+ @Override
+ public void writeAsync(DataIdentifier identifier, File file,
+ AsyncUploadCallback callback) throws DataStoreException {
+ if (callback == null) {
+ throw new IllegalArgumentException(
+ "callback parameter cannot be null in asyncUpload");
+ }
+ Thread th = new Thread(new AsyncUploadJob(identifier, file, callback));
+ th.start();
+ }
+
/**
* Check if record identified by identifier exists in Amazon S3.
*/
@Override
public boolean exists(DataIdentifier identifier) throws DataStoreException {
+ long start = System.currentTimeMillis();
String key = getKeyName(identifier);
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
- LOG.debug("exists {0}", identifier);
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
ObjectMetadata objectMetaData = s3service.getObjectMetadata(bucket,
key);
if (objectMetaData != null) {
- LOG.debug(" true");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("exists [" + identifier + "]: [true] took ["
+ + (System.currentTimeMillis() - start) + "] ms");
+ }
return true;
}
return false;
} catch (AmazonServiceException e) {
if (e.getStatusCode() == 404) {
- LOG.info("key [" + identifier.toString() + "] not found.");
+ LOG.info("exists [" + identifier + "]: [false] took ["
+ + (System.currentTimeMillis() - start) + "] ms");
return false;
}
throw new DataStoreException(
"Error occured to getObjectMetadata for key ["
+ identifier.toString() + "]", e);
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
}
}
@Override
- public void touch(DataIdentifier identifier, long minModifiedDate)
+ public boolean exists(DataIdentifier identifier, boolean touch)
throws DataStoreException {
+ long start = System.currentTimeMillis();
String key = getKeyName(identifier);
+ ObjectMetadata objectMetaData = null;
+ boolean retVal = false;
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
- if (minModifiedDate != 0) {
- ObjectMetadata objectMetaData = s3service.getObjectMetadata(
- bucket, key);
- if (objectMetaData.getLastModified().getTime() < minModifiedDate) {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+ objectMetaData = s3service.getObjectMetadata(bucket, key);
+ if (objectMetaData != null) {
+ retVal = true;
+ if (touch) {
CopyObjectRequest copReq = new CopyObjectRequest(bucket,
key, bucket, key);
copReq.setNewObjectMetadata(objectMetaData);
s3service.copyObject(copReq);
- LOG.debug("lastModified of " + identifier.toString()
- + " updated successfully");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[ " + identifier.toString()
+ + "] touched took ["
+ + (System.currentTimeMillis() - start) + "] ms");
+ }
}
+ } else {
+ retVal = false;
}
+
+ } catch (AmazonServiceException e) {
+ if (e.getStatusCode() == 404) {
+ retVal = false;
+ } else {
+ throw new DataStoreException(
+ "Error occured to find exists for key ["
+ + identifier.toString() + "]", e);
+ }
} catch (Exception e) {
throw new DataStoreException(
- "An Exception occurred while trying to set the last modified date of record "
+ "Error occured to find exists for key "
+ identifier.toString(), e);
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
}
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("exists [" + identifier + "]: [" + retVal + "] took ["
+ + (System.currentTimeMillis() - start) + "] ms");
+ }
+ return retVal;
}
@Override
public InputStream read(DataIdentifier identifier)
throws DataStoreException {
+ long start = System.currentTimeMillis();
String key = getKeyName(identifier);
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
- LOG.debug("read {" + identifier + "}");
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
S3Object object = s3service.getObject(bucket, key);
InputStream in = object.getObjectContent();
- LOG.debug(" return");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[ " + identifier.toString() + "] read took ["
+ + (System.currentTimeMillis() - start) + "] ms");
+ }
return in;
} catch (AmazonServiceException e) {
throw new DataStoreException("Object not found: " + key, e);
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
}
}
@Override
public Iterator<DataIdentifier> getAllIdentifiers()
throws DataStoreException {
+ long start = System.currentTimeMillis();
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
- LOG.debug("getAllIdentifiers");
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
Set<DataIdentifier> ids = new HashSet<DataIdentifier>();
- ObjectListing prevObjectListing = s3service.listObjects(bucket,
- KEY_PREFIX);
+ ObjectListing prevObjectListing = s3service.listObjects(bucket);
while (true) {
for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
String id = getIdentifierName(s3ObjSumm.getKey());
@@ -282,140 +350,478 @@
ids.add(new DataIdentifier(id));
}
}
- if (!prevObjectListing.isTruncated()) {
- break;
- }
+ if (!prevObjectListing.isTruncated()) break;
prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
}
- LOG.debug(" return");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getAllIdentifiers returned size [ " + ids.size()
+ + "] took [" + (System.currentTimeMillis() - start)
+ + "] ms");
+ }
return ids.iterator();
} catch (AmazonServiceException e) {
throw new DataStoreException("Could not list objects", e);
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
}
}
@Override
public long getLastModified(DataIdentifier identifier)
throws DataStoreException {
+ long start = System.currentTimeMillis();
String key = getKeyName(identifier);
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
- return object.getLastModified().getTime();
+ long lastModified = object.getLastModified().getTime();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Identifier [" + identifier.toString()
+ + "] 's lastModified = [" + lastModified + "] took ["
+ + (System.currentTimeMillis() - start) + "] ms");
+ }
+ return lastModified;
} catch (AmazonServiceException e) {
- throw new DataStoreException(
- "Could not getLastModified of dataIdentifier " + identifier, e);
+ if (e.getStatusCode() == 404) {
+ LOG.info("getLastModified:Identifier [" + identifier.toString()
+ + "] not found. Took ["
+ + (System.currentTimeMillis() - start) + "]ms");
+ }
+ throw new DataStoreException(e);
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
}
}
@Override
public long getLength(DataIdentifier identifier) throws DataStoreException {
+ long start = System.currentTimeMillis();
String key = getKeyName(identifier);
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
- return object.getContentLength();
+ long length = object.getContentLength();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Identifier [" + identifier.toString()
+ + "] 's length = [" + length + "] took ["
+ + (System.currentTimeMillis() - start) + "] ms");
+ }
+ return length;
} catch (AmazonServiceException e) {
throw new DataStoreException("Could not length of dataIdentifier "
+ identifier, e);
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
}
}
@Override
public void deleteRecord(DataIdentifier identifier)
throws DataStoreException {
+ long start = System.currentTimeMillis();
String key = getKeyName(identifier);
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
s3service.deleteObject(bucket, key);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Identifier [" + identifier.toString()
+ + "] 's deleted. It took ["
+ + (System.currentTimeMillis() - start) + "] ms");
+ }
} catch (AmazonServiceException e) {
throw new DataStoreException(
"Could not getLastModified of dataIdentifier " + identifier, e);
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
}
}
@Override
- public List<DataIdentifier> deleteAllOlderThan(long min)
+ public Set<DataIdentifier> deleteAllOlderThan(long min)
throws DataStoreException {
- LOG.info("deleteAllOlderThan " + new Date(min));
- List<DataIdentifier> diDeleteList = new ArrayList<DataIdentifier>(30);
- ObjectListing prevObjectListing = s3service.listObjects(bucket);
- while (true) {
+ long start = System.currentTimeMillis();
+ // S3 stores lastModified to lower boundary of timestamp in ms.
+ // and hence min is reduced by 1000ms.
+ min = min - 1000;
+ Set<DataIdentifier> deleteIdSet = new HashSet<DataIdentifier>(30);
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+ ObjectListing prevObjectListing = s3service.listObjects(bucket);
+ while (true) {
+ List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+ for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+ DataIdentifier identifier = new DataIdentifier(
+ getIdentifierName(s3ObjSumm.getKey()));
+ long lastModified = s3ObjSumm.getLastModified().getTime();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("id [" + identifier + "], lastModified ["
+ + lastModified + "]");
+ }
+ if (!store.isInUse(identifier) && lastModified < min) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("add id :" + s3ObjSumm.getKey()
+ + " to delete lists");
+ }
+ deleteList.add(new DeleteObjectsRequest.KeyVersion(
+ s3ObjSumm.getKey()));
+ deleteIdSet.add(identifier);
+ }
+ }
+ if (deleteList.size() > 0) {
+ DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
+ bucket);
+ delObjsReq.setKeys(deleteList);
+ DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+ if (dobjs.getDeletedObjects().size() != deleteList.size()) {
+ throw new DataStoreException(
+ "Incomplete delete object request. only "
+ + dobjs.getDeletedObjects().size() + " out of "
+ + deleteList.size() + " are deleted");
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(deleteList.size()
+ + " records deleted from datastore");
+ }
+ }
+ }
+ if (!prevObjectListing.isTruncated()) break;
+ prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+ }
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ LOG.info("deleteAllOlderThan: min=[" + min + "] exit. Deleted ["
+ + deleteIdSet + "] records. Number of records deleted ["
+ + deleteIdSet.size() + "] took ["
+ + (System.currentTimeMillis() - start) + "] ms");
+ return deleteIdSet;
+ }
+
+ @Override
+ public void close() {
+ // Backend is closing. Abort all incomplete multipart uploads started before startTime.
+ tmx.abortMultipartUploads(bucket, startTime);
+ tmx.shutdownNow();
+ s3service.shutdown();
+ LOG.info("S3Backend closed.");
+ }
+
+ public String getBucket() {
+ return bucket;
+ }
+
+ public void setBucket(String bucket) {
+ this.bucket = bucket;
+ }
+
+ private void write(DataIdentifier identifier, File file,
+ boolean asyncUpload, AsyncUploadCallback callback)
+ throws DataStoreException {
+ String key = getKeyName(identifier);
+ ObjectMetadata objectMetaData = null;
+ long start = System.currentTimeMillis();
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+ // check if the same record already exists
+ try {
+ objectMetaData = s3service.getObjectMetadata(bucket, key);
+ } catch (AmazonServiceException ase) {
+ if (ase.getStatusCode() != 404) {
+ throw ase;
+ }
+ }
+ if (objectMetaData != null) {
+ long l = objectMetaData.getContentLength();
+ if (l != file.length()) {
+ throw new DataStoreException("Collision: " + key
+ + " new length: " + file.length() + " old length: " + l);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(key + " exists, lastmodified ="
+ + objectMetaData.getLastModified().getTime());
+ }
+ CopyObjectRequest copReq = new CopyObjectRequest(bucket, key,
+ bucket, key);
+ copReq.setNewObjectMetadata(objectMetaData);
+ s3service.copyObject(copReq);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("lastModified of " + identifier.toString()
+ + " updated successfully");
+ }
+ if (callback != null) {
+ callback.call(identifier, file,
+ AsyncUploadCallback.RESULT.SUCCESS);
+ }
+ }
+
+ if (objectMetaData == null) {
+ try {
+ // start multipart parallel upload using amazon sdk
+ Upload up = tmx.upload(new PutObjectRequest(bucket, key,
+ file));
+ // wait for upload to finish
+ if (asyncUpload) {
+ up.addProgressListener(new S3UploadProgressListener(
+ identifier, file, callback));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("added upload progress listener to identifier ["
+ + identifier + "]");
+ }
+ } else {
+ up.waitForUploadResult();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("synchronous upload to identifier ["
+ + identifier + "] completed.");
+ }
+ if (callback != null) {
+ callback.call(identifier, file,
+ AsyncUploadCallback.RESULT.SUCCESS);
+ }
+ }
+ } catch (Exception e2) {
+ if (callback != null) {
+ callback.call(identifier, file,
+ AsyncUploadCallback.RESULT.ABORTED);
+ }
+ throw new DataStoreException("Could not upload " + key, e2);
+ }
+ }
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("write [" + identifier + "] length [" + file.length()
+ + "], in async mode [" + asyncUpload + "] in ["
+ + (System.currentTimeMillis() - start) + "] ms.");
+ }
+ }
+
+ /**
+ * This method renames object keys in S3 concurrently. The number of
+ * concurrent threads is defined by the 'maxConnections' property in
+ * aws.properties. As S3 doesn't have a "move" operation, this method
+ * simulates a move by copying each object to its new key and then
+ * deleting the old key.
+ */
+ private void renameKeys() throws DataStoreException {
+ long startTime = System.currentTimeMillis();
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ long count = 0;
+ try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+ ObjectListing prevObjectListing = s3service.listObjects(bucket,
+ KEY_PREFIX);
List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
- for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
- DataIdentifier identifier = new DataIdentifier(
- getIdentifierName(s3ObjSumm.getKey()));
- if (!store.isInUse(identifier)
- && s3ObjSumm.getLastModified().getTime() < min) {
- LOG.info("add id :" + s3ObjSumm.getKey()
- + " to delete lists");
+ int nThreads = Integer.parseInt(prop.getProperty("maxConnections"));
+ ExecutorService executor = Executors.newFixedThreadPool(nThreads,
+ new NamedThreadFactory("s3-object-rename-worker"));
+ boolean taskAdded = false;
+ while (true) {
+ for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+ executor.execute(new KeyRenameThread(s3ObjSumm.getKey()));
+ taskAdded = true;
+ count++;
deleteList.add(new DeleteObjectsRequest.KeyVersion(
s3ObjSumm.getKey()));
- diDeleteList.add(new DataIdentifier(
- getIdentifierName(s3ObjSumm.getKey())));
}
+ if (!prevObjectListing.isTruncated()) break;
+ prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
}
+ // This makes the executor accept no new tasks
+ // and lets the queued tasks run to completion
+ executor.shutdown();
+
+ try {
+ // Wait until all tasks are finished
+ while (taskAdded
+ && !executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ LOG.info("Rename S3 keys tasks timed out. Waiting again");
+ }
+ } catch (InterruptedException ie) {
+ // Restore the interrupt status and stop waiting.
+ Thread.currentThread().interrupt();
+ }
+ LOG.info("Renamed [" + count + "] keys, time taken ["
+ + ((System.currentTimeMillis() - startTime) / 1000) + "] sec");
+ // Delete older keys.
if (deleteList.size() > 0) {
DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
bucket);
- delObjsReq.setKeys(deleteList);
- DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
- if (dobjs.getDeletedObjects().size() != deleteList.size()) {
- throw new DataStoreException(
- "Incomplete delete object request. only "
- + dobjs.getDeletedObjects().size() + " out of "
- + deleteList.size() + " are deleted");
+ int batchSize = 500, startIndex = 0, size = deleteList.size();
+ int endIndex = batchSize < size ? batchSize : size;
+ while (endIndex <= size) {
+ delObjsReq.setKeys(Collections.unmodifiableList(deleteList.subList(
+ startIndex, endIndex)));
+ DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+ LOG.info("Records[" + dobjs.getDeletedObjects().size()
+ + "] deleted in datastore from index [" + startIndex
+ + "] to [" + (endIndex - 1) + "]");
+ if (endIndex == size) {
+ break;
+ } else {
+ startIndex = endIndex;
+ endIndex = (startIndex + batchSize) < size
+ ? (startIndex + batchSize)
+ : size;
+ }
}
- LOG.info(deleteList.size() + " records deleted from datastore");
}
- if (!prevObjectListing.isTruncated()) {
- break;
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
}
- prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
}
- LOG.info("deleteAllOlderThan exit");
- return diDeleteList;
}
- @Override
- public void close() {
- s3service.shutdown();
- s3service = null;
- tmx = null;
+ /**
+ * This method converts a key from the old format to the new format. For
+ * example, it converts the old key
+ * dataStore_004cb70c8f87d78f04da41e7547cb434094089ea to
+ * 004c-b70c8f87d78f04da41e7547cb434094089ea.
+ */
+ private static String convertKey(String oldKey)
+ throws IllegalArgumentException {
+ if (!oldKey.startsWith(KEY_PREFIX)) {
+ throw new IllegalArgumentException("[" + oldKey
+ + "] doesn't start with prefix [" + KEY_PREFIX + "]");
+ }
+ String key = oldKey.substring(KEY_PREFIX.length());
+ return key.substring(0, 4) + DASH + key.substring(4);
}
/**
* Get key from data identifier. Object is stored with key in S3.
*/
private static String getKeyName(DataIdentifier identifier) {
- return KEY_PREFIX + identifier.toString();
+ String key = identifier.toString();
+ return key.substring(0, 4) + DASH + key.substring(4);
}
/**
* Get data identifier from key.
*/
private static String getIdentifierName(String key) {
- if (!key.startsWith(KEY_PREFIX)) {
+ if (!key.contains(DASH)) {
return null;
}
- return key.substring(KEY_PREFIX.length());
+ return key.substring(0, 4) + key.substring(5);
}
/**
- * Returns a new thread pool configured with the default settings.
- *
- * @return A new thread pool configured with the default settings.
+ * This class renames an object key in S3 in a separate thread.
*/
- private ThreadPoolExecutor createDefaultExecutorService() {
- ThreadFactory threadFactory = new ThreadFactory() {
- private int threadCount = 1;
+ private class KeyRenameThread implements Runnable {
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r);
- thread.setContextClassLoader(getClass().getClassLoader());
- thread.setName("s3-transfer-manager-worker-" + threadCount++);
- return thread;
+ private String oldKey;
+
+ public void run() {
+ ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(
+ getClass().getClassLoader());
+ String newS3Key = convertKey(oldKey);
+ CopyObjectRequest copReq = new CopyObjectRequest(bucket,
+ oldKey, bucket, newS3Key);
+ s3service.copyObject(copReq);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(oldKey + " renamed to " + newS3Key);
+ }
+ } finally {
+ if (contextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(
+ contextClassLoader);
+ }
}
- };
- return (ThreadPoolExecutor) Executors.newFixedThreadPool(10,
- threadFactory);
+ }
+
+ public KeyRenameThread(String oldKey) {
+ this.oldKey = oldKey;
+ }
}
+
+ /**
+ * Listener which receives callback on status of S3 upload.
+ */
+ private class S3UploadProgressListener implements ProgressListener {
+
+ private File file;
+
+ private DataIdentifier identifier;
+
+ private AsyncUploadCallback callback;
+
+ public S3UploadProgressListener(DataIdentifier identifier, File file,
+ AsyncUploadCallback callback) {
+ super();
+ this.identifier = identifier;
+ this.file = file;
+ this.callback = callback;
+ }
+
+ public void progressChanged(ProgressEvent progressEvent) {
+ switch (progressEvent.getEventCode()) {
+ case ProgressEvent.COMPLETED_EVENT_CODE:
+ callback.call(identifier, file,
+ AsyncUploadCallback.RESULT.SUCCESS);
+ break;
+ case ProgressEvent.FAILED_EVENT_CODE:
+ callback.call(identifier, file,
+ AsyncUploadCallback.RESULT.FAILED);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ /**
+ * This class implements {@link Runnable} interface to upload {@link File}
+ * to S3 asynchronously.
+ */
+ private class AsyncUploadJob implements Runnable {
+
+ private DataIdentifier identifier;
+
+ private File file;
+
+ private AsyncUploadCallback callback;
+
+ public AsyncUploadJob(DataIdentifier identifier, File file,
+ AsyncUploadCallback callback) {
+ super();
+ this.identifier = identifier;
+ this.file = file;
+ this.callback = callback;
+ }
+
+ public void run() {
+ try {
+ write(identifier, file, true, callback);
+ } catch (DataStoreException e) {
+ LOG.error("Could not upload [" + identifier + "], file[" + file
+ + "]", e);
+ }
+
+ }
+ }
}
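
A minimal caller sketch of the new asynchronous write path, for illustration only. It assumes AsyncUploadCallback is an interface exposing the single call(DataIdentifier, File, RESULT) method with RESULT values SUCCESS, FAILED and ABORTED, which is how the call sites above use it; the names 'backend', 'identifier' and 'file' are placeholders.

    backend.writeAsync(identifier, file, new AsyncUploadCallback() {
        @Override
        public void call(DataIdentifier id, File f, RESULT result) {
            // invoked from the upload thread or the S3 progress listener
            if (result == RESULT.SUCCESS) {
                LOG.info("async upload of [" + id + "] succeeded");
            } else {
                LOG.warn("async upload of [" + id + "] ended with [" + result + "]");
            }
        }
    });
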
Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java
===================================================================
--- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java (revision 1576578)
+++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java (working copy)
@@ -21,7 +21,7 @@
* Defined Amazon S3 constants.
*/
public final class S3Constants {
-
+
/**
* Amazon aws access key.
*/
@@ -41,8 +41,23 @@
* Amazon aws S3 region.
*/
public static final String S3_REGION = "s3Region";
-
+
/**
+ * Amazon aws S3 endpoint.
+ */
+ public static final String S3_END_POINT = "s3EndPoint";
+
+ /**
+ * Constant to enable renaming of object keys from the old to the new format.
+ */
+ public static final String S3_RENAME_KEYS = "s3RenameKeys";
+
+ /**
+ * Constant for the number of S3 upload (write) threads.
+ */
+ public static final String S3_WRITE_THREADS = "writeThreads";
+
+ /**
* private constructor so that class cannot initialized from outside.
*/
private S3Constants() {
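
A hypothetical aws.properties fragment showing the three new keys defined above; the property names come from the constants, and the values are illustrative, matching the defaults in S3Backend.init().

    # optional explicit endpoint; when set it overrides the region-derived one
    s3EndPoint=s3-eu-west-1.amazonaws.com
    # rename keys from the old dataStore_* format on startup (defaults to true)
    s3RenameKeys=true
    # size of the transfer manager's upload thread pool (defaults to 10)
    writeThreads=10
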
Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java
===================================================================
--- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java (revision 1576578)
+++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java (working copy)
@@ -21,8 +21,6 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Properties;
import org.slf4j.Logger;
@@ -31,9 +29,8 @@
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
@@ -42,10 +39,10 @@
*/
public final class Utils {
- public static final String DEFAULT_CONFIG_FILE = "aws.properties";
-
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+ public static final String DEFAULT_CONFIG_FILE = "aws.properties";
+
private static final String DELETE_CONFIG_SUFFIX = ";burn";
/**
@@ -81,43 +78,20 @@
* Delete S3 bucket. This method first deletes all objects from bucket and
* then delete empty bucket.
*
- * @param prop properties to configure @link {@link AmazonS3Client} and
- * delete bucket.
+ * @param bucketName the bucket name.
*/
- public static void deleteBucket(final Properties prop) throws IOException {
- AmazonS3Client s3service = openService(prop);
- String bucketName = prop.getProperty(S3Constants.S3_BUCKET);
- if (!s3service.doesBucketExist(bucketName)) {
- return;
- }
+ public static void deleteBucket(final String bucketName) throws IOException {
+ Properties prop = readConfig(DEFAULT_CONFIG_FILE);
+ AmazonS3 s3service = openService(prop);
ObjectListing prevObjectListing = s3service.listObjects(bucketName);
while (true) {
- List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
- deleteList.add(new DeleteObjectsRequest.KeyVersion(
- s3ObjSumm.getKey()));
+ s3service.deleteObject(bucketName, s3ObjSumm.getKey());
}
- if (deleteList.size() > 0) {
- DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(
- bucketName);
- delObjsReq.setKeys(deleteList);
- DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
- if (dobjs.getDeletedObjects().size() != deleteList.size()) {
- throw new IOException(
- "Incomplete delete object request. only "
- + dobjs.getDeletedObjects().size() + " out of "
- + deleteList.size() + " are deleted");
- }
- LOG.info(deleteList.size()
- + " records deleted from datastore");
- }
- if (!prevObjectListing.isTruncated()) {
- break;
- }
+ if (!prevObjectListing.isTruncated()) break;
prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
}
s3service.deleteBucket(bucketName);
- s3service.shutdown();
}
/**
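
The reworked deleteBucket() takes only the bucket name, reads aws.properties itself, and deletes objects one at a time across paginated listings before removing the bucket. A usage sketch with a hypothetical test bucket name:

    Utils.deleteBucket("jackrabbit-s3-test-bucket");
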
Index: jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryBackend.java
===================================================================
--- jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryBackend.java (revision 1576578)
+++ jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryBackend.java (working copy)
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jackrabbit.aws.ext.ds;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.jackrabbit.core.data.Backend;
-import org.apache.jackrabbit.core.data.CachingDataStore;
-import org.apache.jackrabbit.core.data.DataIdentifier;
-import org.apache.jackrabbit.core.data.DataStoreException;
-
-/**
- * An in-memory backend implementation used to speed up testing.
- */
-public class InMemoryBackend implements Backend {
-
- private HashMap<DataIdentifier, byte[]> data = new HashMap<DataIdentifier, byte[]>();
-
- private HashMap<DataIdentifier, Long> timeMap = new HashMap<DataIdentifier, Long>();
-
- @Override
- public void init(CachingDataStore store, String homeDir, String config)
- throws DataStoreException {
- // ignore
- log("init");
- }
-
- @Override
- public void close() {
- // ignore
- log("close");
- }
-
- @Override
- public boolean exists(final DataIdentifier identifier) {
- log("exists " + identifier);
- return data.containsKey(identifier);
- }
-
- @Override
- public Iterator<DataIdentifier> getAllIdentifiers()
- throws DataStoreException {
- log("getAllIdentifiers");
- return data.keySet().iterator();
- }
-
- @Override
- public InputStream read(final DataIdentifier identifier)
- throws DataStoreException {
- log("read " + identifier);
- return new ByteArrayInputStream(data.get(identifier));
- }
-
- @Override
- public void write(final DataIdentifier identifier, final File file)
- throws DataStoreException {
- log("write " + identifier + " " + file.length());
- byte[] buffer = new byte[(int) file.length()];
- try {
- DataInputStream din = new DataInputStream(new FileInputStream(file));
- din.readFully(buffer);
- din.close();
- data.put(identifier, buffer);
- timeMap.put(identifier, System.currentTimeMillis());
- } catch (IOException e) {
- throw new DataStoreException(e);
- }
- }
-
- /**
- * Log a message if logging is enabled.
- *
- * @param message the message
- */
- private void log(final String message) {
- // System.out.println(message);
- }
-
- @Override
- public long getLastModified(final DataIdentifier identifier)
- throws DataStoreException {
- log("getLastModified " + identifier);
- return timeMap.get(identifier);
- }
-
- @Override
- public void deleteRecord(final DataIdentifier identifier)
- throws DataStoreException {
- timeMap.remove(identifier);
- data.remove(identifier);
- }
-
- @Override
- public List<DataIdentifier> deleteAllOlderThan(final long min) {
- log("deleteAllOlderThan " + min);
- List<DataIdentifier> tobeDeleted = new ArrayList<DataIdentifier>();
- for (Map.Entry<DataIdentifier, Long> entry : timeMap.entrySet()) {
- DataIdentifier identifier = entry.getKey();
- long timestamp = entry.getValue();
- if (timestamp < min) {
- tobeDeleted.add(identifier);
- }
- }
- for (DataIdentifier identifier : tobeDeleted) {
- timeMap.remove(identifier);
- data.remove(identifier);
- }
- return tobeDeleted;
- }
-
- @Override
- public long getLength(final DataIdentifier identifier) throws DataStoreException {
- try {
- return data.get(identifier).length;
- } catch (Exception e) {
- throw new DataStoreException(e);
- }
- }
-
- @Override
- public void touch(final DataIdentifier identifier, final long minModifiedDate)
- throws DataStoreException {
- if (minModifiedDate > 0 && data.containsKey(identifier)) {
- timeMap.put(identifier, System.currentTimeMillis());
- }
- }
-}
Index: jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryDataStore.java
===================================================================
--- jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryDataStore.java (revision 1576578)
+++ jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryDataStore.java (working copy)
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.aws.ext.ds;
-
-import org.apache.jackrabbit.core.data.Backend;
-import org.apache.jackrabbit.core.data.CachingDataStore;
-
-/**
- * A caching data store that uses the in-memory backend.
- */
-public class InMemoryDataStore extends CachingDataStore {
-
- @Override
- protected Backend createBackend() {
- return new InMemoryBackend();
- }
-
- @Override
- protected String getMarkerFile() {
- return "mem.init.done";
- }
-}
Index: jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/S3DataStoreTest.java
===================================================================
--- jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/S3DataStoreTest.java (revision 0)
+++ jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/S3DataStoreTest.java (working copy)
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.aws.ext.ds;
+
+import org.apache.jackrabbit.core.data.Backend;
+
+/**
+ * This class initializes {@link S3DataStore} with the given bucket. The other
+ * configuration is taken from the configuration file. This class is
+ * implemented so that each test case runs in its own bucket, which is
+ * required because deletions in a bucket are not immediately reflected in
+ * the next test case.
+ */
+public class S3DataStoreTest extends S3DataStore {
+ String bucket;
+
+ public S3DataStoreTest() {
+ super();
+ }
+
+ public S3DataStoreTest(String bucket) {
+ super();
+ this.bucket = bucket;
+ }
+
+ protected Backend createBackend() {
+ Backend backend = new S3Backend();
+ ((S3Backend) backend).setBucket(bucket);
+ return backend;
+ }
+}
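
A sketch of how a test could drive the per-bucket store; the bucket name and config path are hypothetical, while setConfig(), init(), addRecord() and close() are the CachingDataStore methods the former TestCaseBase already exercised.

    S3DataStoreTest ds = new S3DataStoreTest("unit-test-bucket-1");
    ds.setConfig("src/test/resources/aws.properties");
    ds.init("target/temp");
    DataRecord rec = ds.addRecord(new ByteArrayInputStream(new byte[] {1, 2, 3}));
    assertEquals(3, rec.getLength());
    ds.close();
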
Index: jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestCaseBase.java
===================================================================
--- jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestCaseBase.java (revision 1576578)
+++ jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestCaseBase.java (working copy)
@@ -1,585 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.aws.ext.ds;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Random;
-
-import javax.jcr.RepositoryException;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.jackrabbit.core.data.CachingDataStore;
-import org.apache.jackrabbit.core.data.DataIdentifier;
-import org.apache.jackrabbit.core.data.DataRecord;
-import org.apache.jackrabbit.core.data.DataStore;
-import org.apache.jackrabbit.core.data.DataStoreException;
-import org.apache.jackrabbit.core.data.RandomInputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test base class which covers all scenarios.
- */
-public abstract class TestCaseBase extends TestCase {
-
- /**
- * Constant describing aws properties file path.
- */
- public static final String CONFIG = "config";
-
- /**
- * Logger
- */
- protected static final Logger LOG = LoggerFactory.getLogger(TestCaseBase.class);
-
- /**
- * temp directory
- */
- private static final String TEST_DIR = "target/temp";
-
- /**
- * File path of aws properties.
- */
- protected String config;
-
- /**
- * Parameter to use in-memory backend. If false {@link S3Backend}
- */
- protected boolean memoryBackend = true;
-
- /**
- * Parameter to use local cache. If true local cache {@link LocalCache} is
- * not used.
- */
- protected boolean noCache;
-
- /**
- * Delete temporary directory.
- */
- @Override
- protected void setUp() {
- FileUtils.deleteQuietly(new File(TEST_DIR));
- }
-
- /**
- * Delete temporary directory.
- */
- @Override
- protected void tearDown() throws IOException {
- FileUtils.deleteQuietly(new File(TEST_DIR));
- }
-
- /**
- * Testcase to validate {@link DataStore#addRecord(InputStream)} API.
- */
- public void testAddRecord() {
- try {
- doAddRecordTest();
- } catch (Exception e) {
- LOG.error("error:", e);
- fail(e.getMessage());
- }
- }
-
- /**
- * Testcase to validate {@link DataStore#getRecord(DataIdentifier)} API.
- */
- public void testGetRecord() {
- try {
- doGetRecordTest();
- } catch (Exception e) {
- LOG.error("error:", e);
- }
- }
-
- /**
- * Testcase to validate {@link DataStore#getAllIdentifiers()} API.
- */
- public void testGetAllIdentifiers() {
- try {
- doGetAllIdentifiersTest();
- } catch (Exception e) {
- LOG.error("error:", e);
- fail(e.getMessage());
- }
- }
-
- /**
- * Testcase to validate {@link DataStore#updateModifiedDateOnAccess(long)}
- * API.
- */
- public void testUpdateLastModifiedOnAccess() {
- try {
- doUpdateLastModifiedOnAccessTest();
- } catch (Exception e) {
- LOG.error("error:", e);
- }
- }
-
- /**
- * Testcase to validate
- * {@link MultiDataStoreAware#deleteRecord(DataIdentifier)}.API.
- */
- public void testDeleteRecord() {
- try {
- doDeleteRecordTest();
- } catch (Exception e) {
- LOG.error("error:", e);
- fail(e.getMessage());
- }
- }
-
- /**
- * Testcase to validate {@link DataStore#deleteAllOlderThan(long)} API.
- */
- public void testDeleteAllOlderThan() {
- try {
- doDeleteAllOlderThan();
- } catch (Exception e) {
- LOG.error("error:", e);
- fail(e.getMessage());
- }
- }
-
- /**
- * Testcase to validate {@link DataStore#getRecordFromReference(String)}
- */
- public void testReference() {
- try {
- doReferenceTest();
- } catch (Exception e) {
- LOG.error("error:", e);
- fail(e.getMessage());
- }
- }
-
- /**
- * Testcase to validate mixed scenario use of {@link DataStore}.
- */
- public void testSingleThread() {
- try {
- doTestSingleThread();
- } catch (Exception e) {
- LOG.error("error:", e);
- fail(e.getMessage());
- }
- }
-
- /**
- * Testcase to validate mixed scenario use of {@link DataStore} in
- * multi-threaded concurrent environment.
- */
- public void testMultiThreaded() {
- try {
- doTestMultiThreaded();
- } catch (Exception e) {
- LOG.error("error:", e);
- fail(e.getMessage());
- }
-
- }
-
- private CachingDataStore createDataStore() throws RepositoryException {
- CachingDataStore ds = memoryBackend
- ? new InMemoryDataStore()
- : new S3DataStore();
- ds.setConfig(config);
- if (noCache) {
- ds.setCacheSize(0);
- }
- ds.init(TEST_DIR);
- return ds;
- }
-
- /**
- * Test {@link DataStore#addRecord(InputStream)} and assert length of added
- * record.
- */
- protected void doAddRecordTest() throws Exception {
- CachingDataStore ds = createDataStore();
- byte[] data = new byte[12345];
- new Random(12345).nextBytes(data);
- DataRecord rec = ds.addRecord(new ByteArrayInputStream(data));
- assertEquals(data.length, rec.getLength());
- assertRecord(data, rec);
- ds.close();
- }
-
- /**
- * Test {@link DataStore#getRecord(DataIdentifier)} and assert length and
- * inputstream.
- */
- protected void doGetRecordTest() throws Exception {
- CachingDataStore ds = createDataStore();
- byte[] data = new byte[12345];
- new Random(12345).nextBytes(data);
- DataRecord rec = ds.addRecord(new ByteArrayInputStream(data));
- rec = ds.getRecord(rec.getIdentifier());
- assertEquals(data.length, rec.getLength());
- assertRecord(data, rec);
- ds.close();
- }
-
- /**
- * Test {@link MultiDataStoreAware#deleteRecord(DataIdentifier)}.
- */
- protected void doDeleteRecordTest() throws Exception {
- CachingDataStore ds = createDataStore();
- Random random = new Random(12345);
- byte[] data1 = new byte[12345];
- random.nextBytes(data1);
- DataRecord rec1 = ds.addRecord(new ByteArrayInputStream(data1));
-
- byte[] data2 = new byte[12345];
- random.nextBytes(data2);
- DataRecord rec2 = ds.addRecord(new ByteArrayInputStream(data2));
-
- byte[] data3 = new byte[12345];
- random.nextBytes(data3);
- DataRecord rec3 = ds.addRecord(new ByteArrayInputStream(data3));
-
- ds.deleteRecord(rec2.getIdentifier());
-
- assertNull("rec2 should be null",
- ds.getRecordIfStored(rec2.getIdentifier()));
- assertEquals(new ByteArrayInputStream(data1),
- ds.getRecord(rec1.getIdentifier()).getStream());
- assertEquals(new ByteArrayInputStream(data3),
- ds.getRecord(rec3.getIdentifier()).getStream());
- ds.close();
- }
-
- /**
- * Test {@link DataStore#getAllIdentifiers()} and asserts all identifiers
- * are returned.
- */
- protected void doGetAllIdentifiersTest() throws Exception {
- CachingDataStore ds = createDataStore();
- List<DataIdentifier> list = new ArrayList<DataIdentifier>();
- Random random = new Random(12345);
- byte[] data = new byte[12345];
- random.nextBytes(data);
- DataRecord rec = ds.addRecord(new ByteArrayInputStream(data));
- list.add(rec.getIdentifier());
-
- data = new byte[12345];
- random.nextBytes(data);
- rec = ds.addRecord(new ByteArrayInputStream(data));
- list.add(rec.getIdentifier());
-
- data = new byte[12345];
- random.nextBytes(data);
- rec = ds.addRecord(new ByteArrayInputStream(data));
- list.add(rec.getIdentifier());
-
- Iterator<DataIdentifier> itr = ds.getAllIdentifiers();
- while (itr.hasNext()) {
- assertTrue("record found on list", list.remove(itr.next()));
- }
- assertEquals(0, list.size());
- ds.close();
- }
-
- /**
- * Asserts that timestamp of all records accessed after
- * {@link DataStore#updateModifiedDateOnAccess(long)} invocation.
- */
- protected void doUpdateLastModifiedOnAccessTest() throws Exception {
- CachingDataStore ds = createDataStore();
- Random random = new Random(12345);
- byte[] data = new byte[12345];
- random.nextBytes(data);
- DataRecord rec1 = ds.addRecord(new ByteArrayInputStream(data));
-
- data = new byte[12345];
- random.nextBytes(data);
- DataRecord rec2 = ds.addRecord(new ByteArrayInputStream(data));
-
- Thread.sleep(1000);
- long updateTime = System.currentTimeMillis();
- ds.updateModifiedDateOnAccess(updateTime);
- Thread.sleep(1000);
- data = new byte[12345];
- random.nextBytes(data);
- DataRecord rec3 = ds.addRecord(new ByteArrayInputStream(data));
-
- data = new byte[12345];
- random.nextBytes(data);
- DataRecord rec4 = ds.addRecord(new ByteArrayInputStream(data));
-
- rec1 = ds.getRecord(rec1.getIdentifier());
-
- assertEquals("rec1 touched", true,
- ds.getLastModified(rec1.getIdentifier()) > updateTime);
- assertEquals("rec2 not touched", true,
- ds.getLastModified(rec2.getIdentifier()) < updateTime);
- assertEquals("rec3 touched", true,
- ds.getLastModified(rec3.getIdentifier()) > updateTime);
- assertEquals("rec4 touched", true,
- ds.getLastModified(rec4.getIdentifier()) > updateTime);
- ds.close();
-
- }
-
- /**
- * Asserts that {@link DataStore#deleteAllOlderThan(long)} only deleted
- * records older than argument passed.
- */
- protected void doDeleteAllOlderThan() throws Exception {
- CachingDataStore ds = createDataStore();
- Random random = new Random(12345);
- byte[] data = new byte[12345];
- random.nextBytes(data);
- DataRecord rec1 = ds.addRecord(new ByteArrayInputStream(data));
-
- data = new byte[12345];
- random.nextBytes(data);
- DataRecord rec2 = ds.addRecord(new ByteArrayInputStream(data));
-
- Thread.sleep(2000);
- long updateTime = System.currentTimeMillis();
- ds.updateModifiedDateOnAccess(updateTime);
-
- data = new byte[12345];
- random.nextBytes(data);
- DataRecord rec3 = ds.addRecord(new ByteArrayInputStream(data));
-
- data = new byte[12345];
- random.nextBytes(data);
- DataRecord rec4 = ds.addRecord(new ByteArrayInputStream(data));
-
- rec1 = ds.getRecord(rec1.getIdentifier());
- ds.clearInUse();
- assertEquals("only rec2 should be deleted", 1,
- ds.deleteAllOlderThan(updateTime));
- assertNull("rec2 should be null",
- ds.getRecordIfStored(rec2.getIdentifier()));
-
- Iterator<DataIdentifier> itr = ds.getAllIdentifiers();
- List<DataIdentifier> list = new ArrayList<DataIdentifier>();
- list.add(rec1.getIdentifier());
- list.add(rec3.getIdentifier());
- list.add(rec4.getIdentifier());
- while (itr.hasNext()) {
- assertTrue("record found on list", list.remove(itr.next()));
- }
-
- assertEquals("touched records found", 0, list.size());
- assertEquals("rec1 touched", true,
- ds.getLastModified(rec1.getIdentifier()) > updateTime);
- assertEquals("rec3 touched", true,
- ds.getLastModified(rec3.getIdentifier()) > updateTime);
- assertEquals("rec4 touched", true,
- ds.getLastModified(rec4.getIdentifier()) > updateTime);
- ds.close();
-
- }
-
- /**
- * Test if record can be accessed via
- * {@link DataStore#getRecordFromReference(String)}
- */
- public void doReferenceTest() throws Exception {
- CachingDataStore ds = createDataStore();
- ds.setSecret("12345");
- byte[] data = new byte[12345];
- new Random(12345).nextBytes(data);
- String reference;
- DataRecord record = ds.addRecord(new ByteArrayInputStream(data));
- reference = record.getReference();
- assertReference(data, reference, ds);
- ds.close();
- }
-
- /**
- * Method to validate mixed scenario use of {@link DataStore}.
- */
- protected void doTestSingleThread() throws Exception {
- CachingDataStore ds = createDataStore();
- doTestMultiThreaded(ds, 1);
- ds.close();
- }
-
- /**
- * Method to validate mixed scenario use of {@link DataStore} in
- * multi-threaded concurrent environment.
- */
- protected void doTestMultiThreaded() throws Exception {
- CachingDataStore ds = createDataStore();
- doTestMultiThreaded(ds, 4);
- ds.close();
- }
-
- /**
- * Method to assert record with byte array.
- */
- protected void assertRecord(byte[] expected, DataRecord record)
- throws DataStoreException, IOException {
- InputStream stream = record.getStream();
- try {
- for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i] & 0xff, stream.read());
- }
- assertEquals(-1, stream.read());
- } finally {
- stream.close();
- }
- }
-
- /**
- * Method to run {@link TestCaseBase#doTest(DataStore, int)} in multiple
- * concurrent threads.
- */
- protected void doTestMultiThreaded(final DataStore ds, int threadCount)
- throws Exception {
- final Exception[] exception = new Exception[1];
- Thread[] threads = new Thread[threadCount];
- for (int i = 0; i < threadCount; i++) {
- final int x = i;
- Thread t = new Thread() {
- @Override
- public void run() {
- try {
- doTest(ds, x);
- } catch (Exception e) {
- exception[0] = e;
- }
- }
- };
- threads[i] = t;
- t.start();
- }
- for (int i = 0; i < threadCount; i++) {
- threads[i].join();
- }
- if (exception[0] != null) {
- throw exception[0];
- }
- }
-
- /**
- * Assert randomly read stream from record.
- */
- void doTest(DataStore ds, int offset) throws Exception {
- LOG.info(Thread.currentThread().getName() + " started.");
- ArrayList<DataRecord> list = new ArrayList<DataRecord>();
- HashMap<DataRecord, Integer> map = new HashMap<DataRecord, Integer>();
- for (int i = 0; i < 100; i++) {
- int size = 100 + i * 10;
- RandomInputStream in = new RandomInputStream(size + offset, size);
- DataRecord rec = ds.addRecord(in);
- list.add(rec);
- map.put(rec, new Integer(size));
- }
- Random random = new Random(1);
- for (int i = 0; i < list.size(); i++) {
- int pos = random.nextInt(list.size());
- DataRecord rec = list.get(pos);
- int size = map.get(rec);
- rec = ds.getRecord(rec.getIdentifier());
- assertEquals(size, rec.getLength());
- InputStream in = rec.getStream();
- RandomInputStream expected = new RandomInputStream(size + offset,
- size);
- if (random.nextBoolean()) {
- in = readInputStreamRandomly(in, random);
- }
- assertEquals(expected, in);
- in.close();
- }
- LOG.info(Thread.currentThread().getName() + " finished.");
- }
-
- InputStream readInputStreamRandomly(InputStream in, Random random)
- throws IOException {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- byte[] buffer = new byte[8000];
- while (true) {
- if (random.nextBoolean()) {
- int x = in.read();
- if (x < 0) {
- break;
- }
- out.write(x);
- } else {
- if (random.nextBoolean()) {
- int l = in.read(buffer);
- if (l < 0) {
- break;
- }
- out.write(buffer, 0, l);
- } else {
- int offset = random.nextInt(buffer.length / 2);
- int len = random.nextInt(buffer.length / 2);
- int l = in.read(buffer, offset, len);
- if (l < 0) {
- break;
- }
- out.write(buffer, offset, l);
- }
- }
- }
- in.close();
- return new ByteArrayInputStream(out.toByteArray());
- }
-
- /**
- * Assert two inputstream
- */
- protected void assertEquals(InputStream a, InputStream b)
- throws IOException {
- while (true) {
- int ai = a.read();
- int bi = b.read();
- assertEquals(ai, bi);
- if (ai < 0) {
- break;
- }
- }
- }
-
- /**
- * Assert inputstream read from reference.
- */
- protected void assertReference(byte[] expected, String reference,
- DataStore store) throws Exception {
- DataRecord record = store.getRecordFromReference(reference);
- assertNotNull(record);
- assertEquals(expected.length, record.getLength());
-
- InputStream stream = record.getStream();
- try {
- for (int i = 0; i < expected.length; i++) {
- assertEquals(expected[i] & 0xff, stream.read());
- }
- assertEquals(-1, stream.read());
- } finally {
- stream.close();
- }
- }
-
-}
Index: jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDs.java
===================================================================
--- jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDs.java (revision 1576578)
+++ jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDs.java (working copy)
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.aws.ext.ds;
-
-import org.apache.jackrabbit.core.data.CachingDataStore;
-
-/**
- * Test {@link CachingDataStore} with InMemoryBackend and local cache on.
- */
-public class TestInMemDs extends TestCaseBase {
-
- public TestInMemDs() {
- config = null;
- memoryBackend = true;
- noCache = false;
- }
-
-}
Index: jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDsCacheOff.java
===================================================================
--- jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDsCacheOff.java (revision 1576578)
+++ jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDsCacheOff.java (working copy)
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.aws.ext.ds;
-
-import org.apache.jackrabbit.core.data.CachingDataStore;
-
-/**
- * Test {@link CachingDataStore} with InMemoryBackend and local cache off.
- */
-public class TestInMemDsCacheOff extends TestCaseBase {
-
- public TestInMemDsCacheOff() {
- config = null;
- memoryBackend = true;
- noCache = true;
- }
-}
Index: jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3Ds.java
===================================================================
--- jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3Ds.java (revision 1576578)
+++ jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3Ds.java (working copy)
@@ -16,12 +16,26 @@
*/
package org.apache.jackrabbit.aws.ext.ds;
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
import java.util.Properties;
+import javax.jcr.RepositoryException;
+
import org.apache.jackrabbit.aws.ext.Utils;
+import org.apache.jackrabbit.core.data.Backend;
import org.apache.jackrabbit.core.data.CachingDataStore;
+import org.apache.jackrabbit.core.data.TestCaseBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.transfer.TransferManager;
+
/**
* Test {@link CachingDataStore} with S3Backend and local cache on. It requires
* to pass aws config file via system property. For e.g.
@@ -30,18 +44,69 @@
*/
public class TestS3Ds extends TestCaseBase {
+ protected static final Logger LOG = LoggerFactory.getLogger(TestS3Ds.class);
public TestS3Ds() {
- config = System.getProperty(CONFIG);
- memoryBackend = false;
- noCache = false;
+ config = System.getProperty(CONFIG);
+ memoryBackend = false;
+ noCache = false;
+ }
+
+ Date startTime = null;
+ protected void setUp() throws Exception {
+ startTime = new Date();
+ super.setUp();
}
+ protected void tearDown() throws Exception {
+ deleteBucket();
+ super.tearDown();
+ }
+
+ protected CachingDataStore createDataStore() throws RepositoryException {
+ ds = new S3DataStoreTest(String.valueOf(randomGen.nextInt(9999)) + "-test");
+ ds.setConfig(config);
+ if (noCache) {
+ ds.setCacheSize(0);
+ }
+ ds.init(dataStoreDir);
+ return ds;
+ }
-
- @Override
- protected void tearDown() throws IOException {
- super.tearDown();
+ /**
+ * Cleaning of bucket after test run.
+ */
+ public void deleteBucket() throws Exception {
Properties props = Utils.readConfig(config);
- Utils.deleteBucket(props);
+ AmazonS3Client s3service = Utils.openService(props);
+ Backend backend = ds.getBackend();
+ String bucket = ((S3Backend)backend).getBucket();
+ LOG.info("delete bucket [" + bucket + "]");
+ TransferManager tmx = new TransferManager(s3service);
+ if (s3service.doesBucketExist(bucket)) {
+ for (int i = 0; i < 3; i++) {
+ tmx.abortMultipartUploads(bucket, startTime);
+ ObjectListing prevObjectListing = s3service.listObjects(bucket);
+ while (prevObjectListing != null) {
+ List deleteList = new ArrayList();
+ for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+ deleteList.add(new DeleteObjectsRequest.KeyVersion(s3ObjSumm.getKey()));
+ }
+ if (deleteList.size() > 0) {
+ DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(bucket);
+ delObjsReq.setKeys(deleteList);
+ s3service.deleteObjects(delObjsReq);
+ }
+ if (!prevObjectListing.isTruncated()) break;
+ prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+ }
+ }
+ s3service.deleteBucket(bucket);
+ LOG.info("bucket: " + bucket + " deleted");
+ tmx.shutdownNow();
+ s3service.shutdown();
+ }
}
}
Index: jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DsCacheOff.java
===================================================================
--- jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DsCacheOff.java (revision 1576578)
+++ jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DsCacheOff.java (working copy)
@@ -17,6 +17,8 @@
package org.apache.jackrabbit.aws.ext.ds;
import org.apache.jackrabbit.core.data.CachingDataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test {@link CachingDataStore} with S3Backend and local cache Off. It requires
@@ -26,6 +28,7 @@
*/
public class TestS3DsCacheOff extends TestS3Ds {
+ protected static final Logger LOG = LoggerFactory.getLogger(TestS3DsCacheOff.class);
public TestS3DsCacheOff() {
config = System.getProperty(CONFIG);
memoryBackend = false;
Index: jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java
===================================================================
--- jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java (revision 1576578)
+++ jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java (working copy)
@@ -21,11 +21,9 @@
import junit.framework.TestCase;
import junit.framework.TestSuite;
-import org.apache.jackrabbit.aws.ext.ds.TestCaseBase;
-import org.apache.jackrabbit.aws.ext.ds.TestInMemDs;
-import org.apache.jackrabbit.aws.ext.ds.TestInMemDsCacheOff;
import org.apache.jackrabbit.aws.ext.ds.TestS3Ds;
import org.apache.jackrabbit.aws.ext.ds.TestS3DsCacheOff;
+import org.apache.jackrabbit.core.data.TestCaseBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,9 +42,9 @@
*/
public static Test suite() {
TestSuite suite = new TestSuite("S3 tests");
- suite.addTestSuite(TestLocalCache.class);
- suite.addTestSuite(TestInMemDs.class);
- suite.addTestSuite(TestInMemDsCacheOff.class);
String config = System.getProperty(TestCaseBase.CONFIG);
LOG.info("config= " + config);
if (config != null && !"".equals(config.trim())) {
Index: jackrabbit-aws-ext/src/test/resources/aws.properties
===================================================================
--- jackrabbit-aws-ext/src/test/resources/aws.properties (revision 1576578)
+++ jackrabbit-aws-ext/src/test/resources/aws.properties (working copy)
@@ -32,7 +32,14 @@
# Asia Pacific (Tokyo) ap-northeast-1
# South America (Sao Paulo) sa-east-1
s3Region=
+# S3 endpoint to be used. It is an optional parameter
+# and takes precedence over the endpoint derived
+# from the S3 region.
+s3EndPoint=
connectionTimeout=120000
socketTimeout=120000
-maxConnections=10
+maxConnections=20
maxErrorRetry=10
+# maximum number of concurrent threads writing to S3.
+writeThreads=10
+
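
Reviewer note (illustrative, not part of the patch): the precedence rule documented above (an explicit s3EndPoint overrides the endpoint derived from s3Region) can be sketched as below. The property keys match aws.properties; the EndpointResolver class and the region-to-host mapping are hypothetical, the real lookup lives in the S3 backend.

    import java.util.Properties;

    // Minimal sketch: an explicit s3EndPoint, when set, wins over the
    // endpoint derived from s3Region.
    public class EndpointResolver {
        static String resolve(Properties props) {
            String endpoint = props.getProperty("s3EndPoint");
            if (endpoint != null && !endpoint.trim().isEmpty()) {
                return endpoint; // explicit endpoint takes precedence
            }
            // hypothetical mapping; actual endpoints depend on the region
            return "s3-" + props.getProperty("s3Region") + ".amazonaws.com";
        }
    }
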
Index: jackrabbit-aws-ext/src/test/resources/repository_sample.xml
===================================================================
--- jackrabbit-aws-ext/src/test/resources/repository_sample.xml (revision 1576578)
+++ jackrabbit-aws-ext/src/test/resources/repository_sample.xml (working copy)
@@ -40,7 +40,10 @@
-
+
+
+
+
+
+ junit
+ junit
+ test
+
+
+ org.slf4j
+ slf4j-log4j12
+ 1.7.5
+ test
+
+
+ org.apache.jackrabbit
+ jackrabbit-core
+ ${project.version}
+ test-jar
+
\ No newline at end of file
Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCache.java
===================================================================
--- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCache.java (revision 0)
+++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCache.java (working copy)
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.data;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class tracks all in-progress asynchronous uploads. It maintains two
+ * data structures: {@link #asyncUploadMap}, a {@link Map} of file path to
+ * the lastModified timestamp of the upload, and {@link #toBeDeleted}, a
+ * {@link Set} of uploads which have been marked for delete while still in
+ * progress. Before starting an asynchronous upload, the caller must invoke
+ * {@link #add(String)} to add an entry to {@link #asyncUploadMap}. After the
+ * asynchronous upload completes, the caller must invoke
+ * {@link #remove(String)} to remove the entry from {@link #asyncUploadMap}.
+ * Any modification to this class is immediately persisted to the local file
+ * system: {@link #asyncUploadMap} is persisted to
+ * homeDir/{@link #PENDING_UPLOAD_FILE} and {@link #toBeDeleted} to
+ * homeDir/{@link #TO_BE_DELETED_UPLOAD_FILE}, where homeDir refers to
+ * ${rep.home}.
+ */
+public class AsyncUploadCache {
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncUploadCache.class);
+
+ /**
+ * {@link Map} of fileName Vs lastModified to store asynchronous upload.
+ */
+ Map asyncUploadMap = new HashMap();
+
+ /**
+ * {@link Set} of fileName which are mark for delete during asynchronous
+ * Upload.
+ */
+ Set toBeDeleted = new HashSet();
+
+ String path;
+
+ String homeDir;
+
+ int asyncUploadLimit;
+
+ private File pendingUploads;
+
+ private File toBeDeletedUploads;
+
+ private static final String PENDIND_UPLOAD_FILE = "async-pending-uploads.ser";
+
+ private static final String TO_BE_DELETED_UPLOAD_FILE = "async-tobedeleted-uploads.ser";
+
+ /**
+ * This method checks whether the file can be added to
+ * {@link #asyncUploadMap}. If yes, it adds the file to
+ * {@link #asyncUploadMap} and serializes {@link #asyncUploadMap} to disk
+ * via {@link #serializeAsyncUploadMap()}.
+ *
+ * @return {@link AsyncUploadCacheResult} with
+ * {@link AsyncUploadCacheResult#setAsyncUpload(boolean)} set to true
+ * if the file was added to asynchronous uploads, false otherwise.
+ */
+ public synchronized AsyncUploadCacheResult add(String fileName)
+ throws IOException {
+ AsyncUploadCacheResult result = new AsyncUploadCacheResult();
+ if (asyncUploadMap.size() >= asyncUploadLimit) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Async write limit [" + asyncUploadLimit
+ + "] reached. File [" + fileName
+ + "] not added to async write cache.");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("current set =" + asyncUploadMap.keySet());
+ }
+ result.setAsyncUpload(false);
+ } else {
+ long startTime = System.currentTimeMillis();
+ if (toBeDeleted.remove(fileName)) {
+ serializeToBeDeleted();
+ }
+ asyncUploadMap.put(fileName, System.currentTimeMillis());
+ serializeAsyncUploadMap();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("added file [" + fileName
+ + "] to asyncUploadMap upoad took ["
+ + ((System.currentTimeMillis() - startTime) / 1000)
+ + "] sec");
+ LOG.debug("current set =" + asyncUploadMap.keySet());
+ }
+ result.setAsyncUpload(true);
+ }
+ return result;
+ }
+
+ /**
+ * This method removes the file (if found) from {@link #asyncUploadMap}. If
+ * the file is found, it immediately serializes {@link #asyncUploadMap} to
+ * disk. This method sets
+ * {@link AsyncUploadCacheResult#setRequiresDelete(boolean)} to true if the
+ * asynchronous upload is found in the {@link #toBeDeleted} set, i.e. marked
+ * for delete.
+ */
+ public synchronized AsyncUploadCacheResult remove(String fileName)
+ throws IOException {
+ long startTime = System.currentTimeMillis();
+ Long retVal = asyncUploadMap.remove(fileName);
+ if (retVal != null) {
+ serializeAsyncUploadMap();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("removed file [" + fileName
+ + "] from asyncUploadMap took ["
+ + ((System.currentTimeMillis() - startTime) / 1000)
+ + "] sec");
+ LOG.debug("current set =" + asyncUploadMap.keySet());
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("cannot remove file [" + fileName
+ + "] from pending upoad took ["
+ + ((System.currentTimeMillis() - startTime) / 1000)
+ + "] sec. File not found");
+ LOG.debug("current set =" + asyncUploadMap.keySet());
+ }
+ }
+ AsyncUploadCacheResult result = new AsyncUploadCacheResult();
+ result.setRequiresDelete(toBeDeleted.contains(fileName));
+ return result;
+ }
+
+ /**
+ * This method returns the in-progress asynchronous uploads which are not
+ * marked for delete.
+ */
+ public synchronized Set getAll() {
+ Set retVal = new HashSet();
+ retVal.addAll(asyncUploadMap.keySet());
+ retVal.removeAll(toBeDeleted);
+ return retVal;
+ }
+
+ /**
+ * This method checks whether an asynchronous upload is in progress for the
+ * given fileName. If touch is true, the lastModified is updated to the
+ * current time.
+ */
+ public synchronized boolean hasEntry(String fileName, boolean touch)
+ throws IOException {
+ boolean contains = asyncUploadMap.containsKey(fileName)
+ && !toBeDeleted.contains(fileName);
+ if (touch && contains) {
+ long timeStamp = System.currentTimeMillis();
+ asyncUploadMap.put(fileName, timeStamp);
+ serializeAsyncUploadMap();
+ }
+ return contains;
+ }
+
+ /**
+ * Returns lastModified from {@link #asyncUploadMap} if found else returns
+ * 0.
+ */
+ public synchronized long getLastModified(String fileName) {
+ return asyncUploadMap.get(fileName) != null
+ && !toBeDeleted.contains(fileName)
+ ? asyncUploadMap.get(fileName)
+ : 0;
+ }
+
+ /**
+ * This method marks the asynchronous upload for the given fileName for
+ * delete, if such an upload exists.
+ */
+ public synchronized void delete(String fileName) throws IOException {
+ boolean serialize = false;
+ if (toBeDeleted.remove(fileName)) {
+ serialize = true;
+ }
+ if (asyncUploadMap.containsKey(fileName) && toBeDeleted.add(fileName)) {
+ serialize = true;
+ }
+ if (serialize) {
+ serializeToBeDeleted();
+ }
+ }
+
+ /**
+ * Delete in-progress asynchronous uploads which are older than min. This
+ * method leverages the lastModified stored in {@link #asyncUploadMap}.
+ */
+ public synchronized Set deleteOlderThan(long min)
+ throws IOException {
+ min = min - 1000;
+ LOG.info("deleteOlderThan min =" + min);
+ Set deleteSet = new HashSet();
+ for (Map.Entry entry : asyncUploadMap.entrySet()) {
+ if (entry.getValue() < min) {
+ deleteSet.add(entry.getKey());
+ }
+ }
+ if (deleteSet.size() > 0) {
+ LOG.info("deleteOlderThan set =" + deleteSet);
+ toBeDeleted.addAll(deleteSet);
+ serializeToBeDeleted();
+ }
+ return deleteSet;
+ }
+
+ /**
+ * @param homeDir
+ * home directory of repository.
+ * @param path
+ * path of the {@link LocalCache}
+ * @param asyncUploadLimit
+ * the maximum number of asynchronous uploads
+ */
+ public synchronized void init(String homeDir, String path,
+ int asyncUploadLimit) throws IOException, ClassNotFoundException {
+ this.homeDir = homeDir;
+ this.path = path;
+ this.asyncUploadLimit = asyncUploadLimit;
+ LOG.info("AsynWriteCache:homeDir [" + homeDir + "], path [" + path
+ + "], asyncUploadLimit [" + asyncUploadLimit + "].");
+ pendingUploads = new File(homeDir + "/" + PENDIND_UPLOAD_FILE);
+ if (pendingUploads.exists()) {
+ deserializeAsyncUploadMap();
+ } else {
+ pendingUploads.createNewFile();
+ asyncUploadMap = new HashMap();
+ serializeAsyncUploadMap();
+ }
+ toBeDeletedUploads = new File(homeDir + "/" + TO_BE_DELETED_UPLOAD_FILE);
+ if (toBeDeletedUploads.exists()) {
+ deserializeToBeDeleted();
+ } else {
+ toBeDeletedUploads.createNewFile();
+ // reset the to-be-deleted set (not the upload map) before serializing
+ toBeDeleted = new HashSet();
+ serializeToBeDeleted();
+ }
+ }
+
+ /**
+ * Reset the {@link AsyncUploadCache} to empty {@link #asyncUploadMap} and
+ * {@link #toBeDeleted}
+ */
+ public synchronized void reset() throws IOException {
+ String filePath = pendingUploads.getAbsolutePath();
+ if (pendingUploads.exists()) {
+ if (!pendingUploads.delete()) {
+ throw new IOException("Failed to delete file [" + filePath
+ + "]");
+ }
+ }
+ pendingUploads.createNewFile();
+ asyncUploadMap = new HashMap();
+ serializeAsyncUploadMap();
+
+ filePath = toBeDeletedUploads.getAbsolutePath();
+ if (toBeDeletedUploads.exists()) {
+ if (!toBeDeletedUploads.delete()) {
+ throw new IOException("Failed to delete file [" + filePath
+ + "]");
+ }
+ }
+ toBeDeletedUploads.createNewFile();
+ toBeDeleted = new HashSet();
+ serializeToBeDeleted();
+ }
+
+ /**
+ * Serialize {@link #asyncUploadMap} to local file system.
+ */
+ private synchronized void serializeAsyncUploadMap() throws IOException {
+
+ // use buffering
+ OutputStream fos = new FileOutputStream(pendingUploads);
+ OutputStream buffer = new BufferedOutputStream(fos);
+ ObjectOutput output = new ObjectOutputStream(buffer);
+ try {
+ output.writeObject(asyncUploadMap);
+ } finally {
+ output.close();
+ }
+ }
+
+ /**
+ * Deserialize {@link #asyncUploadMap} from local file system.
+ */
+ private synchronized void deserializeAsyncUploadMap() throws IOException,
+ ClassNotFoundException {
+ // use buffering
+ InputStream fis = new FileInputStream(pendingUploads);
+ InputStream buffer = new BufferedInputStream(fis);
+ ObjectInput input = new ObjectInputStream(buffer);
+ try {
+ asyncUploadMap = (Map) input.readObject();
+ } finally {
+ input.close();
+ }
+ }
+
+ /**
+ * Serialize {@link #toBeDeleted} to local file system.
+ */
+ private synchronized void serializeToBeDeleted() throws IOException {
+
+ // use buffering
+ OutputStream fos = new FileOutputStream(toBeDeletedUploads);
+ OutputStream buffer = new BufferedOutputStream(fos);
+ ObjectOutput output = new ObjectOutputStream(buffer);
+ try {
+ output.writeObject(toBeDeleted);
+ } finally {
+ output.close();
+ }
+ }
+
+ /**
+ * Deserialize {@link #toBeDeleted} from local file system.
+ */
+ private synchronized void deserializeToBeDeleted() throws IOException,
+ ClassNotFoundException {
+ // use buffering
+ InputStream fis = new FileInputStream(toBeDeletedUploads);
+ InputStream buffer = new BufferedInputStream(fis);
+ ObjectInput input = new ObjectInputStream(buffer);
+ try {
+ toBeDeleted = (Set) input.readObject();
+ } finally {
+ input.close();
+ }
+ }
+}
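
Reviewer note (illustrative, not part of the patch): the intended lifecycle is add() before starting an upload, remove() once it completes, and delete() to mark an in-flight upload for removal. A minimal sketch, assuming a hypothetical scratch home directory and a stand-in content-hash path:

    import org.apache.jackrabbit.core.data.AsyncUploadCache;
    import org.apache.jackrabbit.core.data.AsyncUploadCacheResult;

    public class AsyncUploadCacheDemo {
        public static void main(String[] args) throws Exception {
            AsyncUploadCache cache = new AsyncUploadCache();
            // hypothetical locations; real values come from repository config
            cache.init("/tmp/repo-home", "/tmp/repo-home/repository/datastore", 100);

            AsyncUploadCacheResult res = cache.add("12/34/56/123456abcdef");
            if (res.canAsyncUpload()) {
                // start the asynchronous upload; on completion the callback
                // removes the entry:
                AsyncUploadCacheResult done = cache.remove("12/34/56/123456abcdef");
                if (done.doRequiresDelete()) {
                    // the record was marked for delete while the upload was
                    // in flight and must now be removed from the backend
                }
            }
        }
    }
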
Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCacheResult.java
===================================================================
--- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCacheResult.java (revision 0)
+++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCacheResult.java (working copy)
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.data;
+
+import java.io.File;
+
+/**
+ * This class holds result of asynchronous upload from {@link AsyncUploadCache}
+ */
+public class AsyncUploadCacheResult {
+
+ /**
+ * flag to indicate that asynchronous upload can be started on file.
+ */
+ private boolean asyncUpload;
+
+ /**
+ * flag to indicate that the cached file requires deletion. It applies
+ * when the file is marked for delete before the asynchronous upload
+ * completes.
+ */
+ private boolean requiresDelete;
+
+ private File file;
+
+ /**
+ * Flag to denote that asynchronous upload can be started on file.
+ */
+ public boolean canAsyncUpload() {
+ return asyncUpload;
+ }
+
+ public void setAsyncUpload(boolean asyncUpload) {
+ this.asyncUpload = asyncUpload;
+ }
+
+ /**
+ * Flag to indicate that the record is to be deleted from the {@link DataStore}.
+ */
+ public boolean doRequiresDelete() {
+ return requiresDelete;
+ }
+
+ public void setRequiresDelete(boolean requiresDelete) {
+ this.requiresDelete = requiresDelete;
+ }
+
+ public File getFile() {
+ return file;
+ }
+
+ public void setFile(File file) {
+ this.file = file;
+ }
+
+}
Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCallback.java
===================================================================
--- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCallback.java (revision 0)
+++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/AsyncUploadCallback.java (working copy)
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.core.data;
+
+import java.io.File;
+
+/**
+ * Callback interface whose method is invoked with the status of an
+ * asynchronous upload.
+ */
+public interface AsyncUploadCallback {
+
+ public void call(DataIdentifier identifier, File file, RESULT result);
+
+ public enum RESULT {
+ /**
+ * Asynchronous upload has succeeded.
+ */
+ SUCCESS,
+ /**
+ * Asynchronous upload has failed.
+ */
+ FAILED,
+ /**
+ * Asynchronous upload has been aborted.
+ */
+ ABORTED
+ };
+}
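
Reviewer note (illustrative, not part of the patch): a minimal sketch of an AsyncUploadCallback implementation that only logs each outcome; in this patch the real implementor is CachingDataStore.

    import java.io.File;

    import org.apache.jackrabbit.core.data.AsyncUploadCallback;
    import org.apache.jackrabbit.core.data.DataIdentifier;

    // Logs the result of every asynchronous upload it is notified about.
    public class LoggingUploadCallback implements AsyncUploadCallback {
        public void call(DataIdentifier identifier, File file, RESULT result) {
            switch (result) {
            case SUCCESS:
                System.out.println("uploaded " + identifier);
                break;
            case FAILED:
                System.err.println("upload failed for " + identifier);
                break;
            case ABORTED:
                System.out.println("upload aborted for " + identifier);
                break;
            }
        }
    }
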
Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/Backend.java
===================================================================
--- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/Backend.java (revision 1576578)
+++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/Backend.java (working copy)
@@ -20,10 +20,8 @@
import java.io.File;
import java.io.InputStream;
import java.util.Iterator;
-import java.util.List;
+import java.util.Set;
-
-
/**
* The interface defines the backend which can be plugged into
* {@link CachingDataStore}.
@@ -33,9 +31,12 @@
/**
* This method initialize backend with the configuration.
*
- * @param store {@link CachingDataStore}
- * @param homeDir path of repository home dir.
- * @param config path of config property file.
+ * @param store
+ * {@link CachingDataStore}
+ * @param homeDir
+ * path of repository home dir.
+ * @param config
+ * path of config property file.
* @throws DataStoreException
*/
void init(CachingDataStore store, String homeDir, String config)
@@ -44,27 +45,33 @@
/**
* Return inputstream of record identified by identifier.
*
- * @param identifier identifier of record.
+ * @param identifier
+ * identifier of record.
* @return inputstream of the record.
- * @throws DataStoreException if record not found or any error.
+ * @throws DataStoreException
+ * if record not found or any error.
*/
InputStream read(DataIdentifier identifier) throws DataStoreException;
/**
* Return length of record identified by identifier.
*
- * @param identifier identifier of record.
+ * @param identifier
+ * identifier of record.
* @return length of the record.
- * @throws DataStoreException if record not found or any error.
+ * @throws DataStoreException
+ * if record not found or any error.
*/
long getLength(DataIdentifier identifier) throws DataStoreException;
/**
* Return lastModified of record identified by identifier.
*
- * @param identifier identifier of record.
+ * @param identifier
+ * identifier of record.
* @return lastModified of the record.
- * @throws DataStoreException if record not found or any error.
+ * @throws DataStoreException
+ * if record not found or any error.
*/
long getLastModified(DataIdentifier identifier) throws DataStoreException;
@@ -72,30 +79,54 @@
* Stores file to backend with identifier used as key. If key pre-exists, it
* updates the timestamp of the key.
*
- * @param identifier key of the file
- * @param file file that would be stored in backend.
- * @throws DataStoreException for any error.
+ * @param identifier
+ * key of the file
+ * @param file
+ * file that would be stored in backend.
+ * @throws DataStoreException
+ * for any error.
*/
void write(DataIdentifier identifier, File file) throws DataStoreException;
/**
- * Returns identifiers of all records that exists in backend.
+ * Write file to backend in asynchronous mode. A backend implementation may
+ * choose not to write asynchronously, but it must still call
+ * {@link AsyncUploadCallback#call(DataIdentifier, File, AsyncUploadCallback.RESULT)}
+ * after the upload succeeds or fails.
+ *
+ * @param identifier
+ * @param file
+ * @param callback
+ * Callback interface to be called after the upload succeeds or fails.
+ * @throws DataStoreException
+ */
+ void writeAsync(DataIdentifier identifier, File file,
+ AsyncUploadCallback callback) throws DataStoreException;
+
+ /**
+ * Returns identifiers of all records that exists in backend.
+ *
* @return iterator consisting of all identifiers
* @throws DataStoreException
*/
Iterator getAllIdentifiers() throws DataStoreException;
/**
- * Update timestamp of record identified by identifier if minModifiedDate is
- * greater than record's lastModified else no op.
+ * This method checks the existence of a record in the backend and returns
+ * true if the record exists, false otherwise. It also touches the record
+ * identified by identifier if touch is true.
*
- * @throws DataStoreException if record not found.
+ * @param identifier
+ * @throws DataStoreException
*/
- void touch(DataIdentifier identifier, long minModifiedDate)
+ boolean exists(DataIdentifier identifier, boolean touch)
throws DataStoreException;
+
/**
- * This method check the existence of record in backend.
- * @param identifier identifier to be checked.
+ * This method checks the existence of a record in the backend.
+ *
+ * @param identifier
+ * identifier to be checked.
* @return true if records exists else false.
* @throws DataStoreException
*/
@@ -103,20 +134,24 @@
/**
* Close backend and release resources like database connection if any.
+ *
* @throws DataStoreException
*/
void close() throws DataStoreException;
/**
* Delete all records which are older than timestamp.
+ *
* @param timestamp
- * @return list of identifiers which are deleted.
+ * @return {@link Set} of identifiers which are deleted.
* @throws DataStoreException
*/
- List deleteAllOlderThan(long timestamp) throws DataStoreException;
+ Set deleteAllOlderThan(long timestamp)
+ throws DataStoreException;
/**
* Delete record identified by identifier. No-op if identifier not found.
+ *
* @param identifier
* @throws DataStoreException
*/
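
Reviewer note (illustrative, not part of the patch): a backend without native asynchronous support can still satisfy the writeAsync contract by delegating to the synchronous write and invoking the callback itself. A minimal sketch of such a method inside a hypothetical Backend implementation:

    // Synchronous fallback that still honors the writeAsync callback contract.
    public void writeAsync(DataIdentifier identifier, File file,
            AsyncUploadCallback callback) throws DataStoreException {
        try {
            write(identifier, file); // reuse the synchronous write
            callback.call(identifier, file, AsyncUploadCallback.RESULT.SUCCESS);
        } catch (DataStoreException e) {
            callback.call(identifier, file, AsyncUploadCallback.RESULT.FAILED);
            throw e;
        }
    }
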
Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataRecord.java
===================================================================
--- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataRecord.java (revision 1576578)
+++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataRecord.java (working copy)
@@ -19,6 +19,8 @@
import java.io.InputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* CachingDataRecord which stores reference to {@link CachingDataStore}. This
@@ -27,6 +29,8 @@
*/
public class CachingDataRecord extends AbstractDataRecord {
+ private static final Logger LOG = LoggerFactory.getLogger(CachingDataRecord.class);
+
private final CachingDataStore store;
public CachingDataRecord(CachingDataStore store, DataIdentifier identifier) {
@@ -39,6 +43,8 @@
try {
return store.getLastModified(getIdentifier());
} catch (DataStoreException dse) {
+ LOG.info("exception in getLastModified for identifier ["
+ + getIdentifier() + "]. returning 0.", dse);
return 0;
}
}
Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java
===================================================================
--- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (revision 1576578)
+++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (working copy)
@@ -29,15 +29,24 @@
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.WeakHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jcr.RepositoryException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.core.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,10 +70,13 @@
* <param name="{@link #setCachePurgeTrigFactor(double)}" value="0.95d"/>
* <param name="{@link #setCachePurgeResizeFactor(double) cacheSize}" value="0.85d"/>
* <param name="{@link #setMinRecordLength(int) minRecordLength}" value="1024"/>
+ * <param name="{@link #setContinueOnAsyncUploadFailure(boolean) continueOnAsyncUploadFailure}" value="false"/>
+ * <param name="{@link #setConcurrentUploadsThreads(int) concurrentUploadsThreads}" value="10"/>
+ * <param name="{@link #setAsyncUploadLimit(int) asyncUploadLimit}" value="100"/>
* </DataStore>
*/
public abstract class CachingDataStore extends AbstractDataStore implements
- MultiDataStoreAware {
+ MultiDataStoreAware, AsyncUploadCallback {
/**
* Logger instance.
@@ -88,9 +100,7 @@
* All data identifiers that are currently in use are in this set until they
* are garbage collected.
*/
- protected Map> inUse =
- Collections.synchronizedMap(new WeakHashMap>());
+ protected Map> inUse = Collections.synchronizedMap(new WeakHashMap>());
protected Backend backend;
@@ -141,11 +151,47 @@
*/
private LocalCache cache;
+ /**
+ * Caching holding pending uploads
+ */
+ private AsyncUploadCache asyncWriteCache;
+
protected abstract Backend createBackend();
protected abstract String getMarkerFile();
/**
+ * In {@link #init(String)}, all incomplete asynchronous uploads from
+ * {@link AsyncUploadCache} are resumed and uploaded concurrently in
+ * multiple threads. A {@link RepositoryException} is thrown if the file
+ * for such an upload is not found in the local cache, which can only
+ * happen if somebody has removed files from the local cache manually. To
+ * proceed despite such inconsistencies, set the parameter
+ * continueOnAsyncUploadFailure to true in repository.xml; the
+ * {@link RepositoryException} is then ignored, all missing files are
+ * logged, and initialization proceeds after resetting {@link AsyncUploadCache}.
+ */
+ private boolean continueOnAsyncUploadFailure;
+
+ /**
+ * The {@link #init(String)} method checks for {@link #getMarkerFile()} and,
+ * if it doesn't exist, migrates all files from the filesystem to the
+ * {@link Backend}. This parameter governs the number of threads which
+ * upload files concurrently to the {@link Backend}.
+ */
+ private int concurrentUploadsThreads = 10;
+
+ /**
+ * This parameter limits the number of asynchronous upload slots to the
+ * {@link Backend}. Once this limit is reached, further uploads to the
+ * {@link Backend} are synchronous, until one of the asynchronous uploads
+ * completes and makes a slot available. To disable asynchronous uploads,
+ * set the {@link #asyncUploadLimit} parameter to 0 in repository.xml. By
+ * default it is 100.
+ */
+ private int asyncUploadLimit = 100;
+
+ /**
* Initialized the data store. If the path is not set, <repository
* home>/repository/datastore is used. This directory is automatically
* created if it does not yet exist. During first initialization, it upload
@@ -154,51 +200,92 @@
*/
@Override
public void init(String homeDir) throws RepositoryException {
- if (path == null) {
- path = homeDir + "/repository/datastore";
- }
- directory = new File(path);
try {
+ if (path == null) {
+ path = homeDir + "/repository/datastore";
+ tmpDir = new File(homeDir, "/repository/s3tmp");
+ } else {
+ // cache is moved from 'path' to 'path'/repository/datastore
+ tmpDir = new File(path, "/repository/s3tmp");
+ path = path + "/repository/datastore";
+ }
+ LOG.info("path=[" + path + ",] tmpPath= [" + tmpDir.getPath() + "]");
+ directory = new File(path);
mkdirs(directory);
- } catch (IOException e) {
- throw new DataStoreException("Could not create directory "
- + directory.getAbsolutePath(), e);
- }
- tmpDir = new File(homeDir, "/repository/s3tmp");
- try {
if (!mkdirs(tmpDir)) {
FileUtils.cleanDirectory(tmpDir);
LOG.info("tmp = " + tmpDir.getPath() + " cleaned");
}
- } catch (IOException e) {
- throw new DataStoreException("Could not create directory "
- + tmpDir.getAbsolutePath(), e);
- }
- LOG.info("cachePurgeTrigFactor = " + cachePurgeTrigFactor
- + ", cachePurgeResizeFactor = " + cachePurgeResizeFactor);
- backend = createBackend();
- backend.init(this, path, config);
- String markerFileName = getMarkerFile();
- if (markerFileName != null) {
- // create marker file in homeDir to avoid deletion in cache cleanup.
- File markerFile = new File(homeDir, markerFileName);
- if (!markerFile.exists()) {
- LOG.info("load files from local cache");
- loadFilesFromCache();
- try {
- markerFile.createNewFile();
- } catch (IOException e) {
- throw new DataStoreException(
+
+ asyncWriteCache = new AsyncUploadCache();
+ asyncWriteCache.init(homeDir, path, asyncUploadLimit);
+
+ backend = createBackend();
+ backend.init(this, path, config);
+ String markerFileName = getMarkerFile();
+ if (markerFileName != null) {
+ // create marker file in homeDir to avoid deletion in cache
+ // cleanup.
+ File markerFile = new File(homeDir, markerFileName);
+ if (!markerFile.exists()) {
+ LOG.info("load files from local cache");
+ uploadFilesFromCache();
+ try {
+ markerFile.createNewFile();
+ } catch (IOException e) {
+ throw new DataStoreException(
"Could not create marker file "
- + markerFile.getAbsolutePath(), e);
+ + markerFile.getAbsolutePath(), e);
+ }
+ } else {
+ LOG.info("marker file = " + markerFile.getAbsolutePath()
+ + " exists");
}
- } else {
- LOG.info("marker file = " + markerFile.getAbsolutePath()
- + " exists");
}
+ // upload any leftover async uploads to backend during last shutdown
+ Set fileList = asyncWriteCache.getAll();
+ if (fileList != null && !fileList.isEmpty()) {
+ List errorFiles = new ArrayList();
+ LOG.info("Uploading [" + fileList + "] and size ["
+ + fileList.size() + "] from AsyncUploadCache.");
+ long totalSize = 0;
+ List files = new ArrayList(fileList.size());
+ for (String fileName : fileList) {
+ File f = new File(path, fileName);
+ if (!f.exists()) {
+ errorFiles.add(fileName);
+ LOG.error("Cannot upload pending file ["
+ + f.getAbsolutePath() + "]. File doesn't exist.");
+ } else {
+ totalSize += f.length();
+ // reuse the file that was just checked for existence
+ files.add(f);
+ }
+ }
+ new FilesUploader(files, totalSize, concurrentUploadsThreads,
+ true).upload();
+ if (!continueOnAsyncUploadFailure && errorFiles.size() > 0) {
+ LOG.error("Pending uploads of files [" + errorFiles
+ + "] failed. Files do not exist in Local cache.");
+ LOG.error("To continue set [continueOnAsyncUploadFailure] to true in Datastore configuration in repository.xml."
+ + " There would be inconsistent data in repository due the missing files. ");
+ throw new RepositoryException(
+ "Cannot upload async uploads from local cache. Files not found.");
+ } else {
+ if (errorFiles.size() > 0) {
+ LOG.error("Pending uploads of files ["
+ + errorFiles
+ + "] failed. Files do not exist"
+ + " in Local cache. Continuing as [continueOnAsyncUploadFailure] is set to true.");
+ }
+ LOG.info("Reseting AsyncWrite Cache list.");
+ asyncWriteCache.reset();
+ }
+ }
+ cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize,
+ cachePurgeTrigFactor, cachePurgeResizeFactor, asyncWriteCache);
+ } catch (Exception e) {
+ throw new RepositoryException(e);
}
- cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize,
- cachePurgeTrigFactor, cachePurgeResizeFactor);
}
/**
@@ -218,6 +305,8 @@
@Override
public DataRecord addRecord(InputStream input) throws DataStoreException {
File temporary = null;
+ long startTime = System.currentTimeMillis();
+ long length = 0;
try {
temporary = newTemporaryFile();
DataIdentifier tempId = new DataIdentifier(temporary.getName());
@@ -226,23 +315,47 @@
// stream length and the message digest of the stream
MessageDigest digest = MessageDigest.getInstance(DIGEST);
OutputStream output = new DigestOutputStream(new FileOutputStream(
- temporary), digest);
+ temporary), digest);
try {
- IOUtils.copyLarge(input, output);
+ length = IOUtils.copyLarge(input, output);
} finally {
output.close();
}
+ long currTime = System.currentTimeMillis();
DataIdentifier identifier = new DataIdentifier(
- encodeHexString(digest.digest()));
+ encodeHexString(digest.digest()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getting SHA1 hash [" + identifier + "] length ["
+ + length + "], in [" + (currTime - startTime) + "] ms.");
+ }
+ String fileName = getFileName(identifier);
+ AsyncUploadCacheResult result = null;
synchronized (this) {
usesIdentifier(identifier);
- backend.write(identifier, temporary);
- String fileName = getFileName(identifier);
- cache.store(fileName, temporary);
+ // check if async upload is already in progress
+ if (!asyncWriteCache.hasEntry(fileName, true)) {
+ result = cache.store(fileName, temporary, true);
+ }
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("storing [" + identifier + "] in localCache took ["
+ + (System.currentTimeMillis() - currTime) + "] ms.");
+ }
+ if (result != null) {
+ if (result.canAsyncUpload()) {
+ backend.writeAsync(identifier, result.getFile(), this);
+ } else {
+ backend.write(identifier, result.getFile());
+ }
+ }
// this will also make sure that
// tempId is not garbage collected until here
inUse.remove(tempId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("write [" + identifier + "] length [" + length
+ + "], in [" + (System.currentTimeMillis() - startTime)
+ + "] ms.");
+ }
return new CachingDataRecord(this, identifier);
} catch (NoSuchAlgorithmException e) {
throw new DataStoreException(DIGEST + " not available", e);
@@ -256,6 +369,35 @@
}
}
+ @Override
+ public DataRecord getRecord(DataIdentifier identifier)
+ throws DataStoreException {
+ String fileName = getFileName(identifier);
+ boolean touch = minModifiedDate > 0;
+ synchronized (this) {
+ try {
+ if (asyncWriteCache.hasEntry(fileName, touch)) {
+ usesIdentifier(identifier);
+ return new CachingDataRecord(this, identifier);
+ } else if (cache.getFileIfStored(fileName) != null) {
+ if (touch) {
+ backend.exists(identifier, touch);
+ }
+ usesIdentifier(identifier);
+ return new CachingDataRecord(this, identifier);
+ } else if (backend.exists(identifier, touch)) {
+ usesIdentifier(identifier);
+ return new CachingDataRecord(this, identifier);
+ }
+
+ } catch (IOException ioe) {
+ throw new DataStoreException("error in getting record ["
+ + identifier + "]", ioe);
+ }
+ }
+ throw new DataStoreException("Record not found: " + identifier);
+ }
+
/**
* Get a data record for the given identifier or null it data record doesn't
* exist in {@link Backend}
@@ -267,14 +409,20 @@
@Override
public DataRecord getRecordIfStored(DataIdentifier identifier)
throws DataStoreException {
+ String fileName = getFileName(identifier);
+ boolean touch = minModifiedDate > 0;
synchronized (this) {
- usesIdentifier(identifier);
- if (!backend.exists(identifier)) {
- return null;
+ try {
+ if (asyncWriteCache.hasEntry(fileName, touch)
+ || backend.exists(identifier, touch)) {
+ usesIdentifier(identifier);
+ return new CachingDataRecord(this, identifier);
+ }
+ } catch (IOException ioe) {
+ throw new DataStoreException(ioe);
}
- backend.touch(identifier, minModifiedDate);
- return new CachingDataRecord(this, identifier);
}
+ return null;
}
@Override
@@ -289,7 +437,15 @@
@Override
public Iterator getAllIdentifiers()
throws DataStoreException {
- return backend.getAllIdentifiers();
+ Set ids = new HashSet();
+ for (String fileName : asyncWriteCache.getAll()) {
+ ids.add(getIdentifier(fileName));
+ }
+ Iterator itr = backend.getAllIdentifiers();
+ while (itr.hasNext()) {
+ ids.add(itr.next());
+ }
+ return ids.iterator();
}
/**
@@ -301,20 +457,35 @@
throws DataStoreException {
String fileName = getFileName(identifier);
synchronized (this) {
- backend.deleteRecord(identifier);
- cache.delete(fileName);
+ try {
+ // order is important here
+ asyncWriteCache.delete(fileName);
+ backend.deleteRecord(identifier);
+ cache.delete(fileName);
+ } catch (IOException ioe) {
+ throw new DataStoreException(ioe);
+ }
}
}
@Override
public synchronized int deleteAllOlderThan(long min)
throws DataStoreException {
- List diList = backend.deleteAllOlderThan(min);
+ Set diSet = backend.deleteAllOlderThan(min);
// remove entries from local cache
- for (DataIdentifier identifier : diList) {
+ for (DataIdentifier identifier : diSet) {
cache.delete(getFileName(identifier));
}
- return diList.size();
+ try {
+ for (String fileName : asyncWriteCache.deleteOlderThan(min)) {
+ diSet.add(getIdentifier(fileName));
+ }
+ } catch (IOException e) {
+ throw new DataStoreException(e);
+ }
+ LOG.info("deleteAllOlderThan exit. Deleted [" + diSet
+ + "] records. Number of records deleted [" + diSet.size() + "]");
+ return diSet.size();
}
/**
@@ -344,9 +515,23 @@
* Return lastModified of record from {@link Backend} assuming
* {@link Backend} as a single source of truth.
*/
- public long getLastModified(DataIdentifier identifier) throws DataStoreException {
- LOG.info("accessed lastModified");
- return backend.getLastModified(identifier);
+ public long getLastModified(DataIdentifier identifier)
+ throws DataStoreException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("accessed lastModified of identifier:" + identifier);
+ }
+ String fileName = getFileName(identifier);
+ long lastModified = asyncWriteCache.getLastModified(fileName);
+ if (lastModified != 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getlastModified of identifier [" + identifier
+ + "] from AsyncUploadCache = " + lastModified);
+ }
+ return lastModified;
+
+ } else {
+ return backend.getLastModified(identifier);
+ }
}
/**
@@ -358,6 +543,22 @@
Long length = cache.getFileLength(fileName);
if (length != null) {
return length.longValue();
+ } else {
+ InputStream in = null;
+ InputStream cachedStream = null;
+ try {
+ in = backend.read(identifier);
+ cachedStream = cache.store(fileName, in);
+ } catch (IOException e) {
+ throw new DataStoreException("IO Exception: " + identifier, e);
+ } finally {
+ IOUtils.closeQuietly(in);
+ IOUtils.closeQuietly(cachedStream);
+ }
+ length = cache.getFileLength(fileName);
+ if (length != null) {
+ return length.longValue();
+ }
}
return backend.getLength(identifier);
}
@@ -371,6 +572,52 @@
}
}
+ public Set getPendingUploads() {
+ return asyncWriteCache.getAll();
+ }
+
+ public void call(DataIdentifier identifier, File file,
+ AsyncUploadCallback.RESULT resultCode) {
+ String fileName = getFileName(identifier);
+ if (AsyncUploadCallback.RESULT.SUCCESS.equals(resultCode)) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Upload completed. [" + identifier + "].");
+ }
+ AsyncUploadCacheResult result = asyncWriteCache.remove(fileName);
+ if (result.doRequiresDelete()) {
+ // added record already marked for delete
+ deleteRecord(identifier);
+ }
+ } catch (IOException ie) {
+ LOG.warn("Cannot remove pending file upload. Dataidentifer [ "
+ + identifier + "], file [" + file.getAbsolutePath() + "]",
+ ie);
+ } catch (DataStoreException dse) {
+ LOG.warn("Cannot remove pending file upload. Dataidentifer [ "
+ + identifier + "], file [" + file.getAbsolutePath() + "]",
+ dse);
+ }
+ } else if (AsyncUploadCallback.RESULT.FAILED.equals(resultCode)) {
+ LOG.error("Async Upload failed. Dataidentifer [ " + identifier
+ + "], file [" + file.getAbsolutePath() + "]");
+ } else if (AsyncUploadCallback.RESULT.ABORTED.equals(resultCode)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Async Upload Aborted. Dataidentifer [ " + identifier
+ + "], file [" + file.getAbsolutePath() + "]");
+ }
+ try {
+ asyncWriteCache.remove(fileName);
+ LOG.info("Async Upload Aborted. Dataidentifer [ " + identifier
+ + "], file [" + file.getAbsolutePath() + "] removed.");
+ } catch (IOException ie) {
+ LOG.warn("Cannot remove pending file upload. Dataidentifer [ "
+ + identifier + "], file [" + file.getAbsolutePath() + "]",
+ ie);
+ }
+ }
+ }
+
/**
* Returns a unique temporary file to be used for creating a new data
* record.
@@ -382,36 +629,57 @@
/**
* Load files from {@link LocalCache} to {@link Backend}.
*/
- private void loadFilesFromCache() throws RepositoryException {
+ private void uploadFilesFromCache() throws RepositoryException {
ArrayList files = new ArrayList();
listRecursive(files, directory);
long totalSize = 0;
for (File f : files) {
totalSize += f.length();
}
+ if (concurrentUploadsThreads > 1) {
+ new FilesUploader(files, totalSize, concurrentUploadsThreads, false).upload();
+ } else {
+ uploadFilesInSingleThread(files, totalSize);
+ }
+ }
+
+ private void uploadFilesInSingleThread(List files, long totalSize)
+ throws RepositoryException {
+ long startTime = System.currentTimeMillis();
+ LOG.info("Upload: {" + files.size() + "} files in single thread.");
+ long currentCount = 0;
long currentSize = 0;
long time = System.currentTimeMillis();
for (File f : files) {
long now = System.currentTimeMillis();
if (now > time + 5000) {
- LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}");
+ LOG.info("Uploaded: {" + currentCount + "}/{" + files.size()
+ + "} files, {" + currentSize + "}/{" + totalSize
+ + "} size data");
time = now;
}
- currentSize += f.length();
String name = f.getName();
- LOG.debug("upload file = " + name);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("upload file = " + name);
+ }
if (!name.startsWith(TMP) && !name.endsWith(DS_STORE)
- && f.length() > 0) {
- loadFileToBackEnd(f);
+ && f.length() > 0) {
+ uploadFileToBackEnd(f, false);
}
+ currentSize += f.length();
+ currentCount++;
}
- LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}");
+ long endTime = System.currentTimeMillis();
+ LOG.info("Uploaded: {" + currentCount + "}/{" + files.size()
+ + "} files, {" + currentSize + "}/{" + totalSize
+ + "} size data, time taken {" + ((endTime - startTime) / 1000)
+ + "} sec");
}
/**
* Traverse recursively and populate list with files.
*/
- private void listRecursive(List list, File file) {
+ private static void listRecursive(List list, File file) {
File[] files = file.listFiles();
if (files != null) {
for (File f : files) {
@@ -431,12 +699,22 @@
* file to uploaded.
* @throws DataStoreException
*/
- private void loadFileToBackEnd(File f) throws DataStoreException {
- DataIdentifier identifier = new DataIdentifier(f.getName());
- usesIdentifier(identifier);
- backend.write(identifier, f);
- LOG.debug(f.getName() + "uploaded.");
-
+ private void uploadFileToBackEnd(File f, boolean updateAsyncUploadCache)
+ throws DataStoreException {
+ try {
+ DataIdentifier identifier = new DataIdentifier(f.getName());
+ usesIdentifier(identifier);
+ backend.write(identifier, f);
+ if (updateAsyncUploadCache) {
+ String fileName = getFileName(identifier);
+ asyncWriteCache.remove(fileName);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(f.getName() + "uploaded.");
+ }
+ } catch (IOException ioe) {
+ throw new DataStoreException(ioe);
+ }
}
/**
@@ -444,11 +722,19 @@
*/
private static String getFileName(DataIdentifier identifier) {
String name = identifier.toString();
- name = name.substring(0, 2) + "/" + name.substring(2, 4) + "/"
- + name.substring(4, 6) + "/" + name;
- return name;
+ return getFileName(name);
}
+ private static String getFileName(String name) {
+ return name.substring(0, 2) + "/" + name.substring(2, 4) + "/"
+ + name.substring(4, 6) + "/" + name;
+ }
+
+ private static DataIdentifier getIdentifier(String fileName) {
+ return new DataIdentifier(
+ fileName.substring(fileName.lastIndexOf("/") + 1));
+ }
+
private void usesIdentifier(DataIdentifier identifier) {
inUse.put(identifier, new WeakReference(identifier));
}
@@ -457,15 +743,15 @@
if (dir.exists()) {
if (dir.isFile()) {
throw new IOException("Can not create a directory "
- + "because a file exists with the same name: "
- + dir.getAbsolutePath());
+ + "because a file exists with the same name: "
+ + dir.getAbsolutePath());
}
return false;
}
boolean created = dir.mkdirs();
if (!created) {
throw new IOException("Could not create directory: "
- + dir.getAbsolutePath());
+ + dir.getAbsolutePath());
}
return created;
}
@@ -483,7 +769,6 @@
public void close() throws DataStoreException {
cache.close();
backend.close();
- cache = null;
}
/**
@@ -551,7 +836,6 @@
}
/**
- *
* @return path of {@link LocalCache}.
*/
public String getPath() {
@@ -602,4 +886,208 @@
this.cachePurgeResizeFactor = cachePurgeResizeFactor;
}
+ public int getConcurrentUploadsThreads() {
+ return concurrentUploadsThreads;
+ }
+
+ public void setConcurrentUploadsThreads(int concurrentUploadsThreads) {
+ this.concurrentUploadsThreads = concurrentUploadsThreads;
+ }
+
+ public int getAsyncUploadLimit() {
+ return asyncUploadLimit;
+ }
+
+ public void setAsyncUploadLimit(int asyncUploadLimit) {
+ this.asyncUploadLimit = asyncUploadLimit;
+ }
+
+ public boolean isContinueOnAsyncUploadFailure() {
+ return continueOnAsyncUploadFailure;
+ }
+
+ public void setContinueOnAsyncUploadFailure(
+ boolean continueOnAsyncUploadFailure) {
+ this.continueOnAsyncUploadFailure = continueOnAsyncUploadFailure;
+ }
+
+ public Backend getBackend() {
+ return backend;
+ }
+
+ /**
+ * This class initiates files upload in multiple threads to backend.
+ */
+ private class FilesUploader {
+ final List files;
+
+ final long totalSize;
+
+ volatile AtomicInteger currentCount = new AtomicInteger();
+
+ volatile AtomicLong currentSize = new AtomicLong();
+
+ volatile AtomicBoolean exceptionRaised = new AtomicBoolean();
+
+ DataStoreException exception;
+
+ final int threads;
+
+ final boolean updateAsyncCache;
+
+ FilesUploader(List files, long totalSize, int threads,
+ boolean updateAsyncCache) {
+ super();
+ this.files = files;
+ this.threads = threads;
+ this.totalSize = totalSize;
+ this.updateAsyncCache = updateAsyncCache;
+ }
+
+ void addCurrentCount(int delta) {
+ currentCount.addAndGet(delta);
+ }
+
+ void addCurrentSize(long delta) {
+ currentSize.addAndGet(delta);
+ }
+
+ synchronized void setException(DataStoreException exception) {
+ exceptionRaised.getAndSet(true);
+ this.exception = exception;
+ }
+
+ boolean isExceptionRaised() {
+ return exceptionRaised.get();
+ }
+
+ void logProgress() {
+ LOG.info("Uploaded: {" + currentCount.get() + "}/{" + files.size()
+ + "} files, {" + currentSize.get() + "}/{" + totalSize
+ + "} size data");
+ }
+
+ void upload() throws DataStoreException {
+ long startTime = System.currentTimeMillis();
+ LOG.info(" Uploading " + files.size() + " using " + threads
+ + " threads.");
+ ExecutorService executor = Executors.newFixedThreadPool(threads,
+ new NamedThreadFactory("backend-file-upload-worker"));
+ int partitionSize = files.size() / (threads);
+ int startIndex = 0;
+ int endIndex = partitionSize;
+ for (int i = 1; i <= threads; i++) {
+ List partitionFileList = Collections.unmodifiableList(files.subList(
+ startIndex, endIndex));
+ FileUploaderThread fut = new FileUploaderThread(
+ partitionFileList, startIndex, endIndex, this,
+ updateAsyncCache);
+ executor.execute(fut);
+
+ startIndex = endIndex;
+ if (i == (threads - 1)) {
+ endIndex = files.size();
+ } else {
+ endIndex = startIndex + partitionSize;
+ }
+ }
+ // This will make the executor accept no new tasks
+ // and finish all existing tasks in the queue
+ executor.shutdown();
+
+ try {
+ // Wait until all threads have finished
+ while (!isExceptionRaised()
+ && !executor.awaitTermination(15, TimeUnit.SECONDS)) {
+ logProgress();
+ }
+ } catch (InterruptedException ie) {
+
+ }
+ long endTime = System.currentTimeMillis();
+ LOG.info("Uploaded: {" + currentCount.get() + "}/{" + files.size()
+ + "} files, {" + currentSize.get() + "}/{" + totalSize
+ + "} size data, time taken {" + ((endTime - startTime) / 1000)
+ + "} sec");
+ if (isExceptionRaised()) {
+ executor.shutdownNow(); // interrupt running tasks, discard queued ones
+ throw exception;
+ }
+ }
+
+ }
+
+ /**
+ * This class implements the {@link Runnable} interface and uploads a
+ * sublist of files, from startIndex to endIndex (exclusive), to the {@link Backend}.
+ */
+ private class FileUploaderThread implements Runnable {
+ final List<File> files;
+
+ final FilesUploader filesUploader;
+
+ final int startIndex;
+
+ final int endIndex;
+
+ final boolean updateAsyncCache;
+
+ FileUploaderThread(List<File> files, int startIndex, int endIndex,
+ FilesUploader controller, boolean updateAsyncCache) {
+ this.files = files;
+ this.filesUploader = controller;
+ this.startIndex = startIndex;
+ this.endIndex = endIndex;
+ this.updateAsyncCache = updateAsyncCache;
+ }
+
+ public void run() {
+ long time = System.currentTimeMillis();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Thread [ " + Thread.currentThread().getName()
+ + "]: Uploading files from startIndex[" + startIndex
+ + "] and endIndex [" + (endIndex - 1)
+ + "], both inclusive.");
+ }
+ int uploadCount = 0;
+ long uploadSize = 0;
+ try {
+ for (File f : files) {
+
+ if (filesUploader.isExceptionRaised()) {
+ break;
+ }
+ String name = f.getName();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("upload file = " + name);
+ }
+ if (!name.startsWith(TMP) && !name.endsWith(DS_STORE)
+ && f.length() > 0) {
+ uploadFileToBackEnd(f, updateAsyncCache);
+ }
+ uploadCount++;
+ uploadSize += f.length();
+ // update upload progress every 15 seconds.
+ long now = System.currentTimeMillis();
+ if (now > time + 15000) {
+ filesUploader.addCurrentCount(uploadCount);
+ filesUploader.addCurrentSize(uploadSize);
+ uploadCount = 0;
+ uploadSize = 0;
+ time = now;
+ }
+ }
+ // update final state.
+ filesUploader.addCurrentCount(uploadCount);
+ filesUploader.addCurrentSize(uploadSize);
+ } catch (DataStoreException e) {
+ if (!filesUploader.isExceptionRaised()) {
+ filesUploader.setException(e);
+ }
+ }
+
+ }
+ }
+
}
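For context, the partition arithmetic in FilesUploader.upload() above hands each worker a contiguous sublist and lets the sublist taken in the final loop iteration absorb the remainder left by integer division. The following is a minimal, self-contained sketch of that arithmetic; the class name and sample data are illustrative and not part of this patch.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Hypothetical demo of the sublist partitioning in FilesUploader.upload().
    public class PartitionDemo {
        public static void main(String[] args) {
            List<String> files = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
            int threads = 3;
            int partitionSize = files.size() / threads; // 7 / 3 = 2
            int startIndex = 0;
            int endIndex = partitionSize;
            List<List<String>> partitions = new ArrayList<List<String>>();
            for (int i = 1; i <= threads; i++) {
                partitions.add(files.subList(startIndex, endIndex));
                startIndex = endIndex;
                // the sublist taken in the final iteration absorbs the remainder
                endIndex = (i == threads - 1) ? files.size()
                        : startIndex + partitionSize;
            }
            System.out.println(partitions); // [[a, b], [c, d], [e, f, g]]
        }
    }

Contiguous ranges keep each worker's file list stable and make the per-thread index bounds in FileUploaderThread's debug logging meaningful.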
Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java
===================================================================
--- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java (revision 1576578)
+++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java (working copy)
@@ -84,6 +84,8 @@
* no-op.
*/
private volatile boolean purgeMode;
+
+ private AsyncUploadCache asyncUploadCache;
/**
* Build LRU cache of files located at 'path'. It uses lastModified property
@@ -98,66 +100,22 @@
* cache will go in auto-purge mode.
* @param cachePurgeResizeFactor after cache purge size of cache will be
* just less (cachePurgeResizeFactor * maxSize).
+ * @param asyncUploadCache {@link AsyncUploadCache}
* @throws RepositoryException
*/
- public LocalCache(final String path, final String tmpPath,
- final long maxSize, final double cachePurgeTrigFactor,
- final double cachePurgeResizeFactor) throws RepositoryException {
- this.maxSize = maxSize;
+ public LocalCache(String path, String tmpPath, long size, double cachePurgeTrigFactor,
+ double cachePurgeResizeFactor, AsyncUploadCache asyncUploadCache) throws IOException,
+ ClassNotFoundException {
+ this.maxSize = size;
directory = new File(path);
tmp = new File(tmpPath);
- cache = new LRUCache(maxSize, cachePurgeTrigFactor,
- cachePurgeResizeFactor);
- ArrayList<File> allFiles = new ArrayList<File>();
+ LOG.info("cachePurgeTrigFactor = " + cachePurgeTrigFactor + ", cachePurgeResizeFactor = " + cachePurgeResizeFactor
+ + ", cachePurgeTrigFactorSize = " + (cachePurgeTrigFactor * size) + ", cachePurgeResizeFactor = "
+ + (cachePurgeResizeFactor * size));
+ cache = new LRUCache(size, cachePurgeTrigFactor, cachePurgeResizeFactor);
+ this.asyncUploadCache = asyncUploadCache;
- Iterator<File> it = FileUtils.iterateFiles(directory, null, true);
- while (it.hasNext()) {
- File f = it.next();
- allFiles.add(f);
- }
- Collections.sort(allFiles, new Comparator<File>() {
- @Override
- public int compare(final File o1, final File o2) {
- long l1 = o1.lastModified(), l2 = o2.lastModified();
- return l1 < l2 ? -1 : l1 > l2 ? 1 : 0;
- }
- });
- String dataStorePath = directory.getAbsolutePath();
- long time = System.currentTimeMillis();
- int count = 0;
- int deletecount = 0;
- for (File f : allFiles) {
- if (f.exists()) {
- long length = f.length();
- String name = f.getPath();
- if (name.startsWith(dataStorePath)) {
- name = name.substring(dataStorePath.length());
- }
- // convert to java path format
- name = name.replace("\\", "/");
- if (name.startsWith("/") || name.startsWith("\\")) {
- name = name.substring(1);
- }
- if ((cache.currentSizeInBytes + length) < cache.maxSizeInBytes) {
- count++;
- cache.put(name, length);
- } else {
- if (tryDelete(name)) {
- deletecount++;
- }
- }
- long now = System.currentTimeMillis();
- if (now > time + 5000) {
- LOG.info("Processed {" + (count + deletecount) + "}/{"
- + allFiles.size() + "}");
- time = now;
- }
- }
- }
- LOG.info("Cached {" + count + "}/{" + allFiles.size()
- + "} , currentSizeInBytes = " + cache.currentSizeInBytes);
- LOG.info("Deleted {" + deletecount + "}/{" + allFiles.size()
- + "} files .");
+ // build the in-memory cache index asynchronously so construction
+ // does not block on a full directory scan
+ new Thread(new CacheBuildJob()).start();
}
/**
@@ -168,51 +126,52 @@
* doesn't close the incoming inputstream.
*
* @param fileName the key of cache.
- * @param in the inputstream.
+ * @param in {@link InputStream}
* @return the (new) input stream.
*/
- public synchronized InputStream store(String fileName, final InputStream in)
+ public InputStream store(String fileName, final InputStream in)
throws IOException {
fileName = fileName.replace("\\", "/");
File f = getFile(fileName);
long length = 0;
- if (!f.exists() || isInPurgeMode()) {
- OutputStream out = null;
- File transFile = null;
- try {
- TransientFileFactory tff = TransientFileFactory.getInstance();
- transFile = tff.createTransientFile("s3-", "tmp", tmp);
- out = new BufferedOutputStream(new FileOutputStream(transFile));
- length = IOUtils.copyLarge(in, out);
- } finally {
- IOUtils.closeQuietly(out);
- }
- // rename the file to local fs cache
- if (canAdmitFile(length)
- && (f.getParentFile().exists() || f.getParentFile().mkdirs())
- && transFile.renameTo(f) && f.exists()) {
- if (transFile.exists() && transFile.delete()) {
- LOG.warn("tmp file = " + transFile.getAbsolutePath()
- + " not deleted successfully");
+ synchronized (this) {
+ if (!f.exists() || isInPurgeMode()) {
+ OutputStream out = null;
+ File transFile = null;
+ try {
+ TransientFileFactory tff = TransientFileFactory.getInstance();
+ transFile = tff.createTransientFile("s3-", "tmp", tmp);
+ out = new BufferedOutputStream(new FileOutputStream(
+ transFile));
+ length = IOUtils.copyLarge(in, out);
+ } finally {
+ IOUtils.closeQuietly(out);
}
- transFile = null;
- toBeDeleted.remove(fileName);
- if (cache.get(fileName) == null) {
+ // rename the file to local fs cache
+ if (canAdmitFile(length)
+ && (f.getParentFile().exists() || f.getParentFile().mkdirs())
+ && transFile.renameTo(f) && f.exists()) {
+ if (transFile.exists() && !transFile.delete()) {
+ LOG.info("tmp file = " + transFile.getAbsolutePath()
+ + " could not be deleted");
+ }
+ transFile = null;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("file [" + fileName
+ + "] added to local cache.");
+ }
cache.put(fileName, f.length());
+ } else {
+ f = transFile;
}
} else {
- f = transFile;
- }
- } else {
- // f.exists and not in purge mode
- f.setLastModified(System.currentTimeMillis());
- toBeDeleted.remove(fileName);
- if (cache.get(fileName) == null) {
+ // f.exists and not in purge mode
+ f.setLastModified(System.currentTimeMillis());
cache.put(fileName, f.length());
}
+ cache.tryPurge();
+ return new LazyFileInputStream(f);
}
- cache.tryPurge();
- return new LazyFileInputStream(f);
}
/**
@@ -224,29 +183,59 @@
* @param src file to be added to cache.
* @throws IOException
*/
- public synchronized void store(String fileName, final File src)
- throws IOException {
+ public synchronized File store(String fileName, final File src) {
+ try {
+ return store(fileName, src, false).getFile();
+ } catch (IOException ioe) {
+ LOG.warn("Exception in addding file [" + fileName + "] to local cache.", ioe);
+ }
+ return null;
+ }
+
+ /**
+ * This method adds the file to the {@link LocalCache} and, optionally,
+ * registers it with the {@link AsyncUploadCache}. If the file is
+ * registered with the {@link AsyncUploadCache} successfully, the
+ * async-upload flag of the returned result is set to true.
+ *
+ * @param fileName name of the file.
+ * @param src source file.
+ * @param tryForAsyncUpload if true, the method tries to add fileName to
+ * the {@link AsyncUploadCache}
+ * @return {@link AsyncUploadCacheResult} whose
+ * {@link AsyncUploadCacheResult#canAsyncUpload()} returns true if
+ * fileName was added to the {@link AsyncUploadCache} successfully
+ * and false otherwise. {@link AsyncUploadCacheResult#getFile()}
+ * returns the cached file if it was admitted to the
+ * {@link LocalCache}, else the original file.
+ * @throws IOException
+ */
+ public synchronized AsyncUploadCacheResult store(String fileName, File src,
+ boolean tryForAsyncUpload) throws IOException {
fileName = fileName.replace("\\", "/");
File dest = getFile(fileName);
File parent = dest.getParentFile();
- if (src.exists() && !dest.exists() && !src.equals(dest)
- && canAdmitFile(src.length())
- && (parent.exists() || parent.mkdirs()) && (src.renameTo(dest))) {
- toBeDeleted.remove(fileName);
- if (cache.get(fileName) == null) {
- cache.put(fileName, dest.length());
+ AsyncUploadCacheResult result = new AsyncUploadCacheResult();
+ result.setFile(src);
+ result.setAsyncUpload(false);
+ boolean destExists = false;
+ if ((destExists = dest.exists())
+ || (src.exists() && !dest.exists() && !src.equals(dest) && canAdmitFile(src.length())
+ && (parent.exists() || parent.mkdirs()) && (src.renameTo(dest)))) {
+ if (destExists) {
+ dest.setLastModified(System.currentTimeMillis());
}
-
- } else if (dest.exists()) {
- dest.setLastModified(System.currentTimeMillis());
- toBeDeleted.remove(fileName);
- if (cache.get(fileName) == null) {
- cache.put(fileName, dest.length());
+ cache.put(fileName, dest.length());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("file [" + fileName + "] added to local cache.");
}
+ result.setFile(dest);
+ if (tryForAsyncUpload) {
+ result.setAsyncUpload(asyncUploadCache.add(fileName).canAsyncUpload());
+ }
}
cache.tryPurge();
+ return result;
}
-
/**
* Return the inputstream from from cache, or null if not in the cache.
*
@@ -254,16 +243,23 @@
* @return stream or null.
*/
public InputStream getIfStored(String fileName) throws IOException {
+ File file = getFileIfStored(fileName);
+ return file == null ? null : new LazyFileInputStream(file);
+ }
+ public synchronized File getFileIfStored(String fileName) throws IOException {
fileName = fileName.replace("\\", "/");
File f = getFile(fileName);
- synchronized (this) {
- if (!f.exists() || isInPurgeMode()) {
- log("purgeMode true or file doesn't exists: getIfStored returned");
- return null;
- }
+ // return the file even in purge mode if it is present in
+ // asyncUploadCache, as files in asyncUploadCache are not deleted
+ // during a cache purge.
+ if (!f.exists() || (isInPurgeMode() && !asyncUploadCache.hasEntry(fileName, false))) {
+ log("purgeMode true or file doesn't exist: getFileIfStored returned null");
+ return null;
+ } else {
+ // touch the LRU entry and the file's last-modified time
+ cache.put(fileName, f.length());
f.setLastModified(System.currentTimeMillis());
- return new LazyFileInputStream(f);
+ return f;
}
}
@@ -286,17 +282,20 @@
* Returns length of file if exists in cache else returns null.
* @param fileName name of the file.
*/
- public Long getFileLength(String fileName) {
- fileName = fileName.replace("\\", "/");
- File f = getFile(fileName);
- synchronized (this) {
- if (!f.exists() || isInPurgeMode()) {
- log("purgeMode true or file doesn't exists: getFileLength returned");
- return null;
+ public synchronized Long getFileLength(String fileName) {
+ Long length = null;
+ try {
+ length = cache.get(fileName);
+ if (length == null) {
+ File f = getFileIfStored(fileName);
+ if (f != null) {
+ length = f.length();
+ }
}
- f.setLastModified(System.currentTimeMillis());
- return f.length();
+ } catch (IOException ignore) {
+ // length lookup is best-effort; treat an I/O error as a cache miss
}
+ return length;
}
/**
@@ -315,11 +314,10 @@
* @return true if yes else return false.
*/
private synchronized boolean canAdmitFile(final long length) {
- // order is important here
- boolean value = !isInPurgeMode() && cache.canAdmitFile(length);
+ // order is important here
+ boolean value = !isInPurgeMode() && cache.canAdmitFile(length);
if (!value) {
- log("cannot admit file of length=" + length
- + " and currentSizeInBytes=" + cache.currentSizeInBytes);
+ log("cannot admit file of length=" + length + " and currentSizeInBytes=" + cache.currentSizeInBytes);
}
return value;
}
@@ -410,11 +408,11 @@
final long maxSizeInBytes;
- long cachePurgeResize;
+ final long cachePurgeResize;
- private long cachePurgeTrigSize;
+ final long cachePurgeTrigSize;
- public LRUCache(final long maxSizeInBytes,
+ LRUCache(final long maxSizeInBytes,
final double cachePurgeTrigFactor,
final double cachePurgeResizeFactor) {
super(maxSizeElements(maxSizeInBytes), (float) 0.75, true);
@@ -433,20 +431,32 @@
public synchronized Long remove(final Object key) {
String fileName = (String) key;
fileName = fileName.replace("\\", "/");
+ try {
+ // do not remove the file from the local cache if an async
+ // upload on it is still in progress.
+ if (asyncUploadCache.hasEntry(fileName, false)) {
+ LOG.info("AsyncUploadCache upload contains file [" + fileName
+ + "]. Not removing it from LocalCache.");
+ return null;
+ }
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("error: ", e);
+ }
+ return null;
+ }
Long flength = null;
if (tryDelete(fileName)) {
flength = super.remove(key);
if (flength != null) {
- log("cache entry { " + fileName + "} with size {" + flength
- + "} removed.");
+ log("cache entry { " + fileName + "} with size {" + flength + "} removed.");
currentSizeInBytes -= flength.longValue();
}
} else if (!getFile(fileName).exists()) {
// second attempt. remove from cache if file doesn't exists
flength = super.remove(key);
if (flength != null) {
- log(" file not exists. cache entry { " + fileName
- + "} with size {" + flength + "} removed.");
+ log(" file not exists. cache entry { " + fileName + "} with size {" + flength + "} removed.");
currentSizeInBytes -= flength.longValue();
}
}
@@ -454,10 +464,15 @@
}
@Override
- public synchronized Long put(final String key, final Long value) {
- long flength = value.longValue();
- currentSizeInBytes += flength;
- return super.put(key.replace("\\", "/"), value);
+ public synchronized Long put(final String fileName, final Long value) {
+ // normalize once so lookup and insertion use the same key
+ String key = fileName.replace("\\", "/");
+ Long oldValue = cache.get(key);
+ if (oldValue == null) {
+ currentSizeInBytes += value.longValue();
+ return super.put(key, value);
+ }
+ toBeDeleted.remove(key);
+ return oldValue;
}
/**
@@ -468,10 +483,14 @@
synchronized void tryPurge() {
if (currentSizeInBytes > cachePurgeTrigSize && !isInPurgeMode()) {
setPurgeMode(true);
- LOG.info("currentSizeInBytes[" + cache.currentSizeInBytes
- + "] exceeds (cachePurgeTrigSize)["
- + cache.cachePurgeTrigSize + "]");
+ LOG.info("currentSizeInBytes[" + cache.currentSizeInBytes + "] exceeds (cachePurgeTrigSize)[" + cache.cachePurgeTrigSize
+ + "]");
new Thread(new PurgeJob()).start();
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("currentSizeInBytes[" + cache.currentSizeInBytes + "] and (cachePurgeTrigSize)[" + cache.cachePurgeTrigSize
+ + "], isInPurgeMode =[" + isInPurgeMode() + "]");
+ }
}
}
/**
@@ -532,4 +551,61 @@
}
}
}
+
+ private class CacheBuildJob implements Runnable {
+ public void run() {
+ long startTime = System.currentTimeMillis();
+ ArrayList<File> allFiles = new ArrayList<File>();
+ @SuppressWarnings("unchecked")
+ Iterator it = FileUtils.iterateFiles(directory, null, true);
+ while (it.hasNext()) {
+ File f = (File) it.next();
+ allFiles.add(f);
+ }
+ long t1 = System.currentTimeMillis();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Time taken to recursive [" + allFiles.size() + "] took [" + ((t1 - startTime) / 1000) + "]sec");
+ }
+ Collections.sort(allFiles, new Comparator<File>() {
+ public int compare(File o1, File o2) {
+ long l1 = o1.lastModified(), l2 = o2.lastModified();
+ return l1 < l2 ? -1 : l1 > l2 ? 1 : 0;
+ }
+ });
+ long t2 = System.currentTimeMillis();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Time taken to sort [" + allFiles.size() + "] took [" + ((t2 - t1) / 1000) + "]sec");
+ }
+ String dataStorePath = directory.getAbsolutePath();
+ long time = System.currentTimeMillis();
+ int count = 0;
+ for (File f : allFiles) {
+ if (f.exists()) {
+ count++;
+ long length = f.length();
+ String name = f.getPath();
+ if (name.startsWith(dataStorePath)) {
+ name = name.substring(dataStorePath.length());
+ }
+ // convert to java path format
+ name = name.replace("\\", "/");
+ if (name.startsWith("/") || name.startsWith("\\")) {
+ name = name.substring(1);
+ }
+ store(name, f);
+ long now = System.currentTimeMillis();
+ if (now > time + 10000) {
+ LOG.info("Processed {" + (count) + "}/{" + allFiles.size() + "}");
+ time = now;
+ }
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processed {" + count + "}/{" + allFiles.size() + "} , currentSizeInBytes = " + cache.currentSizeInBytes
+ + ", maxSizeInBytes = " + cache.maxSizeInBytes + ", cache.filecount = " + cache.size());
+ }
+ long t3 = System.currentTimeMillis();
+ LOG.info("Time to build cache of [" + allFiles.size() + "] took [" + ((t3 - startTime) / 1000) + "]sec");
+ }
+ }
}
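To illustrate the contract documented on store(String, File, boolean) above: a caller inspects the returned AsyncUploadCacheResult to decide between an asynchronous and a synchronous upload. A hedged sketch, assuming the patched LocalCache and AsyncUploadCacheResult classes are on the classpath; the caller class and the file key are hypothetical.

    import java.io.File;
    import java.io.IOException;

    import org.apache.jackrabbit.core.data.AsyncUploadCacheResult;
    import org.apache.jackrabbit.core.data.LocalCache;

    // Hypothetical caller, not part of this patch.
    public class LocalCacheStoreExample {
        static File storeAndMaybeUploadAsync(LocalCache cache, File src)
                throws IOException {
            // try to admit the file into the local cache and register it
            // for asynchronous upload
            AsyncUploadCacheResult result =
                    cache.store("dataStore_abc123", src, true);
            // cached copy, or the original file if admission failed
            File cached = result.getFile();
            if (result.canAsyncUpload()) {
                // hand off to the async uploader; the purge job will not
                // evict the file while its AsyncUploadCache entry exists
            } else {
                // fall back to a synchronous upload to the backend
            }
            return cached;
        }
    }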
Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/NamedThreadFactory.java
===================================================================
--- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/NamedThreadFactory.java (revision 0)
+++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/NamedThreadFactory.java (working copy)
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.core.util;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class implements {@link ThreadFactory} to create named threads.
+ */
+public class NamedThreadFactory implements ThreadFactory {
+
+ private final AtomicInteger threadCount = new AtomicInteger(1);
+
+ private final String threadPrefixName;
+
+ public NamedThreadFactory(String threadPrefixName) {
+ this.threadPrefixName = threadPrefixName;
+ }
+
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setContextClassLoader(getClass().getClassLoader());
+ thread.setName(threadPrefixName + "-" + threadCount.getAndIncrement());
+ return thread;
+ }
+
+}
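A short usage sketch for NamedThreadFactory, mirroring how FilesUploader.upload() above passes it to Executors.newFixedThreadPool(...) so that worker threads carry readable names in logs and thread dumps; the demo class, pool size, and task are illustrative.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import org.apache.jackrabbit.core.util.NamedThreadFactory;

    // Hypothetical demo, not part of this patch.
    public class NamedThreadFactoryDemo {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService pool = Executors.newFixedThreadPool(2,
                    new NamedThreadFactory("backend-file-upload-worker"));
            pool.execute(new Runnable() {
                public void run() {
                    // prints e.g. "backend-file-upload-worker-1"
                    System.out.println(Thread.currentThread().getName());
                }
            });
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
    }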