Index: contrib/splitindex/src/java/org/apache/lucene/index/RealTimeSplitPolicy.java
===================================================================
--- contrib/splitindex/src/java/org/apache/lucene/index/RealTimeSplitPolicy.java (revision 0)
+++ contrib/splitindex/src/java/org/apache/lucene/index/RealTimeSplitPolicy.java (revision 0)
@@ -0,0 +1,252 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.RealTimeSearcher;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.store.RAMDirectory;
+
+/**
+ * The RealTimeSplitPolicy makes it possible to search and read
+ * from a (Lucene) split index in a real-time manner. Specifically, changes made
+ * to the {@link SplitWriter} will immediately be reflected in all readers that
+ * were obtained from it.
+ *
+ * <p>
+ * Note that while the readers based on this policy are real-time, in order to
+ * truly achieve real-time searching, one must use a {@link RealTimeSearcher},
+ * which in turn builds upon the {@link IndexSearcher}.
+ * </p>
+ *
+ * <p>
+ * Each split corresponds to a reader, and tracks the changes made to the split
+ * index between the time that reader was obtained and the time the next one is
+ * obtained. When a new reader is obtained, the corresponding split is added to
+ * it and to all the readers that came before it. Note that a reader does not
+ * need to (nor does it) see the splits that came before it, since the snapshot
+ * of the super-reader that it obtains is current at the time of its inception.
+ * </p>
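+ *
+ * <p>
+ * A rough usage sketch follows. It is illustrative only: the way a
+ * {@link SplitWriter} gets configured with this policy (the
+ * <code>createSplitWriter</code> helper and the <code>directory</code> and
+ * <code>options</code> variables below) is an assumption, not something this
+ * patch prescribes.
+ * <pre>
+ *   // assumed helper: builds a SplitWriter that uses a RealTimeSplitPolicy
+ *   SplitWriter writer = createSplitWriter(directory, new RealTimeSplitPolicy(options));
+ *   IndexReader reader = writer.getReader();      // obtaining a reader triggers a split
+ *   RealTimeSearcher searcher = new RealTimeSearcher(reader);
+ *
+ *   Document doc = new Document();
+ *   doc.add(new Field("name", "value", Field.Store.YES, Field.Index.ANALYZED));
+ *   writer.addDocument(doc);                      // immediately visible to the searcher
+ * </pre>
+ * </p>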
+ *
+ * @author Karthick Sankarachary
+ *
+ */
+public class RealTimeSplitPolicy extends AbstractSplitPolicy implements
+    SplitPolicy {
+  // The set of {@link SplitReader}s obtained from the {@link SplitWriter} with
+  // which this policy is associated.
+  protected Set<SplitReader> splitReaders = new HashSet<SplitReader>();
+
+  // A ram-based directory that keeps track of changes since the last {@link
+  // SplitReader} was obtained from the {@link #writer}.
+  protected Directory realTimeDirectory;
+  // An index writer for the real-time directory defined above.
+  protected IndexWriter realTimeWriter;
+  // An index reader obtained from the real-time writer defined above.
+  protected IndexReader realTimeReader;
+
+  /**
+   * Construct a real-time split policy with the given options. It associates a
+   * {@link ReaderSplitRule} with itself, if one was not explicitly configured.
+   *
+   * @param options
+   *          the configuration options for this policy
+   */
+  public RealTimeSplitPolicy(Map options) {
+    super(options);
+    if (!containsRule(ReaderSplitRule.class)) {
+      addRule(new ReaderSplitRule());
+    }
+  }
+
+  /**
+   * Load nothing into {@link #splits} when the split index is opened, since no
+   * readers exist yet.
+   */
+  @Override
+  protected Collection loadSplits(Directory directory)
+      throws IOException {
+    return Collections.emptySet();
+  }
+
+  /**
+   * There's nothing to clear, since all splits are volatile in nature.
+   */
+  @Override
+  protected void clearSplits(Directory directory) throws IOException {}
+
+  /**
+   * When a new reader is obtained from the {@link SplitWriter}, that triggers a
+   * split. Corresponding to that new reader, we create a new ram-based
+   * directory to keep track of changes made to the index after it was obtained
+   * from the {@link SplitWriter}. That ram-based directory is then added to the
+   * set of splits ({@link #splits}), taking care to close the one created for
+   * the previous reader, if one exists.
+   */
+  @Override
+  protected synchronized void doSplit(Map context)
+      throws CorruptIndexException, IOException {
+    // Commit the real-time sub-index corresponding to the last reader.
+    if (realTimeWriter != null) {
+      realTimeWriter.commit();
+      realTimeWriter.close();
+    }
+    setupRealTimeSplit();
+    // Treat the ram-based directory as one of the {@link #splits}.
+    splits.add(realTimeDirectory);
+    // Associate this new ram-based directory with the readers obtained
+    // previously, plus the new reader that was just obtained.
+    for (SplitReader activeReader : splitReaders) {
+      activeReader.addSplit(realTimeWriter.getReader());
+    }
+  }
+
+  protected void setupRealTimeSplit() throws CorruptIndexException,
+      LockObtainFailedException, IOException {
+    // Allocate a new ram-based directory corresponding to the new reader.
+    realTimeDirectory = new RAMDirectory();
+    // Assign an index writer for the ram-based directory.
+    realTimeWriter = new IndexWriter(realTimeDirectory, getConfig());
+    // Obtain an index reader directly from the above index writer.
+    realTimeReader = realTimeWriter.getReader();
+  }
+
+  /**
+   * @return zero, since all the documents in this policy's cache are already in
+   *         the super-directory
+   */
+  @Override
+  public int numDocs() throws CorruptIndexException, IOException {
+    return 0;
+  }
+
+  /**
+   * When a reader is obtained from the {@link SplitWriter}, add it to our set
+   * of {@link #splitReaders}.
+   *
+   * @param splitReader
+   *          the split reader being obtained from the {@link SplitWriter}
+   */
+  @Override
+  public SplitVote onGetReader(SplitReader splitReader)
+      throws CorruptIndexException, IOException {
+    splitReaders.add(splitReader);
+    return SplitVote.CARRY_ON;
+  }
+
+  /**
+   * When a document is added to the split index, add it to this policy's
+   * real-time writer as well. More importantly, transparently re-open the
+   * real-time readers opened so far.
+   *
+   * @param document
+   *          the document being added to the split index
+   * @param analyzer
+   *          the analyzer being used to process the document
+   */
+  @Override
+  public SplitVote onAddDocument(Document document, Analyzer analyzer)
+      throws CorruptIndexException, IOException {
+    if (realTimeWriter != null) {
+      realTimeWriter.addDocument(document, analyzer);
+      reopenRealTimeReaders();
+    }
+    return super.onAddDocument(document, analyzer);
+  }
+
+  /**
+   * When one or more readers are added to the split index, add them to this
+   * policy's real-time writer as well. More importantly, transparently re-open
+   * the real-time readers opened so far.
+   *
+   * @param readers
+   *          the readers being added to the split index
+   */
+  @Override
+  public SplitVote onAddIndexes(IndexReader... readers)
+      throws CorruptIndexException, IOException {
+    if (realTimeWriter != null) {
+      realTimeWriter.addIndexes(readers);
+      reopenRealTimeReaders();
+    }
+    return super.onAddIndexes(readers);
+  }
+
+  /**
+   * When one or more directories are added to the split index, add them to
+   * this policy's real-time writer as well. More importantly, transparently
+   * re-open the real-time readers opened so far.
+   *
+   * @param directories
+   *          the directories being added to the split index
+   */
+  @Override
+  public SplitVote onAddIndexesNoOptimize(Directory[] directories)
+      throws CorruptIndexException, IOException {
+    if (realTimeWriter != null) {
+      realTimeWriter.addIndexesNoOptimize(directories);
+      reopenRealTimeReaders();
+    }
+    return super.onAddIndexesNoOptimize(directories);
+  }
+
+  /**
+   * Commit changes to the real-time cache-based directory (i.e., the last
+   * split), in addition to those made to the super-directory.
+   */
+  @Override
+  public SplitVote onCommit(Map commitUserData)
+      throws CorruptIndexException, IOException {
+    if (realTimeWriter != null) {
+      realTimeWriter.commit(commitUserData);
+    }
+    return super.onCommit(commitUserData);
+  }
+
+  /**
+   * Re-open all the real-time readers associated with this policy, after
+   * committing the real-time writer.
+   *
+   * @throws CorruptIndexException
+   * @throws IOException
+   */
+  protected void reopenRealTimeReaders() throws CorruptIndexException,
+      IOException {
+    realTimeWriter.commit();
+    if (realTimeReader != null) {
+      if (!realTimeReader.isCurrent()) {
+        realTimeReader = realTimeReader.reopen();
+        for (SplitReader splitReader : splitReaders) {
+          splitReader.reopenSplit(realTimeDirectory);
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
Index: contrib/splitindex/src/java/org/apache/lucene/search/RealTimeSearcher.java
===================================================================
--- contrib/splitindex/src/java/org/apache/lucene/search/RealTimeSearcher.java (revision 0)
+++ contrib/splitindex/src/java/org/apache/lucene/search/RealTimeSearcher.java (revision 0)
@@ -0,0 +1,259 @@
+package org.apache.lucene.search;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.SplitException;
+import org.apache.lucene.index.SplitReader;
+import org.apache.lucene.index.SplitVote;
+import org.apache.lucene.index.SplitVoter;
+import org.apache.lucene.index.SplitVoterAdapter;
+import org.apache.lucene.index.SplitWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.store.Directory;
+
+/**
+ * The RealTimeSearcher builds on the {@link IndexSearcher} to make
+ * it aware of real-time changes to the index as and when they occur. In other
+ * words, the searcher does not need to be re-created in order for real-time
+ * changes (document additions/deletions, etc.) to be reflected in its searches.
+ *
+ * <p>
+ * When the real-time searcher is created, it is registered as a voter with the
+ * {@link SplitReader} and the {@link SplitWriter} thereof (if one exists). This
+ * way, the searcher gets a chance to re-initialize its
+ * {@link IndexSearcher#subReaders} state as soon as a change occurs.
+ * </p>
+ *
+ * <p>
+ * Down the road, we need to optimize the process described above so that it
+ * causes as little change as possible in the underlying searcher. In
+ * particular, we need to be able to re-initialize the underlying searcher one
+ * sub-reader at a time. That might require refactoring the main constructor of
+ * the {@link IndexSearcher}.
+ * </p>
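+ *
+ * <p>
+ * A rough usage sketch (illustrative only; it assumes a split index already
+ * exists at <code>directory</code> and that documents are being added to it
+ * through the corresponding {@link SplitWriter}):
+ * <pre>
+ *   // opens a read-only SplitReader over the directory and registers this
+ *   // searcher as a voter with it
+ *   RealTimeSearcher searcher = new RealTimeSearcher(directory);
+ *   TopDocs hits = searcher.search(new TermQuery(new Term("name", "value")), 10);
+ *   // documents added later through the SplitWriter show up in subsequent
+ *   // searches on this same searcher, without re-creating it
+ * </pre>
+ * </p>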
+ *
+ * @author Karthick Sankarachary
+ *
+ */
+public class RealTimeSearcher extends IndexSearcher implements SplitVoter {
+  protected SplitVoterAdapter splitVoterAdapter = new SplitVoterAdapter();
+
+  /**
+   * Construct a real-time searcher using a read-only split-reader based on the
+   * given path, and register self as a voter with the reader.
+   *
+   * @param path
+   *          the path to the super-directory
+   * @throws CorruptIndexException
+   * @throws IOException
+   */
+  public RealTimeSearcher(Directory path) throws CorruptIndexException,
+      IOException {
+    this(SplitReader.open(path, true), true);
+  }
+
+  /**
+   * Construct a real-time searcher on top of the given reader, and register
+   * self as a voter with it.
+   *
+   * @param reader
+   */
+  public RealTimeSearcher(IndexReader reader) {
+    this(reader, false);
+  }
+
+  /**
+   * Construct a real-time searcher using a split-reader based on the given
+   * path (opened with the given read-only flag), and register self as a voter
+   * with the reader.
+   *
+   * @param path
+   * @param readOnly
+   * @throws CorruptIndexException
+   * @throws IOException
+   */
+  public RealTimeSearcher(Directory path, boolean readOnly)
+      throws CorruptIndexException, IOException {
+    this(SplitReader.open(path, readOnly), true);
+  }
+
+  /**
+   * Construct a real-time searcher using the given reader and sub-readers, and
+   * register self as a voter with the reader.
+   *
+   * @param reader
+   * @param subReaders
+   * @param docStarts
+   */
+  public RealTimeSearcher(IndexReader reader, IndexReader[] subReaders,
+      int[] docStarts) {
+    super(reader, subReaders, docStarts);
+    addReaderVoter();
+  }
+
+  /**
+   * Construct a real-time searcher using the given reader (forcing a cascading
+   * close if required), and register self as a voter with the reader.
+   *
+   * @param reader
+   * @param closeReader
+   */
+  protected RealTimeSearcher(IndexReader reader, boolean closeReader) {
+    super(reader, closeReader);
+    addReaderVoter();
+  }
+
+  /**
+   * Add ourselves as a voter with the split reader, provided it is a split
+   * reader to begin with.
+   */
+  protected void addReaderVoter() {
+    if (reader instanceof SplitReader) {
+      ((SplitReader) reader).addVoter(this);
+    }
+  }
+
+  /**
+   * Re-initialize the sub-readers and doc-starts in the searcher. There is room
+   * for optimization here, given that we may know which sub-reader is being
+   * changed.
+   */
+  protected void initialize() {
+    List<IndexReader> subReadersList = new ArrayList<IndexReader>();
+    gatherSubReaders(subReadersList, reader);
+    subReaders = subReadersList.toArray(new IndexReader[subReadersList.size()]);
+    docStarts = new int[subReaders.length];
+    int maxDoc = 0;
+    for (int i = 0; i < subReaders.length; i++) {
+      docStarts[i] = maxDoc;
+      maxDoc += subReaders[i].maxDoc();
+    }
+  }
+
+  /**
+   * When a document is added, re-initialize the sub-readers of the searcher.
+   */
+  public SplitVote onAddDocument(Document document, Analyzer analyzer)
+      throws CorruptIndexException, IOException {
+    initialize();
+    return SplitVote.CARRY_ON;
+  }
+
+  /**
+   * When indexes are added, re-initialize the sub-readers of the searcher.
+   */
+  public SplitVote onAddIndexes(IndexReader... readers)
+      throws CorruptIndexException, IOException {
+    initialize();
+    return SplitVote.CARRY_ON;
+  }
+
+  /**
+   * When directories are added, re-initialize the sub-readers of the searcher.
+   */
+  public SplitVote onAddIndexesNoOptimize(Directory[] dirs)
+      throws CorruptIndexException, IOException {
+    initialize();
+    return SplitVote.CARRY_ON;
+  }
+
+  /**
+   * Nothing to do on a close.
+   */
+  public SplitVote onClose() throws CorruptIndexException, IOException {
+    return SplitVote.CARRY_ON;
+  }
+
+  /**
+   * Nothing to do on a commit.
+   */
+  public SplitVote onCommit(Map commitUserData)
+      throws CorruptIndexException, IOException {
+    return SplitVote.CARRY_ON;
+  }
+
+  /**
+   * When all documents are deleted, re-initialize the sub-readers of the
+   * searcher.
+   */
+  public SplitVote onDeleteAll() throws IOException {
+    initialize();
+    return SplitVote.CARRY_ON;
+  }
+
+  /**
+   * When a document is deleted, re-initialize the sub-readers of the searcher.
+   */
+  public SplitVote onDeleteDocument(int docNum) {
+    initialize();
+    return SplitVote.CARRY_ON;
+  }
+
+  /**
+   * When a document is deleted, re-initialize the sub-readers of the searcher.
+   */
+  public SplitVote onDeleteDocument(Term term) {
+    initialize();
+    return SplitVote.CARRY_ON;
+  }
+
+  /**
+   * Nothing to do on reader creation.
+   */
+  public SplitVote onGetReader(SplitReader reader)
+      throws CorruptIndexException, IOException {
+    return SplitVote.CARRY_ON;
+  }
+
+  /**
+   * Nothing to do on an open.
+   */
+  public SplitVote onOpen(SplitWriter writer, Directory directory)
+      throws IOException {
+    return SplitVote.CARRY_ON;
+  }
+
+  /**
+   * Nothing to do on a pause.
+   */
+  public SplitVote onPause() throws SplitException {
+    return SplitVote.CARRY_ON;
+  }
+
+  /**
+   * Nothing to do on a resume.
+   */
+  public SplitVote onResume() throws SplitException {
+    return SplitVote.CARRY_ON;
+  }
+
+  public SplitVote onUndeleteAll() {
+    initialize();
+    return SplitVote.CARRY_ON;
+  }
+}
\ No newline at end of file
Index: contrib/splitindex/src/java/org/apache/lucene/index/ReaderSplitRule.java
===================================================================
--- contrib/splitindex/src/java/org/apache/lucene/index/ReaderSplitRule.java (revision 0)
+++ contrib/splitindex/src/java/org/apache/lucene/index/ReaderSplitRule.java (revision 0)
@@ -0,0 +1,51 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+/**
+ * The ReaderSplitRule causes a split to occur if and when a reader
+ * is obtained from the {@link SplitWriter} associated with the
+ * {@link SplitPolicy} to which this rule applies.
+ *
+ * @author Karthick Sankarachary
+ */
+public class ReaderSplitRule extends SplitRuleAdapter implements SplitRule {
+  private static final long serialVersionUID = -7936841488136272644L;
+
+  /**
+   * Construct a reader split rule with no specific configuration options.
+   */
+  public ReaderSplitRule() {
+    super();
+  }
+
+  /**
+   * Cause a split to occur in the split index when a reader is acquired from
+   * its {@link SplitWriter}.
+   *
+   * @return a vote that tells the split index what index reader to return
+   */
+  @Override
+  public SplitVote onGetReader(SplitReader reader)
+      throws CorruptIndexException, IOException {
+    splitPolicy.split(context);
+    return SplitVote.CARRY_ON;
+  }
+}
Index: contrib/splitindex/src/test/org/apache/lucene/index/TestRealTimeSplitPolicy.java
===================================================================
--- contrib/splitindex/src/test/org/apache/lucene/index/TestRealTimeSplitPolicy.java (revision 0)
+++ contrib/splitindex/src/test/org/apache/lucene/index/TestRealTimeSplitPolicy.java (revision 0)
@@ -0,0 +1,147 @@
+package org.apache.lucene.index;
+
+import java.io.IOException;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.RealTimeSearcher;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.store.RAMDirectory;
+
+/**
+ * Test cases for the {@link RealTimeSplitPolicy}.
+ */
+public class TestRealTimeSplitPolicy extends SplitTestCase {
+
+  /**
+   * Indicate that we're testing the real-time split policy.
+   */
+  @Override
+  protected Class getSplitPolicyClass() {
+    return RealTimeSplitPolicy.class;
+  }
+
+  // A regular index writer, which currently works in a near real-time fashion.
+  protected IndexWriter nearRealTimeWriter;
+
+  /**
+   * When exposing the near real-time limitation, let the test create a vanilla
+   * writer instead of one that's real-time.
+   */
+  @Override
+  protected boolean setupWriter() throws CorruptIndexException,
+      LockObtainFailedException, IOException {
+    if ("testNearRealTimeLimitation".equals(getName())) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * If testing multiple readers, create 10 of them.
+   */
+  protected boolean setupReader() throws CorruptIndexException, IOException {
+    super.setupReader();
+    if ("testMultipleReaders".equals(getName())) {
+      for (int i = 0; i < 10; i++) {
+        createReader();
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Use a real-time searcher in conjunction with a real-time reader. This way,
+   * you can perform real-time searches on the same searcher without having to
+   * re-create either the searcher or the reader.
+   */
+  @Override
+  protected IndexSearcher createSearcher(IndexReader reader) {
+    return new RealTimeSearcher(reader);
+  }
+
+  /**
+   * Illustrate the limitation that exists with current index writers, readers
+   * and searchers, which the real-time split policy attempts to address.
+   *
+   * Specifically, we show that an index searcher can find terms written prior
+   * to when the underlying reader was obtained from the writer, but none of the
+   * terms written since.
+   *
+   * @throws Exception
+   */
+  public void testNearRealTimeLimitation() throws Exception {
+    IndexWriter nearRealTimeWriter = new IndexWriter(new RAMDirectory(),
+        getConfig());
+    addDocument(new Field("name1", "value1", Field.Store.YES,
+        Field.Index.ANALYZED), nearRealTimeWriter);
+    nearRealTimeWriter.commit();
+    IndexReader nearRealTimeReader = nearRealTimeWriter.getReader();
+    addDocument(new Field("name2", "value2", Field.Store.YES,
+        Field.Index.ANALYZED), nearRealTimeWriter);
+    nearRealTimeWriter.commit();
+    IndexSearcher searcher = new IndexSearcher(nearRealTimeReader);
+    assertHits(1, new Term("name1", "value1"), searcher);
+    assertHits(0, new Term("name2", "value2"), searcher);
+    assertHits(1, new Term("name2", "value2"), new IndexSearcher(
+        nearRealTimeWriter.getReader()));
+  }
+
+  /**
+   * Ensure that there are as many splits as readers obtained from the split
+   * writer.
+   */
+  @Override
+  public void testWriterSetup() throws Exception {
+    assertEquals(readers.size(), writer.getNumSplits());
+  }
+
+  /**
+   * Check to see that a field written after the reader was obtained is
+   * immediately reflected in it.
+   *
+   * @throws Exception
+   */
+  public void testSingleReader() throws Exception {
+    IndexSearcher nearRealTimeSearcher = new IndexSearcher(getPrimaryReader());
+    addDocument(new Field("name2", "value2", Field.Store.YES,
+        Field.Index.ANALYZED));
+
+    assertHits(1, new Term("name1", "value1"));
+    assertHits(0, new Term("name2", "value2"), nearRealTimeSearcher);
+    assertHits(1, new Term("name2", "value2"), getPrimarySearcher());
+    assertHits(1, new Term("name2", "value2"), new IndexSearcher(
+        getPrimaryReader()));
+  }
+
+  /**
+   * Same as the above, except here we check against multiple readers.
+   *
+   * @throws Exception
+   */
+  public void testMultipleReaders() throws Exception {
+    addDocument(new Field("name2", "value2", Field.Store.YES,
+        Field.Index.ANALYZED));
+
+    for (IndexSearcher searcher : searchers.values()) {
+      assertHits(1, new Term("name1", "value1"), searcher);
+      assertHits(1, new Term("name2", "value2"), searcher);
+    }
+  }
+
+  /**
+   * Given that additions are reflected immediately, adjust our basic
+   * assumptions about the index accordingly.
+   */
+  @Override
+  public void testReadSuper() throws Exception {
+    Document document = new Document();
+    document.add(new Field("name2", "value2", Field.Store.YES,
+        Field.Index.ANALYZED));
+    writer.addDocument(document);
+    writer.commit();
+
+    assertHits(1, new Term("name2", "value2"));
+  }
+}