Index: lucene/CHANGES.txt
===================================================================
--- lucene/CHANGES.txt	(revision 941676)
+++ lucene/CHANGES.txt	(working copy)
@@ -166,6 +166,10 @@
 * LUCENE-2398: Improve tests to work better from IDEs such as Eclipse.
   (Paolo Castagna via Robert Muir)
 
+* LUCENE-1585: IndexWriter.addIndexes* now accept a PayloadConsumer which will
+  be used to process the incoming payloads before they are written to the
+  Directory. (Shai Erera, Michael Busch)
+
 ======================= Lucene 3.x (not yet released) =======================
 
 Changes in backwards compatibility policy

Index: lucene/src/java/org/apache/lucene/index/IndexWriter.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/IndexWriter.java	(revision 941676)
+++ lucene/src/java/org/apache/lucene/index/IndexWriter.java	(working copy)
@@ -3103,7 +3103,17 @@
    */
   public void addIndexesNoOptimize(Directory... dirs)
       throws CorruptIndexException, IOException {
+    addIndexesNoOptimize(null, dirs);
+  }
 
+  /**
+   * Like {@link #addIndexesNoOptimize(Directory...)}, only it allows passing a
+   * {@link PayloadConsumer} which will be used to process the payloads before
+   * they are written.
+   */
+  public void addIndexesNoOptimize(PayloadConsumer payloadConsumer, Directory... dirs)
+      throws CorruptIndexException, IOException {
+
     ensureOpen();
 
     noDupDirs(dirs);
@@ -3155,7 +3165,7 @@
         // over into our index.  This is necessary (before
         // finishing the transaction) to avoid leaving the
         // index in an unusable (inconsistent) state.
-        resolveExternalSegments();
+        resolveExternalSegments(payloadConsumer);
 
         ensureOpen();
 
@@ -3188,7 +3198,7 @@
    *  We don't return until the SegmentInfos has no more
    *  external segments.  Currently this is only used by
    *  addIndexesNoOptimize(). */
-  private void resolveExternalSegments() throws CorruptIndexException, IOException {
+  private void resolveExternalSegments(PayloadConsumer payloadConsumer) throws CorruptIndexException, IOException {
 
     boolean any = false;
 
@@ -3246,7 +3256,7 @@
 
       if (merge != null) {
         any = true;
-        merge(merge);
+        merge(merge, payloadConsumer);
       }
     }
 
@@ -3276,7 +3286,16 @@
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    */
-  public void addIndexes(IndexReader... readers)
+  public void addIndexes(IndexReader... readers) throws CorruptIndexException, IOException {
+    addIndexes(null, readers);
+  }
+
+  /**
+   * Like {@link #addIndexes(IndexReader...)}, only it allows passing a
+   * {@link PayloadConsumer} which will be used to process the incoming
+   * payloads before they are written.
+   */
+  public void addIndexes(PayloadConsumer payloadConsumer, IndexReader... readers)
     throws CorruptIndexException, IOException {
 
     ensureOpen();
 
@@ -3337,7 +3356,7 @@
       for (int i = 0; i < readers.length; i++)      // add new indexes
         merger.add(readers[i]);
 
-      int docCount = merger.merge();                // merge 'em
+      int docCount = merger.merge(payloadConsumer); // merge 'em
 
       synchronized(this) {
         segmentInfos.clear();                      // pop old infos & add new
@@ -4019,8 +4038,18 @@
    * Merges the indicated segments, replacing them in the stack with a
    * single segment.
    */
-  final void merge(MergePolicy.OneMerge merge)
+  final void merge(MergePolicy.OneMerge merge)
+    throws CorruptIndexException, IOException {
+    merge(merge, null);
+  }
+
+  /**
+   * Merges the indicated segments, replacing them in the stack with a
+   * single segment. Also allows passing a {@link PayloadConsumer} which will
+   * be used to process the payloads before they are written (used by
+   * {@link #addIndexesNoOptimize}).
+   */
+  final void merge(MergePolicy.OneMerge merge, PayloadConsumer payloadConsumer)
     throws CorruptIndexException, IOException {
 
     boolean success = false;
@@ -4035,7 +4064,7 @@
       if (infoStream != null)
         message("now merge\n  merge=" + merge.segString(directory) + "\n  index=" + segString());
 
-      mergeMiddle(merge);
+      mergeMiddle(merge, payloadConsumer);
       mergeSuccess(merge);
       success = true;
     } catch (Throwable t) {
@@ -4323,7 +4352,7 @@
   /** Does the actual (time-consuming) work of the merge,
    *  but without holding synchronized lock on IndexWriter
    *  instance */
-  final private int mergeMiddle(MergePolicy.OneMerge merge)
+  final private int mergeMiddle(MergePolicy.OneMerge merge, PayloadConsumer payloadConsumer)
     throws CorruptIndexException, IOException {
 
     merge.checkAborted(directory);
@@ -4411,7 +4440,7 @@
       }
 
       // This is where all the work happens:
-      mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores);
+      mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores, payloadConsumer);
 
       // Record which codec was used to write the segment
       merge.info.setCodec(merger.getCodec());
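For reference, a minimal caller-side sketch of the overloads added above; the
writer, source directories, and source readers are placeholders assumed to
already exist, not part of this patch:

  // A throwaway consumer that strips every incoming payload (same idea as
  // the DeletePayloadConsumer used in the tests further down). "writer",
  // "sourceDirs" and "sourceReaders" are hypothetical placeholders.
  PayloadConsumer stripPayloads = new PayloadConsumer() {
    @Override
    public void processPayload(BytesRef payload) throws IOException {
      payload.length = 0; // zero length == no payload bytes written
    }
  };

  // Directory-based variant:
  writer.addIndexesNoOptimize(stripPayloads, sourceDirs);

  // Reader-based variant:
  writer.addIndexes(stripPayloads, sourceReaders);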
Index: lucene/src/java/org/apache/lucene/index/PayloadConsumer.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/PayloadConsumer.java	(revision 0)
+++ lucene/src/java/org/apache/lucene/index/PayloadConsumer.java	(revision 0)
@@ -0,0 +1,38 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.util.BytesRef;
+
+/**
+ * Consumes the payload that is read during merging of external indexes,
+ * triggered via {@link IndexWriter#addIndexes} or
+ * {@link IndexWriter#addIndexesNoOptimize}. This allows one to modify an
+ * incoming payload from an external index before it is written to the target
+ * directory.
+ *
+ * @lucene.experimental
+ */
+public abstract class PayloadConsumer {
+
+  /** Processes the incoming payload and stores the result in the given {@link BytesRef}. */
+  public abstract void processPayload(BytesRef payload) throws IOException;
+
+}

Property changes on: lucene\src\java\org\apache\lucene\index\PayloadConsumer.java
___________________________________________________________________
Added: svn:keywords
   + Date Author Id Revision HeadURL
Added: svn:eol-style
   + native
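As an illustrative sketch of a consumer that rewrites rather than removes
payloads (not part of the patch; the flag bit is a hypothetical
application-defined convention), note that processPayload mutates the given
BytesRef in place:

  import java.io.IOException;

  import org.apache.lucene.index.PayloadConsumer;
  import org.apache.lucene.util.BytesRef;

  /** Clears a hypothetical application-defined flag bit in one-byte payloads. */
  public class ClearFlagPayloadConsumer extends PayloadConsumer {

    @Override
    public void processPayload(BytesRef payload) throws IOException {
      if (payload.length == 1) {
        payload.bytes[payload.offset] &= ~0x80; // clear the high bit in place
      }
    }
  }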
Index: lucene/src/java/org/apache/lucene/index/SegmentMerger.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/SegmentMerger.java	(revision 941676)
+++ lucene/src/java/org/apache/lucene/index/SegmentMerger.java	(working copy)
@@ -123,8 +123,8 @@
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    */
-  final int merge() throws CorruptIndexException, IOException {
-    return merge(true);
+  final int merge(PayloadConsumer payloadConsumer) throws CorruptIndexException, IOException {
+    return merge(true, payloadConsumer);
   }
 
   /**
@@ -136,7 +136,7 @@
    * @throws CorruptIndexException if the index is corrupt
    * @throws IOException if there is a low-level IO error
    */
-  final int merge(boolean mergeDocStores) throws CorruptIndexException, IOException {
+  final int merge(boolean mergeDocStores, PayloadConsumer payloadConsumer) throws CorruptIndexException, IOException {
 
     this.mergeDocStores = mergeDocStores;
 
@@ -148,7 +148,7 @@
     // threads.
 
     mergedDocs = mergeFields();
-    mergeTerms();
+    mergeTerms(payloadConsumer);
     mergeNorms();
 
     if (mergeDocStores && fieldInfos.hasVectors())
@@ -553,7 +553,7 @@
     return codec;
   }
 
-  private final void mergeTerms() throws CorruptIndexException, IOException {
+  private final void mergeTerms(PayloadConsumer payloadConsumer) throws CorruptIndexException, IOException {
 
     // Let CodecProvider decide which codec will be used to write
     // the new segment:
@@ -644,6 +644,7 @@
     mergeState.multiDeletedDocs = new MultiBits(bits, bitsStarts);
 
     try {
+      mergeState.payloadConsumer = payloadConsumer;
       consumer.merge(mergeState,
                      new MultiFields(fields.toArray(Fields.EMPTY_ARRAY),
                                      slices.toArray(ReaderUtil.Slice.EMPTY_ARRAY)));

Index: lucene/src/java/org/apache/lucene/index/codecs/MergeState.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/MergeState.java	(revision 941676)
+++ lucene/src/java/org/apache/lucene/index/codecs/MergeState.java	(working copy)
@@ -22,6 +22,7 @@
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.util.Bits;
 import java.util.List;
+import org.apache.lucene.index.PayloadConsumer;
 
 /** Holds common state used during segment merging
  *
@@ -38,5 +39,8 @@
 
   // Updated per field;
   public FieldInfo fieldInfo;
+
+  // Used to consume incoming payloads
+  public PayloadConsumer payloadConsumer;
 }

Index: lucene/src/java/org/apache/lucene/index/codecs/PostingsConsumer.java
===================================================================
--- lucene/src/java/org/apache/lucene/index/codecs/PostingsConsumer.java	(revision 941676)
+++ lucene/src/java/org/apache/lucene/index/codecs/PostingsConsumer.java	(working copy)
@@ -83,6 +83,9 @@
         final BytesRef payload;
         if (payloadLength > 0) {
           payload = postingsEnum.getPayload();
+          if (mergeState.payloadConsumer != null) {
+            mergeState.payloadConsumer.processPayload(payload);
+          }
         } else {
           payload = null;
         }
Index: lucene/src/test/org/apache/lucene/index/TestDoc.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestDoc.java	(revision 941676)
+++ lucene/src/test/org/apache/lucene/index/TestDoc.java	(working copy)
@@ -190,7 +190,7 @@
 
       merger.add(r1);
       merger.add(r2);
-      merger.merge();
+      merger.merge(null);
       merger.closeReaders();
 
       final SegmentInfo info = new SegmentInfo(merged, si1.docCount + si2.docCount, si1.dir,

Index: lucene/src/test/org/apache/lucene/index/TestPayloadConsumer.java
===================================================================
--- lucene/src/test/org/apache/lucene/index/TestPayloadConsumer.java	(revision 0)
+++ lucene/src/test/org/apache/lucene/index/TestPayloadConsumer.java	(revision 0)
@@ -0,0 +1,157 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.WhitespaceAnalyzer;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MockRAMDirectory;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LuceneTestCaseJ4;
+import org.junit.Test;
+
+public class TestPayloadConsumer extends LuceneTestCaseJ4 {
+
+  /** Deletes the incoming payload. */
+  private static final class DeletePayloadConsumer extends PayloadConsumer {
+
+    @Override
+    public void processPayload(BytesRef payload) throws IOException {
+      payload.length = 0;
+    }
+
+  }
+
+  private static final class PayloadTokenStream extends TokenStream {
+
+    private final PayloadAttribute payload = addAttribute(PayloadAttribute.class);
+    private final CharTermAttribute term = addAttribute(CharTermAttribute.class);
+
+    private boolean called = false;
+
+    @Override
+    public boolean incrementToken() throws IOException {
+      if (called) {
+        return false;
+      }
+
+      called = true;
+      byte[] p = new byte[] { 1 };
+      payload.setPayload(new Payload(p));
+      term.append("term");
+      return true;
+    }
+
+    @Override
+    public void reset() throws IOException {
+      super.reset();
+      called = false;
+      term.setEmpty();
+    }
+  }
+
+  private void createIndex(Directory dir) throws IOException {
+    IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, new WhitespaceAnalyzer(TEST_VERSION_CURRENT)));
+    TokenStream payloadTS = new PayloadTokenStream();
+    for (int i = 0; i < 10; i++) {
+      Document doc = new Document();
+      doc.add(new Field("id", "doc" + i, Store.NO, Index.NOT_ANALYZED_NO_NORMS));
" + i, Store.NO, Index.ANALYZED)); + if ((i%2) == 0) { + doc.add(new Field("payload", payloadTS)); + } + writer.addDocument(doc); + } + writer.close(); + } + + private void verifyPayloadExists(Directory dir, boolean shouldExist) throws IOException { + IndexReader reader = IndexReader.open(dir); + try { + DocsAndPositionsEnum tpe = MultiFields.getTermPositionsEnum(reader, null, "payload", new BytesRef("term")); + while (tpe.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + int freq = tpe.freq(); + for (int i = 0; i < freq; i++) { + tpe.nextPosition(); + if (shouldExist) { + assertTrue(tpe.hasPayload()); + assertEquals(1, tpe.getPayloadLength()); + BytesRef p = tpe.getPayload(); + assertEquals(1, p.bytes[0]); + } else { + assertFalse("payload should not exist", tpe.hasPayload()); + } + } + } + } finally { + reader.close(); + } + } + + @Test + public void testAddIndexes() throws Exception { + // Tests that when addIndexes is called w/ a payload consumer, it is called. + Directory[] dirs = new Directory[] { new MockRAMDirectory(), new MockRAMDirectory() }; + createIndex(dirs[0]); + createIndex(dirs[1]); + verifyPayloadExists(dirs[0], true); + verifyPayloadExists(dirs[1], true); + + Directory dir2 = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir2, new IndexWriterConfig(TEST_VERSION_CURRENT, new WhitespaceAnalyzer(TEST_VERSION_CURRENT))); + writer.commit(); + + IndexReader[] readers = new IndexReader[] { IndexReader.open(dirs[0]), + IndexReader.open(dirs[1]) }; + writer.addIndexes(new DeletePayloadConsumer(), readers); + readers[0].close(); + readers[1].close(); + writer.close(); + verifyPayloadExists(dir2, false); + } + + @Test + public void testAddIndexesNoOptimize() throws Exception { + // Tests that when addIndexes is called w/ a payload consumer, it is called. + Directory[] dirs = new Directory[] { new MockRAMDirectory(), new MockRAMDirectory() }; + createIndex(dirs[0]); + createIndex(dirs[1]); + verifyPayloadExists(dirs[0], true); + verifyPayloadExists(dirs[1], true); + + Directory dir2 = new MockRAMDirectory(); + IndexWriter writer = new IndexWriter(dir2, new IndexWriterConfig(TEST_VERSION_CURRENT, new WhitespaceAnalyzer(TEST_VERSION_CURRENT))); + writer.commit(); + + writer.addIndexesNoOptimize(new DeletePayloadConsumer(), dirs); + writer.close(); + verifyPayloadExists(dir2, false); + } + +} Property changes on: lucene\src\test\org\apache\lucene\index\TestPayloadConsumer.java ___________________________________________________________________ Added: svn:keywords + Date Author Id Revision HeadURL Added: svn:eol-style + native Index: lucene/src/test/org/apache/lucene/index/TestSegmentMerger.java =================================================================== --- lucene/src/test/org/apache/lucene/index/TestSegmentMerger.java (revision 941676) +++ lucene/src/test/org/apache/lucene/index/TestSegmentMerger.java (working copy) @@ -68,7 +68,7 @@ SegmentMerger merger = new SegmentMerger(mergedDir, IndexWriter.DEFAULT_TERM_INDEX_INTERVAL, mergedSegment, null, CodecProvider.getDefault()); merger.add(reader1); merger.add(reader2); - int docsMerged = merger.merge(); + int docsMerged = merger.merge(null); merger.closeReaders(); assertTrue(docsMerged == 2); //Should be able to open a new SegmentReader against the new directory