Index: lucene/CHANGES.txt
===================================================================
--- lucene/CHANGES.txt	(revision 942418)
+++ lucene/CHANGES.txt	(working copy)
@@ -163,6 +163,13 @@
   applications that have many unique terms, since it reduces how often
   a new segment must be flushed given a fixed RAM buffer size.
 
+* LUCENE-1585: IndexWriter now accepts a PayloadProcessorProvider, which can
+  return a DirPayloadProcessor for a given Directory, which in turn returns a
+  PayloadProcessor for a given Term. The PayloadProcessor is used to process
+  the payloads of the segments as they are merged (e.g. to rewrite the
+  payloads of external indexes as they are added, or of local ones).
+  (Shai Erera, Michael Busch, Mike McCandless)
+
 ======================= Lucene 3.x (not yet released) =======================
 
 Changes in backwards compatibility policy

Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/IndexWriter.java	(revision 942418)
+++ lucene/src/java/org/apache/lucene/index/IndexWriter.java	(working copy)
@@ -20,6 +20,7 @@
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
 import org.apache.lucene.search.Similarity;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.store.Directory;
@@ -326,6 +327,9 @@
   // to allow users to query an IndexWriter settings.
   private final IndexWriterConfig config;
 
+  // The PayloadProcessorProvider to use when segments are merged
+  private PayloadProcessorProvider payloadProcessorProvider;
+
   /**
    * Expert: returns a readonly reader, covering all
    * committed as well as un-committed changes to the index.
@@ -3319,7 +3323,7 @@
     try {
       mergedName = newSegmentName();
 
-      merger = new SegmentMerger(directory, termIndexInterval, mergedName, null, codecs);
+      merger = new SegmentMerger(directory, termIndexInterval, mergedName, null, codecs, payloadProcessorProvider);
 
       SegmentReader sReader = null;
       synchronized(this) {
@@ -4340,7 +4344,7 @@
     if (infoStream != null)
       message("merging " + merge.segString(directory));
 
-    merger = new SegmentMerger(directory, termIndexInterval, mergedName, merge, codecs);
+    merger = new SegmentMerger(directory, termIndexInterval, mergedName, merge, codecs, payloadProcessorProvider);
 
     merge.readers = new SegmentReader[numSegments];
     merge.readersClone = new SegmentReader[numSegments];
@@ -4974,5 +4978,36 @@
     deleter.deletePendingFiles();
     deleter.revisitPolicy();
   }
+
+  /**
+   * Sets the {@link PayloadProcessorProvider} to use when merging payloads.
+   * Note that the given <code>pcp</code> will be invoked for every segment
+   * that is merged, not only external ones that are given through
+   * {@link IndexWriter#addIndexes} or {@link IndexWriter#addIndexesNoOptimize}.
+   * If you want only the payloads of the external segments to be processed,
+   * you can return <code>null</code> whenever a {@link DirPayloadProcessor}
+   * is requested for the {@link Directory} of the {@link IndexWriter}.
+   * <p>
+   * The default is <code>null</code>, which means payloads are processed
+   * normally (copied) during segment merges. You can also unset a previously
+   * set provider by passing <code>null</code>.
+   * <p>
+   * <b>NOTE:</b> the set {@link PayloadProcessorProvider} takes effect
+   * immediately, potentially for already running merges too. If you want to
+   * be sure it is used only for further operations, such as
+   * {@link #addIndexes} or {@link #optimize}, call {@link #waitForMerges()}
+   * first.
+   */
+  public void setPayloadProcessorProvider(PayloadProcessorProvider pcp) {
+    payloadProcessorProvider = pcp;
+  }
+
+  /**
+   * Returns the {@link PayloadProcessorProvider} that is used during segment
+   * merges to process payloads.
+   */
+  public PayloadProcessorProvider getPayloadProcessorProvider() {
+    return payloadProcessorProvider;
+  }
 }
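For illustration, here is a minimal sketch (not part of the patch) of how an application might use the new setter so that only external segments are processed, as the javadoc above suggests. The names writerDir, externalReaders and SomeDirPayloadProcessor are hypothetical placeholders:

    // Sketch only, against the API added by this patch.
    PayloadProcessorProvider externalOnly = new PayloadProcessorProvider() {
      @Override
      public DirPayloadProcessor getDirProcessor(Directory dir) throws IOException {
        // Returning null for the writer's own Directory means its payloads
        // are copied as-is; only segments from other directories are processed.
        return dir == writerDir ? null : new SomeDirPayloadProcessor();
      }
    };
    writer.setPayloadProcessorProvider(externalOnly);
    writer.addIndexes(externalReaders); // payloads are rewritten as these segments are merged in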

Index: lucene/src/java/org/apache/lucene/index/PayloadProcessorProvider.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/PayloadProcessorProvider.java	(revision 0)
+++ lucene/src/java/org/apache/lucene/index/PayloadProcessorProvider.java	(revision 0)
@@ -0,0 +1,81 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+
+/**
+ * Provides a {@link DirPayloadProcessor} to be used for a {@link Directory}.
+ * This allows using different {@link DirPayloadProcessor}s for different
+ * directories, e.g. to process the payloads of each directory differently.
+ * <p>
+ * <b>NOTE:</b> to avoid processing payloads of certain directories, you can
+ * return <code>null</code> in {@link #getDirProcessor}.
+ * <p>
+ * <b>NOTE:</b> it is possible that the same {@link DirPayloadProcessor} will
+ * be requested for the same {@link Directory} concurrently. Therefore, to
+ * avoid concurrency issues, you should return different instances for
+ * different threads. Usually, if your {@link DirPayloadProcessor} does not
+ * maintain state, this is not a problem. The merge code ensures that the
+ * {@link DirPayloadProcessor} instance you return will be accessed by a
+ * single thread to obtain the {@link PayloadProcessor}s for different terms.
+ *
+ * @lucene.experimental
+ */
+public abstract class PayloadProcessorProvider {
+
+  /**
+   * Returns a {@link PayloadProcessor} for a given {@link Term}, which allows
+   * processing the payloads of different terms differently. If you intend to
+   * process all your payloads the same way, you can ignore the given term.
+   * <p>
+   * <b>NOTE:</b> if you protect your {@link DirPayloadProcessor} from
+   * concurrency issues, then you shouldn't worry about any such issues when
+   * {@link PayloadProcessor}s are requested for different terms.
+   */
+  public static abstract class DirPayloadProcessor {
+
+    /** Returns a {@link PayloadProcessor} for the given term. */
+    public abstract PayloadProcessor getProcessor(String field, BytesRef text) throws IOException;
+
+  }
+
+  /**
+   * Processes the given payload.
+   *
+   * @lucene.experimental
+   */
+  public static abstract class PayloadProcessor {
+
+    /** Processes the incoming payload and stores the result in the given {@link BytesRef}. */
+    public abstract void processPayload(BytesRef payload) throws IOException;
+
+  }
+
+  /**
+   * Returns a {@link DirPayloadProcessor} for the given {@link Directory},
+   * through which {@link PayloadProcessor}s can be obtained for each
+   * {@link Term}, or <code>null</code> if none should be used.
+   */
+  public abstract DirPayloadProcessor getDirProcessor(Directory dir) throws IOException;
+
+}

Property changes on: lucene\src\java\org\apache\lucene\index\PayloadProcessorProvider.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native
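To make the contract of processPayload concrete (the processor edits the incoming BytesRef in place), here is a sketch of a stateless processor that truncates every payload to a single byte. Being stateless, it also sidesteps the concurrency caveat in the class javadoc. The class name is illustrative only, not part of the patch:

    // Sketch: a stateless DirPayloadProcessor whose PayloadProcessor edits
    // the incoming BytesRef in place, as processPayload's javadoc requires.
    class TruncatingDirProcessor extends PayloadProcessorProvider.DirPayloadProcessor {
      @Override
      public PayloadProcessorProvider.PayloadProcessor getProcessor(String field, BytesRef text)
          throws IOException {
        return new PayloadProcessorProvider.PayloadProcessor() {
          @Override
          public void processPayload(BytesRef payload) throws IOException {
            // Keep at most one byte; shortening the BytesRef in place is
            // enough, since the merge writes the processed payload back.
            if (payload.length > 1) {
              payload.length = 1;
            }
          }
        };
      }
    }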
Index: lucene/src/java/org/apache/lucene/index/SegmentMerger.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/SegmentMerger.java	(revision 942418)
+++ lucene/src/java/org/apache/lucene/index/SegmentMerger.java	(working copy)
@@ -79,7 +79,10 @@
   private Codec codec;
   private SegmentWriteState segmentWriteState;
 
-  SegmentMerger(Directory dir, int termIndexInterval, String name, MergePolicy.OneMerge merge, CodecProvider codecs) {
+  private PayloadProcessorProvider payloadProcessorProvider;
+
+  SegmentMerger(Directory dir, int termIndexInterval, String name, MergePolicy.OneMerge merge, CodecProvider codecs, PayloadProcessorProvider payloadProcessorProvider) {
+    this.payloadProcessorProvider = payloadProcessorProvider;
     directory = dir;
     this.codecs = codecs;
     segment = name;
@@ -597,6 +600,9 @@
     mergeState.delCounts = new int[mergeState.readerCount];
     mergeState.docMaps = new int[mergeState.readerCount][];
     mergeState.docBase = new int[mergeState.readerCount];
+    mergeState.hasPayloadProcessorProvider = payloadProcessorProvider != null;
+    mergeState.dirPayloadProcessor = new PayloadProcessorProvider.DirPayloadProcessor[mergeState.readerCount];
+    mergeState.currentPayloadProcessor = new PayloadProcessorProvider.PayloadProcessor[mergeState.readerCount];
 
     docBase = 0;
     int inputDocBase = 0;
@@ -629,6 +635,10 @@
       }
         assert delCount == mergeState.delCounts[i]: "reader delCount=" + mergeState.delCounts[i] + " vs recomputed delCount=" + delCount;
       }
+
+      if (payloadProcessorProvider != null) {
+        mergeState.dirPayloadProcessor[i] = payloadProcessorProvider.getDirProcessor(reader.directory());
+      }
     }
 
     starts[mergeState.readerCount] = inputDocBase;

Index: lucene/src/java/org/apache/lucene/index/codecs/MappingMultiDocsAndPositionsEnum.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/MappingMultiDocsAndPositionsEnum.java	(revision 942418)
+++ lucene/src/java/org/apache/lucene/index/codecs/MappingMultiDocsAndPositionsEnum.java	(working copy)
@@ -110,7 +110,11 @@
 
   @Override
   public BytesRef getPayload() throws IOException {
-    return current.getPayload();
+    BytesRef payload = current.getPayload();
+    if (mergeState.currentPayloadProcessor[upto] != null) {
+      mergeState.currentPayloadProcessor[upto].processPayload(payload);
+    }
+    return payload;
   }
 
   @Override

Index: lucene/src/java/org/apache/lucene/index/codecs/MergeState.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/MergeState.java	(revision 942418)
+++ lucene/src/java/org/apache/lucene/index/codecs/MergeState.java	(working copy)
@@ -20,6 +20,8 @@
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
+import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
 import org.apache.lucene.util.Bits;
 
 import java.util.List;
@@ -38,5 +40,10 @@
 
   // Updated per field;
   public FieldInfo fieldInfo;
+
+  // Used to process payloads
+  public boolean hasPayloadProcessorProvider;
+  public DirPayloadProcessor[] dirPayloadProcessor;
+  public PayloadProcessor[] currentPayloadProcessor;
+
 }
-

Index: lucene/src/java/org/apache/lucene/index/codecs/TermsConsumer.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/TermsConsumer.java	(revision 942418)
+++ lucene/src/java/org/apache/lucene/index/codecs/TermsConsumer.java	(working copy)
@@ -85,6 +85,14 @@
         postingsEnumIn = (MultiDocsAndPositionsEnum) termsEnum.docsAndPositions(mergeState.multiDeletedDocs, postingsEnumIn);
         if (postingsEnumIn != null) {
           postingsEnum.reset(postingsEnumIn);
+          // set PayloadProcessor
+          if (mergeState.hasPayloadProcessorProvider) {
+            for (int i = 0; i < mergeState.readerCount; i++) {
+              if (mergeState.dirPayloadProcessor[i] != null) {
+                mergeState.currentPayloadProcessor[i] = mergeState.dirPayloadProcessor[i].getProcessor(mergeState.fieldInfo.name, term);
+              }
+            }
+          }
           final PostingsConsumer postingsConsumer = startTerm(term);
           final int numDocs = postingsConsumer.merge(mergeState, postingsEnum);
           if (numDocs > 0) {

Index: lucene/src/test/org/apache/lucene/index/TestDoc.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestDoc.java	(revision 942418)
+++ lucene/src/test/org/apache/lucene/index/TestDoc.java	(working copy)
@@ -186,7 +186,7 @@
       SegmentReader r1 = SegmentReader.get(true, si1, IndexReader.DEFAULT_TERMS_INDEX_DIVISOR);
       SegmentReader r2 = SegmentReader.get(true, si2, IndexReader.DEFAULT_TERMS_INDEX_DIVISOR);
 
-      SegmentMerger merger = new SegmentMerger(si1.dir, IndexWriter.DEFAULT_TERM_INDEX_INTERVAL, merged, null, CodecProvider.getDefault());
+      SegmentMerger merger = new SegmentMerger(si1.dir, IndexWriter.DEFAULT_TERM_INDEX_INTERVAL, merged, null, CodecProvider.getDefault(), null);
 
       merger.add(r1);
       merger.add(r2);

Index: lucene/src/test/org/apache/lucene/index/TestPayloadProcessorProvider.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestPayloadProcessorProvider.java	(revision 0)
+++ lucene/src/test/org/apache/lucene/index/TestPayloadProcessorProvider.java	(revision 0)
@@ -0,0 +1,269 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.WhitespaceAnalyzer;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
+import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MockRAMDirectory;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LuceneTestCaseJ4;
+import org.junit.Test;
+
+public class TestPayloadProcessorProvider extends LuceneTestCaseJ4 {
+
+  private static final class PerDirPayloadProcessor extends PayloadProcessorProvider {
+
+    private Map<Directory, DirPayloadProcessor> processors;
+
+    public PerDirPayloadProcessor(Map<Directory, DirPayloadProcessor> processors) {
+      this.processors = processors;
+    }
+
+    @Override
+    public DirPayloadProcessor getDirProcessor(Directory dir) throws IOException {
+      return processors.get(dir);
+    }
+
+  }
+
+  private static final class PerTermPayloadProcessor extends DirPayloadProcessor {
+
+    @Override
+    public PayloadProcessor getProcessor(String field, BytesRef text) throws IOException {
+      // don't process payloads of terms other than "p:p1"
+      if (!field.equals("p") || !text.bytesEquals(new BytesRef("p1"))) {
+        return null;
+      }
+
+      // All other terms are processed the same way
+      return new DeletePayloadProcessor();
+    }
+
+  }
+
+  /** deletes the incoming payload */
+  private static final class DeletePayloadProcessor extends PayloadProcessor {
+
+    @Override
+    public void processPayload(BytesRef payload) throws IOException {
+      payload.length = 0;
+    }
+
+  }
+
+  private static final class PayloadTokenStream extends TokenStream {
+
+    private final PayloadAttribute payload = addAttribute(PayloadAttribute.class);
+    private final CharTermAttribute term = addAttribute(CharTermAttribute.class);
+
+    private boolean called = false;
+    private String t;
+
+    public PayloadTokenStream(String t) {
+      this.t = t;
+    }
+
+    @Override
+    public boolean incrementToken() throws IOException {
+      if (called) {
+        return false;
+      }
+
+      called = true;
+      byte[] p = new byte[] { 1 };
+      payload.setPayload(new Payload(p));
+      term.append(t);
+      return true;
+    }
+
+    @Override
+    public void reset() throws IOException {
+      super.reset();
+      called = false;
+      term.setEmpty();
+    }
+  }
+
+  private static final int NUM_DOCS = 10;
+
+  private IndexWriterConfig getConfig() {
+    return new IndexWriterConfig(TEST_VERSION_CURRENT, new WhitespaceAnalyzer(
+        TEST_VERSION_CURRENT));
+  }
+
+  private void populateDirs(Directory[] dirs, boolean multipleCommits)
+      throws IOException {
+    for (int i = 0; i < dirs.length; i++) {
+      dirs[i] = new MockRAMDirectory();
+      populateDocs(dirs[i], multipleCommits);
+      verifyPayloadExists(dirs[i], "p", new BytesRef("p1"), NUM_DOCS);
+      verifyPayloadExists(dirs[i], "p", new BytesRef("p2"), NUM_DOCS);
+    }
+  }
+
+  private void populateDocs(Directory dir, boolean multipleCommits)
+      throws IOException {
+    IndexWriter writer = new IndexWriter(dir, getConfig());
+    TokenStream payloadTS1 = new PayloadTokenStream("p1");
+    TokenStream payloadTS2 = new PayloadTokenStream("p2");
+    for (int i = 0; i < NUM_DOCS; i++) {
+      Document doc = new Document();
+      doc.add(new Field("id", "doc" + i, Store.NO, Index.NOT_ANALYZED_NO_NORMS));
+      doc.add(new Field("content", "doc content " + i, Store.NO, Index.ANALYZED));
+      doc.add(new Field("p", payloadTS1));
+      doc.add(new Field("p", payloadTS2));
+      writer.addDocument(doc);
+      if (multipleCommits && (i % 4 == 0)) {
+        writer.commit();
+      }
+    }
+    writer.close();
+  }
+
+  private void verifyPayloadExists(Directory dir, String field, BytesRef text,
+      int numExpected) throws IOException {
+    IndexReader reader = IndexReader.open(dir);
+    try {
+      int numPayloads = 0;
+      DocsAndPositionsEnum tpe = MultiFields.getTermPositionsEnum(reader, null, field, text);
+      while (tpe.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
+        tpe.nextPosition();
+        if (tpe.hasPayload()) {
+          BytesRef payload = tpe.getPayload();
+          assertEquals(1, payload.length);
+          assertEquals(1, payload.bytes[0]);
+          ++numPayloads;
+        }
+      }
+      assertEquals(numExpected, numPayloads);
+    } finally {
+      reader.close();
+    }
+  }
+
+  private void doTest(boolean addIndexesNoOptimize, boolean addToEmptyIndex,
+      int numExpectedPayloads, boolean multipleCommits) throws IOException {
+    Directory[] dirs = new Directory[2];
+    populateDirs(dirs, multipleCommits);
+
+    Directory dir = new MockRAMDirectory();
+    if (!addToEmptyIndex) {
+      populateDocs(dir, multipleCommits);
+      verifyPayloadExists(dir, "p", new BytesRef("p1"), NUM_DOCS);
+      verifyPayloadExists(dir, "p", new BytesRef("p2"), NUM_DOCS);
+    }
+
+    // Add two source dirs. By not adding the dest dir, we ensure its payloads
+    // won't get processed.
+    Map<Directory, DirPayloadProcessor> processors = new HashMap<Directory, DirPayloadProcessor>();
+    for (Directory d : dirs) {
+      processors.put(d, new PerTermPayloadProcessor());
+    }
+    IndexWriter writer = new IndexWriter(dir, getConfig());
+    writer.setPayloadProcessorProvider(new PerDirPayloadProcessor(processors));
+
+    if (!addIndexesNoOptimize) {
+      IndexReader[] readers = new IndexReader[dirs.length];
+      for (int i = 0; i < readers.length; i++) {
+        readers[i] = IndexReader.open(dirs[i]);
+      }
+      try {
+        writer.addIndexes(readers);
+      } finally {
+        for (IndexReader r : readers) {
+          r.close();
+        }
+      }
+    } else {
+      writer.addIndexesNoOptimize(dirs);
+    }
+    writer.close();
+
+    verifyPayloadExists(dir, "p", new BytesRef("p1"), numExpectedPayloads);
+    // the second term should always have all payloads
+    numExpectedPayloads = NUM_DOCS * dirs.length
+        + (addToEmptyIndex ? 0 : NUM_DOCS);
+    verifyPayloadExists(dir, "p", new BytesRef("p2"), numExpectedPayloads);
+  }
+
+  @Test
+  public void testAddIndexes() throws Exception {
+    // addIndexes - single commit in each
+    doTest(false, true, 0, false);
+
+    // addIndexes - multiple commits in each
+    doTest(false, true, 0, true);
+
+    // addIndexesNoOptimize - single commit in each
+    doTest(true, true, 0, false);
+
+    // addIndexesNoOptimize - multiple commits in each
+    doTest(true, true, 0, true);
+  }
+
+  @Test
+  public void testAddIndexesIntoExisting() throws Exception {
+    // addIndexes - single commit in each
+    doTest(false, false, NUM_DOCS, false);
+
+    // addIndexes - multiple commits in each
+    doTest(false, false, NUM_DOCS, true);
+
+    // addIndexesNoOptimize - single commit in each
+    doTest(true, false, NUM_DOCS, false);
+
+    // addIndexesNoOptimize - multiple commits in each
+    doTest(true, false, NUM_DOCS, true);
+  }
+
+  @Test
+  public void testRegularMerges() throws Exception {
+    Directory dir = new MockRAMDirectory();
+    populateDocs(dir, true);
+    verifyPayloadExists(dir, "p", new BytesRef("p1"), NUM_DOCS);
+    verifyPayloadExists(dir, "p", new BytesRef("p2"), NUM_DOCS);
+
+    // Register a processor for this index's own directory, so that its
+    // payloads are processed during regular merges (here, optimize).
+    Map<Directory, DirPayloadProcessor> processors = new HashMap<Directory, DirPayloadProcessor>();
+    processors.put(dir, new PerTermPayloadProcessor());
+    IndexWriter writer = new IndexWriter(dir, getConfig());
+    writer.setPayloadProcessorProvider(new PerDirPayloadProcessor(processors));
+    writer.optimize();
+    writer.close();
+
+    verifyPayloadExists(dir, "p", new BytesRef("p1"), 0);
+    verifyPayloadExists(dir, "p", new BytesRef("p2"), NUM_DOCS);
+  }
+
+}

Property changes on: lucene\src\test\org\apache\lucene\index\TestPayloadProcessorProvider.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native

Index: lucene/src/test/org/apache/lucene/index/TestSegmentMerger.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestSegmentMerger.java	(revision 942418)
+++ lucene/src/test/org/apache/lucene/index/TestSegmentMerger.java	(working copy)
@@ -65,7 +65,7 @@
   }
 
   public void testMerge() throws IOException {
-    SegmentMerger merger = new SegmentMerger(mergedDir, IndexWriter.DEFAULT_TERM_INDEX_INTERVAL, mergedSegment, null, CodecProvider.getDefault());
+    SegmentMerger merger = new SegmentMerger(mergedDir, IndexWriter.DEFAULT_TERM_INDEX_INTERVAL, mergedSegment, null, CodecProvider.getDefault(), null);
     merger.add(reader1);
     merger.add(reader2);
    int docsMerged = merger.merge();
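End to end, the flow the new test exercises boils down to the following sketch, reusing the illustrative classes shown above; sourceDir, targetDir and conf are hypothetical names:

    // Only sourceDir is registered, so only its payloads are processed;
    // the target index's own payloads are copied unchanged.
    Map<Directory, DirPayloadProcessor> procs = new HashMap<Directory, DirPayloadProcessor>();
    procs.put(sourceDir, new TruncatingDirProcessor());
    IndexWriter writer = new IndexWriter(targetDir, conf);
    writer.setPayloadProcessorProvider(new PerDirPayloadProcessor(procs));
    IndexReader reader = IndexReader.open(sourceDir);
    try {
      writer.addIndexes(reader); // payloads are rewritten during this merge
    } finally {
      reader.close();
    }
    writer.close();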