Index: contrib/splitindex/src/java/org/apache/lucene/index/RotatingSplitPolicy.java
===================================================================
--- contrib/splitindex/src/java/org/apache/lucene/index/RotatingSplitPolicy.java (revision 0)
+++ contrib/splitindex/src/java/org/apache/lucene/index/RotatingSplitPolicy.java (revision 0)
@@ -0,0 +1,390 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+
+/**
+ * A RotatingSplitPolicy maintains a bounded set of sub-indices
+ * underneath the split index's directory. The motivation for such a policy is
+ * to be able to control the size of the index in a user-defined manner. It
+ * comes in handy in scenarios where (a) documents are added at a
+ * higher-than-normal rate, and (b) documents older than a certain cut-off date
+ * needn't be searchable. As a matter of fact, this policy is applicable to most
+ * real-time streams, since they tend to satisfy both of the above properties.
+ *
+ *
+ * In particular, when the number of sub-indices hits the maximum allowed number
+ * (see {@link #ROTATING_POLICY_MAXIMUM_SUB_INDICES}), then it effectively
+ * forces the last sub-index out of the split index. A sub-index is deemed to be
+ * the last one if it is considered to be lesser (according to
+ * {@link #getDirectoryComparator} ) than every other sub-index.
+ *
+ *
+ *
+ * The exact point in time at which rotation occurs is determined by the split
+ * rule(s) in effect for this policy. For example, one may apply a scheduled
+ * split rule if one wishes the split to occur at fixed intervals. For finer
+ * control over the periodicity of the interval, a cron split rule may be
+ * employed.
+ *
+ *
+ *
+ * To illustrate the behavior of the rotation policy, consider a split
+ * rule that triggers a split on the hour every hour. Furthermore, let's assume
+ * that the maximum number of sub-indices allowed is 7. In this case, the
+ * rotation policy will have no more than 8 hours worth of data (1 hour in the
+ * super-index and each of the 7 sub-indices).
+ *
+ *
+ * @author Karthick Sankarachary
+ *
+ */
+
+public class RotatingSplitPolicy extends AbstractSplitPolicy implements
+ SplitPolicy {
+
+ /**
+ * The option key specifying the maximum number of sub-indices allowed in the
+ * split index (not counting the super-index).
+ */
+ public static final String ROTATING_POLICY_MAXIMUM_SUB_INDICES = "rotating.policy.maximum.sub.indices";
+ /**
+ * The option key specifying whether or not to split the index even if no
+ * changes occurred since the last split.
+ */
+ public static final String ROTATING_POLICY_SPLIT_EVEN_IF_IDLE = "rotating.policy.split.even.if.idle";
+
+ /**
+ * The maximum number of sub-indices, read from the options by the constructor.
+ */
+ protected long maxSubIndices;
+ /**
+ * The flag, read from the options by the constructor, indicating whether or
+ * not to split the index even if no changes occurred since the last split.
+ */
+ protected boolean splitEvenIfIdle;
+ /**
+ * A running counter holding the number to append to the next sub-index's
+ * folder name; post-incremented by {@link #getNextSubIndex()}. It is meant to
+ * resume where the index left off under {@link OpenMode#APPEND} or
+ * {@link OpenMode#CREATE_OR_APPEND} -- TODO confirm where that is restored.
+ */
+ protected int nextSubIndexNumber = 0;
+
+ /**
+ * The prefix used for the folder name of every split sub-index.
+ */
+ static final String SUB_INDEX_FOLDER_NAME_PREFIX = "sub_index_";
+
+ /**
+ * Construct a rotation policy from the given options. If no split rule was
+ * configured, a default-constructed {@link ScheduledSplitRule} is added (the
+ * original author notes it splits every day -- not verifiable from here).
+ *
+ * @param options the policy options; see the ROTATING_POLICY_* option keys
+ */
+ public RotatingSplitPolicy(Map options) {
+ super(options);
+ if (splitRules.isEmpty()) {
+ addRule(new ScheduledSplitRule());
+ }
+ // Cache the two rotating-policy options as typed fields.
+ this.maxSubIndices = (Long) getOption(ROTATING_POLICY_MAXIMUM_SUB_INDICES,
+ Long.class);
+ this.splitEvenIfIdle = (Boolean) getOption(
+ ROTATING_POLICY_SPLIT_EVEN_IF_IDLE, Boolean.class);
+
+ }
+
+ /**
+  * Set the default maximum number of sub-indices to 7 (stored as a Long, the
+  * type the constructor reads), and the flag to split even if idle to true.
+  */
+ @Override
+ public Map getOptionDefaults() {
+   Map defaults = new HashMap();
+   defaults.put(ROTATING_POLICY_MAXIMUM_SUB_INDICES, Long.valueOf(7));
+   defaults.put(ROTATING_POLICY_SPLIT_EVEN_IF_IDLE, Boolean.TRUE);
+   return defaults;
+ }
+
+ /**
+ * Ensure that the maximum number of sub-indices is a non-negative Long or Integer.
+ */
+ public Map getOptionBounds() {
+ Map bounds = new HashMap();
+ bounds.put(ROTATING_POLICY_MAXIMUM_SUB_INDICES, new OptionBound() {
+ public boolean isInbounds(String key, Serializable value) {
+ return (value instanceof Long && ((Long) value).longValue() >= 0)
+ || (value instanceof Integer && ((Integer) value).intValue() >= 0);
+ }
+ });
+ return bounds;
+ }
+
+ /**
+  * Compare file-system directories by the sub-index number suffixed to their
+  * folder names; anything else falls back to the superclass comparator.
+  */
+ @Override
+ protected Comparator<? super Directory> getDirectoryComparator() {
+   return new Comparator<Directory>() {
+     public int compare(Directory d1, Directory d2) {
+       if (d1 instanceof FSDirectory && d2 instanceof FSDirectory) {
+         try {
+           return compareFileIndices(((FSDirectory) d1).getDirectory(),
+               ((FSDirectory) d2).getDirectory(), SUB_INDEX_FOLDER_NAME_PREFIX);
+         } catch (Exception ignored) {} // not sub-index folders: use the fallback
+       }
+       return RotatingSplitPolicy.super.getDirectoryComparator().compare(d1, d2);
+     }
+   };
+ }
+
+ /**
+  * Compare two files by the sub-index number that follows the given prefix in
+  * their names.
+  *
+  * @param f1 the first file to be compared
+  * @param f2 the second file to be compared
+  * @param prefix the common prefix stripped from both names before comparing
+  * @return -1, 0 or 1, as required by {@link Comparator}
+  * @throws NumberFormatException if the remainder of a name is not an integer
+  * @throws RuntimeException if either name does not start with the prefix
+  */
+ protected static int compareFileIndices(File f1, File f2, String prefix) {
+   if (f1.getName().startsWith(prefix) && f2.getName().startsWith(prefix)) {
+     int n1 = Integer.parseInt(f1.getName().substring(prefix.length()));
+     int n2 = Integer.parseInt(f2.getName().substring(prefix.length()));
+     // Compare explicitly rather than subtracting, which can overflow and
+     // does not yield the -1/0/1 promised above.
+     return (n1 < n2) ? -1 : ((n1 == n2) ? 0 : 1);
+   }
+   throw new RuntimeException("Could not compare file names");
+ }
+
+ /**
+  * Load the existing sub-indices of a file-system split index by opening every
+  * entry under its root folder whose name starts with
+  * {@link #SUB_INDEX_FOLDER_NAME_PREFIX}. Other directory kinds yield none.
+  *
+  * @param directory the super-directory of the split index
+  * @return the (possibly empty) sub-directories found, in listing order
+  * @throws IOException if a sub-directory cannot be opened
+  */
+ @Override
+ protected Collection<? extends Directory> loadSplits(final Directory directory)
+     throws IOException {
+   List<Directory> subDirectories = new ArrayList<Directory>();
+   if (directory instanceof FSDirectory) {
+     final File root = ((FSDirectory) directory).getDirectory();
+     File[] subFiles = root.listFiles(new FilenameFilter() {
+       public boolean accept(File dir, String name) {
+         return dir.equals(root) && name.startsWith(SUB_INDEX_FOLDER_NAME_PREFIX);
+       }
+     });
+     // listFiles() yields null when the root is missing or cannot be read.
+     for (File subFile : (subFiles == null) ? new File[0] : subFiles) {
+       subDirectories.add(FSDirectory.open(subFile));
+     }
+   }
+   return subDirectories;
+ }
+
+ /** Delete the folder of every file-system sub-index found by loadSplits. */
+ protected void clearSplits(final Directory directory) throws IOException {
+   for (Directory split : loadSplits(directory)) {
+     if (split instanceof FSDirectory) {
+       File folder = ((FSDirectory) split).getDirectory();
+       deleteFile(folder, true);
+     }
+   }
+ }
+
+ /**
+ * Pop a sub-index via super.popSplit() (the oldest one, per the original
+ * docs); if it is an FSDirectory, delete its folder, ignoring any failure.
+ * @throws IOException
+ */
+ protected Directory popSplit() throws IOException {
+ Directory directory = super.popSplit();
+ if (directory instanceof FSDirectory) {
+ FSDirectory fsDirectory = (FSDirectory) directory;
+ try {
+ deleteFile(fsDirectory.getDirectory(), true);
+ } catch (Exception e) {}
+ }
+ return directory;
+ }
+
+ /**
+  * Open the folder reserved for the given sub-index number: the child of the
+  * super-directory named by the sub-index prefix followed by that number.
+  * @param directory the super-directory for this split index
+  * @param subIndexCount the count of the sub-index being named
+  * @return the sub-index directory, or null if the super-directory is not an
+  *         FSDirectory or the folder could not be opened
+  */
+ private static Directory getSubIndex(Directory directory, int subIndexCount) {
+   if (!(directory instanceof FSDirectory)) {
+     return null;
+   }
+   try {
+     File parent = ((FSDirectory) directory).getDirectory();
+     return FSDirectory.open(new File(parent, SUB_INDEX_FOLDER_NAME_PREFIX + subIndexCount));
+   } catch (IOException e) {
+     return null;
+   }
+ }
+
+ /**
+ * Get the next sub-index directory to add to the policy, post-incrementing
+ * the running sub-index counter as a side effect.
+ * @return the new sub-index's directory (null if getSubIndex could not open it)
+ */
+ protected Directory getNextSubIndex() {
+ return getSubIndex(directory, nextSubIndexNumber++);
+ }
+
+ /**
+  * Rotate the index: try to create a new sub-index and, only if one was in
+  * fact created, expunge the oldest sub-index when the limit is exceeded.
+  */
+ @Override
+ protected void doSplit(Map context)
+     throws CorruptIndexException, IOException {
+   if (!maybeCreateNewestSplit(context)) {
+     return;
+   }
+   maybeExpungeOldestSplit(context);
+ }
+
+ /**
+  * Create a new sub-index and move the contents of the super-index to it.
+  * However, skip this step if the split index has been idle since the last
+  * split occurred, and if {@link #splitEvenIfIdle} is false.
+  *
+  * @param context
+  *          the context propagated from the split rule
+  * @return true iff a new sub-index was created
+  * @throws CorruptIndexException
+  * @throws IOException
+  */
+ protected boolean maybeCreateNewestSplit(Map context)
+     throws CorruptIndexException, IOException {
+   // Newest-segment state recorded at the previous split (may be absent).
+   String lastSegmentName = (String) context
+       .get(SplitRule.SPLIT_RULE_LAST_SEGMENT_NAME);
+   Long lastSegmentSize = (Long) context
+       .get(SplitRule.SPLIT_RULE_LAST_SEGMENT_SIZE);
+   SegmentInfo newest = getNewestSegment(writer);
+
+   // Idle = same newest segment as last time; names need equals(), not ==.
+   if (!splitEvenIfIdle
+       && ((newest == null) ? (lastSegmentName == null)
+           : (newest.name.equals(lastSegmentName) && lastSegmentSize != null
+               && lastSegmentSize.longValue() == newest.sizeInBytes()))) {
+     return false;
+   }
+
+   // Capture the state of the current split in the context for the split rule
+   // so it can pass it back to us during the next split.
+   if (newest != null) {
+     context.put(SplitRule.SPLIT_RULE_LAST_SEGMENT_NAME, newest.name);
+     context.put(SplitRule.SPLIT_RULE_LAST_SEGMENT_SIZE, newest.sizeInBytes());
+   }
+
+   // Push a new sub-index into {@link #splits}.
+   Directory subDirectory = getNextSubIndex();
+   pushSplit(subDirectory);
+   IndexWriter subWriter = new IndexWriter(subDirectory, getConfig());
+
+   // Move the contents of the super-index into the above sub-index.
+   subWriter.deleteAll();
+   if (!writer.isClosed()) {
+     subWriter.addIndexesNoOptimize(writer.getDirectory());
+   }
+   subWriter.commit();
+   subWriter.close();
+
+   // Start the super-index off on a clean slate.
+   writer.deleteAllFromSuper();
+   writer.commit();
+
+   // Set the time at which this particular split occurred.
+   context.put(SplitRule.SPLIT_RULE_LAST_SPLIT_TIME, new Date().getTime());
+
+   return true;
+ }
+
+ /**
+  * Drop one sub-index (via {@link #popSplit()}) when the number of splits
+  * exceeds the configured maximum number of sub-indices.
+  *
+  * @param context
+  *          the context propagated from the split rule
+  * @return true iff a sub-index was expunged
+  * @throws IOException
+  */
+ protected boolean maybeExpungeOldestSplit(Map context)
+     throws IOException {
+   boolean overLimit = getSplits().size() > maxSubIndices;
+   if (overLimit) {
+     popSplit();
+   }
+   return overLimit;
+ }
+
+ /**
+  * Null-safe lookup of the given split writer's newest segment.
+  * @return the newest segment info, or null if the writer has no segments
+  */
+ private SegmentInfo getNewestSegment(SplitWriter writer) {
+   if (writer.getSegmentCount() == 0) {
+     return null;
+   }
+   return writer.newestSegment();
+ }
+
+ /**
+ * On delete all, pop every sub-index via popSplit(), then defer to the superclass.
+ */
+ @Override
+ public SplitVote onDeleteAll() throws IOException {
+ while (splits.size() > 0) {
+ popSplit();
+ }
+ return super.onDeleteAll();
+ }
+
+}
Index: contrib/splitindex/src/test/org/apache/lucene/index/TestRotatingSplitPolicy.java
===================================================================
--- contrib/splitindex/src/test/org/apache/lucene/index/TestRotatingSplitPolicy.java (revision 0)
+++ contrib/splitindex/src/test/org/apache/lucene/index/TestRotatingSplitPolicy.java (revision 0)
@@ -0,0 +1,97 @@
+package org.apache.lucene.index;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test cases for the {@link RotatingSplitPolicy}.
+ */
+public class TestRotatingSplitPolicy extends SplitTestCase {
+
+ /**
+  * Indicate that we're testing the rotating split policy.
+  */
+ @Override
+ protected Class<? extends SplitPolicy> getSplitPolicyClass() {
+   return RotatingSplitPolicy.class;
+ }
+
+ /**
+ * Populate the test options with a cron split rule built from the given
+ * trigger, plus the two rotating-policy settings.
+ * @param cronTrigger the cron expression stored under CRON_SPLIT_RULE_TRIGGER
+ * @param subIndices the value for ROTATING_POLICY_MAXIMUM_SUB_INDICES
+ * @param splitEvenIfIdle the value for ROTATING_POLICY_SPLIT_EVEN_IF_IDLE
+ */
+ protected void setOptions(String cronTrigger, long subIndices,
+ boolean splitEvenIfIdle) {
+ options.put(CronSplitRule.CRON_SPLIT_RULE_TRIGGER, cronTrigger);
+ options.put(RotatingSplitPolicy.POLICY_SPLIT_RULES, new CronSplitRule(
+ options));
+ options.put(RotatingSplitPolicy.ROTATING_POLICY_MAXIMUM_SUB_INDICES,
+ subIndices);
+ options.put(RotatingSplitPolicy.ROTATING_POLICY_SPLIT_EVEN_IF_IDLE,
+ splitEvenIfIdle);
+ }
+
+ /**
+ * Configure a split every four seconds, at most 3 sub-indices, splitting even if idle.
+ */
+ protected boolean setupWriter() {
+ setOptions("0/4 * * * * ?", 3, true);
+ return true;
+ }
+
+ /**
+ * Sleep for 10 seconds, pause splitting, and make sure there are 2 or 3
+ * splits. Note that the actual number varies since the cron job may start
+ * during a second that's not an exact multiple of 4.
+ *
+ * @throws Exception e.g. if the sleep is interrupted
+ */
+ public void testNumSplits() throws Exception {
+ TimeUnit.SECONDS.sleep(10);
+ writer.pauseSplits();
+ assertTrue("The index did not rotate at the expected rate", writer
+ .getNumSplits() >= 2
+ && writer.getNumSplits() <= 3);
+ }
+
+ /**
+ * Give the split index enough time (17 seconds) to create the maximum number
+ * of splits, and make sure it creates no more. Also, verify that the initial
+ * term has been rotated out of the split index (i.e., it no longer exists).
+ *
+ * @throws Exception e.g. if the sleep is interrupted
+ */
+ public void testSplitRotation() throws Exception {
+ TimeUnit.SECONDS.sleep(17);
+ writer.pauseSplits();
+ assertEquals(3, writer.getNumSplits());
+ assertHits(0, new Term("name1", "value1"));
+ }
+
+ /**
+ * Sleep for 6 seconds (too short for the initial term to be rotated out,
+ * per the original author), then check the term still has exactly one hit.
+ *
+ * @throws Exception e.g. if the sleep is interrupted
+ */
+ public void testSplitReads() throws Exception {
+ TimeUnit.SECONDS.sleep(6);
+ writer.pauseSplits();
+ assertHits(1, new Term("name1", "value1"));
+ }
+
+ /**
+ * Run the inherited check, wait 5 seconds, pause splitting, and make sure the
+ * second term is still visible (one hit) through a newly created reader.
+ */
+ public void testReadSuper() throws Exception {
+ super.testReadSuper();
+
+ TimeUnit.SECONDS.sleep(5);
+ writer.pauseSplits();
+
+ assertHits(1, new Term("name2", "value2"), 10, getSearcher(createReader()));
+ }
+}