Index: jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java =================================================================== --- jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (revision 1671014) +++ jackrabbit-aws-ext/src/main/java/org/apache/jackrabbit/aws/ext/ds/S3Backend.java (working copy) @@ -529,7 +529,7 @@ && s3service.getObjectMetadata(bucket, s3ObjSumm.getKey()).getLastModified().getTime() < min) { - + store.deleteFromCache(identifier); LOG.debug("add id [{}] to delete lists", s3ObjSumm.getKey()); deleteList.add(new DeleteObjectsRequest.KeyVersion( Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java =================================================================== --- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (revision 1671014) +++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/CachingDataStore.java (working copy) @@ -31,6 +31,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -77,6 +78,7 @@ * <param name="{@link #setUploadRetries(int) uploadRetries}" value="3"/> * <param name="{@link #setTouchAsync(boolean) touchAsync}" value="false"/> * <param name="{@link #setProactiveCaching(boolean) proactiveCaching}" value="true"/> + * <param name="{@link #setRecLengthCacheSize(int) recLengthCacheSize}" value="200"/> * </DataStore> */ public abstract class CachingDataStore extends AbstractDataStore implements @@ -127,6 +129,12 @@ */ protected final Map asyncDownloadCache = new ConcurrentHashMap(5); + /** + * In memory cache to hold {@link DataRecord#getLength()} against + * {@link DataIdentifier} + */ + protected Map recLenCache = null; + protected Backend backend; /** @@ -236,6 +244,12 @@ * repository.xml. By default it is 100 */ private int asyncUploadLimit = 100; + + /** + * Size of {@link #recLenCache}. Each entry consumes of approx 140 bytes. + * Default total memory consumption of {@link #recLenCache} 28KB. + */ + private int recLengthCacheSize = 200; /** * Initialized the data store. If the path is not set, <repository @@ -333,6 +347,25 @@ new NamedThreadFactory("backend-file-download-worker")); cache = new LocalCache(path, tmpDir.getAbsolutePath(), cacheSize, cachePurgeTrigFactor, cachePurgeResizeFactor, asyncWriteCache); + /* + * Initialize LRU cache of size {@link #recLengthCacheSize} + */ + recLenCache = Collections.synchronizedMap(new LinkedHashMap( + recLengthCacheSize, 0.75f, true) { + + private static final long serialVersionUID = -8752749075395630485L; + + @Override + protected boolean removeEldestEntry( + Map.Entry eldest) { + if (size() > recLengthCacheSize) { + LOG.trace("evicted from recLengthCache [{}]", + eldest.getKey()); + return true; + } + return false; + } + }); } catch (Exception e) { throw new RepositoryException(e); } @@ -415,24 +448,17 @@ @Override public DataRecord getRecord(DataIdentifier identifier) - throws DataStoreException { + throws DataStoreException { String fileName = getFileName(identifier); - boolean existsAtBackend = false; try { if (asyncWriteCache.hasEntry(fileName, minModifiedDate > 0)) { - LOG.debug("[{}] record retrieved from asyncUploadmap", + LOG.debug("getRecord: [{}] retrieved from asyncUploadmap", identifier); usesIdentifier(identifier); return new CachingDataRecord(this, identifier); - } else if (cache.getFileIfStored(fileName) != null - || (existsAtBackend = backend.exists(identifier))) { - if (existsAtBackend) { - LOG.debug("[{}] record retrieved from backend", identifier); - asyncDownload(identifier); - } else { - LOG.debug("[{}] record retrieved from local cache", - identifier); - } + } else if (getLength(identifier) > -1) { + LOG.debug("getRecord: [{}] retrieved using getLength", + identifier); touchInternal(identifier); usesIdentifier(identifier); return new CachingDataRecord(this, identifier); @@ -453,19 +479,36 @@ */ @Override public DataRecord getRecordIfStored(DataIdentifier identifier) - throws DataStoreException { + throws DataStoreException { String fileName = getFileName(identifier); try { if (asyncWriteCache.hasEntry(fileName, minModifiedDate > 0)) { - LOG.debug("[{}] record retrieved from asyncuploadmap", + LOG.debug( + "getRecordIfStored: [{}] retrieved from asyncuploadmap", identifier); usesIdentifier(identifier); return new CachingDataRecord(this, identifier); - } else if (backend.exists(identifier)) { - LOG.debug("[{}] record retrieved from backend", identifier); + } else if (recLenCache.containsKey(identifier)) { + LOG.debug( + "getRecordIfStored: [{}] retrieved using recLenCache", + identifier); touchInternal(identifier); usesIdentifier(identifier); return new CachingDataRecord(this, identifier); + } else { + try { + long length = backend.getLength(identifier); + LOG.debug( + "getRecordIfStored :[{}] retrieved from backend", + identifier); + recLenCache.put(identifier, length); + touchInternal(identifier); + usesIdentifier(identifier); + return new CachingDataRecord(this, identifier); + } catch (DataStoreException ignore) { + LOG.warn(" getRecordIfStored: [{}] not found", identifier); + } + } } catch (IOException ioe) { throw new DataStoreException(ioe); @@ -507,6 +550,7 @@ synchronized (this) { try { // order is important here + recLenCache.remove(identifier); asyncWriteCache.delete(fileName); backend.deleteRecord(identifier); cache.delete(fileName); @@ -520,8 +564,10 @@ public synchronized int deleteAllOlderThan(long min) throws DataStoreException { Set diSet = backend.deleteAllOlderThan(min); + // remove entries from local cache for (DataIdentifier identifier : diSet) { + recLenCache.remove(identifier); cache.delete(getFileName(identifier)); } try { @@ -593,13 +639,24 @@ * otherwise retrieve it from {@link Backend}. */ public long getLength(final DataIdentifier identifier) - throws DataStoreException { + throws DataStoreException { String fileName = getFileName(identifier); - Long length = cache.getFileLength(fileName); + + Long length = recLenCache.get(identifier); if (length != null) { - return length.longValue(); + LOG.debug(" identifier [{}] length fetched from recLengthCache", + identifier); + return length; + } else if ((length = cache.getFileLength(fileName)) != null) { + LOG.debug(" identifier [{}] length fetched from local cache", + identifier); + recLenCache.put(identifier, length); + return length; } else { length = backend.getLength(identifier); + LOG.debug(" identifier [{}] length fetched from backend", + identifier); + recLenCache.put(identifier, length); asyncDownload(identifier); return length; } @@ -618,6 +675,20 @@ return asyncWriteCache.getAll(); } + + public void deleteFromCache(DataIdentifier identifier) + throws DataStoreException { + try { + // order is important here + recLenCache.remove(identifier); + String fileName = getFileName(identifier); + asyncWriteCache.delete(fileName); + cache.delete(fileName); + } catch (IOException ioe) { + throw new DataStoreException(ioe); + } + } + @Override public void onSuccess(AsyncUploadResult result) { DataIdentifier identifier = result.getIdentifier(); @@ -1128,6 +1199,10 @@ public void setProactiveCaching(boolean proactiveCaching) { this.proactiveCaching = proactiveCaching; } + + public void setRecLengthCacheSize(int recLengthCacheSize) { + this.recLengthCacheSize = recLengthCacheSize; + } public Backend getBackend() { return backend; Index: jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java =================================================================== --- jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java (revision 1671014) +++ jackrabbit-data/src/main/java/org/apache/jackrabbit/core/data/LocalCache.java (working copy) @@ -615,8 +615,8 @@ LOG.info ("tmp file [{}] skipped ", filePath); continue; } - if (name.startsWith(dataStorePath)) { - name = name.substring(dataStorePath.length()); + if (filePath.startsWith(dataStorePath)) { + name = filePath.substring(dataStorePath.length()); } if (name.startsWith("/") || name.startsWith("\\")) { name = name.substring(1); Index: jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java =================================================================== --- jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java (revision 1671014) +++ jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/InMemoryBackend.java (working copy) @@ -101,7 +101,7 @@ } @Override - public Set deleteAllOlderThan(final long min) { + public Set deleteAllOlderThan(final long min) throws DataStoreException { log("deleteAllOlderThan " + min); Set tobeDeleted = new HashSet(); for (Map.Entry entry : timeMap.entrySet()) { @@ -109,6 +109,7 @@ long timestamp = entry.getValue(); if (timestamp < min && !store.isInUse(identifier) && store.confirmDelete(identifier)) { + store.deleteFromCache(identifier); tobeDeleted.add(identifier); } } Index: jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestLocalCache.java =================================================================== --- jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestLocalCache.java (revision 1671014) +++ jackrabbit-data/src/test/java/org/apache/jackrabbit/core/data/TestLocalCache.java (working copy) @@ -31,6 +31,7 @@ import junit.framework.TestCase; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.apache.jackrabbit.core.data.util.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,9 +46,9 @@ private static final String TEMP_DIR = "target/temp"; private static final String TARGET_DIR = "target"; - - protected String cacheDirPath; + protected String cacheDirPath; + protected String tempDirPath; /** @@ -61,15 +62,23 @@ protected void setUp() { try { cacheDirPath = CACHE_DIR + "-" + + String.valueOf(randomGen.nextInt(9999)) + "-" + String.valueOf(randomGen.nextInt(9999)); File cachedir = new File(cacheDirPath); - if (cachedir.exists()) FileUtils.deleteQuietly(cachedir); + for (int i = 0; i < 4 && cachedir.exists(); i++) { + FileUtils.deleteQuietly(cachedir); + Thread.sleep(1000); + } cachedir.mkdirs(); tempDirPath = TEMP_DIR + "-" + + String.valueOf(randomGen.nextInt(9999)) + "-" + String.valueOf(randomGen.nextInt(9999)); File tempdir = new File(tempDirPath); - if (tempdir.exists()) FileUtils.deleteQuietly(tempdir); + for (int i = 0; i < 4 && tempdir.exists(); i++) { + FileUtils.deleteQuietly(tempdir); + Thread.sleep(1000); + } tempdir.mkdirs(); } catch (Exception e) { LOG.error("error:", e); @@ -78,15 +87,17 @@ } @Override - protected void tearDown() throws IOException { + protected void tearDown() throws Exception { File cachedir = new File(cacheDirPath); - if (cachedir.exists()) { + for (int i = 0; i < 4 && cachedir.exists(); i++) { FileUtils.deleteQuietly(cachedir); + Thread.sleep(1000); } File tempdir = new File(tempDirPath); - if (tempdir.exists()) { + for (int i = 0; i < 4 && tempdir.exists(); i++) { FileUtils.deleteQuietly(tempdir); + Thread.sleep(1000); } } @@ -98,8 +109,8 @@ AsyncUploadCache pendingFiles = new AsyncUploadCache(); pendingFiles.init(tempDirPath, cacheDirPath, 100); pendingFiles.reset(); - LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400, 0.95, - 0.70, pendingFiles); + LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400, + 0.95, 0.70, pendingFiles); Random random = new Random(12345); byte[] data = new byte[100]; Map byteMap = new HashMap(); @@ -117,12 +128,15 @@ cache.store("a1", new ByteArrayInputStream(byteMap.get("a1"))); cache.store("a2", new ByteArrayInputStream(byteMap.get("a2"))); cache.store("a3", new ByteArrayInputStream(byteMap.get("a3"))); - assertEquals(new ByteArrayInputStream(byteMap.get("a1")), - cache.getIfStored("a1")); - assertEquals(new ByteArrayInputStream(byteMap.get("a2")), - cache.getIfStored("a2")); - assertEquals(new ByteArrayInputStream(byteMap.get("a3")), - cache.getIfStored("a3")); + InputStream result = cache.getIfStored("a1"); + assertEquals(new ByteArrayInputStream(byteMap.get("a1")), result); + IOUtils.closeQuietly(result); + result = cache.getIfStored("a2"); + assertEquals(new ByteArrayInputStream(byteMap.get("a2")), result); + IOUtils.closeQuietly(result); + result = cache.getIfStored("a3"); + assertEquals(new ByteArrayInputStream(byteMap.get("a3")), result); + IOUtils.closeQuietly(result); } catch (Exception e) { LOG.error("error:", e); fail(); @@ -138,8 +152,8 @@ AsyncUploadCache pendingFiles = new AsyncUploadCache(); pendingFiles.init(tempDirPath, cacheDirPath, 100); pendingFiles.reset(); - LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400, 0.95, - 0.70, pendingFiles); + LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400, + 0.95, 0.70, pendingFiles); Random random = new Random(12345); byte[] data = new byte[100]; Map byteMap = new HashMap(); @@ -161,13 +175,17 @@ cache.store("a1", new ByteArrayInputStream(byteMap.get("a1"))); cache.store("a2", new ByteArrayInputStream(byteMap.get("a2"))); cache.store("a3", new ByteArrayInputStream(byteMap.get("a3"))); - assertEquals(new ByteArrayInputStream(byteMap.get("a1")), - cache.getIfStored("a1")); - assertEquals(new ByteArrayInputStream(byteMap.get("a2")), - cache.getIfStored("a2")); - assertEquals(new ByteArrayInputStream(byteMap.get("a3")), - cache.getIfStored("a3")); + InputStream result = cache.getIfStored("a1"); + assertEquals(new ByteArrayInputStream(byteMap.get("a1")), result); + IOUtils.closeQuietly(result); + result = cache.getIfStored("a2"); + assertEquals(new ByteArrayInputStream(byteMap.get("a2")), result); + IOUtils.closeQuietly(result); + result = cache.getIfStored("a3"); + assertEquals(new ByteArrayInputStream(byteMap.get("a3")), result); + IOUtils.closeQuietly(result); + data = new byte[90]; random.nextBytes(data); byteMap.put("a4", data); @@ -174,18 +192,30 @@ // storing a4 should purge cache cache.store("a4", new ByteArrayInputStream(byteMap.get("a4"))); Thread.sleep(1000); - assertNull("a1 should be null", cache.getIfStored("a1")); - assertNull("a2 should be null", cache.getIfStored("a2")); - assertEquals(new ByteArrayInputStream(byteMap.get("a3")), - cache.getIfStored("a3")); - assertEquals(new ByteArrayInputStream(byteMap.get("a4")), - cache.getIfStored("a4")); + + result = cache.getIfStored("a1"); + assertNull("a1 should be null", result); + IOUtils.closeQuietly(result); + + result = cache.getIfStored("a2"); + assertNull("a2 should be null", result); + IOUtils.closeQuietly(result); + + result = cache.getIfStored("a3"); + assertEquals(new ByteArrayInputStream(byteMap.get("a3")), result); + IOUtils.closeQuietly(result); + + result = cache.getIfStored("a4"); + assertEquals(new ByteArrayInputStream(byteMap.get("a4")), result); + IOUtils.closeQuietly(result); + data = new byte[100]; random.nextBytes(data); byteMap.put("a5", data); cache.store("a5", new ByteArrayInputStream(byteMap.get("a5"))); - assertEquals(new ByteArrayInputStream(byteMap.get("a3")), - cache.getIfStored("a3")); + result = cache.getIfStored("a3"); + assertEquals(new ByteArrayInputStream(byteMap.get("a3")), result); + IOUtils.closeQuietly(result); } catch (Exception e) { LOG.error("error:", e); fail(); @@ -201,8 +231,8 @@ AsyncUploadCache pendingFiles = new AsyncUploadCache(); pendingFiles.init(tempDirPath, cacheDirPath, 100); pendingFiles.reset(); - LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400, 0.95, - 0.70, pendingFiles); + LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 400, + 0.95, 0.70, pendingFiles); Random random = new Random(12345); byte[] data = new byte[125]; Map byteMap = new HashMap(); @@ -245,12 +275,15 @@ assertTrue("should be able to add to pending upload", result.canAsyncUpload()); - assertEquals(new ByteArrayInputStream(byteMap.get("a1")), - cache.getIfStored("a1")); - assertEquals(new ByteArrayInputStream(byteMap.get("a2")), - cache.getIfStored("a2")); - assertEquals(new ByteArrayInputStream(byteMap.get("a3")), - cache.getIfStored("a3")); + InputStream inp = cache.getIfStored("a1"); + assertEquals(new ByteArrayInputStream(byteMap.get("a1")), inp); + IOUtils.closeQuietly(inp); + inp = cache.getIfStored("a2"); + assertEquals(new ByteArrayInputStream(byteMap.get("a2")), inp); + IOUtils.closeQuietly(inp); + inp = cache.getIfStored("a3"); + assertEquals(new ByteArrayInputStream(byteMap.get("a3")), inp); + IOUtils.closeQuietly(inp); data = new byte[90]; random.nextBytes(data); @@ -266,13 +299,17 @@ result.canAsyncUpload()); Thread.sleep(1000); - assertEquals(new ByteArrayInputStream(byteMap.get("a1")), - cache.getIfStored("a1")); - assertEquals(new ByteArrayInputStream(byteMap.get("a2")), - cache.getIfStored("a2")); - assertEquals(new ByteArrayInputStream(byteMap.get("a3")), - cache.getIfStored("a3")); - assertNull("a4 should be null", cache.getIfStored("a4")); + inp = cache.getIfStored("a1"); + assertEquals(new ByteArrayInputStream(byteMap.get("a1")), inp); + IOUtils.closeQuietly(inp); + inp = cache.getIfStored("a2"); + assertEquals(new ByteArrayInputStream(byteMap.get("a2")), inp); + IOUtils.closeQuietly(inp); + inp = cache.getIfStored("a3"); + assertEquals(new ByteArrayInputStream(byteMap.get("a3")), inp); + IOUtils.closeQuietly(inp); + inp = cache.getIfStored("a4"); + assertNull("a4 should be null", inp); } catch (Exception e) { LOG.error("error:", e); fail(); @@ -288,8 +325,8 @@ AsyncUploadCache pendingFiles = new AsyncUploadCache(); pendingFiles.init(tempDirPath, cacheDirPath, 100); pendingFiles.reset(); - LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, 10000000, - 0.95, 0.70, pendingFiles); + LocalCache cache = new LocalCache(cacheDirPath, tempDirPath, + 10000000, 0.95, 0.70, pendingFiles); Random random = new Random(12345); int fileUploads = 1000; Map byteMap = new HashMap( @@ -303,11 +340,11 @@ cache.store(key, new ByteArrayInputStream(byteMap.get(key))); } cache.close(); - + ExecutorService executor = Executors.newFixedThreadPool(10, new NamedThreadFactory("localcache-store-worker")); - cache = new LocalCache(cacheDirPath, tempDirPath, 10000000, 0.95, 0.70, - pendingFiles); + cache = new LocalCache(cacheDirPath, tempDirPath, 10000000, 0.95, + 0.70, pendingFiles); executor.execute(new StoreWorker(cache, byteMap)); executor.shutdown(); while (!executor.awaitTermination(15, TimeUnit.SECONDS)) { @@ -318,7 +355,6 @@ } } - private class StoreWorker implements Runnable { Map byteMap; @@ -345,11 +381,12 @@ } } } + /** * Assert two inputstream */ protected void assertEquals(InputStream a, InputStream b) - throws IOException { + throws IOException { while (true) { int ai = a.read(); int bi = b.read(); @@ -358,6 +395,8 @@ break; } } + IOUtils.closeQuietly(a); + IOUtils.closeQuietly(b); } }