Index: lucene/CHANGES.txt
===================================================================
--- lucene/CHANGES.txt (revision 942418)
+++ lucene/CHANGES.txt (working copy)
@@ -163,6 +163,13 @@
applications that have many unique terms, since it reduces how often
a new segment must be flushed given a fixed RAM buffer size.
+* LUCENE-1585: IndexWriter now accepts a PayloadProcessorProvider which can
+ return a DirPayloadProcessor for a given Directory, which returns a
+ PayloadProcessor for a given Term. The PayloadProcessor will be used to
+ process the payloads of the segments as they are merged (e.g. if one wants to
+ rewrite payloads of external indexes as they are added, or of local ones).
+ (Shai Erera, Michael Busch, Mike McCandless)
+
======================= Lucene 3.x (not yet released) =======================
Changes in backwards compatibility policy
Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/IndexWriter.java (revision 942418)
+++ lucene/src/java/org/apache/lucene/index/IndexWriter.java (working copy)
@@ -20,6 +20,7 @@
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
import org.apache.lucene.search.Similarity;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
@@ -326,6 +327,9 @@
// to allow users to query an IndexWriter settings.
private final IndexWriterConfig config;
+ // The PayloadProcessorProvider to use when segments are merged
+ private PayloadProcessorProvider payloadProcessorProvider;
+
/**
* Expert: returns a readonly reader, covering all
* committed as well as un-committed changes to the index.
@@ -3319,7 +3323,7 @@
try {
mergedName = newSegmentName();
- merger = new SegmentMerger(directory, termIndexInterval, mergedName, null, codecs);
+ merger = new SegmentMerger(directory, termIndexInterval, mergedName, null, codecs, payloadProcessorProvider);
SegmentReader sReader = null;
synchronized(this) {
@@ -4340,7 +4344,7 @@
if (infoStream != null)
message("merging " + merge.segString(directory));
- merger = new SegmentMerger(directory, termIndexInterval, mergedName, merge, codecs);
+ merger = new SegmentMerger(directory, termIndexInterval, mergedName, merge, codecs, payloadProcessorProvider);
merge.readers = new SegmentReader[numSegments];
merge.readersClone = new SegmentReader[numSegments];
@@ -4974,5 +4978,36 @@
deleter.deletePendingFiles();
deleter.revisitPolicy();
}
+
+
+ /**
+ * Sets the {@link PayloadProcessorProvider} to use when merging payloads.
+ * Note that the given pcp will be invoked for every segment that
+ * is merged, not only external ones that are given through
+ * {@link IndexWriter#addIndexes} or {@link IndexWriter#addIndexesNoOptimize}.
+ * If you want only the payloads of the external segments to be processed, you
+ * can return null whenever a {@link DirPayloadProcessor} is
+ * requested for the {@link Directory} of the {@link IndexWriter}.
+ *
+ * The default is null which means payloads are processed
+ * normally (copied) during segment merges. You can also unset it by passing
+ * null.
+ *
+ * NOTE: the set {@link PayloadProcessorProvider} will be in effect + * immediately, potentially for already running merges too. If you want to be + * sure it is used for further operations only, such as {@link #addIndexes} or + * {@link #optimize}, you can call {@link #waitForMerges()} before. + */ + public void setPayloadProcessorProvider(PayloadProcessorProvider pcp) { + payloadProcessorProvider = pcp; + } + /** + * Returns the {@link PayloadProcessorProvider} that is used during segment + * merges to process payloads. + */ + public PayloadProcessorProvider getPayloadProcessorProvider() { + return payloadProcessorProvider; + } + } Index: lucene/src/java/org/apache/lucene/index/PayloadProcessorProvider.java =================================================================== --- lucene/src/java/org/apache/lucene/index/PayloadProcessorProvider.java (revision 0) +++ lucene/src/java/org/apache/lucene/index/PayloadProcessorProvider.java (revision 0) @@ -0,0 +1,81 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; + +/** + * Provides a {@link DirPayloadProcessor} to be used for a {@link Directory}. + * This allows using differnt {@link DirPayloadProcessor}s for different + * directories, for e.g. to perform different processing of payloads of + * different directories. + *
+ * NOTE: to avoid processing payloads of certain directories, you can
+ * return null in {@link #getDirProcessor}.
+ *
+ * NOTE: it is possible that the same {@link DirPayloadProcessor} will be + * requested for the same {@link Directory} concurrently. Therefore, to avoid + * concurrency issues you should return different instances for different + * threads. Usually, if your {@link DirPayloadProcessor} does not maintain state + * this is not a problem. The merge code ensures that the + * {@link DirPayloadProcessor} instance you return will be accessed by one + * thread to obtain the {@link PayloadProcessor}s for different terms. + * + * @lucene.experimental + */ +public abstract class PayloadProcessorProvider { + + /** + * Returns a {@link PayloadProcessor} for a given {@link Term} which allows + * processing the payloads of different terms differently. If you intent to + * process all your payloads the same way, then you can ignore the given term. + *
+ * NOTE: if you protect your {@link DirPayloadProcessor} from
+ * concurrency issues, then you shouldn't worry about any such issues when
+ * {@link PayloadProcessor}s are requested for different terms.
+ */
+ public static abstract class DirPayloadProcessor {
+
+ /** Returns a {@link PayloadProcessor} for the given term. */
+ public abstract PayloadProcessor getProcessor(String field, BytesRef text) throws IOException;
+
+ }
+
+ /**
+ * Processes the given payload.
+ *
+ * @lucene.experimental
+ */
+ public static abstract class PayloadProcessor {
+
+ /** Process the incoming payload and stores the result in the given {@link BytesRef}. */
+ public abstract void processPayload(BytesRef payload) throws IOException;
+
+ }
+
+ /**
+ * Returns a {@link DirPayloadProcessor} for the given {@link Directory},
+ * through which {@link PayloadProcessor}s can be obtained for each
+ * {@link Term}, or null if none should be used.
+ */
+ public abstract DirPayloadProcessor getDirProcessor(Directory dir) throws IOException;
+
+}
Property changes on: lucene\src\java\org\apache\lucene\index\PayloadProcessorProvider.java
___________________________________________________________________
Added: svn:keywords
+ Date Author Id Revision HeadURL
Added: svn:eol-style
+ native
Index: lucene/src/java/org/apache/lucene/index/SegmentMerger.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/SegmentMerger.java (revision 942418)
+++ lucene/src/java/org/apache/lucene/index/SegmentMerger.java (working copy)
@@ -79,7 +79,10 @@
private Codec codec;
private SegmentWriteState segmentWriteState;
- SegmentMerger(Directory dir, int termIndexInterval, String name, MergePolicy.OneMerge merge, CodecProvider codecs) {
+ private PayloadProcessorProvider payloadProcessorProvider;
+
+ SegmentMerger(Directory dir, int termIndexInterval, String name, MergePolicy.OneMerge merge, CodecProvider codecs, PayloadProcessorProvider payloadProcessorProvider) {
+ this.payloadProcessorProvider = payloadProcessorProvider;
directory = dir;
this.codecs = codecs;
segment = name;
@@ -597,6 +600,8 @@
mergeState.delCounts = new int[mergeState.readerCount];
mergeState.docMaps = new int[mergeState.readerCount][];
mergeState.docBase = new int[mergeState.readerCount];
+ mergeState.dirPayloadProcessor = new PayloadProcessorProvider.DirPayloadProcessor[mergeState.readerCount];
+ mergeState.currentPayloadProcessor = new PayloadProcessorProvider.PayloadProcessor[mergeState.readerCount];
docBase = 0;
int inputDocBase = 0;
@@ -629,6 +634,10 @@
}
assert delCount == mergeState.delCounts[i]: "reader delCount=" + mergeState.delCounts[i] + " vs recomputed delCount=" + delCount;
}
+
+ if (payloadProcessorProvider != null) {
+ mergeState.dirPayloadProcessor[i] = payloadProcessorProvider.getDirProcessor(reader.directory());
+ }
}
starts[mergeState.readerCount] = inputDocBase;
Index: lucene/src/java/org/apache/lucene/index/codecs/MappingMultiDocsAndPositionsEnum.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/MappingMultiDocsAndPositionsEnum.java (revision 942418)
+++ lucene/src/java/org/apache/lucene/index/codecs/MappingMultiDocsAndPositionsEnum.java (working copy)
@@ -110,7 +110,11 @@
@Override
public BytesRef getPayload() throws IOException {
- return current.getPayload();
+ BytesRef payload = current.getPayload();
+ if (mergeState.currentPayloadProcessor[upto] != null) {
+ mergeState.currentPayloadProcessor[upto].processPayload(payload);
+ }
+ return payload;
}
@Override
Index: lucene/src/java/org/apache/lucene/index/codecs/MergeState.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/MergeState.java (revision 942418)
+++ lucene/src/java/org/apache/lucene/index/codecs/MergeState.java (working copy)
@@ -20,6 +20,8 @@
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
+import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
import org.apache.lucene.util.Bits;
import java.util.List;
@@ -38,5 +40,9 @@
// Updated per field;
public FieldInfo fieldInfo;
+
+ // Used to process payloads
+ public DirPayloadProcessor[] dirPayloadProcessor;
+ public PayloadProcessor[] currentPayloadProcessor;
+
}
-
Index: lucene/src/java/org/apache/lucene/index/codecs/TermsConsumer.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/TermsConsumer.java (revision 942418)
+++ lucene/src/java/org/apache/lucene/index/codecs/TermsConsumer.java (working copy)
@@ -85,6 +85,12 @@
postingsEnumIn = (MultiDocsAndPositionsEnum) termsEnum.docsAndPositions(mergeState.multiDeletedDocs, postingsEnumIn);
if (postingsEnumIn != null) {
postingsEnum.reset(postingsEnumIn);
+ // set PayloadProcessor
+ for (int i = 0; i < mergeState.readerCount; i++) {
+ if (mergeState.dirPayloadProcessor[i] != null) {
+ mergeState.currentPayloadProcessor[i] = mergeState.dirPayloadProcessor[i].getProcessor(mergeState.fieldInfo.name, term);
+ }
+ }
final PostingsConsumer postingsConsumer = startTerm(term);
final int numDocs = postingsConsumer.merge(mergeState, postingsEnum);
if (numDocs > 0) {
Index: lucene/src/test/org/apache/lucene/index/TestDoc.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestDoc.java (revision 942418)
+++ lucene/src/test/org/apache/lucene/index/TestDoc.java (working copy)
@@ -186,7 +186,7 @@
SegmentReader r1 = SegmentReader.get(true, si1, IndexReader.DEFAULT_TERMS_INDEX_DIVISOR);
SegmentReader r2 = SegmentReader.get(true, si2, IndexReader.DEFAULT_TERMS_INDEX_DIVISOR);
- SegmentMerger merger = new SegmentMerger(si1.dir, IndexWriter.DEFAULT_TERM_INDEX_INTERVAL, merged, null, CodecProvider.getDefault());
+ SegmentMerger merger = new SegmentMerger(si1.dir, IndexWriter.DEFAULT_TERM_INDEX_INTERVAL, merged, null, CodecProvider.getDefault(), null);
merger.add(r1);
merger.add(r2);
Index: lucene/src/test/org/apache/lucene/index/TestPayloadProcessorProvider.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestPayloadProcessorProvider.java (revision 0)
+++ lucene/src/test/org/apache/lucene/index/TestPayloadProcessorProvider.java (revision 0)
@@ -0,0 +1,269 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.WhitespaceAnalyzer;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
+import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MockRAMDirectory;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LuceneTestCaseJ4;
+import org.junit.Test;
+
+public class TestPayloadProcessorProvider extends LuceneTestCaseJ4 {
+
+ private static final class PerDirPayloadProcessor extends PayloadProcessorProvider {
+
+ private Map