diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 4ebd096d0d..cf97fed2a9 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -880,6 +880,96 @@ public void mmTable() throws Exception { } @Test + public void mmTableOriginalsOrc() throws Exception { + mmTableOriginals("ORC"); + } + + @Test + public void mmTableOriginalsText() throws Exception { + mmTableOriginals("TEXTFILE"); + } + + private void mmTableOriginals(String format) throws Exception { + // Originals split won't work due to MAPREDUCE-7086 issue in FileInputFormat. + boolean isBrokenUntilMapreduce7086 = "TEXTFILE".equals(format); + String dbName = "default"; + String tblName = "mm_nonpart"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + + format + " TBLPROPERTIES ('transactional'='false')", driver); + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver); + executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + + verifyFooBarResult(tblName, 3); + + FileSystem fs = FileSystem.get(conf); + executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES " + + "('transactional'='true', 'transactional_properties'='insert_only')", driver); + + verifyFooBarResult(tblName, 3); + + runMajorCompaction(dbName, tblName); + verifyFooBarResult(tblName, 3); + verifyHasBase(table.getSd(), fs, "base_0000001"); + + // Try with an extra delta. + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + + format + " TBLPROPERTIES ('transactional'='false')", driver); + table = msClient.getTable(dbName, tblName); + + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver); + executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + verifyFooBarResult(tblName, 3); + + executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES " + + "('transactional'='true', 'transactional_properties'='insert_only')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + + // Neither select nor compaction (which is a select) will work after this. + if (isBrokenUntilMapreduce7086) return; + + verifyFooBarResult(tblName, 9); + + runMajorCompaction(dbName, tblName); + verifyFooBarResult(tblName, 9); + verifyHasBase(table.getSd(), fs, "base_0000002"); + + // Try with an extra base.
+ executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + + format + " TBLPROPERTIES ('transactional'='false')", driver); + table = msClient.getTable(dbName, tblName); + + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver); + executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + verifyFooBarResult(tblName, 3); + + executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES " + + "('transactional'='true', 'transactional_properties'='insert_only')", driver); + executeStatementOnDriver("INSERT OVERWRITE TABLE " + tblName + " SELECT a,b FROM " + + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + verifyFooBarResult(tblName, 6); + + runMajorCompaction(dbName, tblName); + verifyFooBarResult(tblName, 6); + verifyHasBase(table.getSd(), fs, "base_0000002"); + + msClient.close(); + } + + + @Test public void mmTableBucketed() throws Exception { String dbName = "default"; String tblName = "mm_nonpart"; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 3141a7e981..d2fac51e54 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4385,46 +4385,6 @@ private void checkMmLb(Partition part) throws HiveException { + " to MM is not supported. Please re-create a table in the desired format."); } - private List> generateAddMmTasks(Table tbl, Long writeId) throws HiveException { - // We will move all the files in the table/partition directories into the first MM - // directory, then commit the first write ID. - List srcs = new ArrayList<>(), tgts = new ArrayList<>(); - if (writeId == null) { - throw new HiveException("Internal error - write ID not set for MM conversion"); - } - - int stmtId = 0; - String mmDir = AcidUtils.deltaSubdir(writeId, writeId, stmtId); - - Hive db = getHive(); - if (tbl.getPartitionKeys().size() > 0) { - PartitionIterable parts = new PartitionIterable(db, tbl, null, - HiveConf.getIntVar(conf, ConfVars.METASTORE_BATCH_RETRIEVE_MAX)); - Iterator partIter = parts.iterator(); - while (partIter.hasNext()) { - Partition part = partIter.next(); - checkMmLb(part); - Path src = part.getDataLocation(), tgt = new Path(src, mmDir); - srcs.add(src); - tgts.add(tgt); - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Will move " + src + " to " + tgt); - } - } - } else { - checkMmLb(tbl); - Path src = tbl.getDataLocation(), tgt = new Path(src, mmDir); - srcs.add(src); - tgts.add(tgt); - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Will move " + src + " to " + tgt); - } - } - // Don't set inputs and outputs - the locks have already been taken so it's pointless. 
- MoveWork mw = new MoveWork(null, null, null, null, false); - mw.setMultiFilesDesc(new LoadMultiFilesDesc(srcs, tgts, true, null, null)); - return Lists.>newArrayList(TaskFactory.get(mw)); - } private List> alterTableAddProps(AlterTableDesc alterTbl, Table tbl, Partition part, EnvironmentContext environmentContext) throws HiveException { @@ -4440,7 +4400,18 @@ private void checkMmLb(Partition part) throws HiveException { Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(tbl, alterTbl.getProps()); if (isToMmTable != null) { if (!isFromMmTable && isToMmTable) { - result = generateAddMmTasks(tbl, alterTbl.getWriteId()); + if (tbl.getPartitionKeys().size() > 0) { + Hive db = getHive(); + PartitionIterable parts = new PartitionIterable(db, tbl, null, + HiveConf.getIntVar(conf, ConfVars.METASTORE_BATCH_RETRIEVE_MAX)); + Iterator partIter = parts.iterator(); + while (partIter.hasNext()) { + Partition part0 = partIter.next(); + checkMmLb(part0); + } + } else { + checkMmLb(tbl); + } } else if (isFromMmTable && !isToMmTable) { throw new HiveException("Cannot convert an ACID table to non-ACID"); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 969c591917..ea68bc558f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -379,34 +379,61 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException Class formatter = currDesc.getInputFileFormatClass(); Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job); InputFormat inputFormat = getInputFormatFromCache(formatter, job); - String inputs = processCurrPathForMmWriteIds(inputFormat); - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Setting fetch inputs to " + inputs); + List dirs = new ArrayList<>(), dirsWithOriginals = new ArrayList<>(); + processCurrPathForMmWriteIds(inputFormat, dirs, dirsWithOriginals); + if (dirs.isEmpty() && dirsWithOriginals.isEmpty()) return null; + + List inputSplits = new ArrayList<>(); + if (!dirs.isEmpty()) { + String inputs = makeInputString(dirs); + if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { + Utilities.FILE_OP_LOGGER.trace("Setting fetch inputs to " + inputs); + } + + job.set("mapred.input.dir", inputs); + + generateWrappedSplits(inputFormat, inputSplits, job); } - if (inputs == null) return null; - job.set("mapred.input.dir", inputs); - InputSplit[] splits = inputFormat.getSplits(job, 1); - FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length]; - for (int i = 0; i < splits.length; i++) { - inputSplits[i] = new FetchInputFormatSplit(splits[i], inputFormat); + if (!dirsWithOriginals.isEmpty()) { + String inputs = makeInputString(dirsWithOriginals); + if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { + Utilities.FILE_OP_LOGGER.trace("Setting originals fetch inputs to " + inputs); + } + JobConf jobNoRec = new JobConf(job); + jobNoRec.set("mapred.input.dir", inputs); + jobNoRec.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false); + jobNoRec.setBoolean("mapred.input.dir.recursive", false); + + generateWrappedSplits(inputFormat, inputSplits, jobNoRec); } + if (work.getSplitSample() != null) { inputSplits = splitSampling(work.getSplitSample(), inputSplits); } - if (inputSplits.length > 0) { - if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_IN_TEST)) { - Arrays.sort(inputSplits, new FetchInputFormatSplitComparator()); - } - return inputSplits; + + if
(inputSplits.isEmpty()) return null; + if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_IN_TEST)) { + Collections.sort(inputSplits, new FetchInputFormatSplitComparator()); } + return inputSplits.toArray(new FetchInputFormatSplit[inputSplits.size()]); } + return null; } - private String processCurrPathForMmWriteIds(InputFormat inputFormat) throws IOException { + private void generateWrappedSplits(InputFormat inputFormat, + List inputSplits, JobConf job) throws IOException { + InputSplit[] splits = inputFormat.getSplits(job, 1); + for (int i = 0; i < splits.length; i++) { + inputSplits.add(new FetchInputFormatSplit(splits[i], inputFormat)); + } + } + + private void processCurrPathForMmWriteIds(InputFormat inputFormat, + List dirs, List dirsWithOriginals) throws IOException { if (inputFormat instanceof HiveInputFormat) { - return StringUtils.escapeString(currPath.toString()); // No need to process here. + dirs.add(currPath); // No need to process here. } ValidWriteIdList validWriteIdList; if (AcidUtils.isInsertOnlyTable(currDesc.getTableDesc().getProperties())) { @@ -418,17 +445,19 @@ private String processCurrPathForMmWriteIds(InputFormat inputFormat) throws IOEx Utilities.FILE_OP_LOGGER.info("Processing " + currDesc.getTableName() + " for MM paths"); } - Path[] dirs = HiveInputFormat.processPathsForMmRead(Lists.newArrayList(currPath), job, validWriteIdList); - if (dirs == null || dirs.length == 0) { - return null; // No valid inputs. This condition is logged inside the call. - } - StringBuffer str = new StringBuffer(StringUtils.escapeString(dirs[0].toString())); - for(int i = 1; i < dirs.length;i++) { - str.append(",").append(StringUtils.escapeString(dirs[i].toString())); + HiveInputFormat.processPathsForMmRead( + Lists.newArrayList(currPath), job, validWriteIdList, dirs, dirsWithOriginals); + } + + private String makeInputString(List dirs) { + if (dirs == null || dirs.isEmpty()) return ""; + StringBuffer str = new StringBuffer(StringUtils.escapeString(dirs.get(0).toString())); + for(int i = 1; i < dirs.size(); i++) { + str.append(",").append(StringUtils.escapeString(dirs.get(i).toString())); } return str.toString(); - } + } private ValidWriteIdList extractValidWriteIdList() { if (currDesc.getTableName() == null || !org.apache.commons.lang.StringUtils.isBlank(currDesc.getTableName())) { String txnString = job.get(ValidWriteIdList.VALID_WRITEIDS_KEY); @@ -438,18 +467,18 @@ private ValidWriteIdList extractValidWriteIdList() { return null; // not fetching from a table directly but from a temp location } - private FetchInputFormatSplit[] splitSampling(SplitSample splitSample, - FetchInputFormatSplit[] splits) { + private List splitSampling(SplitSample splitSample, + List splits) { long totalSize = 0; for (FetchInputFormatSplit split: splits) { totalSize += split.getLength(); } - List result = new ArrayList(splits.length); + List result = new ArrayList(splits.size()); long targetSize = splitSample.getTargetSize(totalSize); - int startIndex = splitSample.getSeedNum() % splits.length; + int startIndex = splitSample.getSeedNum() % splits.size(); long size = 0; - for (int i = 0; i < splits.length; i++) { - FetchInputFormatSplit split = splits[(startIndex + i) % splits.length]; + for (int i = 0; i < splits.size(); i++) { + FetchInputFormatSplit split = splits.get((startIndex + i) % splits.size()); result.add(split); long splitgLength = split.getLength(); if (size + splitgLength >= targetSize) { @@ -460,7 +489,7 @@ private ValidWriteIdList extractValidWriteIdList() { } size += splitgLength; } - 
return result.toArray(new FetchInputFormatSplit[result.size()]); + return result; } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 445e126386..074c195716 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; @@ -148,6 +149,7 @@ public boolean accept(Path path) { public static final int MAX_STATEMENTS_PER_TXN = 10000; public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$"); public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}"); + /** * A write into a non-aicd table produces files like 0000_0 or 0000_0_copy_1 * (Unless via Load Data statement) @@ -379,6 +381,57 @@ else if (filename.startsWith(BUCKET_PREFIX)) { } return result; } + + public static final class DirectoryImpl implements Directory { + private final List abortedDirectories; + private final boolean isBaseInRawFormat; + private final List original; + private final List obsolete; + private final List deltas; + private final Path base; + + public DirectoryImpl(List abortedDirectories, + boolean isBaseInRawFormat, List original, + List obsolete, List deltas, Path base) { + this.abortedDirectories = abortedDirectories; + this.isBaseInRawFormat = isBaseInRawFormat; + this.original = original; + this.obsolete = obsolete; + this.deltas = deltas; + this.base = base; + } + + @Override + public Path getBaseDirectory() { + return base; + } + + @Override + public boolean isBaseInRawFormat() { + return isBaseInRawFormat; + } + + @Override + public List getOriginalFiles() { + return original; + } + + @Override + public List getCurrentDirectories() { + return deltas; + } + + @Override + public List getObsolete() { + return obsolete; + } + + @Override + public List getAbortedDirectories() { + return abortedDirectories; + } + } + //This is used for (full) Acid tables. InsertOnly use NOT_ACID public enum Operation implements Serializable { NOT_ACID, INSERT, UPDATE, DELETE; @@ -973,7 +1026,7 @@ public static Directory getAcidState(Path directory, // Okay, we're going to need these originals. Recurse through them and figure out what we // really need. for (FileStatus origDir : originalDirectories) { - findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles); + findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true); } } @@ -1045,7 +1098,7 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId * If this sort order is changed and there are tables that have been converted to transactional * and have had any update/delete/merge operations performed but not yet MAJOR compacted, it * may result in data loss since it may change how - * {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns + * {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns * {@link RecordIdentifier#rowId} for read (that have happened) and compaction (yet to happen). 
*/ Collections.sort(original, (HdfsFileStatusWithId o1, HdfsFileStatusWithId o2) -> { @@ -1055,37 +1108,8 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId // Note: isRawFormat is invalid for non-ORC tables. It will always return true, so we're good. final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs); - return new Directory() { - - @Override - public Path getBaseDirectory() { - return base; - } - @Override - public boolean isBaseInRawFormat() { - return isBaseInRawFormat; - } - - @Override - public List getOriginalFiles() { - return original; - } - - @Override - public List getCurrentDirectories() { - return deltas; - } - - @Override - public List getObsolete() { - return obsolete; - } - - @Override - public List getAbortedDirectories() { - return abortedDirectories; - } - }; + return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original, + obsolete, deltas, base); } /** * We can only use a 'base' if it doesn't have an open txn (from specific reader's point of view) @@ -1197,8 +1221,9 @@ public Long getFileId() { * @param original the list of original files * @throws IOException */ - private static void findOriginals(FileSystem fs, FileStatus stat, - List original, Ref useFileIds, boolean ignoreEmptyFiles) throws IOException { + public static void findOriginals(FileSystem fs, FileStatus stat, + List original, Ref useFileIds, + boolean ignoreEmptyFiles, boolean recursive) throws IOException { assert stat.isDir(); List childrenWithId = null; Boolean val = useFileIds.value; @@ -1217,8 +1242,10 @@ private static void findOriginals(FileSystem fs, FileStatus stat, } if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { - if (child.getFileStatus().isDir()) { - findOriginals(fs, child.getFileStatus(), original, useFileIds, ignoreEmptyFiles); + if (child.getFileStatus().isDirectory()) { + if (recursive) { + findOriginals(fs, child.getFileStatus(), original, useFileIds, ignoreEmptyFiles, true); + } } else { if(!ignoreEmptyFiles || child.getFileStatus().getLen() > 0) { original.add(child); @@ -1229,7 +1256,9 @@ private static void findOriginals(FileSystem fs, FileStatus stat, List children = HdfsUtils.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter); for (FileStatus child : children) { if (child.isDir()) { - findOriginals(fs, child, original, useFileIds, ignoreEmptyFiles); + if (recursive) { + findOriginals(fs, child, original, useFileIds, ignoreEmptyFiles, true); + } } else { if(!ignoreEmptyFiles || child.getLen() > 0) { original.add(createOriginalObj(null, child)); @@ -1239,6 +1268,7 @@ private static void findOriginals(FileSystem fs, FileStatus stat, } } + public static boolean isTablePropertyTransactional(Properties props) { String resultStr = props.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); if (resultStr == null) { @@ -1806,7 +1836,7 @@ public static int getAcidVersionFromMetaFile(Path deltaOrBaseDir, FileSystem fs) return null; } Directory acidInfo = AcidUtils.getAcidState(dir, jc, idList); - // Assume that for an MM table, or if there's only the base directory, we are good. + // Assume that for an MM table, or if there's only the base directory, we are good. 
if (!acidInfo.getCurrentDirectories().isEmpty() && AcidUtils.isFullAcidTable(table)) { Utilities.FILE_OP_LOGGER.warn( "Computing stats for an ACID table; stats may be inaccurate"); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 611a4c346b..99ee78aebf 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -29,35 +29,30 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.Map.Entry; - -import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.common.StringInternUtils; -import org.apache.hadoop.hive.common.ValidTxnWriteIdList; -import org.apache.hadoop.hive.common.ValidWriteIdList; -import org.apache.hadoop.hive.ql.exec.SerializationUtilities; -import org.apache.hive.common.util.Ref; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.StringInternUtils; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; import org.apache.hadoop.hive.llap.io.api.LlapIo; import org.apache.hadoop.hive.llap.io.api.LlapProxy; -import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -65,6 +60,7 @@ import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorMapOperatorReadType; @@ -82,7 +78,10 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.StringUtils; +import org.apache.hive.common.util.Ref; import org.apache.hive.common.util.ReflectionUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * HiveInputFormat is a parameterized InputFormat which looks at the path name @@ -514,13 +513,13 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job pushFilters(conf, tableScan, this.mrwork); } - Path[] finalDirs = processPathsForMmRead(dirs, conf, validMmWriteIdList); - if (finalDirs == null) { + List dirsWithFileOriginals = new 
ArrayList<>(), finalDirs = new ArrayList<>(); + processPathsForMmRead(dirs, conf, validMmWriteIdList, finalDirs, dirsWithFileOriginals); + if (finalDirs.isEmpty() && dirsWithFileOriginals.isEmpty()) { return; // No valid inputs. } - FileInputFormat.setInputPaths(conf, finalDirs); - conf.setInputFormat(inputFormat.getClass()); + conf.setInputFormat(inputFormat.getClass()); int headerCount = 0; int footerCount = 0; if (table != null) { @@ -532,84 +531,113 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job } } - InputSplit[] iss = inputFormat.getSplits(conf, splits); - for (InputSplit is : iss) { - result.add(new HiveInputSplit(is, inputFormatClass.getName())); + if (!finalDirs.isEmpty()) { + FileInputFormat.setInputPaths(conf, finalDirs.toArray(new Path[finalDirs.size()])); + InputSplit[] iss = inputFormat.getSplits(conf, splits); + for (InputSplit is : iss) { + result.add(new HiveInputSplit(is, inputFormatClass.getName())); + } } - if (iss.length == 0 && finalDirs.length > 0 && conf.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) { + + if (!dirsWithFileOriginals.isEmpty()) { + // We are going to add splits for these directories with recursive = false, so we don't pick up + // any subdirectories (deltas or original directories) and only read the original files. + // The fact that there's a loop calling addSplitsForGroup already implies it's ok to call + // the real input format multiple times... however some split concurrency/etc configs will + // effectively be ignored for such splits, as they are applied separately in each call. + JobConf nonRecConf = new JobConf(conf); + FileInputFormat.setInputPaths(nonRecConf, + dirsWithFileOriginals.toArray(new Path[dirsWithFileOriginals.size()])); + nonRecConf.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false); + nonRecConf.setBoolean("mapred.input.dir.recursive", false); + InputSplit[] iss = inputFormat.getSplits(nonRecConf, splits); + for (InputSplit is : iss) { + result.add(new HiveInputSplit(is, inputFormatClass.getName())); + } + } + + if (result.isEmpty() && conf.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) { // If there are no inputs; the Execution engine skips the operator tree. // To prevent it from happening; an opaque ZeroRows input is added here - when needed.
- result.add(new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit(finalDirs[0].toString()), - ZeroRowsInputFormat.class.getName())); + result.add(new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit( + finalDirs.get(0).toString()), ZeroRowsInputFormat.class.getName())); } } - public static Path[] processPathsForMmRead(List dirs, JobConf conf, - ValidWriteIdList validWriteIdList) throws IOException { + + public static void processPathsForMmRead(List dirs, JobConf conf, + ValidWriteIdList validWriteIdList, List finalPaths, + List pathsWithFileOriginals) throws IOException { if (validWriteIdList == null) { - return dirs.toArray(new Path[dirs.size()]); - } else { - List finalPaths = new ArrayList<>(dirs.size()); - for (Path dir : dirs) { - processForWriteIds(dir, conf, validWriteIdList, finalPaths); - } - if (finalPaths.isEmpty()) { - LOG.warn("No valid inputs found in " + dirs); - return null; - } - return finalPaths.toArray(new Path[finalPaths.size()]); + finalPaths.addAll(dirs); + return; + } + for (Path dir : dirs) { + processForWriteIds(dir, conf, validWriteIdList, finalPaths, pathsWithFileOriginals); + } + if (finalPaths.isEmpty() && pathsWithFileOriginals.isEmpty()) { + LOG.warn("No valid inputs found in " + dirs); } } - private static void processForWriteIds(Path dir, JobConf conf, - ValidWriteIdList validWriteIdList, List finalPaths) throws IOException { + private static void processForWriteIds(Path dir, JobConf conf, ValidWriteIdList validWriteIdList, + List finalPaths, List pathsWithFileOriginals) throws IOException { FileSystem fs = dir.getFileSystem(conf); - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Checking " + dir + " (root) for inputs"); - } + Utilities.FILE_OP_LOGGER.trace("Checking {} for inputs", dir); + // Ignore nullscan-optimized paths. if (fs instanceof NullScanFileSystem) { finalPaths.add(dir); return; } - // Tez require the use of recursive input dirs for union processing, so we have to look into the - // directory to find out - LinkedList subdirs = new LinkedList<>(); - subdirs.add(dir); // add itself as a starting point - while (!subdirs.isEmpty()) { - Path currDir = subdirs.poll(); - FileStatus[] files = fs.listStatus(currDir); - boolean hadAcidState = false; // whether getAcidState has been called for currDir - for (FileStatus file : files) { - Path path = file.getPath(); - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Checking " + path + " for inputs"); - } - if (!file.isDirectory()) { - Utilities.FILE_OP_LOGGER.warn("Ignoring a file not in MM directory " + path); - } else if (AcidUtils.extractWriteId(path) == null) { - subdirs.add(path); - } else if (!hadAcidState) { - AcidUtils.Directory dirInfo - = AcidUtils.getAcidState(currDir, conf, validWriteIdList, Ref.from(false), true, null); - hadAcidState = true; - - // Find the base, created for IOW. - Path base = dirInfo.getBaseDirectory(); - if (base != null) { - finalPaths.add(base); - } + // TODO: as is, this breaks list bucketing for MM because we no longer look at nested dirs. + // Trying to tell apart what is what w/MM dirs inside nested dirs is a mess or impossible. + // We will fix it later (or earlier) by nesting LB dirs inside MM dirs. + // We need to iterate to detect original directories, that are supported in MM but not ACID. 
+ boolean hasOriginalFiles = false, hasAcidDirs = false; + List originalDirectories = new ArrayList<>(); + for (FileStatus file : fs.listStatus(dir, AcidUtils.hiddenFileFilter)) { + Path currDir = file.getPath(); + Utilities.FILE_OP_LOGGER.trace("Checking {} for being an input", currDir); + if (!file.isDirectory()) { + hasOriginalFiles = true; + } else if (AcidUtils.extractWriteId(currDir) == null) { + originalDirectories.add(currDir); // Add as is; it would become a recursive split. + } else { + hasAcidDirs = true; + } + } + if (hasAcidDirs) { + AcidUtils.Directory dirInfo = AcidUtils.getAcidState( + dir, conf, validWriteIdList, Ref.from(false), true, null); + + // Find the base, created for IOW. + Path base = dirInfo.getBaseDirectory(); + if (base != null) { + Utilities.FILE_OP_LOGGER.debug("Adding input {}", base); + finalPaths.add(base); + // Base means originals no longer matter. + originalDirectories.clear(); + hasOriginalFiles = false; + } - // Find the parsed delta files. - for (AcidUtils.ParsedDelta delta : dirInfo.getCurrentDirectories()) { - Utilities.FILE_OP_LOGGER.debug("Adding input " + delta.getPath()); - finalPaths.add(delta.getPath()); - } - } + // Find the parsed delta files. + for (AcidUtils.ParsedDelta delta : dirInfo.getCurrentDirectories()) { + Utilities.FILE_OP_LOGGER.debug("Adding input {}", delta.getPath()); + finalPaths.add(delta.getPath()); } } + if (!originalDirectories.isEmpty()) { + Utilities.FILE_OP_LOGGER.debug("Adding original directories {}", originalDirectories); + finalPaths.addAll(originalDirectories); + } + if (hasOriginalFiles) { + Utilities.FILE_OP_LOGGER.debug("Directory has original files {}", dir); + pathsWithFileOriginals.add(dir); + } } + Path[] getInputPaths(JobConf job) throws IOException { Path[] dirs; @@ -721,7 +749,7 @@ private static void processForWriteIds(Path dir, JobConf conf, pushProjection(newjob, readColumnsBuffer, readColumnNamesBuffer); } - if (dirs.length != 0) { + if (dirs.length != 0) { // TODO: should this be currentDirs? if (LOG.isInfoEnabled()) { LOG.info("Generating splits for dirs: {}", dirs); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 019682fb10..696e6f37e8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -106,6 +106,7 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; @@ -117,6 +118,7 @@ import org.apache.orc.ColumnStatistics; import org.apache.orc.OrcProto; import org.apache.orc.OrcProto.Footer; +import org.apache.orc.OrcProto.Type; import org.apache.orc.OrcUtils; import org.apache.orc.StripeInformation; import org.apache.orc.StripeStatistics; @@ -342,7 +344,7 @@ public static boolean isOriginal(Footer footer) { return false; } - + public static boolean[] genIncludedColumns(TypeDescription readerSchema, List included) { return genIncludedColumns(readerSchema, included, null); @@ -699,15 +701,15 @@ public boolean validateInput(FileSystem fs, HiveConf conf, // & therefore we should be able to retrieve them here and determine appropriate behavior. // Note that this will be meaningless for non-acid tables & will be set to null. 
//this is set by Utilities.copyTablePropertiesToConf() - boolean isTableTransactional = conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false); - String transactionalProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); - this.acidOperationalProperties = isTableTransactional ? - AcidOperationalProperties.parseString(transactionalProperties) : null; + boolean isTxnTable = conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false); + String txnProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + this.acidOperationalProperties = isTxnTable + ? AcidOperationalProperties.parseString(txnProperties) : null; String value = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); writeIdList = value == null ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(value); LOG.debug("Context:: Read ValidWriteIdList: " + writeIdList.toString() - + " isTransactionalTable: " + isTableTransactional); + + " isTransactionalTable: " + isTxnTable); } @VisibleForTesting @@ -1174,13 +1176,30 @@ public AcidDirInfo run() throws Exception { } private AcidDirInfo callInternal() throws IOException { + if (context.acidOperationalProperties != null + && context.acidOperationalProperties.isInsertOnly()) { + // See the class comment - HIF handles MM for all input formats, so if we try to handle it + // again, in particular for the non-recursive originals-only getSplits call, we will just + // get confused. This bypass was not necessary when MM tables didn't support originals. Now + // that they do, we use this path for anything MM table related, although everything except + // the originals could still be handled by AcidUtils like a regular non-txn table. + boolean isRecursive = context.conf.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false); + List originals = new ArrayList<>(); + List baseFiles = new ArrayList<>(); + AcidUtils.findOriginals(fs, fs.getFileStatus(dir), originals, useFileIds, true, isRecursive); + for (HdfsFileStatusWithId fileId : originals) { + baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE)); + } + return new AcidDirInfo(fs, dir, new AcidUtils.DirectoryImpl(Lists.newArrayList(), true, originals, + Lists.newArrayList(), Lists.newArrayList(), null), baseFiles, new ArrayList<>()); + } //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine? - AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf, - context.writeIdList, useFileIds, true, null); + AcidUtils.Directory dirInfo = AcidUtils.getAcidState( + dir, context.conf, context.writeIdList, useFileIds, true, null); // find the base files (original or new style) List baseFiles = new ArrayList<>(); if (dirInfo.getBaseDirectory() == null) { - //for non-acid tables, all data files are in getOriginalFiles() list + // For non-acid tables (or paths), all data files are in getOriginalFiles() list for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) { baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE)); } @@ -1195,7 +1214,6 @@ private AcidDirInfo callInternal() throws IOException { // Find the parsed deltas- some of them containing only the insert delta events // may get treated as base if split-update is enabled for ACID. 
(See HIVE-14035 for details) List parsedDeltas = new ArrayList<>(); - if (context.acidOperationalProperties != null && context.acidOperationalProperties.isSplitUpdate()) { // If we have split-update turned on for this table, then the delta events have already been @@ -1256,7 +1274,8 @@ private AcidDirInfo callInternal() throws IOException { We already handled all delete deltas above and there should not be any other deltas for any table type. (this was acid 1.0 code path). */ - assert dirInfo.getCurrentDirectories().isEmpty() : "Non empty curDir list?!: " + dir; + assert dirInfo.getCurrentDirectories().isEmpty() : + "Non empty curDir list?!: " + dirInfo.getCurrentDirectories(); // When split-update is not enabled, then all the deltas in the current directories // should be considered as usual. parsedDeltas.addAll(dirInfo.getCurrentDirectories()); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index b698c84080..982b180761 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -337,9 +337,11 @@ private void runMmCompaction(HiveConf conf, Table t, Partition p, } int deltaCount = dir.getCurrentDirectories().size(); - if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) <= 1) { + int origCount = dir.getOriginalFiles().size(); + if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) { LOG.debug("Not compacting " + sd.getLocation() + "; current base is " - + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas"); + + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas and " + + origCount + " originals"); return; } try { @@ -355,7 +357,8 @@ private void runMmCompaction(HiveConf conf, Table t, Partition p, // Note: we could skip creating the table and just add table type stuff directly to the // "insert overwrite directory" command if there were no bucketing or list bucketing. 
- String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() + "_", tmpTableName; + String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() + "_"; + String tmpTableName = null; while (true) { tmpTableName = tmpPrefix + System.currentTimeMillis(); String query = buildMmCompactionCtQuery(tmpTableName, t, diff --git ql/src/test/queries/clientpositive/mm_conversions.q ql/src/test/queries/clientpositive/mm_conversions.q index 55565a9428..c2b37d470e 100644 --- ql/src/test/queries/clientpositive/mm_conversions.q +++ ql/src/test/queries/clientpositive/mm_conversions.q @@ -25,9 +25,26 @@ select * from simple_to_mm s2 order by key; insert into table simple_to_mm select key from intermediate; insert into table simple_to_mm select key from intermediate; select * from simple_to_mm s3 order by key; +insert overwrite table simple_to_mm select key from intermediate; +select * from simple_to_mm s4 order by key; drop table simple_to_mm; +drop table simple_to_mm_text; +create table simple_to_mm_text(key int) stored as textfile; +insert into table simple_to_mm_text select key from intermediate; +select * from simple_to_mm_text t1 order by key; +alter table simple_to_mm_text set tblproperties("transactional"="true", "transactional_properties"="insert_only"); +select * from simple_to_mm_text t2 order by key; +insert into table simple_to_mm_text select key from intermediate; +insert into table simple_to_mm_text select key from intermediate; +select * from simple_to_mm_text t3 order by key; +insert overwrite table simple_to_mm_text select key from intermediate; +select * from simple_to_mm_text t4 order by key; +drop table simple_to_mm_text; + + + drop table part_to_mm; create table part_to_mm(key int) partitioned by (key_mm int) stored as orc; insert into table part_to_mm partition(key_mm='455') select key from intermediate;
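Note on the new split-path contract in this patch: HiveInputFormat.processPathsForMmRead no longer returns a Path[]; it fills two output lists, finalPaths (base/delta/original directories that can be listed recursively) and pathsWithFileOriginals (directories holding loose original files that must be listed non-recursively). A minimal caller sketch follows, mirroring how addSplitsForGroup and FetchOperator consume the two lists; the helper name addMmInputs and its enclosing class are illustrative only, not part of the patch, and the List type parameters (Path) are assumed since the diff text above has its generics stripped.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class MmSplitPathsSketch {
  // Illustrative only: shows the two-list contract of processPathsForMmRead.
  static void addMmInputs(List<Path> dirs, JobConf conf, ValidWriteIdList writeIds)
      throws IOException {
    List<Path> finalPaths = new ArrayList<>();
    List<Path> pathsWithFileOriginals = new ArrayList<>();
    HiveInputFormat.processPathsForMmRead(dirs, conf, writeIds, finalPaths, pathsWithFileOriginals);

    if (!finalPaths.isEmpty()) {
      // Base and delta directories are safe to list recursively (the default for these jobs).
      FileInputFormat.setInputPaths(conf, finalPaths.toArray(new Path[finalPaths.size()]));
    }
    if (!pathsWithFileOriginals.isEmpty()) {
      // Directories with loose original files get a separate, non-recursive JobConf so that
      // nested delta/original directories are not picked up a second time.
      JobConf nonRecConf = new JobConf(conf);
      FileInputFormat.setInputPaths(nonRecConf,
          pathsWithFileOriginals.toArray(new Path[pathsWithFileOriginals.size()]));
      nonRecConf.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false);
      nonRecConf.setBoolean("mapred.input.dir.recursive", false);
      // ...then generate splits from nonRecConf, as the patch does at both call sites.
    }
  }
}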
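Similarly, AcidUtils.findOriginals is now public and takes an explicit recursive flag (the OrcInputFormat MM bypass passes the FileInputFormat.INPUT_DIR_RECURSIVE value from the job conf). A small usage sketch under the same caveats; listOriginals and its class are hypothetical names.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hive.common.util.Ref;

public class FindOriginalsSketch {
  // Illustrative only: collect pre-ACID "original" files under a table or partition directory.
  static List<HdfsFileStatusWithId> listOriginals(FileSystem fs, Path dir, boolean recursive)
      throws IOException {
    List<HdfsFileStatusWithId> originals = new ArrayList<>();
    // Ref.from(false): skip the file-ID listing path; true: ignore empty files.
    AcidUtils.findOriginals(fs, fs.getFileStatus(dir), originals, Ref.from(false), true, recursive);
    return originals;
  }
}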