Index: jackrabbit-aws-ext/pom.xml =================================================================== --- jackrabbit-aws-ext/pom.xml (revision 1574775) +++ jackrabbit-aws-ext/pom.xml (working copy) @@ -45,7 +45,7 @@ com.amazonaws aws-java-sdk - 1.5.6 + 1.7.2 org.apache.jackrabbit Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java =================================================================== --- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (revision 1574775) +++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (working copy) @@ -36,6 +36,7 @@ import org.apache.jackrabbit.core.data.CachingDataStore; import org.apache.jackrabbit.core.data.DataIdentifier; import org.apache.jackrabbit.core.data.DataStoreException; +import org.apache.jackrabbit.core.util.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,7 +138,16 @@ */ s3service.setEndpoint(endpoint); LOG.info("S3 service endpoint: " + endpoint); - tmx = new TransferManager(s3service, createDefaultExecutorService()); + int writeThreads = 10; + String writeThreadsStr = prop.getProperty(S3Constants.S3_WRITE_THREADS); + if (writeThreadsStr != null) { + writeThreads = Integer.parseInt(writeThreadsStr); + } + LOG.info("Using thread pool of [" + writeThreads + + "] threads in S3 transfer manager"); + tmx = new TransferManager(s3service, + (ThreadPoolExecutor) Executors.newFixedThreadPool(writeThreads, + new NamedThreadFactory("s3-transfer-manager-worker"))); LOG.debug(" done"); } catch (Exception e) { LOG.debug(" error ", e); Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java =================================================================== --- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java (revision 1574775) +++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/S3Constants.java (working copy) @@ -41,6 +41,11 @@ * Amazon aws S3 region. */ public static final String S3_REGION = "s3Region"; + + /** + * Constant to specify S3 write workers. + */ + public static final String S3_WRITE_THREADS = "writeThreads"; /** * private constructor so that class cannot initialized from outside. Index: jackrabbit-aws-ext/src/test/resources/aws.properties =================================================================== --- jackrabbit-aws-ext/src/test/resources/aws.properties (revision 1574775) +++ jackrabbit-aws-ext/src/test/resources/aws.properties (working copy) @@ -36,3 +36,5 @@ socketTimeout=120000 maxConnections=10 maxErrorRetry=10 +# maximum concrruent threads to write to S3. +writeThreads=10 Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java =================================================================== --- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (revision 1574775) +++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (working copy) @@ -33,14 +33,20 @@ import java.util.List; import java.util.Map; import java.util.WeakHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import javax.jcr.RepositoryException; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.jackrabbit.core.util.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * A caching data store that consists of {@link LocalCache} and {@link Backend}. * {@link Backend} is single source of truth. All methods first try to fetch @@ -58,9 +64,10 @@ * <param name="{@link #setConfig(String) config}" value="${rep.home}/backend.properties"/> * <param name="{@link #setCacheSize(long) cacheSize}" value="68719476736"/> * <param name="{@link #setSecret(String) secret}" value="123456"/> - * <param name="{@link #setCachePurgeTrigFactor(double)}" value="0.95d"/> - * <param name="{@link #setCachePurgeResizeFactor(double) cacheSize}" value="0.85d"/> + * <param name="{@link #setCachePurgeTrigFactor(double) cachePurgeTrigFactor}" value="0.95d"/> + * <param name="{@link #setCachePurgeResizeFactor(double) cachePurgeResizeFactor}" value="0.85d"/> * <param name="{@link #setMinRecordLength(int) minRecordLength}" value="1024"/> + * <param name="{@link #setConcurrentUploadsThreads(int) concurrentUploadsThreads}" value="10"/> * </DataStore> */ public abstract class CachingDataStore extends AbstractDataStore implements @@ -132,6 +139,14 @@ private double cachePurgeResizeFactor = 0.85d; /** + * The {@link #init(String)} methods checks for {@link #getMarkerFile()} and + * if it doesn't exists migrates all files from fileystem to {@link Backend} + * . This parameter governs number of threads which will upload files + * concurrently to {@link Backend}. + */ + private int concurrentUploadsThreads = 10; + + /** * The number of bytes in the cache. The default value is 64 GB. */ private long cacheSize = 64L * 1024 * 1024 * 1024; @@ -184,7 +199,7 @@ File markerFile = new File(homeDir, markerFileName); if (!markerFile.exists()) { LOG.info("load files from local cache"); - loadFilesFromCache(); + uploadFilesFromCache(); try { markerFile.createNewFile(); } catch (IOException e) { @@ -379,35 +394,48 @@ return File.createTempFile(TMP, null, tmpDir); } - /** - * Load files from {@link LocalCache} to {@link Backend}. - */ - private void loadFilesFromCache() throws RepositoryException { + private void uploadFilesFromCache() throws RepositoryException { ArrayList files = new ArrayList(); listRecursive(files, directory); long totalSize = 0; for (File f : files) { totalSize += f.length(); } + if (concurrentUploadsThreads > 1) { + new FilesUploader(files, totalSize, concurrentUploadsThreads).upload(); + } else { + uploadFilesInSingleThread(files, totalSize); + } + } + + private void uploadFilesInSingleThread(List files, long totalSize) throws RepositoryException { + long startTime = System.currentTimeMillis(); + LOG.info("Upload: {" + files.size() + "} files in single thread."); + long currentCount = 0; long currentSize = 0; long time = System.currentTimeMillis(); for (File f : files) { long now = System.currentTimeMillis(); if (now > time + 5000) { - LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}"); + LOG.info("Uploaded: {" + currentCount + "}/{" + files.size() + "} files, {" + currentSize + "}/{" + totalSize + + "} size data"); time = now; } - currentSize += f.length(); String name = f.getName(); - LOG.debug("upload file = " + name); - if (!name.startsWith(TMP) && !name.endsWith(DS_STORE) - && f.length() > 0) { - loadFileToBackEnd(f); + if (LOG.isDebugEnabled()) { + LOG.debug("upload file = " + name); } + if (!name.startsWith(TMP) && !name.endsWith(DS_STORE) && f.length() > 0) { + uploadFileToBackEnd(f); + } + currentSize += f.length(); + currentCount++; } - LOG.info("Uploaded {" + currentSize + "}/{" + totalSize + "}"); + long endTime = System.currentTimeMillis(); + LOG.info("Uploaded: {" + currentCount + "}/{" + files.size() + "} files, {" + currentSize + "}/{" + totalSize + + "} size data, time taken {" + ((endTime - startTime) / 1000) + "} sec"); } - + /** * Traverse recursively and populate list with files. */ @@ -431,7 +459,7 @@ * file to uploaded. * @throws DataStoreException */ - private void loadFileToBackEnd(File f) throws DataStoreException { + private void uploadFileToBackEnd(File f) throws DataStoreException { DataIdentifier identifier = new DataIdentifier(f.getName()); usesIdentifier(identifier); backend.write(identifier, f); @@ -601,5 +629,168 @@ public void setCachePurgeResizeFactor(double cachePurgeResizeFactor) { this.cachePurgeResizeFactor = cachePurgeResizeFactor; } + + public int getConcurrentUploadsThreads() { + return concurrentUploadsThreads; + } + public void setConcurrentUploadsThreads(int concurrentUploadsThreads) { + this.concurrentUploadsThreads = concurrentUploadsThreads; + } + + /** + * This class initiates files upload in multiple threads to backend. + */ + private class FilesUploader { + private List files; + + private long totalSize; + + private volatile AtomicInteger currentCount = new AtomicInteger(); + + private volatile AtomicLong currentSize = new AtomicLong(); + + private volatile AtomicBoolean exceptionRaised = new AtomicBoolean(); + + private DataStoreException exception; + + private final int threads; + + public FilesUploader(List files, long totalSize, int threads) { + super(); + this.files = files; + this.threads = threads; + this.totalSize = totalSize; + } + + protected void addCurrentCount(int delta) { + currentCount.addAndGet(delta); + } + + protected void addCurrentSize(long delta) { + currentSize.addAndGet(delta); + } + + protected synchronized void setException(DataStoreException exception) { + exceptionRaised.getAndSet(true); + this.exception = exception; + } + + protected boolean isExceptionRaised() { + return exceptionRaised.get(); + } + + protected void logProgress() { + LOG.info("Uploaded: {" + currentCount.get() + "}/{" + files.size() + "} files, {" + currentSize.get() + "}/{" + totalSize + + "} size data"); + } + + public void upload() throws DataStoreException { + long startTime = System.currentTimeMillis(); + LOG.info(" Uploading " + files.size() + " using " + threads + " threads."); + ExecutorService executor = Executors.newFixedThreadPool(threads, new NamedThreadFactory("backend-file-upload-worker")); + int partitionSize = files.size() / (threads); + int startIndex = 0; + int endIndex = partitionSize; + for (int i = 1; i <= threads; i++) { + List partitionFileList = Collections.unmodifiableList(files.subList(startIndex, endIndex)); + FileUploaderThread fut = new FileUploaderThread(partitionFileList, startIndex, endIndex, this); + executor.execute(fut); + + startIndex = endIndex; + if (i == (threads - 1)) { + endIndex = files.size(); + } else { + endIndex = startIndex + partitionSize; + } + } + // This will make the executor accept no new threads + // and finish all existing threads in the queue + executor.shutdown(); + + try { + // Wait until all threads are finish + while (!isExceptionRaised() && !executor.awaitTermination(15, TimeUnit.SECONDS)) { + logProgress(); + } + } catch (InterruptedException ie) { + + } + long endTime = System.currentTimeMillis(); + LOG.info("Uploaded: {" + currentCount.get() + "}/{" + files.size() + "} files, {" + currentSize.get() + "}/{" + totalSize + + "} size data, time taken {" + ((endTime - startTime) / 1000) + "} sec"); + if (isExceptionRaised()) { + executor.shutdownNow(); // Cancel currently executing tasks + throw exception; + } + } + + } + + /** + * This class implements @link {@link Runnable} interface and uploads list + * of files to backend. + */ + private class FileUploaderThread implements Runnable { + private List files; + + private FilesUploader filesUploader; + + private int startIndex; + + private int endIndex; + + public FileUploaderThread(List files, int startIndex, int endIndex, FilesUploader controller) { + super(); + this.files = files; + this.filesUploader = controller; + this.startIndex = startIndex; + this.endIndex = endIndex; + } + + public void run() { + long time = System.currentTimeMillis(); + if (LOG.isDebugEnabled()) { + LOG.debug("Thread [ " + Thread.currentThread().getName() + "]: Uploading files from startIndex[" + startIndex + + "] and endIndex [" + (endIndex - 1) + "], both inclusive."); + } + int uploadCount = 0; + long uploadSize = 0; + try { + for (File f : files) { + + if (filesUploader.isExceptionRaised()) { + break; + } + String name = f.getName(); + if (LOG.isDebugEnabled()) { + LOG.debug("upload file = " + name); + } + if (!name.startsWith(TMP) && !name.endsWith(DS_STORE) && f.length() > 0) { + uploadFileToBackEnd(f); + } + uploadCount++; + uploadSize += f.length(); + // update upload status at every 15 seconds. + long now = System.currentTimeMillis(); + if (now > time + 15000) { + filesUploader.addCurrentCount(uploadCount); + filesUploader.addCurrentSize(uploadSize); + uploadCount = 0; + uploadSize = 0; + time = now; + } + } + // update final state. + filesUploader.addCurrentCount(uploadCount); + filesUploader.addCurrentSize(uploadSize); + } catch (DataStoreException e) { + if (!filesUploader.isExceptionRaised()) { + filesUploader.setException(e); + } + } + + } + } + } Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/NamedThreadFactory.java =================================================================== --- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/NamedThreadFactory.java (revision 0) +++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/util/NamedThreadFactory.java (working copy) @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jackrabbit.core.util; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class extends {@link ThreadFactory} to creates named threads. + */ +public class NamedThreadFactory implements ThreadFactory { + + private AtomicInteger threadCount = new AtomicInteger(1); + + String threadPrefixName; + + public NamedThreadFactory(String threadPrefixName) { + super(); + this.threadPrefixName = threadPrefixName; + } + + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setContextClassLoader(getClass().getClassLoader()); + thread.setName(threadPrefixName + "-" + threadCount.getAndIncrement()); + return thread; + } + +}