Index: lucene/CHANGES.txt
===================================================================
--- lucene/CHANGES.txt	(revision 941608)
+++ lucene/CHANGES.txt	(working copy)
@@ -292,6 +292,10 @@
   can be used to prevent commits from ever getting deleted from the
   index. (Shai Erera)
 
+* LUCENE-1585: IndexWriter.addIndexes* now accept a PayloadConsumer which will
+  be used to process the incoming payloads before they are written to the
+  Directory. (Shai Erera, Michael Busch)
+
 Optimizations
 
 * LUCENE-2075: Terms dict cache is now shared across threads instead
Index: lucene/backwards/src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- lucene/backwards/src/java/org/apache/lucene/index/IndexWriter.java	(revision 941608)
+++ lucene/backwards/src/java/org/apache/lucene/index/IndexWriter.java	(working copy)
@@ -3225,7 +3225,7 @@
       for (int i = 0; i < readers.length; i++)      // add new indexes
         merger.add(readers[i]);
 
-      int docCount = merger.merge();                // merge 'em
+      int docCount = merger.merge(null);            // merge 'em
 
       synchronized(this) {
         segmentInfos.clear();                       // pop old infos & add new
@@ -4269,7 +4269,7 @@
       }
 
       // This is where all the work happens:
-      mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores);
+      mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores, null);
 
       assert mergedDocCount == totDocCount;
Index: lucene/backwards/src/java/org/apache/lucene/index/PayloadConsumer.java
===================================================================
--- lucene/backwards/src/java/org/apache/lucene/index/PayloadConsumer.java	(revision 0)
+++ lucene/backwards/src/java/org/apache/lucene/index/PayloadConsumer.java	(revision 0)
@@ -0,0 +1,29 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+/** A fake PayloadConsumer, needed only so the backwards tests compile. */
+public abstract class PayloadConsumer {
+
+  public abstract int payloadLength() throws IOException;
+
+  public abstract byte[] processPayload(byte[] payload, int start, int length) throws IOException;
+
+}

Property changes on: lucene\backwards\src\java\org\apache\lucene\index\PayloadConsumer.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native

Index: lucene/backwards/src/java/org/apache/lucene/index/SegmentMerger.java
===================================================================
--- lucene/backwards/src/java/org/apache/lucene/index/SegmentMerger.java	(revision 941608)
+++ lucene/backwards/src/java/org/apache/lucene/index/SegmentMerger.java	(working copy)
@@ -24,6 +24,7 @@
 import java.util.List;
 
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.PayloadConsumer;
 import org.apache.lucene.index.IndexReader.FieldOption;
 import org.apache.lucene.index.MergePolicy.MergeAbortedException;
 import org.apache.lucene.store.Directory;
@@ -126,8 +127,8 @@
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    */
-  final int merge() throws CorruptIndexException, IOException {
-    return merge(true);
+  final int merge(PayloadConsumer payloadConsumer) throws CorruptIndexException, IOException {
+    return merge(true, payloadConsumer);
   }
 
   /**
@@ -139,7 +140,7 @@
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    */
-  final int merge(boolean mergeDocStores) throws CorruptIndexException, IOException {
+  final int merge(boolean mergeDocStores, PayloadConsumer payloadConsumer) throws CorruptIndexException, IOException {
 
     this.mergeDocStores = mergeDocStores;
Index: lucene/backwards/src/test/org/apache/lucene/index/TestDoc.java
===================================================================
--- lucene/backwards/src/test/org/apache/lucene/index/TestDoc.java	(revision 941608)
+++ lucene/backwards/src/test/org/apache/lucene/index/TestDoc.java	(working copy)
@@ -184,7 +184,7 @@
       merger.add(r1);
       merger.add(r2);
 
-      merger.merge();
+      merger.merge(null);
       merger.closeReaders();
 
       if (useCompoundFile) {
Index: lucene/backwards/src/test/org/apache/lucene/index/TestSegmentMerger.java
===================================================================
--- lucene/backwards/src/test/org/apache/lucene/index/TestSegmentMerger.java	(revision 941608)
+++ lucene/backwards/src/test/org/apache/lucene/index/TestSegmentMerger.java	(working copy)
@@ -66,7 +66,7 @@
     SegmentMerger merger = new SegmentMerger(mergedDir, mergedSegment);
     merger.add(reader1);
     merger.add(reader2);
-    int docsMerged = merger.merge();
+    int docsMerged = merger.merge(null);
     merger.closeReaders();
     assertTrue(docsMerged == 2);
     //Should be able to open a new SegmentReader against the new directory
Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/IndexWriter.java	(revision 941608)
+++ lucene/src/java/org/apache/lucene/index/IndexWriter.java	(working copy)
@@ -3095,6 +3095,16 @@
    * @throws IOException if there is a low-level IO error
    */
   public void addIndexesNoOptimize(Directory... dirs)
+      throws CorruptIndexException, IOException {
+    addIndexesNoOptimize(null, dirs);
+  }
+
+  /**
+   * Like {@link #addIndexesNoOptimize(Directory...)}, only it allows passing a
+   * {@link PayloadConsumer} which will be used to process the payloads before
+   * they are written.
+   */
+  public void addIndexesNoOptimize(PayloadConsumer payloadConsumer, Directory... dirs)
       throws CorruptIndexException, IOException {
     ensureOpen();
@@ -3148,7 +3158,7 @@
           // over into our index.  This is necessary (before
           // finishing the transaction) to avoid leaving the
           // index in an unusable (inconsistent) state.
-          resolveExternalSegments();
+          resolveExternalSegments(payloadConsumer);
 
           ensureOpen();
@@ -3181,7 +3191,7 @@
    *  We don't return until the SegmentInfos has no more
    *  external segments.  Currently this is only used by
    *  addIndexesNoOptimize(). */
-  private void resolveExternalSegments() throws CorruptIndexException, IOException {
+  private void resolveExternalSegments(PayloadConsumer payloadConsumer) throws CorruptIndexException, IOException {
 
     boolean any = false;
@@ -3239,7 +3249,7 @@
 
       if (merge != null) {
         any = true;
-        merge(merge);
+        merge(merge, payloadConsumer);
       }
     }
@@ -3270,6 +3280,16 @@
    * @throws IOException if there is a low-level IO error
    */
   public void addIndexes(IndexReader... readers)
+      throws CorruptIndexException, IOException {
+    addIndexes(null, readers);
+  }
+
+  /**
+   * Just like {@link #addIndexes(IndexReader...)}, only it allows passing a
+   * {@link PayloadConsumer} which will be used to process the incoming
+   * payloads before they are written.
+   */
+  public void addIndexes(PayloadConsumer payloadConsumer, IndexReader... readers)
       throws CorruptIndexException, IOException {
     ensureOpen();
@@ -3330,7 +3350,7 @@
       for (int i = 0; i < readers.length; i++)      // add new indexes
         merger.add(readers[i]);
 
-      int docCount = merger.merge();                // merge 'em
+      int docCount = merger.merge(payloadConsumer); // merge 'em
 
      synchronized(this) {
        segmentInfos.clear();                       // pop old infos & add new
@@ -4010,8 +4030,18 @@
    * Merges the indicated segments, replacing them in the stack with a
    * single segment. */
+  final void merge(MergePolicy.OneMerge merge)
+    throws CorruptIndexException, IOException {
+    merge(merge, null);
+  }
 
-  final void merge(MergePolicy.OneMerge merge)
+  /**
+   * Merges the indicated segments, replacing them in the stack with a single
+   * segment. Also allows passing a {@link PayloadConsumer} which will be used
+   * to process the payloads before they are written (used by
+   * {@link #addIndexesNoOptimize}).
+   */
+  final void merge(MergePolicy.OneMerge merge, PayloadConsumer payloadConsumer)
     throws CorruptIndexException, IOException {
 
     boolean success = false;
@@ -4026,7 +4056,7 @@
       if (infoStream != null)
         message("now merge\n  merge=" + merge.segString(directory) + "\n  merge=" + merge + "\n  index=" + segString());
 
-      mergeMiddle(merge);
+      mergeMiddle(merge, payloadConsumer);
       mergeSuccess(merge);
       success = true;
     } catch (Throwable t) {
@@ -4313,7 +4343,7 @@
   /** Does the actual (time-consuming) work of the merge,
    *  but without holding synchronized lock on IndexWriter
    *  instance */
-  final private int mergeMiddle(MergePolicy.OneMerge merge)
+  final private int mergeMiddle(MergePolicy.OneMerge merge, PayloadConsumer payloadConsumer)
     throws CorruptIndexException, IOException {
 
     merge.checkAborted(directory);
@@ -4401,7 +4431,7 @@
       }
 
       // This is where all the work happens:
-      mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores);
+      mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores, payloadConsumer);
 
       assert mergedDocCount == totDocCount;
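As a usage sketch (not part of the patch itself): with these overloads an application can run any PayloadConsumer over the payloads of the external indexes while they are copied in. The names writer, externalDirs, and MaskingPayloadConsumer below are placeholders; a sketch of the consumer follows the PayloadConsumer class in the next section.

    // Hypothetical call site; writer, externalDirs, and MaskingPayloadConsumer
    // are placeholder names, not part of this patch.
    PayloadConsumer consumer = new MaskingPayloadConsumer();
    writer.addIndexesNoOptimize(consumer, externalDirs); // payloads processed during the merge
    writer.addIndexesNoOptimize(externalDirs);           // old signature; equivalent to passing null
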
Index: lucene/src/java/org/apache/lucene/index/PayloadConsumer.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/PayloadConsumer.java	(revision 0)
+++ lucene/src/java/org/apache/lucene/index/PayloadConsumer.java	(revision 0)
@@ -0,0 +1,43 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+/**
+ * Consumes the payload that is read during merging of external indexes,
+ * triggered via {@link IndexWriter#addIndexes} or
+ * {@link IndexWriter#addIndexesNoOptimize}. This allows one to modify an
+ * incoming payload from an external index before it is written to the target
+ * directory.
+ *
+ * @lucene.experimental
+ */
+public abstract class PayloadConsumer {
+
+  /** Returns the length of the payload that was returned by {@link #processPayload}. */
+  public abstract int payloadLength() throws IOException;
+
+  /**
+   * Processes the incoming payload and returns the resulting byte[]. Note that
+   * a new array might be allocated if the given array is not big enough. The
+   * length of the new payload data can be obtained via {@link #payloadLength()}.
+   */
+  public abstract byte[] processPayload(byte[] payload, int start, int length) throws IOException;
+
+}

Property changes on: lucene\src\java\org\apache\lucene\index\PayloadConsumer.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native

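To make the contract concrete, here is a minimal sketch of a PayloadConsumer implementation; the class name and the masking behavior are illustrative assumptions, not part of the patch. processPayload() may modify the given array in place and return it, and payloadLength() must report the length of whatever the last processPayload() call produced.

    import java.io.IOException;
    import org.apache.lucene.index.PayloadConsumer;

    // Hypothetical consumer that zeroes the first byte of each incoming payload.
    // It works in place, so the payload length never changes.
    public class MaskingPayloadConsumer extends PayloadConsumer {

      private int lastLength;

      @Override
      public int payloadLength() throws IOException {
        return lastLength; // length of the payload returned by the last processPayload() call
      }

      @Override
      public byte[] processPayload(byte[] payload, int start, int length) throws IOException {
        payload[start] = 0; // mask in place; no reallocation needed
        lastLength = length;
        return payload;
      }
    }

The two methods come as a pair because processPayload() returns only the array: the merger calls processPayload() once per position and then asks payloadLength() for the (possibly changed) length of the result.
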
+ */ + public abstract byte[] processPayload(byte[] payload, int start, int length) throws IOException; + +} Property changes on: lucene\src\java\org\apache\lucene\index\PayloadConsumer.java ___________________________________________________________________ Added: svn:keywords + Date Author Id Revision HeadURL Added: svn:eol-style + native Index: lucene/src/java/org/apache/lucene/index/SegmentMerger.java =================================================================== --- lucene/src/java/org/apache/lucene/index/SegmentMerger.java (revision 941608) +++ lucene/src/java/org/apache/lucene/index/SegmentMerger.java (working copy) @@ -126,8 +126,8 @@ * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - final int merge() throws CorruptIndexException, IOException { - return merge(true); + final int merge(PayloadConsumer payloadConsumer) throws CorruptIndexException, IOException { + return merge(true, payloadConsumer); } /** @@ -135,11 +135,13 @@ * into the directory passed to the constructor. * @param mergeDocStores if false, we will not merge the * stored fields nor vectors files + * @param payloadConsumer if not null, will be used to process the incoming + * payloads before they are written * @return The number of documents that were merged * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - final int merge(boolean mergeDocStores) throws CorruptIndexException, IOException { + final int merge(boolean mergeDocStores, PayloadConsumer payloadConsumer) throws CorruptIndexException, IOException { this.mergeDocStores = mergeDocStores; @@ -151,7 +153,7 @@ // threads. mergedDocs = mergeFields(); - mergeTerms(); + mergeTerms(payloadConsumer); mergeNorms(); if (mergeDocStores && fieldInfos.hasVectors()) @@ -554,26 +556,26 @@ private SegmentMergeQueue queue = null; - private final void mergeTerms() throws CorruptIndexException, IOException { + private final void mergeTerms(PayloadConsumer payloadConsumer) throws CorruptIndexException, IOException { SegmentWriteState state = new SegmentWriteState(null, directory, segment, null, mergedDocs, 0, termIndexInterval); - final FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos); + final FormatPostingsFieldsConsumer fieldsConsumer = new FormatPostingsFieldsWriter(state, fieldInfos); try { queue = new SegmentMergeQueue(readers.size()); - mergeTermInfos(consumer); + mergeTermInfos(fieldsConsumer, payloadConsumer); } finally { - consumer.finish(); + fieldsConsumer.finish(); if (queue != null) queue.close(); } } boolean omitTermFreqAndPositions; - private final void mergeTermInfos(final FormatPostingsFieldsConsumer consumer) throws CorruptIndexException, IOException { + private final void mergeTermInfos(final FormatPostingsFieldsConsumer consumer, PayloadConsumer payloadConsumer) throws CorruptIndexException, IOException { int base = 0; final int readerCount = readers.size(); for (int i = 0; i < readerCount; i++) { @@ -625,7 +627,7 @@ omitTermFreqAndPositions = fieldInfo.omitTermFreqAndPositions; } - int df = appendPostings(termsConsumer, match, matchSize); // add new TermInfo + int df = appendPostings(termsConsumer, match, matchSize, payloadConsumer); // add new TermInfo checkAbort.work(df/3.0); @@ -655,11 +657,12 @@ * * @param smis array of segments * @param n number of cells in the array actually occupied + * @param payloadConsumer if not null, will be used to process the incoming payload before it's 
written * @return number of documents across all segments where this term was found * @throws CorruptIndexException if the index is corrupt * @throws IOException if there is a low-level IO error */ - private final int appendPostings(final FormatPostingsTermsConsumer termsConsumer, SegmentMergeInfo[] smis, int n) + private final int appendPostings(final FormatPostingsTermsConsumer termsConsumer, SegmentMergeInfo[] smis, int n, PayloadConsumer payloadConsumer) throws CorruptIndexException, IOException { final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(smis[0].term.text); @@ -685,11 +688,15 @@ if (!omitTermFreqAndPositions) { for (int j = 0; j < freq; j++) { final int position = postings.nextPosition(); - final int payloadLength = postings.getPayloadLength(); + int payloadLength = postings.getPayloadLength(); if (payloadLength > 0) { if (payloadBuffer == null || payloadBuffer.length < payloadLength) payloadBuffer = new byte[payloadLength]; postings.getPayload(payloadBuffer, 0); + if (payloadConsumer != null) { + payloadBuffer = payloadConsumer.processPayload(payloadBuffer, 0, payloadLength); + payloadLength = payloadConsumer.payloadLength(); + } } posConsumer.addPosition(position, payloadBuffer, 0, payloadLength); } Index: lucene/src/test/org/apache/lucene/index/TestDoc.java =================================================================== --- lucene/src/test/org/apache/lucene/index/TestDoc.java (revision 941608) +++ lucene/src/test/org/apache/lucene/index/TestDoc.java (working copy) @@ -189,7 +189,7 @@ merger.add(r1); merger.add(r2); - merger.merge(); + merger.merge(null); merger.closeReaders(); if (useCompoundFile) { Index: lucene/src/test/org/apache/lucene/index/TestPayloadConsumer.java =================================================================== --- lucene/src/test/org/apache/lucene/index/TestPayloadConsumer.java (revision 0) +++ lucene/src/test/org/apache/lucene/index/TestPayloadConsumer.java (revision 0) @@ -0,0 +1,157 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import static org.junit.Assert.*; + +import java.io.IOException; + +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.analysis.WhitespaceAnalyzer; +import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; +import org.apache.lucene.analysis.tokenattributes.PayloadAttribute; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.Field.Index; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MockRAMDirectory; +import org.apache.lucene.util.LuceneTestCaseJ4; +import org.junit.Test; + +public class TestPayloadConsumer extends LuceneTestCaseJ4 { + + /** deletes the incoming payload */ + private static final class DeletePayloadConsumer extends PayloadConsumer { + + @Override + public int payloadLength() throws IOException { return 0; } + + @Override + public byte[] processPayload(byte[] payload, int start, int length) throws IOException { return payload; } + + } + + private static final class PayloadTokenStream extends TokenStream { + + private final PayloadAttribute payload = addAttribute(PayloadAttribute.class); + private final CharTermAttribute term = addAttribute(CharTermAttribute.class); + + private boolean called = false; + + @Override + public boolean incrementToken() throws IOException { + if (called) { + return false; + } + + called = true; + byte[] p = new byte[] { 1 }; + payload.setPayload(new Payload(p)); + term.append("term"); + return true; + } + + @Override + public void reset() throws IOException { + super.reset(); + called = false; + term.setEmpty(); + } + } + + private void createIndex(Directory dir) throws IOException { + IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, new WhitespaceAnalyzer(TEST_VERSION_CURRENT))); + TokenStream payloadTS = new PayloadTokenStream(); + for (int i = 0; i < 10; i++) { + Document doc = new Document(); + doc.add(new Field("id", "doc" + i, Store.NO, Index.NOT_ANALYZED_NO_NORMS)); + doc.add(new Field("content", "doc content " + i, Store.NO, Index.ANALYZED)); + if ((i%2) == 0) { + doc.add(new Field("payload", payloadTS)); + } + writer.addDocument(doc); + } + writer.close(); + } + + private void verifyPayloadExists(Directory dir, boolean shouldExist) throws IOException { + IndexReader reader = IndexReader.open(dir); + try { + TermPositions tp = reader.termPositions(new Term("payload", "term")); + while (tp.next()) { + int freq = tp.freq(); + for (int i = 0; i < freq; i++) { + tp.nextPosition(); + if (shouldExist) { + assertTrue(tp.isPayloadAvailable()); + assertEquals(1, tp.getPayloadLength()); + byte[] p = new byte[tp.getPayloadLength()]; + tp.getPayload(p, 0); + assertEquals(1, p[0]); + } else { + assertFalse(tp.isPayloadAvailable()); + } + } + } + } finally { + reader.close(); + } + } + + @Test + public void testAddIndexes() throws Exception { + // Tests that when addIndexes is called w/ a payload consumer, it is called. 
+ Directory[] dirs = new Directory[] { new MockRAMDirectory(), new MockRAMDirectory() }; + createIndex(dirs[0]); + createIndex(dirs[1]); + verifyPayloadExists(dirs[0], true); + verifyPayloadExists(dirs[1], true); + + Directory dir2 = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir2, new IndexWriterConfig(TEST_VERSION_CURRENT, new WhitespaceAnalyzer(TEST_VERSION_CURRENT))); + writer.commit(); + + IndexReader[] readers = new IndexReader[] { IndexReader.open(dirs[0]), + IndexReader.open(dirs[1]) }; + writer.addIndexes(new DeletePayloadConsumer(), readers); + readers[0].close(); + readers[1].close(); + writer.close(); + verifyPayloadExists(dir2, false); + } + + @Test + public void testAddIndexesNoOptimize() throws Exception { + // Tests that when addIndexes is called w/ a payload consumer, it is called. + Directory[] dirs = new Directory[] { new MockRAMDirectory(), new MockRAMDirectory() }; + createIndex(dirs[0]); + createIndex(dirs[1]); + verifyPayloadExists(dirs[0], true); + verifyPayloadExists(dirs[1], true); + + Directory dir2 = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir2, new IndexWriterConfig(TEST_VERSION_CURRENT, new WhitespaceAnalyzer(TEST_VERSION_CURRENT))); + writer.commit(); + + writer.addIndexesNoOptimize(new DeletePayloadConsumer(), dirs); + writer.close(); + verifyPayloadExists(dir2, false); + } + +} Property changes on: lucene\src\test\org\apache\lucene\index\TestPayloadConsumer.java ___________________________________________________________________ Added: svn:keywords + Date Author Id Revision HeadURL Added: svn:eol-style + native Index: lucene/src/test/org/apache/lucene/index/TestSegmentMerger.java =================================================================== --- lucene/src/test/org/apache/lucene/index/TestSegmentMerger.java (revision 941608) +++ lucene/src/test/org/apache/lucene/index/TestSegmentMerger.java (working copy) @@ -66,7 +66,7 @@ SegmentMerger merger = new SegmentMerger(mergedDir, mergedSegment); merger.add(reader1); merger.add(reader2); - int docsMerged = merger.merge(); + int docsMerged = merger.merge(null); merger.closeReaders(); assertTrue(docsMerged == 2); //Should be able to open a new SegmentReader against the new directory