Index: LocalCache.java =================================================================== --- LocalCache.java (revision 1631345) +++ LocalCache.java (working copy) @@ -165,17 +165,24 @@ transFile.getAbsolutePath()); } transFile = null; - LOG.debug("file [{}] added to local cache.", fileName); + LOG.debug( + "file [{}] doesn't exists. adding to local cache using inputstream.", + fileName); cache.put(fileName, f.length()); } else { + LOG.debug( + "file [{}] doesn't exists. returning transient file [{}].", + fileName, transFile.getAbsolutePath()); f = transFile; } } else { - // f.exists and not in purge mode f.setLastModified(System.currentTimeMillis()); + LOG.debug( + "file [{}] exists. adding to local cache using inputstream.", + fileName); cache.put(fileName, f.length()); } - cache.tryPurge(); + tryPurge(); return new LazyFileInputStream(f); } @@ -214,7 +221,8 @@ * file, if it is added to {@link LocalCache} or original file. * @throws IOException */ - public AsyncUploadCacheResult store(String fileName, File src, boolean tryForAsyncUpload) throws IOException { + public AsyncUploadCacheResult store(String fileName, File src, + boolean tryForAsyncUpload) throws IOException { fileName = fileName.replace("\\", "/"); File dest = getFile(fileName); File parent = dest.getParentFile(); @@ -223,19 +231,24 @@ result.setAsyncUpload(false); boolean destExists = false; if ((destExists = dest.exists()) - || (src.exists() && !dest.exists() && !src.equals(dest) && canAdmitFile(src.length()) + || (src.exists() && !dest.exists() && !src.equals(dest) + && canAdmitFile(src.length()) && (parent.exists() || parent.mkdirs()) && (src.renameTo(dest)))) { if (destExists) { dest.setLastModified(System.currentTimeMillis()); } + LOG.debug("file [{}] exists= [{}] and adding to local cache.", + fileName, destExists); cache.put(fileName, dest.length()); - LOG.debug("file [{}] added to local cache.", fileName); result.setFile(dest); if (tryForAsyncUpload) { result.setAsyncUpload(asyncUploadCache.add(fileName).canAsyncUpload()); } + } else { + LOG.info("file [{}] exists= [{}] not added to local cache.", + fileName, destExists); } - cache.tryPurge(); + tryPurge(); return result; } /** @@ -261,8 +274,8 @@ return null; } else { // touch entry in LRU caches - cache.put(fileName, f.length()); f.setLastModified(System.currentTimeMillis()); + cache.get(fileName); return f; } } @@ -351,8 +364,9 @@ private void deleteOldFiles() { int initialSize = toBeDeleted.size(); int count = 0; - for (String n : new ArrayList(toBeDeleted)) { - if (tryDelete(n)) { + for (String fileName : new ArrayList(toBeDeleted)) { + fileName = fileName.replace("\\", "/"); + if( cache.remove(fileName) != null) { count++; } } @@ -397,6 +411,28 @@ count = Math.min(64 * 1024, count); return count; } + + /** + * This method tries purging of local cache. It checks if local cache + * has exceeded the defined limit then it triggers purge cache job in a + * seperate thread. + */ + synchronized void tryPurge() { + if (!isInPurgeMode() + && cache.currentSizeInBytes > cache.cachePurgeTrigSize) { + setPurgeMode(true); + LOG.info( + "cache.entries = [{}], currentSizeInBytes=[{}] exceeds cachePurgeTrigSize=[{}]", + new Object[] { cache.size(), cache.currentSizeInBytes, + cache.cachePurgeTrigSize }); + new Thread(new PurgeJob()).start(); + } else { + LOG.debug( + "currentSizeInBytes=[{}],cachePurgeTrigSize=[{}], isInPurgeMode =[{}]", + new Object[] { cache.currentSizeInBytes, + cache.cachePurgeTrigSize, isInPurgeMode() }); + } + } /** * A LRU based extension {@link LinkedHashMap}. The key is file name and @@ -462,41 +498,42 @@ fileName, flength); currentSizeInBytes -= flength.longValue(); } + } else { + LOG.info("not able to remove cache entry [{}], size [{}]", key, + super.get(key)); } return flength; } @Override - public synchronized Long put(final String fileName, final Long value) { - Long oldValue = cache.get(fileName); - if (oldValue == null) { - long flength = value.longValue(); - currentSizeInBytes += flength; - return super.put(fileName.replace("\\", "/"), value); + public Long put(final String fileName, final Long value) { + if( isInPurgeMode()) { + LOG.debug("cache is purge mode: put is no-op"); + return null; } - toBeDeleted.remove(fileName); - return oldValue; + synchronized (this) { + Long oldValue = cache.get(fileName); + if (oldValue == null) { + long flength = value.longValue(); + currentSizeInBytes += flength; + return super.put(fileName.replace("\\", "/"), value); + } + toBeDeleted.remove(fileName); + return oldValue; + } } - - /** - * This method tries purging of local cache. It checks if local cache - * has exceeded the defined limit then it triggers purge cache job in a - * seperate thread. - */ - synchronized void tryPurge() { - if (currentSizeInBytes > cachePurgeTrigSize && !isInPurgeMode()) { - setPurgeMode(true); - LOG.info( - "currentSizeInBytes=[{}] exceeds cachePurgeTrigSize=[{}]", - cache.currentSizeInBytes, cache.cachePurgeTrigSize); - new Thread(new PurgeJob()).start(); - } else { - LOG.debug( - "currentSizeInBytes=[{}],cachePurgeTrigSize=[{}], isInPurgeMode =[{}]", - new Object[] { cache.currentSizeInBytes, - cache.cachePurgeTrigSize, isInPurgeMode() }); + + @Override + public Long get(Object key) { + if( isInPurgeMode()) { + LOG.debug("cache is purge mode: get is no-op"); + return null; } + synchronized (this) { + return super.get(key); + } } + /** * This method check if cache can admit file of given length. * @param length length of file. @@ -524,9 +561,9 @@ public void run() { try { synchronized (cache) { - LOG.info(" cache purge job started"); // first try to delete toBeDeleted files int initialSize = cache.size(); + LOG.info(" cache purge job started. initial cache entries = [{}]", initialSize); for (String fileName : new ArrayList(toBeDeleted)) { cache.remove(fileName); } @@ -536,12 +573,10 @@ if (entry.getKey() != null) { if (cache.currentSizeInBytes > cache.cachePurgeResize) { itr.remove(); - } else { break; } } - } LOG.info( " cache purge job completed: cleaned [{}] files and currentSizeInBytes = [{}]",