Index: contrib/splitindex/src/java/org/apache/lucene/index/ShardingSplitPolicy.java
===================================================================
--- contrib/splitindex/src/java/org/apache/lucene/index/ShardingSplitPolicy.java (revision 0)
+++ contrib/splitindex/src/java/org/apache/lucene/index/ShardingSplitPolicy.java (revision 0)
@@ -0,0 +1,253 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.store.Directory;
+
+/**
+ * The ShardingSplitPolicy builds on the
+ * {@link RemotingSplitPolicy}, and treats each sub-index as a shard of the
+ * split index. Optionally, the super-index on which the {@link SplitWriter} is
+ * built may also be considered to be a shard.
+ *
+ * @author Karthick Sankarachary
+ */
+public class ShardingSplitPolicy extends RemotingSplitPolicy implements
+ SplitPolicy {
+
+ // A configuration option specifying the sharding algorithm to use.
+ public static final String SHARDING_POLICY_ALGORITHM = "sharding.policy.algorithm";
+ // A configuration value denoting an algorithm that shards by round-robin.
+ public static final String SHARDING_POLICY_ALGORITHM_ROUND_ROBIN = "sharding.policy.algorithm.by.count";
+ // A configuration value denoting an algorithm that shards by the hash code of
+ // the index object being added.
+ public static final String SHARDING_POLICY_ALGORITHM_BY_HASH = "sharding.policy.algorithm.entity.hash";
+
+ // The sharding algorithm configured for this policy.
+ private ShardingAlgorithm algorithm;
+
+ /**
+ * Construct a sharding split policy with the given options.
+ *
+ * @param options
+ * the configuration options for this policy
+ */
+ public ShardingSplitPolicy(Map options) {
+ super(options);
+ this.algorithm = ShardingAlgorithmFactory.findAlgorithm((String) options
+ .get(SHARDING_POLICY_ALGORITHM));
+ }
+
+ /**
+ * Ensure that the sharding algorithm is one of the registered algorithms.
+ */
+ @Override
+ public Map getOptionBounds() {
+ Map bounds = super.getOptionBounds();
+ bounds.put(SHARDING_POLICY_ALGORITHM, new OptionBound() {
+ public boolean isInbounds(String key, Serializable value) {
+ return ShardingAlgorithmFactory.findAlgorithm((String) value) != null;
+ }
+ });
+ return bounds;
+ }
+
+ /**
+ * By default, switch shards based on a round-robin algorithm.
+ */
+ @Override
+ public Map getOptionDefaults() {
+ Map defaults = super.getOptionDefaults();
+ defaults.put(SHARDING_POLICY_ALGORITHM,
+ SHARDING_POLICY_ALGORITHM_ROUND_ROBIN);
+ return defaults;
+ }
+
+ /**
+ * When a document is added, re-direct it to the sub-index as determined by
+ * the {@link #algorithm}.
+ *
+ * @param document
+ * the document being added to the split index
+ * @param analyzer
+ * the analyzer to use
+ * @return a vote that tells the split index to do nothing if the document was
+ * written to a sub-index, or to carry on if it is to be written to
+ * the super-index.
+ */
+ @Override
+ public SplitVote onAddDocument(Document document, Analyzer analyzer)
+ throws CorruptIndexException, IOException {
+ IndexWriter shardWriter = findShardFor(document);
+ if (shardWriter == this.writer) {
+ return SplitVote.CARRY_ON;
+ }
+ shardWriter.addDocument(document, analyzer);
+ return SplitVote.DO_NOTHING;
+ }
+
+ /**
+   * When one or more indices are added, re-direct them to the sub-index as
+   * determined by the {@link #algorithm}.
+ *
+ * @param readers
+ * the reader(s) being added to the split index
+ * @return a vote that tells the split index to do nothing if the reader(s)
+ * was written to a sub-index, or to carry on if it is to be written
+ * to the super-index.
+ */
+ @Override
+ public SplitVote onAddIndexes(IndexReader... readers)
+ throws CorruptIndexException, IOException {
+ IndexWriter shardWriter = findShardFor(readers);
+ if (shardWriter == this.writer) {
+ return SplitVote.CARRY_ON;
+ }
+ shardWriter.addIndexes(readers);
+ return SplitVote.DO_NOTHING;
+ }
+
+ /**
+   * When one or more directories are added, re-direct them to the sub-index as
+   * determined by the {@link #algorithm}.
+ *
+ * @param directories
+ * the directory(s) being added to the split index
+ * @return a vote that tells the split index to do nothing if the directory(s)
+ * was written to a sub-index, or to carry on if it is to be written
+ * to the super-index.
+ */
+ @Override
+ public SplitVote onAddIndexesNoOptimize(Directory[] directories)
+ throws CorruptIndexException, IOException {
+ IndexWriter shardWriter = findShardFor(directories);
+ if (shardWriter == this.writer) {
+ return SplitVote.CARRY_ON;
+ }
+ shardWriter.addIndexesNoOptimize(directories);
+ return SplitVote.DO_NOTHING;
+ }
+
+ /**
+ * Find the shard for the given entity being added to the split index.
+ *
+ * @param entity
+ * a document, reader(s) or directory(s) being added
+ * @return the shard to which the entity is to be written
+ */
+ private IndexWriter findShardFor(Object entity) {
+ return algorithm.match(entity, getWritables());
+ }
+
+ /**
+ * The ShardingAlgorithm determines the shard to which the entity
+ * being added must be re-directed.
+ *
+ */
+ public interface ShardingAlgorithm {
+ /**
+ * @param entity
+ * a document, reader(s) or directory(s) being added
+ * @param shards
+ * the set of shards from which to pick the matching shard
+ * @return the shard where the entity must be written
+ */
+ public IndexWriter match(Object entity, IndexWriter[] shards);
+ }
+
+ /**
+ * The ShardingAlgorithmFactory keeps track of all sharding
+ * algorithms registered in the system.
+ */
+ public static class ShardingAlgorithmFactory {
+ // The set of currently registered sharding algorithms.
+ private static Map algorithms = new HashMap();
+
+ /**
+ * Register a sharding algorithm against the given name.
+ *
+ * @param name
+ * the name of the sharding algorithm
+ * @param algorithm
+ * an implementation of {@link ShardingAlgorithm}
+ */
+ public static void registerAlgorithm(String name,
+ ShardingAlgorithm algorithm) {
+ algorithms.put(name, algorithm);
+ }
+
+ /**
+ * Finds the sharding algorithm corresponding to the given name.
+ *
+ * @param algorithm
+ * the name of the sharding algorithm being looked up
+ * @return the matching sharding algorithm
+ */
+ public static ShardingAlgorithm findAlgorithm(String algorithm) {
+ return algorithms.get(algorithm);
+ }
+
+ /**
+ * Un-register a sharding algorithm of the given name.
+ *
+ * @param name
+ * the name of the sharding algorithm to remove
+ */
+ public static void unregisterAlgorithm(String name) {
+ algorithms.remove(name);
+ }
+ }
+
+ /**
+ * Register a round-robin and hash-based sharding algorithm with the
+ * {@link ShardingAlgorithmFactory}.
+ */
+ static {
+ ShardingAlgorithmFactory.registerAlgorithm(
+ SHARDING_POLICY_ALGORITHM_ROUND_ROBIN, new ShardingAlgorithm() {
+ private int current = 0;
+
+        public IndexWriter match(Object entity, IndexWriter[] shards) {
+          if (shards == null || shards.length == 0) {
+            return null; // no shards to rotate over
+          }
+          IndexWriter match = shards[current];
+          current = (current + 1) % shards.length; // advance the rotation
+          return match;
+        }
+ });
+
+ ShardingAlgorithmFactory.registerAlgorithm(
+ SHARDING_POLICY_ALGORITHM_BY_HASH, new ShardingAlgorithm() {
+        public IndexWriter match(Object entity, IndexWriter[] shards) {
+          if (shards == null || shards.length == 0)
+            return null; // guard BEFORE touching shards.length (was an NPE risk)
+          // Modulo (not bitwise '&') maps any hash code, including negative
+          // ones, into the valid index range [0, shards.length).
+          return shards[((entity.hashCode() % shards.length) + shards.length) % shards.length];
+        }
+ });
+ }
+
+}
Index: contrib/splitindex/src/test/org/apache/lucene/index/TestShardingSplitPolicy.java
===================================================================
--- contrib/splitindex/src/test/org/apache/lucene/index/TestShardingSplitPolicy.java (revision 0)
+++ contrib/splitindex/src/test/org/apache/lucene/index/TestShardingSplitPolicy.java (revision 0)
@@ -0,0 +1,68 @@
+package org.apache.lucene.index;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+
+/**
+ * Test cases for the {@link ShardingSplitPolicy}.
+ */
+public class TestShardingSplitPolicy extends TestRemotingSplitPolicy {
+
+ /**
+ * Indicate that we're testing the sharding split policy.
+ */
+  @Override
+  protected Class<? extends SplitPolicy> getSplitPolicyClass() {
+    return ShardingSplitPolicy.class;
+  }
+
+ /**
+   * Verify that only one of the shards has a copy of the initial sample term.
+ *
+ * @throws Exception
+ */
+ public void testReadFromShard() throws Exception {
+ SplitReader splitReader = (SplitReader) getPrimaryReader();
+ int totalHits = 0;
+ for (IndexReader subReader : splitReader.getSequentialSubReaders()) {
+ totalHits += getHits(new TermQuery(new Term("name1", "value1")),
+ Integer.MAX_VALUE, new IndexSearcher(subReader));
+ }
+ assertEquals(1, totalHits);
+ }
+
+ /**
+ * Verify that the split reader returns exactly one hit for the initial term,
+ * which implies that it exists in only one of the splits.
+ *
+ * @throws Exception
+ */
+ public void testReadFromSplit() throws Exception {
+ assertHits(1, new Term("name1", "value1"), getPrimarySearcher());
+ }
+
+ public void testManualSharding() throws Exception {
+ ShardingSplitPolicy splitPolicy = (ShardingSplitPolicy) writer
+ .getSplitPolicy();
+ ManualSplitRule manualSplitRule = (ManualSplitRule) splitPolicy
+ .getRule(ManualSplitRule.class);
+ Map context = new HashMap();
+ // Create a revised set of splits, that includes one new URI directory,
+ // namely "ram:///?id=3".
+ ArrayList uris = new ArrayList();
+ uris.add(new URI("ram:///?id=1"));
+ uris.add(new URI("ram:///?id=2"));
+ uris.add(new URI("ram:///?id=3"));
+ context.put(RemotingSplitPolicy.REMOTING_POLICY_DIRECTORY_URIS, uris);
+ manualSplitRule.split(context);
+
+ assertEquals(3, splitPolicy.getSplits().size());
+ }
+
+}