Index: lucene/CHANGES.txt =================================================================== --- lucene/CHANGES.txt (revision 941746) +++ lucene/CHANGES.txt (working copy) @@ -292,6 +292,12 @@ can be used to prevent commits from ever getting deleted from the index. (Shai Erera) +* LUCENE-1585: IndexWriterConfig now accepts a PayloadConsumerProvider which can + return a PayloadConsumder for a given Directory. The PayloadConsumer will be + used to process the payloads of the segments as they are merged (e.g. if one + wants to rewrite payloads of external indexes as they are added, or of local + ones). (Shai Erera) + Optimizations * LUCENE-2075: Terms dict cache is now shared across threads instead Index: lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java =================================================================== --- lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java (revision 941746) +++ lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java (working copy) @@ -21,6 +21,7 @@ import org.apache.lucene.index.DocumentsWriter.IndexingChain; import org.apache.lucene.index.IndexWriter.IndexReaderWarmer; import org.apache.lucene.search.Similarity; +import org.apache.lucene.store.Directory; import org.apache.lucene.util.Version; /** @@ -122,6 +123,7 @@ private MergePolicy mergePolicy; private int maxThreadStates; private boolean readerPooling; + private PayloadConsumerProvider payloadConsumerProvider; // required for clone private Version matchVersion; @@ -555,6 +557,31 @@ return readerPooling; } + /** + * Sets the {@link PayloadConsumerProvider} to use when merging payloads. Note + * that the given pcp will be invoked for every segment that is + * merged, not only external ones that are given through + * {@link IndexWriter#addIndexes} or {@link IndexWriter#addIndexesNoOptimize}. + * If you want only the payloads of the external segments to be processed, you + * can return null whenever a {@link PayloadConsumer} is + * requested for the {@link Directory} of the {@link IndexWriter}. + *

+ * The default is null which means payloads are processed + * normally (copied) during segment merges. + */ + public IndexWriterConfig setPayloadConsumerProvider(PayloadConsumerProvider pcp) { + payloadConsumerProvider = pcp; + return this; + } + + /** + * Returns the {@link PayloadConsumerProvider} that is used during segment + * merges to process payloads. + */ + public PayloadConsumerProvider getPayloadConsumerProvider() { + return payloadConsumerProvider; + } + /** Expert: sets the {@link DocConsumer} chain to be used to process documents. */ IndexWriterConfig setIndexingChain(IndexingChain indexingChain) { this.indexingChain = indexingChain == null ? DocumentsWriter.defaultIndexingChain : indexingChain; @@ -565,7 +592,7 @@ IndexingChain getIndexingChain() { return indexingChain; } - + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -587,6 +614,7 @@ sb.append("mergePolicy=").append(mergePolicy).append("\n"); sb.append("maxThreadStates=").append(maxThreadStates).append("\n"); sb.append("readerPooling=").append(readerPooling).append("\n"); + sb.append("payloadConsumerProvider=").append(payloadConsumerProvider).append("\n"); return sb.toString(); } } Index: lucene/src/java/org/apache/lucene/index/PayloadConsumer.java =================================================================== --- lucene/src/java/org/apache/lucene/index/PayloadConsumer.java (revision 0) +++ lucene/src/java/org/apache/lucene/index/PayloadConsumer.java (revision 0) @@ -0,0 +1,43 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +/** + * Consumes the payload that is read during merging of external indexes, + * triggered via {@link IndexWriter#addIndexes} or + * {@link IndexWriter#addIndexesNoOptimize}. This allows one to modify an + * incoming payload from an external index before it is written to the target + * directory. + * + * @lucene.experimental + */ +public abstract class PayloadConsumer { + + /** Returns the length of the payload that was returned by {@link #processPayload}. */ + public abstract int payloadLength() throws IOException; + + /** + * Process the incoming payload and returns the resulting byte[]. Note that a + * new array might be allocated if the given array is not big enough. The + * length of the new payload data can be obtained via {@link #payloadLength()}. + */ + public abstract byte[] processPayload(byte[] payload, int start, int length) throws IOException; + +} Property changes on: lucene\src\java\org\apache\lucene\index\PayloadConsumer.java ___________________________________________________________________ Added: svn:keywords + Date Author Id Revision HeadURL Added: svn:eol-style + native Index: lucene/src/java/org/apache/lucene/index/PayloadConsumerProvider.java =================================================================== --- lucene/src/java/org/apache/lucene/index/PayloadConsumerProvider.java (revision 0) +++ lucene/src/java/org/apache/lucene/index/PayloadConsumerProvider.java (revision 0) @@ -0,0 +1,42 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +import org.apache.lucene.store.Directory; + +/** + * Provides a {@link PayloadConsumer} to be used for a given {@link Directory}. + * This allows using differnt {@link PayloadConsumer}s for different + * directories. + *

+ * NOTE: if you want to avoid processing payloads of certain directories, + * you can return null in {@link #getConsumer}. + * + * @lucene.experimental + */ +public abstract class PayloadConsumerProvider { + + /** + * Returns the {@link PayloadConsumer} to use for the given {@link Directory}, + * or null if none should be used. + */ + public abstract PayloadConsumer getConsumer(Directory dir) throws IOException; + +} Property changes on: lucene\src\java\org\apache\lucene\index\PayloadConsumerProvider.java ___________________________________________________________________ Added: svn:keywords + Date Author Id Revision HeadURL Added: svn:eol-style + native Index: lucene/src/java/org/apache/lucene/index/SegmentMerger.java =================================================================== --- lucene/src/java/org/apache/lucene/index/SegmentMerger.java (revision 941746) +++ lucene/src/java/org/apache/lucene/index/SegmentMerger.java (working copy) @@ -67,12 +67,15 @@ when merging stored fields */ private final static int MAX_RAW_MERGE_DOCS = 4192; + private final PayloadConsumerProvider pcp; + /** This ctor used only by test code. * * @param dir The Directory to merge the other segments into * @param name The name of the new segment */ SegmentMerger(Directory dir, String name) { + pcp = null; directory = dir; segment = name; checkAbort = new CheckAbort(null, null) { @@ -84,6 +87,7 @@ } SegmentMerger(IndexWriter writer, String name, MergePolicy.OneMerge merge) { + pcp = writer.getConfig().getPayloadConsumerProvider(); directory = writer.getDirectory(); segment = name; if (merge != null) { @@ -135,6 +139,7 @@ * into the directory passed to the constructor. * @param mergeDocStores if false, we will not merge the * stored fields nor vectors files + * payloads before they are written * @return The number of documents that were merged * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error @@ -558,15 +563,15 @@ SegmentWriteState state = new SegmentWriteState(null, directory, segment, null, mergedDocs, 0, termIndexInterval); - final FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos); + final FormatPostingsFieldsConsumer fieldsConsumer = new FormatPostingsFieldsWriter(state, fieldInfos); try { queue = new SegmentMergeQueue(readers.size()); - mergeTermInfos(consumer); + mergeTermInfos(fieldsConsumer); } finally { - consumer.finish(); + fieldsConsumer.finish(); if (queue != null) queue.close(); } } @@ -655,6 +660,7 @@ * * @param smis array of segments * @param n number of cells in the array actually occupied + * @param payloadConsumer if not null, will be used to process the incoming payload before it's written * @return number of documents across all segments where this term was found * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error @@ -672,6 +678,10 @@ int[] docMap = smi.getDocMap(); postings.seek(smi.termEnum); + PayloadConsumer payloadConsumer = null; + if (pcp != null) { + payloadConsumer = pcp.getConsumer(smi.reader.directory()); + } while (postings.next()) { df++; int doc = postings.doc(); @@ -685,11 +695,15 @@ if (!omitTermFreqAndPositions) { for (int j = 0; j < freq; j++) { final int position = postings.nextPosition(); - final int payloadLength = postings.getPayloadLength(); + int payloadLength = postings.getPayloadLength(); if (payloadLength > 0) { if (payloadBuffer == null || payloadBuffer.length < payloadLength) payloadBuffer = new byte[payloadLength]; postings.getPayload(payloadBuffer, 0); + if (payloadConsumer != null) { + payloadBuffer = payloadConsumer.processPayload(payloadBuffer, 0, payloadLength); + payloadLength = payloadConsumer.payloadLength(); + } } posConsumer.addPosition(position, payloadBuffer, 0, payloadLength); } Index: lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java =================================================================== --- lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java (revision 941746) +++ lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java (working copy) @@ -104,6 +104,7 @@ getters.add("getMergePolicy"); getters.add("getMaxThreadStates"); getters.add("getReaderPooling"); + getters.add("getPayloadConsumerProvider"); for (Method m : IndexWriterConfig.class.getDeclaredMethods()) { if (m.getDeclaringClass() == IndexWriterConfig.class && m.getName().startsWith("get")) { assertTrue("method " + m.getName() + " is not tested for defaults", getters.contains(m.getName())); Index: lucene/src/test/org/apache/lucene/index/TestPayloadConsumerProvider.java =================================================================== --- lucene/src/test/org/apache/lucene/index/TestPayloadConsumerProvider.java (revision 0) +++ lucene/src/test/org/apache/lucene/index/TestPayloadConsumerProvider.java (revision 0) @@ -0,0 +1,215 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.analysis.WhitespaceAnalyzer; +import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; +import org.apache.lucene.analysis.tokenattributes.PayloadAttribute; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.Field.Index; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MockRAMDirectory; +import org.apache.lucene.util.LuceneTestCaseJ4; +import org.junit.Test; + +public class TestPayloadConsumerProvider extends LuceneTestCaseJ4 { + + private static final class PerDirPayloadConsumer extends PayloadConsumerProvider { + + private Map consumers; + + public PerDirPayloadConsumer(Map consumers) { + this.consumers = consumers; + } + + @Override + public PayloadConsumer getConsumer(Directory dir) throws IOException { + return consumers.get(dir); + } + + } + + /** deletes the incoming payload */ + private static final class DeletePayloadConsumer extends PayloadConsumer { + + @Override + public int payloadLength() throws IOException { return 0; } + + @Override + public byte[] processPayload(byte[] payload, int start, int length) throws IOException { return payload; } + + } + + private static final class PayloadTokenStream extends TokenStream { + + private final PayloadAttribute payload = addAttribute(PayloadAttribute.class); + private final CharTermAttribute term = addAttribute(CharTermAttribute.class); + + private boolean called = false; + + @Override + public boolean incrementToken() throws IOException { + if (called) { + return false; + } + + called = true; + byte[] p = new byte[] { 1 }; + payload.setPayload(new Payload(p)); + term.append("term"); + return true; + } + + @Override + public void reset() throws IOException { + super.reset(); + called = false; + term.setEmpty(); + } + } + + private static final int NUM_DOCS = 10; + + private IndexWriterConfig getConfig() { + return new IndexWriterConfig(TEST_VERSION_CURRENT, new WhitespaceAnalyzer( + TEST_VERSION_CURRENT)); + } + + private void populateDirs(Directory[] dirs, boolean multipleCommits) throws IOException { + for (int i = 0; i < dirs.length; i++) { + dirs[i] = new MockRAMDirectory(); + populateDocs(dirs[i], multipleCommits); + verifyPayloadExists(dirs[i], NUM_DOCS); + } + } + + private void populateDocs(Directory dir, boolean multipleCommits) throws IOException { + IndexWriter writer = new IndexWriter(dir, getConfig()); + TokenStream payloadTS = new PayloadTokenStream(); + for (int i = 0; i < NUM_DOCS; i++) { + Document doc = new Document(); + doc.add(new Field("id", "doc" + i, Store.NO, Index.NOT_ANALYZED_NO_NORMS)); + doc.add(new Field("content", "doc content " + i, Store.NO, Index.ANALYZED)); + doc.add(new Field("payload", payloadTS)); + writer.addDocument(doc); + if (multipleCommits && (i % 4 == 0)) { + writer.commit(); + } + } + writer.close(); + } + + private void verifyPayloadExists(Directory dir, int numExpected) throws IOException { + IndexReader reader = IndexReader.open(dir); + try { + int numPayloads = 0; + TermPositions tp = reader.termPositions(new Term("payload", "term")); + while (tp.next()) { + tp.nextPosition(); + if (tp.isPayloadAvailable()) { + assertEquals(1, tp.getPayloadLength()); + byte[] p = new byte[tp.getPayloadLength()]; + tp.getPayload(p, 0); + assertEquals(1, p[0]); + ++numPayloads; + } + } + assertEquals(numExpected, numPayloads); + } finally { + reader.close(); + } + } + + private void doTest(boolean addIndexesNoOptimize, boolean addToEmptyIndex, + int numExpectedPayloads, boolean multipleCommits) + throws IOException { + Directory[] dirs = new Directory[2]; + populateDirs(dirs, multipleCommits); + + Directory dir = new MockRAMDirectory(); + if (!addToEmptyIndex) { + populateDocs(dir, false); + verifyPayloadExists(dir, NUM_DOCS); + } + + // Add two source dirs. By not adding the dest dir, we ensure its payloads won't get processed. + Map consumers = new HashMap(); + for (Directory d : dirs) { + consumers.put(d, new DeletePayloadConsumer()); + } + IndexWriter writer = new IndexWriter(dir, getConfig().setPayloadConsumerProvider(new PerDirPayloadConsumer(consumers))); + + if (!addIndexesNoOptimize) { + IndexReader[] readers = new IndexReader[dirs.length]; + for (int i = 0; i < readers.length; i++) { + readers[i] = IndexReader.open(dirs[i]); + } + try { + writer.addIndexes(readers); + } finally { + for (IndexReader r : readers) { + r.close(); + } + } + } else { + writer.addIndexesNoOptimize(dirs); + } + writer.close(); + verifyPayloadExists(dir, numExpectedPayloads); + } + + @Test + public void testAddIndexes() throws Exception { + // addIndexes - single commit in each + doTest(false, true, 0, false); + + // addIndexes - multiple commits in each + doTest(false, true, 0, true); + + // addIndexesNoOptimize - single commit in each + doTest(true, true, 0, false); + + // addIndexesNoOptimize - multiple commits in each + doTest(true, true, 0, true); + } + + @Test + public void testAddIndexesIntoExisting() throws Exception { + // addIndexes - single commit in each + doTest(false, false, NUM_DOCS, false); + + // addIndexes - multiple commits in each + doTest(false, false, NUM_DOCS, true); + + // addIndexesNoOptimize - single commit in each + doTest(true, false, NUM_DOCS, false); + + // addIndexesNoOptimize - multiple commits in each + doTest(true, false, NUM_DOCS, true); + } + +} Property changes on: lucene\src\test\org\apache\lucene\index\TestPayloadConsumerProvider.java ___________________________________________________________________ Added: svn:keywords + Date Author Id Revision HeadURL Added: svn:eol-style + native