200b460cb6bee9ebf730f9b67b45a2487f80b931
jackrabbit-aws-ext/README.txt | 29 +
jackrabbit-aws-ext/pom.xml | 96 ++++
.../org/apache/jackrabbit/aws/ext/LocalCache.java | 465 ++++++++++++++++
.../org/apache/jackrabbit/aws/ext/S3Constants.java | 44 ++
.../java/org/apache/jackrabbit/aws/ext/Utils.java | 144 +++++
.../org/apache/jackrabbit/aws/ext/ds/Backend.java | 122 ++++
.../jackrabbit/aws/ext/ds/CachingDataRecord.java | 53 ++
.../jackrabbit/aws/ext/ds/CachingDataStore.java | 477 ++++++++++++++++
.../apache/jackrabbit/aws/ext/ds/S3Backend.java | 351 ++++++++++++
.../apache/jackrabbit/aws/ext/ds/S3DataStore.java | 32 ++
.../org/apache/jackrabbit/aws/ext/TestAll.java | 54 ++
.../apache/jackrabbit/aws/ext/TestLocalCache.java | 153 +++++
.../jackrabbit/aws/ext/ds/InMemoryBackend.java | 131 +++++
.../jackrabbit/aws/ext/ds/InMemoryDataStore.java | 35 ++
.../apache/jackrabbit/aws/ext/ds/TestCaseBase.java | 584 ++++++++++++++++++++
.../apache/jackrabbit/aws/ext/ds/TestInMemDs.java | 44 ++
.../jackrabbit/aws/ext/ds/TestInMemDsCacheOff.java | 44 ++
.../org/apache/jackrabbit/aws/ext/ds/TestS3Ds.java | 99 ++++
.../jackrabbit/aws/ext/ds/TestS3DsCacheOff.java | 30 +
.../src/test/resources/aws.properties | 38 ++
.../src/test/resources/log4j.properties | 29 +
.../src/test/resources/repository_sample.xml | 181 ++++++
pom.xml | 1 +
23 files changed, 3236 insertions(+)
diff --git a/jackrabbit-aws-ext/README.txt b/jackrabbit-aws-ext/README.txt
new file mode 100644
index 0000000..25df31d
--- /dev/null
+++ b/jackrabbit-aws-ext/README.txt
@@ -0,0 +1,29 @@
+====================================================
+Welcome to the Jackrabbit Amazon Web Services Extension
+====================================================
+
+This is the Amazon Web Services Extension component of the Apache Jackrabbit project.
+This component contains the S3 data store, which stores binaries on Amazon S3 (http://aws.amazon.com/s3).
+
+====================================================
+Build Instructions
+====================================================
+To build the latest SNAPSHOT versions of all the components
+included here, run the following command with Maven 3:
+
+ mvn clean install
+
+To run the test cases that store binaries in an S3 bucket, pass an AWS configuration file via a system property. For example:
+
+ mvn clean install -Dconfig=/opt/cq/aws.properties
+
+A sample properties file is located at src/test/resources/aws.properties.
+
+====================================================
+Configuration Instructions
+====================================================
+The S3 data store is configured through an aws.properties file.
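+
+The keys read by this module are shown below (they are defined in S3Constants
+and consumed by Utils.openService); all values here are placeholders, not
+working settings:
+
+    accessKey=<your-aws-access-key>
+    secretKey=<your-aws-secret-key>
+    s3Bucket=<your-bucket-name>
+    s3Region=us-standard
+    connectionTimeout=120000
+    socketTimeout=120000
+    maxConnections=10
+    maxErrorRetry=10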
+
+
+
+
diff --git a/jackrabbit-aws-ext/pom.xml b/jackrabbit-aws-ext/pom.xml
new file mode 100644
index 0000000..fdd0626
--- /dev/null
+++ b/jackrabbit-aws-ext/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.jackrabbit</groupId>
+    <artifactId>jackrabbit-parent</artifactId>
+    <version>2.8-SNAPSHOT</version>
+    <relativePath>../jackrabbit-parent/pom.xml</relativePath>
+  </parent>
+
+  <artifactId>jackrabbit-aws-ext</artifactId>
+  <name>Jackrabbit AWS Extension</name>
+  <description>Jackrabbit extension to Amazon Web Services</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>javax.jcr</groupId>
+      <artifactId>jcr</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.day.crx</groupId>
+      <artifactId>crx-api</artifactId>
+      <version>1.4.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>jackrabbit-jcr-commons</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk</artifactId>
+      <version>1.4.4.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>jackrabbit-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.5</version>
+    </dependency>
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>1.7.5</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>jackrabbit-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <includes>
+            <include>**/aws/**/TestAll.java</include>
+          </includes>
+          <argLine>-Xmx128m -Djava.awt.headless=true</argLine>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/LocalCache.java b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/LocalCache.java
new file mode 100644
index 0000000..af5a224
--- /dev/null
+++ b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/LocalCache.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jcr.RepositoryException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.core.data.LazyFileInputStream;
+import org.apache.jackrabbit.util.TransientFileFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements an LRU cache for input streams.
+ */
+public class LocalCache {
+
+ /**
+ * Logger instance.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(LocalCache.class);
+
+ /**
+ * The directory where the files are created.
+ */
+ private final File directory;
+
+ /**
+ * The directory where tmp files are created.
+ */
+ private final File tmp;
+
+ /**
+ * The file names of the files that need to be deleted.
+ */
+ private final Set<String> toBeDeleted = new HashSet<String>();
+
+ /**
+ * The size in bytes.
+ */
+ private long size;
+
+ /**
+ * LRU cache mapping file name to file length.
+ */
+ private LRUCache cache;
+
+ /**
+ * If true, the cache is in purge mode and unavailable; all operations become no-ops.
+ */
+ private volatile boolean purgeMode = false;
+
+ /**
+ * Build an LRU cache of the files located under 'path'. Files that no longer fit into the configured size are deleted.
+ *
+ * @param path the cache directory
+ * @param tmpPath the directory for temporary files
+ * @param size the maximum cache size in bytes
+ * @param cachePurgeTrigFactor the fraction of 'size' at which a purge is triggered
+ * @param cachePurgeResizeFactor the fraction of 'size' the purge shrinks the cache to
+ * @throws RepositoryException
+ */
+ public LocalCache(String path, String tmpPath, long size, double cachePurgeTrigFactor, double cachePurgeResizeFactor)
+ throws RepositoryException {
+ this.size = size;
+ directory = new File(path);
+ tmp = new File(tmpPath);
+ cache = new LRUCache(size, cachePurgeTrigFactor, cachePurgeResizeFactor);
+ ArrayList<File> allFiles = new ArrayList<File>();
+
+ @SuppressWarnings("unchecked")
+ Iterator<File> it = FileUtils.iterateFiles(directory, null, true);
+ while (it.hasNext()) {
+ File f = it.next();
+ allFiles.add(f);
+ }
+ Collections.sort(allFiles, new Comparator<File>() {
+ public int compare(File o1, File o2) {
+ long l1 = o1.lastModified(), l2 = o2.lastModified();
+ return l1 < l2 ? -1 : l1 > l2 ? 1 : 0;
+ }
+ });
+ String dataStorePath = directory.getAbsolutePath();
+ long time = System.currentTimeMillis();
+ int count = 0;
+ int deletecount = 0;
+ for (File f : allFiles) {
+ if (f.exists()) {
+ long length = f.length();
+ String name = f.getPath();
+ if (name.startsWith(dataStorePath)) {
+ name = name.substring(dataStorePath.length());
+ }
+ // convert to java path format
+ name = name.replace("\\", "/");
+ if (name.startsWith("/") || name.startsWith("\\")) {
+ name = name.substring(1);
+ }
+ if ((cache.currentSizeInBytes + length) < cache.maxSizeInBytes) {
+ count++;
+ cache.put(name, length);
+ } else {
+ if (tryDelete(name)) {
+ deletecount++;
+ }
+ }
+ long now = System.currentTimeMillis();
+ if (now > time + 5000) {
+ LOG.info("Processed {" + (count + deletecount) + "}/{" + allFiles.size() + "}");
+ time = now;
+ }
+ }
+ }
+ LOG.info("Cached {" + count + "}/{" + allFiles.size() + "} , currentSizeInBytes = " + cache.currentSizeInBytes);
+ LOG.info("Deleted {" + deletecount + "}/{" + allFiles.size() + "} files .");
+ }
+
+ /**
+ * Store an item in the cache and return an input stream to it. If the cache is in purge mode or the file does not yet exist, the
+ * stream is first copied to a transient file; the transient file is moved into the cache if the cache can admit it, otherwise the
+ * returned stream is backed by the transient file.
+ *
+ * @param fileName the file name (cache key)
+ * @param in the input stream
+ * @return the (new) input stream
+ */
+ public synchronized InputStream store(String fileName, InputStream in) throws IOException {
+ fileName = fileName.replace("\\", "/");
+ File f = getFile(fileName);
+ long length = 0;
+ if (!f.exists() || isInPurgeMode()) {
+ OutputStream out = null;
+ File transFile = null;
+ try {
+ TransientFileFactory tff = TransientFileFactory.getInstance();
+ transFile = tff.createTransientFile("s3-", "tmp", tmp);
+ out = new BufferedOutputStream(new FileOutputStream(transFile));
+ length = IOUtils.copyLarge(in, out);
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+ // rename the file to local fs cache
+ if (canAdmitFile(length) && (f.getParentFile().exists() || f.getParentFile().mkdirs()) && transFile.renameTo(f) && f.exists()) {
+ if (transFile.exists() && !transFile.delete()) {
+ LOG.info("tmp file = " + transFile.getAbsolutePath() + " could not be deleted");
+ }
+ transFile = null;
+ toBeDeleted.remove(fileName);
+ if (cache.get(fileName) == null) {
+ cache.put(fileName, f.length());
+ }
+ } else {
+ f = transFile;
+ }
+ } else {
+ // f.exists and not in purge mode
+ f.setLastModified(System.currentTimeMillis());
+ toBeDeleted.remove(fileName);
+ if (cache.get(fileName) == null) {
+ cache.put(fileName, f.length());
+ }
+ }
+ cache.tryPurge();
+ return new LazyFileInputStream(f);
+ }
+
+ /**
+ * Store an item in the cache by moving the given source file into the cache directory.
+ *
+ * @param fileName the file name (cache key)
+ * @param src the source file
+ * @throws IOException
+ */
+ public synchronized void store(String fileName, File src) throws IOException {
+ fileName = fileName.replace("\\", "/");
+ File dest = getFile(fileName);
+ File parent = dest.getParentFile();
+ if (src.exists() && !dest.exists() && !src.equals(dest) && canAdmitFile(src.length()) && (parent.exists() || parent.mkdirs())
+ && (src.renameTo(dest))) {
+ toBeDeleted.remove(fileName);
+ if (cache.get(fileName) == null) {
+ cache.put(fileName, dest.length());
+ }
+
+ } else if (dest.exists()) {
+ dest.setLastModified(System.currentTimeMillis());
+ toBeDeleted.remove(fileName);
+ if (cache.get(fileName) == null) {
+ cache.put(fileName, dest.length());
+ }
+ }
+ cache.tryPurge();
+ }
+
+ /**
+ * Get the stream, or null if not in the cache.
+ *
+ * @param fileName the file name
+ * @return the stream or null
+ */
+ public InputStream getIfStored(String fileName) throws IOException {
+
+ fileName = fileName.replace("\\", "/");
+ File f = getFile(fileName);
+ synchronized (this) {
+ if (!f.exists() || isInPurgeMode()) {
+ log("purgeMode true or file doesn't exists: getIfStored returned");
+ return null;
+ } else {
+ f.setLastModified(System.currentTimeMillis());
+ return new LazyFileInputStream(f);
+ }
+ }
+ }
+
+ /**
+ * Delete an item from the cache.
+ *
+ * @param fileName the file name (cache key)
+ */
+ public synchronized void delete(String fileName) {
+ if (isInPurgeMode()) {
+ log("purge mode is on: delete returned");
+ return;
+ }
+ fileName = fileName.replace("\\", "/");
+ cache.remove(fileName);
+ }
+
+ /**
+ * Return the length of the item if it exists in the cache, otherwise null.
+ *
+ * @param fileName the file name (cache key)
+ * @return the file length, or null if the file is not in the cache
+ */
+ public Long getFileLength(String fileName) {
+ fileName = fileName.replace("\\", "/");
+ File f = getFile(fileName);
+ synchronized (this) {
+ if (!f.exists() || isInPurgeMode()) {
+ log("purgeMode true or file doesn't exists: getFileLength returned");
+ return null;
+ }
+ f.setLastModified(System.currentTimeMillis());
+ return f.length();
+ }
+ }
+
+ /**
+ * Close the cache. All temporary files are deleted.
+ */
+ public void close() {
+ log("close");
+ deleteOldFiles();
+ }
+
+ /**
+ * Check if the cache can admit a file of the given length.
+ *
+ * @param length the file length in bytes
+ * @return true if the file can be admitted
+ */
+ private synchronized boolean canAdmitFile(long length) {
+ // order is important here
+ boolean value = !isInPurgeMode() && (cache.canAdmitFile(length));
+ if (!value) {
+ log("cannot admit file of length=" + length + " and currentSizeInBytes=" + cache.currentSizeInBytes);
+ }
+ return value;
+ }
+
+ /**
+ * Return whether the cache is in purge mode.
+ *
+ * @return true if a purge is running or the cache size is zero
+ */
+ private synchronized boolean isInPurgeMode() {
+ return purgeMode || size == 0;
+ }
+
+ private synchronized void setPurgeMode(boolean purgeMode) {
+ this.purgeMode = purgeMode;
+ }
+
+ private File getFile(String fileName) {
+ return new File(directory, fileName);
+ }
+
+ private void deleteOldFiles() {
+ int initialSize = toBeDeleted.size();
+ int count = 0;
+ for (String n : new ArrayList<String>(toBeDeleted)) {
+ if (tryDelete(n)) {
+ count++;
+ }
+ }
+ LOG.info("deleted [" + count + "]/[" + initialSize + "] files");
+ }
+
+ private boolean tryDelete(String fileName) {
+ log("cache delete " + fileName);
+ File f = getFile(fileName);
+ if (f.exists() && f.delete()) {
+ log(fileName + " deleted successfully");
+ toBeDeleted.remove(fileName);
+ while (true) {
+ f = f.getParentFile();
+ if (f.equals(directory) || f.list().length > 0) {
+ break;
+ }
+ // delete empty parent folders (except the main directory)
+ f.delete();
+ }
+ return true;
+ } else if (f.exists()) {
+ LOG.info("not able to delete file = " + f.getAbsolutePath());
+ toBeDeleted.add(fileName);
+ return false;
+ }
+ return true;
+ }
+
+ private static int maxSizeElements(long bytes) {
+ // after a CQ installation, the average item in
+ // the data store is about 52 KB; 64 KB per entry
+ // is used here as a conservative estimate
+ int count = (int) (bytes / 65535);
+ count = Math.max(1024, count);
+ count = Math.min(64 * 1024, count);
+ return count;
+ }
+
+ private void log(String s) {
+ LOG.debug(s);
+ }
+
+ /**
+ * An LRU map. Key: file name, value: file length.
+ */
+ private class LRUCache extends LinkedHashMap<String, Long> {
+ private static final long serialVersionUID = 1L;
+
+ private volatile long currentSizeInBytes;
+
+ private final long maxSizeInBytes;
+
+ private long cachePurgeTrigSize;
+
+ private long cachePurgeResize;
+
+ public LRUCache(long maxSizeInBytes, double cachePurgeTrigFactor, double cachePurgeResizeFactor) {
+ super(maxSizeElements(maxSizeInBytes), (float) 0.75, true);
+ this.maxSizeInBytes = maxSizeInBytes;
+ this.cachePurgeTrigSize = new Double(cachePurgeTrigFactor * maxSizeInBytes).longValue();
+ this.cachePurgeResize = new Double(cachePurgeResizeFactor * maxSizeInBytes).longValue();
+ }
+
+ @Override
+ public synchronized Long remove(Object key) {
+ String fileName = (String) key;
+ fileName = fileName.replace("\\", "/");
+ Long flength = null;
+ if (tryDelete(fileName)) {
+ flength = super.remove(key);
+ if (flength != null) {
+ log("cache entry { " + fileName + "} with size {" + flength + "} removed.");
+ currentSizeInBytes -= flength.longValue();
+ }
+ } else if (!getFile(fileName).exists()) {
+ // second attempt: remove the entry if the file no longer exists
+ flength = super.remove(key);
+ if (flength != null) {
+ log("file does not exist. cache entry { " + fileName + "} with size {" + flength + "} removed.");
+ currentSizeInBytes -= flength.longValue();
+ }
+ }
+ return flength;
+ }
+
+ @Override
+ public synchronized Long put(String key, Long value) {
+ long flength = value.longValue();
+ currentSizeInBytes += flength;
+ return super.put(key.replace("\\", "/"), value);
+ }
+
+ private synchronized void tryPurge() {
+ if (currentSizeInBytes > cachePurgeTrigSize && !isInPurgeMode()) {
+ setPurgeMode(true);
+ LOG.info("currentSizeInBytes[" + cache.currentSizeInBytes + "] exceeds (cachePurgeTrigSize)[" + cache.cachePurgeTrigSize
+ + "]");
+ new Thread(new PurgeJob()).start();
+ }
+ }
+
+ private synchronized boolean canAdmitFile(long length) {
+ return (cache.currentSizeInBytes + length < cache.maxSizeInBytes);
+ }
+ }
+
+ private class PurgeJob implements Runnable {
+ public void run() {
+ try {
+ synchronized (cache) {
+ LOG.info(" cache purge job started");
+ // first try to delete toBeDeleted files
+ int initialSize = cache.size();
+ for (String fileName : new ArrayList<String>(toBeDeleted)) {
+ cache.remove(fileName);
+ }
+ Iterator<Map.Entry<String, Long>> itr = cache.entrySet().iterator();
+ while (itr.hasNext()) {
+ Map.Entry<String, Long> entry = itr.next();
+ if (entry.getKey() != null) {
+ if (cache.currentSizeInBytes > cache.cachePurgeResize) {
+ itr.remove();
+
+ } else {
+ break;
+ }
+ }
+
+ }
+ LOG.info(" cache purge job completed: cleaned [" + (initialSize - cache.size()) + "] files and currentSizeInBytes = [ "
+ + cache.currentSizeInBytes + "]");
+ }
+ } catch (Exception e) {
+ LOG.error("error in purge job:", e);
+ } finally {
+ setPurgeMode(false);
+ }
+ }
+ }
+}
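
For illustration, a minimal usage sketch of LocalCache (the paths, cache size,
and cache keys below are hypothetical; within this patch the cache is driven by
CachingDataStore, which derives keys from record identifiers):

    import java.io.FileInputStream;
    import java.io.InputStream;

    import org.apache.jackrabbit.aws.ext.LocalCache;

    public class LocalCacheDemo {
        public static void main(String[] args) throws Exception {
            // 1 GB cache under ./ds; a purge starts at 95% of capacity
            // and shrinks the cache back to 85%
            LocalCache cache = new LocalCache("./ds", "./ds-tmp",
                    1024L * 1024 * 1024, 0.95d, 0.85d);
            // store() copies the stream into the cache (when admissible) and
            // returns a re-readable stream backed by the cached file
            InputStream in = cache.store("aa/bb/cc/aabbcc123",
                    new FileInputStream("payload.bin"));
            in.close();
            // later reads are served from the local file system
            InputStream cached = cache.getIfStored("aa/bb/cc/aabbcc123");
            if (cached != null) {
                cached.close();
            }
            cache.close();
        }
    }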
diff --git a/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java
new file mode 100644
index 0000000..2b971f9
--- /dev/null
+++ b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext;
+
+/**
+ * Defines Amazon S3 constants.
+ */
+public class S3Constants {
+
+    /**
+     * Amazon AWS access key.
+     */
+    public static final String ACCESS_KEY = "accessKey";
+
+    /**
+     * Amazon AWS secret key.
+     */
+    public static final String SECRET_KEY = "secretKey";
+
+    /**
+     * Amazon S3 bucket name.
+     */
+    public static final String S3_BUCKET = "s3Bucket";
+
+    /**
+     * Amazon S3 bucket region.
+     */
+    public static final String S3_REGION = "s3Region";
+}
diff --git a/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java
new file mode 100644
index 0000000..d152e0c
--- /dev/null
+++ b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/Utils.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+/**
+ * Amazon S3 utilities.
+ */
+public class Utils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+ public static final String DEFAULT_CONFIG_FILE = "aws.properties";
+
+ private static final String DELETE_CONFIG_SUFFIX = ";burn";
+
+ public static void main(String... args) throws Exception {
+ for (int i = 0; i < args.length; i++) {
+ String a = args[i];
+ if ("-deleteBucket".equals(a)) {
+ deleteBucket(args[++i]);
+ }
+ }
+ }
+
+ public static final String USER_AGENT_APPLICATION = "CRX/2.0";
+
+ /**
+ * Create an AmazonS3Client from the given configuration. The properties must contain the access key, secret key, and the
+ * connectionTimeout, socketTimeout, maxConnections and maxErrorRetry client settings.
+ *
+ * @param prop the configuration properties
+ * @return the configured S3 client
+ */
+ public static AmazonS3Client openService(Properties prop) throws IOException, AmazonServiceException {
+ AWSCredentials credentials = new BasicAWSCredentials(prop.getProperty(S3Constants.ACCESS_KEY),
+ prop.getProperty(S3Constants.SECRET_KEY));
+ int connectionTimeOut = Integer.parseInt(prop.getProperty("connectionTimeout"));
+ int socketTimeOut = Integer.parseInt(prop.getProperty("socketTimeout"));
+ int maxConnections = Integer.parseInt(prop.getProperty("maxConnections"));
+ int maxErrorRetry = Integer.parseInt(prop.getProperty("maxErrorRetry"));
+ ClientConfiguration cc = new ClientConfiguration();
+ cc.setConnectionTimeout(connectionTimeOut);
+ cc.setSocketTimeout(socketTimeOut);
+ cc.setMaxConnections(maxConnections);
+ cc.setMaxErrorRetry(maxErrorRetry);
+ return new AmazonS3Client(credentials, cc);
+ }
+
+ /**
+ * Delete an S3 bucket.
+ *
+ * @param bucketName the bucket name
+ */
+ public static void deleteBucket(String bucketName) throws Exception {
+ Properties prop = readConfig(DEFAULT_CONFIG_FILE);
+ AmazonS3 s3service = openService(prop);
+ ObjectListing prevObjectListing = s3service.listObjects(bucketName);
+ while (true) {
+ for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+ s3service.deleteObject(bucketName, s3ObjSumm.getKey());
+ }
+ if (!prevObjectListing.isTruncated()) break;
+ prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+ }
+ s3service.deleteBucket(bucketName);
+ }
+
+ /**
+ * Read a configuration properties file. If the file name ends with ";burn", the file is deleted after reading.
+ *
+ * @param fileName the properties file name
+ * @return the properties
+ * @throws IOException if the file doesn't exist
+ */
+ public static Properties readConfig(String fileName) throws IOException {
+ boolean delete = false;
+ if (fileName.endsWith(DELETE_CONFIG_SUFFIX)) {
+ delete = true;
+ fileName = fileName.substring(0, fileName.length() - DELETE_CONFIG_SUFFIX.length());
+ }
+ if (!new File(fileName).exists()) {
+ throw new IOException("Config file not found: " + fileName);
+ }
+ Properties prop = new Properties();
+ InputStream in = null;
+ try {
+ in = new FileInputStream(fileName);
+ prop.load(in);
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ if (delete) {
+ deleteIfPossible(new File(fileName));
+ }
+ }
+ return prop;
+ }
+
+ static void deleteIfPossible(File file) {
+ boolean deleted = file.delete();
+ if (!deleted) {
+ LOG.warn("Could not delete " + file.getAbsolutePath());
+ }
+ }
+
+ public static void setLongMeta(S3Object obj, String meta, long x) {
+ ObjectMetadata metadata = obj.getObjectMetadata();
+ metadata.addUserMetadata(meta, String.valueOf(x));
+ obj.setObjectMetadata(metadata);
+ }
+
+ public static long getLongMeta(S3Object obj, String meta) {
+ return Long.parseLong(obj.getObjectMetadata().getUserMetadata().get(meta));
+ }
+}
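
For illustration, a minimal sketch wiring these utilities together (the
properties path follows the README example; the println is only a placeholder
action):

    import java.util.Properties;

    import org.apache.jackrabbit.aws.ext.Utils;

    import com.amazonaws.services.s3.AmazonS3Client;

    public class S3ConnectionDemo {
        public static void main(String[] args) throws Exception {
            // read aws.properties; appending ";burn" to the name would
            // delete the file after reading
            Properties prop = Utils.readConfig("/opt/cq/aws.properties");
            // openService expects accessKey/secretKey plus connectionTimeout,
            // socketTimeout, maxConnections and maxErrorRetry to be present
            AmazonS3Client s3 = Utils.openService(prop);
            System.out.println("S3 client created: " + s3);
        }
    }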
diff --git a/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/Backend.java b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/Backend.java
new file mode 100644
index 0000000..3b130ee
--- /dev/null
+++ b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/Backend.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext.ds;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+/**
+ * A data store backend to support caching data stores.
+ */
+public interface Backend {
+
+ /**
+ * Initialize the backend.
+ *
+ * @param store the caching data store using this backend
+ * @param homeDir the data store directory
+ * @param config the path to the backend configuration file
+ * @throws DataStoreException
+ */
+ void init(CachingDataStore store, String homeDir, String config) throws DataStoreException;
+
+ /**
+ * Return an input stream for the record.
+ *
+ * @param identifier the record identifier
+ * @return an input stream over the record content
+ * @throws DataStoreException
+ */
+ InputStream read(DataIdentifier identifier) throws DataStoreException;
+
+ /**
+ * Return the length of the record.
+ *
+ * @param identifier the record identifier
+ * @return the length in bytes
+ * @throws DataStoreException
+ */
+ long getLength(DataIdentifier identifier) throws DataStoreException;
+
+ /**
+ * Return the last modified time of the record.
+ */
+ long getLastModified(DataIdentifier identifier) throws DataStoreException;
+
+ /**
+ * Write file to the backend.
+ *
+ * @param identifier
+ * @param file
+ * @throws DataStoreException
+ */
+ void write(DataIdentifier identifier, File file) throws DataStoreException;
+
+ /**
+ * Return all identifiers which exist in the backend.
+ *
+ * @return an iterator over all identifiers
+ * @throws DataStoreException
+ */
+ Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException;
+
+ /**
+ * Touch record identified by identifier if minModifiedDate > record's lastModified.
+ * @param identifier
+ * @throws DataStoreException
+ */
+ void touch(DataIdentifier identifier, long minModifiedDate) throws DataStoreException;
+
+ /**
+ * Return true if the record exists, false otherwise.
+ *
+ * @param identifier the record identifier
+ * @return true if the record exists
+ * @throws DataStoreException
+ */
+ boolean exists(DataIdentifier identifier) throws DataStoreException;
+
+ /**
+ * Close the backend.
+ *
+ * @throws DataStoreException
+ */
+ void close() throws DataStoreException;
+
+ /**
+ * Delete all records which are older than min.
+ *
+ * @param min the timestamp in milliseconds
+ * @return the identifiers of the deleted records
+ * @throws DataStoreException
+ */
+ List<DataIdentifier> deleteAllOlderThan(long min) throws DataStoreException;
+
+ /**
+ * Delete record identified by identifier. No-op if identifier not found.
+ *
+ * @param identifier
+ * @throws DataStoreException
+ */
+ void deleteRecord(DataIdentifier identifier) throws DataStoreException;
+}
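
For illustration, a compressed heap-backed sketch of this contract
(hypothetical class; the patch itself ships a fuller InMemoryBackend under
src/test, which is what the test suite actually uses):

    package org.apache.jackrabbit.aws.ext.ds;

    import java.io.ByteArrayInputStream;
    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.io.FileUtils;
    import org.apache.jackrabbit.core.data.DataIdentifier;
    import org.apache.jackrabbit.core.data.DataStoreException;

    /** A toy heap-backed Backend; for experiments only, not thread-safe. */
    public class HeapBackend implements Backend {

        private final Map<DataIdentifier, byte[]> data =
            new HashMap<DataIdentifier, byte[]>();

        private final Map<DataIdentifier, Long> modified =
            new HashMap<DataIdentifier, Long>();

        public void init(CachingDataStore store, String homeDir, String config) {
            // nothing to set up for an in-memory store
        }

        public InputStream read(DataIdentifier id) throws DataStoreException {
            byte[] b = data.get(id);
            if (b == null) {
                throw new DataStoreException("Record not found: " + id);
            }
            return new ByteArrayInputStream(b);
        }

        public long getLength(DataIdentifier id) throws DataStoreException {
            byte[] b = data.get(id);
            if (b == null) {
                throw new DataStoreException("Record not found: " + id);
            }
            return b.length;
        }

        public long getLastModified(DataIdentifier id) throws DataStoreException {
            Long m = modified.get(id);
            if (m == null) {
                throw new DataStoreException("Record not found: " + id);
            }
            return m.longValue();
        }

        public void write(DataIdentifier id, File file) throws DataStoreException {
            try {
                data.put(id, FileUtils.readFileToByteArray(file));
                modified.put(id, Long.valueOf(System.currentTimeMillis()));
            } catch (IOException e) {
                throw new DataStoreException("Could not write " + id, e);
            }
        }

        public Iterator<DataIdentifier> getAllIdentifiers() {
            return new ArrayList<DataIdentifier>(data.keySet()).iterator();
        }

        public void touch(DataIdentifier id, long minModifiedDate) {
            Long last = modified.get(id);
            if (last != null && last.longValue() < minModifiedDate) {
                modified.put(id, Long.valueOf(System.currentTimeMillis()));
            }
        }

        public boolean exists(DataIdentifier id) {
            return data.containsKey(id);
        }

        public void close() {
            // nothing to release
        }

        public List<DataIdentifier> deleteAllOlderThan(long min) {
            List<DataIdentifier> deleted = new ArrayList<DataIdentifier>();
            for (DataIdentifier id : new ArrayList<DataIdentifier>(data.keySet())) {
                if (modified.get(id).longValue() < min) {
                    data.remove(id);
                    modified.remove(id);
                    deleted.add(id);
                }
            }
            return deleted;
        }

        public void deleteRecord(DataIdentifier id) {
            data.remove(id);
            modified.remove(id);
        }
    }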
diff --git a/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataRecord.java b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataRecord.java
new file mode 100644
index 0000000..763580c
--- /dev/null
+++ b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataRecord.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext.ds;
+
+import java.io.InputStream;
+import org.apache.jackrabbit.core.data.AbstractDataRecord;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+/**
+ * A data record.
+ */
+public class CachingDataRecord extends AbstractDataRecord {
+
+ private final CachingDataStore store;
+
+ public CachingDataRecord(CachingDataStore store, DataIdentifier identifier) {
+ super(store, identifier);
+ this.store = store;
+ }
+
+ public long getLastModified() {
+ try {
+ return store.getLastModified(getIdentifier());
+ } catch (DataStoreException dse) {
+ return 0;
+ }
+ }
+
+ public long getLength() throws DataStoreException {
+ return store.getLength(getIdentifier());
+ }
+
+ public InputStream getStream() throws DataStoreException {
+ return store.getStream(getIdentifier());
+ }
+
+}
diff --git a/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataStore.java b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataStore.java
new file mode 100644
index 0000000..631375b
--- /dev/null
+++ b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/CachingDataStore.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext.ds;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.ref.WeakReference;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import javax.jcr.RepositoryException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.aws.ext.LocalCache;
+import org.apache.jackrabbit.core.data.AbstractDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.MultiDataStoreAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A caching data store. The implementing class defines the backend to use.
+ */
+public abstract class CachingDataStore extends AbstractDataStore implements MultiDataStoreAware {
+
+ /**
+ * Logger instance.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(CachingDataStore.class);
+
+ /**
+ * The digest algorithm used to uniquely identify records.
+ */
+ private static final String DIGEST = "SHA-1";
+
+ private static final String DS_STORE = ".DS_Store";
+
+ /**
+ * The minimum size of an object that should be stored in this data store.
+ */
+ private int minRecordLength = 16 * 1024;
+
+ private String path;
+
+ private File directory;
+
+ private File tmpDir;
+
+ private String secret;
+
+ /**
+ * The optional backend configuration.
+ */
+ private String config;
+
+ /**
+ * Name of the directory used for temporary files. Must be at least 3 characters.
+ */
+ private static final String TMP = "tmp";
+
+ /**
+ * The minimum modified date. If a file is accessed (read or write) with a modified date older than this value, the modified date is
+ * updated to the current time.
+ */
+ private long minModifiedDate = 0L;
+
+ private double cachePurgeTrigFactor = 0.95d;
+
+ private double cachePurgeResizeFactor = 0.85d;
+
+ /**
+ * All data identifiers that are currently in use are in this set until they are garbage collected.
+ */
+ protected Map<DataIdentifier, WeakReference<DataIdentifier>> inUse =
+ Collections.synchronizedMap(new WeakHashMap<DataIdentifier, WeakReference<DataIdentifier>>());
+
+ /**
+ * The number of bytes in the cache. The default value is 64 GB.
+ */
+ private long cacheSize = 64L * 1024 * 1024 * 1024;
+
+ protected Backend backend;
+
+ /**
+ * The cache.
+ */
+ private LocalCache cache;
+
+ abstract Backend createBackend();
+
+ abstract String getMarkerFile();
+
+ /**
+ * On first initialization, all files in the local data store are uploaded to S3; thereafter the local data store acts as a local cache.
+ *
+ * @see DataStore#init(String)
+ */
+ public void init(String homeDir) throws RepositoryException {
+ if (path == null) {
+ path = homeDir + "/repository/datastore";
+ }
+ directory = new File(path);
+ try {
+ mkdirs(directory);
+ } catch (IOException e) {
+ throw new DataStoreException("Could not create directory " + directory.getAbsolutePath(), e);
+ }
+ tmpDir = new File(homeDir, "/repository/s3tmp");
+ try {
+ if (!mkdirs(tmpDir)) {
+ FileUtils.cleanDirectory(tmpDir);
+ LOG.info("tmp = " + tmpDir.getPath() + " cleaned");
+ }
+ } catch (IOException e) {
+ throw new DataStoreException("Could not create directory " + tmpDir.getAbsolutePath(), e);
+ }
+ LOG.info("cachePurgeTrigFactor = " + cachePurgeTrigFactor + ", cachePurgeResizeFactor = " + cachePurgeResizeFactor);
+ backend = createBackend();
+ backend.init(this, path, config);
+ String markerFileName = getMarkerFile();
+ if (markerFileName != null) {
+ // create marker file in homeDir to avoid deletion in cache cleanup.
+ File markerFile = new File(homeDir, markerFileName);
+ if (!markerFile.exists()) {
+ LOG.info("load files from local cache");
+ loadFilesFromCache();
+ try {
+ markerFile.createNewFile();
+ } catch (IOException e) {
+ throw new DataStoreException("Could not create marker file " + markerFile.getAbsolutePath(), e);
+ }
+ } else {
+ LOG.info("marker file = " + markerFile.getAbsolutePath() + " exists");
+ }
+ }
+ cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize, cachePurgeTrigFactor, cachePurgeResizeFactor);
+ }
+
+ /**
+ * @see DataStore#addRecord(InputStream)
+ */
+ public DataRecord addRecord(InputStream input) throws DataStoreException {
+ File temporary = null;
+ try {
+ temporary = newTemporaryFile();
+ DataIdentifier tempId = new DataIdentifier(temporary.getName());
+ usesIdentifier(tempId);
+ // Copy the stream to the temporary file and calculate the
+ // stream length and the message digest of the stream
+ long length = 0;
+ MessageDigest digest = MessageDigest.getInstance(DIGEST);
+ OutputStream output = new DigestOutputStream(new FileOutputStream(temporary), digest);
+ try {
+ length = IOUtils.copyLarge(input, output);
+ } finally {
+ output.close();
+ }
+ DataIdentifier identifier = new DataIdentifier(encodeHexString(digest.digest()));
+ synchronized (this) {
+ usesIdentifier(identifier);
+ backend.write(identifier, temporary);
+ String fileName = getFileName(identifier);
+ cache.store(fileName, temporary);
+ }
+ // this will also make sure that
+ // tempId is not garbage collected until here
+ inUse.remove(tempId);
+ return new CachingDataRecord(this, identifier);
+ } catch (NoSuchAlgorithmException e) {
+ throw new DataStoreException(DIGEST + " not available", e);
+ } catch (IOException e) {
+ throw new DataStoreException("Could not add record", e);
+ } finally {
+ if (temporary != null) {
+ // try to delete - but it's not a big deal if we can't
+ temporary.delete();
+ }
+ }
+ }
+
+ /**
+ * @see DataStore#getRecord(DataIdentifier)
+ */
+ public DataRecord getRecord(DataIdentifier identifier) throws DataStoreException {
+ DataRecord record = getRecordIfStored(identifier);
+ if (record == null) {
+ throw new DataStoreException("Record not found: " + identifier);
+ }
+ return record;
+ }
+
+ /**
+ * @see DataStore#getRecordIfStored(DataIdentifier)
+ */
+ public DataRecord getRecordIfStored(DataIdentifier identifier) throws DataStoreException {
+ synchronized (this) {
+ usesIdentifier(identifier);
+ if (!backend.exists(identifier)) {
+ return null;
+ }
+ backend.touch(identifier, minModifiedDate);
+ return new CachingDataRecord(this, identifier);
+ }
+ }
+
+ /**
+ * @see DataStore#updateModifiedDateOnAccess(long)
+ */
+ public void updateModifiedDateOnAccess(long before) {
+ LOG.info("minModifiedDate set to: " + before);
+ minModifiedDate = before;
+ }
+
+ /**
+ * @see DataStore#getAllIdentifiers()
+ */
+ public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
+ return backend.getAllIdentifiers();
+ }
+
+ /**
+ * @see MultiDataStoreAware#deleteRecord(DataIdentifier)
+ */
+ public void deleteRecord(DataIdentifier identifier) throws DataStoreException {
+ String fileName = getFileName(identifier);
+ synchronized (this) {
+ backend.deleteRecord(identifier);
+ cache.delete(fileName);
+ }
+ }
+
+ /**
+ * @see DataStore#deleteAllOlderThan(long)
+ */
+ public synchronized int deleteAllOlderThan(long min) throws DataStoreException {
+ List<DataIdentifier> diList = backend.deleteAllOlderThan(min);
+ // remove entries from local cache
+ for (DataIdentifier identifier : diList) {
+ cache.delete(getFileName(identifier));
+ }
+ return diList.size();
+ }
+
+ /**
+ * Return an input stream from the cache if available; otherwise read from the backend and store the result in the cache for later
+ * use.
+ *
+ * @param identifier the record identifier
+ * @return the input stream
+ * @throws DataStoreException
+ */
+ InputStream getStream(DataIdentifier identifier) throws DataStoreException {
+ InputStream in = null;
+ try {
+ String fileName = getFileName(identifier);
+ InputStream cached = cache.getIfStored(fileName);
+ if (cached != null) {
+ return cached;
+ }
+ in = backend.read(identifier);
+ return cache.store(fileName, in);
+ } catch (IOException e) {
+ throw new DataStoreException("IO Exception: " + identifier, e);
+ } finally {
+ IOUtils.closeQuietly(in);
+ }
+ }
+
+ /**
+ * Return the last modified time of the record from the backend, which is treated as the single source of truth.
+ *
+ * @param identifier the record identifier
+ * @return the last modified time in milliseconds
+ * @throws DataStoreException
+ */
+ long getLastModified(DataIdentifier identifier) throws DataStoreException {
+ LOG.info("accessed lastModified");
+ return backend.getLastModified(identifier);
+ }
+
+ /**
+ * Return the length of the record, from the cache if available, otherwise from the backend.
+ *
+ * @param identifier the record identifier
+ * @return the length in bytes
+ * @throws DataStoreException
+ */
+ long getLength(DataIdentifier identifier) throws DataStoreException {
+ String fileName = getFileName(identifier);
+ Long length = cache.getFileLength(fileName);
+ if (length != null) {
+ return length.longValue();
+ }
+ return backend.getLength(identifier);
+ }
+
+ @Override
+ protected byte[] getOrCreateReferenceKey() throws DataStoreException {
+ return secret.getBytes();
+ }
+
+ /**
+ * Returns a unique temporary file to be used for creating a new data record.
+ *
+ * @return temporary file
+ * @throws IOException
+ */
+ private File newTemporaryFile() throws IOException {
+ return File.createTempFile(TMP, null, tmpDir);
+ }
+
+ private void loadFilesFromCache() throws RepositoryException {
+ ArrayList<File> files = new ArrayList<File>();
+ listRecursive(files, directory);
+ long totalSize = 0;
+ for (File f : files) {
+ totalSize += f.length();
+ }
+ long currentSize = 0;
+ long time = System.currentTimeMillis();
+ for (File f : files) {
+ long now = System.currentTimeMillis();
+ if (now > time + 5000) {
+ LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}");
+ time = now;
+ }
+ currentSize += f.length();
+ String name = f.getName();
+ LOG.debug("upload file = " + name);
+ if (!name.startsWith(TMP) && !name.endsWith(DS_STORE) && f.length() > 0) {
+ loadFileToBackEnd(f);
+ }
+ }
+ LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}");
+ }
+
+ private void listRecursive(List<File> list, File file) {
+ File[] files = file.listFiles();
+ if (files != null) {
+ for (File f : files) {
+ if (f.isDirectory()) {
+ listRecursive(list, f);
+ } else {
+ list.add(f);
+ }
+ }
+ }
+ }
+
+ private void loadFileToBackEnd(File f) throws DataStoreException {
+ DataIdentifier identifier = new DataIdentifier(f.getName());
+ usesIdentifier(identifier);
+ backend.write(identifier, f);
+ LOG.debug(f.getName() + " uploaded.");
+ }
+
+ private String getFileName(DataIdentifier identifier) {
+ String name = identifier.toString();
+ name = name.substring(0, 2) + "/" + name.substring(2, 4) + "/" + name.substring(4, 6) + "/" + name;
+ return name;
+ }
+
+ private void usesIdentifier(DataIdentifier identifier) {
+ inUse.put(identifier, new WeakReference<DataIdentifier>(identifier));
+ }
+
+ private boolean mkdirs(File dir) throws IOException {
+ if (dir.exists()) {
+ if (dir.isFile()) {
+ throw new IOException("Can not create a directory " + "because a file exists with the same name: " + dir.getAbsolutePath());
+ }
+ return false;
+ } else {
+ boolean created = dir.mkdirs();
+ if (!created) {
+ throw new IOException("Could not create directory: " + dir.getAbsolutePath());
+ }
+ return created;
+ }
+ }
+
+ public void clearInUse() {
+ inUse.clear();
+ }
+
+ public void close() throws DataStoreException {
+ cache.close();
+ backend.close();
+ cache = null;
+ }
+
+ /**
+ * Setter for configuration based secret
+ *
+ * @param secret the secret used to sign reference binaries
+ */
+ public void setSecret(String secret) {
+ this.secret = secret;
+ }
+
+ public void setMinRecordLength(int minRecordLength) {
+ this.minRecordLength = minRecordLength;
+ }
+
+ public int getMinRecordLength() {
+ return minRecordLength;
+ }
+
+ public String getConfig() {
+ return config;
+ }
+
+ public void setConfig(String config) {
+ this.config = config;
+ }
+
+ public long getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(long cacheSize) {
+ this.cacheSize = cacheSize;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public double getCachePurgeTrigFactor() {
+ return cachePurgeTrigFactor;
+ }
+
+ public void setCachePurgeTrigFactor(double cachePurgeTrigFactor) {
+ this.cachePurgeTrigFactor = cachePurgeTrigFactor;
+ }
+
+ public double getCachePurgeResizeFactor() {
+ return cachePurgeResizeFactor;
+ }
+
+ public void setCachePurgeResizeFactor(double cachePurgeResizeFactor) {
+ this.cachePurgeResizeFactor = cachePurgeResizeFactor;
+ }
+
+}
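
For illustration, the content-addressing scheme used by addRecord() and
getFileName() above, as a standalone sketch (the sample payload is arbitrary):

    import java.security.MessageDigest;

    public class IdentifierDemo {
        public static void main(String[] args) throws Exception {
            byte[] content = "hello world".getBytes("UTF-8");
            // a record is identified by the SHA-1 digest of its content,
            // so identical binaries are stored exactly once
            MessageDigest md = MessageDigest.getInstance("SHA-1");
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest(content)) {
                hex.append(String.format("%02x", b));
            }
            String id = hex.toString();
            // the cache fans the name out over three directory levels,
            // mirroring CachingDataStore.getFileName()
            String fileName = id.substring(0, 2) + "/" + id.substring(2, 4)
                    + "/" + id.substring(4, 6) + "/" + id;
            System.out.println(fileName);
        }
    }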
diff --git a/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
new file mode 100644
index 0000000..b4588df
--- /dev/null
+++ b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext.ds;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.jackrabbit.aws.ext.S3Constants;
+import org.apache.jackrabbit.aws.ext.Utils;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.Region;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.Upload;
+
+/**
+ * A data store backend that stores data on Amazon S3.
+ */
+public class S3Backend implements Backend {
+
+ /**
+ * Logger instance.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(S3Backend.class);
+
+ private AmazonS3 s3service;
+
+ private String bucket;
+
+ private TransferManager tmx;
+
+ private CachingDataStore store;
+
+ private static final String KEY_PREFIX = "dataStore_";
+
+ /**
+ * The default AWS bucket region.
+ */
+ private static final String DEFAULT_AWS_BUCKET_REGION = "us-standard";
+
+ /**
+ * Constants used to build endpoints for the various AWS regions.
+ */
+ private static final String AWSDOTCOM = "amazonaws.com";
+
+ private static final String S3 = "s3";
+
+ private static final String DOT = ".";
+
+ private static final String DASH = "-";
+
+ public void init(CachingDataStore store, String homeDir, String config) throws DataStoreException {
+ if (config == null) {
+ config = Utils.DEFAULT_CONFIG_FILE;
+ }
+ try {
+ Properties prop = Utils.readConfig(config);
+ LOG.debug("init");
+ this.store = store;
+ s3service = Utils.openService(prop);
+ bucket = prop.getProperty(S3Constants.S3_BUCKET);
+ String region = prop.getProperty(S3Constants.S3_REGION);
+ String endpoint = null;
+ if (!s3service.doesBucketExist(bucket)) {
+
+ if (DEFAULT_AWS_BUCKET_REGION.equals(region)) {
+ s3service.createBucket(bucket, Region.US_Standard);
+ endpoint = S3 + DOT + AWSDOTCOM;
+ } else if (Region.EU_Ireland.toString().equals(region)) {
+ s3service.createBucket(bucket, Region.EU_Ireland);
+ endpoint = "s3-eu-west-1" + DOT + AWSDOTCOM;
+ } else {
+ s3service.createBucket(bucket, region);
+ endpoint = S3 + DASH + region + DOT + AWSDOTCOM;
+ }
+ LOG.info("Created bucket: " + bucket + " in " + region);
+ } else {
+ LOG.info("Using bucket: " + bucket);
+ if (DEFAULT_AWS_BUCKET_REGION.equals(region)) {
+ endpoint = S3 + DOT + AWSDOTCOM;
+ } else if (Region.EU_Ireland.toString().equals(region)) {
+ endpoint = "s3-eu-west-1" + DOT + AWSDOTCOM;
+ } else {
+ endpoint = S3 + DASH + region + DOT + AWSDOTCOM;
+ }
+ }
+ /*
+ * setting endpoint to remove latency of redirection. If endpoint is not set, invocation first goes us standard region, which
+ * redirects it to correct location.
+ */
+ s3service.setEndpoint(endpoint);
+ LOG.info("S3 service endpoint: " + endpoint);
+ tmx = new TransferManager(s3service, createDefaultExecutorService());
+ LOG.debug(" done");
+ } catch (Exception e) {
+ LOG.debug(" error ", e);
+ throw new DataStoreException("Could not initialize S3 from " + config, e);
+ }
+ }
+
+ public void write(DataIdentifier identifier, File file) throws DataStoreException {
+ String key = getKeyName(identifier);
+ ObjectMetadata objectMetaData = null;
+ long start = System.currentTimeMillis();
+ LOG.debug("write {0} length {1}", identifier, file.length());
+ try {
+ // check if the same record already exists
+ try {
+ objectMetaData = s3service.getObjectMetadata(bucket, key);
+ } catch (AmazonServiceException ase) {
+ if (ase.getStatusCode() != 404) {
+ throw ase;
+ }
+ }
+ if (objectMetaData != null) {
+ long l = objectMetaData.getContentLength();
+ if (l != file.length()) {
+ throw new DataStoreException("Collision: " + key + " new length: " + file.length() + " old length: " + l);
+ }
+ LOG.debug(key + " exists");
+ CopyObjectRequest copReq = new CopyObjectRequest(bucket, key, bucket, key);
+ copReq.setNewObjectMetadata(objectMetaData);
+ s3service.copyObject(copReq);
+ LOG.debug("lastModified of " + identifier.toString() + " updated successfully");
+ LOG.debug(" updated");
+ }
+ } catch (AmazonServiceException e) {
+ LOG.debug(" does not exist", e);
+ // not found - create it
+ }
+ if (objectMetaData == null) {
+ LOG.debug(" creating");
+ try {
+ // start multipart parallel upload using amazon sdk
+ Upload up = tmx.upload(new PutObjectRequest(bucket, key, file));
+ // wait for upload to finish
+ up.waitForUploadResult();
+ LOG.debug(" done");
+ } catch (Exception e2) {
+ LOG.debug(" could not upload", e2);
+ throw new DataStoreException("Could not upload " + key, e2);
+ }
+ }
+ LOG.debug(" ms: {0}", System.currentTimeMillis() - start);
+
+ }
+
+ private String getKeyName(DataIdentifier identifier) {
+ return KEY_PREFIX + identifier.toString();
+ }
+
+ private String getIdentifierName(String key) {
+ if (!key.startsWith(KEY_PREFIX)) {
+ return null;
+ }
+ return key.substring(KEY_PREFIX.length());
+ }
+
+ public boolean exists(DataIdentifier identifier) throws DataStoreException {
+ String key = getKeyName(identifier);
+ try {
+ LOG.debug("exists {0}", identifier);
+ ObjectMetadata objectMetaData = s3service.getObjectMetadata(bucket, key);
+ if (objectMetaData != null) {
+ LOG.debug(" true");
+ return true;
+ }
+ return false;
+ } catch (AmazonServiceException e) {
+ if (e.getStatusCode() == 404) {
+ LOG.info("key [" + identifier.toString() + "] not found.");
+ return false;
+ }
+ throw new DataStoreException("Error occured to getObjectMetadata for key [" + identifier.toString() + "]", e);
+ }
+ }
+
+ public void touch(DataIdentifier identifier, long minModifiedDate) throws DataStoreException {
+ String key = getKeyName(identifier);
+ try {
+ if (minModifiedDate != 0) {
+ ObjectMetadata objectMetaData = s3service.getObjectMetadata(bucket, key);
+ if (objectMetaData.getLastModified().getTime() < minModifiedDate) {
+ CopyObjectRequest copReq = new CopyObjectRequest(bucket, key, bucket, key);
+ copReq.setNewObjectMetadata(objectMetaData);
+ s3service.copyObject(copReq);
+ LOG.debug("lastModified of " + identifier.toString() + " updated successfully");
+ }
+ }
+ } catch (Exception e) {
+ throw new DataStoreException("An Exception occurred while trying to set the last modified date of record "
+ + identifier.toString(), e);
+ }
+ }
+
+ public InputStream read(DataIdentifier identifier) throws DataStoreException {
+ String key = getKeyName(identifier);
+ try {
+ LOG.debug("read {" + identifier + "}");
+ S3Object object = s3service.getObject(bucket, key);
+ InputStream in = object.getObjectContent();
+ LOG.debug(" return");
+ return in;
+ } catch (AmazonServiceException e) {
+ throw new DataStoreException("Object not found: " + key, e);
+ }
+ }
+
+ public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
+ try {
+ LOG.debug("getAllIdentifiers");
+ ArrayList<DataIdentifier> ids = new ArrayList<DataIdentifier>();
+ ObjectListing prevObjectListing = s3service.listObjects(bucket, KEY_PREFIX);
+ while (true) {
+ for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+ String id = getIdentifierName(s3ObjSumm.getKey());
+ if (id != null) {
+ ids.add(new DataIdentifier(id));
+ }
+ }
+ if (!prevObjectListing.isTruncated()) break;
+ prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+ }
+ LOG.debug(" return");
+ return ids.iterator();
+ } catch (AmazonServiceException e) {
+ throw new DataStoreException("Could not list objects", e);
+ }
+ }
+
+ public void close() {
+ s3service = null;
+ }
+
+ public long getLastModified(DataIdentifier identifier) throws DataStoreException {
+ String key = getKeyName(identifier);
+ try {
+ ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
+ return object.getLastModified().getTime();
+ } catch (AmazonServiceException e) {
+ throw new DataStoreException("Could not getLastModified of dataIdentifier " + identifier, e);
+ }
+ }
+
+ public long getLength(DataIdentifier identifier) throws DataStoreException {
+ String key = getKeyName(identifier);
+ try {
+ ObjectMetadata object = s3service.getObjectMetadata(bucket, key);
+ return object.getContentLength();
+ } catch (AmazonServiceException e) {
+ throw new DataStoreException("Could not length of dataIdentifier " + identifier, e);
+ }
+ }
+
+ public void deleteRecord(DataIdentifier identifier) throws DataStoreException {
+ String key = getKeyName(identifier);
+ try {
+ s3service.deleteObject(bucket, key);
+ } catch (AmazonServiceException e) {
+ throw new DataStoreException("Could not getLastModified of dataIdentifier " + identifier, e);
+ }
+ }
+
+ public List<DataIdentifier> deleteAllOlderThan(long min) throws DataStoreException {
+ LOG.info("deleteAllOlderThan " + new Date(min));
+ List<DataIdentifier> diDeleteList = new ArrayList<DataIdentifier>(30);
+ ObjectListing prevObjectListing = s3service.listObjects(bucket);
+ while (true) {
+ List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+ for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+ DataIdentifier identifier = new DataIdentifier(getIdentifierName(s3ObjSumm.getKey()));
+ if (!store.inUse.containsKey(identifier) && s3ObjSumm.getLastModified().getTime() < min) {
+ LOG.info("add id :" + s3ObjSumm.getKey() + " to delete lists");
+ deleteList.add(new DeleteObjectsRequest.KeyVersion(s3ObjSumm.getKey()));
+ diDeleteList.add(new DataIdentifier(getIdentifierName(s3ObjSumm.getKey())));
+ }
+ }
+ if (deleteList.size() > 0) {
+ DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(bucket);
+ delObjsReq.setKeys(deleteList);
+ DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+ if (dobjs.getDeletedObjects().size() != deleteList.size()) {
+ throw new DataStoreException("Incomplete delete object request. only " + dobjs.getDeletedObjects().size() + " out of "
+ + deleteList.size() + " are deleted");
+ } else {
+ LOG.info(deleteList.size() + " records deleted from datastore");
+ }
+ }
+ if (!prevObjectListing.isTruncated()) break;
+ prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+ }
+ LOG.info("deleteAllOlderThan exit");
+ return diDeleteList;
+ }
+
+ /**
+ * Returns a new thread pool configured with the default settings.
+ *
+ * @return A new thread pool configured with the default settings.
+ */
+ private ThreadPoolExecutor createDefaultExecutorService() {
+ ThreadFactory threadFactory = new ThreadFactory() {
+ private int threadCount = 1;
+
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setContextClassLoader(getClass().getClassLoader());
+ thread.setName("s3-transfer-manager-worker-" + threadCount++);
+ return thread;
+ }
+ };
+ return (ThreadPoolExecutor) Executors.newFixedThreadPool(10, threadFactory);
+ }
+}
diff --git a/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3DataStore.java b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3DataStore.java
new file mode 100644
index 0000000..a13a8aa
--- /dev/null
+++ b/jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3DataStore.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.aws.ext.ds;
+
+/**
+ * An Amazon S3 data store.
+ */
+public class S3DataStore extends CachingDataStore {
+
+ protected Backend createBackend() {
+ return new S3Backend();
+ }
+
+ protected String getMarkerFile() {
+ return "s3.init.done";
+ }
+
+}
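A minimal usage sketch, following how the test base class below drives the store (the paths are illustrative, not prescribed by this patch):

    S3DataStore ds = new S3DataStore();
    ds.setConfig("/opt/cq/aws.properties");  // aws properties file (see src/test/resources)
    ds.init("/opt/cq/repository");           // home directory; presumably holds the s3.init.done marker
    DataRecord rec = ds.addRecord(new FileInputStream("/tmp/blob.bin"));
    InputStream in = ds.getRecord(rec.getIdentifier()).getStream();
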
diff --git a/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java
new file mode 100644
index 0000000..ecdca0e
--- /dev/null
+++ b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestAll.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext;
+
+import org.apache.jackrabbit.aws.ext.ds.TestCaseBase;
+import org.apache.jackrabbit.aws.ext.ds.TestInMemDs;
+import org.apache.jackrabbit.aws.ext.ds.TestInMemDsCacheOff;
+import org.apache.jackrabbit.aws.ext.ds.TestS3Ds;
+import org.apache.jackrabbit.aws.ext.ds.TestS3DsCacheOff;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Test suite that includes all test cases for this module.
+ */
+public class TestAll extends TestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestAll.class);
+
+ /**
+ * @return a Test suite that executes all tests inside this package.
+ */
+ public static Test suite() {
+ TestSuite suite = new TestSuite("S3 tests");
+ suite.addTestSuite(TestLocalCache.class);
+ suite.addTestSuite(TestInMemDs.class);
+ suite.addTestSuite(TestInMemDsCacheOff.class);
+ String config = System.getProperty(TestCaseBase.CONFIG);
+ LOG.info("config= " + config);
+ if (config != null && !"".equals(config.trim())) {
+ suite.addTestSuite(TestS3Ds.class);
+ suite.addTestSuite(TestS3DsCacheOff.class);
+ }
+ return suite;
+ }
+}
diff --git a/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestLocalCache.java b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestLocalCache.java
new file mode 100644
index 0000000..9022de5
--- /dev/null
+++ b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/TestLocalCache.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.aws.ext;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.jackrabbit.aws.ext.LocalCache;
+import org.apache.jackrabbit.aws.ext.ds.TestCaseBase;
+import org.apache.jackrabbit.core.fs.local.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test case for the local cache {@link LocalCache}.
+ */
+public class TestLocalCache extends TestCaseBase {
+
+ private static final String CACHE_DIR = "target/cache";
+
+ private static final String TEMP_DIR = "target/temp";
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestLocalCache.class);
+
+ protected void setUp() {
+ try {
+ File cachedir = new File(CACHE_DIR);
+ if (cachedir.exists()) FileUtil.delete(cachedir);
+ cachedir.mkdirs();
+
+ File tempdir = new File(TEMP_DIR);
+ if (tempdir.exists()) FileUtil.delete(tempdir);
+ tempdir.mkdirs();
+ } catch (Exception e) {
+ LOG.error("error:", e);
+ fail();
+ }
+ }
+
+ protected void tearDown() throws IOException {
+ File cachedir = new File(CACHE_DIR);
+ if (cachedir.exists()) FileUtil.delete(cachedir);
+
+ File tempdir = new File(TEMP_DIR);
+ if (tempdir.exists()) FileUtil.delete(tempdir);
+ }
+
+ /**
+ * Test to validate that records can be stored in and retrieved from the cache.
+ */
+ public void testStoreRetrieve() {
+ try {
+ LocalCache cache = new LocalCache(CACHE_DIR, TEMP_DIR, 400, 0.95, 0.70);
+ Random random = new Random(12345);
+ byte[] data = new byte[100];
+ Map<String, byte[]> byteMap = new HashMap<String, byte[]>();
+ random.nextBytes(data);
+ byteMap.put("a1", data);
+
+ data = new byte[100];
+ random.nextBytes(data);
+ byteMap.put("a2", data);
+
+ data = new byte[100];
+ random.nextBytes(data);
+ byteMap.put("a3", data);
+
+ cache.store("a1", new ByteArrayInputStream(byteMap.get("a1")));
+ cache.store("a2", new ByteArrayInputStream(byteMap.get("a2")));
+ cache.store("a3", new ByteArrayInputStream(byteMap.get("a3")));
+ assertEquals(new ByteArrayInputStream(byteMap.get("a1")), cache.getIfStored("a1"));
+ assertEquals(new ByteArrayInputStream(byteMap.get("a2")), cache.getIfStored("a2"));
+ assertEquals(new ByteArrayInputStream(byteMap.get("a3")), cache.getIfStored("a3"));
+ } catch (Exception e) {
+ LOG.error("error:", e);
+ fail();
+ }
+
+ }
+
+ /**
+ * Test to verify that the cache is purged once its current size exceeds cachePurgeTrigFactor * maxSize.
+ */
+ public void testAutoPurge() {
+ try {
+
+ LocalCache cache = new LocalCache(CACHE_DIR, TEMP_DIR, 400, 0.95, 0.70);
+ Random random = new Random(12345);
+ byte[] data = new byte[100];
+ Map<String, byte[]> byteMap = new HashMap<String, byte[]>();
+ random.nextBytes(data);
+ byteMap.put("a1", data);
+
+ data = new byte[100];
+ random.nextBytes(data);
+ byteMap.put("a2", data);
+
+ data = new byte[100];
+ random.nextBytes(data);
+ byteMap.put("a3", data);
+
+ cache.store("a1", new ByteArrayInputStream(byteMap.get("a1")));
+ cache.store("a2", new ByteArrayInputStream(byteMap.get("a2")));
+ cache.store("a3", new ByteArrayInputStream(byteMap.get("a3")));
+ assertEquals(new ByteArrayInputStream(byteMap.get("a1")), cache.getIfStored("a1"));
+ assertEquals(new ByteArrayInputStream(byteMap.get("a2")), cache.getIfStored("a2"));
+ assertEquals(new ByteArrayInputStream(byteMap.get("a3")), cache.getIfStored("a3"));
+
+ data = new byte[90];
+ random.nextBytes(data);
+ byteMap.put("a4", data);
+ // storing a4 should purge cache
+ cache.store("a4", new ByteArrayInputStream(byteMap.get("a4")));
+ Thread.sleep(1000);
+
+ assertNull("a1 should be null", cache.getIfStored("a1"));
+ assertNull("a2 should be null", cache.getIfStored("a2"));
+ assertEquals(new ByteArrayInputStream(byteMap.get("a3")), cache.getIfStored("a3"));
+ assertEquals(new ByteArrayInputStream(byteMap.get("a4")), cache.getIfStored("a4"));
+ data = new byte[100];
+ random.nextBytes(data);
+ byteMap.put("a5", data);
+ cache.store("a5", new ByteArrayInputStream(byteMap.get("a5")));
+ assertEquals(new ByteArrayInputStream(byteMap.get("a3")), cache.getIfStored("a3"));
+ } catch (Exception e) {
+ LOG.error("error:", e);
+ fail();
+ }
+ }
+
+}
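Working through the testAutoPurge numbers, under the assumed reading of the constructor arguments as (maxSize 400 bytes, cachePurgeTrigFactor 0.95, cachePurgeResizeFactor 0.70):

    // purge trigger: 0.95 * 400 = 380 bytes
    // a1 + a2 + a3 = 300 bytes; adding the 90-byte a4 reaches 390 > 380, so a purge starts
    // purge target:  0.70 * 400 = 280 bytes
    // evicting the oldest entries a1 and a2 (200 bytes) leaves 190 <= 280,
    // which is why the test expects a1 and a2 to be gone while a3 and a4 survive
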
diff --git a/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryBackend.java b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryBackend.java
new file mode 100644
index 0000000..f689ae3
--- /dev/null
+++ b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryBackend.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.aws.ext.ds;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.jackrabbit.aws.ext.ds.Backend;
+import org.apache.jackrabbit.aws.ext.ds.CachingDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+/**
+ * An in-memory backend used to speed up testing the implementation.
+ */
+public class InMemoryBackend implements Backend {
+
+ private Map<DataIdentifier, byte[]> data = new HashMap<DataIdentifier, byte[]>();
+
+ private Map<DataIdentifier, Long> timeMap = new HashMap<DataIdentifier, Long>();
+
+ public void init(CachingDataStore store, String homeDir, String config) throws DataStoreException {
+ // ignore
+ log("init");
+ }
+
+ public void close() {
+ // ignore
+ log("close");
+ }
+
+ public boolean exists(DataIdentifier identifier) {
+ log("exists " + identifier);
+ return data.containsKey(identifier);
+ }
+
+ public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
+ log("getAllIdentifiers");
+ return data.keySet().iterator();
+ }
+
+ public InputStream read(DataIdentifier identifier) throws DataStoreException {
+ log("read " + identifier);
+ return new ByteArrayInputStream(data.get(identifier));
+ }
+
+ public void write(DataIdentifier identifier, File file) throws DataStoreException {
+ log("write " + identifier + " " + file.length());
+ byte[] buffer = new byte[(int) file.length()];
+ try {
+ DataInputStream din = new DataInputStream(new FileInputStream(file));
+ din.readFully(buffer);
+ din.close();
+ data.put(identifier, buffer);
+ timeMap.put(identifier, System.currentTimeMillis());
+ } catch (IOException e) {
+ throw new DataStoreException(e);
+ }
+ }
+
+ private void log(String message) {
+ // System.out.println(message);
+ }
+
+ public long getLastModified(DataIdentifier identifier) throws DataStoreException {
+ log("getLastModified " + identifier);
+ return timeMap.get(identifier);
+ }
+
+ public void deleteRecord(DataIdentifier identifier) throws DataStoreException {
+ timeMap.remove(identifier);
+ data.remove(identifier);
+ }
+
+ public List<DataIdentifier> deleteAllOlderThan(long min) {
+ log("deleteAllOlderThan " + min);
+ List<DataIdentifier> tobeDeleted = new ArrayList<DataIdentifier>();
+ for (Map.Entry<DataIdentifier, Long> entry : timeMap.entrySet()) {
+ DataIdentifier identifier = entry.getKey();
+ long timestamp = entry.getValue();
+ if (timestamp < min) {
+ tobeDeleted.add(identifier);
+ }
+ }
+ for (DataIdentifier identifier : tobeDeleted) {
+ timeMap.remove(identifier);
+ data.remove(identifier);
+ }
+ return tobeDeleted;
+ }
+
+ public long getLength(DataIdentifier identifier) throws DataStoreException {
+ try {
+ return data.get(identifier).length;
+ } catch (Exception e) {
+ throw new DataStoreException(e);
+ }
+ }
+
+ public void touch(DataIdentifier identifier, long minModifiedDate) throws DataStoreException {
+ if (minModifiedDate > 0 && data.containsKey(identifier)) {
+ timeMap.put(identifier, System.currentTimeMillis());
+ }
+ }
+}
diff --git a/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryDataStore.java b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryDataStore.java
new file mode 100644
index 0000000..1df5e5d
--- /dev/null
+++ b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/InMemoryDataStore.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.aws.ext.ds;
+
+import org.apache.jackrabbit.aws.ext.ds.Backend;
+import org.apache.jackrabbit.aws.ext.ds.CachingDataStore;
+
+/**
+ * A caching data store that uses the in-memory backend.
+ */
+public class InMemoryDataStore extends CachingDataStore {
+
+ Backend createBackend() {
+ return new InMemoryBackend();
+ }
+
+ String getMarkerFile() {
+ return "mem.init.done";
+ }
+
+}
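The two CachingDataStore subclasses differ only in the backend they create, so the test base class below can switch implementations with a flag; a sketch of that selection pattern:

    // the backend-selection pattern used throughout TestCaseBase
    CachingDataStore ds = memoryBackend ? new InMemoryDataStore() : new S3DataStore();
    ds.setConfig(config);            // aws properties path; irrelevant to the in-memory backend
    if (noCache) ds.setCacheSize(0); // a zero-sized cache disables the local cache
    ds.init("target/temp");          // home directory used by the tests
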
diff --git a/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestCaseBase.java b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestCaseBase.java
new file mode 100644
index 0000000..0cd136a
--- /dev/null
+++ b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestCaseBase.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.aws.ext.ds;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.aws.ext.LocalCache;
+import org.apache.jackrabbit.aws.ext.ds.CachingDataStore;
+import org.apache.jackrabbit.aws.ext.ds.S3Backend;
+import org.apache.jackrabbit.aws.ext.ds.S3DataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.MultiDataStoreAware;
+import org.apache.jackrabbit.core.data.RandomInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test base class which covers all scenarios.
+ */
+public class TestCaseBase extends TestCase {
+
+ /**
+ * temp directory
+ */
+ private static final String TEST_DIR = "target/temp";
+
+ /**
+ * Name of the system property holding the aws properties file path.
+ */
+ public static final String CONFIG = "config";
+
+ /**
+ * File path of aws properties.
+ */
+ protected String config = null;
+
+ /**
+ * Parameter to use the in-memory backend. If false, the {@link S3Backend} is used.
+ */
+ protected boolean memoryBackend = true;
+
+ /**
+ * Parameter to disable the local cache. If true, the {@link LocalCache} is not used.
+ */
+ protected boolean noCache = false;
+
+ /**
+ * Logger
+ */
+ protected static final Logger LOG = LoggerFactory.getLogger(TestCaseBase.class);
+
+ /**
+ * Delete temporary directory.
+ */
+ protected void setUp() {
+ FileUtils.deleteQuietly(new File(TEST_DIR));
+ }
+
+ /**
+ * Delete temporary directory.
+ */
+ protected void tearDown() throws IOException {
+ FileUtils.deleteQuietly(new File(TEST_DIR));
+ }
+
+ /**
+ * Testcase to validate {@link DataStore#addRecord(InputStream)} API.
+ */
+ public void testAddRecord() {
+ try {
+ doAddRecordTest(memoryBackend, config, noCache);
+ } catch (Exception e) {
+ LOG.error("error:", e);
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Testcase to validate {@link DataStore#getRecord(DataIdentifier)} API.
+ */
+ public void testGetRecord() {
+ try {
+ doGetRecordTest(memoryBackend, config, noCache);
+ } catch (Exception e) {
+ LOG.error("error:", e);
+ }
+ }
+
+ /**
+ * Testcase to validate {@link DataStore#getAllIdentifiers()} API.
+ */
+ public void testGetAllIdentifiers() {
+ try {
+ doGetAllIdentifiersTest(memoryBackend, config, noCache);
+ } catch (Exception e) {
+ LOG.error("error:", e);
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Testcase to validate {@link DataStore#updateModifiedDateOnAccess(long)} API.
+ */
+ public void testUpdateLastModifiedOnAccess() {
+ try {
+ doUpdateLastModifiedOnAccessTest(memoryBackend, config, noCache);
+ } catch (Exception e) {
+ LOG.error("error:", e);
+ }
+ }
+
+ /**
+ * Testcase to validate {@link MultiDataStoreAware#deleteRecord(DataIdentifier)} API.
+ */
+ public void testDeleteRecord() {
+ try {
+ doDeleteRecordTest(memoryBackend, config, noCache);
+ } catch (Exception e) {
+ LOG.error("error:", e);
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Testcase to validate {@link DataStore#deleteAllOlderThan(long)} API.
+ */
+ public void testDeleteAllOlderThan() {
+ try {
+ doDeleteAllOlderThan(memoryBackend, config, noCache);
+ } catch (Exception e) {
+ LOG.error("error:", e);
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Testcase to validate {@link DataStore#getRecordFromReference(String)} API.
+ */
+ public void testReference() {
+ try {
+ doReferenceTest(memoryBackend, config, noCache);
+ } catch (Exception e) {
+ LOG.error("error:", e);
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Testcase to validate mixed scenario use of {@link DataStore}.
+ */
+ public void test() {
+ try {
+ doTest(memoryBackend, config, noCache);
+ } catch (Exception e) {
+ LOG.error("error:", e);
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Testcase to validate mixed scenario use of {@link DataStore} in multi-threaded concurrent environment.
+ */
+ public void testMultiThreaded() {
+ try {
+ doTestMultiThreaded(memoryBackend, config, noCache);
+ } catch (Exception e) {
+ LOG.error("error:", e);
+ fail(e.getMessage());
+ }
+
+ }
+
+ /**
+ * Test {@link DataStore#addRecord(InputStream)} and assert length of added record.
+ */
+ protected void doAddRecordTest(boolean memoryBackend, String config, boolean noCache) throws Exception {
+ CachingDataStore ds = memoryBackend ? new InMemoryDataStore() : new S3DataStore();
+ ds.setConfig(config);
+ if (noCache) ds.setCacheSize(0);
+ ds.init(TEST_DIR);
+ byte[] data = new byte[12345];
+ new Random(12345).nextBytes(data);
+ DataRecord rec = ds.addRecord(new ByteArrayInputStream(data));
+ assertEquals(data.length, rec.getLength());
+ assertRecord(data, rec);
+ }
+
+ /**
+ * Test {@link DataStore#getRecord(DataIdentifier)} and assert length and inputstream.
+ */
+ protected void doGetRecordTest(boolean memoryBackend, String config, boolean noCache) throws Exception {
+ CachingDataStore ds = memoryBackend ? new InMemoryDataStore() : new S3DataStore();
+ ds.setConfig(config);
+ if (noCache) ds.setCacheSize(0);
+ ds.init(TEST_DIR);
+ byte[] data = new byte[12345];
+ new Random(12345).nextBytes(data);
+ DataRecord rec = ds.addRecord(new ByteArrayInputStream(data));
+ rec = ds.getRecord(rec.getIdentifier());
+ assertEquals(data.length, rec.getLength());
+ assertRecord(data, rec);
+ }
+
+ /**
+ * Test {@link MultiDataStoreAware#deleteRecord(DataIdentifier)}.
+ */
+ protected void doDeleteRecordTest(boolean memoryBackend, String config, boolean noCache) throws Exception {
+ CachingDataStore ds = memoryBackend ? new InMemoryDataStore() : new S3DataStore();
+ ds.setConfig(config);
+ if (noCache) ds.setCacheSize(0);
+ ds.init(TEST_DIR);
+ Random random = new Random(12345);
+ byte[] data1 = new byte[12345];
+ random.nextBytes(data1);
+ DataRecord rec1 = ds.addRecord(new ByteArrayInputStream(data1));
+
+ byte[] data2 = new byte[12345];
+ random.nextBytes(data2);
+ DataRecord rec2 = ds.addRecord(new ByteArrayInputStream(data2));
+
+ byte[] data3 = new byte[12345];
+ random.nextBytes(data3);
+ DataRecord rec3 = ds.addRecord(new ByteArrayInputStream(data3));
+
+ ds.deleteRecord(rec2.getIdentifier());
+
+ assertNull("rec2 should be null", ds.getRecordIfStored(rec2.getIdentifier()));
+ assertEquals(new ByteArrayInputStream(data1), ds.getRecord(rec1.getIdentifier()).getStream());
+ assertEquals(new ByteArrayInputStream(data3), ds.getRecord(rec3.getIdentifier()).getStream());
+ }
+
+ /**
+ * Test {@link DataStore#getAllIdentifiers()} and asserts all identifiers are returned.
+ */
+ protected void doGetAllIdentifiersTest(boolean memoryBackend, String config, boolean noCache) throws Exception {
+ CachingDataStore ds = memoryBackend ? new InMemoryDataStore() : new S3DataStore();
+ ds.setConfig(config);
+ if (noCache) ds.setCacheSize(0);
+ ds.init(TEST_DIR);
+ List<DataIdentifier> list = new ArrayList<DataIdentifier>();
+ Random random = new Random(12345);
+ byte[] data = new byte[12345];
+ random.nextBytes(data);
+ DataRecord rec = ds.addRecord(new ByteArrayInputStream(data));
+ list.add(rec.getIdentifier());
+
+ data = new byte[12345];
+ random.nextBytes(data);
+ rec = ds.addRecord(new ByteArrayInputStream(data));
+ list.add(rec.getIdentifier());
+
+ data = new byte[12345];
+ random.nextBytes(data);
+ rec = ds.addRecord(new ByteArrayInputStream(data));
+ list.add(rec.getIdentifier());
+
+ Iterator<DataIdentifier> itr = ds.getAllIdentifiers();
+ while (itr.hasNext()) {
+ assertTrue("record found on list", list.remove(itr.next()));
+ }
+ assertEquals(0, list.size());
+ }
+
+ /**
+ * Asserts that the modified date of a record is updated when it is accessed after a {@link DataStore#updateModifiedDateOnAccess(long)} invocation.
+ */
+ protected void doUpdateLastModifiedOnAccessTest(boolean memoryBackend, String config, boolean noCache) throws Exception {
+ CachingDataStore ds = memoryBackend ? new InMemoryDataStore() : new S3DataStore();
+ ds.setConfig(config);
+ if (noCache) ds.setCacheSize(0);
+ ds.init(TEST_DIR);
+ Random random = new Random(12345);
+ byte[] data = new byte[12345];
+ random.nextBytes(data);
+ DataRecord rec1 = ds.addRecord(new ByteArrayInputStream(data));
+
+ data = new byte[12345];
+ random.nextBytes(data);
+ DataRecord rec2 = ds.addRecord(new ByteArrayInputStream(data));
+
+ long updateTime = System.currentTimeMillis();
+ ds.updateModifiedDateOnAccess(updateTime);
+
+ data = new byte[12345];
+ random.nextBytes(data);
+ DataRecord rec3 = ds.addRecord(new ByteArrayInputStream(data));
+
+ data = new byte[12345];
+ random.nextBytes(data);
+ DataRecord rec4 = ds.addRecord(new ByteArrayInputStream(data));
+
+ rec1 = ds.getRecord(rec1.getIdentifier());
+
+ assertEquals("rec1 touched", true, ds.getLastModified(rec1.getIdentifier()) > updateTime);
+ assertEquals("rec2 not touched", true, ds.getLastModified(rec2.getIdentifier()) < updateTime);
+ assertEquals("rec3 touched", true, ds.getLastModified(rec3.getIdentifier()) > updateTime);
+ assertEquals("rec4 touched", true, ds.getLastModified(rec4.getIdentifier()) > updateTime);
+
+ }
+
+ /**
+ * Asserts that {@link DataStore#deleteAllOlderThan(long)} only deleted records older than argument passed.
+ */
+ protected void doDeleteAllOlderThan(boolean memoryBackend, String config, boolean noCache) throws Exception {
+ CachingDataStore ds = memoryBackend ? new InMemoryDataStore() : new S3DataStore();
+ ds.setConfig(config);
+ if (noCache) ds.setCacheSize(0);
+ ds.init(TEST_DIR);
+ Random random = new Random(12345);
+ byte[] data = new byte[12345];
+ random.nextBytes(data);
+ DataRecord rec1 = ds.addRecord(new ByteArrayInputStream(data));
+
+ data = new byte[12345];
+ random.nextBytes(data);
+ DataRecord rec2 = ds.addRecord(new ByteArrayInputStream(data));
+
+ long updateTime = System.currentTimeMillis();
+ ds.updateModifiedDateOnAccess(updateTime);
+
+ data = new byte[12345];
+ random.nextBytes(data);
+ DataRecord rec3 = ds.addRecord(new ByteArrayInputStream(data));
+
+ data = new byte[12345];
+ random.nextBytes(data);
+ DataRecord rec4 = ds.addRecord(new ByteArrayInputStream(data));
+
+ rec1 = ds.getRecord(rec1.getIdentifier());
+ assertEquals("only rec2 should be deleted", 1, ds.deleteAllOlderThan(updateTime));
+ assertNull("rec2 should be null", ds.getRecordIfStored(rec2.getIdentifier()));
+
+ Iterator<DataIdentifier> itr = ds.getAllIdentifiers();
+ List<DataIdentifier> list = new ArrayList<DataIdentifier>();
+ list.add(rec1.getIdentifier());
+ list.add(rec3.getIdentifier());
+ list.add(rec4.getIdentifier());
+ while (itr.hasNext()) {
+ assertTrue("record found on list", list.remove(itr.next()));
+ }
+
+ assertEquals("touched records found", 0, list.size());
+ assertEquals("rec1 touched", true, ds.getLastModified(rec1.getIdentifier()) > updateTime);
+ assertEquals("rec3 touched", true, ds.getLastModified(rec3.getIdentifier()) > updateTime);
+ assertEquals("rec4 touched", true, ds.getLastModified(rec4.getIdentifier()) > updateTime);
+
+ }
+
+ /**
+ * Test if a record can be accessed via {@link DataStore#getRecordFromReference(String)}.
+ */
+ public void doReferenceTest(boolean memoryBackend, String config, boolean noCache) throws Exception {
+ CachingDataStore ds = memoryBackend ? new InMemoryDataStore() : new S3DataStore();
+ ds.setConfig(config);
+ if (noCache) ds.setCacheSize(0);
+ ds.init(TEST_DIR);
+ ds.setSecret("12345");
+ byte[] data = new byte[12345];
+ new Random(12345).nextBytes(data);
+ String reference;
+ DataRecord record = ds.addRecord(new ByteArrayInputStream(data));
+ reference = record.getReference();
+ assertReference(data, reference, ds);
+ }
+
+ /**
+ * Method to validate mixed scenario use of {@link DataStore}.
+ */
+ protected void doTest(boolean memoryBackend, String config, boolean noCache) throws Exception {
+ CachingDataStore ds = memoryBackend ? new InMemoryDataStore() : new S3DataStore();
+ ds.setConfig(config);
+ if (noCache) ds.setCacheSize(0);
+ ds.init(TEST_DIR);
+ int length = 1000;
+ DataRecord rec = ds.addRecord(new ByteArrayInputStream(new byte[length]));
+ long mod = rec.getLastModified();
+ assertEquals(length, rec.getLength());
+ // ensure the timestamp is different
+ Thread.sleep(50);
+ DataRecord rec2 = ds.addRecord(new ByteArrayInputStream(new byte[length]));
+ long mod2 = rec2.getLastModified();
+ assertTrue(mod2 > mod);
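+ // both records contain identical bytes, so their content-addressed identifiers must match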
+ String recId = rec.getIdentifier().toString();
+ LOG.debug("recId:" + recId);
+ String rec2Id = rec2.getIdentifier().toString();
+ assertEquals(recId, rec2Id);
+ DataRecord rec3 = ds.getRecord(rec.getIdentifier());
+ byte[] data = IOUtils.toByteArray(rec3.getStream());
+ assertEquals(length, data.length);
+
+ Iterator<DataIdentifier> it = ds.getAllIdentifiers();
+ boolean found = false;
+ while (it.hasNext()) {
+ DataIdentifier id = it.next();
+ LOG.debug(" id:" + id.toString());
+ if (id.toString().equals(recId)) {
+ found = true;
+ break;
+ }
+ }
+ assertTrue(found);
+ ds.close();
+ }
+
+ /**
+ * Method to validate mixed scenario use of {@link DataStore} in multi-threaded concurrent environment.
+ */
+ protected void doTestMultiThreaded(boolean memoryBackend, String config, boolean noCache) throws Exception {
+ CachingDataStore ds = memoryBackend ? new InMemoryDataStore() : new S3DataStore();
+ ds.setConfig(config);
+ if (noCache) ds.setCacheSize(0);
+ ds.init(TEST_DIR);
+ doTestMultiThreaded(ds, 4);
+ }
+
+ /**
+ * Method to assert a record's content against the expected byte array.
+ */
+ protected void assertRecord(byte[] expected, DataRecord record) throws DataStoreException, IOException {
+ InputStream stream = record.getStream();
+ try {
+ for (int i = 0; i < expected.length; i++) {
+ assertEquals(expected[i] & 0xff, stream.read());
+ }
+ assertEquals(-1, stream.read());
+ } finally {
+ stream.close();
+ }
+ }
+
+ /**
+ * Method to run {@link TestCaseBase#doTest(DataStore, int)} in multiple concurrent threads.
+ */
+ protected void doTestMultiThreaded(final DataStore ds, int threadCount) throws Exception {
+ final Exception[] exception = new Exception[1];
+ Thread[] threads = new Thread[threadCount];
+ for (int i = 0; i < threadCount; i++) {
+ final int x = i;
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ doTest(ds, x);
+ } catch (Exception e) {
+ exception[0] = e;
+ }
+ }
+ };
+ threads[i] = t;
+ t.start();
+ }
+ for (int i = 0; i < threadCount; i++) {
+ threads[i].join();
+ }
+ if (exception[0] != null) {
+ throw exception[0];
+ }
+ }
+
+ /**
+ * Add records and assert that streams read back from them (sequentially or randomly) match the expected content.
+ */
+ void doTest(DataStore ds, int offset) throws Exception {
+ ArrayList<DataRecord> list = new ArrayList<DataRecord>();
+ HashMap<DataRecord, Integer> map = new HashMap<DataRecord, Integer>();
+ for (int i = 0; i < 100; i++) {
+ int size = 100 + i * 10;
+ RandomInputStream in = new RandomInputStream(size + offset, size);
+ DataRecord rec = ds.addRecord(in);
+ list.add(rec);
+ map.put(rec, Integer.valueOf(size));
+ }
+ Random random = new Random(1);
+ for (int i = 0; i < list.size(); i++) {
+ int pos = random.nextInt(list.size());
+ DataRecord rec = list.get(pos);
+ int size = map.get(rec);
+ rec = ds.getRecord(rec.getIdentifier());
+ assertEquals(size, rec.getLength());
+ InputStream in = rec.getStream();
+ RandomInputStream expected = new RandomInputStream(size + offset, size);
+ if (random.nextBoolean()) {
+ in = readInputStreamRandomly(in, random);
+ }
+ assertEquals(expected, in);
+ in.close();
+ }
+ }
+
+ InputStream readInputStreamRandomly(InputStream in, Random random) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] buffer = new byte[8000];
+ while (true) {
+ if (random.nextBoolean()) {
+ int x = in.read();
+ if (x < 0) {
+ break;
+ }
+ out.write(x);
+ } else {
+ if (random.nextBoolean()) {
+ int l = in.read(buffer);
+ if (l < 0) {
+ break;
+ }
+ out.write(buffer, 0, l);
+ } else {
+ int offset = random.nextInt(buffer.length / 2);
+ int len = random.nextInt(buffer.length / 2);
+ int l = in.read(buffer, offset, len);
+ if (l < 0) {
+ break;
+ }
+ out.write(buffer, offset, l);
+ }
+ }
+ }
+ in.close();
+ return new ByteArrayInputStream(out.toByteArray());
+ }
+
+ /**
+ * Assert that the contents of two input streams are equal.
+ */
+ protected void assertEquals(InputStream a, InputStream b) throws IOException {
+ while (true) {
+ int ai = a.read();
+ int bi = b.read();
+ assertEquals(ai, bi);
+ if (ai < 0) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Assert that the record retrieved from the reference matches the expected content.
+ */
+ protected void assertReference(byte[] expected, String reference, DataStore store) throws Exception {
+ DataRecord record = store.getRecordFromReference(reference);
+ assertNotNull(record);
+ assertEquals(expected.length, record.getLength());
+
+ InputStream stream = record.getStream();
+ try {
+ for (int i = 0; i < expected.length; i++) {
+ assertEquals(expected[i] & 0xff, stream.read());
+ }
+ assertEquals(-1, stream.read());
+ } finally {
+ stream.close();
+ }
+ }
+
+}
diff --git a/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDs.java b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDs.java
new file mode 100644
index 0000000..82b1bcd
--- /dev/null
+++ b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDs.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.aws.ext.ds;
+
+import org.apache.jackrabbit.aws.ext.ds.CachingDataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test {@link CachingDataStore} with InMemoryBackend and local cache on.
+ */
+public class TestInMemDs extends TestCaseBase {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(TestInMemDs.class);
+
+ public TestInMemDs() {
+ // assign the inherited TestCaseBase parameters instead of shadowing them
+ config = null;
+ memoryBackend = true;
+ noCache = false;
+ }
+}
diff --git a/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDsCacheOff.java b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDsCacheOff.java
new file mode 100644
index 0000000..ec2625b
--- /dev/null
+++ b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestInMemDsCacheOff.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.aws.ext.ds;
+
+import org.apache.jackrabbit.aws.ext.ds.CachingDataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test {@link CachingDataStore} with InMemoryBackend and local cache off.
+ */
+public class TestInMemDsCacheOff extends TestCaseBase {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(TestInMemDsCacheOff.class);
+
+ public TestInMemDsCacheOff() {
+ // assign the inherited TestCaseBase parameters instead of shadowing them
+ config = null;
+ memoryBackend = true;
+ noCache = true;
+ }
+}
diff --git a/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3Ds.java b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3Ds.java
new file mode 100644
index 0000000..c691081
--- /dev/null
+++ b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3Ds.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.aws.ext.ds;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.jackrabbit.aws.ext.S3Constants;
+import org.apache.jackrabbit.aws.ext.Utils;
+import org.apache.jackrabbit.aws.ext.ds.CachingDataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+/**
+ * Test {@link CachingDataStore} with S3Backend and local cache on. It requires the aws config file to be passed via a system property,
+ * e.g. -Dconfig=/opt/cq/aws.properties. A sample aws properties file is located at src/test/resources/aws.properties.
+ */
+public class TestS3Ds extends TestCaseBase {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(TestS3Ds.class);
+
+ public TestS3Ds() {
+ // assign the inherited TestCaseBase parameters instead of shadowing them
+ config = System.getProperty(CONFIG);
+ memoryBackend = false;
+ noCache = false;
+ }
+
+ protected void setUp() {
+ super.setUp();
+ }
+
+ protected void tearDown() throws IOException {
+ super.tearDown();
+ deleteBucket();
+ }
+
+ /**
+ * Cleaning of bucket after test run.
+ */
+ public void deleteBucket() throws IOException {
+
+ Properties props = Utils.readConfig(config);
+ AmazonS3Client s3service = Utils.openService(props);
+ String bucket = props.getProperty(S3Constants.S3_BUCKET);
+ if (s3service.doesBucketExist(bucket)) {
+ ObjectListing prevObjectListing = s3service.listObjects(bucket);
+ while (true) {
+ List<DeleteObjectsRequest.KeyVersion> deleteList = new ArrayList<DeleteObjectsRequest.KeyVersion>();
+ for (S3ObjectSummary s3ObjSumm : prevObjectListing.getObjectSummaries()) {
+ LOG.info("add id :" + s3ObjSumm.getKey() + " to delete lists");
+ deleteList.add(new DeleteObjectsRequest.KeyVersion(s3ObjSumm.getKey()));
+ }
+ if (deleteList.size() > 0) {
+ DeleteObjectsRequest delObjsReq = new DeleteObjectsRequest(bucket);
+ delObjsReq.setKeys(deleteList);
+ DeleteObjectsResult dobjs = s3service.deleteObjects(delObjsReq);
+ }
+ if (!prevObjectListing.isTruncated()) break;
+ prevObjectListing = s3service.listNextBatchOfObjects(prevObjectListing);
+ }
+ s3service.deleteBucket(bucket);
+ }
+ s3service.shutdown();
+
+ }
+
+}
diff --git a/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DsCacheOff.java b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DsCacheOff.java
new file mode 100644
index 0000000..8aba1a7
--- /dev/null
+++ b/jackrabbit-aws-ext/src/test/java/org/apache/jackrabbit/aws/ext/ds/TestS3DsCacheOff.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.aws.ext.ds;
+
+import org.apache.jackrabbit.aws.ext.ds.CachingDataStore;
+
+/**
+ * Test {@link CachingDataStore} with S3Backend and local cache off. It requires the aws config file to be passed via a system property,
+ * e.g. -Dconfig=/opt/cq/aws.properties. A sample aws properties file is located at src/test/resources/aws.properties.
+ */
+public class TestS3DsCacheOff extends TestS3Ds {
+ public TestS3DsCacheOff() {
+ // assign the inherited parameter instead of shadowing it
+ noCache = true;
+ }
+}
diff --git a/jackrabbit-aws-ext/src/test/resources/aws.properties b/jackrabbit-aws-ext/src/test/resources/aws.properties
new file mode 100644
index 0000000..6bc1b2d
--- /dev/null
+++ b/jackrabbit-aws-ext/src/test/resources/aws.properties
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# AWS access key ID
+accessKey=AKIAIFUNO4C75CVKYGUQ
+# AWS secret key
+secretKey=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
+# AWS bucket name
+s3Bucket=s1212121
+# AWS bucket region
+# Mapping of S3 regions to their constants
+# US Standard us-standard
+# US West us-west-2
+# US West (Northern California) us-west-1
+# EU (Ireland) EU
+# Asia Pacific (Singapore) ap-southeast-1
+# Asia Pacific (Sydney) ap-southeast-2
+# Asia Pacific (Tokyo) ap-northeast-1
+# South America (Sao Paulo) sa-east-1
+s3Region=
+connectionTimeout=120000
+socketTimeout=120000
+maxConnections=20
+maxErrorRetry=10
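The S3 tests consume this file through the Utils helper, as TestS3Ds does during bucket cleanup; a minimal sketch:

    Properties props = Utils.readConfig("/opt/cq/aws.properties"); // path is illustrative
    AmazonS3Client s3service = Utils.openService(props);
    String bucket = props.getProperty(S3Constants.S3_BUCKET);
    if (s3service.doesBucketExist(bucket)) {
        // list, delete, or otherwise operate on the bucket ...
    }
    s3service.shutdown();
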
diff --git a/jackrabbit-aws-ext/src/test/resources/log4j.properties b/jackrabbit-aws-ext/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8645035
--- /dev/null
+++ b/jackrabbit-aws-ext/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# this is the log4j configuration for the JCR API tests
+log4j.rootLogger=INFO, file
+
+#log4j.logger.org.apache.jackrabbit.test=DEBUG
+
+# 'file' is set to be a FileAppender.
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.File=target/debug.log
+
+# 'file' uses PatternLayout.
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{dd.MM.yyyy HH:mm:ss} *%-5p* [%t] %c{1}: %m (%F, line %L)\n
diff --git a/jackrabbit-aws-ext/src/test/resources/repository_sample.xml b/jackrabbit-aws-ext/src/test/resources/repository_sample.xml
new file mode 100644
index 0000000..5dc72d3
--- /dev/null
+++ b/jackrabbit-aws-ext/src/test/resources/repository_sample.xml
@@ -0,0 +1,181 @@
+<!-- Sample Jackrabbit repository configuration demonstrating the S3 data store (XML body elided). -->
diff --git a/pom.xml b/pom.xml
index 0f553d5..956bc15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
jackrabbit-spi2dav
jackrabbit-jcr2dav
jackrabbit-jcr-client
+ jackrabbit-aws-ext
jackrabbit-bundle
jackrabbit-standalone