Index: contrib/splitindex/src/java/org/apache/lucene/index/ArchivingSplitPolicy.java
===================================================================
--- contrib/splitindex/src/java/org/apache/lucene/index/ArchivingSplitPolicy.java	(revision 0)
+++ contrib/splitindex/src/java/org/apache/lucene/index/ArchivingSplitPolicy.java	(revision 0)
@@ -0,0 +1,340 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+
+/**
+ * The <code>ArchivingSplitPolicy</code> extends the {@link RotatingSplitPolicy}
+ * by archiving instead of expunging the sub-indices over-flowing from the split
+ * index.
+ *
+ * <p>
+ * It does so by overriding {@link #popSplit()}: the sub-index about to be
+ * removed is first copied into a sub-archive, and only then handed to the
+ * super class for removal. At most {@link #maxSubArchives} sub-archives are
+ * kept; excess ones are deleted.
+ * </p>
+ *
+ * @author Karthick Sankarachary
+ */
+public class ArchivingSplitPolicy extends RotatingSplitPolicy {
+
+  // The option specifying the maximum number of sub-archives to maintain.
+  public static final String ARCHIVING_POLICY_MAXIMUM_SUB_ARCHIVES = "archiving.policy.maximum.sub.archives";
+
+  // The prefix that the name of every archived index will start with.
+  public static final String SUB_ARCHIVE_FOLDER_NAME_PREFIX = "sub_archive_";
+
+  // The ordered set of archived directories.
+  protected Set<Directory> archives;
+
+  /**
+   * The maximum number of archives of sub-indices allowed.
+   */
+  protected long maxSubArchives;
+
+  /**
+   * Construct an archiving split policy with the given bound on the number of
+   * sub-archives.
+   *
+   * @param options
+   *          the policy options; the value under
+   *          {@link #ARCHIVING_POLICY_MAXIMUM_SUB_ARCHIVES} bounds the number
+   *          of sub-archives
+   */
+  public ArchivingSplitPolicy(Map<String, Serializable> options) {
+    super(options);
+    archives = Collections.synchronizedSortedSet(new TreeSet<Directory>(
+        getDirectoryComparator()));
+    this.maxSubArchives = (Long) getOption(
+        ARCHIVING_POLICY_MAXIMUM_SUB_ARCHIVES, Long.class);
+  }
+
+  /**
+   * Default the maximum number of sub-archives to 0.
+   *
+   * @return the default options for the archiving policy
+   */
+  @Override
+  public Map<String, Serializable> getOptionDefaults() {
+    Map<String, Serializable> defaults = new HashMap<String, Serializable>();
+    defaults.put(ARCHIVING_POLICY_MAXIMUM_SUB_ARCHIVES, 0);
+    return defaults;
+  }
+
+  /**
+   * Ensure that the maximum number of sub-archives is a non-negative number.
+   *
+   * @return the bounds on the options of the archiving policy
+   */
+  public Map<String, OptionBound> getOptionBounds() {
+    Map<String, OptionBound> bounds = new HashMap<String, OptionBound>();
+    bounds.put(ARCHIVING_POLICY_MAXIMUM_SUB_ARCHIVES, new OptionBound() {
+      public boolean isInbounds(String key, Serializable value) {
+        return (value instanceof Long && ((Long) value).longValue() >= 0)
+            || (value instanceof Integer && ((Integer) value).intValue() >= 0);
+      }
+    });
+    return bounds;
+  }
+
+  /**
+   * Compare files that start with the {@link #SUB_ARCHIVE_FOLDER_NAME_PREFIX}
+   * prefix.
+   *
+   * @param f1
+   *          the first (sub-archive) file to be compared
+   * @param f2
+   *          the second (sub-archive) file to be compared
+   * @return 0, 1 or -1 as required by {@link Comparator}
+   */
+  protected static int compareFileIndices(File f1, File f2) {
+    return compareFileIndices(f1, f2, SUB_ARCHIVE_FOLDER_NAME_PREFIX);
+  }
+
+  /**
+   * If comparing the directories as archives does not work, fall back on the
+   * super's comparator.
+   */
+  @Override
+  protected Comparator<Directory> getDirectoryComparator() {
+    return new Comparator<Directory>() {
+      public int compare(Directory d1, Directory d2) {
+        if (d1 instanceof FSDirectory && d2 instanceof FSDirectory) {
+          try {
+            return compareFileIndices(((FSDirectory) d1).getDirectory(),
+                ((FSDirectory) d2).getDirectory());
+          } catch (Exception ignored) {
+            // Not comparable as archives; use the super's comparator below.
+          }
+        }
+        return ArchivingSplitPolicy.super.getDirectoryComparator().compare(d1,
+            d2);
+      }
+    };
+  }
+
+  /**
+   * Archive the last sub-index about to be expunged
+   *
+   * @throws IOException
+   */
+  @Override
+  protected Directory popSplit() throws IOException {
+    maybeArchiveOldestSplit();
+    return super.popSplit();
+  }
+
+  /**
+   * Copy the sub-index returned by {@link #peekSplit()} into a sub-archive and
+   * add it to the set of archived sub-indices. Then, remove sub-archives until
+   * no more than {@link #maxSubArchives} remain.
+   *
+   * @throws IOException
+   *           if the sub-archive cannot be written
+   */
+  private void maybeArchiveOldestSplit() throws IOException {
+    if (maxSubArchives > 0) {
+      Directory split = peekSplit();
+      if (split instanceof FSDirectory) {
+        File file = ((FSDirectory) split).getDirectory();
+        // NOTE(review): the prefix is replaced anywhere in the absolute path,
+        // not just in the folder name - confirm that no parent folder can
+        // contain it.
+        Directory subArchive = FSDirectory.open(new File(file.getAbsolutePath()
+            .replace(SUB_INDEX_FOLDER_NAME_PREFIX,
+                SUB_ARCHIVE_FOLDER_NAME_PREFIX)));
+        IndexWriter subWriter = new IndexWriter(subArchive, getConfig());
+        boolean success = false;
+        try {
+          subWriter.deleteAll();
+          subWriter.addIndexesNoOptimize(split);
+          subWriter.commit();
+          success = true;
+        } finally {
+          // Always release the writer (and its write lock); on failure, discard
+          // the partially written archive instead of committing it.
+          if (success) {
+            subWriter.close();
+          } else {
+            subWriter.rollback();
+          }
+        }
+
+        pushSubArchive(subArchive);
+      }
+    }
+
+    while (getArchives().size() > maxSubArchives) {
+      popSubArchive();
+    }
+  }
+
+  /**
+   * Load the sub-archives at the time the split index is opened, unless the
+   * index is being opened in {@link OpenMode#CREATE} mode, in which case any
+   * existing sub-archives are removed. The vote of the super class is not
+   * propagated.
+   *
+   * @return {@link SplitVote#CARRY_ON}, always
+   */
+  @Override
+  public SplitVote onOpen(SplitWriter writer, Directory directory)
+      throws IOException {
+    super.onOpen(writer, directory);
+    if (!isOpenModeCreate()) {
+      archives.addAll(loadArchives(directory));
+    } else {
+      archives.clear();
+      clearArchives(directory);
+    }
+    return SplitVote.CARRY_ON;
+  }
+
+  /**
+   * Load the list of sub-archives by listing the files whose names start with
+   * the {@link #SUB_ARCHIVE_FOLDER_NAME_PREFIX} prefix.
+   *
+   * @param directory
+   *          the directory whose children are to be listed
+   * @return the sub-archives found, or an empty list if the directory is not an
+   *         {@link FSDirectory} or cannot be listed
+   * @throws IOException
+   *           if a sub-archive cannot be opened
+   */
+  static List<Directory> loadArchives(final Directory directory)
+      throws IOException {
+    List<Directory> subDirectories = new ArrayList<Directory>();
+    if (directory instanceof FSDirectory) {
+      final FSDirectory fsDirectory = (FSDirectory) directory;
+      File file = fsDirectory.getDirectory();
+      File[] subFiles = file.listFiles(new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+          return dir.equals(fsDirectory.getDirectory())
+              && name.startsWith(SUB_ARCHIVE_FOLDER_NAME_PREFIX);
+        }
+      });
+      // File#listFiles returns null if the path is not a directory or an I/O
+      // error occurs.
+      if (subFiles != null) {
+        for (File subFile : subFiles) {
+          subDirectories.add(FSDirectory.open(subFile));
+        }
+      }
+    }
+    return subDirectories;
+  }
+
+  /**
+   * Remove any archives left over from a previous incarnation of the writer, if
+   * this one is being opened in a create mode.
+   *
+   * @param directory
+   * @throws IOException
+   */
+  protected void clearArchives(final Directory directory) throws IOException {
+    for (Directory subDirectory : loadArchives(directory)) {
+      if (subDirectory instanceof FSDirectory) {
+        final FSDirectory fsDirectory = (FSDirectory) subDirectory;
+        File file = fsDirectory.getDirectory();
+        deleteFile(file, true);
+      }
+    }
+  }
+
+  /**
+   * @return the ordered set of directories of sub-archives
+   */
+  public Set<Directory> getArchives() {
+    if (archives == null) {
+      return Collections.<Directory> emptySet();
+    }
+    return Collections.unmodifiableSet(archives);
+  }
+
+  /**
+   * @return the number of archives that exist currently
+   */
+  public int getNumArchives() {
+    return archives != null ? archives.size() : 0;
+  }
+
+  /**
+   * Add a sub-archive to this split index.
+   *
+   * @param subDirectory
+   *          the sub-archive to add; ignored if null
+   */
+  void pushSubArchive(Directory subDirectory) {
+    if (subDirectory != null) {
+      archives.add(subDirectory);
+    }
+  }
+
+  /**
+   * Remove the first sub-archive (in comparator order) from this split index
+   * and, if it is an FSDirectory, delete its files. Does nothing when empty.
+   */
+  void popSubArchive() {
+    Directory directory = null;
+    // Iterating over a synchronized set requires locking it manually.
+    synchronized (archives) {
+      Iterator<Directory> iterator = archives.iterator();
+      if (iterator.hasNext()) {
+        directory = iterator.next();
+        iterator.remove();
+      }
+    }
+    if (directory instanceof FSDirectory) {
+      FSDirectory fsDirectory = (FSDirectory) directory;
+      try {
+        deleteFile(fsDirectory.getDirectory(), true);
+      } catch (Exception ignored) {
+        // Best effort: a failed delete leaves the files behind.
+      }
+    }
+  }
+
+  /**
+   * @return the first sub-archive (in comparator order) in the split index, or
+   *         null if there is none.
+   */
+  Directory peekSubArchive() {
+    synchronized (archives) {
+      return !archives.isEmpty() ? archives.iterator().next() : null;
+    }
+  }
+
+}
Index: contrib/splitindex/src/test/org/apache/lucene/index/TestArchivingSplitPolicy.java
===================================================================
--- contrib/splitindex/src/test/org/apache/lucene/index/TestArchivingSplitPolicy.java	(revision 0)
+++ contrib/splitindex/src/test/org/apache/lucene/index/TestArchivingSplitPolicy.java	(revision 0)
@@ -0,0 +1,88 @@
+package org.apache.lucene.index;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test cases for the {@link ArchivingSplitPolicy}.
+ */
+public class TestArchivingSplitPolicy extends TestRotatingSplitPolicy {
+
+  /**
+   * Indicate that we're testing the archiving split policy.
+   */
+  @Override
+  protected Class getSplitPolicyClass() {
+    return ArchivingSplitPolicy.class;
+  }
+
+  /**
+   * Configure the maximum number of archives.
+   *
+   * @param cronTrigger
+   * @param subIndices
+   * @param subArchives
+   * @param splitEvenIfIdle
+   */
+  protected void setOptions(String cronTrigger, long subIndices,
+      long subArchives, boolean splitEvenIfIdle) {
+    super.setOptions(cronTrigger, subIndices, splitEvenIfIdle);
+    options.put(ArchivingSplitPolicy.ARCHIVING_POLICY_MAXIMUM_SUB_ARCHIVES,
+        subArchives);
+  }
+
+  /**
+   * Configure the writer such that it splits every four seconds, maintains 3
+   * splits and 2 archives.
+   */
+  protected boolean setupWriter() {
+    setOptions("0/4 * * * * ?", 3, 2, true);
+    return true;
+  }
+
+  protected int getNumArchives() {
+    return ((ArchivingSplitPolicy) writer.getSplitPolicy()).getNumArchives();
+  }
+
+  /**
+   * Make sure that there are no archives to begin with.
+   */
+  @Override
+  public void testWriterSetup() throws Exception {
+    super.testWriterSetup();
+    assertEquals(0, getNumArchives());
+  }
+
+  /**
+   * Sleep long enough to give the archiver time to create one archive.
+   *
+   * @throws Exception
+   */
+  public void testNumArchives() throws Exception {
+    TimeUnit.SECONDS.sleep(16);
+    writer.pauseSplits();
+    assertEquals(1, getNumArchives());
+  }
+
+  /**
+   * Sleep long enough to give the archiver time to create the maximum number
+   * of archives.
+   *
+   * @throws Exception
+   */
+  public void testArchiveRotation() throws Exception {
+    TimeUnit.SECONDS.sleep(21);
+    writer.pauseSplits();
+    assertEquals(2, getNumArchives());
+  }
+
+  /**
+   * Ensure that archived splits cannot be read from.
+   *
+   * @throws Exception
+   */
+  public void testArchivedReads() throws Exception {
+    TimeUnit.SECONDS.sleep(22);
+    writer.pauseSplits();
+    assertHits(0, new Term("name1", "value1"), 10, getSearcher(createReader()));
+  }
+}