.../jackrabbit/core/data/CachingDataStore.java | 75 +++++++++++++++++++--- 1 file changed, 67 insertions(+), 8 deletions(-) diff --git a/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java b/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java index a0c190f..7107f43 100644 --- a/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java +++ b/jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java @@ -76,6 +76,7 @@ import org.slf4j.LoggerFactory; * <param name="{@link #setAsyncUploadLimit(int) asyncUploadLimit}" value="100"/> * <param name="{@link #setUploadRetries(int) uploadRetries}" value="3"/> * <param name="{@link #setTouchAsync(boolean) touchAsync}" value="false"/> + * <param name="{@link #setProactiveCaching(boolean) proactiveCaching}" value="true"/> * </DataStore> */ public abstract class CachingDataStore extends AbstractDataStore implements @@ -120,6 +121,12 @@ public abstract class CachingDataStore extends AbstractDataStore implements */ protected final Map asyncTouchCache = new ConcurrentHashMap(5); + /** + * In memory map to hold in-progress asynchronous downloads. Once + * download is finished corresponding entry is flushed from the map. + */ + protected final Map asyncDownloadCache = new ConcurrentHashMap(5); + protected Backend backend; /** @@ -141,6 +148,12 @@ public abstract class CachingDataStore extends AbstractDataStore implements private boolean touchAsync = false; /** + * Flag to indicate that binary content will be cached proactively and + * asynchronously when binary metadata is retrieved from {@link Backend}. + */ + private boolean proactiveCaching = true; + + /** * The optional backend configuration. */ private String config; @@ -184,6 +197,11 @@ public abstract class CachingDataStore extends AbstractDataStore implements */ private AsyncUploadCache asyncWriteCache; + /** + * {@link ExecutorService} to asynchronous downloads + */ + private ExecutorService downloadExecService; + protected abstract Backend createBackend(); protected abstract String getMarkerFile(); @@ -311,6 +329,8 @@ public abstract class CachingDataStore extends AbstractDataStore implements asyncWriteCache.reset(); } } + downloadExecService = Executors.newFixedThreadPool(5, + new NamedThreadFactory("backend-file-download-worker")); cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize, cachePurgeTrigFactor, cachePurgeResizeFactor, asyncWriteCache); } catch (Exception e) { @@ -397,6 +417,7 @@ public abstract class CachingDataStore extends AbstractDataStore implements public DataRecord getRecord(DataIdentifier identifier) throws DataStoreException { String fileName = getFileName(identifier); + boolean existsAtBacked = false; try { if (asyncWriteCache.hasEntry(fileName, minModifiedDate > 0)) { LOG.debug("[{}] record retrieved from asyncUploadmap", @@ -404,14 +425,18 @@ public abstract class CachingDataStore extends AbstractDataStore implements usesIdentifier(identifier); return new CachingDataRecord(this, identifier); } else if (cache.getFileIfStored(fileName) != null - || backend.exists(identifier)) { - LOG.debug("[{}] record retrieved from local cache or backend", - identifier); + || (existsAtBacked = backend.exists(identifier))) { + if (existsAtBacked) { + LOG.debug("[{}] record retrieved from backend", identifier); + asyncDownload(identifier); + } else { + LOG.debug("[{}] record retrieved from local cache", + identifier); + } touchInternal(identifier); usesIdentifier(identifier); return new CachingDataRecord(this, identifier); } - } catch (IOException ioe) { throw new DataStoreException("error in getting record [" + identifier + "]", ioe); @@ -558,6 +583,7 @@ public abstract class CachingDataStore extends AbstractDataStore implements LOG.debug( "identifier [{}], lastModified=[{}] retrireved from backend ", identifier, lastModified); + asyncDownload(identifier); } return lastModified; } @@ -566,13 +592,16 @@ public abstract class CachingDataStore extends AbstractDataStore implements * Return the length of record from {@link LocalCache} if available, * otherwise retrieve it from {@link Backend}. */ - public long getLength(DataIdentifier identifier) throws DataStoreException { + public long getLength(final DataIdentifier identifier) + throws DataStoreException { String fileName = getFileName(identifier); Long length = cache.getFileLength(fileName); if (length != null) { return length.longValue(); } else { - return backend.getLength(identifier); + length = backend.getLength(identifier); + asyncDownload(identifier); + return length; } } @@ -763,6 +792,33 @@ public abstract class CachingDataStore extends AbstractDataStore implements } } + /** + * Invoke {@link #getStream(DataIdentifier)} asynchronously to cache binary + * asynchronously. + */ + private void asyncDownload(final DataIdentifier identifier) { + if (proactiveCaching + && cacheSize != 0 + && asyncDownloadCache.put(identifier, System.currentTimeMillis()) == null) { + downloadExecService.execute(new Runnable() { + @Override + public void run() { + InputStream input = null; + try { + // getStream to cache file + LOG.debug("Async download [{}] started.", identifier); + input = getStream(identifier); + } catch (RepositoryException re) { + // ignore exception + } finally { + asyncDownloadCache.remove(identifier); + IOUtils.closeQuietly(input); + LOG.debug("Async download [{}] completed.", identifier); + } + } + }); + } + } /** * Returns a unique temporary file to be used for creating a new data @@ -914,6 +970,7 @@ public abstract class CachingDataStore extends AbstractDataStore implements public void close() throws DataStoreException { cache.close(); backend.close(); + downloadExecService.shutdown(); } /** @@ -1063,13 +1120,15 @@ public abstract class CachingDataStore extends AbstractDataStore implements public void setUploadRetries(int uploadRetries) { this.uploadRetries = uploadRetries; } - - public void setTouchAsync(boolean touchAsync) { this.touchAsync = touchAsync; } + public void setProactiveCaching(boolean proactiveCaching) { + this.proactiveCaching = proactiveCaching; + } + public Backend getBackend() { return backend; }