Index: src/java/org/apache/solr/common/util/ConcurrentLRUCache.java =================================================================== --- src/java/org/apache/solr/common/util/ConcurrentLRUCache.java Tue Oct 28 17:36:53 IST 2008 +++ src/java/org/apache/solr/common/util/ConcurrentLRUCache.java Tue Oct 28 17:36:53 IST 2008 @@ -0,0 +1,287 @@ +package org.apache.solr.common.util; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @version $Id$ + * @since solr 1.4 + */ +public class ConcurrentLRUCache { + + private Map map; + private final int upperWaterMark, lowerWaterMark; + private boolean stop = false; + private final ReentrantLock markAndSweepLock = new ReentrantLock(true); + private final boolean newThreadForCleanup; + private volatile boolean islive = true; + private final Stats stats = new Stats(); + private final int acceptableWaterMark; + + public ConcurrentLRUCache(int upperWaterMark, final int lowerWaterMark, int acceptableWatermark, int initialSize, boolean runCleanupThread, boolean runNewThreadForCleanup, final int delay) { + if (upperWaterMark < 1) throw new IllegalArgumentException("upperWaterMark must be > 0"); + if (lowerWaterMark >= upperWaterMark) + throw new IllegalArgumentException("lowerWaterMark must be < upperWaterMark"); + map = new ConcurrentHashMap(initialSize); + newThreadForCleanup = runNewThreadForCleanup; + this.upperWaterMark = upperWaterMark; + this.lowerWaterMark = lowerWaterMark; + this.acceptableWaterMark = acceptableWatermark; + if (runCleanupThread) { + new Thread() { + public void run() { + while (true) { + if (stop) break; + try { + Thread.sleep(delay * 1000); + } catch (InterruptedException e) {/*no op*/ } + markAndSweep(); + } + } + }.start(); + } + } + + public void setAlive(boolean live) { + islive = live; + } + + public Object get(Object key) { + CacheEntry e = map.get(key); + if (e == null) { + if (islive) stats.missCounter.incrementAndGet(); + return null; + } + if (islive) e.lastAccessed = stats.accessCounter.incrementAndGet(); + return e.value; + } + + public Object remove(Object key) { + CacheEntry cacheEntry = map.remove(key); + if (cacheEntry != null) { + stats.size.decrementAndGet(); + return cacheEntry.value; + } + return null; + } + + public Object put(Object key, Object val) { + if (val == null) return null; + CacheEntry e = new CacheEntry(key, val, stats.accessCounter.incrementAndGet()); + CacheEntry oldCacheEntry = map.put(key, e); + stats.size.incrementAndGet(); + if (islive) { + stats.putCounter.incrementAndGet(); + } else { + stats.nonLivePutCounter.incrementAndGet(); + } + if (stats.size.get() > upperWaterMark) { + if (newThreadForCleanup) { + if (!markAndSweepLock.isLocked()) { + new Thread() { + public void run() { + markAndSweep(); + } + }.start(); + } + } else { + markAndSweep(); + } + } + return oldCacheEntry == null ? null : oldCacheEntry.value; + } + + private void markAndSweep() { + if (!markAndSweepLock.tryLock()) return; + try { + int size = stats.size.get(); + long currentLatestAccessed = stats.accessCounter.get(); + int itemsToBeRemoved = size - lowerWaterMark; + int itemsRemoved = 0; + if (itemsToBeRemoved < 1) return; + // currentLatestAccessed is the counter value of the item accessed most recently + // therefore remove all items whose last accessed counter is less than (currentLatestAccessed - lowerWaterMark) + long removeOlderThan = currentLatestAccessed - lowerWaterMark; + for (Map.Entry entry : map.entrySet()) { + if (entry.getValue().lastAccessed <= removeOlderThan && itemsRemoved < itemsToBeRemoved) { + evictEntry(entry.getKey()); + } + } + + // Since the removal of items in the above loop depends on the value of the lastAccessed variable, + // between the time we recorded the number of items to be removed and the actual removal process, + // some items may graduate above the removeOlderThan value and escape eviction. + // Therefore, we again check if the size less than acceptableWaterMark, if not we remove items forcefully + // using a method which does not depend on the value of lastAccessed but can be more costly to run + + size = stats.size.get(); + // In the first attempt, try to use a simple algorithm to remove old entries + // If the size of the cache is <= acceptableWatermark then return + if (size <= acceptableWaterMark) return; + // Remove items until size becomes lower than acceptableWaterMark + itemsToBeRemoved = size - acceptableWaterMark; + TreeSet tree = new TreeSet(); + // This loop may remove a few newer items because we try to forcefully fill a + // bucket of fixed size and remove them even if they have become newer in the meantime + // The caveat is that this may lead to more cache misses because we may have removed + // an item which was used very recently (against the philosophy of LRU) + for (Map.Entry entry : map.entrySet()) { + CacheEntry v = entry.getValue(); + v.lastAccessedCopy = v.lastAccessed; + if (tree.size() < itemsToBeRemoved) { + tree.add(v); + } else { + if (v.lastAccessedCopy < tree.first().lastAccessedCopy) { + tree.remove(tree.first()); + tree.add(v); + } + } + } + for (CacheEntry sortCacheEntry : tree) + evictEntry(sortCacheEntry.key); + } finally { + markAndSweepLock.unlock(); + } + } + + + private void evictEntry(Object key) { + Object o = map.remove(key); + if (o == null) return; + stats.size.decrementAndGet(); + stats.evictionCounter++; + } + + + public Map getLatestAccessedItems(long n) { + markAndSweepLock.lock(); + Map result = new LinkedHashMap(); + TreeSet tree = new TreeSet(); + try { + for (Map.Entry entry : map.entrySet()) { + CacheEntry ce = entry.getValue(); + ce.lastAccessedCopy = ce.lastAccessed; + if (tree.size() < n) { + tree.add(ce); + } else { + if (ce.lastAccessedCopy > tree.last().lastAccessedCopy) { + tree.remove(tree.last()); + tree.add(entry.getValue()); + } + } + } + } finally { + markAndSweepLock.unlock(); + } + for (CacheEntry e : tree) { + result.put(e.key, e.value); + } + return result; + } + + public int size() { + return stats.size.get(); + } + + public void clear() { + map.clear(); + } + + public Map getMap() { + return map; + } + + private static class CacheEntry implements Comparable { + Object key, value; + volatile long lastAccessed = 0; + long lastAccessedCopy = 0; + + + public CacheEntry(Object key, Object value, long lastAccessed) { + this.key = key; + this.value = value; + this.lastAccessed = lastAccessed; + } + + public void setLastAccessed(long lastAccessed) { + this.lastAccessed = lastAccessed; + } + + public int compareTo(CacheEntry that) { + if (this.lastAccessedCopy == that.lastAccessedCopy) return 0; + return this.lastAccessedCopy < that.lastAccessedCopy ? 1 : -1; + } + + public int hashCode() { + return value.hashCode(); + } + + public boolean equals(Object obj) { + return value.equals(obj); + } + + public String toString() { + return "key: " + key + " value: " + value + " lastAccessed:" + lastAccessed; + } + } + + + public void destroy() { + stop = true; + if (map != null) { + map.clear(); + map = null; + } + } + + public Stats getStats() { + return stats; + } + + protected void finalize() throws Throwable { + destroy(); + super.finalize(); + } + + public static class Stats { + private final AtomicLong accessCounter = new AtomicLong(0), + putCounter = new AtomicLong(0), + nonLivePutCounter = new AtomicLong(0), + missCounter = new AtomicLong(); + private final AtomicInteger size = new AtomicInteger(); + private long evictionCounter = 0; + + public long getCumulativeLookups() { + return (accessCounter.get() - putCounter.get() - nonLivePutCounter.get()) + missCounter.get(); + } + + public long getCumulativeHits() { + return accessCounter.get() - putCounter.get() - nonLivePutCounter.get(); + } + + public long getCumulativePuts() { + return putCounter.get(); + } + + public long getCumulativeEvictions() { + return evictionCounter; + } + + public int getCurrentSize() { + return size.get(); + } + + public long getCumulativeNonLivePuts() { + return nonLivePutCounter.get(); + } + + public long getCumulativeMisses() { + return missCounter.get(); + } + } +} Index: src/java/org/apache/solr/search/FastLRUCache.java =================================================================== --- src/java/org/apache/solr/search/FastLRUCache.java Tue Oct 28 17:37:33 IST 2008 +++ src/java/org/apache/solr/search/FastLRUCache.java Tue Oct 28 17:37:33 IST 2008 @@ -0,0 +1,226 @@ +package org.apache.solr.search; + +import org.apache.solr.common.SolrException; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.common.util.ConcurrentLRUCache; +import org.apache.solr.core.SolrCore; + +import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.io.IOException; +import java.io.Serializable; +import java.net.URL; + +/** + * SolrCache based on ConcurrentLRUCache implementation. + *

+ * Created with reducing contention and synchronization to utilize multiple CPU cores more effectively. + * + * @version $Id$ + * @since solr 1.4 + */ +public class FastLRUCache implements SolrCache { + + private List cumulativeStats; + + private long warmupTime = 0; + + private String name; + private int autowarmCount; + private State state; + private CacheRegenerator regenerator; + private String description = "Concurrent LRU Cache"; + private ConcurrentLRUCache cache; + + public Object init(Map args, Object persistence, CacheRegenerator regenerator) { + state = State.CREATED; + this.regenerator = regenerator; + name = (String) args.get("name"); + String str = (String) args.get("size"); + final int limit = str == null ? 1024 : Integer.parseInt(str); + int minLimit; + str = (String) args.get("minSize"); + if (str == null) { + minLimit = (int) (limit * 0.9); + } else { + minLimit = Integer.parseInt(str); + } + int acceptableLimit; + str = (String) args.get("acceptableSize"); + if (str == null) { + acceptableLimit = (int) (limit * 0.95); + } else { + acceptableLimit = Integer.parseInt(str); + } + str = (String) args.get("initialSize"); + final int initialSize = str == null ? 1024 : Integer.parseInt(str); + str = (String) args.get("autowarmCount"); + autowarmCount = str == null ? 0 : Integer.parseInt(str); + + description = "Concurrent LRU Cache(maxSize=" + limit + ", initialSize=" + initialSize; + if (autowarmCount > 0) { + description += ", autowarmCount=" + autowarmCount + + ", regenerator=" + regenerator; + } + description += ')'; + + cache = new ConcurrentLRUCache(limit, minLimit, acceptableLimit, initialSize, false, false, -1); + cache.setAlive(false); + + if (persistence == null) { + // must be the first time a cache of this type is being created + // Use a CopyOnWriteArrayList since puts are very rare and iteration may a frequent operation + // because it is used in getStatistics() + persistence = new CopyOnWriteArrayList(); + } + + cumulativeStats = (List) persistence; + cumulativeStats.add(cache.getStats()); + return cumulativeStats; + } + + public String name() { + return name; + } + + public int size() { + return cache.size(); + + } + + public Object put(Object key, Object value) { + return cache.put(key, value); + } + + public Object get(Object key) { + return cache.get(key); + + } + + public void clear() { + cache.clear(); + } + + public void setState(State state) { + this.state = state; + cache.setAlive(state == State.LIVE); + } + + public State getState() { + return state; + } + + public void warm(SolrIndexSearcher searcher, SolrCache old) throws IOException { + if (regenerator == null) return; + long warmingStartTime = System.currentTimeMillis(); + FastLRUCache other = (FastLRUCache) old; + // warm entries + if (autowarmCount != 0) { + int sz = other.size(); + if (autowarmCount != -1) sz = Math.min(sz, autowarmCount); + Map items = other.cache.getLatestAccessedItems(sz); + Map.Entry[] itemsArr = new Map.Entry[items.size()]; + int counter = 0; + for (Object mapEntry : items.entrySet()) { + itemsArr[counter++] = (Map.Entry) mapEntry; + } + for (int i = itemsArr.length - 1; i >= 0; i--) { + try { + boolean continueRegen = regenerator.regenerateItem(searcher, + this, old, itemsArr[i].getKey(), itemsArr[i].getValue()); + if (!continueRegen) break; + } + catch (Throwable e) { + SolrException.log(log, "Error during auto-warming of key:" + itemsArr[i].getKey(), e); + } + } + } + warmupTime = System.currentTimeMillis() - warmingStartTime; + } + + + public void close() { + } + + //////////////////////// SolrInfoMBeans methods ////////////////////// + public String getName() { + return FastLRUCache.class.getName(); + } + + public String getVersion() { + return SolrCore.version; + } + + public String getDescription() { + return description; + } + + public Category getCategory() { + return Category.CACHE; + } + + public String getSourceId() { + return "$Id$"; + } + + public String getSource() { + return "$URL$"; + } + + public URL[] getDocs() { + return null; + } + + // returns a ratio, not a percent. + private static String calcHitRatio(long lookups, long hits) { + if (lookups == 0) return "0.00"; + if (lookups == hits) return "1.00"; + int hundredths = (int) (hits * 100 / lookups); // rounded down + if (hundredths < 10) return "0.0" + hundredths; + return "0." + hundredths; + } + + public NamedList getStatistics() { + NamedList lst = new SimpleOrderedMap(); + ConcurrentLRUCache.Stats stats = cache.getStats(); + long lookups = stats.getCumulativeLookups(); + long hits = stats.getCumulativeHits(); + long inserts = stats.getCumulativePuts(); + long evictions = stats.getCumulativeEvictions(); + long size = stats.getCurrentSize(); + + lst.add("lookups", lookups); + lst.add("hits", hits); + lst.add("hitratio", calcHitRatio(lookups, hits)); + lst.add("inserts", inserts); + lst.add("evictions", evictions); + lst.add("size", size); + + lst.add("warmupTime", warmupTime); + + long clookups = 0; + long chits = 0; + long cinserts = 0; + long cevictions = 0; + // NOTE: It is safe to iterate on a CopyOnWriteArrayList + for (ConcurrentLRUCache.Stats statistiscs : cumulativeStats) { + clookups += statistiscs.getCumulativeLookups(); + chits += statistiscs.getCumulativeHits(); + cinserts += statistiscs.getCumulativePuts(); + cevictions += statistiscs.getCumulativeEvictions(); + } + lst.add("cumulative_lookups", clookups); + lst.add("cumulative_hits", chits); + lst.add("cumulative_hitratio", calcHitRatio(clookups, chits)); + lst.add("cumulative_inserts", cinserts); + lst.add("cumulative_evictions", cevictions); + + return lst; + } + + public String toString() { + return name + getStatistics().toString(); + } +} + Index: src/test/org/apache/solr/search/TestFastLRUCache.java =================================================================== --- src/test/org/apache/solr/search/TestFastLRUCache.java Tue Oct 28 16:14:44 IST 2008 +++ src/test/org/apache/solr/search/TestFastLRUCache.java Tue Oct 28 16:14:44 IST 2008 @@ -0,0 +1,65 @@ +package org.apache.solr.search; + +import junit.framework.TestCase; + +import java.util.Map; +import java.util.HashMap; +import java.io.IOException; + +import org.apache.solr.common.util.NamedList; + + +/** + * Test for FastLRUCache + * + * @see org.apache.solr.search.FastLRUCache + * @since solr 1.4 + * @version $Id$ + */ +public class TestFastLRUCache extends TestCase { + public void testSimple() throws IOException { + FastLRUCache sc = new FastLRUCache(); + Map l = new HashMap(); + l.put("size","100"); + l.put("initialSize","10"); + l.put("autowarmCount","25"); + CacheRegenerator cr = new CacheRegenerator() { + public boolean regenerateItem(SolrIndexSearcher newSearcher, SolrCache newCache, + SolrCache oldCache, Object oldKey, Object oldVal) throws IOException { + newCache.put(oldKey, oldVal); + return true; + } + }; + Object o = sc.init(l,null,cr); + sc.setState(SolrCache.State.LIVE); + for(int i=0;i<101;i++){ + sc.put(i+1,""+(i+1)); + } + assertEquals("25",sc.get(25)); + assertEquals(null,sc.get(110)); + NamedList nl = sc.getStatistics(); + assertEquals(2L ,nl.get("lookups")); + assertEquals(1L ,nl.get("hits")); + assertEquals(101L ,nl.get("inserts")); + assertEquals(11L ,nl.get("evictions")); + + FastLRUCache scNew = new FastLRUCache(); + scNew.init(l,o,cr); + scNew.warm(null,sc); + scNew.setState(SolrCache.State.LIVE); + scNew.put(103,"103"); + assertEquals("90",scNew.get(90)); + assertEquals(null,scNew.get(50)); + nl = scNew.getStatistics(); + assertEquals(2L ,nl.get("lookups")); + assertEquals(1L ,nl.get("hits")); + assertEquals(1L ,nl.get("inserts")); + assertEquals(0L ,nl.get("evictions")); + + assertEquals(4L ,nl.get("cumulative_lookups")); + assertEquals(2L ,nl.get("cumulative_hits")); + assertEquals(102L ,nl.get("cumulative_inserts")); + assertEquals(11L ,nl.get("cumulative_evictions")); + } + +}