Index: lucene/CHANGES.txt
===================================================================
--- lucene/CHANGES.txt	(revision 943975)
+++ lucene/CHANGES.txt	(working copy)
@@ -292,6 +292,13 @@
   can be used to prevent commits from ever getting deleted from the
   index. (Shai Erera)
 
+* LUCENE-1585: IndexWriter now accepts a PayloadProcessorProvider which can
+  return a DirPayloadProcessor for a given Directory, which returns a
+  PayloadProcessor for a given Term. The PayloadProcessor will be used to
+  process the payloads of the segments as they are merged (e.g. if one wants
+  to rewrite payloads of external indexes as they are added, or of local
+  ones). (Shai Erera, Michael Busch)
+
 Optimizations
 
 * LUCENE-2075: Terms dict cache is now shared across threads instead
Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/IndexWriter.java	(revision 943975)
+++ lucene/src/java/org/apache/lucene/index/IndexWriter.java	(working copy)
@@ -20,6 +20,7 @@
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
 import org.apache.lucene.search.Similarity;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.store.Directory;
@@ -324,6 +325,9 @@
   // to allow users to query an IndexWriter settings.
   private final IndexWriterConfig config;
 
+  // The PayloadProcessorProvider to use when segments are merged
+  private PayloadProcessorProvider payloadProcessorProvider;
+
   /**
    * Expert: returns a readonly reader, covering all
    * committed as well as un-committed changes to the index.
@@ -4954,5 +4958,35 @@
     deleter.deletePendingFiles();
     deleter.revisitPolicy();
   }
+
+  /**
+   * Sets the {@link PayloadProcessorProvider} to use when merging payloads.
+   * Note that the given <code>pcp</code> will be invoked for every segment
+   * that is merged, not only external ones that are given through
+   * {@link IndexWriter#addIndexes} or {@link IndexWriter#addIndexesNoOptimize}.
+   * If you want only the payloads of the external segments to be processed,
+   * you can return <code>null</code> whenever a {@link DirPayloadProcessor}
+   * is requested for the {@link Directory} of the {@link IndexWriter}.
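+   * <p>
+   * For example, a provider along these lines (an illustrative sketch;
+   * <code>writerDir</code> stands for this writer's {@link Directory} and
+   * <code>externalProcessor</code> is a {@link DirPayloadProcessor} of yours)
+   * leaves the writer's own segments untouched:
+   * <pre>
+   *   writer.setPayloadProcessorProvider(new PayloadProcessorProvider() {
+   *     public DirPayloadProcessor getDirProcessor(Directory dir) {
+   *       return dir == writerDir ? null : externalProcessor;
+   *     }
+   *   });
+   * </pre>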
+   * <p>
+   * The default is <code>null</code> which means payloads are processed
+   * normally (copied) during segment merges. You can also unset it by
+   * passing <code>null</code>.
+   * <p>
+   * <b>NOTE:</b> the set {@link PayloadProcessorProvider} will be in effect
+   * immediately, potentially for already running merges too. If you want to
+   * be sure it is used for further operations only, such as
+   * {@link #addIndexes} or {@link #optimize}, you can call
+   * {@link #waitForMerges()} first.
+   */
+  public void setPayloadProcessorProvider(PayloadProcessorProvider pcp) {
+    payloadProcessorProvider = pcp;
+  }
+
+  /**
+   * Returns the {@link PayloadProcessorProvider} that is used during segment
+   * merges to process payloads.
+   */
+  public PayloadProcessorProvider getPayloadProcessorProvider() {
+    return payloadProcessorProvider;
+  }
+
 }
Index: lucene/src/java/org/apache/lucene/index/PayloadProcessorProvider.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/PayloadProcessorProvider.java	(revision 0)
+++ lucene/src/java/org/apache/lucene/index/PayloadProcessorProvider.java	(revision 0)
@@ -0,0 +1,89 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.store.Directory;
+
+/**
+ * Provides a {@link DirPayloadProcessor} to be used for a {@link Directory}.
+ * This allows using different {@link DirPayloadProcessor}s for different
+ * directories, e.g. to apply different processing to the payloads of
+ * different directories.
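+ * <p>
+ * For example, a provider backed by a per-directory map, along the lines of
+ * the unit test that accompanies this patch (the class name is illustrative):
+ * <pre>
+ *   public class MapPayloadProcessorProvider extends PayloadProcessorProvider {
+ *     private final Map&lt;Directory, DirPayloadProcessor&gt; processors;
+ *     public MapPayloadProcessorProvider(Map&lt;Directory, DirPayloadProcessor&gt; processors) {
+ *       this.processors = processors;
+ *     }
+ *     public DirPayloadProcessor getDirProcessor(Directory dir) {
+ *       return processors.get(dir); // null means: copy payloads of this directory as-is
+ *     }
+ *   }
+ * </pre>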
+ * <p>
+ * <b>NOTE:</b> to avoid processing payloads of certain directories, you can
+ * return <code>null</code> in {@link #getDirProcessor}.
+ * <p>
+ * <b>NOTE:</b> it is possible that the same {@link DirPayloadProcessor} will
+ * be requested for the same {@link Directory} concurrently. Therefore, to
+ * avoid concurrency issues you should return different instances for
+ * different threads. Usually, if your {@link DirPayloadProcessor} does not
+ * maintain state, this is not a problem. The merge code ensures that the
+ * {@link DirPayloadProcessor} instance you return will be accessed by one
+ * thread to obtain the {@link PayloadProcessor}s for different terms.
+ *
+ * @lucene.experimental
+ */
+public abstract class PayloadProcessorProvider {
+
+  /**
+   * Returns a {@link PayloadProcessor} for a given {@link Term} which allows
+   * processing the payloads of different terms differently. If you intend to
+   * process all your payloads the same way, then you can ignore the given
+   * term.
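+   * <p>
+   * For example (a sketch; <code>myProcessor</code> is a hypothetical
+   * {@link PayloadProcessor} of yours):
+   * <pre>
+   *   public PayloadProcessor getProcessor(Term term) {
+   *     // only payloads of terms in the "p" field are processed
+   *     return "p".equals(term.field()) ? myProcessor : null;
+   *   }
+   * </pre>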
+   * <p>
+   * <b>NOTE:</b> if you protect your {@link DirPayloadProcessor} from
+   * concurrency issues, then you shouldn't worry about any such issues when
+   * {@link PayloadProcessor}s are requested for different terms.
+   */
+  public static abstract class DirPayloadProcessor {
+
+    /** Returns a {@link PayloadProcessor} for the given term. */
+    public abstract PayloadProcessor getProcessor(Term term) throws IOException;
+
+  }
+
+  /**
+   * Processes the given payload. One should call {@link #payloadLength()} to
+   * get the length of the processed payload.
+   *
+   * @lucene.experimental
+   */
+  public static abstract class PayloadProcessor {
+
+    /** Returns the length of the payload that was returned by {@link #processPayload}. */
+    public abstract int payloadLength() throws IOException;
+
+    /**
+     * Processes the incoming payload and returns the resulting byte[]. Note
+     * that a new array might be allocated if the given array is not big
+     * enough. The length of the new payload data can be obtained via
+     * {@link #payloadLength()}.
+     */
+    public abstract byte[] processPayload(byte[] payload, int start, int length) throws IOException;
+
+  }
+
+  /**
+   * Returns a {@link DirPayloadProcessor} for the given {@link Directory},
+   * through which {@link PayloadProcessor}s can be obtained for each
+   * {@link Term}, or <code>null</code> if none should be used.
+   */
+  public abstract DirPayloadProcessor getDirProcessor(Directory dir) throws IOException;
+
+}

Property changes on: lucene\src\java\org\apache\lucene\index\PayloadProcessorProvider.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native

Index: lucene/src/java/org/apache/lucene/index/SegmentMergeInfo.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/SegmentMergeInfo.java	(revision 943975)
+++ lucene/src/java/org/apache/lucene/index/SegmentMergeInfo.java	(working copy)
@@ -19,6 +19,8 @@
 
 import java.io.IOException;
 
+import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
+
 final class SegmentMergeInfo {
   Term term;
   int base;
@@ -28,7 +30,8 @@
   int delCount;
   private TermPositions postings;  // use getPositions()
   private int[] docMap;  // use getDocMap()
-  
+  DirPayloadProcessor dirPayloadProcessor;
+
   SegmentMergeInfo(int b, TermEnum te, IndexReader r)
     throws IOException {
     base = b;
Index: lucene/src/java/org/apache/lucene/index/SegmentMerger.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/SegmentMerger.java	(revision 943975)
+++ lucene/src/java/org/apache/lucene/index/SegmentMerger.java	(working copy)
@@ -26,6 +26,7 @@
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexReader.FieldOption;
 import org.apache.lucene.index.MergePolicy.MergeAbortedException;
+import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
@@ -67,12 +68,15 @@
      when merging stored fields */
   private final static int MAX_RAW_MERGE_DOCS = 4192;
 
+  private final PayloadProcessorProvider pcp;
+
   /** This ctor used only by test code.
    * 
    * @param dir The Directory to merge the other segments into
    * @param name The name of the new segment
    */
   SegmentMerger(Directory dir, String name) {
+    pcp = null;
     directory = dir;
     segment = name;
     checkAbort = new CheckAbort(null, null) {
@@ -84,6 +88,7 @@
   }
 
   SegmentMerger(IndexWriter writer, String name, MergePolicy.OneMerge merge) {
+    pcp = writer.getPayloadProcessorProvider();
     directory = writer.getDirectory();
     segment = name;
     if (merge != null) {
@@ -135,6 +140,7 @@
    * into the directory passed to the constructor.
    * @param mergeDocStores if false, we will not merge the
    *  stored fields nor vectors files
+   *  (payloads are processed by the set PayloadProcessorProvider, if any, before they are written)
    * @return The number of documents that were merged
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
@@ -558,15 +564,15 @@
 
     SegmentWriteState state = new SegmentWriteState(null, directory, segment, null, mergedDocs, 0, termIndexInterval);
 
-    final FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos);
+    final FormatPostingsFieldsConsumer fieldsConsumer = new FormatPostingsFieldsWriter(state, fieldInfos);
 
     try {
       queue = new SegmentMergeQueue(readers.size());
 
-      mergeTermInfos(consumer);
+      mergeTermInfos(fieldsConsumer);
 
     } finally {
-      consumer.finish();
+      fieldsConsumer.finish();
       if (queue != null) queue.close();
     }
   }
@@ -580,6 +586,9 @@
       IndexReader reader = readers.get(i);
       TermEnum termEnum = reader.terms();
       SegmentMergeInfo smi = new SegmentMergeInfo(base, termEnum, reader);
+      if (pcp != null) {
+        smi.dirPayloadProcessor = pcp.getDirProcessor(reader.directory());
+      }
       int[] docMap = smi.getDocMap();
       if (docMap != null) {
         if (docMaps == null) {
@@ -672,6 +681,10 @@
       int[] docMap = smi.getDocMap();
       postings.seek(smi.termEnum);
 
+      PayloadProcessor payloadProcessor = null;
+      if (smi.dirPayloadProcessor != null) {
+        payloadProcessor = smi.dirPayloadProcessor.getProcessor(smi.term);
+      }
       while (postings.next()) {
         df++;
         int doc = postings.doc();
@@ -685,11 +698,15 @@
         if (!omitTermFreqAndPositions) {
           for (int j = 0; j < freq; j++) {
             final int position = postings.nextPosition();
-            final int payloadLength = postings.getPayloadLength();
+            int payloadLength = postings.getPayloadLength();
             if (payloadLength > 0) {
               if (payloadBuffer == null || payloadBuffer.length < payloadLength)
                 payloadBuffer = new byte[payloadLength];
               postings.getPayload(payloadBuffer, 0);
+              if (payloadProcessor != null) {
+                payloadBuffer = payloadProcessor.processPayload(payloadBuffer, 0, payloadLength);
+                payloadLength = payloadProcessor.payloadLength();
+              }
             }
             posConsumer.addPosition(position, payloadBuffer, 0, payloadLength);
           }
Index: lucene/src/test/org/apache/lucene/index/TestPayloadProcessorProvider.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestPayloadProcessorProvider.java	(revision 0)
+++ lucene/src/test/org/apache/lucene/index/TestPayloadProcessorProvider.java	(revision 0)
@@ -0,0 +1,273 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.WhitespaceAnalyzer;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
+import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MockRAMDirectory;
+import org.apache.lucene.util.LuceneTestCaseJ4;
+import org.junit.Test;
+
+public class TestPayloadProcessorProvider extends LuceneTestCaseJ4 {
+
+  private static final class PerDirPayloadProcessor extends PayloadProcessorProvider {
+
+    private Map<Directory, DirPayloadProcessor> processors;
+
+    public PerDirPayloadProcessor(Map<Directory, DirPayloadProcessor> processors) {
+      this.processors = processors;
+    }
+
+    @Override
+    public DirPayloadProcessor getDirProcessor(Directory dir) throws IOException {
+      return processors.get(dir);
+    }
+
+  }
+
+  private static final class PerTermPayloadProcessor extends DirPayloadProcessor {
+
+    @Override
+    public PayloadProcessor getProcessor(Term term) throws IOException {
+      // don't process payloads of terms other than "p:p1"
+      if (!term.field().equals("p") || !term.text().equals("p1")) {
+        return null;
+      }
+
+      // payloads of "p:p1" are deleted
+      return new DeletePayloadProcessor();
+    }
+
+  }
+
+  /** deletes the incoming payload */
+  private static final class DeletePayloadProcessor extends PayloadProcessor {
+
+    @Override
+    public int payloadLength() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public byte[] processPayload(byte[] payload, int start, int length) throws IOException {
+      return payload;
+    }
+
+  }
+
+  private static final class PayloadTokenStream extends TokenStream {
+
+    private final PayloadAttribute payload = addAttribute(PayloadAttribute.class);
+    private final CharTermAttribute term = addAttribute(CharTermAttribute.class);
+
+    private boolean called = false;
+    private String t;
+
+    public PayloadTokenStream(String t) {
+      this.t = t;
+    }
+
+    @Override
+    public boolean incrementToken() throws IOException {
+      if (called) {
+        return false;
+      }
+
+      called = true;
+      byte[] p = new byte[] { 1 };
+      payload.setPayload(new Payload(p));
+      term.append(t);
+      return true;
+    }
+
+    @Override
+    public void reset() throws IOException {
+      super.reset();
+      called = false;
+      term.setEmpty();
+    }
+  }
+
+  private static final int NUM_DOCS = 10;
+
+  private IndexWriterConfig getConfig() {
+    return new IndexWriterConfig(TEST_VERSION_CURRENT, new WhitespaceAnalyzer(
+        TEST_VERSION_CURRENT));
+  }
+
+  private void populateDirs(Directory[] dirs, boolean multipleCommits)
+      throws IOException {
+    for (int i = 0; i < dirs.length; i++) {
+      dirs[i] = new MockRAMDirectory();
+      populateDocs(dirs[i], multipleCommits);
+      verifyPayloadExists(dirs[i], new Term("p", "p1"), NUM_DOCS);
+      verifyPayloadExists(dirs[i], new Term("p", "p2"), NUM_DOCS);
+    }
+  }
+
+  private void populateDocs(Directory dir, boolean multipleCommits)
+      throws IOException {
+    IndexWriter writer = new IndexWriter(dir, getConfig());
+    TokenStream payloadTS1 = new PayloadTokenStream("p1");
+    TokenStream payloadTS2 = new PayloadTokenStream("p2");
+    for (int i = 0; i < NUM_DOCS; i++) {
+      Document doc = new Document();
+      doc.add(new Field("id", "doc" + i, Store.NO, Index.NOT_ANALYZED_NO_NORMS));
+      doc.add(new Field("content", "doc content " + i, Store.NO, Index.ANALYZED));
+      doc.add(new Field("p", payloadTS1));
+      doc.add(new Field("p", payloadTS2));
+      writer.addDocument(doc);
+      if (multipleCommits && (i % 4 == 0)) {
+        writer.commit();
+      }
+    }
+    writer.close();
+  }
+
+  private void verifyPayloadExists(Directory dir, Term term, int numExpected)
+      throws IOException {
+    IndexReader reader = IndexReader.open(dir);
+    try {
+      int numPayloads = 0;
+      TermPositions tp = reader.termPositions(term);
+      while (tp.next()) {
+        tp.nextPosition();
+        if (tp.isPayloadAvailable()) {
+          assertEquals(1, tp.getPayloadLength());
+          byte[] p = new byte[tp.getPayloadLength()];
+          tp.getPayload(p, 0);
+          assertEquals(1, p[0]);
+          ++numPayloads;
+        }
+      }
+      assertEquals(numExpected, numPayloads);
+    } finally {
+      reader.close();
+    }
+  }
+
+  private void doTest(boolean addIndexesNoOptimize, boolean addToEmptyIndex,
+      int numExpectedPayloads, boolean multipleCommits) throws IOException {
+    Directory[] dirs = new Directory[2];
+    populateDirs(dirs, multipleCommits);
+
+    Directory dir = new MockRAMDirectory();
+    if (!addToEmptyIndex) {
+      populateDocs(dir, multipleCommits);
+      verifyPayloadExists(dir, new Term("p", "p1"), NUM_DOCS);
+      verifyPayloadExists(dir, new Term("p", "p2"), NUM_DOCS);
+    }
+
+    // Add processors for the two source dirs. By not adding the dest dir, we
+    // ensure its payloads won't get processed.
+    Map<Directory, DirPayloadProcessor> processors = new HashMap<Directory, DirPayloadProcessor>();
+    for (Directory d : dirs) {
+      processors.put(d, new PerTermPayloadProcessor());
+    }
+    IndexWriter writer = new IndexWriter(dir, getConfig());
+    writer.setPayloadProcessorProvider(new PerDirPayloadProcessor(processors));
+
+    if (!addIndexesNoOptimize) {
+      IndexReader[] readers = new IndexReader[dirs.length];
+      for (int i = 0; i < readers.length; i++) {
+        readers[i] = IndexReader.open(dirs[i]);
+      }
+      try {
+        writer.addIndexes(readers);
+      } finally {
+        for (IndexReader r : readers) {
+          r.close();
+        }
+      }
+    } else {
+      writer.addIndexesNoOptimize(dirs);
+    }
+    writer.close();
+    verifyPayloadExists(dir, new Term("p", "p1"), numExpectedPayloads);
+    // the second term should always have all payloads
+    numExpectedPayloads = NUM_DOCS * dirs.length
+        + (addToEmptyIndex ? 0 : NUM_DOCS);
+    verifyPayloadExists(dir, new Term("p", "p2"), numExpectedPayloads);
+  }
+
+  @Test
+  public void testAddIndexes() throws Exception {
+    // addIndexes - single commit in each
+    doTest(false, true, 0, false);
+
+    // addIndexes - multiple commits in each
+    doTest(false, true, 0, true);
+
+    // addIndexesNoOptimize - single commit in each
+    doTest(true, true, 0, false);
+
+    // addIndexesNoOptimize - multiple commits in each
+    doTest(true, true, 0, true);
+  }
+
+  @Test
+  public void testAddIndexesIntoExisting() throws Exception {
+    // addIndexes - single commit in each
+    doTest(false, false, NUM_DOCS, false);
+
+    // addIndexes - multiple commits in each
+    doTest(false, false, NUM_DOCS, true);
+
+    // addIndexesNoOptimize - single commit in each
+    doTest(true, false, NUM_DOCS, false);
+
+    // addIndexesNoOptimize - multiple commits in each
+    doTest(true, false, NUM_DOCS, true);
+  }
+
+  @Test
+  public void testRegularMerges() throws Exception {
+    Directory dir = new MockRAMDirectory();
+    populateDocs(dir, true);
+    verifyPayloadExists(dir, new Term("p", "p1"), NUM_DOCS);
+    verifyPayloadExists(dir, new Term("p", "p2"), NUM_DOCS);
+
+    // Here we do add a processor for the index's own directory, so that
+    // optimize()'s merges run the payloads through PerTermPayloadProcessor.
+    Map<Directory, DirPayloadProcessor> processors = new HashMap<Directory, DirPayloadProcessor>();
+    processors.put(dir, new PerTermPayloadProcessor());
+    IndexWriter writer = new IndexWriter(dir, getConfig());
+    writer.setPayloadProcessorProvider(new PerDirPayloadProcessor(processors));
+    writer.optimize();
+    writer.close();
+
+    verifyPayloadExists(dir, new Term("p", "p1"), 0);
+    verifyPayloadExists(dir, new Term("p", "p2"), NUM_DOCS);
+  }
+
+}

Property changes on: lucene\src\test\org\apache\lucene\index\TestPayloadProcessorProvider.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native
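
For reference, a minimal end-to-end sketch of how the pieces added by this patch
compose. This is illustrative only and not part of the patch: ExternalIndexRewriter,
DoublingProcessor, the "p" field and the doubling transformation are made-up names and
behavior, chosen to mirror the unit test above.

import java.io.IOException;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.PayloadProcessorProvider;
import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
import org.apache.lucene.index.PayloadProcessorProvider.PayloadProcessor;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;

public class ExternalIndexRewriter {

  /** Doubles each payload byte of "p"-field terms; made-up processing for illustration. */
  private static final class DoublingProcessor extends PayloadProcessor {
    private int length;

    @Override
    public int payloadLength() {
      return length;
    }

    @Override
    public byte[] processPayload(byte[] payload, int start, int length) {
      for (int i = start; i < start + length; i++) {
        payload[i] *= 2; // rewrite in place; the incoming buffer is large enough
      }
      this.length = length;
      return payload;
    }
  }

  /** Adds external indexes, rewriting their payloads; the writer's own segments are skipped. */
  public static void addExternal(final IndexWriter writer, Directory... externals)
      throws IOException {
    writer.setPayloadProcessorProvider(new PayloadProcessorProvider() {
      @Override
      public DirPayloadProcessor getDirProcessor(Directory dir) {
        if (dir == writer.getDirectory()) {
          return null; // local payloads are copied as-is
        }
        return new DirPayloadProcessor() {
          @Override
          public PayloadProcessor getProcessor(Term term) {
            // only payloads of the "p" field are rewritten
            return "p".equals(term.field()) ? new DoublingProcessor() : null;
          }
        };
      }
    });
    try {
      writer.addIndexesNoOptimize(externals);
    } finally {
      writer.setPayloadProcessorProvider(null); // restore normal payload copying
    }
  }
}

Because the provider takes effect immediately (see the setPayloadProcessorProvider
javadoc above), calling writer.waitForMerges() before swapping or clearing the provider
avoids it being picked up by merges already in flight.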