Index: contrib/splitindex/src/java/org/apache/lucene/index/CachingSplitPolicy.java
===================================================================
--- contrib/splitindex/src/java/org/apache/lucene/index/CachingSplitPolicy.java (revision 0)
+++ contrib/splitindex/src/java/org/apache/lucene/index/CachingSplitPolicy.java (revision 0)
@@ -0,0 +1,196 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.store.Directory;
+
+/**
+ * The <code>CachingSplitPolicy</code> builds on the {@link RealTimeSplitPolicy}
+ * and provides a caching mechanism that buffers writes to the split index.
+ *
+ * <p>
+ * As one might expect, this policy delays writes to the index, placing them in
+ * the cache-based splits instead. When a flush is forced (typically through a
+ * commit, but in general through any split rule), all un-flushed changes
+ * (splits) are written to the super-directory.
+ *
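+ * <p>
+ * A sketch of the intended usage follows. It assumes a {@link SplitWriter}
+ * already opened with this policy installed (the construction is omitted, and
+ * the variable names are illustrative only):
+ *
+ * <pre>
+ * SplitWriter writer = ...; // opened with a CachingSplitPolicy installed
+ * writer.addDocument(doc);  // buffered in the RAM-based cache split
+ * IndexReader reader = writer.getReader(true); // real-time view: sees the
+ *                                              // buffered document
+ * writer.commit();          // CommitSplitRule fires; the cached splits are
+ *                           // flushed to the super-directory
+ * </pre>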
+ *
+ * @author Karthick Sankarachary
+ */
+public class CachingSplitPolicy extends RealTimeSplitPolicy implements
+    SplitPolicy {
+
+  /**
+   * Construct a caching split policy based on the given configuration options.
+   * By default, force the cache to flush on a commit (using a
+   * {@link CommitSplitRule}).
+   *
+   * @param options
+   *          the configuration options for this policy
+   */
+  public CachingSplitPolicy(Map options) {
+    super(options);
+    if (!containsRule(CommitSplitRule.class)) {
+      addRule(new CommitSplitRule());
+    }
+  }
+
+  // A flag indicating whether a flush of the cached splits is in progress.
+  protected boolean flushing = false;
+
+  /**
+   * Force the index to open in non-create mode, so that we get a chance to
+   * create a real-time RAM-based directory that captures writes made after the
+   * writer was created but before any readers were obtained from it.
+   */
+  protected boolean isOpenModeCreate() {
+    return false;
+  }
+
+  /**
+   * Initialize the real-time writer and reader, so as to start buffering
+   * changes as soon as the index opens.
+   *
+   * @param directory
+   *          the super-directory
+   */
+  @Override
+  protected Collection loadSplits(Directory directory) throws IOException {
+    setupRealTimeSplit();
+    List splits = new ArrayList();
+    splits.add(realTimeDirectory);
+    return splits;
+  }
+
+  // Keeps track of the splits already flushed to the super-directory,
+  // keyed by their lock IDs.
+  protected Set splitsFlushed = new HashSet();
+
+  /**
+   * When a new reader is obtained, let the {@link RealTimeSplitPolicy} do the
+   * split. Otherwise (i.e. when a commit is taking place), flush the contents
+   * of the RAM-based directory to the (super-)directory of the split index.
+   */
+  @Override
+  protected synchronized void doSplit(Map context)
+      throws CorruptIndexException, IOException {
+    flushing = true;
+    try {
+      if (!isActiveRule(context, ReaderSplitRule.class)) {
+        realTimeWriter.commit();
+        if (writer != null) {
+          for (Directory split : splits) {
+            // Flush each split to the super-directory at most once.
+            if (!splitsFlushed.contains(split.getLockID())) {
+              writer.addIndexesNoOptimize(split);
+              splitsFlushed.add(split.getLockID());
+            }
+          }
+        }
+      }
+      super.doSplit(context);
+    } finally {
+      // Reset the flag even if the flush fails part-way through.
+      flushing = false;
+    }
+  }
+
+  /**
+   * @return the number of documents in this split policy that have not yet
+   *         been flushed
+   */
+  public int numDocs() throws CorruptIndexException, IOException {
+    if (splits.size() == splitsFlushed.size()) {
+      return 0;
+    }
+    // Collect the positions of the splits that have not been flushed yet.
+    int[] splitsToInclude = new int[splits.size() - splitsFlushed.size()];
+    int splitIndex = 0, included = 0;
+    for (Directory split : splits) {
+      if (!splitsFlushed.contains(split.getLockID())) {
+        splitsToInclude[included++] = splitIndex;
+      }
+      splitIndex++;
+    }
+    return new SplitReader(writer, true, splitsToInclude).numDocs();
+  }
+
+  /**
+   * Add documents to the underlying {@link #realTimeWriter}, but not to the
+   * {@link #directory} underlying the split-index.
+   */
+  @Override
+  public SplitVote onAddDocument(Document document, Analyzer analyzer)
+      throws CorruptIndexException, IOException {
+    super.onAddDocument(document, analyzer);
+    return SplitVote.DO_NOTHING;
+  }
+
+  /**
+   * Add index readers to the underlying {@link #realTimeWriter}, but not to
+   * the {@link #directory} underlying the split-index.
+   */
+  @Override
+  public SplitVote onAddIndexes(IndexReader... readers)
+      throws CorruptIndexException, IOException {
+    if (flushing) {
+      return SplitVote.CARRY_ON;
+    } else {
+      super.onAddIndexes(readers);
+      return SplitVote.DO_NOTHING;
+    }
+  }
+
+  /**
+   * Add directories to the underlying {@link #realTimeWriter}, but not to the
+   * {@link #directory} underlying the split-index.
+   */
+  @Override
+  public SplitVote onAddIndexesNoOptimize(Directory[] directories)
+      throws CorruptIndexException, IOException {
+    if (flushing) {
+      return SplitVote.CARRY_ON;
+    } else {
+      super.onAddIndexesNoOptimize(directories);
+      return SplitVote.DO_NOTHING;
+    }
+  }
+
+  /**
+   * When a new reader is obtained, remove any flushed splits from it, since
+   * those documents are already reflected in the super-reader.
+   */
+  @Override
+  public SplitVote onGetReader(SplitReader splitReader)
+      throws CorruptIndexException, IOException {
+    for (IndexReader subReader : splitReader.getSequentialSubReaders()) {
+      if (splitsFlushed.contains(subReader.directory().getLockID())) {
+        splitReader.removeSplit(subReader);
+      }
+    }
+    return super.onGetReader(splitReader);
+  }
+}
Index: contrib/splitindex/src/java/org/apache/lucene/index/CommitSplitRule.java
===================================================================
--- contrib/splitindex/src/java/org/apache/lucene/index/CommitSplitRule.java (revision 0)
+++ contrib/splitindex/src/java/org/apache/lucene/index/CommitSplitRule.java (revision 0)
@@ -0,0 +1,52 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * The <code>CommitSplitRule</code> causes a split to occur if and when a
+ * commit occurs on the {@link SplitWriter} or {@link SplitReader} associated
+ * with the {@link SplitPolicy} to which this rule applies.
+ *
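+ * <p>
+ * A minimal sketch of how the rule gets wired up. It mirrors what
+ * {@link CachingSplitPolicy} already does in its constructor, where this rule
+ * is added by default; adding it by hand is shown here for illustration only:
+ *
+ * <pre>
+ * if (!containsRule(CommitSplitRule.class)) {
+ *   addRule(new CommitSplitRule());
+ * }
+ * </pre>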
+ *
+ * @author Karthick Sankarachary
+ */
+public class CommitSplitRule extends SplitRuleAdapter implements SplitRule {
+  private static final long serialVersionUID = -7936841488136272644L;
+
+  /**
+   * Construct a commit split rule with no specific configuration options.
+   */
+  public CommitSplitRule() {
+    super();
+  }
+
+  /**
+   * Cause a split to occur in the split index when changes to it are
+   * committed.
+   *
+   * @return a vote that tells the split index to complete its commit
+   */
+  @Override
+  public SplitVote onCommit(Map commitUserData)
+      throws CorruptIndexException, IOException {
+    splitPolicy.split(context);
+    return SplitVote.CARRY_ON;
+  }
+}
Index: contrib/splitindex/src/test/org/apache/lucene/index/TestCachingSplitPolicy.java
===================================================================
--- contrib/splitindex/src/test/org/apache/lucene/index/TestCachingSplitPolicy.java (revision 0)
+++ contrib/splitindex/src/test/org/apache/lucene/index/TestCachingSplitPolicy.java (revision 0)
@@ -0,0 +1,144 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.document.Field;
+import org.apache.lucene.store.LockObtainFailedException;
+
+/**
+ * Test cases for the {@link CachingSplitPolicy}.
+ */
+public class TestCachingSplitPolicy extends TestRealTimeSplitPolicy {
+
+  /**
+   * Indicate that we're testing the caching split policy.
+   */
+  @Override
+  protected Class getSplitPolicyClass() {
+    return CachingSplitPolicy.class;
+  }
+
+  /**
+   * If testing a time-based split, create a scheduled split rule.
+   */
+  @Override
+  protected boolean setupWriter() throws CorruptIndexException,
+      LockObtainFailedException, IOException {
+    super.setupWriter();
+    if ("testTimedCache".equals(getName())
+        || "testMultipleReaders".equals(getName())) {
+      options.put(ScheduledSplitRule.SCHEDULED_SPLIT_RULE_INITIAL_DELAY, 0);
+      options.put(ScheduledSplitRule.SCHEDULED_SPLIT_RULE_TIME_PERIOD, 5);
+      options.put(ScheduledSplitRule.SCHEDULED_SPLIT_RULE_TIME_UNIT,
+          TimeUnit.SECONDS);
+      options.put(RotatingSplitPolicy.POLICY_SPLIT_RULES,
+          new ScheduledSplitRule(options));
+    }
+    return true;
+  }
+
+  /**
+   * First, check that a change is reflected in the real-time split reader,
+   * but not in the underlying super-directory. In other words, the
+   * super-index should contain one fewer document than the split-index, since
+   * the second document exists in a cache that has not yet been flushed.
+   *
+   * Second, check that a commit flushes all of the un-flushed changes, and
+   * nothing else, to the super-directory. In other words, both the
+   * super-index and the split-index should contain 2 documents after the
+   * commit.
+   */
+  public void testCommitCache() throws Exception {
+    assertEquals(1, writer.numDocs());
+
+    assertHits(1, new Term("name1", "value1"));
+
+    addDocument(new Field("name2", "value2", Field.Store.YES,
+        Field.Index.ANALYZED));
+
+    assertHits(1, new Term("name2", "value2"));
+
+    assertEquals(1, writer.numSuperDocs());
+
+    assertEquals(2, writer.numDocs());
+
+    writer.commit();
+
+    assertEquals(2, writer.numSuperDocs());
+
+    assertEquals(2, writer.numDocs());
+  }
+
+  /**
+   * First, check that a change is reflected in the real-time split reader,
+   * but not in the underlying super-directory. In other words, the
+   * super-index should contain one fewer document than the split-index, since
+   * the second document exists in a cache that has not yet been flushed.
+   *
+   * Second, check that a scheduled split flushes all of the un-flushed
+   * changes, and nothing else, to the super-directory. In other words, both
+   * the super-index and the split-index should contain 2 documents after the
+   * scheduled split.
+   */
+ */ + public void testTimedCache() throws Exception { + assertHits(1, new Term("name1", "value1")); + + addDocument(new Field("name2", "value2", Field.Store.YES, + Field.Index.ANALYZED)); + + assertHits(1, new Term("name2", "value2")); + + assertEquals(1, writer.numSuperDocs()); + + assertEquals(2, writer.numDocs()); + + TimeUnit.SECONDS.sleep(6); + + assertEquals(2, writer.numSuperDocs()); + + assertEquals(2, writer.numDocs()); + } + + /** + * Check to see that there's a split for every commit, and reader obtained, + * plus the split that acts as the initial cache. + */ + @Override + public void testWriterSetup() throws Exception { + // 1 initial split + 1 split for the reader + 2 splits for each commit + assertEquals(4, writer.getNumSplits()); + } + + /** + * Check to see that multiple readers does not throw the caching policy off. + */ + public void testMultipleReaders() throws Exception { + super.testMultipleReaders(); + + assertHits(1, new Term("name1", "value1")); + + assertHits(1, new Term("name2", "value2")); + + assertEquals(1, writer.numSuperDocs()); + + assertEquals(2, writer.numDocs()); + + TimeUnit.SECONDS.sleep(6); + + assertEquals(2, writer.numSuperDocs()); + + assertEquals(2, writer.numDocs()); + } + + /** + * Check that the initial document exists in both the cache (split policy) and + * the super-directory. However, as the super-class' test case proves, the + * split reader still only returns one hit, as expected. + */ + public void testReaderSetup() throws Exception { + super.testReaderSetup(); + assertHits(1, new Term("name1", "value1"), writer.getReader(true)); + assertHits(2, new Term("name1", "value1"), writer.getReader(false)); + } +}