Index: src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java (revision 833445) +++ src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java (working copy) @@ -16,20 +16,23 @@ */ package org.apache.jackrabbit.core.query.lucene; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.Term; -import org.apache.lucene.index.IndexReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.Collections; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Semaphore; -import java.io.IOException; - /** * Merges indexes in a separate daemon thread. */ @@ -41,11 +44,6 @@ private static final Logger log = LoggerFactory.getLogger(IndexMerger.class); /** - * Marker task to signal the background thread to quit. - */ - private static final Merge QUIT = new Merge(new Index[0]); - - /** * minMergeDocs config parameter. */ private int minMergeDocs = SearchIndex.DEFAULT_MIN_MERGE_DOCS; @@ -61,11 +59,6 @@ private int mergeFactor = SearchIndex.DEFAULT_MERGE_FACTOR; /** - * Queue of merge Tasks - */ - private final BlockingQueue mergeTasks = new LinkedBlockingQueue(); - - /** * List of IndexBuckets in ascending document limit. */ private final List indexBuckets = new ArrayList(); @@ -76,14 +69,34 @@ private final MultiIndex multiIndex; /** + * The executor of the repository. + */ + private final Executor executor; + + /** + * Flag that indicates that this index merger is shuting down and should + * quit. + */ + private final AtomicBoolean quit = new AtomicBoolean(false); + + /** + * Flag that indicates if this index merger has already been started. + * @see #start() + */ + private final AtomicBoolean isStarted = new AtomicBoolean(false); + + /** * Monitor object to synchronize merge calculation. */ private final Object lock = new Object(); /** - * Mutex that is acquired when replacing indexes on MultiIndex. + * Read/write lock for index segment replacement. A shared read lock is + * aquired for an index replacement. An exclusive write lock is acquired + * when this index merger is shuting down, to prevent further index + * replacements. */ - private final Semaphore indexReplacement; + private final ReadWriteLock indexReplacement = new ReentrantReadWriteLock(); /** * List of merger threads that are currently busy. @@ -91,32 +104,23 @@ private final List busyMergers = new ArrayList(); /** - * List of merger threads. - */ - private final List workers = new ArrayList(); - - /** * Creates an IndexMerger. * * @param multiIndex the MultiIndex. - * @param numWorkers the number of worker threads to use. + * @param executor the executor of the repository. */ - IndexMerger(MultiIndex multiIndex, int numWorkers) { + IndexMerger(MultiIndex multiIndex, Executor executor) { this.multiIndex = multiIndex; - for (int i = 0; i < numWorkers; i++) { - Worker w = new Worker(); - workers.add(w); - busyMergers.add(w); - } - this.indexReplacement = new Semaphore(workers.size()); + this.executor = executor; } /** * Starts this index merger. */ void start() { - for (Thread t : workers) { - t.start(); + isStarted.set(true); + for (Worker worker : busyMergers) { + worker.unblock(); } } @@ -190,7 +194,6 @@ } addMergeTask(new Merge(idxs)); if (log.isDebugEnabled()) { - log.debug("merge queue now contains " + mergeTasks.size() + " tasks."); int numBusy; synchronized (busyMergers) { numBusy = busyMergers.size(); @@ -236,22 +239,18 @@ */ void dispose() { log.debug("dispose IndexMerger"); - // get all permits for index replacements + // get exclusive lock on index replacements try { - indexReplacement.acquire(workers.size()); + indexReplacement.writeLock().lockInterruptibly(); } catch (InterruptedException e) { - log.warn("Interrupted while acquiring index replacement permits: " + e); + log.warn("Interrupted while acquiring index replacement exclusive lock: " + e); // try to stop IndexMerger without the sync } - log.debug("merge queue size: " + mergeTasks.size()); - // clear task queue - mergeTasks.clear(); + // set quit + quit.set(true); + log.debug("quit flag set"); - // send quit - addMergeTask(QUIT); - log.debug("quit sent"); - try { // give the merger threads some time to quit, // it is possible that the mergers are busy working on a large index. @@ -259,9 +258,13 @@ // die without being able to finish the merge. // at this point it is not possible anymore to replace indexes // on the MultiIndex because we hold all indexReplacement permits. - for (Thread t : workers) { - t.join(500); - if (t.isAlive()) { + Worker[] workers; + synchronized (busyMergers) { + workers = busyMergers.toArray(new Worker[busyMergers.size()]); + } + for (Worker w : workers) { + w.join(500); + if (w.isAlive()) { log.info("Unable to stop IndexMerger.Worker. Daemon is busy."); } else { log.debug("IndexMerger.Worker thread stopped"); @@ -305,14 +308,17 @@ //------------------------------< internal >-------------------------------- private void addMergeTask(Merge task) { - for (;;) { - try { - mergeTasks.put(task); - break; - } catch (InterruptedException e) { - // try again - Thread.interrupted(); + // only enqueue if still running + if (!quit.get()) { + Worker worker = new Worker(task); + if (isStarted.get()) { + // immediately unblock if this index merger is already started + worker.unblock(); } + synchronized (busyMergers) { + busyMergers.add(worker); + } + executor.execute(worker); } } @@ -448,7 +454,7 @@ } } - private class Worker extends Thread implements IndexListener { + private class Worker implements Runnable, IndexListener { /** * List of id Term that identify documents that were deleted @@ -456,53 +462,50 @@ */ private final List deletedDocuments = Collections.synchronizedList(new ArrayList()); - public Worker() { - setName("IndexMerger.Worker"); - setDaemon(true); + /** + * A latch that is set to zero when this worker is unblocked. + */ + private final CountDownLatch start = new CountDownLatch(1); + + /** + * Flat that indicates whether this woker has finished its work. + */ + private final AtomicBoolean terminated = new AtomicBoolean(true); + + /** + * The merge task. + */ + private final Merge task; + + /** + * Creates a new worker which is initially blocked. Call + * {@link #unblock()} to unblock it. + * + * @param task the merge task. + */ + private Worker(Merge task) { + this.task = task; } /** * Implements the index merging. */ public void run() { - for (;;) { - boolean isIdle = false; - if (mergeTasks.size() == 0) { - synchronized (busyMergers) { - busyMergers.remove(this); - busyMergers.notifyAll(); + // worker is initially suspended + try { + try { + start.await(); + } catch (InterruptedException e) { + // check if we should quit + if (!quit.get()) { + // enqueue task again and retry with another thread + addMergeTask(task); } - isIdle = true; + return; } - Merge task; - for (;;) { - try { - task = mergeTasks.take(); - break; - } catch (InterruptedException e) { - // try again - Thread.interrupted(); - } - } - if (task == QUIT) { - synchronized (busyMergers) { - busyMergers.remove(this); - } - // put back QUIT to signal other workers - addMergeTask(task); - break; - } - if (isIdle) { - synchronized (busyMergers) { - busyMergers.add(this); - } - } log.debug("accepted merge request"); - // reset deleted documents - deletedDocuments.clear(); - // get readers String[] names = new String[task.indexes.length]; for (int i = 0; i < task.indexes.length; i++) { @@ -538,15 +541,16 @@ // inform multi index // if we cannot get the sync immediately we have to quit - if (!indexReplacement.tryAcquire()) { + Lock shared = indexReplacement.readLock(); + if (!shared.tryLock()) { log.debug("index merging canceled"); - break; + return; } try { log.debug("replace indexes"); multiIndex.replaceIndexes(names, index, deletedDocuments); } finally { - indexReplacement.release(); + shared.unlock(); } success = true; @@ -556,13 +560,24 @@ // delete index log.debug("deleting index " + index.getName()); multiIndex.deleteIndex(index); + // add task again and retry + addMergeTask(task); } } } catch (Throwable e) { log.error("Error while merging indexes: ", e); } + } finally { + synchronized (terminated) { + terminated.set(true); + terminated.notifyAll(); + } + synchronized (busyMergers) { + busyMergers.remove(this); + busyMergers.notifyAll(); + } + log.debug("Worker finished"); } - log.info("IndexMerger.Worker terminated"); } /** @@ -572,5 +587,37 @@ log.debug("document deleted: " + id.text()); deletedDocuments.add(id); } + + /** + * Unblocks this worker and allows it to start with the index merging. + */ + void unblock() { + start.countDown(); + } + + /** + * Waits until this worker is finished or the specified amount of time + * has elapsed. + * + * @param timeout the timeout in milliseconds. + * @throws InterruptedException if the current thread is interrupted + * while waiting for this worker to + * terminate. + */ + void join(long timeout) throws InterruptedException { + synchronized (terminated) { + while (!terminated.get()) { + terminated.wait(timeout); + } + } + } + + /** + * @return true if this worker is still alive and not yet + * terminated. + */ + boolean isAlive() { + return !terminated.get(); + } } } Index: src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java (revision 833445) +++ src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java (working copy) @@ -16,41 +16,42 @@ */ package org.apache.jackrabbit.core.query.lucene; +import java.io.IOException; +import java.text.DateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.jcr.RepositoryException; + import org.apache.jackrabbit.core.id.NodeId; import org.apache.jackrabbit.core.query.lucene.directory.DirectoryManager; +import org.apache.jackrabbit.core.state.ChildNodeEntry; import org.apache.jackrabbit.core.state.ItemStateException; import org.apache.jackrabbit.core.state.ItemStateManager; import org.apache.jackrabbit.core.state.NoSuchItemStateException; import org.apache.jackrabbit.core.state.NodeState; -import org.apache.jackrabbit.core.state.ChildNodeEntry; -import org.apache.jackrabbit.util.Timer; import org.apache.jackrabbit.spi.Path; import org.apache.jackrabbit.spi.PathFactory; +import org.apache.jackrabbit.spi.commons.conversion.DefaultNamePathResolver; +import org.apache.jackrabbit.spi.commons.conversion.PathResolver; import org.apache.jackrabbit.spi.commons.name.PathFactoryImpl; -import org.apache.jackrabbit.spi.commons.conversion.PathResolver; -import org.apache.jackrabbit.spi.commons.conversion.DefaultNamePathResolver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.jackrabbit.util.Timer; import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.Term; import org.apache.lucene.store.Directory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import javax.jcr.RepositoryException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Arrays; -import java.util.Set; -import java.util.HashSet; -import java.util.HashMap; -import java.util.Map; -import java.util.Collection; -import java.util.Collections; -import java.util.Calendar; -import java.text.DateFormat; - /** * A MultiIndex consists of a {@link VolatileIndex} and multiple * {@link PersistentIndex}es. The goal is to keep most parts of the index open @@ -255,7 +256,7 @@ this.redoLog = redoLogFactory.createRedoLog(this); // initialize IndexMerger - merger = new IndexMerger(this, handler.getIndexMergerPoolSize()); + merger = new IndexMerger(this, handler.getContext().getExecutor()); merger.setMaxMergeDocs(handler.getMaxMergeDocs()); merger.setMergeFactor(handler.getMergeFactor()); merger.setMinMergeDocs(handler.getMinMergeDocs()); Index: src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java (revision 833445) +++ src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java (working copy) @@ -170,11 +170,6 @@ public static final int DEFAULT_TERM_INFOS_INDEX_DIVISOR = 1; /** - * The default value for {@link #indexMergerPoolSize}. - */ - public static final int DEFAULT_INDEX_MERGER_POOL_SIZE = 2; - - /** * The path factory. */ protected static final PathFactory PATH_FACTORY = PathFactoryImpl.getInstance(); @@ -447,11 +442,6 @@ private boolean initializeHierarchyCache = true; /** - * The number of worker threads for merging index segments. - */ - private int indexMergerPoolSize = DEFAULT_INDEX_MERGER_POOL_SIZE; - - /** * The name of the redo log factory class implementation. */ private String redoLogFactoryClass = DefaultRedoLogFactory.class.getName(); @@ -2227,26 +2217,6 @@ } /** - * @return the current size of the index merger pool. - */ - public int getIndexMergerPoolSize() { - return indexMergerPoolSize; - } - - /** - * Sets a new value for the index merger pool size. - * - * @param indexMergerPoolSize the number of worker threads. - * @throws IllegalArgumentException if the size is less than or equal 0. - */ - public void setIndexMergerPoolSize(int indexMergerPoolSize) { - if (indexMergerPoolSize <= 0) { - throw new IllegalArgumentException("must be greater than 0"); - } - this.indexMergerPoolSize = indexMergerPoolSize; - } - - /** * @return the maximum age in seconds for outdated generations of * {@link IndexInfos}. */