Index: contrib/splitindex/src/java/org/apache/lucene/index/ShardingSplitPolicy.java =================================================================== --- contrib/splitindex/src/java/org/apache/lucene/index/ShardingSplitPolicy.java (revision 0) +++ contrib/splitindex/src/java/org/apache/lucene/index/ShardingSplitPolicy.java (revision 0) @@ -0,0 +1,253 @@ +package org.apache.lucene.index; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.store.Directory; + +/** + * The ShardingSplitPolicy builds on the + * {@link RemotingSplitPolicy}, and treats each sub-index as a shard of the + * split index. Optionally, the super-index on which the {@link SplitWriter} is + * built may also be considered to be a shard. + * + * @author Karthick Sankarachary + */ +public class ShardingSplitPolicy extends RemotingSplitPolicy implements + SplitPolicy { + + // A configuration option specifying the sharding algorithm to use. 
+ public static final String SHARDING_POLICY_ALGORITHM = "sharding.policy.algorithm"; + // A configuration value denoting an algorithm that shards by round-robin. + public static final String SHARDING_POLICY_ALGORITHM_ROUND_ROBIN = "sharding.policy.algorithm.by.count"; + // A configuration value denoting an algorithm that shards by the hash code of + // the index object being added. + public static final String SHARDING_POLICY_ALGORITHM_BY_HASH = "sharding.policy.algorithm.entity.hash"; + + // The sharding algorithm configured for this policy. + private ShardingAlgorithm algorithm; + + /** + * Construct a sharding split policy with the given options. + * + * @param options + * the configuration options for this policy + */ + public ShardingSplitPolicy(Map options) { + super(options); + this.algorithm = ShardingAlgorithmFactory.findAlgorithm((String) options + .get(SHARDING_POLICY_ALGORITHM)); + } + + /** + * Ensure that the sharding algorithm is one of the registered algorithms. + */ + @Override + public Map getOptionBounds() { + Map bounds = super.getOptionBounds(); + bounds.put(SHARDING_POLICY_ALGORITHM, new OptionBound() { + public boolean isInbounds(String key, Serializable value) { + return ShardingAlgorithmFactory.findAlgorithm((String) value) != null; + } + }); + return bounds; + } + + /** + * By default, switch shards based on a round-robin algorithm. + */ + @Override + public Map getOptionDefaults() { + Map defaults = super.getOptionDefaults(); + defaults.put(SHARDING_POLICY_ALGORITHM, + SHARDING_POLICY_ALGORITHM_ROUND_ROBIN); + return defaults; + } + + /** + * When a document is added, re-direct it to the sub-index as determined by + * the {@link #algorithm}. + * + * @param document + * the document being added to the split index + * @param analyzer + * the analyzer to use + * @return a vote that tells the split index to do nothing if the document was + * written to a sub-index, or to carry on if it is to be written to + * the super-index. 
+ */ + @Override + public SplitVote onAddDocument(Document document, Analyzer analyzer) + throws CorruptIndexException, IOException { + IndexWriter shardWriter = findShardFor(document); + if (shardWriter == this.writer) { + return SplitVote.CARRY_ON; + } + shardWriter.addDocument(document, analyzer); + return SplitVote.DO_NOTHING; + } + + /** + * When one of more indices are added, re-direct it to the sub-index as + * determined by the {@link #algorithm}. + * + * @param readers + * the reader(s) being added to the split index + * @return a vote that tells the split index to do nothing if the reader(s) + * was written to a sub-index, or to carry on if it is to be written + * to the super-index. + */ + @Override + public SplitVote onAddIndexes(IndexReader... readers) + throws CorruptIndexException, IOException { + IndexWriter shardWriter = findShardFor(readers); + if (shardWriter == this.writer) { + return SplitVote.CARRY_ON; + } + shardWriter.addIndexes(readers); + return SplitVote.DO_NOTHING; + } + + /** + * When one of more directories are added, re-direct it to the sub-index as + * determined by the {@link #algorithm}. + * + * @param directories + * the directory(s) being added to the split index + * @return a vote that tells the split index to do nothing if the directory(s) + * was written to a sub-index, or to carry on if it is to be written + * to the super-index. + */ + @Override + public SplitVote onAddIndexesNoOptimize(Directory[] directories) + throws CorruptIndexException, IOException { + IndexWriter shardWriter = findShardFor(directories); + if (shardWriter == this.writer) { + return SplitVote.CARRY_ON; + } + shardWriter.addIndexesNoOptimize(directories); + return SplitVote.DO_NOTHING; + } + + /** + * Find the shard for the given entity being added to the split index. 
+ * + * @param entity + * a document, reader(s) or directory(s) being added + * @return the shard to which the entity is to be written + */ + private IndexWriter findShardFor(Object entity) { + return algorithm.match(entity, getWritables()); + } + + /** + * The ShardingAlgorithm determines the shard to which the entity + * being added must be re-directed. + * + */ + public interface ShardingAlgorithm { + /** + * @param entity + * a document, reader(s) or directory(s) being added + * @param shards + * the set of shards from which to pick the matching shard + * @return the shard where the entity must be written + */ + public IndexWriter match(Object entity, IndexWriter[] shards); + } + + /** + * The ShardingAlgorithmFactory keeps track of all sharding + * algorithms registered in the system. + */ + public static class ShardingAlgorithmFactory { + // The set of currently registered sharding algorithms. + private static Map algorithms = new HashMap(); + + /** + * Register a sharding algorithm against the given name. + * + * @param name + * the name of the sharding algorithm + * @param algorithm + * an implementation of {@link ShardingAlgorithm} + */ + public static void registerAlgorithm(String name, + ShardingAlgorithm algorithm) { + algorithms.put(name, algorithm); + } + + /** + * Finds the sharding algorithm corresponding to the given name. + * + * @param algorithm + * the name of the sharding algorithm being looked up + * @return the matching sharding algorithm + */ + public static ShardingAlgorithm findAlgorithm(String algorithm) { + return algorithms.get(algorithm); + } + + /** + * Un-register a sharding algorithm of the given name. + * + * @param name + * the name of the sharding algorithm to remove + */ + public static void unregisterAlgorithm(String name) { + algorithms.remove(name); + } + } + + /** + * Register a round-robin and hash-based sharding algorithm with the + * {@link ShardingAlgorithmFactory}. 
+ */ + static { + ShardingAlgorithmFactory.registerAlgorithm( + SHARDING_POLICY_ALGORITHM_ROUND_ROBIN, new ShardingAlgorithm() { + private int current = 0; + + public IndexWriter match(Object entity, IndexWriter[] shards) { + IndexWriter match = null; + if (shards != null && shards.length > 0) { + match = shards[current]; + current = ++current % shards.length; + } + return match; + } + }); + + ShardingAlgorithmFactory.registerAlgorithm( + SHARDING_POLICY_ALGORITHM_BY_HASH, new ShardingAlgorithm() { + public IndexWriter match(Object entity, IndexWriter[] shards) { + int hash = entity.hashCode(); + int index = hash & shards.length; + return shards != null && index >= 0 && index < shards.length ? shards[index] + : null; + } + }); + } + +} Index: contrib/splitindex/src/test/org/apache/lucene/index/TestShardingSplitPolicy.java =================================================================== --- contrib/splitindex/src/test/org/apache/lucene/index/TestShardingSplitPolicy.java (revision 0) +++ contrib/splitindex/src/test/org/apache/lucene/index/TestShardingSplitPolicy.java (revision 0) @@ -0,0 +1,68 @@ +package org.apache.lucene.index; + +import java.io.Serializable; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.TermQuery; + +/** + * Test cases for the {@link ShardingSplitPolicy}. + */ +public class TestShardingSplitPolicy extends TestRemotingSplitPolicy { + + /** + * Indicate that we're testing the sharding split policy. + */ + @Override + protected Class getSplitPolicyClass() { + return ShardingSplitPolicy.class; + } + + /** + *Verify that only one of the shards has a copy of the initial sample term. 
   *
   * @throws Exception
   */
  public void testReadFromShard() throws Exception {
    SplitReader splitReader = (SplitReader) getPrimaryReader();
    int totalHits = 0;
    // Query each shard's sub-reader individually and sum the hits for the
    // sample term across all of them.
    for (IndexReader subReader : splitReader.getSequentialSubReaders()) {
      totalHits += getHits(new TermQuery(new Term("name1", "value1")),
          Integer.MAX_VALUE, new IndexSearcher(subReader));
    }
    // Exactly one shard should hold the sample document.
    assertEquals(1, totalHits);
  }

  /**
   * Verify that the split reader returns exactly one hit for the initial term,
   * which implies that it exists in only one of the splits.
   *
   * @throws Exception
   */
  public void testReadFromSplit() throws Exception {
    assertHits(1, new Term("name1", "value1"), getPrimarySearcher());
  }

  /**
   * Verify that a manual split adds a third shard to the policy.
   *
   * @throws Exception
   */
  public void testManualSharding() throws Exception {
    ShardingSplitPolicy splitPolicy = (ShardingSplitPolicy) writer
        .getSplitPolicy();
    ManualSplitRule manualSplitRule = (ManualSplitRule) splitPolicy
        .getRule(ManualSplitRule.class);
    Map context = new HashMap();
    // Create a revised set of splits, that includes one new URI directory,
    // namely "ram:///?id=3".
    ArrayList uris = new ArrayList();
    uris.add(new URI("ram:///?id=1"));
    uris.add(new URI("ram:///?id=2"));
    uris.add(new URI("ram:///?id=3"));
    context.put(RemotingSplitPolicy.REMOTING_POLICY_DIRECTORY_URIS, uris);
    manualSplitRule.split(context);

    // The two original splits plus the newly added one.
    assertEquals(3, splitPolicy.getSplits().size());
  }

}