Index: src/test/java/org/apache/jackrabbit/core/query/lucene/SQL2IndexingAggregateTest2.java
===================================================================
--- src/test/java/org/apache/jackrabbit/core/query/lucene/SQL2IndexingAggregateTest2.java	(revision 1199780)
+++ src/test/java/org/apache/jackrabbit/core/query/lucene/SQL2IndexingAggregateTest2.java	(working copy)
@@ -131,6 +131,7 @@
                 .createBinary(new ByteArrayInputStream(out.toByteArray())));
 
         testRootNode.getSession().save();
+        waitForTextExtractionTasksToFinish();
         executeSQL2Query(sqlDog, expectedNodes.toArray(new Node[] {}));
 
         // update jcr:data
@@ -140,6 +141,7 @@
         resource.setProperty("jcr:data", session.getValueFactory()
                 .createBinary(new ByteArrayInputStream(out.toByteArray())));
         testRootNode.getSession().save();
+        waitForTextExtractionTasksToFinish();
         executeSQL2Query(sqlDog, new Node[] {});
         executeSQL2Query(sqlCat, expectedNodes.toArray(new Node[] {}));
 
@@ -149,12 +151,14 @@
         Node foo = unstrContent.addNode("foo");
         foo.setProperty("text", "the quick brown fox jumps over the lazy dog.");
         testRootNode.getSession().save();
+        waitForTextExtractionTasksToFinish();
         executeSQL2Query(sqlDog, expectedNodes.toArray(new Node[] {}));
         executeSQL2Query(sqlCat, new Node[] {});
 
         // remove foo
         foo.remove();
         testRootNode.getSession().save();
+        waitForTextExtractionTasksToFinish();
         executeSQL2Query(sqlDog, new Node[] {});
         executeSQL2Query(sqlCat, new Node[] {});
 
Index: src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java
===================================================================
--- src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java	(revision 1199780)
+++ src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java	(working copy)
@@ -40,7 +40,7 @@
 public class IndexingQueueTest extends AbstractIndexingTest {
 
     private static final File TEMP_DIR =
-        new File(System.getProperty("java.io.tmpdir")); 
+        new File(System.getProperty("java.io.tmpdir"));
 
     public void testQueue() throws Exception {
         SearchIndex index = getSearchIndex();
@@ -63,6 +63,7 @@
         assertFalse(nodes.hasNext());
 
         BlockingParser.unblock();
+        waitForTextExtractionTasksToFinish();
         index.flush();
         assertEquals(0, queue.getNumPendingDocuments());
 
@@ -121,6 +122,7 @@
         }
 
         qm = session.getWorkspace().getQueryManager();
+        waitForTextExtractionTasksToFinish();
         getSearchIndex().flush();
 
         String stmt = testPath + "//element(*, nt:resource)[jcr:contains(., 'fox')] order by @jcr:score descending";
Index: src/test/java/org/apache/jackrabbit/core/query/AbstractIndexingTest.java
===================================================================
--- src/test/java/org/apache/jackrabbit/core/query/AbstractIndexingTest.java	(revision 1199780)
+++ src/test/java/org/apache/jackrabbit/core/query/AbstractIndexingTest.java	(working copy)
@@ -16,9 +16,15 @@
  */
 package org.apache.jackrabbit.core.query;
 
+import java.util.concurrent.TimeUnit;
+
 import javax.jcr.Node;
 import javax.jcr.Session;
 
+import org.apache.jackrabbit.core.JackrabbitRepositoryStub;
+import org.apache.jackrabbit.core.JackrabbitThreadPool;
+import org.apache.jackrabbit.core.RepositoryContext;
+
 /**
  * <code>AbstractIndexingTest</code> is a base class for all indexing
  * configuration tests.
@@ -52,4 +58,18 @@
     protected String getWorkspaceName() {
         return WORKSPACE_NAME;
     }
+
+    /**
+     * wait for async text-extraction tasks to finish
+     */
+    protected void waitForTextExtractionTasksToFinish() throws Exception {
+        final RepositoryContext context = JackrabbitRepositoryStub
+                .getRepositoryContext(session.getRepository());
+        JackrabbitThreadPool jtp = ((JackrabbitThreadPool) context
+                .getExecutor());
+        while (jtp.getPendingLowPriorityTaskCount() != 0) {
+            TimeUnit.MILLISECONDS.sleep(100);
+        }
+        getSearchIndex().flush();
+    }
 }
Index: src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java	(revision 1201657)
+++ src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java	(working copy)
@@ -1011,7 +1011,9 @@
         synchronized (iq) {
             while (iq.getNumPendingDocuments() > 0 || indexingQueueCommitPending) {
                 try {
-                    log.debug("waiting for indexing queue to become empty");
+                    log.debug(
+                            "waiting for indexing queue to become empty. {} pending docs.",
+                            iq.getNumPendingDocuments());
                     iq.wait();
                     log.debug("notified");
                 } catch (InterruptedException e) {
Index: src/main/java/org/apache/jackrabbit/core/query/lucene/Util.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/query/lucene/Util.java	(revision 1201657)
+++ src/main/java/org/apache/jackrabbit/core/query/lucene/Util.java	(working copy)
@@ -52,8 +52,7 @@
      * @param old the document to dispose.
      */
     public static void disposeDocument(Document old) {
-        for (Object o : old.getFields()) {
-            Fieldable f = (Fieldable) o;
+        for (Fieldable f : old.getFields()) {
             try {
                 if (f.readerValue() != null) {
                     f.readerValue().close();
@@ -76,8 +75,7 @@
      *         otherwise.
      */
     public static boolean isDocumentReady(Document doc) {
-        for (Object o : doc.getFields()) {
-            Fieldable f = (Fieldable) o;
+        for (Fieldable f : doc.getFields()) {
             if (f instanceof LazyTextExtractorField) {
                 LazyTextExtractorField field = (LazyTextExtractorField) f;
                 if (!field.isExtractorFinished()) {
Index: src/main/java/org/apache/jackrabbit/core/query/lucene/LazyTextExtractorField.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/query/lucene/LazyTextExtractorField.java	(revision 1201657)
+++ src/main/java/org/apache/jackrabbit/core/query/lucene/LazyTextExtractorField.java	(working copy)
@@ -20,6 +20,7 @@
 import java.io.Reader;
 import java.util.concurrent.Executor;
 
+import org.apache.jackrabbit.core.JackrabbitThreadPool;
 import org.apache.jackrabbit.core.value.InternalValue;
 import org.apache.lucene.analysis.TokenStream;
 import org.apache.lucene.document.AbstractField;
@@ -147,7 +148,8 @@
     /**
      * The background task for extracting text from a binary value.
      */
-    private class ParsingTask extends DefaultHandler implements Runnable {
+    private class ParsingTask extends DefaultHandler implements Runnable,
+            JackrabbitThreadPool.LOW_PRIORITY_MARKER {
 
         private final Parser parser;
 
Index: src/main/java/org/apache/jackrabbit/core/JackrabbitThreadPool.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/JackrabbitThreadPool.java	(revision 1199780)
+++ src/main/java/org/apache/jackrabbit/core/JackrabbitThreadPool.java	(working copy)
@@ -17,15 +17,27 @@
 package org.apache.jackrabbit.core;
 
 import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Thread pool used by the repository.
  */
-class JackrabbitThreadPool extends ScheduledThreadPoolExecutor {
+public class JackrabbitThreadPool extends ScheduledThreadPoolExecutor {
+
+    /**
+     * The logger instance for this class.
+     */
+    private static final Logger log = LoggerFactory
+            .getLogger(JackrabbitThreadPool.class);
 
     /**
      * Size of the per-repository thread pool.
@@ -64,14 +76,160 @@
     /**
      * Handler for tasks for which no free thread is found within the pool.
      */
-    private static final RejectedExecutionHandler handler =
-            new ThreadPoolExecutor.CallerRunsPolicy();
+    private static final RejectedExecutionHandler handler = new CallerRunsPolicy();
+
+    /**
+     * Property to control the value at which the thread pool starts to schedule
+     * the {@link LOW_PRIORITY_MARKER} tasks for later execution.
+     * 
+     * Set to <code>0</code> to disable the check
+     * 
+     * Default value is 0 (check is disabled).
+     * 
+     */
+    public static final String MAX_LOAD_FOR_LOW_PRIORITY_TASKS_PROPERTY = "org.apache.jackrabbit.core.JackrabbitThreadPool.maxLoadForLowPriorityTasks";
+
+    /**
+     * @see #MAX_LOAD_FOR_LOW_PRIORITY_TASKS_PROPERTY
+     */
+    private final static Integer maxLoadForLowPriorityTasks = getMaxLoadForLowPriorityTasks();
+
+    private static int getMaxLoadForLowPriorityTasks() {
+        final int defaultMaxLoad = 75;
+        int max = Integer.getInteger(MAX_LOAD_FOR_LOW_PRIORITY_TASKS_PROPERTY,
+                defaultMaxLoad);
+        if (max < 0 || max > 100) {
+            return defaultMaxLoad;
+        }
+        return max;
+    }
+
+    /**
+     * marker interface for low priority tasks (like text extraction) that can
+     * be scheduled later based on the extractor's current load
+     * 
+     */
+    public static interface LOW_PRIORITY_MARKER {
+    }
+
+    /**
+     * Queue where all the {@link LOW_PRIORITY_MARKER} tasks go for later
+     * execution
+     */
+    private final BlockingQueue<Runnable> lowPriorityTasksQueue;
+
+    /**
+     * Tasks that handles the scheduling and the execution of
+     * {@link LOW_PRIORITY_MARKER} tasks
+     */
+    private final RetryLowPriorityTask retryTask;
 
     /**
      * Creates a new thread pool.
      */
     public JackrabbitThreadPool() {
         super(size, factory, handler);
+        lowPriorityTasksQueue = new LinkedBlockingQueue<Runnable>();
+        retryTask = new RetryLowPriorityTask(this, lowPriorityTasksQueue);
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        if (command instanceof LOW_PRIORITY_MARKER) {
+            scheduleLowPriority(command);
+            return;
+        }
+        super.execute(command);
     }
 
+    private void scheduleLowPriority(Runnable command) {
+        if (isOverDefinedMaxLoad()) {
+            lowPriorityTasksQueue.add(command);
+            retryTask.retryLater();
+            return;
+        }
+        super.execute(command);
+    }
+
+    /**
+     * compares the current load of the executor with the defined
+     * <code>{@link #maxLoadForLowPriorityTasks}</code> parameter.
+     * 
+     * Used to determine if the executor can handle additional
+     * {@link LOW_PRIORITY_MARKER} tasks.
+     * 
+     * @return true if the load is under the
+     *         <code>{@link #maxLoadForLowPriorityTasks}</code> parameter
+     */
+    private boolean isOverDefinedMaxLoad() {
+        if (maxLoadForLowPriorityTasks == 0) {
+            return false;
+        }
+        double currentLoad = ((double) getActiveCount()) / getPoolSize() * 100;
+        return currentLoad > maxLoadForLowPriorityTasks;
+    }
+
+    /**
+     * TEST ONLY
+     * 
+     * @return the number of low priority tasks that are waiting in the queue
+     */
+    public int getPendingLowPriorityTaskCount() {
+        return lowPriorityTasksQueue.size();
+    }
+
+    private static final class RetryLowPriorityTask implements Runnable {
+
+        /**
+         * schedule interval in ms for delayed tasks
+         */
+        private static final int LATER_MS = 50;
+
+        private final JackrabbitThreadPool executor;
+        private final BlockingQueue<Runnable> lowPriorityTasksQueue;
+
+        /**
+         * flag to indicate that another execute has been scheduled or is
+         * currently running.
+         */
+        private final AtomicBoolean retryPending;
+
+        public RetryLowPriorityTask(JackrabbitThreadPool executor,
+                BlockingQueue<Runnable> lowPriorityTasksQueue) {
+            this.executor = executor;
+            this.lowPriorityTasksQueue = lowPriorityTasksQueue;
+            this.retryPending = new AtomicBoolean(false);
+        }
+
+        public void retryLater() {
+            if (!retryPending.getAndSet(true)) {
+                executor.schedule(this, LATER_MS, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        public void run() {
+            if (lowPriorityTasksQueue.isEmpty()) {
+                retryPending.set(false);
+                return;
+            }
+
+            int count = 0;
+            while (!executor.isOverDefinedMaxLoad()) {
+                Runnable r = lowPriorityTasksQueue.poll();
+                if (r == null) {
+                    log.debug("Executed {} low priority tasks.", count);
+                    break;
+                }
+                count++;
+                executor.execute(r);
+            }
+            retryPending.set(false);
+            if (!lowPriorityTasksQueue.isEmpty()) {
+                log.debug(
+                        "Executor is under load, will schedule {} remaining tasks for {} ms later",
+                        lowPriorityTasksQueue.size(), LATER_MS);
+                retryLater();
+            }
+        }
+    }
 }
