Index: contrib/splitindex/src/java/org/apache/lucene/index/CachingSplitPolicy.java
===================================================================
--- contrib/splitindex/src/java/org/apache/lucene/index/CachingSplitPolicy.java (revision 0)
+++ contrib/splitindex/src/java/org/apache/lucene/index/CachingSplitPolicy.java (revision 0)
@@ -0,0 +1,196 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.store.Directory;
+
+/**
+ * The <code>CachingSplitPolicy</code> builds on the {@link RealTimeSplitPolicy}
+ * and provides a caching mechanism that serves to buffer writes to the split
+ * index.
+ * <p>
+ * As one might expect, this policy delays writes to the index, placing them in
+ * the cache-based splits instead. When a flush is forced (typically through a
+ * commit, but more generally through any split rule), all un-flushed changes
+ * (splits) are written to the super-directory.
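+ * <p>
+ * A minimal usage sketch (illustrative only; it assumes a {@link SplitWriter}
+ * constructor of the form <code>SplitWriter(Directory, Analyzer, Map)</code>
+ * and a hypothetical option key, both of which may differ from the actual
+ * API):
+ * <pre>
+ * Map options = new HashMap();
+ * options.put("split.policy.class", CachingSplitPolicy.class); // hypothetical key
+ * SplitWriter writer = new SplitWriter(directory, analyzer, options);
+ * writer.addDocument(doc); // buffered in the ram-based split, not the super-directory
+ * writer.commit();         // the CommitSplitRule forces the cache to flush
+ * </pre>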
+ * @author Karthick Sankarachary
+ *
+ */
+public class CachingSplitPolicy extends RealTimeSplitPolicy implements
+ SplitPolicy {
+
+ /**
+ * Construct a caching split policy based on the given configuration options.
+ * By default, force the cache to flush on a commit (using a
+ * {@link CommitSplitRule}).
+ *
+ * @param options
+ * the configuration options for this policy
+ */
+ public CachingSplitPolicy(Map options) {
+ super(options);
+ if (!containsRule(CommitSplitRule.class)) {
+ addRule(new CommitSplitRule());
+ }
+ }
+
+  // A flag indicating whether a flush (split to the super-directory) is in progress.
+ protected boolean flushing = false;
+
+ /**
+ * Force the index to open in a non-create mode, so that we get a chance to
+ * create a real-time ram-based directory, to capture writes made after the
+ * writer was created but before any readers were obtained from it.
+ */
+ protected boolean isOpenModeCreate() {
+ return false;
+ }
+
+ /**
+ * Initialize the real-time writer and reader so as to start buffering changes
+ * as soon as the index opens.
+ *
+ * @param directory
+ * the super-directory
+ */
+ @Override
+  protected Collection<? extends Directory> loadSplits(Directory directory)
+ throws IOException {
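+    // Create the ram-based real-time split up front, so that writes made after
+    // the writer opens are buffered immediately.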
+ setupRealTimeSplit();
+    List<Directory> splits = new ArrayList<Directory>();
+ splits.add(realTimeDirectory);
+ return splits;
+ }
+
+ // Keeps track of the splits flushed already to the super-directory.
+  protected Set<String> splitsFlushed = new HashSet<String>();
+
+ /**
+ * When a new reader is obtained, let the {@link RealTimeSplitPolicy} do the
+   * split. Otherwise (if a commit is taking place), flush the contents of
+ * the ram-based directory to the (super-)directory of the split index.
+ */
+ @Override
+ protected synchronized void doSplit(Map context)
+ throws CorruptIndexException, IOException {
+ flushing = true;
+ if (!isActiveRule(context, ReaderSplitRule.class)) {
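+      // Persist any documents buffered in the ram-based real-time split.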
+ realTimeWriter.commit();
+ if (writer != null) {
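+        // Append each not-yet-flushed split to the super-directory, tracking
+        // flushed splits by lock ID so that none is added twice.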
+ for (Directory split : splits) {
+ if (!splitsFlushed.contains(split.getLockID())) {
+ writer.addIndexesNoOptimize(split);
+ splitsFlushed.add(split.getLockID());
+ }
+ }
+ }
+ }
+ super.doSplit(context);
+ flushing = false;
+ }
+
+ /**
+ * @return the number of documents in this split policy not already flushed
+ */
+  public int numDocs() throws CorruptIndexException, IOException {
+    if (splits.size() == splitsFlushed.size()) {
+      return 0;
+    }
+    // Collect the indexes of the splits that have not yet been flushed.
+    int[] splitsToInclude = new int[splits.size() - splitsFlushed.size()];
+    int included = 0, splitIndex = 0;
+    for (Directory split : splits) {
+      if (!splitsFlushed.contains(split.getLockID())) {
+        splitsToInclude[included++] = splitIndex;
+      }
+      splitIndex++;
+    }
+    return new SplitReader(writer, true, splitsToInclude).numDocs();
+  }
+
+ /**
+ * Add documents to the underlying {@link #realTimeWriter}, but not to the
+ * {@link #directory} underlying the split-index.
+ */
+ @Override
+ public SplitVote onAddDocument(Document document, Analyzer analyzer)
+ throws CorruptIndexException, IOException {
+ super.onAddDocument(document, analyzer);
+ return SplitVote.DO_NOTHING;
+ }
+
+ /**
+ * Add index readers to the underlying {@link #realTimeWriter}, but not to the
+ * {@link #directory} underlying the split-index.
+ */
+ @Override
+ public SplitVote onAddIndexes(IndexReader... readers)
+ throws CorruptIndexException, IOException {
+ if (flushing) {
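+      // A flush is in progress; let the write proceed to the super-directory.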
+ return SplitVote.CARRY_ON;
+ } else {
+ super.onAddIndexes(readers);
+ return SplitVote.DO_NOTHING;
+ }
+ }
+
+ /**
+ * Add directories to the underlying {@link #realTimeWriter}, but not to the
+ * {@link #directory} underlying the split-index.
+ */
+ @Override
+ public SplitVote onAddIndexesNoOptimize(Directory[] directories)
+ throws CorruptIndexException, IOException {
+ if (flushing) {
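+      // doSplit() itself calls writer.addIndexesNoOptimize() while flushing;
+      // vote CARRY_ON so the flush reaches the super-directory instead of
+      // being buffered again.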
+ return SplitVote.CARRY_ON;
+ } else {
+ super.onAddIndexesNoOptimize(directories);
+ return SplitVote.DO_NOTHING;
+ }
+ }
+
+ /**
+ * When a new reader is obtained, remove any flushed splits from it, since
+ * those documents are already reflected in the super-reader.
+ */
+ @Override
+ public SplitVote onGetReader(SplitReader splitReader)
+ throws CorruptIndexException, IOException {
+ for (IndexReader subReader : splitReader.getSequentialSubReaders()) {
+ if (splitsFlushed.contains(subReader.directory().getLockID())) {
+ splitReader.removeSplit(subReader);
+ }
+ }
+ return super.onGetReader(splitReader);
+ }
+}
Index: contrib/splitindex/src/java/org/apache/lucene/index/CommitSplitRule.java
===================================================================
--- contrib/splitindex/src/java/org/apache/lucene/index/CommitSplitRule.java (revision 0)
+++ contrib/splitindex/src/java/org/apache/lucene/index/CommitSplitRule.java (revision 0)
@@ -0,0 +1,52 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * The <code>CommitSplitRule</code> causes a split to occur if and when a commit
+ * occurs on the {@link SplitWriter} or {@link SplitReader} associated with the
+ * {@link SplitPolicy} to which this rule applies.
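+ * <p>
+ * For example, a policy may install this rule when it is constructed, as
+ * {@link CachingSplitPolicy} does, so that every commit forces a flush:
+ * <pre>
+ * if (!containsRule(CommitSplitRule.class)) {
+ *   addRule(new CommitSplitRule());
+ * }
+ * </pre>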
+ *
+ * @author Karthick Sankarachary
+ *
+ */
+public class CommitSplitRule extends SplitRuleAdapter implements SplitRule {
+ private static final long serialVersionUID = -7936841488136272644L;
+
+ /**
+ * Construct a commit split rule with no specific configuration options.
+ */
+ public CommitSplitRule() {
+ super();
+ }
+
+ /**
+ * Cause a split to occur in the split index when changes to it are committed.
+ *
+   * @return a vote that tells the split index to complete its commit
+ */
+ @Override
+ public SplitVote onCommit(Map commitUserData)
+ throws CorruptIndexException, IOException {
+ splitPolicy.split(context);
+ return SplitVote.CARRY_ON;
+ }
+}
Index: contrib/splitindex/src/test/org/apache/lucene/index/TestCachingSplitPolicy.java
===================================================================
--- contrib/splitindex/src/test/org/apache/lucene/index/TestCachingSplitPolicy.java (revision 0)
+++ contrib/splitindex/src/test/org/apache/lucene/index/TestCachingSplitPolicy.java (revision 0)
@@ -0,0 +1,144 @@
+package org.apache.lucene.index;
+
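+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+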
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.document.Field;
+import org.apache.lucene.store.LockObtainFailedException;
+
+/**
+ * Test cases for the {@link CachingSplitPolicy}.
+ */
+public class TestCachingSplitPolicy extends TestRealTimeSplitPolicy {
+
+ /**
+ * Indicate that we're testing the caching split policy.
+ */
+ @Override
+  protected Class<? extends SplitPolicy> getSplitPolicyClass() {
+ return CachingSplitPolicy.class;
+ }
+
+ /**
+ * If testing a time-based split, create a scheduled split rule.
+ */
+ @Override
+ protected boolean setupWriter() throws CorruptIndexException,
+ LockObtainFailedException, IOException {
+ super.setupWriter();
+ if ("testTimedCache".equals(getName())
+ || "testMultipleReaders".equals(getName())) {
+ options.put(ScheduledSplitRule.SCHEDULED_SPLIT_RULE_INITIAL_DELAY, 0);
+ options.put(ScheduledSplitRule.SCHEDULED_SPLIT_RULE_TIME_PERIOD, 5);
+ options.put(ScheduledSplitRule.SCHEDULED_SPLIT_RULE_TIME_UNIT,
+ TimeUnit.SECONDS);
+ options.put(RotatingSplitPolicy.POLICY_SPLIT_RULES,
+ new ScheduledSplitRule(options));
+ }
+ return true;
+ }
+
+ /**
+   * First, check that a change is reflected in the real-time split reader but
+   * not in the underlying super-directory. In other words, the super-index
+   * should contain one document fewer than the split-index, since the second
+   * document exists in a cache that has not yet been flushed.
+   *
+   * Second, check that a commit flushes all of the un-flushed changes, and
+   * nothing but those changes, to the super-directory. In other words, both
+   * the super-index and the split-index should contain 2 documents after the
+   * commit.
+ */
+ public void testCommitCache() throws Exception {
+ assertEquals(1, writer.numDocs());
+
+ assertHits(1, new Term("name1", "value1"));
+
+ addDocument(new Field("name2", "value2", Field.Store.YES,
+ Field.Index.ANALYZED));
+
+ assertHits(1, new Term("name2", "value2"));
+
+ assertEquals(1, writer.numSuperDocs());
+
+ assertEquals(2, writer.numDocs());
+
+ writer.commit();
+
+ assertEquals(2, writer.numSuperDocs());
+
+ assertEquals(2, writer.numDocs());
+ }
+
+ /**
+   * First, check that a change is reflected in the real-time split reader but
+   * not in the underlying super-directory. In other words, the super-index
+   * should contain one document fewer than the split-index, since the second
+   * document exists in a cache that has not yet been flushed.
+   *
+   * Second, check that a scheduled split flushes all of the un-flushed
+   * changes, and nothing but those changes, to the super-directory. In other
+   * words, both the super-index and the split-index should contain 2 documents
+   * after the scheduled split.
+ */
+ public void testTimedCache() throws Exception {
+ assertHits(1, new Term("name1", "value1"));
+
+ addDocument(new Field("name2", "value2", Field.Store.YES,
+ Field.Index.ANALYZED));
+
+ assertHits(1, new Term("name2", "value2"));
+
+ assertEquals(1, writer.numSuperDocs());
+
+ assertEquals(2, writer.numDocs());
+
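+    // Wait out the 5-second scheduled split, which should flush the cache.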
+ TimeUnit.SECONDS.sleep(6);
+
+ assertEquals(2, writer.numSuperDocs());
+
+ assertEquals(2, writer.numDocs());
+ }
+
+ /**
+   * Check that there is one split for every commit and for every reader
+   * obtained, plus the split that acts as the initial cache.
+ */
+ @Override
+ public void testWriterSetup() throws Exception {
+    // 1 initial split + 1 split for the reader + 2 splits for the commits
+ assertEquals(4, writer.getNumSplits());
+ }
+
+ /**
+   * Check that obtaining multiple readers does not throw the caching policy off.
+ */
+ public void testMultipleReaders() throws Exception {
+ super.testMultipleReaders();
+
+ assertHits(1, new Term("name1", "value1"));
+
+ assertHits(1, new Term("name2", "value2"));
+
+ assertEquals(1, writer.numSuperDocs());
+
+ assertEquals(2, writer.numDocs());
+
+ TimeUnit.SECONDS.sleep(6);
+
+ assertEquals(2, writer.numSuperDocs());
+
+ assertEquals(2, writer.numDocs());
+ }
+
+ /**
+ * Check that the initial document exists in both the cache (split policy) and
+   * the super-directory. However, as the superclass's test case proves, the
+   * split reader still returns only one hit, as expected.
+ */
+ public void testReaderSetup() throws Exception {
+ super.testReaderSetup();
+ assertHits(1, new Term("name1", "value1"), writer.getReader(true));
+ assertHits(2, new Term("name1", "value1"), writer.getReader(false));
+ }
+}