diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index cc490afc77..31b81f46d2 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2388,6 +2388,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_LOCK_QUERY_STRING_MAX_LENGTH("hive.lock.query.string.max.length", 1000000, "The maximum length of the query string to store in the lock.\n" + "The default value is 1000000, since the data limit of a znode is 1MB"), + HIVE_MM_ALLOW_ORIGINALS("hive.mm.allow.originals", false, + "Whether to allow original files in MM tables. Conversion to MM may be expensive if\n" + + "this is set to false. However, unless the MAPREDUCE-7086 fix is present, queries that\n" + + "read MM tables with original files will fail. The default in Hive 3.0 is false."), // Zookeeper related configs HIVE_ZOOKEEPER_QUORUM("hive.zookeeper.quorum", "", diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 7e17d5d888..4c0abe3189 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -148,6 +149,7 @@ public void setup() throws Exception { TxnDbUtil.prepDb(hiveConf); conf = hiveConf; + HiveConf.setBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS, true); msClient = new HiveMetaStoreClient(conf); driver = DriverFactory.newDriver(hiveConf); SessionState.start(new CliSessionState(hiveConf)); @@ -978,6 +980,96 @@ public void mmTable() throws Exception { } @Test + public void mmTableOriginalsOrc() throws Exception { + mmTableOriginals("ORC"); + } + + @Test + public void mmTableOriginalsText() throws Exception { + mmTableOriginals("TEXTFILE"); + } + + private void mmTableOriginals(String format) throws Exception { + // Originals split won't work due to MAPREDUCE-7086 issue in FileInputFormat. 
+ boolean isBrokenUntilMapreduce7086 = "TEXTFILE".equals(format); + String dbName = "default"; + String tblName = "mm_nonpart"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + + format + " TBLPROPERTIES ('transactional'='false')", driver); + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver); + executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + + verifyFooBarResult(tblName, 3); + + FileSystem fs = FileSystem.get(conf); + executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES " + + "('transactional'='true', 'transactional_properties'='insert_only')", driver); + + verifyFooBarResult(tblName, 3); + + runMajorCompaction(dbName, tblName); + verifyFooBarResult(tblName, 3); + verifyHasBase(table.getSd(), fs, "base_0000001"); + + // Try with an extra delta. + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + + format + " TBLPROPERTIES ('transactional'='false')", driver); + table = msClient.getTable(dbName, tblName); + + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver); + executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + verifyFooBarResult(tblName, 3); + + executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES " + + "('transactional'='true', 'transactional_properties'='insert_only')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + + // Neither select nor compaction (which is a select) will work after this. + if (isBrokenUntilMapreduce7086) return; + + verifyFooBarResult(tblName, 9); + + runMajorCompaction(dbName, tblName); + verifyFooBarResult(tblName, 9); + verifyHasBase(table.getSd(), fs, "base_0000002"); + + // Try with an extra base. 
+ executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + + format + " TBLPROPERTIES ('transactional'='false')", driver); + table = msClient.getTable(dbName, tblName); + + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver); + executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + verifyFooBarResult(tblName, 3); + + executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES " + + "('transactional'='true', 'transactional_properties'='insert_only')", driver); + executeStatementOnDriver("INSERT OVERWRITE TABLE " + tblName + " SELECT a,b FROM " + + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + verifyFooBarResult(tblName, 6); + + runMajorCompaction(dbName, tblName); + verifyFooBarResult(tblName, 6); + verifyHasBase(table.getSd(), fs, "base_0000002"); + + msClient.close(); + } + + + @Test public void mmTableBucketed() throws Exception { String dbName = "default"; String tblName = "mm_nonpart"; @@ -1048,7 +1140,7 @@ public void mmTableOpenWriteId() throws Exception { msClient.abortTxns(Lists.newArrayList(openTxnId)); // Now abort 3. runMajorCompaction(dbName, tblName); // Compact 4 and 5. verifyFooBarResult(tblName, 2); - verifyHasBase(table.getSd(), fs, "base_0000005"); + verifyHasBase(table.getSd(), fs, "base_0000005"); runCleaner(conf); verifyDeltaCount(table.getSd(), fs, 0); } @@ -1102,7 +1194,7 @@ public void mmTablePartitioned() throws Exception { p2 = msClient.getPartition(dbName, tblName, "ds=2"), p3 = msClient.getPartition(dbName, tblName, "ds=3"); msClient.close(); - + FileSystem fs = FileSystem.get(conf); verifyDeltaCount(p1.getSd(), fs, 3); verifyDeltaCount(p2.getSd(), fs, 2); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index abde9f786f..3d30a287ff 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4473,7 +4473,22 @@ private void checkMmLb(Partition part) throws HiveException { Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(tbl, alterTbl.getProps()); if (isToMmTable != null) { if (!isFromMmTable && isToMmTable) { - result = generateAddMmTasks(tbl, alterTbl.getWriteId()); + if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS)) { + result = generateAddMmTasks(tbl, alterTbl.getWriteId()); + } else { + if (tbl.getPartitionKeys().size() > 0) { + Hive db = getHive(); + PartitionIterable parts = new PartitionIterable(db, tbl, null, + HiveConf.getIntVar(conf, ConfVars.METASTORE_BATCH_RETRIEVE_MAX)); + Iterator partIter = parts.iterator(); + while (partIter.hasNext()) { + Partition part0 = partIter.next(); + checkMmLb(part0); + } + } else { + checkMmLb(tbl); + } + } } else if (isFromMmTable && !isToMmTable) { throw new HiveException("Cannot convert an ACID table to non-ACID"); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 969c591917..5463319900 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -379,34 +379,58 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException Class 
formatter = currDesc.getInputFileFormatClass(); Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job); InputFormat inputFormat = getInputFormatFromCache(formatter, job); - String inputs = processCurrPathForMmWriteIds(inputFormat); - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Setting fetch inputs to " + inputs); + List dirs = new ArrayList<>(), dirsWithOriginals = new ArrayList<>(); + processCurrPathForMmWriteIds(inputFormat, dirs, dirsWithOriginals); + if (dirs.isEmpty() && dirsWithOriginals.isEmpty()) return null; + + List inputSplits = new ArrayList<>(); + if (!dirs.isEmpty()) { + String inputs = makeInputString(dirs); + if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { + Utilities.FILE_OP_LOGGER.trace("Setting fetch inputs to " + inputs); + } + + job.set("mapred.input.dir", inputs); + + generateWrappedSplits(inputFormat, inputSplits, job); } - if (inputs == null) return null; - job.set("mapred.input.dir", inputs); - InputSplit[] splits = inputFormat.getSplits(job, 1); - FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length]; - for (int i = 0; i < splits.length; i++) { - inputSplits[i] = new FetchInputFormatSplit(splits[i], inputFormat); + if (!dirsWithOriginals.isEmpty()) { + String inputs = makeInputString(dirsWithOriginals); + if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { + Utilities.FILE_OP_LOGGER.trace("Setting originals fetch inputs to " + inputs); + } + JobConf jobNoRec = HiveInputFormat.createConfForMmOriginalsSplit(job, dirsWithOriginals); + jobNoRec.set("mapred.input.dir", inputs); + generateWrappedSplits(inputFormat, inputSplits, jobNoRec); } + if (work.getSplitSample() != null) { inputSplits = splitSampling(work.getSplitSample(), inputSplits); } - if (inputSplits.length > 0) { - if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_IN_TEST)) { - Arrays.sort(inputSplits, new FetchInputFormatSplitComparator()); - } - return inputSplits; + + if (inputSplits.isEmpty()) return null; + if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_IN_TEST)) { + Collections.sort(inputSplits, new FetchInputFormatSplitComparator()); } + return inputSplits.toArray(new FetchInputFormatSplit[inputSplits.size()]); } + return null; } - private String processCurrPathForMmWriteIds(InputFormat inputFormat) throws IOException { + private void generateWrappedSplits(InputFormat inputFormat, + List inputSplits, JobConf job) throws IOException { + InputSplit[] splits = inputFormat.getSplits(job, 1); + for (int i = 0; i < splits.length; i++) { + inputSplits.add(new FetchInputFormatSplit(splits[i], inputFormat)); + } + } + + private void processCurrPathForMmWriteIds(InputFormat inputFormat, + List dirs, List dirsWithOriginals) throws IOException { if (inputFormat instanceof HiveInputFormat) { - return StringUtils.escapeString(currPath.toString()); // No need to process here. + dirs.add(currPath); // No need to process here. } ValidWriteIdList validWriteIdList; if (AcidUtils.isInsertOnlyTable(currDesc.getTableDesc().getProperties())) { @@ -418,17 +442,19 @@ private String processCurrPathForMmWriteIds(InputFormat inputFormat) throws IOEx Utilities.FILE_OP_LOGGER.info("Processing " + currDesc.getTableName() + " for MM paths"); } - Path[] dirs = HiveInputFormat.processPathsForMmRead(Lists.newArrayList(currPath), job, validWriteIdList); - if (dirs == null || dirs.length == 0) { - return null; // No valid inputs. This condition is logged inside the call. 
- } - StringBuffer str = new StringBuffer(StringUtils.escapeString(dirs[0].toString())); - for(int i = 1; i < dirs.length;i++) { - str.append(",").append(StringUtils.escapeString(dirs[i].toString())); + HiveInputFormat.processPathsForMmRead( + Lists.newArrayList(currPath), job, validWriteIdList, dirs, dirsWithOriginals); + } + + private String makeInputString(List dirs) { + if (dirs == null || dirs.isEmpty()) return ""; + StringBuffer str = new StringBuffer(StringUtils.escapeString(dirs.get(0).toString())); + for(int i = 1; i < dirs.size(); i++) { + str.append(",").append(StringUtils.escapeString(dirs.get(i).toString())); } return str.toString(); - } + } private ValidWriteIdList extractValidWriteIdList() { if (currDesc.getTableName() == null || !org.apache.commons.lang.StringUtils.isBlank(currDesc.getTableName())) { String txnString = job.get(ValidWriteIdList.VALID_WRITEIDS_KEY); @@ -438,18 +464,18 @@ private ValidWriteIdList extractValidWriteIdList() { return null; // not fetching from a table directly but from a temp location } - private FetchInputFormatSplit[] splitSampling(SplitSample splitSample, - FetchInputFormatSplit[] splits) { + private List splitSampling(SplitSample splitSample, + List splits) { long totalSize = 0; for (FetchInputFormatSplit split: splits) { totalSize += split.getLength(); } - List result = new ArrayList(splits.length); + List result = new ArrayList(splits.size()); long targetSize = splitSample.getTargetSize(totalSize); - int startIndex = splitSample.getSeedNum() % splits.length; + int startIndex = splitSample.getSeedNum() % splits.size(); long size = 0; - for (int i = 0; i < splits.length; i++) { - FetchInputFormatSplit split = splits[(startIndex + i) % splits.length]; + for (int i = 0; i < splits.size(); i++) { + FetchInputFormatSplit split = splits.get((startIndex + i) % splits.size()); result.add(split); long splitgLength = split.getLength(); if (size + splitgLength >= targetSize) { @@ -460,7 +486,7 @@ private ValidWriteIdList extractValidWriteIdList() { } size += splitgLength; } - return result.toArray(new FetchInputFormatSplit[result.size()]); + return result; } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 183515a0ed..10f7bd2ae0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; @@ -149,6 +150,7 @@ public boolean accept(Path path) { public static final int MAX_STATEMENTS_PER_TXN = 10000; public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$"); public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}"); + /** * A write into a non-aicd table produces files like 0000_0 or 0000_0_copy_1 * (Unless via Load Data statement) @@ -380,6 +382,57 @@ else if (filename.startsWith(BUCKET_PREFIX)) { } return result; } + + public static final class DirectoryImpl implements Directory { + private final List abortedDirectories; + private final boolean isBaseInRawFormat; + private final List original; + private final List obsolete; + private final List deltas; + private 
final Path base; + + public DirectoryImpl(List abortedDirectories, + boolean isBaseInRawFormat, List original, + List obsolete, List deltas, Path base) { + this.abortedDirectories = abortedDirectories; + this.isBaseInRawFormat = isBaseInRawFormat; + this.original = original; + this.obsolete = obsolete; + this.deltas = deltas; + this.base = base; + } + + @Override + public Path getBaseDirectory() { + return base; + } + + @Override + public boolean isBaseInRawFormat() { + return isBaseInRawFormat; + } + + @Override + public List getOriginalFiles() { + return original; + } + + @Override + public List getCurrentDirectories() { + return deltas; + } + + @Override + public List getObsolete() { + return obsolete; + } + + @Override + public List getAbortedDirectories() { + return abortedDirectories; + } + } + //This is used for (full) Acid tables. InsertOnly use NOT_ACID public enum Operation implements Serializable { NOT_ACID, INSERT, UPDATE, DELETE; @@ -974,7 +1027,7 @@ public static Directory getAcidState(Path directory, // Okay, we're going to need these originals. Recurse through them and figure out what we // really need. for (FileStatus origDir : originalDirectories) { - findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles); + findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true); } } @@ -1046,7 +1099,7 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId * If this sort order is changed and there are tables that have been converted to transactional * and have had any update/delete/merge operations performed but not yet MAJOR compacted, it * may result in data loss since it may change how - * {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns + * {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns * {@link RecordIdentifier#rowId} for read (that have happened) and compaction (yet to happen). */ Collections.sort(original, (HdfsFileStatusWithId o1, HdfsFileStatusWithId o2) -> { @@ -1056,37 +1109,8 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId // Note: isRawFormat is invalid for non-ORC tables. It will always return true, so we're good. 
final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs); - return new Directory() { - - @Override - public Path getBaseDirectory() { - return base; - } - @Override - public boolean isBaseInRawFormat() { - return isBaseInRawFormat; - } - - @Override - public List getOriginalFiles() { - return original; - } - - @Override - public List getCurrentDirectories() { - return deltas; - } - - @Override - public List getObsolete() { - return obsolete; - } - - @Override - public List getAbortedDirectories() { - return abortedDirectories; - } - }; + return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original, + obsolete, deltas, base); } /** * We can only use a 'base' if it doesn't have an open txn (from specific reader's point of view) @@ -1198,8 +1222,9 @@ public Long getFileId() { * @param original the list of original files * @throws IOException */ - private static void findOriginals(FileSystem fs, FileStatus stat, - List original, Ref useFileIds, boolean ignoreEmptyFiles) throws IOException { + public static void findOriginals(FileSystem fs, FileStatus stat, + List original, Ref useFileIds, + boolean ignoreEmptyFiles, boolean recursive) throws IOException { assert stat.isDir(); List childrenWithId = null; Boolean val = useFileIds.value; @@ -1218,8 +1243,10 @@ private static void findOriginals(FileSystem fs, FileStatus stat, } if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { - if (child.getFileStatus().isDir()) { - findOriginals(fs, child.getFileStatus(), original, useFileIds, ignoreEmptyFiles); + if (child.getFileStatus().isDirectory()) { + if (recursive) { + findOriginals(fs, child.getFileStatus(), original, useFileIds, ignoreEmptyFiles, true); + } } else { if(!ignoreEmptyFiles || child.getFileStatus().getLen() > 0) { original.add(child); @@ -1230,7 +1257,9 @@ private static void findOriginals(FileSystem fs, FileStatus stat, List children = HdfsUtils.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter); for (FileStatus child : children) { if (child.isDir()) { - findOriginals(fs, child, original, useFileIds, ignoreEmptyFiles); + if (recursive) { + findOriginals(fs, child, original, useFileIds, ignoreEmptyFiles, true); + } } else { if(!ignoreEmptyFiles || child.getLen() > 0) { original.add(createOriginalObj(null, child)); @@ -1240,6 +1269,7 @@ private static void findOriginals(FileSystem fs, FileStatus stat, } } + public static boolean isTablePropertyTransactional(Properties props) { String resultStr = props.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); if (resultStr == null) { @@ -1807,7 +1837,7 @@ public static int getAcidVersionFromMetaFile(Path deltaOrBaseDir, FileSystem fs) return null; } Directory acidInfo = AcidUtils.getAcidState(dir, jc, idList); - // Assume that for an MM table, or if there's only the base directory, we are good. + // Assume that for an MM table, or if there's only the base directory, we are good. 
if (!acidInfo.getCurrentDirectories().isEmpty() && AcidUtils.isFullAcidTable(table)) { Utilities.FILE_OP_LOGGER.warn( "Computing stats for an ACID table; stats may be inaccurate"); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java index 75fa09de8e..5df88bab5c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java @@ -139,21 +139,39 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, mmIds = getMmValidWriteIds(newjob, part.getTableDesc(), null); } // TODO: should this also handle ACID operation, etc.? seems to miss a lot of stuff from HIF. - Path[] finalDirs = (mmIds == null) ? new Path[] { dir } - : processPathsForMmRead(Lists.newArrayList(dir), newjob, mmIds); - if (finalDirs == null) { + List finalDirs = null, dirsWithMmOriginals = null; + if (mmIds == null) { + finalDirs = Lists.newArrayList(dir); + } else { + finalDirs = new ArrayList<>(); + dirsWithMmOriginals = new ArrayList<>(); + processPathsForMmRead( + Lists.newArrayList(dir), newjob, mmIds, finalDirs, dirsWithMmOriginals); + } + if (finalDirs.isEmpty() && (dirsWithMmOriginals == null || dirsWithMmOriginals.isEmpty())) { continue; // No valid inputs - possible in MM case. } for (Path finalDir : finalDirs) { FileStatus[] listStatus = listStatus(newjob, finalDir); - for (FileStatus status : listStatus) { numOrigSplits = addBHISplit( status, inputFormat, inputFormatClass, numOrigSplits, newjob, result); } } + if (dirsWithMmOriginals != null) { + for (Path originalsDir : dirsWithMmOriginals) { + FileSystem fs = originalsDir.getFileSystem(job); + FileStatus[] listStatus = fs.listStatus(dir, FileUtils.HIDDEN_FILES_PATH_FILTER); + for (FileStatus status : listStatus) { + if (status.isDirectory()) continue; + numOrigSplits = addBHISplit( + status, inputFormat, inputFormatClass, numOrigSplits, newjob, result); + } + } + } } + LOG.info(result.size() + " bucketized splits generated from " + numOrigSplits + " original splits."); return result.toArray(new BucketizedHiveInputSplit[result.size()]); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 655d10b643..31eb19ed71 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -29,35 +29,30 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.Map.Entry; - -import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.common.StringInternUtils; -import org.apache.hadoop.hive.common.ValidTxnWriteIdList; -import org.apache.hadoop.hive.common.ValidWriteIdList; -import org.apache.hadoop.hive.ql.exec.SerializationUtilities; -import org.apache.hive.common.util.Ref; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.StringInternUtils; +import 
org.apache.hadoop.hive.common.ValidTxnWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; import org.apache.hadoop.hive.llap.io.api.LlapIo; import org.apache.hadoop.hive.llap.io.api.LlapProxy; -import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -65,6 +60,7 @@ import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorMapOperatorReadType; @@ -82,7 +78,10 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.StringUtils; +import org.apache.hive.common.util.Ref; import org.apache.hive.common.util.ReflectionUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * HiveInputFormat is a parameterized InputFormat which looks at the path name @@ -505,45 +504,72 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job pushFilters(conf, tableScan, this.mrwork); } - Path[] finalDirs = processPathsForMmRead(dirs, conf, validMmWriteIdList); - if (finalDirs == null) { + List dirsWithFileOriginals = new ArrayList<>(), finalDirs = new ArrayList<>(); + processPathsForMmRead(dirs, conf, validMmWriteIdList, finalDirs, dirsWithFileOriginals); + if (finalDirs.isEmpty() && dirsWithFileOriginals.isEmpty()) { // This is for transactional tables. if (!conf.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) { LOG.warn("No valid inputs found in " + dirs); - return; // No valid inputs. } else if (validMmWriteIdList != null) { // AcidUtils.getAcidState() is already called to verify there is no input split. // Thus for a GroupByOperator summary row, set finalDirs and add a Dummy split here. - finalDirs = dirs.toArray(new Path[dirs.size()]); - result.add(new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit(finalDirs[0].toString()), - ZeroRowsInputFormat.class.getName())); + result.add(new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit( + dirs.get(0).toString()), ZeroRowsInputFormat.class.getName())); } - } else { - FileInputFormat.setInputPaths(conf, finalDirs); - conf.setInputFormat(inputFormat.getClass()); - - int headerCount = 0; - int footerCount = 0; - if (table != null) { - headerCount = Utilities.getHeaderCount(table); - footerCount = Utilities.getFooterCount(table, conf); - if (headerCount != 0 || footerCount != 0) { - // Input file has header or footer, cannot be splitted. - HiveConf.setLongVar(conf, ConfVars.MAPREDMINSPLITSIZE, Long.MAX_VALUE); - } + return; // No valid inputs. 
+ } + + conf.setInputFormat(inputFormat.getClass()); + int headerCount = 0; + int footerCount = 0; + if (table != null) { + headerCount = Utilities.getHeaderCount(table); + footerCount = Utilities.getFooterCount(table, conf); + if (headerCount != 0 || footerCount != 0) { + // Input file has header or footer, cannot be splitted. + HiveConf.setLongVar(conf, ConfVars.MAPREDMINSPLITSIZE, Long.MAX_VALUE); } + } + if (!finalDirs.isEmpty()) { + FileInputFormat.setInputPaths(conf, finalDirs.toArray(new Path[finalDirs.size()])); InputSplit[] iss = inputFormat.getSplits(conf, splits); for (InputSplit is : iss) { result.add(new HiveInputSplit(is, inputFormatClass.getName())); } - if (iss.length == 0 && finalDirs.length > 0 && conf.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) { - // If there are no inputs; the Execution engine skips the operator tree. - // To prevent it from happening; an opaque ZeroRows input is added here - when needed. - result.add(new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit(finalDirs[0].toString()), - ZeroRowsInputFormat.class.getName())); + } + + if (!dirsWithFileOriginals.isEmpty()) { + // We are going to add splits for these directories with recursive = false, so we ignore + // any subdirectories (deltas or original directories) and only read the original files. + // The fact that there's a loop calling addSplitsForGroup already implies it's ok to + // call the real input format multiple times... however some split concurrency/etc configs + // that are applied separately in each call will effectively be ignored for such splits. + JobConf nonRecConf = createConfForMmOriginalsSplit(conf, dirsWithFileOriginals); + InputSplit[] iss = inputFormat.getSplits(nonRecConf, splits); + for (InputSplit is : iss) { + result.add(new HiveInputSplit(is, inputFormatClass.getName())); } + } + + if (result.isEmpty() && conf.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) { + // If there are no inputs; the Execution engine skips the operator tree. + // To prevent it from happening; an opaque ZeroRows input is added here - when needed. + result.add(new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit( + finalDirs.get(0).toString()), ZeroRowsInputFormat.class.getName())); + } + } + + public static JobConf createConfForMmOriginalsSplit( + JobConf conf, List dirsWithFileOriginals) { + JobConf nonRecConf = new JobConf(conf); + FileInputFormat.setInputPaths(nonRecConf, + dirsWithFileOriginals.toArray(new Path[dirsWithFileOriginals.size()])); + nonRecConf.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false); + nonRecConf.setBoolean("mapred.input.dir.recursive", false); + // TODO: change to FileInputFormat.... field after MAPREDUCE-7086. 
+ nonRecConf.setBoolean("mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs", true); + return nonRecConf; } protected ValidWriteIdList getMmValidWriteIds( @@ -560,71 +586,84 @@ protected ValidWriteIdList getMmValidWriteIds( return validWriteIdList; } - public static Path[] processPathsForMmRead(List dirs, JobConf conf, - ValidWriteIdList validWriteIdList) throws IOException { - if (validWriteIdList == null) { - return dirs.toArray(new Path[dirs.size()]); - } else { - List finalPaths = new ArrayList<>(dirs.size()); - for (Path dir : dirs) { - processForWriteIds(dir, conf, validWriteIdList, finalPaths); - } - if (finalPaths.isEmpty()) { - return null; - } - return finalPaths.toArray(new Path[finalPaths.size()]); + public static void processPathsForMmRead(List dirs, Configuration conf, + ValidWriteIdList validWriteIdList, List finalPaths, + List pathsWithFileOriginals) throws IOException { + if (validWriteIdList == null) { + finalPaths.addAll(dirs); + return; + } + boolean allowOriginals = HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS); + for (Path dir : dirs) { + processForWriteIds( + dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals); } } - private static void processForWriteIds(Path dir, JobConf conf, - ValidWriteIdList validWriteIdList, List finalPaths) throws IOException { + private static void processForWriteIds(Path dir, Configuration conf, + ValidWriteIdList validWriteIdList, boolean allowOriginals, List finalPaths, + List pathsWithFileOriginals) throws IOException { FileSystem fs = dir.getFileSystem(conf); - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Checking " + dir + " (root) for inputs"); - } + Utilities.FILE_OP_LOGGER.trace("Checking {} for inputs", dir); + // Ignore nullscan-optimized paths. if (fs instanceof NullScanFileSystem) { finalPaths.add(dir); return; } - // Tez require the use of recursive input dirs for union processing, so we have to look into the - // directory to find out - LinkedList subdirs = new LinkedList<>(); - subdirs.add(dir); // add itself as a starting point - while (!subdirs.isEmpty()) { - Path currDir = subdirs.poll(); - FileStatus[] files = fs.listStatus(currDir); - boolean hadAcidState = false; // whether getAcidState has been called for currDir - for (FileStatus file : files) { - Path path = file.getPath(); - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Checking " + path + " for inputs"); + // We need to iterate to detect original directories, that are supported in MM but not ACID. + boolean hasOriginalFiles = false, hasAcidDirs = false; + List originalDirectories = new ArrayList<>(); + for (FileStatus file : fs.listStatus(dir, AcidUtils.hiddenFileFilter)) { + Path currDir = file.getPath(); + Utilities.FILE_OP_LOGGER.trace("Checking {} for being an input", currDir); + if (!file.isDirectory()) { + hasOriginalFiles = true; + } else if (AcidUtils.extractWriteId(currDir) == null) { + if (allowOriginals) { + originalDirectories.add(currDir); // Add as is; it would become a recursive split. + } else { + Utilities.FILE_OP_LOGGER.debug("Ignoring unknown (original?) 
directory {}", currDir); } - if (!file.isDirectory()) { - Utilities.FILE_OP_LOGGER.warn("Ignoring a file not in MM directory " + path); - } else if (AcidUtils.extractWriteId(path) == null) { - subdirs.add(path); - } else if (!hadAcidState) { - AcidUtils.Directory dirInfo - = AcidUtils.getAcidState(currDir, conf, validWriteIdList, Ref.from(false), true, null); - hadAcidState = true; - - // Find the base, created for IOW. - Path base = dirInfo.getBaseDirectory(); - if (base != null) { - finalPaths.add(base); - } + } else { + hasAcidDirs = true; + } + } + if (hasAcidDirs) { + AcidUtils.Directory dirInfo = AcidUtils.getAcidState( + dir, conf, validWriteIdList, Ref.from(false), true, null); - // Find the parsed delta files. - for (AcidUtils.ParsedDelta delta : dirInfo.getCurrentDirectories()) { - Utilities.FILE_OP_LOGGER.debug("Adding input " + delta.getPath()); - finalPaths.add(delta.getPath()); - } - } + // Find the base, created for IOW. + Path base = dirInfo.getBaseDirectory(); + if (base != null) { + Utilities.FILE_OP_LOGGER.debug("Adding input {}", base); + finalPaths.add(base); + // Base means originals no longer matter. + originalDirectories.clear(); + hasOriginalFiles = false; + } + + // Find the parsed delta files. + for (AcidUtils.ParsedDelta delta : dirInfo.getCurrentDirectories()) { + Utilities.FILE_OP_LOGGER.debug("Adding input {}", delta.getPath()); + finalPaths.add(delta.getPath()); + } + } + if (!originalDirectories.isEmpty()) { + Utilities.FILE_OP_LOGGER.debug("Adding original directories {}", originalDirectories); + finalPaths.addAll(originalDirectories); + } + if (hasOriginalFiles) { + if (allowOriginals) { + Utilities.FILE_OP_LOGGER.debug("Directory has original files {}", dir); + pathsWithFileOriginals.add(dir); + } else { + Utilities.FILE_OP_LOGGER.debug("Ignoring unknown (original?) files in {}", dir); } } } + Path[] getInputPaths(JobConf job) throws IOException { Path[] dirs; @@ -736,7 +775,7 @@ private static void processForWriteIds(Path dir, JobConf conf, pushProjection(newjob, readColumnsBuffer, readColumnNamesBuffer); } - if (dirs.length != 0) { + if (dirs.length != 0) { // TODO: should this be currentDirs? 
if (LOG.isInfoEnabled()) { LOG.info("Generating splits for dirs: {}", dirs); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 2337a350e6..049dbd38e7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -106,6 +106,7 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; @@ -117,6 +118,7 @@ import org.apache.orc.ColumnStatistics; import org.apache.orc.OrcProto; import org.apache.orc.OrcProto.Footer; +import org.apache.orc.OrcProto.Type; import org.apache.orc.OrcUtils; import org.apache.orc.StripeInformation; import org.apache.orc.StripeStatistics; @@ -342,7 +344,7 @@ public static boolean isOriginal(Footer footer) { return false; } - + public static boolean[] genIncludedColumns(TypeDescription readerSchema, List included) { return genIncludedColumns(readerSchema, included, null); @@ -701,15 +703,15 @@ public boolean validateInput(FileSystem fs, HiveConf conf, // & therefore we should be able to retrieve them here and determine appropriate behavior. // Note that this will be meaningless for non-acid tables & will be set to null. //this is set by Utilities.copyTablePropertiesToConf() - boolean isTableTransactional = conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false); - String transactionalProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); - this.acidOperationalProperties = isTableTransactional ? - AcidOperationalProperties.parseString(transactionalProperties) : null; + boolean isTxnTable = conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false); + String txnProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + this.acidOperationalProperties = isTxnTable + ? AcidOperationalProperties.parseString(txnProperties) : null; String value = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); writeIdList = value == null ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(value); LOG.debug("Context:: Read ValidWriteIdList: " + writeIdList.toString() - + " isTransactionalTable: " + isTableTransactional); + + " isTransactionalTable: " + isTxnTable + " properties: " + txnProperties); } @VisibleForTesting @@ -1144,6 +1146,7 @@ public String toString() { private final Ref useFileIds; private final UserGroupInformation ugi; + @VisibleForTesting FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds, UserGroupInformation ugi) { this(context, fs, dir, Ref.from(useFileIds), ugi); @@ -1176,13 +1179,31 @@ public AcidDirInfo run() throws Exception { } private AcidDirInfo callInternal() throws IOException { + if (context.acidOperationalProperties != null + && context.acidOperationalProperties.isInsertOnly()) { + // See the class comment - HIF handles MM for all input formats, so if we try to handle it + // again, in particular for the non-recursive originals-only getSplits call, we will just + // get confused. This bypass was not necessary when MM tables didn't support originals. 
Now + // that they do, we use this path for anything MM table related, although everything except + // the originals could still be handled by AcidUtils like a regular non-txn table. + boolean isRecursive = context.conf.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, + context.conf.getBoolean("mapred.input.dir.recursive", false)); + List originals = new ArrayList<>(); + List baseFiles = new ArrayList<>(); + AcidUtils.findOriginals(fs, fs.getFileStatus(dir), originals, useFileIds, true, isRecursive); + for (HdfsFileStatusWithId fileId : originals) { + baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE)); + } + return new AcidDirInfo(fs, dir, new AcidUtils.DirectoryImpl(Lists.newArrayList(), true, originals, + Lists.newArrayList(), Lists.newArrayList(), null), baseFiles, new ArrayList<>()); + } //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine? - AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf, - context.writeIdList, useFileIds, true, null); + AcidUtils.Directory dirInfo = AcidUtils.getAcidState( + dir, context.conf, context.writeIdList, useFileIds, true, null); // find the base files (original or new style) List baseFiles = new ArrayList<>(); if (dirInfo.getBaseDirectory() == null) { - //for non-acid tables, all data files are in getOriginalFiles() list + // For non-acid tables (or paths), all data files are in getOriginalFiles() list for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) { baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE)); } @@ -1197,7 +1218,6 @@ private AcidDirInfo callInternal() throws IOException { // Find the parsed deltas- some of them containing only the insert delta events // may get treated as base if split-update is enabled for ACID. (See HIVE-14035 for details) List parsedDeltas = new ArrayList<>(); - if (context.acidOperationalProperties != null && context.acidOperationalProperties.isSplitUpdate()) { // If we have split-update turned on for this table, then the delta events have already been @@ -1258,7 +1278,8 @@ private AcidDirInfo callInternal() throws IOException { We already handled all delete deltas above and there should not be any other deltas for any table type. (this was acid 1.0 code path). */ - assert dirInfo.getCurrentDirectories().isEmpty() : "Non empty curDir list?!: " + dir; + assert dirInfo.getCurrentDirectories().isEmpty() : + "Non empty curDir list?!: " + dirInfo.getCurrentDirectories(); // When split-update is not enabled, then all the deltas in the current directories // should be considered as usual. 
parsedDeltas.addAll(dirInfo.getCurrentDirectories()); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index b61a945d94..780723a2a9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -17,6 +17,16 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.io; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import javax.security.auth.login.LoginException; + +import org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -25,7 +35,7 @@ import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; @@ -35,14 +45,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.auth.login.LoginException; - -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.List; - //TODO: this object is created once to call one method and then immediately destroyed. //So it's basically just a roundabout way to pass arguments to a static method. Simplify? public class FileOperations { @@ -101,37 +103,39 @@ private void copyOneDataPath(Path fromPath, Path toPath) throws IOException, Log } private void copyMmPath() throws LoginException, IOException { - assert dataPathList.size() == 1; ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, mmCtx.getFqTableName()); - Path fromPath = dataFileSystem.makeQualified(dataPathList.get(0)); - List validPaths = getMmValidPaths(ids, fromPath); - String fromPathStr = fromPath.toString(); - if (!fromPathStr.endsWith(Path.SEPARATOR)) { - fromPathStr += Path.SEPARATOR; - } - for (Path validPath : validPaths) { - // Export valid directories with a modified name so they don't look like bases/deltas. - // We could also dump the delta contents all together and rename the files if names collide. - String mmChildPath = "export_old_" + validPath.toString().substring(fromPathStr.length()); - Path destPath = new Path(exportRootDataDir, mmChildPath); - exportFileSystem.mkdirs(destPath); - copyOneDataPath(validPath, destPath); + for (Path fromPath : dataPathList) { + fromPath = dataFileSystem.makeQualified(fromPath); + List validPaths = new ArrayList<>(), dirsWithOriginals = new ArrayList<>(); + HiveInputFormat.processPathsForMmRead(dataPathList, + hiveConf, ids, validPaths, dirsWithOriginals); + String fromPathStr = fromPath.toString(); + if (!fromPathStr.endsWith(Path.SEPARATOR)) { + fromPathStr += Path.SEPARATOR; + } + for (Path validPath : validPaths) { + // Export valid directories with a modified name so they don't look like bases/deltas. + // We could also dump the delta contents all together and rename the files if names collide. 
+ String mmChildPath = "export_old_" + validPath.toString().substring(fromPathStr.length()); + Path destPath = new Path(exportRootDataDir, mmChildPath); + Utilities.FILE_OP_LOGGER.debug("Exporting {} to {}", validPath, destPath); + exportFileSystem.mkdirs(destPath); + copyOneDataPath(validPath, destPath); + } + for (Path dirWithOriginals : dirsWithOriginals) { + FileStatus[] files = dataFileSystem.listStatus(dirWithOriginals, AcidUtils.hiddenFileFilter); + List srcPaths = new ArrayList<>(); + for (FileStatus fileStatus : files) { + if (fileStatus.isDirectory()) continue; + srcPaths.add(fileStatus.getPath()); + } + Utilities.FILE_OP_LOGGER.debug("Exporting originals from {} to {}", + dirWithOriginals, exportRootDataDir); + new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths); + } } } - private List getMmValidPaths(ValidWriteIdList ids, Path fromPath) throws IOException { - Utilities.FILE_OP_LOGGER.trace("Looking for valid MM paths under {}", fromPath); - AcidUtils.Directory acidState = AcidUtils.getAcidState(fromPath, hiveConf, ids); - List validPaths = new ArrayList<>(); - Path base = acidState.getBaseDirectory(); - if (base != null) { - validPaths.add(base); - } - for (ParsedDelta pd : acidState.getCurrentDirectories()) { - validPaths.add(pd.getPath()); - } - return validPaths; - } /** @@ -141,18 +145,25 @@ private void copyMmPath() throws LoginException, IOException { */ private void exportFilesAsList() throws SemanticException, IOException { try (BufferedWriter writer = writer()) { - if (mmCtx != null) { - assert dataPathList.size() == 1; - Path dataPath = dataPathList.get(0); - ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList( - hiveConf, mmCtx.getFqTableName()); - List validPaths = getMmValidPaths(ids, dataPath); + if (mmCtx == null) { + for (Path dataPath : dataPathList) { + writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); + } + return; + } + + ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, mmCtx.getFqTableName()); + List validPaths = new ArrayList<>(), dirsWithOriginals = new ArrayList<>(); + for (Path dataPath : dataPathList) { + HiveInputFormat.processPathsForMmRead(Lists.newArrayList(dataPath), + hiveConf, ids, validPaths, dirsWithOriginals); for (Path mmPath : validPaths) { writeFilesList(listFilesInDir(mmPath), writer, AcidUtils.getAcidSubDir(dataPath)); } - } else { - for (Path dataPath : dataPathList) { - writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); + for (Path dir : dirsWithOriginals) { + FileStatus[] files = dataFileSystem.listStatus(dir, AcidUtils.hiddenFileFilter); + files = Arrays.stream(files).filter(f -> !f.isDirectory()).toArray(FileStatus[]::new); + writeFilesList(files, writer, AcidUtils.getAcidSubDir(dataPath)); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java index 80f77b9f0c..e77fc3eac8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java @@ -186,7 +186,7 @@ public void setLbCtx(ListBucketingCtx lbCtx) { } } } else { - Utilities.FILE_OP_LOGGER.info("Resolver returning movetask for " + dirPath); + Utilities.FILE_OP_LOGGER.info("Resolver returning movetask for " + dirPath, new Exception()); resTsks.add(mvTask); } } catch (IOException e) { diff --git 
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index b698c84080..982b180761 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -337,9 +337,11 @@ private void runMmCompaction(HiveConf conf, Table t, Partition p, } int deltaCount = dir.getCurrentDirectories().size(); - if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) <= 1) { + int origCount = dir.getOriginalFiles().size(); + if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) { LOG.debug("Not compacting " + sd.getLocation() + "; current base is " - + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas"); + + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas and " + + origCount + " originals"); return; } try { @@ -355,7 +357,8 @@ private void runMmCompaction(HiveConf conf, Table t, Partition p, // Note: we could skip creating the table and just add table type stuff directly to the // "insert overwrite directory" command if there were no bucketing or list bucketing. - String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() + "_", tmpTableName; + String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() + "_"; + String tmpTableName = null; while (true) { tmpTableName = tmpPrefix + System.currentTimeMillis(); String query = buildMmCompactionCtQuery(tmpTableName, t, diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 3e2784ba2d..a4d34a7513 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -198,7 +198,7 @@ public void testMmExim() throws Exception { String.format("select a,b from %s order by a,b", importName)); Assert.assertEquals("After import: " + rs, allData, rs); runStatementOnDriver("drop table if exists " + importName); - + // Do insert overwrite to create some invalid deltas, and import into a non-MM table. 
int[][] rows2 = {{5,6},{7,8}}; runStatementOnDriver(String.format("insert overwrite table %s %s", @@ -259,7 +259,7 @@ private void verifyMmExportPaths(List paths, int deltasOrBases) { return paths; } - + /** * add tests for all transitions - AC=t, AC=t, AC=f, commit (for example) * @throws Exception diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index a88a570c52..dbd7895417 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -35,6 +35,7 @@ import org.junit.Rule; import org.junit.rules.TestName; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.util.ArrayList; @@ -43,6 +44,7 @@ import java.util.Set; public abstract class TxnCommandsBaseForTests { + private static final Logger LOG = LoggerFactory.getLogger(TxnCommandsBaseForTests.class); //bucket count for test tables; set it to 1 for easier debugging final static int BUCKET_COUNT = 2; @Rule @@ -85,6 +87,7 @@ void setUpInternal() throws Exception { "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true); hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); + hiveConf.setBoolean("mapred.input.dir.recursive", true); TxnDbUtil.setConfValues(hiveConf); TxnDbUtil.prepDb(hiveConf); File f = new File(getWarehouseDir()); @@ -149,6 +152,7 @@ void runCleaner(HiveConf hiveConf) throws MetaException { } List runStatementOnDriver(String stmt) throws Exception { + LOG.info("Running the query: " + stmt); CommandProcessorResponse cpr = d.run(stmt); if(cpr.getResponseCode() != 0) { throw new RuntimeException(stmt + " failed: " + cpr); diff --git ql/src/test/queries/clientpositive/mm_conversions.q ql/src/test/queries/clientpositive/mm_conversions.q index 55565a9428..e933582927 100644 --- ql/src/test/queries/clientpositive/mm_conversions.q +++ ql/src/test/queries/clientpositive/mm_conversions.q @@ -15,21 +15,42 @@ insert into table intermediate partition(p='455') select distinct key from src w insert into table intermediate partition(p='456') select distinct key from src where key is not null order by key asc limit 1; insert into table intermediate partition(p='457') select distinct key from src where key >= 100 order by key asc limit 1; +set hive.mm.allow.originals=true; +set hive.exim.test.mode=true; drop table simple_to_mm; -create table simple_to_mm(key int) stored as orc; +create table simple_to_mm(key int) stored as orc tblproperties("transactional"="false"); insert into table simple_to_mm select key from intermediate; select * from simple_to_mm s1 order by key; alter table simple_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only"); +export table simple_to_mm to 'ql/test/data/exports/export0'; select * from simple_to_mm s2 order by key; +create table import_converted0_mm(key int) stored as orc tblproperties("transactional"="false"); +import table import_converted0_mm from 'ql/test/data/exports/export0'; +select * from import_converted0_mm order by key; +drop table import_converted0_mm; + insert into table simple_to_mm select key from intermediate; insert into table simple_to_mm select key from intermediate; +export table simple_to_mm to 'ql/test/data/exports/export1'; select * from simple_to_mm s3 order by key; +create table import_converted1_mm(key 
int) stored as orc tblproperties("transactional"="false");
+import table import_converted1_mm from 'ql/test/data/exports/export1';
+select * from import_converted1_mm order by key;
+drop table import_converted1_mm;
+
+insert overwrite table simple_to_mm select key from intermediate;
+export table simple_to_mm to 'ql/test/data/exports/export2';
+select * from simple_to_mm s4 order by key;
+create table import_converted2_mm(key int) stored as orc tblproperties("transactional"="false");
+import table import_converted2_mm from 'ql/test/data/exports/export2';
+select * from import_converted2_mm order by key;
+drop table import_converted2_mm;
 drop table simple_to_mm;
 
 drop table part_to_mm;
-create table part_to_mm(key int) partitioned by (key_mm int) stored as orc;
+create table part_to_mm(key int) partitioned by (key_mm int) stored as orc tblproperties("transactional"="false");
 insert into table part_to_mm partition(key_mm='455') select key from intermediate;
 insert into table part_to_mm partition(key_mm='456') select key from intermediate;
 select * from part_to_mm s1 order by key, key_mm;
@@ -40,4 +61,19 @@ insert into table part_to_mm partition(key_mm='457') select key from intermediat
 select * from part_to_mm s3 order by key, key_mm;
 drop table part_to_mm;
 
+set hive.mm.allow.originals=false;
+
+drop table simple_to_mm_text;
+create table simple_to_mm_text(key int) stored as textfile tblproperties("transactional"="false");
+insert into table simple_to_mm_text select key from intermediate;
+select * from simple_to_mm_text t1 order by key;
+alter table simple_to_mm_text set tblproperties("transactional"="true", "transactional_properties"="insert_only");
+select * from simple_to_mm_text t2 order by key;
+insert into table simple_to_mm_text select key from intermediate;
+insert into table simple_to_mm_text select key from intermediate;
+select * from simple_to_mm_text t3 order by key;
+insert overwrite table simple_to_mm_text select key from intermediate;
+select * from simple_to_mm_text t4 order by key;
+drop table simple_to_mm_text;
+
 drop table intermediate;
diff --git ql/src/test/results/clientpositive/llap/mm_conversions.q.out ql/src/test/results/clientpositive/llap/mm_conversions.q.out
index 4754710291..8a9036a7bc 100644
--- ql/src/test/results/clientpositive/llap/mm_conversions.q.out
+++ ql/src/test/results/clientpositive/llap/mm_conversions.q.out
@@ -41,11 +41,11 @@ PREHOOK: query: drop table simple_to_mm
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: drop table simple_to_mm
 POSTHOOK: type: DROPTABLE
-PREHOOK: query: create table simple_to_mm(key int) stored as orc
+PREHOOK: query: create table simple_to_mm(key int) stored as orc tblproperties("transactional"="false")
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
 PREHOOK: Output: default@simple_to_mm
-POSTHOOK: query: create table simple_to_mm(key int) stored as orc
+POSTHOOK: query: create table simple_to_mm(key int) stored as orc tblproperties("transactional"="false")
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@simple_to_mm
@@ -83,6 +83,14 @@ POSTHOOK: query: alter table simple_to_mm set tblproperties("transactional"="tru
 POSTHOOK: type: ALTERTABLE_PROPERTIES
 POSTHOOK: Input: default@simple_to_mm
 POSTHOOK: Output: default@simple_to_mm
+PREHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export0'
+PREHOOK: type: EXPORT
+PREHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
+POSTHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export0'
+POSTHOOK: type: EXPORT
+POSTHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
 PREHOOK: query: select * from simple_to_mm s2 order by key
 PREHOOK: type: QUERY
 PREHOOK: Input: default@simple_to_mm
@@ -94,6 +102,41 @@ POSTHOOK: Input: default@simple_to_mm
 0
 98
 100
+PREHOOK: query: create table import_converted0_mm(key int) stored as orc tblproperties("transactional"="false")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@import_converted0_mm
+POSTHOOK: query: create table import_converted0_mm(key int) stored as orc tblproperties("transactional"="false")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@import_converted0_mm
+PREHOOK: query: import table import_converted0_mm from 'ql/test/data/exports/export0'
+PREHOOK: type: IMPORT
+#### A masked pattern was here ####
+PREHOOK: Output: default@import_converted0_mm
+POSTHOOK: query: import table import_converted0_mm from 'ql/test/data/exports/export0'
+POSTHOOK: type: IMPORT
+#### A masked pattern was here ####
+POSTHOOK: Output: default@import_converted0_mm
+PREHOOK: query: select * from import_converted0_mm order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@import_converted0_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from import_converted0_mm order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@import_converted0_mm
+#### A masked pattern was here ####
+0
+98
+100
+PREHOOK: query: drop table import_converted0_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@import_converted0_mm
+PREHOOK: Output: default@import_converted0_mm
+POSTHOOK: query: drop table import_converted0_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@import_converted0_mm
+POSTHOOK: Output: default@import_converted0_mm
 PREHOOK: query: insert into table simple_to_mm select key from intermediate
 PREHOOK: type: QUERY
 PREHOOK: Input: default@intermediate
@@ -124,6 +167,14 @@ POSTHOOK: Input: default@intermediate@p=456
 POSTHOOK: Input: default@intermediate@p=457
 POSTHOOK: Output: default@simple_to_mm
 POSTHOOK: Lineage: simple_to_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export1'
+PREHOOK: type: EXPORT
+PREHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
+POSTHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export1'
+POSTHOOK: type: EXPORT
+POSTHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
 PREHOOK: query: select * from simple_to_mm s3 order by key
 PREHOOK: type: QUERY
 PREHOOK: Input: default@simple_to_mm
@@ -141,6 +192,116 @@ POSTHOOK: Input: default@simple_to_mm
 100
 100
 100
+PREHOOK: query: create table import_converted1_mm(key int) stored as orc tblproperties("transactional"="false")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@import_converted1_mm
+POSTHOOK: query: create table import_converted1_mm(key int) stored as orc tblproperties("transactional"="false")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@import_converted1_mm
+PREHOOK: query: import table import_converted1_mm from 'ql/test/data/exports/export1'
+PREHOOK: type: IMPORT
+#### A masked pattern was here ####
+PREHOOK: Output: default@import_converted1_mm
+POSTHOOK: query: import table import_converted1_mm from 'ql/test/data/exports/export1'
+POSTHOOK: type: IMPORT
+#### A masked pattern was here ####
+POSTHOOK: Output: default@import_converted1_mm
+PREHOOK: query: select * from import_converted1_mm order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@import_converted1_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from import_converted1_mm order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@import_converted1_mm
+#### A masked pattern was here ####
+0
+0
+0
+98
+98
+98
+100
+100
+100
+PREHOOK: query: drop table import_converted1_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@import_converted1_mm
+PREHOOK: Output: default@import_converted1_mm
+POSTHOOK: query: drop table import_converted1_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@import_converted1_mm
+POSTHOOK: Output: default@import_converted1_mm
+PREHOOK: query: insert overwrite table simple_to_mm select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Input: default@intermediate@p=457
+PREHOOK: Output: default@simple_to_mm
+POSTHOOK: query: insert overwrite table simple_to_mm select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Input: default@intermediate@p=457
+POSTHOOK: Output: default@simple_to_mm
+POSTHOOK: Lineage: simple_to_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export2'
+PREHOOK: type: EXPORT
+PREHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
+POSTHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export2'
+POSTHOOK: type: EXPORT
+POSTHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
+PREHOOK: query: select * from simple_to_mm s4 order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from simple_to_mm s4 order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@simple_to_mm
+#### A masked pattern was here ####
+0
+98
+100
+PREHOOK: query: create table import_converted2_mm(key int) stored as orc tblproperties("transactional"="false")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@import_converted2_mm
+POSTHOOK: query: create table import_converted2_mm(key int) stored as orc tblproperties("transactional"="false")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@import_converted2_mm
+PREHOOK: query: import table import_converted2_mm from 'ql/test/data/exports/export2'
+PREHOOK: type: IMPORT
+#### A masked pattern was here ####
+PREHOOK: Output: default@import_converted2_mm
+POSTHOOK: query: import table import_converted2_mm from 'ql/test/data/exports/export2'
+POSTHOOK: type: IMPORT
+#### A masked pattern was here ####
+POSTHOOK: Output: default@import_converted2_mm
+PREHOOK: query: select * from import_converted2_mm order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@import_converted2_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from import_converted2_mm order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@import_converted2_mm
+#### A masked pattern was here ####
+0
+98
+100
+PREHOOK: query: drop table import_converted2_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@import_converted2_mm
+PREHOOK: Output: default@import_converted2_mm
+POSTHOOK: query: drop table import_converted2_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@import_converted2_mm
+POSTHOOK: Output: default@import_converted2_mm
 PREHOOK: query: drop table simple_to_mm
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@simple_to_mm
@@ -153,11 +314,11 @@ PREHOOK: query: drop table part_to_mm
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: drop table part_to_mm
 POSTHOOK: type: DROPTABLE
-PREHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) stored as orc
+PREHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) stored as orc tblproperties("transactional"="false")
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
 PREHOOK: Output: default@part_to_mm
-POSTHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) stored as orc
+POSTHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) stored as orc tblproperties("transactional"="false")
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@part_to_mm
@@ -299,6 +460,144 @@ POSTHOOK: query: drop table part_to_mm
 POSTHOOK: type: DROPTABLE
 POSTHOOK: Input: default@part_to_mm
 POSTHOOK: Output: default@part_to_mm
+PREHOOK: query: drop table simple_to_mm_text
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table simple_to_mm_text
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table simple_to_mm_text(key int) stored as textfile tblproperties("transactional"="false")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@simple_to_mm_text
+POSTHOOK: query: create table simple_to_mm_text(key int) stored as textfile tblproperties("transactional"="false")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@simple_to_mm_text
+PREHOOK: query: insert into table simple_to_mm_text select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Input: default@intermediate@p=457
+PREHOOK: Output: default@simple_to_mm_text
+POSTHOOK: query: insert into table simple_to_mm_text select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Input: default@intermediate@p=457
+POSTHOOK: Output: default@simple_to_mm_text
+POSTHOOK: Lineage: simple_to_mm_text.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from simple_to_mm_text t1 order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+POSTHOOK: query: select * from simple_to_mm_text t1 order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+0
+98
+100
+PREHOOK: query: alter table simple_to_mm_text set tblproperties("transactional"="true", "transactional_properties"="insert_only")
+PREHOOK: type: ALTERTABLE_PROPERTIES
+PREHOOK: Input: default@simple_to_mm_text
+PREHOOK: Output: default@simple_to_mm_text
+POSTHOOK: query: alter table simple_to_mm_text set tblproperties("transactional"="true", "transactional_properties"="insert_only")
+POSTHOOK: type: ALTERTABLE_PROPERTIES
+POSTHOOK: Input: default@simple_to_mm_text
+POSTHOOK: Output: default@simple_to_mm_text
+PREHOOK: query: select * from simple_to_mm_text t2 order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+POSTHOOK: query: select * from simple_to_mm_text t2 order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+0
+98
+100
+PREHOOK: query: insert into table simple_to_mm_text select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Input: default@intermediate@p=457
+PREHOOK: Output: default@simple_to_mm_text
+POSTHOOK: query: insert into table simple_to_mm_text select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Input: default@intermediate@p=457
+POSTHOOK: Output: default@simple_to_mm_text
+POSTHOOK: Lineage: simple_to_mm_text.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: insert into table simple_to_mm_text select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Input: default@intermediate@p=457
+PREHOOK: Output: default@simple_to_mm_text
+POSTHOOK: query: insert into table simple_to_mm_text select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Input: default@intermediate@p=457
+POSTHOOK: Output: default@simple_to_mm_text
+POSTHOOK: Lineage: simple_to_mm_text.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from simple_to_mm_text t3 order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+POSTHOOK: query: select * from simple_to_mm_text t3 order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+0
+0
+0
+98
+98
+98
+100
+100
+100
+PREHOOK: query: insert overwrite table simple_to_mm_text select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Input: default@intermediate@p=457
+PREHOOK: Output: default@simple_to_mm_text
+POSTHOOK: query: insert overwrite table simple_to_mm_text select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Input: default@intermediate@p=457
+POSTHOOK: Output: default@simple_to_mm_text
+POSTHOOK: Lineage: simple_to_mm_text.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from simple_to_mm_text t4 order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+POSTHOOK: query: select * from simple_to_mm_text t4 order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@simple_to_mm_text
+#### A masked pattern was here ####
+0
+98
+100
+PREHOOK: query: drop table simple_to_mm_text
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@simple_to_mm_text
+PREHOOK: Output: default@simple_to_mm_text
+POSTHOOK: query: drop table simple_to_mm_text
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@simple_to_mm_text
+POSTHOOK: Output: default@simple_to_mm_text
 PREHOOK: query: drop table intermediate
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@intermediate