Index: CHANGES.txt =================================================================== --- CHANGES.txt (revision 890765) +++ CHANGES.txt (working copy) @@ -100,6 +100,11 @@ the FieldCache rather than waiting for the WeakHashMap to release the reference (Mike McCandless) +* LUCENE-2164: ConcurrentMergeScheduler sets thread priorities of each + merge thread so that smaller merges have higher priority. This also + changes the default max thread count in ConcurrentMergeScheduler to + 2. (Mike McCandless) + Build * LUCENE-2124: Moved the JDK-based collation support from contrib/collation @@ -188,7 +193,7 @@ (Michael Busch, Uwe Schindler) * LUCENE-2060: Changed ConcurrentMergeScheduler's default for - maxNumThreads from 3 to 1, because in practice we get the most + maxNumThreads from 3 to 2, because in practice we get the most gains from running a single merge in the background. More than one concurrent merge causes a lot of thrashing (though it's possible on SSD storage that there would be net gains). (Jason Rutherglen, Index: src/java/org/apache/lucene/index/SegmentInfos.java =================================================================== --- src/java/org/apache/lucene/index/SegmentInfos.java (revision 890765) +++ src/java/org/apache/lucene/index/SegmentInfos.java (working copy) @@ -912,4 +912,14 @@ return true; return false; } + + /** Returns sum of all segment's docCounts. Note that + * this does not include deletions */ + public int totalDocCount() { + int count = 0; + for(SegmentInfo info : this) { + count += info.docCount; + } + return count; + } } Index: src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java =================================================================== --- src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (revision 890765) +++ src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (working copy) @@ -23,6 +23,8 @@ import java.io.IOException; import java.util.List; import java.util.ArrayList; +import java.util.Comparator; +import java.util.Collections; /** A {@link MergeScheduler} that runs each merge using a * separate thread, up until a maximum number of threads @@ -40,7 +42,7 @@ protected List mergeThreads = new ArrayList(); // Max number of threads allowed to be merging at once - private int maxThreadCount = 1; + private int maxThreadCount = 2; protected Directory dir; @@ -81,19 +83,44 @@ return mergeThreadPriority; } - /** Set the priority that merge threads run at. */ + /** Set the base priority that merge threads run at. + * Note that CMS may increase priority of some merge + * threads beyond this base priority. It's best not to + * set this any higher than + * Thread.MAX_PRIORITY-maxThreadCount, so that CMS has + * room to set relative priority among threads. */ public synchronized void setMergeThreadPriority(int pri) { if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY) throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive"); mergeThreadPriority = pri; + updateThreadPriorities(); + } - final int numThreads = mergeThreadCount(); - for(int i=0;i { + public int compare(MergeThread t1, MergeThread t2) { + return t2.getCurrentMerge().segments.totalDocCount() - + t1.getCurrentMerge().segments.totalDocCount(); } } + /** Called whenever the running merges have changed. By + * default this method takes the simplistic approach of + * assigning higher priority to smaller merges. */ + protected synchronized void updateThreadPriorities() { + + Collections.sort(mergeThreads, new CompareByMergeDocCount()); + + int pri = mergeThreadPriority; + for(MergeThread mergeThread : mergeThreads) { + if (verbose()) { + message("update priority to " + pri + " for " + mergeThread); + } + mergeThread.setThreadPriority(pri); + pri = Math.min(Thread.MAX_PRIORITY, pri+1); + } + } + private boolean verbose() { return writer != null && writer.verbose(); } @@ -211,6 +238,7 @@ // merge: merger = getMergeThread(writer, merge); mergeThreads.add(merger); + updateThreadPriorities(); if (verbose()) message(" launch new thread [" + merger.getName() + "]"); @@ -259,6 +287,14 @@ return runningMerge; } + public synchronized MergePolicy.OneMerge getCurrentMerge() { + if (runningMerge != null) { + return runningMerge; + } else { + return startMerge; + } + } + public void setThreadPriority(int pri) { try { setPriority(pri); @@ -294,6 +330,7 @@ writer.mergeInit(merge); if (verbose()) message(" merge thread: do another merge " + merge.segString(dir)); + updateThreadPriorities(); } else break; }