Index: contrib/splitindex/src/java/org/apache/lucene/index/RotatingSplitPolicy.java =================================================================== --- contrib/splitindex/src/java/org/apache/lucene/index/RotatingSplitPolicy.java (revision 0) +++ contrib/splitindex/src/java/org/apache/lucene/index/RotatingSplitPolicy.java (revision 0) @@ -0,0 +1,390 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.lucene.index.IndexWriterConfig.OpenMode; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; + +/** + * A RotatingSplitPolicy maintains a bounded set of sub-indices + * underneath the split index's directory. The motivation for such a policy is + * to be able to control the size of the index in a user-defined manner. It + * comes in handy in scenarios where (a) documents are added at a + * higher-than-normal rate, and (b) documents older than a certain cut-off date + * needn't be searchable. As a matter of fact, this policy is applicable to most + * real-time streams, since they tend to satisfy both of the above properties. + * + *

+ * In particular, when the number of sub-indices hits the maximum allowed number + * (@see {@link ROTATING_POLICY_MAXIMUM_SUB_INDICES}), then it effectively + * forces the last sub-index out of the split index. A sub-index is deemed to be + * the last one if it is considered to be lesser (according to + * {@link #getDirectoryComparator} ) than every other sub-index. + *

+ * + *

+ * The exact point in time at which rotation occurs is determined by the split + * rule(s) in effect for this policy. For example, one may apply a scheduled + * split rule if one wishes the split to occur at fixed intervals. For finer + * control over the periodicity of the interval, a cron split rule may be + * employed. + *

+ * + *

+ * To illustrate the behavior of the rotation policy policy, consider a split + * rule that triggers a split on the hour every hour. Furthermore, let's assume + * that the maximum number of sub-indices allowed is 7. In this case, the + * rotation policy will have no more than 8 hours worth of data (1 hour in the + * super-index and each of the 7 sub-indices). + *

+ * + * @author Karthick Sankarachary + * + */ + +public class RotatingSplitPolicy extends AbstractSplitPolicy implements + SplitPolicy { + + /** + * The option specifying the maximum number of sub-indices allowed in the + * split index (not counting the super-index). + */ + public static final String ROTATING_POLICY_MAXIMUM_SUB_INDICES = "rotating.policy.maximum.sub.indices"; + /** + * The option specifying whether or not to split the index even if no changes + * occurred since the last split. + */ + public static final String ROTATING_POLICY_SPLIT_EVEN_IF_IDLE = "rotating.policy.split.even.if.idle"; + + /** + * The maximum number of sub-indices configured for this policy. + */ + protected long maxSubIndices; + /** + * The flag indicating whether or not to split the index even if no changes + * occurred since the last split. + */ + protected boolean splitEvenIfIdle; + /** + * A running counter that keeps track of the next number to use in the + * sub-index's folder name. Note that if the index is opened for + * {@link OpenMode#APPEND} or {@link OpenMode#CREATE_OR_APPEND}, then this + * counter starts off where the index left off the last time around. + */ + protected int nextSubIndexNumber = 0; + + /** + * The default prefix to be used for the name of a split sub-index. + */ + static final String SUB_INDEX_FOLDER_NAME_PREFIX = "sub_index_"; + + /** + * Construct a rotation policy with the given bound on the number of + * sub-indices. If no split rule was specified, then by default, use a + * scheduled split rule that splits every day. + * + * @param options + */ + public RotatingSplitPolicy(Map options) { + super(options); + if (splitRules.isEmpty()) { + addRule(new ScheduledSplitRule()); + } + + this.maxSubIndices = (Long) getOption(ROTATING_POLICY_MAXIMUM_SUB_INDICES, + Long.class); + this.splitEvenIfIdle = (Boolean) getOption( + ROTATING_POLICY_SPLIT_EVEN_IF_IDLE, Boolean.class); + + } + + /** + * Set the default maximum number of sub-indices to 7, and the flag to split + * even if idle to true. + */ + @Override + public Map getOptionDefaults() { + Map defaults = new HashMap(); + defaults.put(ROTATING_POLICY_MAXIMUM_SUB_INDICES, 7); + defaults.put(ROTATING_POLICY_SPLIT_EVEN_IF_IDLE, Boolean.TRUE); + return defaults; + } + + /** + * Ensure that the maximum number of sub-indices is a positive number. + */ + public Map getOptionBounds() { + Map bounds = new HashMap(); + bounds.put(ROTATING_POLICY_MAXIMUM_SUB_INDICES, new OptionBound() { + public boolean isInbounds(String key, Serializable value) { + return (value instanceof Long && ((Long) value).longValue() >= 0) + || (value instanceof Integer && ((Integer) value).intValue() >= 0); + } + }); + return bounds; + } + + /** + * Compare directories by the sub-index count suffixed in its folder name. + */ + @Override + protected Comparator getDirectoryComparator() { + return new Comparator() { + public int compare(Directory d1, Directory d2) { + if (d1 instanceof FSDirectory && d2 instanceof FSDirectory) { + try { + return compareFileIndices(((FSDirectory) d1).getDirectory(), + ((FSDirectory) d2).getDirectory(), SUB_INDEX_FOLDER_NAME_PREFIX); + } catch (Exception e) {} + } + return RotatingSplitPolicy.super.getDirectoryComparator().compare(d1, + d2); + } + }; + } + + /** + * Compare two files based on the sub-index count in their names, using the + * default prefix. + * + * @param f1 + * the first file to be compared + * @param f2 + * the second file to be compared + * @param prefix + * the common prefix in the file names to be stripped out prior to + * comparison + * @return 0, 1 or -1 as required by {@link Comparator} + */ + protected static int compareFileIndices(File f1, File f2, String prefix) { + if (f1.getName().startsWith(prefix) && f2.getName().startsWith(prefix)) { + String s1 = f1.getName().substring(prefix.length()), s2 = f2.getName() + .substring(prefix.length()); + return Integer.parseInt(s1) - Integer.parseInt(s2); + } + throw new RuntimeException("Could not compare file names"); + } + + /** + * Retrieve the list of sub-indices by listing the files whose names start + * with the default sub-index prefix {@link @DEFAULT_SUB_INDEX_PREFIX}. + * + * @param directory + * @return + * @throws IOException + */ + @Override + protected Collection loadSplits(final Directory directory) + throws IOException { + List subDirectories = new ArrayList(); + if (directory instanceof FSDirectory) { + final FSDirectory fsDirectory = (FSDirectory) directory; + File file = fsDirectory.getDirectory(); + File[] subFiles = file.listFiles(new FilenameFilter() { + public boolean accept(File dir, String name) { + return dir.equals(fsDirectory.getDirectory()) + && name.startsWith(SUB_INDEX_FOLDER_NAME_PREFIX); + } + }); + for (File subFile : subFiles) { + subDirectories.add(FSDirectory.open(subFile)); + } + } + return subDirectories; + } + + protected void clearSplits(final Directory directory) throws IOException { + for (Directory subDirectory : loadSplits(directory)) { + if (subDirectory instanceof FSDirectory) { + final FSDirectory fsDirectory = (FSDirectory) subDirectory; + File file = fsDirectory.getDirectory(); + deleteFile(file, true); + } + } + } + + /** + * Remove oldest sub-index from this split index + * + * @throws IOException + */ + protected Directory popSplit() throws IOException { + Directory directory = super.popSplit(); + if (directory instanceof FSDirectory) { + FSDirectory fsDirectory = (FSDirectory) directory; + try { + deleteFile(fsDirectory.getDirectory(), true); + } catch (Exception e) {} + } + return directory; + } + + /** + * Assign a name to the directory for the given sub-index + * + * @param directory + * the super-directory for this split index + * @param subIndexCount + * the count of the sub-index being named + * @return + */ + private static Directory getSubIndex(Directory directory, int subIndexCount) { + Directory subIndexDirectory = null; + if (directory instanceof FSDirectory) { + try { + subIndexDirectory = FSDirectory.open(new File(((FSDirectory) directory) + .getDirectory(), SUB_INDEX_FOLDER_NAME_PREFIX + subIndexCount)); + } catch (IOException e) {} + } + return subIndexDirectory; + } + + /** + * Get the next sub-index directory to add to the policy. + * + * @return the directory of the new sub-index + */ + protected Directory getNextSubIndex() { + return getSubIndex(directory, nextSubIndexNumber++); + } + + /** + * Perform a split by creating a new sub-index, if necessary. If a new + * sub-index was in fact created, then expunge the older sub-index, if + * necessary. + */ + @Override + protected void doSplit(Map context) + throws CorruptIndexException, IOException { + if (maybeCreateNewestSplit(context)) { + maybeExpungeOldestSplit(context); + } + } + + /** + * Create a new sub-index and move the contents of the super-index to it. + * However, skip this step if the split index has been idle since the last + * split occurred, and if {@link #splitEvenIfIdle} is false. + * + * @param context + * the context propagated from the split rule + * @return true iff a new sub-index was created + * @throws CorruptIndexException + * @throws IOException + */ + protected boolean maybeCreateNewestSplit(Map context) + throws CorruptIndexException, IOException { + // Check to see if the writer has been idle during the last partition + String lastSegmentName = (String) context + .get(SplitRule.SPLIT_RULE_LAST_SEGMENT_NAME); + long lastSegmentSize = (Long) context + .get(SplitRule.SPLIT_RULE_LAST_SEGMENT_SIZE); + + // If the writer has in fact been idle, then don't split if so specified. + if (!splitEvenIfIdle + && ((lastSegmentName == null && getNewestSegment(writer) == null) || (((lastSegmentName == getNewestSegment(writer).name) && (lastSegmentSize == getNewestSegment( + writer).sizeInBytes()))))) { + return false; + } + + // Capture the state of the current split in the context for the split rule + // so it can pass it back to us during the next split. + if (getNewestSegment(writer) != null) { + context.put(SplitRule.SPLIT_RULE_LAST_SEGMENT_NAME, + getNewestSegment(writer).name); + context.put(SplitRule.SPLIT_RULE_LAST_SEGMENT_SIZE, getNewestSegment( + writer).sizeInBytes()); + } + + // Push a new sub-index into {@link #splits}. + Directory subDirectory = getNextSubIndex(); + pushSplit(subDirectory); + IndexWriter subWriter = new IndexWriter(subDirectory, getConfig()); + + // Move the context of the super-index into the above sub-index. + subWriter.deleteAll(); + if (!writer.isClosed()) { + subWriter.addIndexesNoOptimize(writer.getDirectory()); + } + subWriter.commit(); + subWriter.close(); + + // Start the super-index off on a clean slate. + writer.deleteAllFromSuper(); + writer.commit(); + + // Set the time at which this particular split occurred. + context.put(SplitRule.SPLIT_RULE_LAST_SPLIT_TIME, new Date().getTime()); + + return true; + } + + /** + * Expunge any old sub-indices from the split index, if we've hit the maximum + * number of sub-indices allowed. + * + * @param context + * the context propagated from the split rule + * @return true iff a sub-index was expunged + * @throws IOException + */ + protected boolean maybeExpungeOldestSplit(Map context) + throws IOException { + if (getSplits().size() > maxSubIndices) { + popSplit(); + return true; + } + return false; + } + + /** + * A safe way of getting the segment info for the split writer. + * + * @param writer + * the split writer + * @return the writer's segment info, if present, or null otherwise. + */ + private SegmentInfo getNewestSegment(SplitWriter writer) { + return writer.getSegmentCount() == 0 ? null : writer.newestSegment(); + } + + /** + * On delete all, clear the contents of all sub-indices. + */ + @Override + public SplitVote onDeleteAll() throws IOException { + while (splits.size() > 0) { + popSplit(); + } + return super.onDeleteAll(); + } + +} Index: contrib/splitindex/src/test/org/apache/lucene/index/TestRotatingSplitPolicy.java =================================================================== --- contrib/splitindex/src/test/org/apache/lucene/index/TestRotatingSplitPolicy.java (revision 0) +++ contrib/splitindex/src/test/org/apache/lucene/index/TestRotatingSplitPolicy.java (revision 0) @@ -0,0 +1,97 @@ +package org.apache.lucene.index; + +import java.util.concurrent.TimeUnit; + +/** + * Test cases for the {@link RotatingSplitPolicy}. + */ +public class TestRotatingSplitPolicy extends SplitTestCase { + + /** + * Indicate that we're testing the rotating split policy. + */ + @Override + protected Class getSplitPolicyClass() { + return RotatingSplitPolicy.class; + } + + /** + * Create a cron-based rule that fires a split based on the given trigger. + * + * @param cronTrigger + * @param subIndices + * @param splitEvenIfIdle + */ + protected void setOptions(String cronTrigger, long subIndices, + boolean splitEvenIfIdle) { + options.put(CronSplitRule.CRON_SPLIT_RULE_TRIGGER, cronTrigger); + options.put(RotatingSplitPolicy.POLICY_SPLIT_RULES, new CronSplitRule( + options)); + options.put(RotatingSplitPolicy.ROTATING_POLICY_MAXIMUM_SUB_INDICES, + subIndices); + options.put(RotatingSplitPolicy.ROTATING_POLICY_SPLIT_EVEN_IF_IDLE, + splitEvenIfIdle); + } + + /** + * Configure the writer such that it splits every four seconds. + */ + protected boolean setupWriter() { + setOptions("0/4 * * * * ?", 3, true); + return true; + } + + /** + * Sleep for 10 seconds, and make sure there are anywhere between 2 and 3 + * splits. Note that the actual number varies since the cron job may start + * during a second that's not an exact multiple of 4. + * + * @throws Exception + */ + public void testNumSplits() throws Exception { + TimeUnit.SECONDS.sleep(10); + writer.pauseSplits(); + assertTrue("The index did not rotate at the expected rate", writer + .getNumSplits() >= 2 + && writer.getNumSplits() <= 3); + } + + /** + * Give the split index to create the maximum number of splits, and make sure + * it creates no more. Also, verify that the initial term has been rotated out + * of the split index (i.e., it no longer exists). + * + * @throws Exception + */ + public void testSplitRotation() throws Exception { + TimeUnit.SECONDS.sleep(17); + writer.pauseSplits(); + assertEquals(3, writer.getNumSplits()); + assertHits(0, new Term("name1", "value1")); + } + + /** + * Sleep just long enough to make sure the initial term is still present in + * the index. + * + * @throws Exception + */ + public void testSplitReads() throws Exception { + TimeUnit.SECONDS.sleep(6); + writer.pauseSplits(); + assertHits(1, new Term("name1", "value1")); + } + + /** + * Make sure that the second term is visible in the new reader obtained + * herein, provided it hasn't been rotated out. + */ + public void testReadSuper() throws Exception { + super.testReadSuper(); + + TimeUnit.SECONDS.sleep(5); + writer.pauseSplits(); + + assertHits(1, new Term("name2", "value2"), 10, getSearcher(createReader())); + } +}