Index: jackrabbit-aws-ext/pom.xml
===================================================================
--- jackrabbit-aws-ext/pom.xml (revision 1574775)
+++ jackrabbit-aws-ext/pom.xml (working copy)
@@ -45,7 +45,7 @@
com.amazonaws
aws-java-sdk
- 1.5.6
+ 1.7.2
org.apache.jackrabbit
Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java
===================================================================
--- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (revision 1574775)
+++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (working copy)
@@ -36,6 +36,7 @@
import org.apache.jackrabbit.core.data.CachingDataStore;
import org.apache.jackrabbit.core.data.DataIdentifier;
import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,7 +138,16 @@
*/
s3service.setEndpoint(endpoint);
LOG.info("S3 service endpoint: " + endpoint);
- tmx = new TransferManager(s3service, createDefaultExecutorService());
+ int writeThreads = 10;
+ String writeThreadsStr = prop.getProperty(S3Constants.S3_WRITE_THREADS);
+ if (writeThreadsStr != null) {
+ writeThreads = Integer.parseInt(writeThreadsStr);
+ }
+ LOG.info("Using thread pool of [" + writeThreads
+ + "] threads in S3 transfer manager");
+ tmx = new TransferManager(s3service,
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(writeThreads,
+ new NamedThreadFactory("s3-transfer-manager-worker")));
LOG.debug(" done");
} catch (Exception e) {
LOG.debug(" error ", e);
Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java
===================================================================
--- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java (revision 1574775)
+++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java (working copy)
@@ -41,6 +41,11 @@
* Amazon aws S3 region.
*/
public static final String S3_REGION = "s3Region";
+
+ /**
+ * Constant to specify S3 write workers.
+ */
+ public static final String S3_WRITE_THREADS = "writeThreads";
/**
* private constructor so that class cannot initialized from outside.
Index: jackrabbit-aws-ext/src/test/resources/aws.properties
===================================================================
--- jackrabbit-aws-ext/src/test/resources/aws.properties (revision 1574775)
+++ jackrabbit-aws-ext/src/test/resources/aws.properties (working copy)
@@ -36,3 +36,5 @@
socketTimeout=120000
maxConnections=10
maxErrorRetry=10
+# maximum concrruent threads to write to S3.
+writeThreads=10
Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java
===================================================================
--- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (revision 1574775)
+++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (working copy)
@@ -33,14 +33,20 @@
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jcr.RepositoryException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.core.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* A caching data store that consists of {@link LocalCache} and {@link Backend}.
* {@link Backend} is single source of truth. All methods first try to fetch
@@ -58,9 +64,10 @@
* <param name="{@link #setConfig(String) config}" value="${rep.home}/backend.properties"/>
* <param name="{@link #setCacheSize(long) cacheSize}" value="68719476736"/>
* <param name="{@link #setSecret(String) secret}" value="123456"/>
- * <param name="{@link #setCachePurgeTrigFactor(double)}" value="0.95d"/>
- * <param name="{@link #setCachePurgeResizeFactor(double) cacheSize}" value="0.85d"/>
+ * <param name="{@link #setCachePurgeTrigFactor(double) cachePurgeTrigFactor}" value="0.95d"/>
+ * <param name="{@link #setCachePurgeResizeFactor(double) cachePurgeResizeFactor}" value="0.85d"/>
* <param name="{@link #setMinRecordLength(int) minRecordLength}" value="1024"/>
+ * <param name="{@link #setConcurrentUploadsThreads(int) concurrentUploadsThreads}" value="10"/>
* </DataStore>
*/
public abstract class CachingDataStore extends AbstractDataStore implements
@@ -132,6 +139,14 @@
private double cachePurgeResizeFactor = 0.85d;
/**
+ * The {@link #init(String)} methods checks for {@link #getMarkerFile()} and
+ * if it doesn't exists migrates all files from fileystem to {@link Backend}
+ * . This parameter governs number of threads which will upload files
+ * concurrently to {@link Backend}.
+ */
+ private int concurrentUploadsThreads = 10;
+
+ /**
* The number of bytes in the cache. The default value is 64 GB.
*/
private long cacheSize = 64L * 1024 * 1024 * 1024;
@@ -184,7 +199,7 @@
File markerFile = new File(homeDir, markerFileName);
if (!markerFile.exists()) {
LOG.info("load files from local cache");
- loadFilesFromCache();
+ uploadFilesFromCache();
try {
markerFile.createNewFile();
} catch (IOException e) {
@@ -379,35 +394,48 @@
return File.createTempFile(TMP, null, tmpDir);
}
- /**
- * Load files from {@link LocalCache} to {@link Backend}.
- */
- private void loadFilesFromCache() throws RepositoryException {
+ private void uploadFilesFromCache() throws RepositoryException {
ArrayList files = new ArrayList();
listRecursive(files, directory);
long totalSize = 0;
for (File f : files) {
totalSize += f.length();
}
+ if (concurrentUploadsThreads > 1) {
+ new FilesUploader(files, totalSize, concurrentUploadsThreads).upload();
+ } else {
+ uploadFilesInSingleThread(files, totalSize);
+ }
+ }
+
+ private void uploadFilesInSingleThread(List files, long totalSize) throws RepositoryException {
+ long startTime = System.currentTimeMillis();
+ LOG.info("Upload: {" + files.size() + "} files in single thread.");
+ long currentCount = 0;
long currentSize = 0;
long time = System.currentTimeMillis();
for (File f : files) {
long now = System.currentTimeMillis();
if (now > time + 5000) {
- LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}");
+ LOG.info("Uploaded: {" + currentCount + "}/{" + files.size() + "} files, {" + currentSize + "}/{" + totalSize
+ + "} size data");
time = now;
}
- currentSize += f.length();
String name = f.getName();
- LOG.debug("upload file = " + name);
- if (!name.startsWith(TMP) && !name.endsWith(DS_STORE)
- && f.length() > 0) {
- loadFileToBackEnd(f);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("upload file = " + name);
}
+ if (!name.startsWith(TMP) && !name.endsWith(DS_STORE) && f.length() > 0) {
+ uploadFileToBackEnd(f);
+ }
+ currentSize += f.length();
+ currentCount++;
}
- LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}");
+ long endTime = System.currentTimeMillis();
+ LOG.info("Uploaded: {" + currentCount + "}/{" + files.size() + "} files, {" + currentSize + "}/{" + totalSize
+ + "} size data, time taken {" + ((endTime - startTime) / 1000) + "} sec");
}
-
+
/**
* Traverse recursively and populate list with files.
*/
@@ -431,7 +459,7 @@
* file to uploaded.
* @throws DataStoreException
*/
- private void loadFileToBackEnd(File f) throws DataStoreException {
+ private void uploadFileToBackEnd(File f) throws DataStoreException {
DataIdentifier identifier = new DataIdentifier(f.getName());
usesIdentifier(identifier);
backend.write(identifier, f);
@@ -601,5 +629,168 @@
public void setCachePurgeResizeFactor(double cachePurgeResizeFactor) {
this.cachePurgeResizeFactor = cachePurgeResizeFactor;
}
+
+ public int getConcurrentUploadsThreads() {
+ return concurrentUploadsThreads;
+ }
+ public void setConcurrentUploadsThreads(int concurrentUploadsThreads) {
+ this.concurrentUploadsThreads = concurrentUploadsThreads;
+ }
+
+ /**
+ * This class initiates files upload in multiple threads to backend.
+ */
+ private class FilesUploader {
+ private List files;
+
+ private long totalSize;
+
+ private volatile AtomicInteger currentCount = new AtomicInteger();
+
+ private volatile AtomicLong currentSize = new AtomicLong();
+
+ private volatile AtomicBoolean exceptionRaised = new AtomicBoolean();
+
+ private DataStoreException exception;
+
+ private final int threads;
+
+ public FilesUploader(List files, long totalSize, int threads) {
+ super();
+ this.files = files;
+ this.threads = threads;
+ this.totalSize = totalSize;
+ }
+
+ protected void addCurrentCount(int delta) {
+ currentCount.addAndGet(delta);
+ }
+
+ protected void addCurrentSize(long delta) {
+ currentSize.addAndGet(delta);
+ }
+
+ protected synchronized void setException(DataStoreException exception) {
+ exceptionRaised.getAndSet(true);
+ this.exception = exception;
+ }
+
+ protected boolean isExceptionRaised() {
+ return exceptionRaised.get();
+ }
+
+ protected void logProgress() {
+ LOG.info("Uploaded: {" + currentCount.get() + "}/{" + files.size() + "} files, {" + currentSize.get() + "}/{" + totalSize
+ + "} size data");
+ }
+
+ public void upload() throws DataStoreException {
+ long startTime = System.currentTimeMillis();
+ LOG.info(" Uploading " + files.size() + " using " + threads + " threads.");
+ ExecutorService executor = Executors.newFixedThreadPool(threads, new NamedThreadFactory("backend-file-upload-worker"));
+ int partitionSize = files.size() / (threads);
+ int startIndex = 0;
+ int endIndex = partitionSize;
+ for (int i = 1; i <= threads; i++) {
+ List partitionFileList = Collections.unmodifiableList(files.subList(startIndex, endIndex));
+ FileUploaderThread fut = new FileUploaderThread(partitionFileList, startIndex, endIndex, this);
+ executor.execute(fut);
+
+ startIndex = endIndex;
+ if (i == (threads - 1)) {
+ endIndex = files.size();
+ } else {
+ endIndex = startIndex + partitionSize;
+ }
+ }
+ // This will make the executor accept no new threads
+ // and finish all existing threads in the queue
+ executor.shutdown();
+
+ try {
+ // Wait until all threads are finish
+ while (!isExceptionRaised() && !executor.awaitTermination(15, TimeUnit.SECONDS)) {
+ logProgress();
+ }
+ } catch (InterruptedException ie) {
+
+ }
+ long endTime = System.currentTimeMillis();
+ LOG.info("Uploaded: {" + currentCount.get() + "}/{" + files.size() + "} files, {" + currentSize.get() + "}/{" + totalSize
+ + "} size data, time taken {" + ((endTime - startTime) / 1000) + "} sec");
+ if (isExceptionRaised()) {
+ executor.shutdownNow(); // Cancel currently executing tasks
+ throw exception;
+ }
+ }
+
+ }
+
+ /**
+ * This class implements @link {@link Runnable} interface and uploads list
+ * of files to backend.
+ */
+ private class FileUploaderThread implements Runnable {
+ private List files;
+
+ private FilesUploader filesUploader;
+
+ private int startIndex;
+
+ private int endIndex;
+
+ public FileUploaderThread(List files, int startIndex, int endIndex, FilesUploader controller) {
+ super();
+ this.files = files;
+ this.filesUploader = controller;
+ this.startIndex = startIndex;
+ this.endIndex = endIndex;
+ }
+
+ public void run() {
+ long time = System.currentTimeMillis();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Thread [ " + Thread.currentThread().getName() + "]: Uploading files from startIndex[" + startIndex
+ + "] and endIndex [" + (endIndex - 1) + "], both inclusive.");
+ }
+ int uploadCount = 0;
+ long uploadSize = 0;
+ try {
+ for (File f : files) {
+
+ if (filesUploader.isExceptionRaised()) {
+ break;
+ }
+ String name = f.getName();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("upload file = " + name);
+ }
+ if (!name.startsWith(TMP) && !name.endsWith(DS_STORE) && f.length() > 0) {
+ uploadFileToBackEnd(f);
+ }
+ uploadCount++;
+ uploadSize += f.length();
+ // update upload status at every 15 seconds.
+ long now = System.currentTimeMillis();
+ if (now > time + 15000) {
+ filesUploader.addCurrentCount(uploadCount);
+ filesUploader.addCurrentSize(uploadSize);
+ uploadCount = 0;
+ uploadSize = 0;
+ time = now;
+ }
+ }
+ // update final state.
+ filesUploader.addCurrentCount(uploadCount);
+ filesUploader.addCurrentSize(uploadSize);
+ } catch (DataStoreException e) {
+ if (!filesUploader.isExceptionRaised()) {
+ filesUploader.setException(e);
+ }
+ }
+
+ }
+ }
+
}
Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/NamedThreadFactory.java
===================================================================
--- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/NamedThreadFactory.java (revision 0)
+++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/NamedThreadFactory.java (working copy)
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.core.util;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class extends {@link ThreadFactory} to creates named threads.
+ */
+public class NamedThreadFactory implements ThreadFactory {
+
+ private AtomicInteger threadCount = new AtomicInteger(1);
+
+ String threadPrefixName;
+
+ public NamedThreadFactory(String threadPrefixName) {
+ super();
+ this.threadPrefixName = threadPrefixName;
+ }
+
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setContextClassLoader(getClass().getClassLoader());
+ thread.setName(threadPrefixName + "-" + threadCount.getAndIncrement());
+ return thread;
+ }
+
+}