Index: src/java/org/apache/lucene/util/cache/LRUBench.java
===================================================================
--- src/java/org/apache/lucene/util/cache/LRUBench.java	(revision 0)
+++ src/java/org/apache/lucene/util/cache/LRUBench.java	(revision 0)
@@ -0,0 +1,188 @@
+package org.apache.lucene.util.cache;
+
+import org.apache.lucene.index.*;
+import org.apache.lucene.store.FSDirectory;
+
+import java.io.File;
+import java.util.*;
+import java.text.NumberFormat;
+
+import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
+
+public final class LRUBench {
+
+  public class LRUThread extends Thread {
+    private final String[] terms;
+    private final Cache<String,Object> c;
+    private final Map<String,Object> m;
+    private final long endTime;
+    private final int startIDX;
+
+    public LRUThread(Cache<String,Object> c,
+                     Map<String,Object> m,
+                     String[] terms, int startIDX, long endTime) {
+      this.c = c;
+      this.m = m;
+      this.terms = terms;
+      this.endTime = endTime;
+      this.startIDX = startIDX;
+    }
+
+    public void run() {
+      long count = startIDX;
+      long miss = 0;
+      long hit = 0;
+      final int limit = terms.length;
+      final Object o = new Object();
+      int maxSize = 0;
+
+      /*
+      final ConcurrentLRUCache cc;
+      if (c instanceof ConcurrentLRUCache) {
+        cc = (ConcurrentLRUCache) c;
+      } else {
+        cc = null;
+      }
+      */
+
+      if (c != null) {
+        while(true) {
+          final String term = terms[(int) ((count/2) % limit)];
+          if (c.get(term) == null) {
+            c.put(term, o);
+            miss++;
+          } else {
+            hit++;
+          }
+          /*
+          if (cc != null) {
+            int sz = cc.size();
+            if (sz > maxSize) {
+              maxSize = sz;
+            }
+          }
+          */
+          if ((++count % 10000) == 0) {
+            if (System.currentTimeMillis() >= endTime) {
+              break;
+            }
+          }
+        }
+      } else {
+        while(true) {
+          final String term = terms[(int) ((count/2) % limit)];
+          if (m.get(term) == null) {
+            m.put(term, o);
+            miss++;
+          } else {
+            hit++;
+          }
+          if ((++count % 10000) == 0) {
+            if (System.currentTimeMillis() >= endTime) {
+              break;
+            }
+          }
+        }
+      }
+
+      addResults(miss, hit, maxSize);
+    }
+  }
+
+  private long totHit, totMiss;
+  private int totMaxSize;
+
+  synchronized void addResults(long miss, long hit, int maxSize) {
+    // System.out.println("add miss=" + miss + " hit=" + hit);
+    totMiss += miss;
+    totHit += hit;
+    if (maxSize > totMaxSize) {
+      totMaxSize = maxSize;
+    }
+  }
+
+  private LRUBench(String label,
+                   String[][] termsByThread,
+                   Cache<String,Object> c, Map<String,Object> m, int numThreads, double runTimeSec) throws Exception {
+    final Thread[] threads = new Thread[numThreads];
+    final long endTime = System.currentTimeMillis()+((long) runTimeSec*1000);
+    for(int i=0;i<numThreads;i++) {
+      threads[i] = new LRUThread(c, m, termsByThread[i], i, endTime);
+      threads[i].start();
+    }
+    for(int i=0;i<numThreads;i++) {
+      threads[i].join();
+    }
+    final NumberFormat nf = NumberFormat.getInstance();
+    System.out.println(label + ": hit=" + nf.format(totHit) +
+                       " miss=" + nf.format(totMiss) +
+                       " maxSize=" + nf.format(totMaxSize));
+  }
+
+  public static void main(String[] args) throws Exception {
+    // args: <indexDir> <cacheSize> <numThreads> <runTimeSec>
+    final File indexDir = new File(args[0]);
+    final int cacheSize = Integer.parseInt(args[1]);
+    final int numThreads = Integer.parseInt(args[2]);
+    final double runTimeSec = Double.parseDouble(args[3]);
+
+    // Pull all terms from the index; give each thread its own shuffled copy
+    // so the threads don't walk the terms in lock step.
+    final IndexReader r = IndexReader.open(FSDirectory.open(indexDir), true);
+    final List<String> allTerms = new ArrayList<String>();
+    final TermEnum termEnum = r.terms();
+    while(termEnum.next()) {
+      allTerms.add(termEnum.term().text());
+    }
+    termEnum.close();
+    r.close();
+
+    final String[][] termsByThread = new String[numThreads][];
+    final Random random = new Random(17);
+    for(int i=0;i<numThreads;i++) {
+      final List<String> shuffled = new ArrayList<String>(allTerms);
+      Collections.shuffle(shuffled, random);
+      termsByThread[i] = shuffled.toArray(new String[shuffled.size()]);
+    }
+
+    new LRUBench("SynchronizedLRU",
+                 termsByThread,
+                 Cache.synchronizedCache(new SimpleLRUCache<String,Object>(cacheSize)),
+                 null,
+                 numThreads, runTimeSec);
+
+    new LRUBench("DoubleBarrelLRU",
+                 termsByThread,
+                 new DoubleBarrelLRUCache<String,Object>(cacheSize/2),
+                 null,
+                 numThreads, runTimeSec);
+
+    new LRUBench("ConcurrentLRU",
+                 termsByThread,
+                 new ConcurrentLRUCache<String,Object>(cacheSize, (3*cacheSize/4), (5*cacheSize)/6, 0),
+                 null,
+                 numThreads, runTimeSec);
+
+    new LRUBench("ConcurrentLinkedHashMap",
+                 termsByThread,
+                 null,
+                 ConcurrentLinkedHashMap.<String,Object>create(ConcurrentLinkedHashMap.EvictionPolicy.LRU,
+                                                               cacheSize),
+                 numThreads, runTimeSec);
+  }
+}

Property changes on: src/java/org/apache/lucene/util/cache/LRUBench.java
___________________________________________________________________
Added: svn:eol-style
   + native

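The loop each LRUThread times is the standard check-then-populate cache idiom: a miss costs a get plus a put, a hit costs only a get, and the per-thread hit/miss tallies are what addResults() aggregates. The standalone sketch below (illustrative only; it uses a plain java.util.HashMap and a made-up HitMissDemo class, not any of the patched caches) shows the same idiom and accounting in isolation:

    import java.util.HashMap;
    import java.util.Map;

    public class HitMissDemo {
      public static void main(String[] args) {
        final Map<String,Object> cache = new HashMap<String,Object>();
        final String[] terms = {"a", "b", "a", "c", "a"};
        long hit = 0, miss = 0;
        for (String term : terms) {
          if (cache.get(term) == null) {
            cache.put(term, new Object());   // miss: populate the cache
            miss++;
          } else {
            hit++;                           // hit: entry already cached
          }
        }
        System.out.println("hit=" + hit + " miss=" + miss);  // prints hit=2 miss=3
      }
    }
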
Index: src/java/org/apache/lucene/util/cache/ConcurrentLRUCache.java
===================================================================
--- src/java/org/apache/lucene/util/cache/ConcurrentLRUCache.java	(revision 0)
+++ src/java/org/apache/lucene/util/cache/ConcurrentLRUCache.java	(revision 0)
@@ -0,0 +1,398 @@
+package org.apache.lucene.util.cache;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.PriorityQueue;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * An LRU cache implementation based upon ConcurrentHashMap and other techniques to reduce
+ * contention and synchronization overhead, to utilize multiple CPU cores more effectively.
+ * <p/>
+ * Note that the implementation does not follow a true LRU (least-recently-used) eviction
+ * strategy. Instead it strives to remove least recently used items, but when the initial
+ * cleanup does not remove enough items to reach the 'acceptableWaterMark' limit, it can
+ * remove more items forcefully regardless of access order.
+ */
+public class ConcurrentLRUCache<K,V> extends Cache<K,V> {
+  private final ConcurrentHashMap<Object, CacheEntry<K,V>> map;
+  private final int upperWaterMark, lowerWaterMark;
+  private final ReentrantLock markAndSweepLock = new ReentrantLock(true);
+  private boolean isCleaning = false;  // not volatile... piggybacked on other volatile/atomic vars
+  private final int acceptableWaterMark;
+  private long oldestEntry = 0;  // not volatile, only accessed in the cleaning method
+  private final AtomicInteger size = new AtomicInteger();
+  private final AtomicLong accessCounter = new AtomicLong();
+
+  public ConcurrentLRUCache(int upperWaterMark, final int lowerWaterMark, int acceptableWatermark, int initialSize) {
+    if (upperWaterMark < 1) throw new IllegalArgumentException("upperWaterMark must be > 0");
+    if (lowerWaterMark >= upperWaterMark)
+      throw new IllegalArgumentException("lowerWaterMark must be < upperWaterMark");
+    map = new ConcurrentHashMap<Object, CacheEntry<K,V>>(initialSize);
+    this.upperWaterMark = upperWaterMark;
+    this.lowerWaterMark = lowerWaterMark;
+    this.acceptableWaterMark = acceptableWatermark;
+  }
+
+  @Override
+  public V get(Object key) {
+    CacheEntry<K,V> e = map.get(key);
+    if (e == null) {
+      return null;
+    }
+    e.lastAccessed = accessCounter.incrementAndGet();
+    return e.value;
+  }
+
+  public V remove(Object key) {
+    CacheEntry<K,V> cacheEntry = map.remove(key);
+    if (cacheEntry != null) {
+      size.decrementAndGet();
+      return cacheEntry.value;
+    }
+    return null;
+  }
+
+  @Override
+  public void put(K key, V val) {
+    CacheEntry<K,V> e = new CacheEntry<K,V>(key, val, accessCounter.incrementAndGet());
+    CacheEntry<K,V> oldCacheEntry = map.put(key, e);
+    int currentSize;
+    if (oldCacheEntry == null) {
+      currentSize = size.incrementAndGet();
+    } else {
+      currentSize = size.get();
+    }
+
+    // Check if we need to clear out old entries from the cache.
+    // The isCleaning variable is checked instead of markAndSweepLock.isLocked()
+    // for performance, because every put invocation will check until
+    // the size is back to an acceptable level.
+    //
+    // There is a race between the check and the call to markAndSweep, but
+    // it's unimportant because markAndSweep actually acquires the lock or returns if it can't.
+    //
+    // Thread safety note: the isCleaning read is piggybacked (comes after) other volatile reads
+    // in this method.
+    if (currentSize > upperWaterMark && !isCleaning) {
+      markAndSweep();
+    }
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return map.containsKey(key);
+  }
+
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Removes items from the cache to bring the size down
+   * to an acceptable value ('acceptableWaterMark').
+   * <p/>
+   * It is done in two stages. In the first stage, least recently used items are evicted.
+   * If, after the first stage, the cache size is still greater than the 'acceptableWaterMark'
+   * config parameter, the second stage takes over.
+   * <p/>
+   * The second stage is more intensive and tries to bring down the cache size
+   * to the 'lowerWaterMark' config parameter.
+   */
+  private void markAndSweep() {
+    // if we want to keep at least 1000 entries, then timestamps of
+    // current through current-1000 are guaranteed not to be the oldest (but that does
+    // not mean there are 1000 entries in that group... it's actually anywhere between
+    // 1 and 1000).
+    // Also, if we want to remove 500 entries, then
+    // oldestEntry through oldestEntry+500 are guaranteed to be
+    // removed (however many there are there).
+
+    if (!markAndSweepLock.tryLock()) return;
+    try {
+      long oldestEntry = this.oldestEntry;
+      isCleaning = true;
+      this.oldestEntry = oldestEntry;     // volatile write to make isCleaning visible
+
+      long timeCurrent = accessCounter.get();
+      int sz = size.get();
+
+      int numRemoved = 0;
+      int numKept = 0;
+      long newestEntry = timeCurrent;
+      long newNewestEntry = -1;
+      long newOldestEntry = Long.MAX_VALUE;
+
+      int wantToKeep = lowerWaterMark;
+      int wantToRemove = sz - lowerWaterMark;
+
+      @SuppressWarnings("unchecked") CacheEntry<K,V>[] eset = new CacheEntry[sz];
+      int eSize = 0;
+
+      // System.out.println("newestEntry="+newestEntry + " oldestEntry="+oldestEntry);
+      // System.out.println("items removed:" + numRemoved + " numKept=" + numKept + " esetSz="+ eSize + " sz-numRemoved=" + (sz-numRemoved));
+
+      for (CacheEntry<K,V> ce : map.values()) {
+        // set lastAccessedCopy to avoid more volatile reads
+        ce.lastAccessedCopy = ce.lastAccessed;
+        long thisEntry = ce.lastAccessedCopy;
+
+        // since the wantToKeep group is likely to be bigger than wantToRemove, check it first
+        if (thisEntry > newestEntry - wantToKeep) {
+          // this entry is guaranteed not to be in the bottom
+          // group, so do nothing.
+          numKept++;
+          newOldestEntry = Math.min(thisEntry, newOldestEntry);
+        } else if (thisEntry < oldestEntry + wantToRemove) { // entry in bottom group?
+          // this entry is guaranteed to be in the bottom group
+          // so immediately remove it from the map.
+          evictEntry(ce.key);
+          numRemoved++;
+        } else {
+          // This entry *could* be in the bottom group.
+          // Collect these entries to avoid another full pass... this is wasted
+          // effort if enough entries are normally removed in this first pass.
+          // An alternate impl could make a full second pass.
+          if (eSize < eset.length-1) {
+            eset[eSize++] = ce;
+            newNewestEntry = Math.max(thisEntry, newNewestEntry);
+            newOldestEntry = Math.min(thisEntry, newOldestEntry);
+          }
+        }
+      }
+
+      // System.out.println("items removed:" + numRemoved + " numKept=" + numKept + " esetSz="+ eSize + " sz-numRemoved=" + (sz-numRemoved));
+      // TODO: allow this to be customized in the constructor?
+      int numPasses=1; // maximum number of linear passes over the data
+
+      // if we didn't remove enough entries, then make more passes
+      // over the values we collected, with updated min and max values.
+      while (sz - numRemoved > acceptableWaterMark && --numPasses>=0) {
+
+        oldestEntry = newOldestEntry == Long.MAX_VALUE ? oldestEntry : newOldestEntry;
+        newOldestEntry = Long.MAX_VALUE;
+        newestEntry = newNewestEntry;
+        newNewestEntry = -1;
+        wantToKeep = lowerWaterMark - numKept;
+        wantToRemove = sz - lowerWaterMark - numRemoved;
+
+        // iterate backward to make it easy to remove items.
+        for (int i=eSize-1; i>=0; i--) {
+          CacheEntry<K,V> ce = eset[i];
+          long thisEntry = ce.lastAccessedCopy;
+
+          if (thisEntry > newestEntry - wantToKeep) {
+            // this entry is guaranteed not to be in the bottom
+            // group, so do nothing but remove it from the eset.
+            numKept++;
+            // remove the entry by moving the last element to its position
+            eset[i] = eset[eSize-1];
+            eSize--;
+
+            newOldestEntry = Math.min(thisEntry, newOldestEntry);
+
+          } else if (thisEntry < oldestEntry + wantToRemove) { // entry in bottom group?
+
+            // this entry is guaranteed to be in the bottom group
+            // so immediately remove it from the map.
+            evictEntry(ce.key);
+            numRemoved++;
+
+            // remove the entry by moving the last element to its position
+            eset[i] = eset[eSize-1];
+            eSize--;
+          } else {
+            // This entry *could* be in the bottom group, so keep it in the eset,
+            // and update the stats.
+            newNewestEntry = Math.max(thisEntry, newNewestEntry);
+            newOldestEntry = Math.min(thisEntry, newOldestEntry);
+          }
+        }
+        // System.out.println("items removed:" + numRemoved + " numKept=" + numKept + " esetSz="+ eSize + " sz-numRemoved=" + (sz-numRemoved));
+      }
+
+      // if we still didn't remove enough entries, then make another pass while
+      // inserting into a priority queue
+      if (sz - numRemoved > acceptableWaterMark) {
+
+        oldestEntry = newOldestEntry == Long.MAX_VALUE ? oldestEntry : newOldestEntry;
+        newOldestEntry = Long.MAX_VALUE;
+        newestEntry = newNewestEntry;
+        newNewestEntry = -1;
+        wantToKeep = lowerWaterMark - numKept;
+        wantToRemove = sz - lowerWaterMark - numRemoved;
+
+        PQueue<K,V> queue = new PQueue<K,V>(wantToRemove);
+
+        for (int i=eSize-1; i>=0; i--) {
+          CacheEntry<K,V> ce = eset[i];
+          long thisEntry = ce.lastAccessedCopy;
+
+          if (thisEntry > newestEntry - wantToKeep) {
+            // this entry is guaranteed not to be in the bottom
+            // group, so do nothing but remove it from the eset.
+            numKept++;
+            // removal not necessary on the last pass.
+            // eset[i] = eset[eSize-1];
+            // eSize--;
+
+            newOldestEntry = Math.min(thisEntry, newOldestEntry);
+
+          } else if (thisEntry < oldestEntry + wantToRemove) { // entry in bottom group?
+            // this entry is guaranteed to be in the bottom group
+            // so immediately remove it.
+            evictEntry(ce.key);
+            numRemoved++;
+
+            // removal not necessary on the last pass.
+            // eset[i] = eset[eSize-1];
+            // eSize--;
+          } else {
+            // This entry *could* be in the bottom group.
+            // add it to the priority queue
+
+            // everything in the priority queue will be removed, so keep track of
+            // the lowest value that ever comes back out of the queue.
+
+            // first reduce the size of the priority queue to account for
+            // the number of items we have already removed while executing
+            // this loop so far.
+            queue.setMaxSize(sz - lowerWaterMark - numRemoved);
+            while (queue.size() > queue.getMaxSize() && queue.size() > 0) {
+              CacheEntry<K,V> otherEntry = queue.pop();
+              newOldestEntry = Math.min(otherEntry.lastAccessedCopy, newOldestEntry);
+            }
+            if (queue.getMaxSize() <= 0) break;
+
+            final CacheEntry<K,V> o = queue.insertWithOverflow(ce);
+            if (o != null) {
+              newOldestEntry = Math.min(o.lastAccessedCopy, newOldestEntry);
+            }
+          }
+        }
+
+        // Now delete everything in the priority queue.
+        // avoid using pop() since order doesn't matter anymore
+        for (CacheEntry<K,V> ce : queue) {
+          if (ce==null) continue;
+          evictEntry(ce.key);
+          numRemoved++;
+        }
+
+        // System.out.println("items removed:" + numRemoved + " numKept=" + numKept + " initialQueueSize="+ wantToRemove + " finalQueueSize=" + queue.size() + " sz-numRemoved=" + (sz-numRemoved));
+      }
+
+      oldestEntry = newOldestEntry == Long.MAX_VALUE ? oldestEntry : newOldestEntry;
+      this.oldestEntry = oldestEntry;
+    } finally {
+      isCleaning = false;  // set before markAndSweepLock.unlock() for visibility
+      markAndSweepLock.unlock();
+    }
+  }
+
+  private static class PQueue<K,V> extends PriorityQueue<CacheEntry<K,V>> implements Iterable<CacheEntry<K,V>> {
+
+    PQueue(int maxSz) {
+      super.initialize(maxSz);
+    }
+
+    public Iterator<CacheEntry<K,V>> iterator() {
+      return Arrays.asList(heap).iterator();
+    }
+
+    void setMaxSize(int maxSize) {
+      this.maxSize = maxSize;
+    }
+
+    int getMaxSize() {
+      return maxSize;
+    }
+
+    @Override
+    protected boolean lessThan(CacheEntry<K,V> a, CacheEntry<K,V> b) {
+      // reverse the parameter order so that the queue keeps the oldest items
+      return b.lastAccessedCopy < a.lastAccessedCopy;
+    }
+  }
+
+  private void evictEntry(Object key) {
+    CacheEntry<K,V> o = map.remove(key);
+    if (o == null) return;
+    size.decrementAndGet();
+  }
+
+  public int size() {
+    return size.get();
+  }
+
+  public void clear() {
+    map.clear();
+  }
+
+  public Map<Object, CacheEntry<K,V>> getMap() {
+    return map;
+  }
+
+  private static class CacheEntry<K,V> implements Comparable<CacheEntry<K,V>> {
+    K key;
+    V value;
+    volatile long lastAccessed = 0;
+    long lastAccessedCopy = 0;
+
+    public CacheEntry(K key, V value, long lastAccessed) {
+      this.key = key;
+      this.value = value;
+      this.lastAccessed = lastAccessed;
+    }
+
+    public void setLastAccessed(long lastAccessed) {
+      this.lastAccessed = lastAccessed;
+    }
+
+    public int compareTo(CacheEntry<K,V> that) {
+      if (this.lastAccessedCopy == that.lastAccessedCopy) return 0;
+      return this.lastAccessedCopy < that.lastAccessedCopy ? 1 : -1;
+    }
+
+    public int hashCode() {
+      return value.hashCode();
+    }
+
+    public boolean equals(Object obj) {
+      return value.equals(obj);
+    }
+
+    public String toString() {
+      return "key: " + key + " value: " + value + " lastAccessed:" + lastAccessed;
+    }
+  }
+}

Property changes on: src/java/org/apache/lucene/util/cache/ConcurrentLRUCache.java
___________________________________________________________________
Added: svn:eol-style
   + native

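Assuming the patch above is applied, the three watermarks drive everything: a put that observes size > upperWaterMark runs markAndSweep inline, which evicts back toward acceptableWaterMark (or, in the forceful stages, toward lowerWaterMark). A minimal usage sketch (the ConcurrentLRUCacheDemo class and the watermark values are purely illustrative):

    import org.apache.lucene.util.cache.ConcurrentLRUCache;

    public class ConcurrentLRUCacheDemo {
      public static void main(String[] args) {
        // upperWaterMark=1000: a put that sees more entries than this triggers cleaning.
        // lowerWaterMark=800:  target of the forceful (non-LRU) cleanup stages.
        // acceptableWaterMark=900: cleaning stops once the size is at or below this.
        // initialSize=1024: initial capacity of the backing ConcurrentHashMap.
        ConcurrentLRUCache<String,Object> cache =
            new ConcurrentLRUCache<String,Object>(1000, 800, 900, 1024);
        for (int i = 0; i < 2000; i++) {
          cache.put("term" + i, new Object());
        }
        // The size never runs away: each time a put crosses upperWaterMark,
        // markAndSweep prunes the least recently used entries, so the final
        // size stays bounded at roughly upperWaterMark or below.
        System.out.println("size=" + cache.size());
      }
    }
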
Index: src/java/org/apache/lucene/util/cache/DoubleBarrelLRUCache.java
===================================================================
--- src/java/org/apache/lucene/util/cache/DoubleBarrelLRUCache.java	(revision 0)
+++ src/java/org/apache/lucene/util/cache/DoubleBarrelLRUCache.java	(revision 0)
@@ -0,0 +1,88 @@
+package org.apache.lucene.util.cache;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Map;
+
+public class DoubleBarrelLRUCache<K,V> extends Cache<K,V> {
+  private final Map<K,V> cache1;
+  private final Map<K,V> cache2;
+  private final AtomicInteger putCount = new AtomicInteger();
+  private final AtomicInteger swapCount = new AtomicInteger();
+  private final int maxSize;
+
+  public DoubleBarrelLRUCache(int maxSize) {
+    this.maxSize = maxSize;
+    cache1 = new ConcurrentHashMap<K,V>();
+    cache2 = new ConcurrentHashMap<K,V>();
+  }
+
+  private final boolean swapped() {
+    return (swapCount.get()&1) != 0;
+  }
+
+  public boolean containsKey(Object k) {
+    return false;
+  }
+
+  public void close() {
+  }
+
+  public V get(Object key) {
+    final Map<K,V> primary;
+    final Map<K,V> secondary;
+    if (swapped()) {
+      primary = cache2;
+      secondary = cache1;
+    } else {
+      primary = cache1;
+      secondary = cache2;
+    }
+
+    V result = primary.get(key);
+    if (result == null) {
+      result = secondary.get(key);
+      if (result != null) {
+        // Carry forward into the primary map
+        put((K) key, result);
+      }
+    }
+    return result;
+  }
+
+  public void put(K key, V value) {
+    final Map<K,V> primary;
+    final Map<K,V> secondary;
+    if (swapped()) {
+      primary = cache2;
+      secondary = cache1;
+    } else {
+      primary = cache1;
+      secondary = cache2;
+    }
+    primary.put(key, value);
+
+    if (putCount.getAndIncrement() % maxSize == maxSize-1) {
+      // swap the barrels: clear the colder map, which then becomes the new primary
+      secondary.clear();
+      swapCount.getAndIncrement();
+    }
+  }
+}

Property changes on: src/java/org/apache/lucene/util/cache/DoubleBarrelLRUCache.java
___________________________________________________________________
Added: svn:eol-style
   + native

Index: src/java/org/apache/lucene/util/PriorityQueue.java
===================================================================
--- src/java/org/apache/lucene/util/PriorityQueue.java	(revision 882507)
+++ src/java/org/apache/lucene/util/PriorityQueue.java	(working copy)
@@ -27,7 +27,7 @@
  */
 public abstract class PriorityQueue<T> {
   private int size;
-  private int maxSize;
+  protected int maxSize;
   protected T[] heap;
 
   /** Determines the ordering of objects in this priority queue.  Subclasses
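The double-barrel design trades exact LRU ordering for uncontended ConcurrentHashMap reads and writes: instead of tracking per-entry recency, it discards the colder barrel wholesale once every maxSize puts, and the carry-forward in get() keeps hot entries alive across swaps. A behavioral sketch (illustrative only; the DoubleBarrelDemo class is made up, and it assumes the patch is applied):

    import org.apache.lucene.util.cache.DoubleBarrelLRUCache;

    public class DoubleBarrelDemo {
      public static void main(String[] args) {
        // maxSize is per barrel; at most ~2*maxSize entries are live at once.
        DoubleBarrelLRUCache<String,Object> cache =
            new DoubleBarrelLRUCache<String,Object>(1000);
        cache.put("hot", new Object());
        for (int i = 0; i < 1000; i++) {
          cache.put("fill" + i, new Object());  // crossing maxSize puts causes one swap
        }
        // "hot" now sits in the secondary barrel; this hit copies it forward
        // into the primary, so it will also survive the next swap.
        System.out.println(cache.get("hot") != null);  // prints true
      }
    }
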