Index: jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/LargeResultSetTest.java
===================================================================
--- jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/LargeResultSetTest.java	(revision 793830)
+++ jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/LargeResultSetTest.java	(working copy)
@@ -34,7 +34,7 @@
         createNodes(testRootNode, 10, 5, 0);
         superuser.save();
 
-        SearchIndex index = (SearchIndex) getQueryHandler();
+        SearchIndex index = getSearchIndex();
         int resultFetchSize = index.getResultFetchSize();
         try {
             String stmt = testPath + "//*[@" + jcrPrimaryType + "] order by @jcr:score descending";
Index: jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractQueryTest.java
===================================================================
--- jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractQueryTest.java	(revision 793830)
+++ jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractQueryTest.java	(working copy)
@@ -35,6 +35,7 @@
 import javax.jcr.query.qom.QueryObjectModelFactory;
 
 import org.apache.jackrabbit.commons.iterator.NodeIteratorAdapter;
+import org.apache.jackrabbit.core.query.lucene.SearchIndex;
 import org.apache.jackrabbit.test.AbstractJCRTest;
 
 /**
@@ -139,6 +140,7 @@
      */
     protected void executeXPathQuery(String xpath, Node[] nodes)
             throws RepositoryException {
+        getSearchIndex().flush();
         QueryResult res = qm.createQuery(xpath, Query.XPATH).execute();
         checkResult(res, nodes);
     }
@@ -153,6 +155,7 @@
      */
     protected void executeSQLQuery(String sql, Node[] nodes)
             throws RepositoryException {
+        getSearchIndex().flush();
         QueryResult res = qm.createQuery(sql, Query.SQL).execute();
         checkResult(res, nodes);
     }
@@ -253,9 +256,11 @@
     }
 
     /**
+     * Returns a reference to the underlying search index.
+     *
      * @return the query handler inside the {@link #qm query manager}.
      */
-    protected QueryHandler getQueryHandler() {
-        return ((QueryManagerImpl) qm).getQueryHandler();
+    protected SearchIndex getSearchIndex() {
+        return (SearchIndex) ((QueryManagerImpl) qm).getQueryHandler();
     }
 }
Index: jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java
===================================================================
--- jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java	(revision 793830)
+++ jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java	(working copy)
@@ -50,7 +50,7 @@
     public void testQueue() throws Exception {
         Extractor.sleepTime = 200;
 
-        SearchIndex index = (SearchIndex) getQueryHandler();
+        SearchIndex index = getSearchIndex();
         IndexingQueue queue = index.getIndex().getIndexingQueue();
 
         assertEquals(0, queue.getNumPendingDocuments());
@@ -70,11 +70,8 @@
         NodeIterator nodes = q.execute().getNodes();
         assertFalse(nodes.hasNext());
 
-        synchronized (index.getIndex()) {
-            while (queue.getNumPendingDocuments() > 0) {
-                index.getIndex().wait(50);
-            }
-        }
+        index.flush();
+        assertEquals(0, queue.getNumPendingDocuments());
 
         q = qm.createQuery(testPath + "/*[jcr:contains(., 'fox')]", Query.XPATH);
         nodes = q.execute().getNodes();
@@ -83,8 +80,7 @@
 
     public void testInitialIndex() throws Exception {
         Extractor.sleepTime = 200;
-        SearchIndex index = (SearchIndex) getQueryHandler();
-        File indexDir = new File(index.getPath());
+        File indexDir = new File(getSearchIndex().getPath());
 
         // fill workspace
         Node testFolder = testRootNode.addNode("folder", "nt:folder");
@@ -133,20 +129,8 @@
         }
         qm = session.getWorkspace().getQueryManager();
 
-        index = (SearchIndex) getQueryHandler();
-        IndexingQueue queue = index.getIndex().getIndexingQueue();
+        getSearchIndex().flush();
 
-        // flush index to make sure any documents in the buffer are written
-        // to the index. this is to make sure all nodes are pushed either to
-        // the index or to the indexing queue
-        index.getIndex().flush();
-
-        synchronized (index.getIndex()) {
-            while (queue.getNumPendingDocuments() > 0) {
-                index.getIndex().wait(50);
-            }
-        }
-
         String stmt = testPath + "//element(*, nt:resource)[jcr:contains(., 'fox')] order by @jcr:score descending";
         Query q = qm.createQuery(stmt, Query.XPATH);
         assertEquals(num, q.execute().getNodes().getSize());
Index: jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingAggregateTest.java
===================================================================
--- jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingAggregateTest.java	(revision 793830)
+++ jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingAggregateTest.java	(working copy)
@@ -58,7 +58,6 @@
         resource.setProperty("jcr:data", new ByteArrayInputStream(out.toByteArray()));
 
         testRootNode.save();
-        waitUntilQueueEmpty();
 
         executeSQLQuery(sqlDog, new Node[]{file});
 
@@ -68,7 +67,6 @@
         writer.flush();
         resource.setProperty("jcr:data", new ByteArrayInputStream(out.toByteArray()));
         testRootNode.save();
-        waitUntilQueueEmpty();
 
         executeSQLQuery(sqlCat, new Node[]{file});
 
@@ -95,22 +93,10 @@
         resource.setProperty("jcr:mimeType", "text/plain");
         resource.setProperty("jcr:data", new ByteArrayInputStream(out.toByteArray()));
         testRootNode.save();
-        waitUntilQueueEmpty();
 
         executeSQLQuery(sqlCat, new Node[]{file});
     }
 
-    protected void waitUntilQueueEmpty() throws Exception {
-        SearchIndex index = (SearchIndex) getQueryHandler();
-        IndexingQueue queue = index.getIndex().getIndexingQueue();
-        index.getIndex().flush();
-        synchronized (index.getIndex()) {
-            while (queue.getNumPendingDocuments() > 0) {
-                index.getIndex().wait(50);
-            }
-        }
-    }
-
     public void testContentLastModified() throws RepositoryException {
         List expected = new ArrayList();
         long time = System.currentTimeMillis();
Index: jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexingQueue.java
===================================================================
--- jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexingQueue.java	(revision 793840)
+++ jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexingQueue.java	(working copy)
@@ -150,6 +150,7 @@
             queueStore.removeUUID(uuid);
             log.debug("removed node {}. New size of indexing queue: {}",
                     uuid, new Integer(pendingDocuments.size()));
+            notifyIfEmpty();
         }
         return doc;
     }
@@ -189,6 +190,7 @@
             it.remove();
         }
         queueStore.close();
+        notifyIfEmpty();
     }
 
     /**
@@ -201,14 +203,36 @@
         }
     }
 
-    //----------------------------< testing only >------------------------------
+    /**
+     * Notifies all threads waiting for this queue to become empty.
+     * The notification is only sent if this queue actually is empty.
+     */
+    private synchronized void notifyIfEmpty() {
+        if (pendingDocuments.isEmpty()) {
+            notifyAll();
+        }
+    }
 
     /**
-     * This method is for testing only!
+     * Waits until this queue is empty.
+     */
+    synchronized void waitUntilEmpty() {
+        while (!pendingDocuments.isEmpty()) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                // Interrupted, check again if we're empty
+            }
+        }
+    }
+
+    /**
+     * Returns the number of pending documents.
      *
      * @return the number of the currently pending documents.
      */
     synchronized int getNumPendingDocuments() {
         return pendingDocuments.size();
     }
+
 }
Index: jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java
===================================================================
--- jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java	(revision 793830)
+++ jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java	(working copy)
@@ -746,6 +746,21 @@
     }
 
     /**
+     * Waits until all pending text extraction tasks have been processed
+     * and the updated index has been flushed to disk.
+     *
+     * @throws RepositoryException if the index update can not be written
+     */
+    public void flush() throws RepositoryException {
+        try {
+            index.getIndexingQueue().waitUntilEmpty();
+            index.flush();
+        } catch (IOException e) {
+            throw new RepositoryException("Failed to flush the index", e);
+        }
+    }
+
+    /**
      * Closes this QueryHandler and frees resources attached
      * to this handler.
      */
Index: jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/TextExtractorReader.java
===================================================================
--- jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/TextExtractorReader.java	(revision 793830)
+++ jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/TextExtractorReader.java	(working copy)
@@ -20,9 +20,6 @@
 import java.io.IOException;
 import java.io.StringReader;
 
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-import EDU.oswego.cs.dl.util.concurrent.DirectExecutor;
-
 /**
  * TextExtractorReader implements a specialized reader that runs
  * the text extractor in a background thread.
@@ -30,11 +27,6 @@
 class TextExtractorReader extends Reader {
 
     /**
-     * A direct executor in case text extraction is requested for immediate use.
-     */
-    private static final Executor DIRECT_EXECUTOR = new DirectExecutor();
-
-    /**
      * Reference to the extracted text. This reference is initially
      * null and later set to a valid reader when the text extractor
      * finished its work.
@@ -47,33 +39,20 @@
     private TextExtractorJob job;
 
     /**
-     * The pooled executor.
-     */
-    private final Executor executor;
-
-    /**
      * The timeout in milliseconds to wait at most for the text extractor
     * when {@link #isExtractorFinished()} is called.
      */
     private final long timeout;
 
     /**
-     * Set to true when the text extractor job has been started
-     * and is running.
-     */
-    private boolean jobStarted = false;
-
-    /**
      * Creates a new TextExtractorReader with the given
     * job.
      *
      * @param job      the extractor job.
-     * @param executor the executor to use when text extraction is requested.
     * @param timeout  the timeout to wait at most for the text extractor.
      */
-    TextExtractorReader(TextExtractorJob job, Executor executor, long timeout) {
+    TextExtractorReader(TextExtractorJob job, long timeout) {
         this.job = job;
-        this.executor = executor;
         this.timeout = timeout;
     }
 
@@ -86,9 +65,6 @@
         if (extractedText != null) {
             extractedText.close();
         }
-        if (jobStarted) {
-            job.discard();
-        }
     }
 
     /**
@@ -96,26 +72,11 @@
      */
     public int read(char[] cbuf, int off, int len) throws IOException {
         if (extractedText == null) {
-            // no reader present
-            // check if job is started already
-            if (jobStarted) {
-                // wait until available
-                extractedText = job.getReader(Long.MAX_VALUE);
-            } else {
-                // execute with current thread
-                try {
-                    DIRECT_EXECUTOR.execute(job);
-                } catch (InterruptedException e) {
-                    // current thread is in interrupted state
-                    // -> ignore (job will not return a reader, which is fine)
-                }
-                extractedText = job.getReader(0);
+            String text = job.getExtractedText(timeout);
+            if (text == null) {
+                text = "";
             }
-
-            if (extractedText == null) {
-                // exception occurred
-                extractedText = new StringReader("");
-            }
+            extractedText = new StringReader(text);
         }
         return extractedText.read(cbuf, off, len);
     }
@@ -125,25 +86,6 @@
      * finished its work and this reader will return extracted text.
      */
     public boolean isExtractorFinished() {
-        if (!jobStarted) {
-            try {
-                executor.execute(job);
-                jobStarted = true;
-            } catch (InterruptedException e) {
-                // this thread is in interrupted state
-                return false;
-            }
-            extractedText = job.getReader(timeout);
-        } else {
-            // job is already running, check for immediate result
-            extractedText = job.getReader(0);
-        }
-
-        if (extractedText == null && job.getException() != null) {
-            // exception occurred
-            extractedText = new StringReader("");
-        }
-
-        return extractedText != null;
+        return job.hasExtractedText();
     }
 }
Index: jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/TextExtractorJob.java
===================================================================
--- jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/TextExtractorJob.java	(revision 793830)
+++ jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/TextExtractorJob.java	(working copy)
@@ -16,47 +16,34 @@
  */
 package org.apache.jackrabbit.core.query.lucene;
 
-import EDU.oswego.cs.dl.util.concurrent.FutureResult;
-import EDU.oswego.cs.dl.util.concurrent.Callable;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.jackrabbit.extractor.TextExtractor;
-import org.apache.jackrabbit.util.LazyFileInputStream;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 
 import java.io.InputStream;
 import java.io.Reader;
-import java.io.IOException;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.io.BufferedWriter;
-import java.io.InputStreamReader;
-import java.io.StringReader;
-import java.lang.reflect.InvocationTargetException;
 
 /**
  * TextExtractorJob implements a future result and is runnable
  * in a background thread.
  */
-public class TextExtractorJob extends FutureResult implements Runnable {
+public class TextExtractorJob implements Runnable {
 
     /**
-     * UTF-8 encoding.
+     * The logger instance for this class.
      */
-    private static final String ENCODING_UTF8 = "UTF-8";
+    private static final Logger log = LoggerFactory.getLogger(TextExtractorJob.class);
 
     /**
-     * The logger instance for this class.
+     * The text extractor.
      */
-    private static final Logger log = LoggerFactory.getLogger(TextExtractorJob.class);
+    private final TextExtractor extractor;
 
     /**
-     * The command of the future result.
+     * The binary stream.
     */
-    private final Runnable cmd;
+    private final InputStream stream;
 
     /**
      * The mime type of the resource to extract text from.
@@ -64,14 +51,14 @@
     private final String type;
 
     /**
-     * Set to true if this job timed out.
+     * The encoding of the binary content, or null.
      */
-    private transient boolean timedOut = false;
+    private final String encoding;
 
     /**
-     * true if this extractor job has been flaged as discarded.
+     * The extracted text. Set when the text extraction task completes.
      */
-    private transient boolean discarded = false;
+    private transient String text = null;
 
     /**
      * Creates a new TextExtractorJob with the given
     *
@@ -83,76 +70,40 @@
      * @param encoding the encoding of the binary content. May be
     *                 null.
      */
-    public TextExtractorJob(final TextExtractor extractor,
-                            final InputStream stream,
-                            final String type,
-                            final String encoding) {
+    public TextExtractorJob(
+            TextExtractor extractor,
+            InputStream stream, String type, String encoding) {
+        this.extractor = extractor;
+        this.stream = stream;
         this.type = type;
-        this.cmd = setter(new Callable() {
-            public Object call() throws Exception {
-                Reader r = extractor.extractText(stream, type, encoding);
-                if (r != null) {
-                    if (discarded) {
-                        r.close();
-                        r = null;
-                    } else if (timedOut) {
-                        // spool a temp file to save memory
-                        r = getSwappedOutReader(r);
-                    }
-                }
-                return r;
-            }
-        });
+        this.encoding = encoding;
     }
 
+    public boolean hasExtractedText() {
+        return text != null;
+    }
+
     /**
      * Returns the reader with the extracted text from the input stream passed
-     * to the constructor of this TextExtractorJob. The caller of
-     * this method is responsible for closing the returned reader. Returns
+     * to the constructor of this TextExtractorJob. Returns
     * null if a timeout occurs while waiting for the
     * text extractor to get the reader.
      *
-     * @return the Reader with the extracted text. Returns null if
-     *         a timeout or an exception occured extracting the text.
+     * @return the extracted text, or null if a timeout or
+     *         an exception occurred while extracting the text
      */
-    public Reader getReader(long timeout) {
-        Reader reader = null;
-        try {
-            reader = (Reader) timedGet(timeout);
-        } catch (InterruptedException e) {
-            // also covers TimeoutException
-            // text not extracted within timeout or interrupted
-            if (timeout > 0) {
-                log.debug("Text extraction for {} timed out (>{}ms).",
-                        type, new Long(timeout));
-                timedOut = true;
-            }
-        } catch (InvocationTargetException e) {
-            // extraction failed
-            log.warn("Exception while indexing binary property: " + e.getCause());
-            log.debug("Dump: ", e.getCause());
-        }
-        return reader;
-    }
-
-    /**
-     * Discards this extractor job. If the reader within this job is ready at
-     * the time of this call, it is closed. If the reader is not yet ready this
-     * job will be flaged as discarded and any later call to
-     * {@link #getReader(long)} will return null. The reader that
-     * is about to be constructed by a background thread will be closed
-     * automatically as soon as it becomes ready.
-     */
-    void discard() {
-        discarded = true;
-        Reader r = (Reader) peek();
-        if (r != null) {
+    public synchronized String getExtractedText(long timeout) {
+        if (text == null) {
             try {
-                r.close();
-            } catch (IOException e) {
-                log.warn("Exception when trying to discard extractor job: " + e);
+                wait(timeout);
+            } catch (InterruptedException e) {
+                if (text == null) {
+                    log.debug("Text extraction for {} timed out (> {}ms)",
+                            type, timeout);
+                }
             }
         }
+        return text;
     }
 
     /**
@@ -168,69 +119,20 @@
      * Runs the actual text extraction.
      */
     public void run() {
-        // forward to command
-        cmd.run();
-    }
-
-    //----------------------------< internal >----------------------------------
-
-    /**
-     * Returns a Reader for r using a temp file.
-     *
-     * @param r the reader to swap out into a temp file.
-     * @return a reader to the temp file.
-     */
-    private Reader getSwappedOutReader(Reader r) {
-        final File temp;
         try {
-            temp = File.createTempFile("extractor", null);
-        } catch (IOException e) {
-            // unable to create temp file
-            // return reader as is
-            return r;
-        }
-        Writer out;
-        try {
-            out = new BufferedWriter(new OutputStreamWriter(
-                    new FileOutputStream(temp), ENCODING_UTF8));
-        } catch (IOException e) {
-            // should never happend actually
-            if (!temp.delete()) {
-                temp.deleteOnExit();
-            }
-            return r;
-        }
-
-        // spool into temp file
-        InputStream in = null;
-        try {
             try {
-                IOUtils.copy(r, out);
-                out.close();
+                Reader reader = extractor.extractText(stream, type, encoding);
+                this.text = IOUtils.toString(reader);
             } finally {
-                r.close();
+                stream.close();
             }
-            in = new LazyFileInputStream(temp);
-
-            return new InputStreamReader(in, ENCODING_UTF8) {
-                public void close() throws IOException {
-                    super.close();
-                    // delete file
-                    if (!temp.delete()) {
-                        temp.deleteOnExit();
-                    }
-                }
-            };
-        } catch (IOException e) {
-            // do some clean up
-            IOUtils.closeQuietly(out);
-            IOUtils.closeQuietly(in);
-
-            if (!temp.delete()) {
-                temp.deleteOnExit();
-            }
-            // use empty string reader as fallback
-            return new StringReader("");
+        } catch (Throwable e) {
+            log.warn("Text extraction failed for type " + type, e);
+            this.text = "";
         }
+        synchronized (this) {
+            notifyAll();
+        }
     }
+
 }
Index: jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/PooledTextExtractor.java
===================================================================
--- jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/PooledTextExtractor.java	(revision 793830)
+++ jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/PooledTextExtractor.java	(working copy)
@@ -20,9 +20,9 @@
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 
+import java.io.IOException;
 import java.io.Reader;
 import java.io.InputStream;
-import java.io.IOException;
 
 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
 import EDU.oswego.cs.dl.util.concurrent.Channel;
@@ -105,11 +105,17 @@
      *
      * This implementation returns an instance of {@link TextExtractorReader}.
      */
-    public Reader extractText(InputStream stream,
-                              String type,
-                              String encoding) throws IOException {
+    public Reader extractText(
+            InputStream stream, String type, String encoding)
+            throws IOException {
         TextExtractorJob job = new TextExtractorJob(extractor, stream, type, encoding);
-        return new TextExtractorReader(job, executor, timout);
+        try {
+            executor.execute(job);
+        } catch (InterruptedException e) {
+            log.warn("Failed to start a background text extraction task", e);
+            stream.close();
+        }
+        return new TextExtractorReader(job, timout);
     }
 
     /**
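
Note for reviewers: the patch replaces the old polling idiom (synchronizing on the index and re-checking getNumPendingDocuments() every 50 ms) with a wait/notify handshake inside IndexingQueue, exposed to callers through the new SearchIndex.flush(). The self-contained sketch below illustrates that idiom only; the class and member names are illustrative and are not part of the patch.

import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch (not from the patch) of the wait/notify idiom that
 * IndexingQueue uses after this change: removal paths call notifyIfEmpty(),
 * and waitUntilEmpty() blocks callers such as SearchIndex.flush() until no
 * pending documents remain.
 */
class PendingDocumentsSketch {

    private final Map pending = new HashMap();

    synchronized void add(String uuid, Object document) {
        pending.put(uuid, document);
    }

    synchronized Object remove(String uuid) {
        Object document = pending.remove(uuid);
        // Wake up waiters once the last pending document is gone.
        notifyIfEmpty();
        return document;
    }

    /** Notifies waiting threads, but only if the queue really is empty. */
    private synchronized void notifyIfEmpty() {
        if (pending.isEmpty()) {
            notifyAll();
        }
    }

    /** Blocks until the queue is empty; the loop guards against spurious wake-ups. */
    synchronized void waitUntilEmpty() {
        while (!pending.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Interrupted: re-check the condition, as the patched code does.
            }
        }
    }
}

With this in place, tests no longer need their own wait loops: as the AbstractQueryTest and IndexingQueueTest hunks above show, they simply call getSearchIndex().flush() before asserting query results.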