diff --git a/src/java/org/apache/hcatalog/common/HCatConstants.java b/src/java/org/apache/hcatalog/common/HCatConstants.java index 264ecbd..21af8c0 100644 --- a/src/java/org/apache/hcatalog/common/HCatConstants.java +++ b/src/java/org/apache/hcatalog/common/HCatConstants.java @@ -62,6 +62,16 @@ public final class HCatConstants { public static final String HCAT_METASTORE_PRINCIPAL = HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname; + /** + * The desired number of input splits produced for each partition. When the + * input files are large and few, we want to split them into many splits, + * so as to increase the parallelism of loading the splits. Try also two + * other parameters, mapred.min.split.size and mapred.max.split.size, to + * control the number of input splits. + */ + public static final String HCAT_DESIRED_PARTITION_NUM_SPLITS = + "hcat.desired.partition.num.splits"; + // IMPORTANT IMPORTANT IMPORTANT!!!!! //The keys used to store info into the job Configuration. //If any new keys are added, the HCatStorer needs to be updated. The HCatStorer diff --git a/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java b/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java index 268167e..da67dcf 100644 --- a/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java +++ b/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java @@ -146,11 +146,17 @@ public abstract class HCatBaseInputFormat org.apache.hadoop.mapred.InputFormat inputFormat = getMapRedInputFormat(jobConf, inputFormatClass); - //Call getSplit on the InputFormat, create an - //HCatSplit for each underlying split - //NumSplits is 0 for our purposes + //Call getSplit on the InputFormat, create an HCatSplit for each + //underlying split. When the desired number of input splits is missing, + //use a default number (denoted by zero). + //TODO(malewicz): Currently each partition is split independently into + //a desired number. 
However, we want the union of all partitions to be + //split into a desired number while maintaining balanced sizes of input + //splits. + int desiredNumSplits = + conf.getInt(HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, 0); org.apache.hadoop.mapred.InputSplit[] baseSplits = - inputFormat.getSplits(jobConf, 0); + inputFormat.getSplits(jobConf, desiredNumSplits); for(org.apache.hadoop.mapred.InputSplit split : baseSplits) { splits.add(new HCatSplit(