Index: lucene/CHANGES.txt
===================================================================
--- lucene/CHANGES.txt (revision 942498)
+++ lucene/CHANGES.txt (working copy)
@@ -292,6 +292,13 @@
can be used to prevent commits from ever getting deleted from the index.
(Shai Erera)
+* LUCENE-1585: IndexWriterConfig now accepts a PayloadProcessorProvider which
+ can return a DirPayloadProcessor for a given Directory, which returns a
+ PayloadProcessor for a given Term. The PayloadProcessor will be used to
+ process the payloads of the segments as they are merged (e.g. if one wants to
+ rewrite payloads of external indexes as they are added, or of local ones).
+ (Shai Erera, Michael Busch)
+
Optimizations
* LUCENE-2075: Terms dict cache is now shared across threads instead
Index: lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java (revision 942498)
+++ lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java (working copy)
@@ -20,7 +20,9 @@
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.DocumentsWriter.IndexingChain;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
+import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
import org.apache.lucene.search.Similarity;
+import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Version;
/**
@@ -122,6 +124,7 @@
private MergePolicy mergePolicy;
private int maxThreadStates;
private boolean readerPooling;
+ private PayloadProcessorProvider payloadProcessorProvider;
// required for clone
private Version matchVersion;
@@ -555,6 +558,31 @@
return readerPooling;
}
+ /**
+ * Sets the {@link PayloadProcessorProvider} to use when merging payloads.
+ * Note that the given pcp will be invoked for every segment that
+ * is merged, not only external ones that are given through
+ * {@link IndexWriter#addIndexes} or {@link IndexWriter#addIndexesNoOptimize}.
+ * If you want only the payloads of the external segments to be processed, you
+ * can return null whenever a {@link DirPayloadProcessor} is
+ * requested for the {@link Directory} of the {@link IndexWriter}.
+ *
+ * The default is null which means payloads are processed
+ * normally (copied) during segment merges.
+ */
+ public IndexWriterConfig setPayloadProcessorProvider(PayloadProcessorProvider pcp) {
+ payloadProcessorProvider = pcp;
+ return this;
+ }
+
+ /**
+ * Returns the {@link PayloadProcessorProvider} that is used during segment
+ * merges to process payloads.
+ */
+ public PayloadProcessorProvider getPayloadProcessorProvider() {
+ return payloadProcessorProvider;
+ }
+
/** Expert: sets the {@link DocConsumer} chain to be used to process documents. */
IndexWriterConfig setIndexingChain(IndexingChain indexingChain) {
this.indexingChain = indexingChain == null ? DocumentsWriter.defaultIndexingChain : indexingChain;
@@ -565,7 +593,7 @@
IndexingChain getIndexingChain() {
return indexingChain;
}
-
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -587,6 +615,7 @@
sb.append("mergePolicy=").append(mergePolicy).append("\n");
sb.append("maxThreadStates=").append(maxThreadStates).append("\n");
sb.append("readerPooling=").append(readerPooling).append("\n");
+ sb.append("payloadProcessorProvider=").append(payloadProcessorProvider).append("\n");
return sb.toString();
}
}
Index: lucene/src/java/org/apache/lucene/index/PayloadProcessorProvider.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/PayloadProcessorProvider.java (revision 0)
+++ lucene/src/java/org/apache/lucene/index/PayloadProcessorProvider.java (revision 0)
@@ -0,0 +1,89 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.store.Directory;
+
+/**
+ * Provides a {@link DirPayloadProcessor} to be used for a {@link Directory}.
+ * This allows using differnt {@link DirPayloadProcessor}s for different
+ * directories, for e.g. to perform different processing of payloads of
+ * different directories.
+ *
+ * NOTE: to avoid processing payloads of certain directories, you can
+ * return null in {@link #getDirProcessor}.
+ *
+ * NOTE: it is possible that the same {@link DirPayloadProcessor} will be + * requested for the same {@link Directory} concurrently. Therefore, to avoid + * concurrency issues you should return different instances for different + * threads. Usually, if your {@link DirPayloadProcessor} does not maintain state + * this is not a problem. The merge code ensures that the + * {@link DirPayloadProcessor} instance you return will be accessed by one + * thread to obtain the {@link PayloadProcessor}s for different terms. + * + * @lucene.experimental + */ +public abstract class PayloadProcessorProvider { + + /** + * Returns a {@link PayloadProcessor} for a given {@link Term} which allows + * processing the payloads of different terms differently. If you intent to + * process all your payloads the same way, then you can ignore the given term. + *
+ * NOTE: if you protect your {@link DirPayloadProcessor} from
+ * concurrency issues, then you shouldn't worry about any such issues when
+ * {@link PayloadProcessor}s are requested for different terms.
+ */
+ public static abstract class DirPayloadProcessor {
+
+ /** Returns a {@link PayloadProcessor} for the given term. */
+ public abstract PayloadProcessor getProcessor(Term term) throws IOException;
+
+ }
+
+ /**
+ * Processes the given payload. One should call {@link #payloadLength()} to
+ * get the length of the processed payload.
+ *
+ * @lucene.experimental
+ */
+ public static abstract class PayloadProcessor {
+
+ /** Returns the length of the payload that was returned by {@link #processPayload}. */
+ public abstract int payloadLength() throws IOException;
+
+ /**
+ * Process the incoming payload and returns the resulting byte[]. Note that
+ * a new array might be allocated if the given array is not big enough. The
+ * length of the new payload data can be obtained via
+ * {@link #payloadLength()}.
+ */
+ public abstract byte[] processPayload(byte[] payload, int start, int length) throws IOException;
+
+ }
+
+ /**
+ * Returns a {@link DirPayloadProcessor} for the given {@link Directory},
+ * through which {@link PayloadProcessor}s can be obtained for each
+ * {@link Term}, or null if none should be used.
+ */
+ public abstract DirPayloadProcessor getDirProcessor(Directory dir) throws IOException;
+
+}
Property changes on: lucene\src\java\org\apache\lucene\index\PayloadProcessorProvider.java
___________________________________________________________________
Added: svn:keywords
+ Date Author Id Revision HeadURL
Added: svn:eol-style
+ native
Index: lucene/src/java/org/apache/lucene/index/SegmentMergeInfo.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/SegmentMergeInfo.java (revision 942498)
+++ lucene/src/java/org/apache/lucene/index/SegmentMergeInfo.java (working copy)
@@ -19,6 +19,8 @@
import java.io.IOException;
+import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
+
final class SegmentMergeInfo {
Term term;
int base;
@@ -28,7 +30,8 @@
int delCount;
private TermPositions postings; // use getPositions()
private int[] docMap; // use getDocMap()
-
+ DirPayloadProcessor dirPayloadProcessor;
+
SegmentMergeInfo(int b, TermEnum te, IndexReader r)
throws IOException {
base = b;
Index: lucene/src/java/org/apache/lucene/index/SegmentMerger.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/SegmentMerger.java (revision 942498)
+++ lucene/src/java/org/apache/lucene/index/SegmentMerger.java (working copy)
@@ -26,6 +26,7 @@
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader.FieldOption;
import org.apache.lucene.index.MergePolicy.MergeAbortedException;
+import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
@@ -67,12 +68,15 @@
when merging stored fields */
private final static int MAX_RAW_MERGE_DOCS = 4192;
+ private final PayloadProcessorProvider pcp;
+
/** This ctor used only by test code.
*
* @param dir The Directory to merge the other segments into
* @param name The name of the new segment
*/
SegmentMerger(Directory dir, String name) {
+ pcp = null;
directory = dir;
segment = name;
checkAbort = new CheckAbort(null, null) {
@@ -84,6 +88,7 @@
}
SegmentMerger(IndexWriter writer, String name, MergePolicy.OneMerge merge) {
+ pcp = writer.getConfig().getPayloadProcessorProvider();
directory = writer.getDirectory();
segment = name;
if (merge != null) {
@@ -135,6 +140,7 @@
* into the directory passed to the constructor.
* @param mergeDocStores if false, we will not merge the
* stored fields nor vectors files
+ * payloads before they are written
* @return The number of documents that were merged
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
@@ -558,15 +564,15 @@
SegmentWriteState state = new SegmentWriteState(null, directory, segment, null, mergedDocs, 0, termIndexInterval);
- final FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos);
+ final FormatPostingsFieldsConsumer fieldsConsumer = new FormatPostingsFieldsWriter(state, fieldInfos);
try {
queue = new SegmentMergeQueue(readers.size());
- mergeTermInfos(consumer);
+ mergeTermInfos(fieldsConsumer);
} finally {
- consumer.finish();
+ fieldsConsumer.finish();
if (queue != null) queue.close();
}
}
@@ -580,6 +586,9 @@
IndexReader reader = readers.get(i);
TermEnum termEnum = reader.terms();
SegmentMergeInfo smi = new SegmentMergeInfo(base, termEnum, reader);
+ if (pcp != null) {
+ smi.dirPayloadProcessor = pcp.getDirProcessor(reader.directory());
+ }
int[] docMap = smi.getDocMap();
if (docMap != null) {
if (docMaps == null) {
@@ -672,6 +681,10 @@
int[] docMap = smi.getDocMap();
postings.seek(smi.termEnum);
+ PayloadProcessor payloadProcessor = null;
+ if (smi.dirPayloadProcessor != null) {
+ payloadProcessor = smi.dirPayloadProcessor.getProcessor(smi.term);
+ }
while (postings.next()) {
df++;
int doc = postings.doc();
@@ -685,11 +698,15 @@
if (!omitTermFreqAndPositions) {
for (int j = 0; j < freq; j++) {
final int position = postings.nextPosition();
- final int payloadLength = postings.getPayloadLength();
+ int payloadLength = postings.getPayloadLength();
if (payloadLength > 0) {
if (payloadBuffer == null || payloadBuffer.length < payloadLength)
payloadBuffer = new byte[payloadLength];
postings.getPayload(payloadBuffer, 0);
+ if (payloadProcessor != null) {
+ payloadBuffer = payloadProcessor.processPayload(payloadBuffer, 0, payloadLength);
+ payloadLength = payloadProcessor.payloadLength();
+ }
}
posConsumer.addPosition(position, payloadBuffer, 0, payloadLength);
}
Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java (revision 942498)
+++ lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java (working copy)
@@ -104,6 +104,7 @@
getters.add("getMergePolicy");
getters.add("getMaxThreadStates");
getters.add("getReaderPooling");
+ getters.add("getPayloadProcessorProvider");
for (Method m : IndexWriterConfig.class.getDeclaredMethods()) {
if (m.getDeclaringClass() == IndexWriterConfig.class && m.getName().startsWith("get")) {
assertTrue("method " + m.getName() + " is not tested for defaults", getters.contains(m.getName()));
Index: lucene/src/test/org/apache/lucene/index/TestPayloadProcessorProvider.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestPayloadProcessorProvider.java (revision 0)
+++ lucene/src/test/org/apache/lucene/index/TestPayloadProcessorProvider.java (revision 0)
@@ -0,0 +1,273 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.WhitespaceAnalyzer;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
+import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MockRAMDirectory;
+import org.apache.lucene.util.LuceneTestCaseJ4;
+import org.junit.Test;
+
+public class TestPayloadProcessorProvider extends LuceneTestCaseJ4 {
+
+ private static final class PerDirPayloadProcessor extends PayloadProcessorProvider {
+
+ private Map