Index: ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputSplit.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputSplit.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputSplit.java (revision 0)
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.ql.exec.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.HadoopShims.CombineFileInputFormatShim;
+import org.apache.hadoop.hive.shims.HadoopShims.InputSplitShim;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * BucketizedHiveInputSplit encapsulates the InputSplits produced for one input
+ * file (bucket), together with their corresponding inputFormatClass. It
+ * derives from FileSplit so that "map.input.file" is still set correctly in
+ * MapTask.
+ */
+public class BucketizedHiveInputSplit extends FileSplit implements InputSplit,
+    Configurable {
+
+  InputSplit[] inputSplits;
+  String inputFormatClassName;
+
+  public BucketizedHiveInputSplit() {
+    // This is the only public constructor of FileSplit
+    super((Path) null, 0, 0, (String[]) null);
+  }
+
+  public BucketizedHiveInputSplit(InputSplit[] inputSplits,
+      String inputFormatClassName) {
+    // This is the only public constructor of FileSplit
+    super((Path) null, 0, 0, (String[]) null);
+
+    assert (inputSplits != null && inputSplits.length > 0);
+    this.inputSplits = inputSplits;
+    this.inputFormatClassName = inputFormatClassName;
+  }
+
+  public int getNumSplits() {
+    return inputSplits.length;
+  }
+
+  public InputSplit getSplit(int idx) {
+    assert (idx >= 0 && idx < inputSplits.length);
+    return inputSplits[idx];
+  }
+
+  public String inputFormatClassName() {
+    return inputFormatClassName;
+  }
+
+  @Override
+  public Path getPath() {
+    if (inputSplits != null && inputSplits.length > 0
+        && inputSplits[0] instanceof FileSplit) {
+      return ((FileSplit) inputSplits[0]).getPath();
+    }
+    return new Path("");
+  }
+
+  /** The position of the first byte in the file to process. */
+  @Override
+  public long getStart() {
+    if (inputSplits != null && inputSplits.length > 0
+        && inputSplits[0] instanceof FileSplit) {
+      return ((FileSplit) inputSplits[0]).getStart();
+    }
+    return 0;
+  }
+
+  @Override
+  public String toString() {
+    if (inputSplits != null && inputSplits.length > 0) {
+      return inputFormatClassName + ":" + inputSplits[0].toString();
+    }
+    return inputFormatClassName + ":null";
+  }
+
+  @Override
+  public long getLength() {
+    long r = 0;
+    if (inputSplits != null) {
+      try {
+        for (InputSplit inputSplit : inputSplits) {
+          r += inputSplit.getLength();
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return r;
+  }
+
+  public long getLength(int idx) {
+    if (inputSplits != null) {
+      try {
+        return inputSplits[idx].getLength();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return -1;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    assert (inputSplits != null && inputSplits.length > 0);
+    return inputSplits[0].getLocations();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    String inputSplitClassName = in.readUTF();
+
+    int numSplits = in.readInt();
+    inputSplits = new InputSplit[numSplits];
+    for (int i = 0; i < numSplits; i++) {
+      try {
+        inputSplits[i] = (InputSplit) ReflectionUtils.newInstance(conf
+            .getClassByName(inputSplitClassName), conf);
+      } catch (Exception e) {
+        throw new IOException(
+            "Cannot create an instance of InputSplit class = "
+                + inputSplitClassName + ":" + e.getMessage());
+      }
+      inputSplits[i].readFields(in);
+    }
+    inputFormatClassName = in.readUTF();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    assert (inputSplits != null && inputSplits.length > 0);
+    out.writeUTF(inputSplits[0].getClass().getName());
+    out.writeInt(inputSplits.length);
+    for (InputSplit inputSplit : inputSplits) {
+      inputSplit.write(out);
+    }
+    out.writeUTF(inputFormatClassName);
+  }
+
+  Configuration conf;
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+}
\ No newline at end of file
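For illustration, here is a minimal sketch of what a BucketizedHiveInputSplit holds and how it serializes: it wraps all the splits of one file, getLength() sums the underlying lengths, and readFields() needs a Configuration (set via setConf) so that the wrapped split instances can be re-created reflectively. The paths, sizes, and class name below are invented for the example and are not taken from the patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputSplit;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

public class BucketizedSplitRoundTrip {
  public static void main(String[] args) throws IOException {
    // Two block-level splits of the same bucket file, wrapped into one
    // BucketizedHiveInputSplit (paths and sizes are made up).
    InputSplit[] chunks = {
        new FileSplit(new Path("/warehouse/t/bucket_0"), 0, 64L << 20, new String[0]),
        new FileSplit(new Path("/warehouse/t/bucket_0"), 64L << 20, 16L << 20, new String[0]) };
    BucketizedHiveInputSplit split = new BucketizedHiveInputSplit(chunks,
        "org.apache.hadoop.mapred.TextInputFormat");

    System.out.println(split.getPath());    // /warehouse/t/bucket_0
    System.out.println(split.getLength());  // 83886080 = sum of the chunk lengths

    // Round-trip through write()/readFields(); the receiving side must call
    // setConf() first because readFields() re-creates the splits via reflection.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    split.write(new DataOutputStream(bos));

    BucketizedHiveInputSplit copy = new BucketizedHiveInputSplit();
    copy.setConf(new JobConf());
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    System.out.println(copy.getNumSplits());  // 2
  }
}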
Index: ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (revision 0)
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.ExecMapper;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+/**
+ * BucketizedHiveRecordReader is a wrapper over a sequence of RecordReaders.
+ * It reads the chunks of a BucketizedHiveInputSplit one after another, opening
+ * a RecordReader for the next chunk whenever the current one is exhausted.
+ */
+public class BucketizedHiveRecordReader<K extends WritableComparable, V extends Writable>
+    implements RecordReader<K, V> {
+  protected final BucketizedHiveInputSplit split;
+  protected final InputFormat<K, V> inputFormat;
+  protected final JobConf jobConf;
+  protected final Reporter reporter;
+  protected RecordReader<K, V> curReader;
+  protected long progress;
+  protected int idx;
+
+  public BucketizedHiveRecordReader(InputFormat<K, V> inputFormat,
+      BucketizedHiveInputSplit bucketizedSplit, JobConf jobConf,
+      Reporter reporter) throws IOException {
+    this.split = bucketizedSplit;
+    this.inputFormat = inputFormat;
+    this.jobConf = jobConf;
+    this.reporter = reporter;
+    initNextRecordReader();
+  }
+
+  public void close() throws IOException {
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+    }
+    idx = 0;
+  }
+
+  public K createKey() {
+    return (K) curReader.createKey();
+  }
+
+  public V createValue() {
+    return (V) curReader.createValue();
+  }
+
+  public long getPos() throws IOException {
+    return progress;
+  }
+
+  public float getProgress() throws IOException {
+    return Math.min(1.0f, progress / (float) (split.getLength()));
+  }
+
+  public boolean next(K key, V value) throws IOException {
+    while ((curReader == null) || !curReader.next(key, value)) {
+      if (!initNextRecordReader()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Get the record reader for the next chunk in this BucketizedHiveInputSplit.
+   */
+  protected boolean initNextRecordReader() throws IOException {
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+      if (idx > 0) {
+        progress += split.getLength(idx - 1); // done processing so far
+      }
+    }
+
+    // if all chunks have been processed, nothing more to do.
+    if (idx == split.getNumSplits()) {
+      return false;
+    }
+
+    // get a record reader for the idx-th chunk
+    try {
+      curReader = inputFormat.getRecordReader(split.getSplit(idx), jobConf,
+          reporter);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    idx++;
+    return true;
+  }
+}
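For illustration, a small sketch of driving the reader by hand: next() keeps returning records across chunk boundaries because it falls through to initNextRecordReader() whenever the current underlying reader is exhausted. TextInputFormat and the row-counting helper are assumptions chosen for the example, not part of the patch.

import java.io.IOException;

import org.apache.hadoop.hive.ql.io.BucketizedHiveInputSplit;
import org.apache.hadoop.hive.ql.io.BucketizedHiveRecordReader;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class ReadWholeBucket {
  // Counts the records of every chunk wrapped in the given bucketized split.
  static long countRows(BucketizedHiveInputSplit split, JobConf job)
      throws IOException {
    TextInputFormat inputFormat = new TextInputFormat();
    inputFormat.configure(job);

    BucketizedHiveRecordReader<LongWritable, Text> reader =
        new BucketizedHiveRecordReader<LongWritable, Text>(inputFormat, split,
            job, Reporter.NULL);
    LongWritable key = reader.createKey();
    Text value = reader.createValue();
    long rows = 0;
    // next() returns false only after the last underlying reader is exhausted;
    // in between, the reader for the next chunk is opened transparently.
    while (reader.next(key, value)) {
      rows++;
    }
    reader.close();
    return rows;
  }
}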
path name + * and determine the correct InputFormat for that path name from + * mapredPlan.pathToPartitionInfo(). It can be used to read files with different + * input format in the same map-reduce job. + */ +public class BucketizedHiveInputFormat + implements InputFormat, JobConfigurable { + + public static final Log LOG = LogFactory + .getLog("org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat"); + + JobConf job; + + public void configure(JobConf job) { + this.job = job; + } + + /** + * A cache of InputFormat instances. + */ + private static Map> inputFormats; + + static InputFormat getInputFormatFromCache( + Class inputFormatClass, JobConf job) throws IOException { + if (inputFormats == null) { + inputFormats = new HashMap>(); + } + if (!inputFormats.containsKey(inputFormatClass)) { + try { + InputFormat newInstance = (InputFormat) ReflectionUtils + .newInstance(inputFormatClass, job); + inputFormats.put(inputFormatClass, newInstance); + } catch (Exception e) { + throw new IOException("Cannot create an instance of InputFormat class " + + inputFormatClass.getName() + " as specified in mapredWork!"); + } + } + return inputFormats.get(inputFormatClass); + } + + public RecordReader getRecordReader(InputSplit split, JobConf job, + Reporter reporter) throws IOException { + + BucketizedHiveInputSplit hsplit = (BucketizedHiveInputSplit) split; + + String inputFormatClassName = null; + Class inputFormatClass = null; + try { + inputFormatClassName = hsplit.inputFormatClassName(); + inputFormatClass = job.getClassByName(inputFormatClassName); + } catch (Exception e) { + throw new IOException("cannot find class " + inputFormatClassName); + } + + // clone a jobConf for setting needed columns for reading + JobConf cloneJobConf = new JobConf(job); + initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath() + .toString(), hsplit.getPath().toUri().getPath()); + + InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, + cloneJobConf); + return new BucketizedHiveRecordReader(inputFormat, hsplit, cloneJobConf, + reporter); + } + + private Map pathToPartitionInfo; + MapredWork mrwork = null; + + protected void init(JobConf job) { + mrwork = Utilities.getMapRedWork(job); + pathToPartitionInfo = mrwork.getPathToPartitionInfo(); + } + + protected FileStatus[] listStatus(FileSystem fs, FileStatus fileStatus) + throws IOException { + ArrayList result = new ArrayList(); + + if (fileStatus.isDir()) { + for (FileStatus stat : fs.listStatus(fileStatus.getPath())) { + for (FileStatus retStat : listStatus(fs, stat)) { + result.add(retStat); + } + } + } else { + result.add(fileStatus); + } + + return result.toArray(new FileStatus[result.size()]); + + } + + protected FileStatus[] listStatus(JobConf job, Path path) throws IOException { + ArrayList result = new ArrayList(); + List errors = new ArrayList(); + + FileSystem fs = path.getFileSystem(job); + FileStatus[] matches = fs.globStatus(path); + if (matches == null) { + errors.add(new IOException("Input path does not exist: " + path)); + } else if (matches.length == 0) { + errors.add(new IOException("Input Pattern " + path + " matches 0 files")); + } else { + for (FileStatus globStat : matches) { + for (FileStatus retStat : listStatus(fs, globStat)) { + result.add(retStat); + } + } + } + + if (!errors.isEmpty()) { + throw new InvalidInputException(errors); + } + LOG.info("Total input paths to process : " + result.size()); + return result.toArray(new FileStatus[result.size()]); + + } + + public InputSplit[] getSplits(JobConf job, int numSplits) 
throws IOException { + init(job); + + Path[] dirs = FileInputFormat.getInputPaths(job); + if (dirs.length == 0) { + throw new IOException("No input paths specified in job"); + } + JobConf newjob = new JobConf(job); + ArrayList result = new ArrayList(); + + int numOrigSplits = 0; + // for each dir, get all files under the dir, do getSplits to each + // individual file, + // and then create a BucketizedHiveInputSplit on it + for (Path dir : dirs) { + PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir); + // create a new InputFormat instance if this is the first time to see this + // class + Class inputFormatClass = part.getInputFileFormatClass(); + InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); + newjob.setInputFormat(inputFormat.getClass()); + + FileStatus[] listStatus = listStatus(newjob, dir); + + for (FileStatus status : listStatus) { + LOG.info("block size: " + status.getBlockSize()); + LOG.info("file length: " + status.getLen()); + FileInputFormat.setInputPaths(newjob, status.getPath()); + InputSplit[] iss = inputFormat.getSplits(newjob, 0); + if (iss != null && iss.length > 0) { + numOrigSplits += iss.length; + result.add(new BucketizedHiveInputSplit(iss, inputFormatClass + .getName())); + } + } + } + LOG.info(result.size() + " bucketized splits generated from " + + numOrigSplits + " original splits."); + return result.toArray(new BucketizedHiveInputSplit[result.size()]); + } + + public void validateInput(JobConf job) throws IOException { + + init(job); + + Path[] dirs = FileInputFormat.getInputPaths(job); + if (dirs.length == 0) { + throw new IOException("No input paths specified in job"); + } + JobConf newjob = new JobConf(job); + + // for each dir, get the InputFormat, and do validateInput. + for (Path dir : dirs) { + PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir); + // create a new InputFormat instance if this is the first time to see this + // class + InputFormat inputFormat = getInputFormatFromCache(part + .getInputFileFormatClass(), job); + + FileInputFormat.setInputPaths(newjob, dir); + newjob.setInputFormat(inputFormat.getClass()); + ShimLoader.getHadoopShims().inputFormatValidateInput(inputFormat, newjob); + } + } + + protected static PartitionDesc getPartitionDescFromPath( + Map pathToPartitionInfo, Path dir) + throws IOException { + PartitionDesc partDesc = pathToPartitionInfo.get(dir.toString()); + if (partDesc == null) { + partDesc = pathToPartitionInfo.get(dir.toUri().getPath()); + } + if (partDesc == null) { + throw new IOException("cannot find dir = " + dir.toString() + + " in partToPartitionInfo!"); + } + + return partDesc; + } + + protected void initColumnsNeeded(JobConf jobConf, Class inputFormatClass, + String splitPath, String splitPathWithNoSchema) { + if (this.mrwork == null) { + init(job); + } + + ArrayList aliases = new ArrayList(); + Iterator>> iterator = this.mrwork + .getPathToAliases().entrySet().iterator(); + + while (iterator.hasNext()) { + Entry> entry = iterator.next(); + String key = entry.getKey(); + if (splitPath.startsWith(key) || splitPathWithNoSchema.startsWith(key)) { + ArrayList list = entry.getValue(); + for (String val : list) { + aliases.add(val); + } + } + } + + for (String alias : aliases) { + Operator op = this.mrwork.getAliasToWork().get( + alias); + if (op instanceof TableScanOperator) { + TableScanOperator tableScan = (TableScanOperator) op; + ArrayList list = tableScan.getNeededColumnIDs(); + if (list != null) { + 
ColumnProjectionUtils.appendReadColumnIDs(jobConf, list); + } else { + ColumnProjectionUtils.setFullyReadColumns(jobConf); + } + } + } + } +} \ No newline at end of file
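Finally, a sketch of the call sequence that produces the one-split-per-file grouping described above. It assumes the query plan has already been serialized into the job so that Utilities.getMapRedWork(job) can resolve each input path to a PartitionDesc; without that, getSplits() would fail, so treat this as an illustration of the API shape rather than a standalone program. The class and method names below are invented for the example.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputSplit;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

public class InspectBucketizedSplits {
  static void dump(JobConf job, Path tableDir) throws IOException {
    // Point the job at the table/partition directory and at this input format.
    FileInputFormat.setInputPaths(job, tableDir);
    job.setInputFormat(BucketizedHiveInputFormat.class);

    BucketizedHiveInputFormat inputFormat = new BucketizedHiveInputFormat();
    inputFormat.configure(job);

    // The numSplits hint does not affect the grouping: getSplits() emits one
    // BucketizedHiveInputSplit per file, each wrapping all the block-level
    // splits the underlying format produced for that file.
    InputSplit[] splits = inputFormat.getSplits(job, 0);
    for (InputSplit s : splits) {
      BucketizedHiveInputSplit bs = (BucketizedHiveInputSplit) s;
      System.out.println(bs.getPath() + ": " + bs.getNumSplits()
          + " original splits, " + bs.getLength() + " bytes");
    }
  }
}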