Index: jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/LargeResultSetTest.java
===================================================================
--- jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/LargeResultSetTest.java (Revision 793830)
+++ jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/LargeResultSetTest.java (Arbeitskopie)
@@ -34,7 +34,7 @@
createNodes(testRootNode, 10, 5, 0);
superuser.save();
- SearchIndex index = (SearchIndex) getQueryHandler();
+ SearchIndex index = getSearchIndex();
int resultFetchSize = index.getResultFetchSize();
try {
String stmt = testPath + "//*[@" + jcrPrimaryType + "] order by @jcr:score descending";
Index: jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractQueryTest.java
===================================================================
--- jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractQueryTest.java (Revision 793830)
+++ jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/AbstractQueryTest.java (Arbeitskopie)
@@ -35,6 +35,7 @@
import javax.jcr.query.qom.QueryObjectModelFactory;
import org.apache.jackrabbit.commons.iterator.NodeIteratorAdapter;
+import org.apache.jackrabbit.core.query.lucene.SearchIndex;
import org.apache.jackrabbit.test.AbstractJCRTest;
/**
@@ -139,6 +140,7 @@
*/
protected void executeXPathQuery(String xpath, Node[] nodes)
throws RepositoryException {
+ getSearchIndex().flush();
QueryResult res = qm.createQuery(xpath, Query.XPATH).execute();
checkResult(res, nodes);
}
@@ -153,6 +155,7 @@
*/
protected void executeSQLQuery(String sql, Node[] nodes)
throws RepositoryException {
+ getSearchIndex().flush();
QueryResult res = qm.createQuery(sql, Query.SQL).execute();
checkResult(res, nodes);
}
@@ -253,9 +256,11 @@
}
/**
+ * Returns a reference to the underlying search index.
+ *
* @return the query handler inside the {@link #qm query manager}.
*/
- protected QueryHandler getQueryHandler() {
- return ((QueryManagerImpl) qm).getQueryHandler();
+ protected SearchIndex getSearchIndex() {
+ return (SearchIndex) ((QueryManagerImpl) qm).getQueryHandler();
}
}
Index: jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java
===================================================================
--- jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java (Revision 793830)
+++ jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingQueueTest.java (Arbeitskopie)
@@ -50,7 +50,7 @@
public void testQueue() throws Exception {
Extractor.sleepTime = 200;
- SearchIndex index = (SearchIndex) getQueryHandler();
+ SearchIndex index = getSearchIndex();
IndexingQueue queue = index.getIndex().getIndexingQueue();
assertEquals(0, queue.getNumPendingDocuments());
@@ -70,11 +70,8 @@
NodeIterator nodes = q.execute().getNodes();
assertFalse(nodes.hasNext());
- synchronized (index.getIndex()) {
- while (queue.getNumPendingDocuments() > 0) {
- index.getIndex().wait(50);
- }
- }
+ index.flush();
+ assertEquals(0, queue.getNumPendingDocuments());
q = qm.createQuery(testPath + "/*[jcr:contains(., 'fox')]", Query.XPATH);
nodes = q.execute().getNodes();
@@ -83,8 +80,7 @@
public void testInitialIndex() throws Exception {
Extractor.sleepTime = 200;
- SearchIndex index = (SearchIndex) getQueryHandler();
- File indexDir = new File(index.getPath());
+ File indexDir = new File(getSearchIndex().getPath());
// fill workspace
Node testFolder = testRootNode.addNode("folder", "nt:folder");
@@ -133,20 +129,8 @@
}
qm = session.getWorkspace().getQueryManager();
- index = (SearchIndex) getQueryHandler();
- IndexingQueue queue = index.getIndex().getIndexingQueue();
+ getSearchIndex().flush();
- // flush index to make sure any documents in the buffer are written
- // to the index. this is to make sure all nodes are pushed either to
- // the index or to the indexing queue
- index.getIndex().flush();
-
- synchronized (index.getIndex()) {
- while (queue.getNumPendingDocuments() > 0) {
- index.getIndex().wait(50);
- }
- }
-
String stmt = testPath + "//element(*, nt:resource)[jcr:contains(., 'fox')] order by @jcr:score descending";
Query q = qm.createQuery(stmt, Query.XPATH);
assertEquals(num, q.execute().getNodes().getSize());
Index: jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingAggregateTest.java
===================================================================
--- jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingAggregateTest.java (Revision 793830)
+++ jackrabbit-core/src/test/java/org/apache/jackrabbit/core/query/lucene/IndexingAggregateTest.java (Arbeitskopie)
@@ -58,7 +58,6 @@
resource.setProperty("jcr:data", new ByteArrayInputStream(out.toByteArray()));
testRootNode.save();
- waitUntilQueueEmpty();
executeSQLQuery(sqlDog, new Node[]{file});
@@ -68,7 +67,6 @@
writer.flush();
resource.setProperty("jcr:data", new ByteArrayInputStream(out.toByteArray()));
testRootNode.save();
- waitUntilQueueEmpty();
executeSQLQuery(sqlCat, new Node[]{file});
@@ -95,22 +93,10 @@
resource.setProperty("jcr:mimeType", "text/plain");
resource.setProperty("jcr:data", new ByteArrayInputStream(out.toByteArray()));
testRootNode.save();
- waitUntilQueueEmpty();
executeSQLQuery(sqlCat, new Node[]{file});
}
- protected void waitUntilQueueEmpty() throws Exception {
- SearchIndex index = (SearchIndex) getQueryHandler();
- IndexingQueue queue = index.getIndex().getIndexingQueue();
- index.getIndex().flush();
- synchronized (index.getIndex()) {
- while (queue.getNumPendingDocuments() > 0) {
- index.getIndex().wait(50);
- }
- }
- }
-
public void testContentLastModified() throws RepositoryException {
List expected = new ArrayList();
long time = System.currentTimeMillis();
Index: jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexingQueue.java
===================================================================
--- jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexingQueue.java (Revision 793840)
+++ jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexingQueue.java (Arbeitskopie)
@@ -150,6 +150,7 @@
queueStore.removeUUID(uuid);
log.debug("removed node {}. New size of indexing queue: {}",
uuid, new Integer(pendingDocuments.size()));
+ notifyIfEmpty();
}
return doc;
}
@@ -189,6 +190,7 @@
it.remove();
}
queueStore.close();
+ notifyIfEmpty();
}
/**
@@ -201,14 +203,36 @@
}
}
- //----------------------------< testing only >------------------------------
+ /**
+ * Notifies all threads waiting for this queue to become empty.
+ * The notification is only sent if this queue actually is empty.
+ */
+ private synchronized void notifyIfEmpty() {
+ if (pendingDocuments.isEmpty()) {
+ notifyAll();
+ }
+ }
/**
- * This method is for testing only!
+ * Waits until this queue is empty.
+ */
+ synchronized void waitUntilEmpty() {
+ while (!pendingDocuments.isEmpty()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // Interrupted, check again if we're empty
+ }
+ }
+ }
+
+ /**
+ * Returns the number of pending documents.
*
* @return the number of the currently pending documents.
*/
synchronized int getNumPendingDocuments() {
return pendingDocuments.size();
}
+
}
Index: jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java
===================================================================
--- jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java (Revision 793830)
+++ jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java (Arbeitskopie)
@@ -746,6 +746,21 @@
}
/**
+ * Waits until all pending text extraction tasks have been processed
+ * and the updated index has been flushed to disk.
+ *
+ * @throws RepositoryException if the index update can not be written
+ */
+ public void flush() throws RepositoryException {
+ try {
+ index.getIndexingQueue().waitUntilEmpty();
+ index.flush();
+ } catch (IOException e) {
+ throw new RepositoryException("Failed to flush the index", e);
+ }
+ }
+
+ /**
* Closes this QueryHandler and frees resources attached
* to this handler.
*/
Index: jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/TextExtractorReader.java
===================================================================
--- jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/TextExtractorReader.java (Revision 793830)
+++ jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/TextExtractorReader.java (Arbeitskopie)
@@ -20,9 +20,6 @@
import java.io.IOException;
import java.io.StringReader;
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-import EDU.oswego.cs.dl.util.concurrent.DirectExecutor;
-
/**
* TextExtractorReader implements a specialized reader that runs
* the text extractor in a background thread.
@@ -30,11 +27,6 @@
class TextExtractorReader extends Reader {
/**
- * A direct executor in case text extraction is requested for immediate use.
- */
- private static final Executor DIRECT_EXECUTOR = new DirectExecutor();
-
- /**
* Reference to the extracted text. This reference is initially
* null and later set to a valid reader when the text extractor
* finished its work.
@@ -47,33 +39,20 @@
private TextExtractorJob job;
/**
- * The pooled executor.
- */
- private final Executor executor;
-
- /**
* The timeout in milliseconds to wait at most for the text extractor
* when {@link #isExtractorFinished()} is called.
*/
private final long timeout;
/**
- * Set to true when the text extractor job has been started
- * and is running.
- */
- private boolean jobStarted = false;
-
- /**
* Creates a new TextExtractorReader with the given
* job.
*
* @param job the extractor job.
- * @param executor the executor to use when text extraction is requested.
* @param timeout the timeout to wait at most for the text extractor.
*/
- TextExtractorReader(TextExtractorJob job, Executor executor, long timeout) {
+ TextExtractorReader(TextExtractorJob job, long timeout) {
this.job = job;
- this.executor = executor;
this.timeout = timeout;
}
@@ -86,9 +65,6 @@
if (extractedText != null) {
extractedText.close();
}
- if (jobStarted) {
- job.discard();
- }
}
/**
@@ -96,26 +72,11 @@
*/
public int read(char[] cbuf, int off, int len) throws IOException {
if (extractedText == null) {
- // no reader present
- // check if job is started already
- if (jobStarted) {
- // wait until available
- extractedText = job.getReader(Long.MAX_VALUE);
- } else {
- // execute with current thread
- try {
- DIRECT_EXECUTOR.execute(job);
- } catch (InterruptedException e) {
- // current thread is in interrupted state
- // -> ignore (job will not return a reader, which is fine)
- }
- extractedText = job.getReader(0);
+ String text = job.getExtractedText(timeout);
+ if (text == null) {
+ text = "";
}
-
- if (extractedText == null) {
- // exception occurred
- extractedText = new StringReader("");
- }
+ extractedText = new StringReader(text);
}
return extractedText.read(cbuf, off, len);
}
@@ -125,25 +86,6 @@
* finished its work and this reader will return extracted text.
*/
public boolean isExtractorFinished() {
- if (!jobStarted) {
- try {
- executor.execute(job);
- jobStarted = true;
- } catch (InterruptedException e) {
- // this thread is in interrupted state
- return false;
- }
- extractedText = job.getReader(timeout);
- } else {
- // job is already running, check for immediate result
- extractedText = job.getReader(0);
- }
-
- if (extractedText == null && job.getException() != null) {
- // exception occurred
- extractedText = new StringReader("");
- }
-
- return extractedText != null;
+ return job.hasExtractedText();
}
}
Index: jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/TextExtractorJob.java
===================================================================
--- jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/TextExtractorJob.java (Revision 793830)
+++ jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/TextExtractorJob.java (Arbeitskopie)
@@ -16,47 +16,34 @@
*/
package org.apache.jackrabbit.core.query.lucene;
-import EDU.oswego.cs.dl.util.concurrent.FutureResult;
-import EDU.oswego.cs.dl.util.concurrent.Callable;
-
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.extractor.TextExtractor;
-import org.apache.jackrabbit.util.LazyFileInputStream;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import java.io.InputStream;
import java.io.Reader;
-import java.io.IOException;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.io.BufferedWriter;
-import java.io.InputStreamReader;
-import java.io.StringReader;
-import java.lang.reflect.InvocationTargetException;
/**
* TextExtractorJob implements a future result and is runnable
* in a background thread.
*/
-public class TextExtractorJob extends FutureResult implements Runnable {
+public class TextExtractorJob implements Runnable {
/**
- * UTF-8 encoding.
+ * The logger instance for this class.
*/
- private static final String ENCODING_UTF8 = "UTF-8";
+ private static final Logger log = LoggerFactory.getLogger(TextExtractorJob.class);
/**
- * The logger instance for this class.
+ * The text extractor.
*/
- private static final Logger log = LoggerFactory.getLogger(TextExtractorJob.class);
+ private final TextExtractor extractor;
/**
- * The command of the future result.
+ * The binary stream.
*/
- private final Runnable cmd;
+ private final InputStream stream;
/**
* The mime type of the resource to extract text from.
@@ -64,14 +51,14 @@
private final String type;
/**
- * Set to true if this job timed out.
+ * The encoding of the binary content, or null.
*/
- private transient boolean timedOut = false;
+ private final String encoding;
/**
- * true if this extractor job has been flaged as discarded.
+ * The extracted text. Set when the text extraction task completes.
*/
- private transient boolean discarded = false;
+ private transient String text = null;
/**
* Creates a new TextExtractorJob with the given
@@ -83,76 +70,40 @@
* @param encoding the encoding of the binary content. May be
* null.
*/
- public TextExtractorJob(final TextExtractor extractor,
- final InputStream stream,
- final String type,
- final String encoding) {
+ public TextExtractorJob(
+ TextExtractor extractor,
+ InputStream stream, String type, String encoding) {
+ this.extractor = extractor;
+ this.stream = stream;
this.type = type;
- this.cmd = setter(new Callable() {
- public Object call() throws Exception {
- Reader r = extractor.extractText(stream, type, encoding);
- if (r != null) {
- if (discarded) {
- r.close();
- r = null;
- } else if (timedOut) {
- // spool a temp file to save memory
- r = getSwappedOutReader(r);
- }
- }
- return r;
- }
- });
+ this.encoding = encoding;
}
+ public boolean hasExtractedText() {
+ return text != null;
+ }
+
/**
* Returns the reader with the extracted text from the input stream passed
- * to the constructor of this TextExtractorJob. The caller of
- * this method is responsible for closing the returned reader. Returns
+ * to the constructor of this TextExtractorJob. Returns
* null if a timeoutoccurs while waiting for the
* text extractor to get the reader.
*
- * @return the Reader with the extracted text. Returns null if
- * a timeout or an exception occured extracting the text.
+ * @return the extracted text, or null if a timeout or
+ * an exception occurred while extracting the text
*/
- public Reader getReader(long timeout) {
- Reader reader = null;
- try {
- reader = (Reader) timedGet(timeout);
- } catch (InterruptedException e) {
- // also covers TimeoutException
- // text not extracted within timeout or interrupted
- if (timeout > 0) {
- log.debug("Text extraction for {} timed out (>{}ms).",
- type, new Long(timeout));
- timedOut = true;
- }
- } catch (InvocationTargetException e) {
- // extraction failed
- log.warn("Exception while indexing binary property: " + e.getCause());
- log.debug("Dump: ", e.getCause());
- }
- return reader;
- }
-
- /**
- * Discards this extractor job. If the reader within this job is ready at
- * the time of this call, it is closed. If the reader is not yet ready this
- * job will be flaged as discarded and any later call to
- * {@link #getReader(long)} will return null. The reader that
- * is about to be constructed by a background thread will be closed
- * automatically as soon as it becomes ready.
- */
- void discard() {
- discarded = true;
- Reader r = (Reader) peek();
- if (r != null) {
+ public synchronized String getExtractedText(long timeout) {
+ if (text == null) {
try {
- r.close();
- } catch (IOException e) {
- log.warn("Exception when trying to discard extractor job: " + e);
+ wait(timeout);
+ } catch (InterruptedException e) {
+ if (text == null) {
+ log.debug("Text extraction for {} timed out (> {}ms)",
+ type, timeout);
+ }
}
}
+ return text;
}
/**
@@ -168,69 +119,20 @@
* Runs the actual text extraction.
*/
public void run() {
- // forward to command
- cmd.run();
- }
-
- //----------------------------< internal >----------------------------------
-
- /**
- * Returns a Reader for r using a temp file.
- *
- * @param r the reader to swap out into a temp file.
- * @return a reader to the temp file.
- */
- private Reader getSwappedOutReader(Reader r) {
- final File temp;
try {
- temp = File.createTempFile("extractor", null);
- } catch (IOException e) {
- // unable to create temp file
- // return reader as is
- return r;
- }
- Writer out;
- try {
- out = new BufferedWriter(new OutputStreamWriter(
- new FileOutputStream(temp), ENCODING_UTF8));
- } catch (IOException e) {
- // should never happend actually
- if (!temp.delete()) {
- temp.deleteOnExit();
- }
- return r;
- }
-
- // spool into temp file
- InputStream in = null;
- try {
try {
- IOUtils.copy(r, out);
- out.close();
+ Reader reader = extractor.extractText(stream, type, encoding);
+ this.text = IOUtils.toString(reader);
} finally {
- r.close();
+ stream.close();
}
- in = new LazyFileInputStream(temp);
-
- return new InputStreamReader(in, ENCODING_UTF8) {
- public void close() throws IOException {
- super.close();
- // delete file
- if (!temp.delete()) {
- temp.deleteOnExit();
- }
- }
- };
- } catch (IOException e) {
- // do some clean up
- IOUtils.closeQuietly(out);
- IOUtils.closeQuietly(in);
-
- if (!temp.delete()) {
- temp.deleteOnExit();
- }
- // use empty string reader as fallback
- return new StringReader("");
+ } catch (Throwable e) {
+ log.warn("Text extraction failed for type " + type, e);
+ this.text = "";
}
+ synchronized (this) {
+ notifyAll();
+ }
}
+
}
Index: jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/PooledTextExtractor.java
===================================================================
--- jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/PooledTextExtractor.java (Revision 793830)
+++ jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/PooledTextExtractor.java (Arbeitskopie)
@@ -20,9 +20,9 @@
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
+import java.io.IOException;
import java.io.Reader;
import java.io.InputStream;
-import java.io.IOException;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.Channel;
@@ -105,11 +105,17 @@
*