diff --git a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java new file mode 100644 index 0000000000..c144c77597 --- /dev/null +++ b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/SameThreadExecutorService.java @@ -0,0 +1,139 @@ +package org.apache.jackrabbit.oak.plugins.blob; + +import com.google.common.util.concurrent.AbstractListeningExecutorService; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Class copied from the Guava 15, to make the AzureDataStore compatible with + * the Guava 26 (where the SameThreadExecutorService is not present). + * + * TODO: Remove this class once the whole Oak is migrated to use Guava 26. + */ +class SameThreadExecutorService extends AbstractListeningExecutorService { + /** + * Lock used whenever accessing the state variables + * (runningTasks, shutdown, terminationCondition) of the executor + */ + private final Lock lock = new ReentrantLock(); + + /** Signaled after the executor is shutdown and running tasks are done */ + private final Condition termination = lock.newCondition(); + + /* + * Conceptually, these two variables describe the executor being in + * one of three states: + * - Active: shutdown == false + * - Shutdown: runningTasks > 0 and shutdown == true + * - Terminated: runningTasks == 0 and shutdown == true + */ + private int runningTasks = 0; + private boolean shutdown = false; + + @Override + public void execute(Runnable command) { + startTask(); + try { + command.run(); + } finally { + endTask(); + } + } + + @Override + public boolean isShutdown() { + lock.lock(); + try { + return shutdown; + } finally { + lock.unlock(); + } + } + + @Override + public void shutdown() { + lock.lock(); + try { + shutdown = true; + } finally { + lock.unlock(); + } + } + + // See sameThreadExecutor javadoc for unusual behavior of this method. + @Override + public List shutdownNow() { + shutdown(); + return Collections.emptyList(); + } + + @Override + public boolean isTerminated() { + lock.lock(); + try { + return shutdown && runningTasks == 0; + } finally { + lock.unlock(); + } + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + long nanos = unit.toNanos(timeout); + lock.lock(); + try { + for (;;) { + if (isTerminated()) { + return true; + } else if (nanos <= 0) { + return false; + } else { + nanos = termination.awaitNanos(nanos); + } + } + } finally { + lock.unlock(); + } + } + + /** + * Checks if the executor has been shut down and increments the running + * task count. + * + * @throws RejectedExecutionException if the executor has been previously + * shutdown + */ + private void startTask() { + lock.lock(); + try { + if (isShutdown()) { + throw new RejectedExecutionException("Executor already shutdown"); + } + runningTasks++; + } finally { + lock.unlock(); + } + } + + /** + * Decrements the running task count. + */ + private void endTask() { + lock.lock(); + try { + runningTasks--; + if (isTerminated()) { + termination.signalAll(); + } + } finally { + lock.unlock(); + } + } +} diff --git a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java index 2ecf30264d..019a6ca8c8 100644 --- a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java +++ b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/UploadStagingCache.java @@ -21,6 +21,8 @@ package org.apache.jackrabbit.oak.plugins.blob; import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Path; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -31,9 +33,9 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import com.google.common.base.Optional; -import com.google.common.base.Predicate; import com.google.common.cache.Weigher; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -44,7 +46,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; -import org.apache.commons.io.FileUtils; import org.apache.jackrabbit.core.data.DataStoreException; import org.apache.jackrabbit.core.data.util.NamedThreadFactory; import org.apache.jackrabbit.oak.commons.StringUtils; @@ -237,15 +238,18 @@ public class UploadStagingCache implements Closeable { // Move any older cache pending uploads movePendingUploadsToStaging(home, rootPath, true); - Iterator iter = Files.fileTreeTraverser().postOrderTraversal(uploadCacheSpace) - .filter(new Predicate() { - @Override public boolean apply(File input) { - return input.isFile(); - } - }).iterator(); + List files; + try { + uploadCacheSpace.mkdirs(); + files = java.nio.file.Files.find(uploadCacheSpace.toPath(), Integer.MAX_VALUE, (path, basicFileAttributes) -> basicFileAttributes.isRegularFile()) + .map(Path::toFile) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + int count = 0; - while (iter.hasNext()) { - File toBeSyncedFile = iter.next(); + for (File toBeSyncedFile : files) { Optional> scheduled = putOptionalDisregardingSize(toBeSyncedFile.getName(), toBeSyncedFile, true); if (scheduled.isPresent()) { @@ -403,7 +407,7 @@ public class UploadStagingCache implements Closeable { result.setException(t); retryQueue.add(id); } - }); + }, new SameThreadExecutorService()); LOG.debug("File [{}] scheduled for upload [{}]", upload, result); } catch (Exception e) { LOG.error("Error staging file for upload [{}]", upload, e); @@ -411,6 +415,7 @@ public class UploadStagingCache implements Closeable { return result; } + /** * Invalidate called externally. * @param key to invalidate