Index: contrib/splitindex/src/java/org/apache/lucene/index/RemotingSplitPolicy.java
===================================================================
--- contrib/splitindex/src/java/org/apache/lucene/index/RemotingSplitPolicy.java	(revision 0)
+++ contrib/splitindex/src/java/org/apache/lucene/index/RemotingSplitPolicy.java	(revision 0)
@@ -0,0 +1,470 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.RAMDirectory;
+
+/**
+ * The {@code RemotingSplitPolicy} is an abstraction of the {@link SplitPolicy}
+ * interface, where each sub-index points to a (potentially remote) URI-based
+ * location.
+ *
+ * @author Karthick Sankarachary
+ */
+public abstract class RemotingSplitPolicy extends AbstractSplitPolicy implements
+    SplitPolicy {
+
+  // A configuration option specifying whether or not to include the super-index
+  // in the mix.
+  public static final String REMOTING_POLICY_INCLUDE_SUPER_DIR = "remoting.policy.include.super.directory";
+  // A configuration option specifying the collection of URI-based directories.
+  public static final String REMOTING_POLICY_DIRECTORY_URIS = "remoting.policy.directory.uri.list";
+
+  // The flag indicating whether or not to include the super-index in this
+  // policy.
+  protected boolean includeSuper;
+  // The collection of URIs pointing to the (potentially remote) directories on
+  // which the splits will be based.
+  protected Collection<URI> directoryUris;
+  // A map of directory lock id's to their corresponding URIs.
+  protected Map<String, URI> lockidsUri;
+
+  // A map of directory lock id's to their corresponding index writers.
+  protected Map<String, IndexWriter> subWriters = new HashMap<String, IndexWriter>();
+
+  // The index writers that participate in this split index. Computed lazily by
+  // getWritables() and reset to null whenever the set of sub-writers changes.
+  protected IndexWriter[] writables;
+
+  /**
+   * Construct a remoting split policy with the given options.
+   *
+   * @param options
+   *          the configuration options for this split policy; the super-index
+   *          is included unless {@link #REMOTING_POLICY_INCLUDE_SUPER_DIR} is
+   *          mapped to false
+   */
+  public RemotingSplitPolicy(Map options) {
+    super(options);
+    // Do not unbox blindly: a missing option must not fail with a
+    // NullPointerException, so fall back to the documented default (true).
+    Object include = (options == null) ? null : options
+        .get(REMOTING_POLICY_INCLUDE_SUPER_DIR);
+    includeSuper = (include == null) || ((Boolean) include).booleanValue();
+    lockidsUri = new HashMap<String, URI>();
+    addRule(new ManualSplitRule());
+  }
+
+  /**
+   * By default, include the super-directory but no sub-indices.
+   */
+  @Override
+  public Map getOptionDefaults() {
+    Map defaults = super.getOptionDefaults();
+    defaults.put(REMOTING_POLICY_INCLUDE_SUPER_DIR, Boolean.TRUE);
+    defaults.put(REMOTING_POLICY_DIRECTORY_URIS, new ArrayList<URI>());
+    return defaults;
+  }
+
+  /**
+   * Ensure that the value corresponding to the directory URIs is a collection.
+   */
+  @Override
+  public Map getOptionBounds() {
+    Map bounds = super.getOptionBounds();
+    bounds.put(REMOTING_POLICY_DIRECTORY_URIS, new OptionBound() {
+      public boolean isInbounds(String key, Serializable value) {
+        return value instanceof Collection;
+      }
+    });
+    return bounds;
+  }
+
+  /**
+   * Load the sub-indices by opening the directories corresponding to the
+   * {@link #directoryUris}.
+   *
+   * @param directory
+   *          the directory on which the split index is based
+   * @return the collection of directories corresponding to the sub-indices
+   * @throws IOException
+   *           if one of the URIs cannot be opened as a directory
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  protected Collection loadSplits(final Directory directory)
+      throws IOException {
+    List<Directory> subDirectories = new ArrayList<Directory>();
+    directoryUris = (Collection<URI>) getOptions().get(
+        REMOTING_POLICY_DIRECTORY_URIS);
+    if (directoryUris != null) {
+      for (URI uri : directoryUris) {
+        subDirectories.add(openSplit(uri));
+      }
+    }
+    return subDirectories;
+  }
+
+  /**
+   * Open the directory for the given URI, and remember the URI under the lock
+   * id of that directory so that the split can later be looked up by its URI.
+   */
+  private Directory openSplit(URI uri) throws IOException {
+    Directory subDirectory = URIDirectoryFactory.open(uri);
+    lockidsUri.put(subDirectory.getLockID(), uri);
+    return subDirectory;
+  }
+
+  /**
+   * Let's not remove directories that are not co-located with the
+   * super-directory.
+   */
+  @Override
+  protected void clearSplits(Directory directory) throws IOException {}
+
+  /**
+   * @return the index writers for each of the sub-indices, preceded by the
+   *         writer of the super-index if {@link #includeSuper} is set
+   */
+  protected IndexWriter[] getWritables() {
+    if (writables == null) {
+      // Collect the sub-writers, never the splits themselves: the splits are
+      // directories, which cannot be stored in an array of index writers.
+      List<IndexWriter> writers = new ArrayList<IndexWriter>(
+          subWriters.size() + 1);
+      if (includeSuper) {
+        writers.add(this.writer);
+      }
+      writers.addAll(subWriters.values());
+      writables = writers.toArray(new IndexWriter[writers.size()]);
+    }
+    return writables;
+  }
+
+  /**
+   * When the split index is opened, create the index writers for the
+   * sub-indices.
+   */
+  @Override
+  public SplitVote onOpen(SplitWriter writer, Directory directory)
+      throws IOException {
+    SplitVote vote = super.onOpen(writer, directory);
+    for (Directory split : splits) {
+      openSubWriter(split);
+    }
+    writables = null;
+    return vote;
+  }
+
+  /**
+   * Create the index writer for the given split, unless it already has one.
+   */
+  private void openSubWriter(Directory split) throws IOException {
+    String lockId = split.getLockID();
+    if (!subWriters.containsKey(lockId)) {
+      subWriters.put(lockId, new IndexWriter(split, getConfig()));
+    }
+  }
+
+  /**
+   * When the split index is closed, close the index writers for the
+   * sub-indices.
+   */
+  @Override
+  public SplitVote onClose() throws CorruptIndexException, IOException,
+      SplitException {
+    SplitVote vote;
+    try {
+      closeSubWriters();
+    } finally {
+      // Always notify the super-class, even if a sub-writer failed to close.
+      vote = super.onClose();
+    }
+    return vote;
+  }
+
+  /**
+   * Close all of the sub-writers, attempting every one of them even if some
+   * fail, and then rethrow the first failure (if any).
+   */
+  private void closeSubWriters() throws IOException {
+    IOException failure = null;
+    for (IndexWriter subWriter : subWriters.values()) {
+      try {
+        subWriter.close();
+      } catch (IOException e) {
+        if (failure == null) {
+          failure = e;
+        }
+      }
+    }
+    // The closed writers must not be handed out or reused any longer.
+    subWriters.clear();
+    writables = null;
+    if (failure != null) {
+      throw failure;
+    }
+  }
+
+  /**
+   * When one or more reader(s) are added, don't write it to the super-directory
+   * if {@link #includeSuper} is configured to be false.
+   */
+  @Override
+  public SplitVote onAddIndexes(IndexReader... readers)
+      throws CorruptIndexException, IOException {
+    return includeSuper ? SplitVote.CARRY_ON : SplitVote.DO_NOTHING;
+  }
+
+  /**
+   * When a document is added, don't write it to the super-directory if
+   * {@link #includeSuper} is configured to be false.
+   */
+  @Override
+  public SplitVote onAddDocument(Document document, Analyzer analyzer)
+      throws CorruptIndexException, IOException {
+    return includeSuper ? SplitVote.CARRY_ON : SplitVote.DO_NOTHING;
+  }
+
+  /**
+   * When one or more directory(s) are added, don't write it to the
+   * super-directory if {@link #includeSuper} is configured to be false.
+   */
+  @Override
+  public SplitVote onAddIndexesNoOptimize(Directory[] directories)
+      throws CorruptIndexException, IOException {
+    return includeSuper ? SplitVote.CARRY_ON : SplitVote.DO_NOTHING;
+  }
+
+  /**
+   * When the split index is cleared, clear each of its sub-indices as well.
+   */
+  public SplitVote onDeleteAll() throws IOException {
+    for (IndexWriter subWriter : subWriters.values()) {
+      subWriter.deleteAll();
+    }
+    return SplitVote.CARRY_ON;
+  }
+
+  /**
+   * When changes to the split index are committed, commit each of the index
+   * writers for the sub-indices as well.
+   */
+  @Override
+  public SplitVote onCommit(Map commitUserData)
+      throws CorruptIndexException, IOException {
+    for (IndexWriter subWriter : subWriters.values()) {
+      subWriter.commit(commitUserData);
+    }
+    return super.onCommit(commitUserData);
+  }
+
+  /**
+   * When a split is set to occur, remove any sub-directories that do not belong
+   * in the new set of directory URIs, and add any that do not belong in the
+   * existing set of directory URIs. The index writers and the lock id
+   * bookkeeping are kept in step with the sub-directories.
+   *
+   * @param context
+   *          the options for this split, from which the new collection of
+   *          directory URIs is read once they have been checked and balanced
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  protected void doSplit(Map context)
+      throws CorruptIndexException, IOException {
+    context = checkAndBalanceOptions(context);
+    Collection<URI> newUris = (Collection<URI>) context
+        .get(REMOTING_POLICY_DIRECTORY_URIS);
+    if (newUris == null) {
+      return;
+    }
+    // The current URIs are still unset if the splits were never loaded.
+    Collection<URI> oldUris = (directoryUris != null) ? directoryUris
+        : new ArrayList<URI>();
+    boolean splitsChanged = false;
+    for (URI oldUri : oldUris) {
+      if (!newUris.contains(oldUri) && removeSplits(oldUri)) {
+        splitsChanged = true;
+      }
+    }
+    for (URI newUri : newUris) {
+      if (!oldUris.contains(newUri)) {
+        Directory split = openSplit(newUri);
+        splits.add(split);
+        // Give the new split a writer, so that it shows up in getWritables().
+        openSubWriter(split);
+        splitsChanged = true;
+      }
+    }
+    if (splitsChanged) {
+      directoryUris = newUris;
+      writables = null;
+    }
+  }
+
+  /**
+   * Remove every split that was opened from the given URI, closing its index
+   * writer (if any) and forgetting its lock id.
+   *
+   * @return true if at least one split was removed
+   */
+  private boolean removeSplits(URI uri) throws IOException {
+    // Find the matches first, so that the splits aren't modified mid-iteration.
+    List<Directory> matches = new ArrayList<Directory>();
+    for (Directory split : splits) {
+      if (uri.equals(lockidsUri.get(split.getLockID()))) {
+        matches.add(split);
+      }
+    }
+    for (Directory match : matches) {
+      String lockId = match.getLockID();
+      splits.remove(match);
+      lockidsUri.remove(lockId);
+      IndexWriter subWriter = subWriters.remove(lockId);
+      if (subWriter != null) {
+        subWriter.close();
+      }
+    }
+    return !matches.isEmpty();
+  }
+
+  /**
+   * The {@code URIDirectory} maps a {@link URI} to a {@link Directory}.
+   */
+  public interface URIDirectory {
+    /**
+     * Create a {@link Directory} object for the given {@link URI}.
+     *
+     * @param uri
+     *          the URI pointing to the directory
+     * @return the directory for that URI
+     * @throws IOException
+     *           if the directory cannot be opened
+     */
+    public Directory open(URI uri) throws IOException;
+  }
+
+  /**
+   * The {@code URIDirectoryFactory} keeps track of all URI directories
+   * registered in the system. The "file" and "ram" schemes are registered by
+   * default.
+   */
+  public static class URIDirectoryFactory {
+    // The set of currently registered URI directories, keyed by URI scheme.
+    private static final Map<String, URIDirectory> uriDirectories = new HashMap<String, URIDirectory>();
+
+    /**
+     * Register the file-based and the ram-based URI directories as soon as the
+     * factory itself is initialized, so that they are available no matter
+     * whether or not a {@link RemotingSplitPolicy} has been loaded yet.
+     */
+    static {
+      // Cause the file scheme to create a file-system-based directory.
+      registerURIDirectory("file", new URIDirectory() {
+        public Directory open(URI uri) throws IOException {
+          String path = uri.getPath();
+          if (path == null) {
+            throw new IOException("The URI " + uri + " does not have a path");
+          }
+          return FSDirectory.open(new File(path));
+        }
+      });
+
+      // Cause the ram scheme to create a ram-based directory. Note that if a
+      // path is specified, the contents of the file-system-based directory at
+      // that path are copied into the ram-based directory.
+      registerURIDirectory("ram", new URIDirectory() {
+        public Directory open(URI uri) throws IOException {
+          String path = uri.getPath();
+          if (path == null || path.length() == 0 || path.equals("/")) {
+            return new RAMDirectory();
+          }
+          // The ram-based directory only copies from the file-system-based
+          // one, so the latter must be closed once the copy has been made.
+          Directory source = FSDirectory.open(new File(path));
+          try {
+            return new RAMDirectory(source);
+          } finally {
+            source.close();
+          }
+        }
+      });
+    }
+
+    /**
+     * Open a directory based on its URI
+     *
+     * @param uri
+     *          the URI pointing to the directory
+     * @return the corresponding directory
+     * @throws IOException
+     *           if no URI directory is registered for the scheme of the URI,
+     *           or if the directory cannot be opened
+     */
+    public static Directory open(URI uri) throws IOException {
+      String scheme = uri.getScheme();
+      URIDirectory uriDirectory = uriDirectories.get(scheme);
+      if (uriDirectory == null) {
+        // Report the offending URI instead of a bare NullPointerException.
+        throw new IOException("No URI directory is registered for the scheme \""
+            + scheme + "\" of the URI " + uri);
+      }
+      return uriDirectory.open(uri);
+    }
+
+    /**
+     * Register a URI directory with the given URI scheme.
+     *
+     * @param scheme
+     *          the scheme of the URI type being registered
+     * @param uriDirectory
+     *          the URI directory for that URI scheme
+     * @see URI#getScheme()
+     */
+    public static void registerURIDirectory(String scheme,
+        URIDirectory uriDirectory) {
+      uriDirectories.put(scheme, uriDirectory);
+    }
+
+    /**
+     * Un-register a URI directory based on its scheme.
+     *
+     * @param scheme
+     *          the scheme of the URI being removed
+     */
+    public static void unregisterURIDirectory(String scheme) {
+      uriDirectories.remove(scheme);
+    }
+  }
+
+}
Index: contrib/splitindex/src/test/org/apache/lucene/index/TestRemotingSplitPolicy.java
===================================================================
--- contrib/splitindex/src/test/org/apache/lucene/index/TestRemotingSplitPolicy.java	(revision 0)
+++ contrib/splitindex/src/test/org/apache/lucene/index/TestRemotingSplitPolicy.java	(revision 0)
@@ -0,0 +1,63 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+
+import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+
+/**
+ * An abstract test case for the {@link RemotingSplitPolicy}.
+ */
+public abstract class TestRemotingSplitPolicy extends SplitTestCase {
+
+  /**
+   * Add a couple of RAM-based URI directories to the remoting split policy.
+   */
+  protected boolean setupWriter() {
+    try {
+      ArrayList<URI> uris = new ArrayList<URI>();
+      uris.add(new URI("ram:///?id=1"));
+      uris.add(new URI("ram:///?id=2"));
+      options.put(RemotingSplitPolicy.REMOTING_POLICY_DIRECTORY_URIS, uris);
+    } catch (URISyntaxException e) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Ensure that the writer includes the above RAM-based URI directories.
+   */
+  public void testWriterSetup() throws Exception {
+    assertEquals(2, writer.getNumSplits());
+  }
+
+  /**
+   * Create writer in append mode. Note that using the default create mode will
+   * not cause the removal of the remote directories, since strictly speaking,
+   * they do not belong exclusively to the split index.
+   */
+  @Override
+  protected OpenMode getOpenMode() {
+    return OpenMode.CREATE_OR_APPEND;
+  }
+
+}
Index: contrib/splitindex/src/java/org/apache/lucene/index/ManualSplitRule.java
===================================================================
--- contrib/splitindex/src/java/org/apache/lucene/index/ManualSplitRule.java	(revision 0)
+++ contrib/splitindex/src/java/org/apache/lucene/index/ManualSplitRule.java	(revision 0)
@@ -0,0 +1,66 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * The {@code ManualSplitRule} causes a split to occur upon request (i.e., if
+ * and only if it is explicitly told to do so). This rule comes in handy if the
+ * split policy wants to allow consumers of the split index to add/remove
+ * sub-indices.
+ *
+ * @author Karthick Sankarachary
+ */
+public class ManualSplitRule extends SplitRuleAdapter implements SplitRule {
+
+  private static final long serialVersionUID = -2770799501612301579L;
+
+  /**
+   * Construct a manual split rule with no specific configuration options.
+   */
+  public ManualSplitRule() {
+    super();
+  }
+
+  /**
+   * Cause a split to occur but pass no context.
+   *
+   * @throws CorruptIndexException
+   * @throws IOException
+   */
+  public void split() throws CorruptIndexException, IOException {
+    splitPolicy.split();
+  }
+
+  /**
+   * Cause a split to occur using the given context
+   *
+   * @param context
+   *          a map to propagate to the split policy's {@code doSplit} method
+   * @throws CorruptIndexException
+   * @throws IOException
+   */
+  public void split(Map context)
+      throws CorruptIndexException, IOException {
+    splitPolicy.split(context);
+  }
+
+}