commit 7f1df396de87f1448c173921151d0ca316ebd3c3
Author: Owen O'Malley
Date:   Tue Aug 19 15:35:40 2014 -0700

    HIVE-7812. Disable CombineHiveInputFormat for ACID format directories.

diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index b1c4441..a9bb143 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -298,6 +298,28 @@ static ParsedDelta parseDelta(FileStatus path) {
   }
 
   /**
+   * Is the given directory in ACID format?
+   * @param directory the partition directory to check
+   * @param conf the query configuration
+   * @return true, if it is an ACID directory
+   * @throws IOException
+   */
+  public static boolean isAcid(Path directory,
+                               Configuration conf) throws IOException {
+    FileSystem fs = directory.getFileSystem(conf);
+    for(FileStatus file: fs.listStatus(directory)) {
+      String filename = file.getPath().getName();
+      if (filename.startsWith(BASE_PREFIX) ||
+          filename.startsWith(DELTA_PREFIX)) {
+        if (file.isDirectory()) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
    * Get the ACID state of the given directory. It finds the minimal set of
    * base and diff directories. Note that because major compactions don't
    * preserve the history, we can't use a base directory that includes a
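The new helper only inspects the directory listing, so its behavior is easy to see in isolation. A minimal sketch (the paths and layout below are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class IsAcidDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);

        // A partition that has seen ACID writes: a delta_<min>_<max> subdirectory.
        Path acidPartition = new Path("/tmp/isacid-demo/t/p=0");
        fs.mkdirs(new Path(acidPartition, "delta_0000011_0000020"));

        // A partition holding only ordinary bucket files.
        Path plainPartition = new Path("/tmp/isacid-demo/t/p=1");
        fs.mkdirs(plainPartition);
        fs.create(new Path(plainPartition, "000000_0")).close();

        System.out.println(AcidUtils.isAcid(acidPartition, conf));  // true
        System.out.println(AcidUtils.isAcid(plainPartition, conf)); // false
      }
    }

Note that a plain file named base_* or delta_* is not enough; isAcid() also requires the entry to be a directory.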
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
index d5e250f..bd5734a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
@@ -122,24 +122,7 @@ public boolean accept(Path p) {
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     init(job);
 
-    Path[] dirs = FileInputFormat.getInputPaths(job);
-    if (dirs.length == 0) {
-      // on tez we're avoiding to duplicate the file info in FileInputFormat.
-      if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
-        try {
-          List<Path> paths = Utilities.getInputPathsTez(job, mrwork);
-          dirs = paths.toArray(new Path[paths.size()]);
-          if (dirs.length == 0) {
-            // if we still don't have any files it's time to fail.
-            throw new IOException("No input paths specified in job");
-          }
-        } catch (Exception e) {
-          throw new IOException("Could not create input paths", e);
-        }
-      } else {
-        throw new IOException("No input paths specified in job");
-      }
-    }
+    Path[] dirs = getInputPaths(job);
 
     JobConf newjob = new JobConf(job);
     ArrayList<InputSplit> result = new ArrayList<InputSplit>();

diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index bf44548..77b7772 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -33,6 +33,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -264,8 +265,8 @@ public int hashCode() {
   /**
    * Create Hive splits based on CombineFileSplit.
    */
-  @Override
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+  private InputSplit[] getCombineSplits(JobConf job,
+                                        int numSplits) throws IOException {
     PerfLogger perfLogger = PerfLogger.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
     init(job);
@@ -274,17 +275,6 @@ public int hashCode() {
       mrwork.getAliasToWork();
     CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
         .getCombineFileInputFormat();
-
-    // on tez we're avoiding duplicating path info since the info will go over
-    // rpc
-    if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
-      try {
-        List<Path> dirs = Utilities.getInputPathsTez(job, mrwork);
-        Utilities.setInputPaths(job, dirs);
-      } catch (Exception e) {
-        throw new IOException("Could not create input paths", e);
-      }
-    }
 
     InputSplit[] splits = null;
     if (combine == null) {
@@ -327,13 +317,6 @@ public int hashCode() {
         // ignore
       }
       FileSystem inpFs = path.getFileSystem(job);
-      if (inputFormatClass.isAssignableFrom(OrcInputFormat.class)) {
-        if (inpFs.exists(new Path(path, OrcRecordUpdater.ACID_FORMAT))) {
-          throw new IOException("CombineHiveInputFormat is incompatible " +
-              " with ACID tables. Please set hive.input.format=" +
-              "org.apache.hadoop.hive.ql.io.HiveInputFormat");
-        }
-      }
 
       // Since there is no easy way of knowing whether MAPREDUCE-1597 is present in the tree or not,
       // we use a configuration variable for the same
@@ -461,6 +444,82 @@ public int hashCode() {
     return result.toArray(new CombineHiveInputSplit[result.size()]);
   }
 
+  /**
+   * Create Hive splits based on CombineFileSplit.
+   */
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    init(job);
+    Map<String, ArrayList<String>> pathToAliases = mrwork.getPathToAliases();
+    Map<String, Operator<? extends OperatorDesc>> aliasToWork =
+        mrwork.getAliasToWork();
+
+    ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+
+    Path[] paths = getInputPaths(job);
+
+    List<Path> nonCombinablePaths = new ArrayList<Path>(paths.length / 2);
+    List<Path> combinablePaths = new ArrayList<Path>(paths.length / 2);
+
+    for (Path path : paths) {
+
+      PartitionDesc part =
+          HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+              pathToPartitionInfo, path,
+              IOPrepareCache.get().allocatePartitionDescMap());
+
+      // Use HiveInputFormat if any of the paths is not splittable
+      Class inputFormatClass = part.getInputFileFormatClass();
+      String inputFormatClassName = inputFormatClass.getName();
+      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+      if (inputFormat instanceof AvoidSplitCombination &&
+          ((AvoidSplitCombination) inputFormat).shouldSkipCombine(path, job)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The split [" + path +
+              "] is being parked for HiveInputFormat.getSplits");
+        }
+        nonCombinablePaths.add(path);
+      } else {
+        combinablePaths.add(path);
+      }
+    }
+
+    // Store the previous value for the path specification
+    String oldPaths = job.get(HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The received input paths are: [" + oldPaths +
+          "] against the property " +
+          HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname);
+    }
+
+    // Process the normal splits
+    if (nonCombinablePaths.size() > 0) {
+      FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray
+          (new Path[nonCombinablePaths.size()]));
+      InputSplit[] splits = super.getSplits(job, numSplits);
+      for (InputSplit split : splits) {
+        result.add(split);
+      }
+    }
+
+    // Process the combine splits
+    if (combinablePaths.size() > 0) {
+      FileInputFormat.setInputPaths(job, combinablePaths.toArray
+          (new Path[combinablePaths.size()]));
+      InputSplit[] splits = getCombineSplits(job, numSplits);
+      for (InputSplit split : splits) {
+        result.add(split);
+      }
+    }
+
+    // Restore the old path information back
+    // This is just to prevent incompatibilities with previous versions Hive
+    // if some application depends on the original value being set.
+    job.set(HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname, oldPaths);
+    LOG.info("Number of all splits " + result.size());
+    return result.toArray(new InputSplit[result.size()]);
+  }
+
   private void processPaths(JobConf job, CombineFileInputFormatShim combine,
       List<InputSplitShim> iss, Path... path) throws IOException {
     JobConf currJob = new JobConf(job);
@@ -635,4 +694,12 @@ public String toString() {
       return s.toString();
     }
   }
+
+  /**
+   * This is a marker interface that is used to identify the formats where
+   * combine split generation is not applicable
+   */
+  public interface AvoidSplitCombination {
+    boolean shouldSkipCombine(Path path, Configuration conf) throws IOException;
+  }
 }
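getSplits() now partitions the input paths and delegates: paths whose input format implements the new AvoidSplitCombination marker and answers true are "parked" for the plain HiveInputFormat.getSplits() (via super), while everything else goes through the renamed getCombineSplits(). A hedged sketch of a third-party format opting out (the class below is hypothetical; only the marker interface and shouldSkipCombine() come from the patch):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
    import org.apache.hadoop.hive.ql.io.HiveInputFormat;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;

    // Hypothetical format that never wants its files merged into combine
    // splits; extending HiveInputFormat is incidental to the example.
    public class NeverCombineInputFormat<K extends WritableComparable, V extends Writable>
        extends HiveInputFormat<K, V>
        implements CombineHiveInputFormat.AvoidSplitCombination {

      @Override
      public boolean shouldSkipCombine(Path path, Configuration conf)
          throws IOException {
        // Returning true parks the path: CombineHiveInputFormat routes it to
        // HiveInputFormat.getSplits() instead of the combine logic.
        return true;
      }
    }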
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 5c4459b..8f4aeda 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -295,11 +295,7 @@ private void addSplitsForGroup(List<Path> dirs, TableScanOperator tableScan, Job
     }
   }
 
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    PerfLogger perfLogger = PerfLogger.getPerfLogger();
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
-    init(job);
-
+  Path[] getInputPaths(JobConf job) throws IOException {
     Path[] dirs = FileInputFormat.getInputPaths(job);
     if (dirs.length == 0) {
       // on tez we're avoiding to duplicate the file info in FileInputFormat.
@@ -314,6 +310,14 @@ private void addSplitsForGroup(List<Path> dirs, TableScanOperator tableScan, Job
         throw new IOException("No input paths specified in job");
       }
     }
+    return dirs;
+  }
+
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
+    init(job);
+    Path[] dirs = getInputPaths(job);
 
     JobConf newjob = new JobConf(job);
     List<InputSplit> result = new ArrayList<InputSplit>();
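Pulling getInputPaths() out of getSplits() is what lets BucketizedHiveInputFormat (above) and CombineHiveInputFormat share the Tez path-resolution logic instead of copying it. A sketch of the pattern for any further subclass (the class is hypothetical; it sits in the same package because getInputPaths() is package-private):

    package org.apache.hadoop.hive.ql.io;

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical subclass: logs the resolved input directories, then
    // defers to the normal split generation.
    public class TracingHiveInputFormat<K extends WritableComparable, V extends Writable>
        extends HiveInputFormat<K, V> {

      @Override
      public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        init(job); // populate mrwork first, as BucketizedHiveInputFormat does
        // Resolves mapred.input.dir and falls back to the Tez MapWork paths;
        // throws "No input paths specified in job" when both are empty.
        for (Path dir : getInputPaths(job)) {
          System.err.println("input dir: " + dir);
        }
        return super.getSplits(job, numSplits);
      }
    }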
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 913d3ac..8864032 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
@@ -99,7 +100,8 @@
  */
 public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   InputFormatChecker, VectorizedInputFormatInterface,
-    AcidInputFormat<NullWritable, OrcStruct> {
+    AcidInputFormat<NullWritable, OrcStruct>,
+    CombineHiveInputFormat.AvoidSplitCombination {
 
   private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
   static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
@@ -125,6 +127,12 @@
    */
   private static final double MIN_INCLUDED_LOCATION = 0.80;
 
+  @Override
+  public boolean shouldSkipCombine(Path path,
+                                   Configuration conf) throws IOException {
+    return AcidUtils.isAcid(path, conf);
+  }
+
   private static class OrcRecordReader
       implements org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>,
       StatsProvidingRecordReader {
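Because shouldSkipCombine() delegates straight to the directory-layout check, it can be exercised without reading any ORC data. A small sketch (directory names illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;

    public class SkipCombineDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);

        // A compacted ACID partition: contains a base_<txn> directory.
        Path acidDir = new Path("/tmp/skip-demo/t/p=0");
        fs.mkdirs(new Path(acidDir, "base_0000010"));

        OrcInputFormat orc = new OrcInputFormat();
        // true: CombineHiveInputFormat will park this path for
        // HiveInputFormat.getSplits() rather than combining it.
        System.out.println(orc.shouldSkipCombine(acidDir, conf));
      }
    }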
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index a3ac1f7..5fedb62 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -118,7 +118,6 @@
     TimeZone gmt = TimeZone.getTimeZone("GMT+0");
     DATE_FORMAT.setTimeZone(gmt);
     TIME_FORMAT.setTimeZone(gmt);
-    TimeZone local = TimeZone.getDefault();
   }
 
   public static class BigRow implements Writable {
@@ -560,6 +559,12 @@ public MockOutputStream(MockFile file) throws IOException {
       this.file = file;
     }
 
+    /**
+     * Set the blocks and their location for the file.
+     * Must be called after the stream is closed or the block length will be
+     * wrong.
+     * @param blocks the list of blocks
+     */
     public void setBlocks(MockBlock... blocks) {
       file.blocks = blocks;
       int offset = 0;
@@ -580,12 +585,18 @@ public void close() throws IOException {
       file.content = new byte[file.length];
       System.arraycopy(buf.getData(), 0, file.content, 0, file.length);
     }
+
+    @Override
+    public String toString() {
+      return "Out stream to " + file.toString();
+    }
   }
 
   public static class MockFileSystem extends FileSystem {
     final List<MockFile> files = new ArrayList<MockFile>();
     Path workingDir = new Path("/");
 
+    @SuppressWarnings("unused")
     public MockFileSystem() {
       // empty
     }
@@ -620,7 +631,7 @@ public FSDataInputStream open(Path path, int i) throws IOException {
         return new FSDataInputStream(new MockInputStream(file));
       }
     }
-    return null;
+    throw new IOException("File not found: " + path);
   }
 
   @Override
@@ -743,8 +754,12 @@ public FileStatus getFileStatus(Path path) throws IOException {
       for(MockBlock block: file.blocks) {
         if (OrcInputFormat.SplitGenerator.getOverlap(block.offset,
             block.length, start, len) > 0) {
+          String[] topology = new String[block.hosts.length];
+          for(int i=0; i < topology.length; ++i) {
+            topology[i] = "/rack/ " + block.hosts[i];
+          }
           result.add(new BlockLocation(block.hosts, block.hosts,
-              block.offset, block.length));
+              topology, block.offset, block.length));
         }
       }
       return result.toArray(new BlockLocation[result.size()]);
@@ -1209,7 +1224,8 @@ JobConf createMockExecutionEnvironment(Path workDir,
                                          Path warehouseDir,
                                          String tableName,
                                          ObjectInspector objectInspector,
-                                         boolean isVectorized
+                                         boolean isVectorized,
+                                         int partitions
                                          ) throws IOException {
     Utilities.clearWorkMap();
     JobConf conf = new JobConf();
@@ -1218,9 +1234,20 @@ JobConf createMockExecutionEnvironment(Path workDir,
     conf.set("hive.vectorized.execution.enabled", Boolean.toString(isVectorized));
     conf.set("fs.mock.impl", MockFileSystem.class.getName());
     conf.set("mapred.mapper.class", ExecMapper.class.getName());
-    Path root = new Path(warehouseDir, tableName + "/p=0");
+    Path root = new Path(warehouseDir, tableName);
+    // clean out previous contents
     ((MockFileSystem) root.getFileSystem(conf)).clear();
-    conf.set("mapred.input.dir", root.toString());
+    // build partition strings
+    String[] partPath = new String[partitions];
+    StringBuilder buffer = new StringBuilder();
+    for(int p=0; p < partitions; ++p) {
+      partPath[p] = new Path(root, "p=" + p).toString();
+      if (p != 0) {
+        buffer.append(',');
+      }
+      buffer.append(partPath[p]);
+    }
+    conf.set("mapred.input.dir", buffer.toString());
     StringBuilder columnIds = new StringBuilder();
     StringBuilder columnNames = new StringBuilder();
     StringBuilder columnTypes = new StringBuilder();
@@ -1249,9 +1276,6 @@ JobConf createMockExecutionEnvironment(Path workDir,
     tblProps.put("columns.types", columnTypes.toString());
     TableDesc tbl = new TableDesc(OrcInputFormat.class, OrcOutputFormat.class,
         tblProps);
-    LinkedHashMap<String, String> partSpec =
-        new LinkedHashMap<String, String>();
-    PartitionDesc part = new PartitionDesc(tbl, partSpec);
 
     MapWork mapWork = new MapWork();
     mapWork.setVectorMode(isVectorized);
@@ -1260,11 +1284,16 @@
         new LinkedHashMap<String, ArrayList<String>>();
     ArrayList<String> aliases = new ArrayList<String>();
     aliases.add(tableName);
-    aliasMap.put(root.toString(), aliases);
-    mapWork.setPathToAliases(aliasMap);
     LinkedHashMap<String, PartitionDesc> partMap =
         new LinkedHashMap<String, PartitionDesc>();
-    partMap.put(root.toString(), part);
+    for(int p=0; p < partitions; ++p) {
+      aliasMap.put(partPath[p], aliases);
+      LinkedHashMap<String, String> partSpec =
+          new LinkedHashMap<String, String>();
+      PartitionDesc part = new PartitionDesc(tbl, partSpec);
+      partMap.put(partPath[p], part);
+    }
+    mapWork.setPathToAliases(aliasMap);
     mapWork.setPathToPartitionInfo(partMap);
     mapWork.setScratchColumnMap(new HashMap<String, Map<String, Integer>>());
     mapWork.setScratchColumnVectorTypes(new HashMap<String, Map<Integer, String>>());
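The call that drives this harness in the updated test is truncated from the quoted diff; presumably it now passes the new partitions argument. A hedged reconstruction (workDir and inspector stand in for the test's locals; the values mirror the mock:/combinationAcid assertions below):

    // Hypothetical call: two partitions, p=0 and p=1, under
    // mock:/combinationAcid, with vectorization off.
    JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
        "combinationAcid", inspector, false, 2);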
@@ ... @@
     CombineHiveInputFormat inputFormat = new CombineHiveInputFormat();
-    try {
-      InputSplit[] splits = inputFormat.getSplits(conf, 1);
-      assertTrue("shouldn't reach here", false);
-    } catch (IOException ioe) {
-      assertEquals("CombineHiveInputFormat is incompatible"
-          + " with ACID tables. Please set hive.input.format=org.apache.hadoop"
-          + ".hive.ql.io.HiveInputFormat",
-          ioe.getMessage());
-    }
+    InputSplit[] splits = inputFormat.getSplits(conf, 1);
+    assertEquals(3, splits.length);
+    HiveInputFormat.HiveInputSplit split =
+        (HiveInputFormat.HiveInputSplit) splits[0];
+    assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+        split.inputFormatClassName());
+    assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00000",
+        split.getPath().toString());
+    assertEquals(0, split.getStart());
+    assertEquals(580, split.getLength());
+    split = (HiveInputFormat.HiveInputSplit) splits[1];
+    assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+        split.inputFormatClassName());
+    assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00001",
+        split.getPath().toString());
+    assertEquals(0, split.getStart());
+    assertEquals(601, split.getLength());
+    CombineHiveInputFormat.CombineHiveInputSplit combineSplit =
+        (CombineHiveInputFormat.CombineHiveInputSplit) splits[2];
+    assertEquals(BUCKETS, combineSplit.getNumPaths());
+    for(int bucket=0; bucket < BUCKETS; ++bucket) {
+      assertEquals("mock:/combinationAcid/p=1/00000" + bucket + "_0",
+          combineSplit.getPath(bucket).toString());
+      assertEquals(0, combineSplit.getOffset(bucket));
+      assertEquals(227, combineSplit.getLength(bucket));
+    }
+    String[] hosts = combineSplit.getLocations();
+    assertEquals(2, hosts.length);
   }
 
   @Test
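The rewritten assertions capture the point of the whole patch: one getSplits() call over a table with an ACID partition (p=0) and a plain partition (p=1) now returns two ordinary OrcInputFormat splits for the ACID bucket files plus a single combined split covering all of the non-ACID buckets, where the old behavior was to fail with an IOException and ask the user to switch hive.input.format by hand.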