Index: ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (revision 1088810) +++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (working copy) @@ -46,64 +46,12 @@ * one mapper. */ public class BucketizedHiveInputFormat - extends HiveInputFormat { + extends HiveCombineSplitsInputFormat { public static final Log LOG = LogFactory .getLog("org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat"); @Override - public RecordReader getRecordReader(InputSplit split, JobConf job, - Reporter reporter) throws IOException { - - BucketizedHiveInputSplit hsplit = (BucketizedHiveInputSplit) split; - - String inputFormatClassName = null; - Class inputFormatClass = null; - try { - inputFormatClassName = hsplit.inputFormatClassName(); - inputFormatClass = job.getClassByName(inputFormatClassName); - } catch (Exception e) { - throw new IOException("cannot find class " + inputFormatClassName); - } - - // clone a jobConf for setting needed columns for reading - JobConf cloneJobConf = new JobConf(job); - pushProjectionsAndFilters(cloneJobConf, inputFormatClass, hsplit.getPath() - .toString(), hsplit.getPath().toUri().getPath()); - - InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, - cloneJobConf); - BucketizedHiveRecordReader rr= new BucketizedHiveRecordReader(inputFormat, hsplit, cloneJobConf, - reporter); - rr.initIOContext(hsplit, cloneJobConf, inputFormatClass); - return rr; - } - - protected FileStatus[] listStatus(JobConf job, Path path) throws IOException { - ArrayList result = new ArrayList(); - List errors = new ArrayList(); - - FileSystem fs = path.getFileSystem(job); - FileStatus[] matches = fs.globStatus(path); - if (matches == null) { - errors.add(new IOException("Input path does not exist: " + path)); - } else if (matches.length == 0) { - errors.add(new IOException("Input Pattern " + path + " matches 0 files")); - } else { - for (FileStatus globStat : matches) { - FileUtils.listStatusRecursively(fs, globStat, result); - } - } - - if (!errors.isEmpty()) { - throw new InvalidInputException(errors); - } - LOG.info("Total input paths to process : " + result.size()); - return result.toArray(new FileStatus[result.size()]); - - } - - @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { init(job); @@ -135,13 +83,13 @@ InputSplit[] iss = inputFormat.getSplits(newjob, 0); if (iss != null && iss.length > 0) { numOrigSplits += iss.length; - result.add(new BucketizedHiveInputSplit(iss, inputFormatClass + result.add(new HiveCombineSplit(iss, inputFormatClass .getName())); } } } LOG.info(result.size() + " bucketized splits generated from " + numOrigSplits + " original splits."); - return result.toArray(new BucketizedHiveInputSplit[result.size()]); + return result.toArray(new HiveCombineSplit[result.size()]); } } Index: ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputSplit.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputSplit.java (revision 1088810) +++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputSplit.java (working copy) @@ -1,198 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.io; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hive.ql.exec.ExecMapper; -import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.TableScanOperator; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit; -import org.apache.hadoop.hive.ql.plan.MapredWork; -import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.hive.shims.HadoopShims.CombineFileInputFormatShim; -import org.apache.hadoop.hive.shims.HadoopShims.InputSplitShim; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobConfigurable; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.util.ReflectionUtils; - -/** - * HiveInputSplit encapsulates an InputSplit with its corresponding - * inputFormatClass. The reason that it derives from FileSplit is to make sure - * "map.input.file" in MapTask. 
- */ -public class BucketizedHiveInputSplit extends HiveInputSplit { - - protected InputSplit[] inputSplits; - protected String inputFormatClassName; - - public String getInputFormatClassName() { - return inputFormatClassName; - } - - public void setInputFormatClassName(String inputFormatClassName) { - this.inputFormatClassName = inputFormatClassName; - } - - public BucketizedHiveInputSplit() { - // This is the only public constructor of FileSplit - super(); - } - - public BucketizedHiveInputSplit(InputSplit[] inputSplits, - String inputFormatClassName) { - // This is the only public constructor of FileSplit - super(); - - assert (inputSplits != null && inputSplits.length > 0); - this.inputSplits = inputSplits; - this.inputFormatClassName = inputFormatClassName; - } - - public int getNumSplits() { - return inputSplits.length; - } - - public InputSplit getSplit(int idx) { - assert (idx >= 0 && idx < inputSplits.length); - return inputSplits[idx]; - } - - public String inputFormatClassName() { - return inputFormatClassName; - } - - @Override - public Path getPath() { - if (inputSplits != null && inputSplits.length > 0 - && inputSplits[0] instanceof FileSplit) { - return ((FileSplit) inputSplits[0]).getPath(); - } - return new Path(""); - } - - /** The position of the first byte in the file to process. */ - @Override - public long getStart() { - if (inputSplits != null && inputSplits.length > 0 - && inputSplits[0] instanceof FileSplit) { - return ((FileSplit) inputSplits[0]).getStart(); - } - return 0; - } - - @Override - public String toString() { - if (inputSplits != null && inputSplits.length > 0) { - return inputFormatClassName + ":" + inputSplits[0].toString(); - } - return inputFormatClassName + ":null"; - } - - @Override - public long getLength() { - long r = 0; - if (inputSplits != null) { - try { - for (InputSplit inputSplit : inputSplits) { - r += inputSplit.getLength(); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - return r; - } - - public long getLength(int idx) { - if (inputSplits != null) { - try { - return inputSplits[idx].getLength(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - return -1; - } - - @Override - public String[] getLocations() throws IOException { - assert (inputSplits != null && inputSplits.length > 0); - return inputSplits[0].getLocations(); - } - - @Override - public void readFields(DataInput in) throws IOException { - String inputSplitClassName = in.readUTF(); - - int numSplits = in.readInt(); - inputSplits = new InputSplit[numSplits]; - for (int i = 0; i < numSplits; i++) { - try { - inputSplits[i] = (InputSplit) ReflectionUtils.newInstance(conf - .getClassByName(inputSplitClassName), conf); - } catch (Exception e) { - throw new IOException( - "Cannot create an instance of InputSplit class = " - + inputSplitClassName + ":" + e.getMessage()); - } - inputSplits[i].readFields(in); - } - inputFormatClassName = in.readUTF(); - } - - @Override - public void write(DataOutput out) throws IOException { - assert (inputSplits != null && inputSplits.length > 0); - out.writeUTF(inputSplits[0].getClass().getName()); - out.writeInt(inputSplits.length); - for (InputSplit inputSplit : inputSplits) { - inputSplit.write(out); - } - out.writeUTF(inputFormatClassName); - } -} Index: ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (revision 1088810) +++ 
ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (working copy) @@ -25,7 +25,6 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileInputFormat; @@ -35,63 +34,12 @@ * file. */ public class BucketizedHiveRecordReader - extends HiveContextAwareRecordReader { - protected final BucketizedHiveInputSplit split; - protected final InputFormat inputFormat; - protected final JobConf jobConf; - protected final Reporter reporter; - protected RecordReader curReader; - protected long progress; - protected int idx; + extends HiveCombineSplitRecordReader { public BucketizedHiveRecordReader(InputFormat inputFormat, - BucketizedHiveInputSplit bucketizedSplit, JobConf jobConf, - Reporter reporter) throws IOException { - this.split = bucketizedSplit; - this.inputFormat = inputFormat; - this.jobConf = jobConf; - this.reporter = reporter; - initNextRecordReader(); - } - - public void doClose() throws IOException { - if (curReader != null) { - curReader.close(); - curReader = null; - } - idx = 0; - } - - public K createKey() { - return (K) curReader.createKey(); - } - - public V createValue() { - return (V) curReader.createValue(); - } - - public long getPos() throws IOException { - if (curReader != null) { - return curReader.getPos(); - } else { - return 0; - } - } - - public float getProgress() throws IOException { - // The calculation is strongly dependent on the assumption that all splits - // came from the same file - return Math.min(1.0f, ((curReader == null) ? progress : curReader.getPos()) - / (float) (split.getLength())); - } - - public boolean doNext(K key, V value) throws IOException { - while ((curReader == null) || !curReader.next(key, value)) { - if (!initNextRecordReader()) { - return false; - } - } - return true; + HiveCombineSplit combineSplit, JobConf jobConf, Reporter reporter) + throws IOException { + super(inputFormat, combineSplit, jobConf, reporter); } /** @@ -99,27 +47,6 @@ * BucketizedHiveRecordReader. */ protected boolean initNextRecordReader() throws IOException { - if (curReader != null) { - curReader.close(); - curReader = null; - if (idx > 0) { - progress += split.getLength(idx - 1); // done processing so far - } - } - - // if all chunks have been processed, nothing more to do. - if (idx == split.getNumSplits()) { - return false; - } - - // get a record reader for the idx-th chunk - try { - curReader = inputFormat.getRecordReader(split.getSplit(idx), jobConf, - reporter); - } catch (Exception e) { - throw new RuntimeException(e); - } - idx++; - return true; + return super.initNextRecordReaderBase(); } } Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveCombineSplit.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveCombineSplit.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveCombineSplit.java (revision 0) @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.util.ReflectionUtils; + +public class HiveCombineSplit extends HiveInputSplit { + + protected InputSplit[] inputSplits; + protected String inputFormatClassName; + + public String getInputFormatClassName() { + return inputFormatClassName; + } + + public void setInputFormatClassName(String inputFormatClassName) { + this.inputFormatClassName = inputFormatClassName; + } + + public HiveCombineSplit() { + // This is the only public constructor of FileSplit + super(); + } + + public HiveCombineSplit(InputSplit[] inputSplits, + String inputFormatClassName) { + // This is the only public constructor of FileSplit + super(); + + assert (inputSplits != null && inputSplits.length > 0); + this.inputSplits = inputSplits; + this.inputFormatClassName = inputFormatClassName; + } + + public int getNumSplits() { + return inputSplits.length; + } + + public InputSplit getSplit(int idx) { + assert (idx >= 0 && idx < inputSplits.length); + return inputSplits[idx]; + } + + public String inputFormatClassName() { + return inputFormatClassName; + } + + @Override + public Path getPath() { + if (inputSplits != null && inputSplits.length > 0 + && inputSplits[0] instanceof FileSplit) { + return ((FileSplit) inputSplits[0]).getPath(); + } + return new Path(""); + } + + /** The position of the first byte in the file to process. 
*/ + @Override + public long getStart() { + if (inputSplits != null && inputSplits.length > 0 + && inputSplits[0] instanceof FileSplit) { + return ((FileSplit) inputSplits[0]).getStart(); + } + return 0; + } + + @Override + public String toString() { + if (inputSplits != null && inputSplits.length > 0) { + return inputFormatClassName + ":" + inputSplits[0].toString(); + } + return inputFormatClassName + ":null"; + } + + @Override + public long getLength() { + long r = 0; + if (inputSplits != null) { + try { + for (InputSplit inputSplit : inputSplits) { + r += inputSplit.getLength(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return r; + } + + public long getLength(int idx) { + if (inputSplits != null) { + try { + return inputSplits[idx].getLength(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return -1; + } + + @Override + public String[] getLocations() throws IOException { + assert (inputSplits != null && inputSplits.length > 0); + return inputSplits[0].getLocations(); + } + + @Override + public void readFields(DataInput in) throws IOException { + String inputSplitClassName = in.readUTF(); + + int numSplits = in.readInt(); + inputSplits = new InputSplit[numSplits]; + for (int i = 0; i < numSplits; i++) { + try { + inputSplits[i] = (InputSplit) ReflectionUtils.newInstance(conf + .getClassByName(inputSplitClassName), conf); + } catch (Exception e) { + throw new IOException( + "Cannot create an instance of InputSplit class = " + + inputSplitClassName + ":" + e.getMessage()); + } + inputSplits[i].readFields(in); + } + inputFormatClassName = in.readUTF(); + } + + @Override + public void write(DataOutput out) throws IOException { + assert (inputSplits != null && inputSplits.length > 0); + out.writeUTF(inputSplits[0].getClass().getName()); + out.writeInt(inputSplits.length); + for (InputSplit inputSplit : inputSplits) { + inputSplit.write(out); + } + out.writeUTF(inputFormatClassName); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveCombineSplitRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveCombineSplitRecordReader.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveCombineSplitRecordReader.java (revision 0) @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +public class HiveCombineSplitRecordReader<K extends WritableComparable, V extends Writable> + extends HiveContextAwareRecordReader<K, V> { + + protected final HiveCombineSplit split; + protected final InputFormat inputFormat; + protected final JobConf jobConf; + protected final Reporter reporter; + protected RecordReader curReader; + protected long progress; + protected int idx; + + public HiveCombineSplitRecordReader(InputFormat inputFormat, + HiveCombineSplit combineSplit, JobConf jobConf, + Reporter reporter) throws IOException { + this.split = combineSplit; + this.inputFormat = inputFormat; + this.jobConf = jobConf; + this.reporter = reporter; + initNextRecordReader(); + } + + public void doClose() throws IOException { + if (curReader != null) { + curReader.close(); + curReader = null; + } + idx = 0; + } + + public K createKey() { + return (K) curReader.createKey(); + } + + public V createValue() { + return (V) curReader.createValue(); + } + + public long getPos() throws IOException { + if (curReader != null) { + return curReader.getPos(); + } else { + return 0; + } + } + + public float getProgress() throws IOException { + // The calculation is strongly dependent on the assumption that all splits + // came from the same file + return Math.min(1.0f, ((curReader == null) ? progress : curReader.getPos()) + / (float) (split.getLength())); + } + + public boolean doNext(K key, V value) throws IOException { + while ((curReader == null) || !curReader.next(key, value)) { + if (!initNextRecordReader()) { + return false; + } + } + return true; + } + + /** + * Get the record reader for the next chunk in this + * HiveCombineSplitRecordReader. + */ + protected boolean initNextRecordReaderBase() throws IOException { + if (curReader != null) { + curReader.close(); + curReader = null; + if (idx > 0) { + progress += split.getLength(idx - 1); // done processing so far + } + } + + // if all chunks have been processed, nothing more to do. + if (idx == split.getNumSplits()) { + return false; + } + + // get a record reader for the idx-th chunk + try { + curReader = inputFormat.getRecordReader(split.getSplit(idx), jobConf, + reporter); + } catch (Exception e) { + throw new RuntimeException(e); + } + + idx++; + return true; + } + + /** + * Get the record reader for the next chunk in this + * HiveCombineSplitRecordReader and update the IO context's input file. + */ + protected boolean initNextRecordReader() throws IOException { + boolean ret = initNextRecordReaderBase(); + if (ret) { + this.getIOContext().setInputFile(split.getPath().toString()); + } + return ret; + } + +} \ No newline at end of file Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveCombineSplitsInputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveCombineSplitsInputFormat.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveCombineSplitsInputFormat.java (revision 0) @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.InvalidInputException; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +public class HiveCombineSplitsInputFormat + extends HiveInputFormat { + + public static final Log LOG = LogFactory + .getLog("org.apache.hadoop.hive.ql.io.HiveCombineSplitsInputFormat"); + + + @Override + public RecordReader getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws IOException { + + HiveCombineSplit hsplit = (HiveCombineSplit) split; + + String inputFormatClassName = null; + Class inputFormatClass = null; + try { + inputFormatClassName = hsplit.inputFormatClassName(); + inputFormatClass = job.getClassByName(inputFormatClassName); + } catch (Exception e) { + throw new IOException("cannot find class " + inputFormatClassName); + } + + // clone a jobConf for setting needed columns for reading + JobConf cloneJobConf = new JobConf(job); + pushProjectionsAndFilters(cloneJobConf, inputFormatClass, hsplit.getPath() + .toString(), hsplit.getPath().toUri().getPath()); + + InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, + cloneJobConf); + HiveCombineSplitRecordReader rr = new HiveCombineSplitRecordReader( + inputFormat, hsplit, cloneJobConf, reporter); + rr.initIOContext(hsplit, cloneJobConf, inputFormatClass); + return rr; + } + + protected FileStatus[] listStatus(JobConf job, Path path) throws IOException { + ArrayList result = new ArrayList(); + List errors = new ArrayList(); + + FileSystem fs = path.getFileSystem(job); + FileStatus[] matches = fs.globStatus(path); + if (matches == null) { + errors.add(new IOException("Input path does not exist: " + path)); + } else if (matches.length == 0) { + errors.add(new IOException("Input Pattern " + path + " matches 0 files")); + } else { + for (FileStatus globStat : matches) { + FileUtils.listStatusRecursively(fs, globStat, result); + } + } + + if (!errors.isEmpty()) { + throw new InvalidInputException(errors); + } + LOG.info("Total input paths to process : " + result.size()); + return result.toArray(new FileStatus[result.size()]); + + } + + @Override + public InputSplit[] 
getSplits(JobConf job, int numSplits) throws IOException { + init(job); + + Path[] dirs = FileInputFormat.getInputPaths(job); + if (dirs.length == 0) { + throw new IOException("No input paths specified in job"); + } + JobConf newjob = new JobConf(job); + ArrayList result = new ArrayList(); + + long minSize = job.getLong("mapred.min.split.size", 256 * 1024 * 1024); + + // dir is for each partition + for (Path dir : dirs) { + PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir); + Class inputFormatClass = part.getInputFileFormatClass(); + InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); + + List currentSplit = new ArrayList(); + long currentSplitSize = 0; + + FileStatus[] listStatus = listStatus(newjob, dir); + + for (FileStatus status : listStatus) { + FileInputFormat.setInputPaths(newjob, status.getPath()); + InputSplit[] iss = inputFormat.getSplits(newjob, 0); + if (iss != null && iss.length > 0) { + for (InputSplit inputSplitTmp : iss) { + currentSplit.add(inputSplitTmp); + currentSplitSize += inputSplitTmp.getLength(); + if (currentSplitSize > minSize) { + result.add(new HiveCombineSplit(currentSplit + .toArray(new InputSplit[currentSplit.size()]), + inputFormatClass.getName())); + currentSplit.clear(); + currentSplitSize = 0; + } + } + } + } + + if (currentSplit != null && currentSplit.size() > 0) { + result.add(new HiveCombineSplit(currentSplit + .toArray(new InputSplit[currentSplit.size()]), inputFormatClass + .getName())); + } + } + return result.toArray(new HiveCombineSplit[result.size()]); + } + +} Index: ql/src/test/queries/clientpositive/combine_split.q =================================================================== --- ql/src/test/queries/clientpositive/combine_split.q (revision 0) +++ ql/src/test/queries/clientpositive/combine_split.q (revision 0) @@ -0,0 +1,27 @@ +set hive.enforce.bucketing = true; +set hive.exec.reducers.max = 1; + +CREATE TABLE combine_split_bucket(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; +CREATE TABLE combine_split_bucket_part(key int, value string) PARTITIONED BY (ds STRING) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; + +CREATE TABLE srcbucket_part (key int, value string) PARTITIONED BY (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; +load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcbucket_part partition(ds='2008-04-08'); +load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcbucket_part partition(ds='2008-04-08'); +load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcbucket_part partition(ds='2008-04-08'); +load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcbucket_part partition(ds='2008-04-08'); + +set hive.exec.compress.output=true; +insert overwrite table combine_split_bucket +select key, value from srcbucket_part where ds='2008-04-08'; + +insert overwrite table combine_split_bucket_part partition(ds='01') +select key, value from srcbucket_part where ds='2008-04-08'; + +insert overwrite table combine_split_bucket_part partition(ds='02') +select key, value from srcbucket_part where ds='2008-04-08'; + +set hive.input.format=org.apache.hadoop.hive.ql.io.HiveCombineSplitsInputFormat; + +select key, INPUT__FILE__NAME from srcbucket_part where ds='2008-04-08' order by key, INPUT__FILE__NAME limit 40; + +select key, INPUT__FILE__NAME, ds from combine_split_bucket_part where ds='2008-04-08' order by key, INPUT__FILE__NAME, ds limit 40; \ No newline at end of file Index: 
ql/src/test/results/clientpositive/combine_split.q.out =================================================================== --- ql/src/test/results/clientpositive/combine_split.q.out (revision 0) +++ ql/src/test/results/clientpositive/combine_split.q.out (revision 0) @@ -0,0 +1,148 @@ +PREHOOK: query: CREATE TABLE combine_split_bucket(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE combine_split_bucket(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@combine_split_bucket +PREHOOK: query: CREATE TABLE combine_split_bucket_part(key int, value string) PARTITIONED BY (ds STRING) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE combine_split_bucket_part(key int, value string) PARTITIONED BY (ds STRING) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@combine_split_bucket_part +PREHOOK: query: CREATE TABLE srcbucket_part (key int, value string) PARTITIONED BY (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE srcbucket_part (key int, value string) PARTITIONED BY (ds string) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@srcbucket_part +PREHOOK: query: load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcbucket_part partition(ds='2008-04-08') +PREHOOK: type: LOAD +PREHOOK: Output: default@srcbucket_part +POSTHOOK: query: load data local inpath '../data/files/srcbucket20.txt' INTO TABLE srcbucket_part partition(ds='2008-04-08') +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket_part +POSTHOOK: Output: default@srcbucket_part@ds=2008-04-08 +PREHOOK: query: load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcbucket_part partition(ds='2008-04-08') +PREHOOK: type: LOAD +PREHOOK: Output: default@srcbucket_part@ds=2008-04-08 +POSTHOOK: query: load data local inpath '../data/files/srcbucket21.txt' INTO TABLE srcbucket_part partition(ds='2008-04-08') +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket_part@ds=2008-04-08 +PREHOOK: query: load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcbucket_part partition(ds='2008-04-08') +PREHOOK: type: LOAD +PREHOOK: Output: default@srcbucket_part@ds=2008-04-08 +POSTHOOK: query: load data local inpath '../data/files/srcbucket22.txt' INTO TABLE srcbucket_part partition(ds='2008-04-08') +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket_part@ds=2008-04-08 +PREHOOK: query: load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcbucket_part partition(ds='2008-04-08') +PREHOOK: type: LOAD +PREHOOK: Output: default@srcbucket_part@ds=2008-04-08 +POSTHOOK: query: load data local inpath '../data/files/srcbucket23.txt' INTO TABLE srcbucket_part partition(ds='2008-04-08') +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket_part@ds=2008-04-08 +PREHOOK: query: insert overwrite table combine_split_bucket +select key, value from srcbucket_part where ds='2008-04-08' +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket_part@ds=2008-04-08 +PREHOOK: Output: default@combine_split_bucket +POSTHOOK: query: insert overwrite table combine_split_bucket +select key, value from srcbucket_part where ds='2008-04-08' +POSTHOOK: type: QUERY +POSTHOOK: Input: 
default@srcbucket_part@ds=2008-04-08 +POSTHOOK: Output: default@combine_split_bucket +POSTHOOK: Lineage: combine_split_bucket.key SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket.value SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: insert overwrite table combine_split_bucket_part partition(ds='01') +select key, value from srcbucket_part where ds='2008-04-08' +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket_part@ds=2008-04-08 +PREHOOK: Output: default@combine_split_bucket_part@ds=01 +POSTHOOK: query: insert overwrite table combine_split_bucket_part partition(ds='01') +select key, value from srcbucket_part where ds='2008-04-08' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket_part@ds=2008-04-08 +POSTHOOK: Output: default@combine_split_bucket_part@ds=01 +POSTHOOK: Lineage: combine_split_bucket.key SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket.value SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket_part PARTITION(ds=01).key SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket_part PARTITION(ds=01).value SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: insert overwrite table combine_split_bucket_part partition(ds='02') +select key, value from srcbucket_part where ds='2008-04-08' +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket_part@ds=2008-04-08 +PREHOOK: Output: default@combine_split_bucket_part@ds=02 +POSTHOOK: query: insert overwrite table combine_split_bucket_part partition(ds='02') +select key, value from srcbucket_part where ds='2008-04-08' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket_part@ds=2008-04-08 +POSTHOOK: Output: default@combine_split_bucket_part@ds=02 +POSTHOOK: Lineage: combine_split_bucket.key SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket.value SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket_part PARTITION(ds=01).key SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket_part PARTITION(ds=01).value SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket_part PARTITION(ds=02).key SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket_part PARTITION(ds=02).value SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: select key, INPUT__FILE__NAME from srcbucket_part where ds='2008-04-08' order by key, INPUT__FILE__NAME limit 40 +PREHOOK: type: QUERY +PREHOOK: Input: default@srcbucket_part@ds=2008-04-08 +PREHOOK: Output: file:/var/folders/6g/6grtCwPMEf4sqHUPpy6xQG9ByHg/-Tmp-/heyongqiang/hive_2011-04-04_18-10-09_500_6344934922281906563/-mr-10000 +POSTHOOK: query: select key, INPUT__FILE__NAME from srcbucket_part where ds='2008-04-08' order by key, INPUT__FILE__NAME limit 40 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcbucket_part@ds=2008-04-08 +POSTHOOK: 
Output: file:/var/folders/6g/6grtCwPMEf4sqHUPpy6xQG9ByHg/-Tmp-/heyongqiang/hive_2011-04-04_18-10-09_500_6344934922281906563/-mr-10000 +POSTHOOK: Lineage: combine_split_bucket.key SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket.value SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket_part PARTITION(ds=01).key SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket_part PARTITION(ds=01).value SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket_part PARTITION(ds=02).key SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket_part PARTITION(ds=02).value SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:value, type:string, comment:null), ] +0 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket20.txt +0 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket20.txt +0 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket20.txt +2 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket22.txt +4 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket20.txt +5 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket21.txt +5 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket21.txt +5 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket21.txt +8 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket20.txt +9 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket21.txt +10 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket23.txt +11 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket20.txt +12 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket21.txt +12 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket21.txt +15 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket20.txt +15 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket20.txt +17 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket22.txt +18 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket23.txt +18 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket23.txt +19 
pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket20.txt +20 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket22.txt +24 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket22.txt +24 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket22.txt +26 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket20.txt +26 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket20.txt +27 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket21.txt +28 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket22.txt +30 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket21.txt +33 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket20.txt +34 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket21.txt +35 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket22.txt +35 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket22.txt +35 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket22.txt +37 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket20.txt +37 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket20.txt +41 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket21.txt +42 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket22.txt +42 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket22.txt +43 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket23.txt +44 pfile:/Users/heyongqiang/Documents/workspace/Hive-3/build/ql/test/data/warehouse/srcbucket_part/ds=2008-04-08/srcbucket20.txt +PREHOOK: query: select key, INPUT__FILE__NAME, ds from combine_split_bucket_part where ds='2008-04-08' order by key, INPUT__FILE__NAME, ds limit 40 +PREHOOK: type: QUERY +PREHOOK: Output: file:/var/folders/6g/6grtCwPMEf4sqHUPpy6xQG9ByHg/-Tmp-/heyongqiang/hive_2011-04-04_18-10-16_535_3131007036618332712/-mr-10000 +POSTHOOK: query: select key, INPUT__FILE__NAME, ds from combine_split_bucket_part where ds='2008-04-08' order by key, INPUT__FILE__NAME, ds limit 40 +POSTHOOK: type: QUERY +POSTHOOK: Output: file:/var/folders/6g/6grtCwPMEf4sqHUPpy6xQG9ByHg/-Tmp-/heyongqiang/hive_2011-04-04_18-10-16_535_3131007036618332712/-mr-10000 +POSTHOOK: Lineage: combine_split_bucket.key SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket.value SIMPLE 
[(srcbucket_part)srcbucket_part.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket_part PARTITION(ds=01).key SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket_part PARTITION(ds=01).value SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket_part PARTITION(ds=02).key SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: combine_split_bucket_part PARTITION(ds=02).value SIMPLE [(srcbucket_part)srcbucket_part.FieldSchema(name:value, type:string, comment:null), ]
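
Note (not part of the patch): the core behavior added above is the split grouping in HiveCombineSplitsInputFormat.getSplits, where a partition's splits are accumulated into one HiveCombineSplit until their total length passes mapred.min.split.size (256 MB by default in this patch), and any leftover splits form a final group. The self-contained sketch below illustrates only that accumulation rule; the class and method names in it (SplitCombineSketch, combine) are illustrative and do not exist in Hive.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the grouping rule in HiveCombineSplitsInputFormat.getSplits:
// accumulate split lengths until the running total exceeds minSize, then emit the group.
public class SplitCombineSketch {

  static List<List<Long>> combine(long[] splitLengths, long minSize) {
    List<List<Long>> groups = new ArrayList<List<Long>>();
    List<Long> current = new ArrayList<Long>();
    long currentSize = 0;
    for (long length : splitLengths) {
      current.add(length);
      currentSize += length;
      if (currentSize > minSize) {
        groups.add(current);            // becomes one combined split
        current = new ArrayList<Long>();
        currentSize = 0;
      }
    }
    if (!current.isEmpty()) {
      groups.add(current);              // leftover splits form the last group
    }
    return groups;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    // Four 100 MB splits against the 256 MB default threshold -> two groups:
    // [100, 100, 100] (first total to exceed 256 MB) and [100] (leftover).
    System.out.println(combine(new long[] {100 * mb, 100 * mb, 100 * mb, 100 * mb}, 256 * mb).size());
  }
}

Because BucketizedHiveInputFormat.getSplits still wraps all of a file's splits into a single HiveCombineSplit, the size threshold above only affects HiveCombineSplitsInputFormat.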