diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8347f7f084..a1f1f73095 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2394,6 +2394,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_LOCK_QUERY_STRING_MAX_LENGTH("hive.lock.query.string.max.length", 1000000, "The maximum length of the query string to store in the lock.\n" + "The default value is 1000000, since the data limit of a znode is 1MB"), + HIVE_MM_ALLOW_ORIGINALS("hive.mm.allow.originals", false, + "Whether to allow original files in MM tables. Conversion to MM may be expensive if\n" + + "this is set to false, however unless MAPREDUCE-7086 fix is present, queries that\n" + + "read MM tables with original files will fail. The default in Hive 3.0 is false."), // Zookeeper related configs HIVE_ZOOKEEPER_QUORUM("hive.zookeeper.quorum", "", diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index af425197f9..46c99d6293 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -154,6 +155,7 @@ public void setup() throws Exception { TxnDbUtil.prepDb(hiveConf); conf = hiveConf; + HiveConf.setBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS, true); msClient = new HiveMetaStoreClient(conf); driver = DriverFactory.newDriver(hiveConf); SessionState.start(new CliSessionState(hiveConf)); @@ -984,6 +986,96 @@ public void mmTable() throws Exception { } @Test + public void mmTableOriginalsOrc() throws Exception { + mmTableOriginals("ORC"); + } + + @Test + public void mmTableOriginalsText() throws Exception { + mmTableOriginals("TEXTFILE"); + } + + private void mmTableOriginals(String format) throws Exception { + // Originals split won't work due to MAPREDUCE-7086 issue in FileInputFormat. 
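+    // Only formats whose splits come from FileInputFormat's directory listing (e.g. TEXTFILE) are
+    // expected to hit it; ORC builds its own splits in OrcInputFormat (see the change there below).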
+ boolean isBrokenUntilMapreduce7086 = "TEXTFILE".equals(format); + String dbName = "default"; + String tblName = "mm_nonpart"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + + format + " TBLPROPERTIES ('transactional'='false')", driver); + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver); + executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + + verifyFooBarResult(tblName, 3); + + FileSystem fs = FileSystem.get(conf); + executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES " + + "('transactional'='true', 'transactional_properties'='insert_only')", driver); + + verifyFooBarResult(tblName, 3); + + runMajorCompaction(dbName, tblName); + verifyFooBarResult(tblName, 3); + verifyHasBase(table.getSd(), fs, "base_0000001"); + + // Try with an extra delta. + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + + format + " TBLPROPERTIES ('transactional'='false')", driver); + table = msClient.getTable(dbName, tblName); + + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver); + executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + verifyFooBarResult(tblName, 3); + + executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES " + + "('transactional'='true', 'transactional_properties'='insert_only')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + + // Neither select nor compaction (which is a select) wil work after this. + if (isBrokenUntilMapreduce7086) return; + + verifyFooBarResult(tblName, 9); + + runMajorCompaction(dbName, tblName); + verifyFooBarResult(tblName, 9); + verifyHasBase(table.getSd(), fs, "base_0000002"); + + // Try with an extra base. 
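+    // An insert-overwrite on the converted table writes a base directory, which should supersede
+    // the original files for both reads and the major compaction.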
+ executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + + format + " TBLPROPERTIES ('transactional'='false')", driver); + table = msClient.getTable(dbName, tblName); + + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver); + executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver); + executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + verifyFooBarResult(tblName, 3); + + executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES " + + "('transactional'='true', 'transactional_properties'='insert_only')", driver); + executeStatementOnDriver("INSERT OVERWRITE TABLE " + tblName + " SELECT a,b FROM " + + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver); + verifyFooBarResult(tblName, 6); + + runMajorCompaction(dbName, tblName); + verifyFooBarResult(tblName, 6); + verifyHasBase(table.getSd(), fs, "base_0000002"); + + msClient.close(); + } + + + @Test public void mmTableBucketed() throws Exception { String dbName = "default"; String tblName = "mm_nonpart"; @@ -1054,7 +1146,7 @@ public void mmTableOpenWriteId() throws Exception { msClient.abortTxns(Lists.newArrayList(openTxnId)); // Now abort 3. runMajorCompaction(dbName, tblName); // Compact 4 and 5. verifyFooBarResult(tblName, 2); - verifyHasBase(table.getSd(), fs, "base_0000005"); + verifyHasBase(table.getSd(), fs, "base_0000005"); runCleaner(conf); verifyDeltaCount(table.getSd(), fs, 0); } @@ -1108,7 +1200,7 @@ public void mmTablePartitioned() throws Exception { p2 = msClient.getPartition(dbName, tblName, "ds=2"), p3 = msClient.getPartition(dbName, tblName, "ds=3"); msClient.close(); - + FileSystem fs = FileSystem.get(conf); verifyDeltaCount(p1.getSd(), fs, 3); verifyDeltaCount(p2.getSd(), fs, 2); diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index 1181461162..aeb4d48883 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -585,6 +585,7 @@ minillaplocal.query.files=\ mapjoin_hint.q,\ mapjoin_emit_interval.q,\ mergejoin_3way.q,\ + mm_bhif.q,\ mm_conversions.q,\ mm_exim.q,\ mm_loaddata.q,\ diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java index b0ec5abcce..1a8e5e79e1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java @@ -63,14 +63,25 @@ public int execute(DriverContext driverContext) { protected int copyOnePath(Path fromPath, Path toPath) { FileSystem dstFs = null; try { - Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} " + fromPath); + Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} ", fromPath, toPath); console.printInfo("Copying data from " + fromPath.toString(), " to " + toPath.toString()); FileSystem srcFs = fromPath.getFileSystem(conf); dstFs = toPath.getFileSystem(conf); - FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.doSkipSourceMmDirs()); + FileStatus[] srcs = srcFs.globStatus(fromPath, new EximPathFilter()); + + // TODO: this is very brittle given that Hive supports nested directories in the tables. + // The caller should pass a flag explicitly telling us if the directories in the + // input are data, or parent of data. 
For now, retain this for backward compat. + if (srcs != null && srcs.length == 1 && srcs[0].isDirectory() + /*&& srcs[0].getPath().getName().equals(EximUtil.DATA_PATH_NAME) - still broken for partitions*/) { + Utilities.FILE_OP_LOGGER.debug( + "Recursing into a single child directory {}", srcs[0].getPath().getName()); + srcs = srcFs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); + } + if (srcs == null || srcs.length == 0) { if (work.isErrorOnSrcEmpty()) { console.printError("No files matching path: " + fromPath.toString()); @@ -107,40 +118,6 @@ protected int copyOnePath(Path fromPath, Path toPath) { } } - // Note: initially copied from LoadSemanticAnalyzer. - private static FileStatus[] matchFilesOrDir( - FileSystem fs, Path path, boolean isSourceMm) throws IOException { - if (!fs.exists(path)) return null; - if (!isSourceMm) return matchFilesOneDir(fs, path, null); - // Note: this doesn't handle list bucketing properly; neither does the original code. - FileStatus[] mmDirs = fs.listStatus(path, new AcidUtils.AnyIdDirFilter()); - if (mmDirs == null || mmDirs.length == 0) return null; - List allFiles = new ArrayList(); - for (FileStatus mmDir : mmDirs) { - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Found source MM directory " + mmDir.getPath()); - } - matchFilesOneDir(fs, mmDir.getPath(), allFiles); - } - return allFiles.toArray(new FileStatus[allFiles.size()]); - } - - private static FileStatus[] matchFilesOneDir( - FileSystem fs, Path path, List result) throws IOException { - FileStatus[] srcs = fs.globStatus(path, new EximPathFilter()); - if (srcs != null && srcs.length == 1) { - if (srcs[0].isDirectory()) { - srcs = fs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - } - } - if (result != null && srcs != null) { - for (int i = 0; i < srcs.length; ++i) { - result.add(srcs[i]); - } - } - return srcs; - } - private static final class EximPathFilter implements PathFilter { @Override public boolean accept(Path p) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index ab758c43c7..239262b05c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4474,7 +4474,22 @@ private void checkMmLb(Partition part) throws HiveException { Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(tbl, alterTbl.getProps()); if (isToMmTable != null) { if (!isFromMmTable && isToMmTable) { - result = generateAddMmTasks(tbl, alterTbl.getWriteId()); + if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS)) { + result = generateAddMmTasks(tbl, alterTbl.getWriteId()); + } else { + if (tbl.getPartitionKeys().size() > 0) { + Hive db = getHive(); + PartitionIterable parts = new PartitionIterable(db, tbl, null, + HiveConf.getIntVar(conf, ConfVars.METASTORE_BATCH_RETRIEVE_MAX)); + Iterator partIter = parts.iterator(); + while (partIter.hasNext()) { + Partition part0 = partIter.next(); + checkMmLb(part0); + } + } else { + checkMmLb(tbl); + } + } } else if (isFromMmTable && !isToMmTable) { throw new HiveException("Cannot convert an ACID table to non-ACID"); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index e3af4f9a53..3c6a606b01 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -47,15 +47,13 @@ public String getName() { 
protected int execute(DriverContext driverContext) { try { // Also creates the root directory - TableExport.Paths exportPaths = - new TableExport.Paths(work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), - conf, false); + TableExport.Paths exportPaths = new TableExport.Paths( + work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), conf, false); Hive db = getHive(); LOG.debug("Exporting data to: {}", exportPaths.exportRootDir()); work.acidPostProcess(db); - TableExport tableExport = new TableExport( - exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf - ); + TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(), + work.getReplicationSpec(), db, null, conf, work.getMmContext()); if (!tableExport.write()) { throw new SemanticException(ErrorMsg.EXIM_FOR_NON_NATIVE.getMsg()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 969c591917..224690109e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -379,34 +379,59 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException Class formatter = currDesc.getInputFileFormatClass(); Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job); InputFormat inputFormat = getInputFormatFromCache(formatter, job); - String inputs = processCurrPathForMmWriteIds(inputFormat); - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Setting fetch inputs to " + inputs); + List dirs = new ArrayList<>(), dirsWithOriginals = new ArrayList<>(); + processCurrPathForMmWriteIds(inputFormat, dirs, dirsWithOriginals); + if (dirs.isEmpty() && dirsWithOriginals.isEmpty()) { + LOG.debug("No valid directories for " + currPath); + continue; } - if (inputs == null) return null; - job.set("mapred.input.dir", inputs); - InputSplit[] splits = inputFormat.getSplits(job, 1); - FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length]; - for (int i = 0; i < splits.length; i++) { - inputSplits[i] = new FetchInputFormatSplit(splits[i], inputFormat); + List inputSplits = new ArrayList<>(); + if (!dirs.isEmpty()) { + String inputs = makeInputString(dirs); + Utilities.FILE_OP_LOGGER.trace("Setting fetch inputs to {}", inputs); + job.set("mapred.input.dir", inputs); + + generateWrappedSplits(inputFormat, inputSplits, job); + } + + if (!dirsWithOriginals.isEmpty()) { + String inputs = makeInputString(dirsWithOriginals); + Utilities.FILE_OP_LOGGER.trace("Setting originals fetch inputs to {}", inputs); + JobConf jobNoRec = HiveInputFormat.createConfForMmOriginalsSplit(job, dirsWithOriginals); + jobNoRec.set("mapred.input.dir", inputs); + generateWrappedSplits(inputFormat, inputSplits, jobNoRec); } + if (work.getSplitSample() != null) { inputSplits = splitSampling(work.getSplitSample(), inputSplits); } - if (inputSplits.length > 0) { - if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_IN_TEST)) { - Arrays.sort(inputSplits, new FetchInputFormatSplitComparator()); - } - return inputSplits; + + if (inputSplits.isEmpty()) { + LOG.debug("No splits for " + currPath); + continue; + } + if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_IN_TEST)) { + Collections.sort(inputSplits, new FetchInputFormatSplitComparator()); } + return inputSplits.toArray(new FetchInputFormatSplit[inputSplits.size()]); } + return null; } - private String 
processCurrPathForMmWriteIds(InputFormat inputFormat) throws IOException { + private void generateWrappedSplits(InputFormat inputFormat, + List inputSplits, JobConf job) throws IOException { + InputSplit[] splits = inputFormat.getSplits(job, 1); + for (int i = 0; i < splits.length; i++) { + inputSplits.add(new FetchInputFormatSplit(splits[i], inputFormat)); + } + } + + private void processCurrPathForMmWriteIds(InputFormat inputFormat, + List dirs, List dirsWithOriginals) throws IOException { if (inputFormat instanceof HiveInputFormat) { - return StringUtils.escapeString(currPath.toString()); // No need to process here. + dirs.add(currPath); // No need to process here. } ValidWriteIdList validWriteIdList; if (AcidUtils.isInsertOnlyTable(currDesc.getTableDesc().getProperties())) { @@ -418,17 +443,19 @@ private String processCurrPathForMmWriteIds(InputFormat inputFormat) throws IOEx Utilities.FILE_OP_LOGGER.info("Processing " + currDesc.getTableName() + " for MM paths"); } - Path[] dirs = HiveInputFormat.processPathsForMmRead(Lists.newArrayList(currPath), job, validWriteIdList); - if (dirs == null || dirs.length == 0) { - return null; // No valid inputs. This condition is logged inside the call. - } - StringBuffer str = new StringBuffer(StringUtils.escapeString(dirs[0].toString())); - for(int i = 1; i < dirs.length;i++) { - str.append(",").append(StringUtils.escapeString(dirs[i].toString())); + HiveInputFormat.processPathsForMmRead( + Lists.newArrayList(currPath), job, validWriteIdList, dirs, dirsWithOriginals); + } + + private String makeInputString(List dirs) { + if (dirs == null || dirs.isEmpty()) return ""; + StringBuffer str = new StringBuffer(StringUtils.escapeString(dirs.get(0).toString())); + for(int i = 1; i < dirs.size(); i++) { + str.append(",").append(StringUtils.escapeString(dirs.get(i).toString())); } return str.toString(); - } + } private ValidWriteIdList extractValidWriteIdList() { if (currDesc.getTableName() == null || !org.apache.commons.lang.StringUtils.isBlank(currDesc.getTableName())) { String txnString = job.get(ValidWriteIdList.VALID_WRITEIDS_KEY); @@ -438,18 +465,18 @@ private ValidWriteIdList extractValidWriteIdList() { return null; // not fetching from a table directly but from a temp location } - private FetchInputFormatSplit[] splitSampling(SplitSample splitSample, - FetchInputFormatSplit[] splits) { + private List splitSampling(SplitSample splitSample, + List splits) { long totalSize = 0; for (FetchInputFormatSplit split: splits) { totalSize += split.getLength(); } - List result = new ArrayList(splits.length); + List result = new ArrayList(splits.size()); long targetSize = splitSample.getTargetSize(totalSize); - int startIndex = splitSample.getSeedNum() % splits.length; + int startIndex = splitSample.getSeedNum() % splits.size(); long size = 0; - for (int i = 0; i < splits.length; i++) { - FetchInputFormatSplit split = splits[(startIndex + i) % splits.length]; + for (int i = 0; i < splits.size(); i++) { + FetchInputFormatSplit split = splits.get((startIndex + i) % splits.size()); result.add(split); long splitgLength = split.getLength(); if (size + splitgLength >= targetSize) { @@ -460,7 +487,7 @@ private ValidWriteIdList extractValidWriteIdList() { } size += splitgLength; } - return result.toArray(new FetchInputFormatSplit[result.size()]); + return result; } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 31846a38e0..406bea011d 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + import java.beans.DefaultPersistenceDelegate; import java.beans.Encoder; import java.beans.Expression; @@ -72,6 +73,7 @@ import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; + import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.codec.binary.Base64; @@ -136,6 +138,7 @@ import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat; import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 88d352b1d4..ccdf04aae7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -288,7 +289,10 @@ private void dumpTable(String dbName, String tblName, String validTxnList, Path if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) { tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList)); } - new TableExport(exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf).write(); + MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle); + new TableExport( + exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf, mmCtx).write(); + replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); } catch (InvalidTableException te) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 51a793f9fb..7fce67fc3e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; @@ -149,6 +150,7 @@ public boolean accept(Path path) { public static final int MAX_STATEMENTS_PER_TXN = 10000; public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$"); public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}"); + /** * A write into a non-aicd table produces files like 0000_0 or 0000_0_copy_1 * (Unless via Load Data statement) @@ -390,6 +392,57 @@ else if 
(filename.startsWith(BUCKET_PREFIX)) { } return result; } + + public static final class DirectoryImpl implements Directory { + private final List abortedDirectories; + private final boolean isBaseInRawFormat; + private final List original; + private final List obsolete; + private final List deltas; + private final Path base; + + public DirectoryImpl(List abortedDirectories, + boolean isBaseInRawFormat, List original, + List obsolete, List deltas, Path base) { + this.abortedDirectories = abortedDirectories; + this.isBaseInRawFormat = isBaseInRawFormat; + this.original = original; + this.obsolete = obsolete; + this.deltas = deltas; + this.base = base; + } + + @Override + public Path getBaseDirectory() { + return base; + } + + @Override + public boolean isBaseInRawFormat() { + return isBaseInRawFormat; + } + + @Override + public List getOriginalFiles() { + return original; + } + + @Override + public List getCurrentDirectories() { + return deltas; + } + + @Override + public List getObsolete() { + return obsolete; + } + + @Override + public List getAbortedDirectories() { + return abortedDirectories; + } + } + //This is used for (full) Acid tables. InsertOnly use NOT_ACID public enum Operation implements Serializable { NOT_ACID, INSERT, UPDATE, DELETE; @@ -984,7 +1037,7 @@ public static Directory getAcidState(Path directory, // Okay, we're going to need these originals. Recurse through them and figure out what we // really need. for (FileStatus origDir : originalDirectories) { - findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles); + findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true); } } @@ -1056,7 +1109,7 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId * If this sort order is changed and there are tables that have been converted to transactional * and have had any update/delete/merge operations performed but not yet MAJOR compacted, it * may result in data loss since it may change how - * {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns + * {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns * {@link RecordIdentifier#rowId} for read (that have happened) and compaction (yet to happen). */ Collections.sort(original, (HdfsFileStatusWithId o1, HdfsFileStatusWithId o2) -> { @@ -1066,37 +1119,8 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId // Note: isRawFormat is invalid for non-ORC tables. It will always return true, so we're good. 
final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs); - return new Directory() { - - @Override - public Path getBaseDirectory() { - return base; - } - @Override - public boolean isBaseInRawFormat() { - return isBaseInRawFormat; - } - - @Override - public List getOriginalFiles() { - return original; - } - - @Override - public List getCurrentDirectories() { - return deltas; - } - - @Override - public List getObsolete() { - return obsolete; - } - - @Override - public List getAbortedDirectories() { - return abortedDirectories; - } - }; + return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original, + obsolete, deltas, base); } /** * We can only use a 'base' if it doesn't have an open txn (from specific reader's point of view) @@ -1208,8 +1232,9 @@ public Long getFileId() { * @param original the list of original files * @throws IOException */ - private static void findOriginals(FileSystem fs, FileStatus stat, - List original, Ref useFileIds, boolean ignoreEmptyFiles) throws IOException { + public static void findOriginals(FileSystem fs, FileStatus stat, + List original, Ref useFileIds, + boolean ignoreEmptyFiles, boolean recursive) throws IOException { assert stat.isDir(); List childrenWithId = null; Boolean val = useFileIds.value; @@ -1228,8 +1253,10 @@ private static void findOriginals(FileSystem fs, FileStatus stat, } if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { - if (child.getFileStatus().isDir()) { - findOriginals(fs, child.getFileStatus(), original, useFileIds, ignoreEmptyFiles); + if (child.getFileStatus().isDirectory()) { + if (recursive) { + findOriginals(fs, child.getFileStatus(), original, useFileIds, ignoreEmptyFiles, true); + } } else { if(!ignoreEmptyFiles || child.getFileStatus().getLen() > 0) { original.add(child); @@ -1240,7 +1267,9 @@ private static void findOriginals(FileSystem fs, FileStatus stat, List children = HdfsUtils.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter); for (FileStatus child : children) { if (child.isDir()) { - findOriginals(fs, child, original, useFileIds, ignoreEmptyFiles); + if (recursive) { + findOriginals(fs, child, original, useFileIds, ignoreEmptyFiles, true); + } } else { if(!ignoreEmptyFiles || child.getLen() > 0) { original.add(createOriginalObj(null, child)); @@ -1250,6 +1279,7 @@ private static void findOriginals(FileSystem fs, FileStatus stat, } } + public static boolean isTablePropertyTransactional(Properties props) { String resultStr = props.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); if (resultStr == null) { @@ -1817,7 +1847,7 @@ public static int getAcidVersionFromMetaFile(Path deltaOrBaseDir, FileSystem fs) return null; } Directory acidInfo = AcidUtils.getAcidState(dir, jc, idList); - // Assume that for an MM table, or if there's only the base directory, we are good. + // Assume that for an MM table, or if there's only the base directory, we are good. 
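+    // (MM deltas are insert-only, so existing per-file stats should remain usable; the warning
+    // below only applies to full ACID tables that still have deltas.)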
if (!acidInfo.getCurrentDirectories().isEmpty() && AcidUtils.isFullAcidTable(table)) { Utilities.FILE_OP_LOGGER.warn( "Computing stats for an ACID table; stats may be inaccurate"); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java index e09c6ecac0..5d2093101e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java @@ -22,15 +22,12 @@ import java.util.ArrayList; import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.FileInputFormat; @@ -40,6 +37,10 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; /** * BucketizedHiveInputFormat serves the similar function as hiveInputFormat but @@ -50,8 +51,7 @@ public class BucketizedHiveInputFormat extends HiveInputFormat { - public static final Logger LOG = LoggerFactory - .getLogger("org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat"); + public static final Logger LOG = LoggerFactory.getLogger(BucketizedHiveInputFormat.class); @Override public RecordReader getRecordReader(InputSplit split, JobConf job, @@ -123,30 +123,70 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, // for each dir, get all files under the dir, do getSplits to each // individual file, // and then create a BucketizedHiveInputSplit on it + + ArrayList currentDir = null; for (Path dir : dirs) { PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir); - // create a new InputFormat instance if this is the first time to see this - // class + // create a new InputFormat instance if this is the first time to see this class Class inputFormatClass = part.getInputFileFormatClass(); InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); newjob.setInputFormat(inputFormat.getClass()); - FileStatus[] listStatus = listStatus(newjob, dir); - - for (FileStatus status : listStatus) { - LOG.info("block size: " + status.getBlockSize()); - LOG.info("file length: " + status.getLen()); - FileInputFormat.setInputPaths(newjob, status.getPath()); - InputSplit[] iss = inputFormat.getSplits(newjob, 0); - if (iss != null && iss.length > 0) { - numOrigSplits += iss.length; - result.add(new BucketizedHiveInputSplit(iss, inputFormatClass - .getName())); + ValidWriteIdList mmIds = null; + if (part.getTableDesc() != null) { + // This can happen for truncate table case for non-MM tables. + mmIds = getMmValidWriteIds(newjob, part.getTableDesc(), null); + } + // TODO: should this also handle ACID operation, etc.? seems to miss a lot of stuff from HIF. 
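+      // finalDirs collects resolved base/delta directories; dirsWithMmOriginals collects
+      // directories whose loose original files must be listed without recursing into subdirectories.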
+ List finalDirs = null, dirsWithMmOriginals = null; + if (mmIds == null) { + finalDirs = Lists.newArrayList(dir); + } else { + finalDirs = new ArrayList<>(); + dirsWithMmOriginals = new ArrayList<>(); + processPathsForMmRead( + Lists.newArrayList(dir), newjob, mmIds, finalDirs, dirsWithMmOriginals); + } + if (finalDirs.isEmpty() && (dirsWithMmOriginals == null || dirsWithMmOriginals.isEmpty())) { + continue; // No valid inputs - possible in MM case. + } + + for (Path finalDir : finalDirs) { + FileStatus[] listStatus = listStatus(newjob, finalDir); + for (FileStatus status : listStatus) { + numOrigSplits = addBHISplit( + status, inputFormat, inputFormatClass, numOrigSplits, newjob, result); + } + } + if (dirsWithMmOriginals != null) { + for (Path originalsDir : dirsWithMmOriginals) { + FileSystem fs = originalsDir.getFileSystem(job); + FileStatus[] listStatus = fs.listStatus(dir, FileUtils.HIDDEN_FILES_PATH_FILTER); + for (FileStatus status : listStatus) { + if (status.isDirectory()) continue; + numOrigSplits = addBHISplit( + status, inputFormat, inputFormatClass, numOrigSplits, newjob, result); + } } } } + LOG.info(result.size() + " bucketized splits generated from " + numOrigSplits + " original splits."); return result.toArray(new BucketizedHiveInputSplit[result.size()]); } + + private int addBHISplit(FileStatus status, InputFormat inputFormat, Class inputFormatClass, + int numOrigSplits, JobConf newjob, ArrayList result) throws IOException { + LOG.info("block size: " + status.getBlockSize()); + LOG.info("file length: " + status.getLen()); + FileInputFormat.setInputPaths(newjob, status.getPath()); + InputSplit[] iss = inputFormat.getSplits(newjob, 0); + if (iss != null && iss.length > 0) { + numOrigSplits += iss.length; + result.add(new BucketizedHiveInputSplit(iss, inputFormatClass + .getName())); + } + return numOrigSplits; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 6c6eeff930..bcc05081aa 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -29,35 +29,30 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.Map.Entry; - -import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.common.StringInternUtils; -import org.apache.hadoop.hive.common.ValidTxnWriteIdList; -import org.apache.hadoop.hive.common.ValidWriteIdList; -import org.apache.hadoop.hive.ql.exec.SerializationUtilities; -import org.apache.hive.common.util.Ref; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.StringInternUtils; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; import org.apache.hadoop.hive.llap.io.api.LlapIo; import 
org.apache.hadoop.hive.llap.io.api.LlapProxy; -import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -65,6 +60,7 @@ import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorMapOperatorReadType; @@ -82,7 +78,10 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.StringUtils; +import org.apache.hive.common.util.Ref; import org.apache.hive.common.util.ReflectionUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * HiveInputFormat is a parameterized InputFormat which looks at the path name @@ -461,18 +460,9 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job InputFormat inputFormat, Class inputFormatClass, int splits, TableDesc table, List result) throws IOException { - ValidWriteIdList validWriteIdList = AcidUtils.getTableValidWriteIdList(conf, table.getTableName()); - ValidWriteIdList validMmWriteIdList; - if (AcidUtils.isInsertOnlyTable(table.getProperties())) { - if (validWriteIdList == null) { - throw new IOException("Insert-Only table: " + table.getTableName() - + " is missing from the ValidWriteIdList config: " - + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY)); - } - validMmWriteIdList = validWriteIdList; - } else { - validMmWriteIdList = null; // for non-MM case - } + ValidWriteIdList validWriteIdList = AcidUtils.getTableValidWriteIdList( + conf, table.getTableName()); + ValidWriteIdList validMmWriteIdList = getMmValidWriteIds(conf, table, validWriteIdList); try { Utilities.copyTablePropertiesToConf(table, conf); @@ -497,112 +487,166 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job pushFilters(conf, tableScan, this.mrwork); } - Path[] finalDirs = processPathsForMmRead(dirs, conf, validMmWriteIdList); - if (finalDirs == null) { + List dirsWithFileOriginals = new ArrayList<>(), finalDirs = new ArrayList<>(); + processPathsForMmRead(dirs, conf, validMmWriteIdList, finalDirs, dirsWithFileOriginals); + if (finalDirs.isEmpty() && dirsWithFileOriginals.isEmpty()) { // This is for transactional tables. if (!conf.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) { LOG.warn("No valid inputs found in " + dirs); - return; // No valid inputs. } else if (validMmWriteIdList != null) { // AcidUtils.getAcidState() is already called to verify there is no input split. // Thus for a GroupByOperator summary row, set finalDirs and add a Dummy split here. 
- finalDirs = dirs.toArray(new Path[dirs.size()]); - result.add(new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit(finalDirs[0].toString()), - ZeroRowsInputFormat.class.getName())); + result.add(new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit( + dirs.get(0).toString()), ZeroRowsInputFormat.class.getName())); } - } else { - FileInputFormat.setInputPaths(conf, finalDirs); - conf.setInputFormat(inputFormat.getClass()); - - int headerCount = 0; - int footerCount = 0; - if (table != null) { - headerCount = Utilities.getHeaderCount(table); - footerCount = Utilities.getFooterCount(table, conf); - if (headerCount != 0 || footerCount != 0) { - // Input file has header or footer, cannot be splitted. - HiveConf.setLongVar(conf, ConfVars.MAPREDMINSPLITSIZE, Long.MAX_VALUE); - } + return; // No valid inputs. + } + + conf.setInputFormat(inputFormat.getClass()); + int headerCount = 0; + int footerCount = 0; + if (table != null) { + headerCount = Utilities.getHeaderCount(table); + footerCount = Utilities.getFooterCount(table, conf); + if (headerCount != 0 || footerCount != 0) { + // Input file has header or footer, cannot be splitted. + HiveConf.setLongVar(conf, ConfVars.MAPREDMINSPLITSIZE, Long.MAX_VALUE); } + } + if (!finalDirs.isEmpty()) { + FileInputFormat.setInputPaths(conf, finalDirs.toArray(new Path[finalDirs.size()])); InputSplit[] iss = inputFormat.getSplits(conf, splits); for (InputSplit is : iss) { result.add(new HiveInputSplit(is, inputFormatClass.getName())); } - if (iss.length == 0 && finalDirs.length > 0 && conf.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) { - // If there are no inputs; the Execution engine skips the operator tree. - // To prevent it from happening; an opaque ZeroRows input is added here - when needed. - result.add(new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit(finalDirs[0].toString()), - ZeroRowsInputFormat.class.getName())); + } + + if (!dirsWithFileOriginals.isEmpty()) { + // We are going to add splits for these directories with recursive = false, so we ignore + // any subdirectories (deltas or original directories) and only read the original files. + // The fact that there's a loop calling addSplitsForGroup already implies it's ok to + // the real input format multiple times... however some split concurrency/etc configs + // that are applied separately in each call will effectively be ignored for such splits. + JobConf nonRecConf = createConfForMmOriginalsSplit(conf, dirsWithFileOriginals); + InputSplit[] iss = inputFormat.getSplits(nonRecConf, splits); + for (InputSplit is : iss) { + result.add(new HiveInputSplit(is, inputFormatClass.getName())); } } + + if (result.isEmpty() && conf.getBoolean(Utilities.ENSURE_OPERATORS_EXECUTED, false)) { + // If there are no inputs; the Execution engine skips the operator tree. + // To prevent it from happening; an opaque ZeroRows input is added here - when needed. 
+ result.add(new HiveInputSplit(new NullRowsInputFormat.DummyInputSplit( + finalDirs.get(0).toString()), ZeroRowsInputFormat.class.getName())); + } } - public static Path[] processPathsForMmRead(List dirs, JobConf conf, - ValidWriteIdList validWriteIdList) throws IOException { - if (validWriteIdList == null) { - return dirs.toArray(new Path[dirs.size()]); - } else { - List finalPaths = new ArrayList<>(dirs.size()); - for (Path dir : dirs) { - processForWriteIds(dir, conf, validWriteIdList, finalPaths); - } - if (finalPaths.isEmpty()) { - return null; + public static JobConf createConfForMmOriginalsSplit( + JobConf conf, List dirsWithFileOriginals) { + JobConf nonRecConf = new JobConf(conf); + FileInputFormat.setInputPaths(nonRecConf, + dirsWithFileOriginals.toArray(new Path[dirsWithFileOriginals.size()])); + nonRecConf.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false); + nonRecConf.setBoolean("mapred.input.dir.recursive", false); + // TODO: change to FileInputFormat.... field after MAPREDUCE-7086. + nonRecConf.setBoolean("mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs", true); + return nonRecConf; + } + + protected ValidWriteIdList getMmValidWriteIds( + JobConf conf, TableDesc table, ValidWriteIdList validWriteIdList) throws IOException { + if (!AcidUtils.isInsertOnlyTable(table.getProperties())) return null; + if (validWriteIdList == null) { + validWriteIdList = AcidUtils.getTableValidWriteIdList( conf, table.getTableName()); + if (validWriteIdList == null) { + throw new IOException("Insert-Only table: " + table.getTableName() + + " is missing from the ValidWriteIdList config: " + + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY)); } - return finalPaths.toArray(new Path[finalPaths.size()]); } + return validWriteIdList; } - private static void processForWriteIds(Path dir, JobConf conf, - ValidWriteIdList validWriteIdList, List finalPaths) throws IOException { - FileSystem fs = dir.getFileSystem(conf); - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Checking " + dir + " (root) for inputs"); + public static void processPathsForMmRead(List dirs, Configuration conf, + ValidWriteIdList validWriteIdList, List finalPaths, + List pathsWithFileOriginals) throws IOException { + if (validWriteIdList == null) { + finalPaths.addAll(dirs); + return; + } + boolean allowOriginals = HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS); + for (Path dir : dirs) { + processForWriteIds( + dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals); } + } + + private static void processForWriteIds(Path dir, Configuration conf, + ValidWriteIdList validWriteIdList, boolean allowOriginals, List finalPaths, + List pathsWithFileOriginals) throws IOException { + FileSystem fs = dir.getFileSystem(conf); + Utilities.FILE_OP_LOGGER.trace("Checking {} for inputs", dir); + // Ignore nullscan-optimized paths. 
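+    // (These are synthetic inputs from the null-scan/metadata-only optimization; there are no
+    // real files to list.)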
if (fs instanceof NullScanFileSystem) { finalPaths.add(dir); return; } - // Tez require the use of recursive input dirs for union processing, so we have to look into the - // directory to find out - LinkedList subdirs = new LinkedList<>(); - subdirs.add(dir); // add itself as a starting point - while (!subdirs.isEmpty()) { - Path currDir = subdirs.poll(); - FileStatus[] files = fs.listStatus(currDir); - boolean hadAcidState = false; // whether getAcidState has been called for currDir - for (FileStatus file : files) { - Path path = file.getPath(); - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Checking " + path + " for inputs"); + // We need to iterate to detect original directories, that are supported in MM but not ACID. + boolean hasOriginalFiles = false, hasAcidDirs = false; + List originalDirectories = new ArrayList<>(); + for (FileStatus file : fs.listStatus(dir, AcidUtils.hiddenFileFilter)) { + Path currDir = file.getPath(); + Utilities.FILE_OP_LOGGER.trace("Checking {} for being an input", currDir); + if (!file.isDirectory()) { + hasOriginalFiles = true; + } else if (AcidUtils.extractWriteId(currDir) == null) { + if (allowOriginals) { + originalDirectories.add(currDir); // Add as is; it would become a recursive split. + } else { + Utilities.FILE_OP_LOGGER.debug("Ignoring unknown (original?) directory {}", currDir); } - if (!file.isDirectory()) { - Utilities.FILE_OP_LOGGER.warn("Ignoring a file not in MM directory " + path); - } else if (AcidUtils.extractWriteId(path) == null) { - subdirs.add(path); - } else if (!hadAcidState) { - AcidUtils.Directory dirInfo - = AcidUtils.getAcidState(currDir, conf, validWriteIdList, Ref.from(false), true, null); - hadAcidState = true; - - // Find the base, created for IOW. - Path base = dirInfo.getBaseDirectory(); - if (base != null) { - finalPaths.add(base); - } + } else { + hasAcidDirs = true; + } + } + if (hasAcidDirs) { + AcidUtils.Directory dirInfo = AcidUtils.getAcidState( + dir, conf, validWriteIdList, Ref.from(false), true, null); - // Find the parsed delta files. - for (AcidUtils.ParsedDelta delta : dirInfo.getCurrentDirectories()) { - Utilities.FILE_OP_LOGGER.debug("Adding input " + delta.getPath()); - finalPaths.add(delta.getPath()); - } - } + // Find the base, created for IOW. + Path base = dirInfo.getBaseDirectory(); + if (base != null) { + Utilities.FILE_OP_LOGGER.debug("Adding input {}", base); + finalPaths.add(base); + // Base means originals no longer matter. + originalDirectories.clear(); + hasOriginalFiles = false; + } + + // Find the parsed delta files. + for (AcidUtils.ParsedDelta delta : dirInfo.getCurrentDirectories()) { + Utilities.FILE_OP_LOGGER.debug("Adding input {}", delta.getPath()); + finalPaths.add(delta.getPath()); + } + } + if (!originalDirectories.isEmpty()) { + Utilities.FILE_OP_LOGGER.debug("Adding original directories {}", originalDirectories); + finalPaths.addAll(originalDirectories); + } + if (hasOriginalFiles) { + if (allowOriginals) { + Utilities.FILE_OP_LOGGER.debug("Directory has original files {}", dir); + pathsWithFileOriginals.add(dir); + } else { + Utilities.FILE_OP_LOGGER.debug("Ignoring unknown (original?) files in {}", dir); } } } + Path[] getInputPaths(JobConf job) throws IOException { Path[] dirs; @@ -714,7 +758,7 @@ private static void processForWriteIds(Path dir, JobConf conf, pushProjection(newjob, readColumnsBuffer, readColumnNamesBuffer); } - if (dirs.length != 0) { + if (dirs.length != 0) { // TODO: should this be currentDirs? 
if (LOG.isInfoEnabled()) { LOG.info("Generating splits for dirs: {}", dirs); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index d58b82ecbb..694cf759f1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -105,6 +105,7 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; @@ -117,6 +118,7 @@ import org.apache.orc.FileFormatException; import org.apache.orc.OrcProto; import org.apache.orc.OrcProto.Footer; +import org.apache.orc.OrcProto.Type; import org.apache.orc.OrcUtils; import org.apache.orc.StripeInformation; import org.apache.orc.StripeStatistics; @@ -342,7 +344,7 @@ public static boolean isOriginal(Footer footer) { return false; } - + public static boolean[] genIncludedColumns(TypeDescription readerSchema, List included) { return genIncludedColumns(readerSchema, included, null); @@ -701,15 +703,15 @@ public boolean validateInput(FileSystem fs, HiveConf conf, // & therefore we should be able to retrieve them here and determine appropriate behavior. // Note that this will be meaningless for non-acid tables & will be set to null. //this is set by Utilities.copyTablePropertiesToConf() - boolean isTableTransactional = conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false); - String transactionalProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); - this.acidOperationalProperties = isTableTransactional ? - AcidOperationalProperties.parseString(transactionalProperties) : null; + boolean isTxnTable = conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false); + String txnProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + this.acidOperationalProperties = isTxnTable + ? AcidOperationalProperties.parseString(txnProperties) : null; String value = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); writeIdList = value == null ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(value); LOG.debug("Context:: Read ValidWriteIdList: " + writeIdList.toString() - + " isTransactionalTable: " + isTableTransactional); + + " isTransactionalTable: " + isTxnTable + " properties: " + txnProperties); } @VisibleForTesting @@ -1144,6 +1146,7 @@ public String toString() { private final Ref useFileIds; private final UserGroupInformation ugi; + @VisibleForTesting FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds, UserGroupInformation ugi) { this(context, fs, dir, Ref.from(useFileIds), ugi); @@ -1176,13 +1179,31 @@ public AcidDirInfo run() throws Exception { } private AcidDirInfo callInternal() throws IOException { + if (context.acidOperationalProperties != null + && context.acidOperationalProperties.isInsertOnly()) { + // See the class comment - HIF handles MM for all input formats, so if we try to handle it + // again, in particular for the non-recursive originals-only getSplits call, we will just + // get confused. This bypass was not necessary when MM tables didn't support originals. 
Now + // that they do, we use this path for anything MM table related, although everything except + // the originals could still be handled by AcidUtils like a regular non-txn table. + boolean isRecursive = context.conf.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, + context.conf.getBoolean("mapred.input.dir.recursive", false)); + List originals = new ArrayList<>(); + List baseFiles = new ArrayList<>(); + AcidUtils.findOriginals(fs, fs.getFileStatus(dir), originals, useFileIds, true, isRecursive); + for (HdfsFileStatusWithId fileId : originals) { + baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE)); + } + return new AcidDirInfo(fs, dir, new AcidUtils.DirectoryImpl(Lists.newArrayList(), true, originals, + Lists.newArrayList(), Lists.newArrayList(), null), baseFiles, new ArrayList<>()); + } //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine? - AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf, - context.writeIdList, useFileIds, true, null); + AcidUtils.Directory dirInfo = AcidUtils.getAcidState( + dir, context.conf, context.writeIdList, useFileIds, true, null); // find the base files (original or new style) List baseFiles = new ArrayList<>(); if (dirInfo.getBaseDirectory() == null) { - //for non-acid tables, all data files are in getOriginalFiles() list + // For non-acid tables (or paths), all data files are in getOriginalFiles() list for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) { baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE)); } @@ -1197,7 +1218,6 @@ private AcidDirInfo callInternal() throws IOException { // Find the parsed deltas- some of them containing only the insert delta events // may get treated as base if split-update is enabled for ACID. (See HIVE-14035 for details) List parsedDeltas = new ArrayList<>(); - if (context.acidOperationalProperties != null && context.acidOperationalProperties.isSplitUpdate()) { // If we have split-update turned on for this table, then the delta events have already been @@ -1258,7 +1278,8 @@ private AcidDirInfo callInternal() throws IOException { We already handled all delete deltas above and there should not be any other deltas for any table type. (this was acid 1.0 code path). */ - assert dirInfo.getCurrentDirectories().isEmpty() : "Non empty curDir list?!: " + dir; + assert dirInfo.getCurrentDirectories().isEmpty() : + "Non empty curDir list?!: " + dirInfo.getCurrentDirectories(); // When split-update is not enabled, then all the deltas in the current directories // should be considered as usual. 
parsedDeltas.addAll(dirInfo.getCurrentDirectories()); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java index 591c4b8c26..c112978ff7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java @@ -236,7 +236,9 @@ public static void jobClose(Path outputPath, boolean success, JobConf job, Path backupPath = backupOutputPath(fs, outputPath, job); Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null, reporter); - fs.delete(backupPath, true); + if (backupPath != null) { + fs.delete(backupPath, true); + } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index d3c62a2775..4a366a9360 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@ -19,12 +19,17 @@ package org.apache.hadoop.hive.ql.parse; +import java.util.Set; + +import javax.annotation.Nullable; + import org.antlr.runtime.tree.Tree; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -32,15 +37,14 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; import org.apache.hadoop.hive.ql.plan.ExportWork; - -import javax.annotation.Nullable; -import java.util.Set; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; /** * ExportSemanticAnalyzer. 
* */ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer { + private boolean isMmExport = false; ExportSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); @@ -48,7 +52,9 @@ @Override public void analyzeInternal(ASTNode ast) throws SemanticException { - rootTasks.add(analyzeExport(ast, null, db, conf, inputs, outputs)); + Task task = analyzeExport(ast, null, db, conf, inputs, outputs); + isMmExport = task.getWork().getMmContext() != null; + rootTasks.add(task); } /** * @param acidTableName - table name in db.table format; not NULL if exporting Acid table @@ -80,12 +86,10 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { try { ts = new TableSpec(db, conf, (ASTNode) tableTree, false, true); - } catch (SemanticException sme){ - if ((replicationSpec.isInReplicationScope()) && - ((sme.getCause() instanceof InvalidTableException) - || (sme instanceof Table.ValidationFailureSemanticException) - ) - ){ + } catch (SemanticException sme) { + if (!replicationSpec.isInReplicationScope()) throw sme; + if ((sme.getCause() instanceof InvalidTableException) + || (sme instanceof Table.ValidationFailureSemanticException)) { // If we're in replication scope, it's possible that we're running the export long after // the table was dropped, so the table not existing currently or being a different kind of // table is not an error - it simply means we should no-op, and let a future export @@ -101,15 +105,26 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { // All parsing is done, we're now good to start the export process TableExport.Paths exportPaths = new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf, false); - TableExport tableExport = new TableExport(exportPaths, ts, replicationSpec, db, null, conf); - TableExport.AuthEntities authEntities = tableExport.getAuthEntities(); + // Note: this tableExport is actually never used other than for auth, and another one is + // created when the task is executed. So, we don't care about the correct MM state here. + TableExport.AuthEntities authEntities = new TableExport( + exportPaths, ts, replicationSpec, db, null, conf, null).getAuthEntities(); inputs.addAll(authEntities.inputs); outputs.addAll(authEntities.outputs); String exportRootDirName = tmpPath; + MmContext mmCtx = MmContext.createIfNeeded(ts == null ? null : ts.tableHandle); + + Utilities.FILE_OP_LOGGER.debug("Exporting table {}: MM context {}", + ts == null ? null : ts.tableName, mmCtx); // Configure export work - ExportWork exportWork = - new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName); + ExportWork exportWork = new ExportWork(exportRootDirName, ts, replicationSpec, + ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName, mmCtx); // Create an export task and add it as a root task - return TaskFactory.get(exportWork); + return TaskFactory.get(exportWork); + } + + @Override + public boolean hasTransactionalInQuery() { + return isMmExport; // Full ACID export goes thru UpdateDelete analyzer. 
} } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 1a3cef9515..cc7f0d5ca0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -384,7 +384,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, private static Task loadTable(URI fromURI, Table table, boolean replace, Path tgtPath, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x, - Long writeId, int stmtId, boolean isSourceMm) { + Long writeId, int stmtId) { assert table != null; assert table.getParameters() != null; Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME); @@ -425,9 +425,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, if (replicationSpec.isInReplicationScope()) { copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf()); } else { - CopyWork cw = new CopyWork(dataPath, destPath, false); - cw.setSkipSourceMmDirs(isSourceMm); - copyTask = TaskFactory.get(cw); + copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false)); } LoadTableDesc loadTableWork = new LoadTableDesc( @@ -482,7 +480,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, private static Task addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, - EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm) + EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId) throws MetaException, IOException, HiveException { AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0); if (tblDesc.isExternal() && tblDesc.getLocation() == null) { @@ -519,9 +517,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, copyTask = ReplCopyTask.getLoadCopyTask( replicationSpec, new Path(srcLocation), destPath, x.getConf()); } else { - CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false); - cw.setSkipSourceMmDirs(isSourceMm); - copyTask = TaskFactory.get(cw); + copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false)); } Task addPartTask = TaskFactory.get( @@ -832,8 +828,6 @@ private static void createRegularImportTasks( EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId) throws HiveException, IOException, MetaException { - final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()); - if (table != null) { if (table.isPartitioned()) { x.getLOG().debug("table partitioned"); @@ -843,7 +837,7 @@ private static void createRegularImportTasks( org.apache.hadoop.hive.ql.metadata.Partition ptn = null; if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); } else { throw new SemanticException( ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec))); @@ -855,8 +849,7 @@ private static void createRegularImportTasks( Path tgtPath = new Path(table.getDataLocation().toString()); FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf()); checkTargetLocationEmpty(tgtFs, tgtPath, 
replicationSpec, x.getLOG()); - loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId, - isSourceMm); + loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId); } // Set this to read because we can't overwrite any existing partitions x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK)); @@ -875,7 +868,7 @@ private static void createRegularImportTasks( if (isPartitioned(tblDesc)) { for (AddPartitionDesc addPartitionDesc : partitionDescs) { t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, - replicationSpec, x, writeId, stmtId, isSourceMm)); + replicationSpec, x, writeId, stmtId)); } } else { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); @@ -893,7 +886,7 @@ private static void createRegularImportTasks( FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf()); checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x.getLOG()); t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, - writeId, stmtId, isSourceMm)); + writeId, stmtId)); } } x.getTasks().add(t); @@ -930,7 +923,6 @@ private static void createReplImportTasks( throws HiveException, URISyntaxException, IOException, MetaException { Task dropTblTask = null; - final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()); WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK; // Normally, on import, trying to create a table or a partition in a db that does not yet exist @@ -1014,14 +1006,14 @@ private static void createReplImportTasks( for (AddPartitionDesc addPartitionDesc : partitionDescs) { addPartitionDesc.setReplicationSpec(replicationSpec); t.addDependentTask( - addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); if (updatedMetadata != null) { updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); } } } else { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); - t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId, isSourceMm)); + t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId)); } } @@ -1066,7 +1058,7 @@ private static void createReplImportTasks( if (ptn == null) { if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); if (updatedMetadata != null) { updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); } @@ -1083,7 +1075,7 @@ private static void createReplImportTasks( if (replicationSpec.allowReplacementInto(ptn.getParameters())){ if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); } else { x.getTasks().add(alterSinglePartition( fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x)); @@ -1109,7 +1101,7 @@ private static void createReplImportTasks( if (!replicationSpec.isMetadataOnly()) { // repl-imports 
are replace-into unless the event is insert-into loadTable(fromURI, table, replicationSpec.isReplace(), new Path(tblDesc.getLocation()), - replicationSpec, x, writeId, stmtId, isSourceMm); + replicationSpec, x, writeId, stmtId); } else { x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index 0f42ab8f86..088b5cf8cb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -216,7 +216,7 @@ private static BaseSemanticAnalyzer getInternal(QueryState queryState, ASTNode t case HiveParser.TOK_LOAD: return new LoadSemanticAnalyzer(queryState); case HiveParser.TOK_EXPORT: - if(UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) { + if (UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) { return new UpdateDeleteSemanticAnalyzer(queryState); } return new ExportSemanticAnalyzer(queryState); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java index 70eb750338..d73fc4f336 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.PartitionIterable; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.apache.hadoop.hive.ql.session.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +44,8 @@ * it has a blocking queue that stores partitions to be dumped via a producer thread. * it has a worker thread pool that reads of the queue to perform the various tasks. */ +// TODO: this object is created once to call one method and then immediately destroyed. +// So it's basically just a roundabout way to pass arguments to a static method. Simplify? 
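+// The MmContext is null for regular tables; for insert-only (MM) tables it is forwarded to the
+// FileOperations created for each partition dump, so only directories valid for the table's
+// write ID list are exported.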
class PartitionExport { private final Paths paths; private final PartitionIterable partitionIterable; @@ -50,16 +53,18 @@ private final HiveConf hiveConf; private final int nThreads; private final SessionState callersSession; + private final MmContext mmCtx; private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class); private BlockingQueue queue; PartitionExport(Paths paths, PartitionIterable partitionIterable, String distCpDoAsUser, - HiveConf hiveConf) { + HiveConf hiveConf, MmContext mmCtx) { this.paths = paths; this.partitionIterable = partitionIterable; this.distCpDoAsUser = distCpDoAsUser; this.hiveConf = hiveConf; + this.mmCtx = mmCtx; this.nThreads = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM); this.queue = new ArrayBlockingQueue<>(2 * nThreads); this.callersSession = SessionState.get(); @@ -106,7 +111,7 @@ void write(final ReplicationSpec forReplicationSpec) throws InterruptedException List dataPathList = Utils.getDataPathList(partition.getDataLocation(), forReplicationSpec, hiveConf); Path rootDataDumpDir = paths.partitionExportDir(partitionName); - new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf) + new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx) .export(forReplicationSpec); LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName); } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index e801a6421d..20ff23a46b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +51,8 @@ import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toWriteEntity; +// TODO: this object is created once to call one method and then immediately destroyed. +// So it's basically just a roundabout way to pass arguments to a static method. Simplify? 
public class TableExport { private static final Logger logger = LoggerFactory.getLogger(TableExport.class); @@ -59,9 +62,10 @@ private final String distCpDoAsUser; private final HiveConf conf; private final Paths paths; + private final MmContext mmCtx; public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replicationSpec, Hive db, - String distCpDoAsUser, HiveConf conf) { + String distCpDoAsUser, HiveConf conf, MmContext mmCtx) { this.tableSpec = (tableSpec != null && tableSpec.tableHandle.isTemporary() && replicationSpec.isInReplicationScope()) @@ -76,6 +80,7 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication this.distCpDoAsUser = distCpDoAsUser; this.conf = conf; this.paths = paths; + this.mmCtx = mmCtx; } public boolean write() throws SemanticException { @@ -147,13 +152,13 @@ private void writeData(PartitionIterable partitions) throws SemanticException { throw new IllegalStateException("partitions cannot be null for partitionTable :" + tableSpec.tableName); } - new PartitionExport(paths, partitions, distCpDoAsUser, conf).write(replicationSpec); + new PartitionExport(paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec); } else { List dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(), replicationSpec, conf); // this is the data copy - new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf) + new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx) .export(replicationSpec); } } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index 690498ff6b..085f4a1cb1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -17,27 +17,36 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.io; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import javax.security.auth.login.LoginException; + +import org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.auth.login.LoginException; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.List; - +//TODO: this object is created once to call one method and then immediately destroyed. +//So it's basically just a roundabout way to pass arguments to a static method. Simplify? 
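+// When mmCtx is non-null, the source is an insert-only (MM) table; copyFiles then goes through
+// copyMmPath, which uses the table's ValidWriteIdList to pick the valid delta/base directories
+// and any original files to export.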
public class FileOperations { private static Logger logger = LoggerFactory.getLogger(FileOperations.class); private final List dataPathList; @@ -45,13 +54,15 @@ private final String distCpDoAsUser; private HiveConf hiveConf; private final FileSystem dataFileSystem, exportFileSystem; + private final MmContext mmCtx; - public FileOperations(List dataPathList, Path exportRootDataDir, - String distCpDoAsUser, HiveConf hiveConf) throws IOException { + public FileOperations(List dataPathList, Path exportRootDataDir, String distCpDoAsUser, + HiveConf hiveConf, MmContext mmCtx) throws IOException { this.dataPathList = dataPathList; this.exportRootDataDir = exportRootDataDir; this.distCpDoAsUser = distCpDoAsUser; this.hiveConf = hiveConf; + this.mmCtx = mmCtx; if ((dataPathList != null) && !dataPathList.isEmpty()) { dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf); } else { @@ -72,23 +83,68 @@ public void export(ReplicationSpec forReplicationSpec) throws Exception { * This writes the actual data in the exportRootDataDir from the source. */ private void copyFiles() throws IOException, LoginException { - for (Path dataPath : dataPathList) { - FileStatus[] fileStatuses = - LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataPath); - List srcPaths = new ArrayList<>(); - for (FileStatus fileStatus : fileStatuses) { - srcPaths.add(fileStatus.getPath()); + if (mmCtx == null) { + for (Path dataPath : dataPathList) { + copyOneDataPath(dataPath, exportRootDataDir); } - new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths); + } else { + copyMmPath(); + } + } + + private void copyOneDataPath(Path fromPath, Path toPath) throws IOException, LoginException { + FileStatus[] fileStatuses = LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, fromPath); + List srcPaths = new ArrayList<>(); + for (FileStatus fileStatus : fileStatuses) { + srcPaths.add(fileStatus.getPath()); } + + new CopyUtils(distCpDoAsUser, hiveConf).doCopy(toPath, srcPaths); } + private void copyMmPath() throws LoginException, IOException { + ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, mmCtx.getFqTableName()); + for (Path fromPath : dataPathList) { + fromPath = dataFileSystem.makeQualified(fromPath); + List validPaths = new ArrayList<>(), dirsWithOriginals = new ArrayList<>(); + HiveInputFormat.processPathsForMmRead(dataPathList, + hiveConf, ids, validPaths, dirsWithOriginals); + String fromPathStr = fromPath.toString(); + if (!fromPathStr.endsWith(Path.SEPARATOR)) { + fromPathStr += Path.SEPARATOR; + } + for (Path validPath : validPaths) { + // Export valid directories with a modified name so they don't look like bases/deltas. + // We could also dump the delta contents all together and rename the files if names collide. 
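+ // For example, a source directory <table location>/delta_0000001_0000001_0000 is written to
+ // <exportRootDataDir>/export_old_delta_0000001_0000001_0000.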
+ String mmChildPath = "export_old_" + validPath.toString().substring(fromPathStr.length()); + Path destPath = new Path(exportRootDataDir, mmChildPath); + Utilities.FILE_OP_LOGGER.debug("Exporting {} to {}", validPath, destPath); + exportFileSystem.mkdirs(destPath); + copyOneDataPath(validPath, destPath); + } + for (Path dirWithOriginals : dirsWithOriginals) { + FileStatus[] files = dataFileSystem.listStatus(dirWithOriginals, AcidUtils.hiddenFileFilter); + List srcPaths = new ArrayList<>(); + for (FileStatus fileStatus : files) { + if (fileStatus.isDirectory()) continue; + srcPaths.add(fileStatus.getPath()); + } + Utilities.FILE_OP_LOGGER.debug("Exporting originals from {} to {}", + dirWithOriginals, exportRootDataDir); + new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths); + } + } + } + + + /** * This needs the root data directory to which the data needs to be exported to. * The data export here is a list of files either in table/partition that are written to the _files * in the exportRootDataDir provided. */ private void exportFilesAsList() throws SemanticException, IOException { + // This is only called for replication that handles MM tables; no need for mmCtx. try (BufferedWriter writer = writer()) { for (Path dataPath : dataPathList) { writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java index 80f77b9f0c..e77fc3eac8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java @@ -186,7 +186,7 @@ public void setLbCtx(ListBucketingCtx lbCtx) { } } } else { - Utilities.FILE_OP_LOGGER.info("Resolver returning movetask for " + dirPath); + Utilities.FILE_OP_LOGGER.info("Resolver returning movetask for " + dirPath, new Exception()); resTsks.add(mvTask); } } catch (IOException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java index c0e4a43d9c..018983f6dc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java @@ -33,15 +33,10 @@ private Path[] fromPath; private Path[] toPath; private boolean errorOnSrcEmpty; - private boolean isSkipMmDirs = false; public CopyWork() { } - public CopyWork(final Path fromPath, final Path toPath) { - this(fromPath, toPath, true); - } - public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) { this(new Path[] { fromPath }, new Path[] { toPath }); this.setErrorOnSrcEmpty(errorOnSrcEmpty); @@ -92,17 +87,4 @@ public void setErrorOnSrcEmpty(boolean errorOnSrcEmpty) { public boolean isErrorOnSrcEmpty() { return errorOnSrcEmpty; } - - /** - * Whether the copy should ignore MM directories in the source, and copy their content to - * destination directly, rather than copying the directories themselves. 
- * */ - public void setSkipSourceMmDirs(boolean isMm) { - this.isSkipMmDirs = isMm; - } - - public boolean doSkipSourceMmDirs() { - return isSkipMmDirs ; - } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java index 72ce79836c..d91569ec6d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java @@ -17,39 +17,65 @@ */ package org.apache.hadoop.hive.ql.plan; +import java.io.Serializable; + +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; - @Explain(displayName = "Export Work", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class ExportWork implements Serializable { - private Logger LOG = LoggerFactory.getLogger(ExportWork.class); + private static Logger LOG = LoggerFactory.getLogger(ExportWork.class); private static final long serialVersionUID = 1L; + public final static class MmContext { + private final String fqTableName; + + private MmContext(String fqTableName) { + this.fqTableName = fqTableName; + } + + @Override + public String toString() { + return "[" + fqTableName + "]"; + } + + public static MmContext createIfNeeded(Table t) { + if (t == null) return null; + if (!AcidUtils.isInsertOnlyTable(t.getParameters())) return null; + return new MmContext(AcidUtils.getFullTableName(t.getDbName(), t.getTableName())); + } + + public String getFqTableName() { + return fqTableName; + } + } + private final String exportRootDirName; private TableSpec tableSpec; private ReplicationSpec replicationSpec; private String astRepresentationForErrorMsg; - private String qualifiedTableName; + private String acidFqTableName; + private final MmContext mmContext; /** - * @param qualifiedTable if exporting Acid table, this is temp table - null otherwise + * @param acidFqTableName if exporting Acid table, this is temp table - null otherwise */ public ExportWork(String exportRootDirName, TableSpec tableSpec, ReplicationSpec replicationSpec, - String astRepresentationForErrorMsg, String qualifiedTable) { + String astRepresentationForErrorMsg, String acidFqTableName, MmContext mmContext) { this.exportRootDirName = exportRootDirName; this.tableSpec = tableSpec; this.replicationSpec = replicationSpec; this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; - this.qualifiedTableName = qualifiedTable; + this.mmContext = mmContext; + this.acidFqTableName = acidFqTableName; } public String getExportRootDir() { @@ -60,24 +86,16 @@ public TableSpec getTableSpec() { return tableSpec; } - public void setTableSpec(TableSpec tableSpec) { - this.tableSpec = tableSpec; - } - public ReplicationSpec getReplicationSpec() { return replicationSpec; } - public void setReplicationSpec(ReplicationSpec replicationSpec) { - this.replicationSpec = replicationSpec; - } - public String getAstRepresentationForErrorMsg() { return astRepresentationForErrorMsg; } - public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) { - this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; + public 
MmContext getMmContext() { + return mmContext; } /** @@ -88,10 +106,10 @@ public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) * for more info. */ public void acidPostProcess(Hive db) throws HiveException { - if(qualifiedTableName != null) { - LOG.info("Swapping export of " + tableSpec.tableName + " to " + qualifiedTableName + + if (acidFqTableName != null) { + LOG.info("Swapping export of " + tableSpec.tableName + " to " + acidFqTableName + " using partSpec=" + tableSpec.partSpec); - tableSpec = new TableSpec(db, qualifiedTableName, tableSpec.partSpec, true); + tableSpec = new TableSpec(db, acidFqTableName, tableSpec.partSpec, true); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index b698c84080..982b180761 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -337,9 +337,11 @@ private void runMmCompaction(HiveConf conf, Table t, Partition p, } int deltaCount = dir.getCurrentDirectories().size(); - if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) <= 1) { + int origCount = dir.getOriginalFiles().size(); + if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) { LOG.debug("Not compacting " + sd.getLocation() + "; current base is " - + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas"); + + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas and " + + origCount + " originals"); return; } try { @@ -355,7 +357,8 @@ private void runMmCompaction(HiveConf conf, Table t, Partition p, // Note: we could skip creating the table and just add table type stuff directly to the // "insert overwrite directory" command if there were no bucketing or list bucketing. 
- String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() + "_", tmpTableName; + String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() + "_"; + String tmpTableName = null; while (true) { tmpTableName = tmpPrefix + System.currentTimeMillis(); String query = buildMmCompactionCtQuery(tmpTableName, t, diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 6faba42e23..a4d34a7513 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -17,9 +17,24 @@ */ package org.apache.hadoop.hive.ql; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.LockState; @@ -47,13 +62,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileOutputStream; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.TimeUnit; - /** * The LockManager is not ready, but for no-concurrency straight-line path we can * test AC=true, and AC=false with commit/rollback/exception and test resulting data. @@ -152,6 +160,106 @@ public void testSimpleAcidInsert() throws Exception { Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs1); } + @Test + public void testMmExim() throws Exception { + String tableName = "mm_table", importName = tableName + "_import"; + runStatementOnDriver("drop table if exists " + tableName); + runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " + + "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')", + tableName)); + + // Regular insert: export some MM deltas, then import into a new table. 
+ int[][] rows1 = {{1,2},{3,4}}; + runStatementOnDriver(String.format("insert into %s (a,b) %s", + tableName, makeValuesClause(rows1))); + runStatementOnDriver(String.format("insert into %s (a,b) %s", + tableName, makeValuesClause(rows1))); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + org.apache.hadoop.hive.metastore.api.Table table = msClient.getTable("default", tableName); + FileSystem fs = FileSystem.get(hiveConf); + Path exportPath = new Path(table.getSd().getLocation() + "_export"); + fs.delete(exportPath, true); + runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath)); + List paths = listPathsRecursive(fs, exportPath); + verifyMmExportPaths(paths, 2); + runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath)); + org.apache.hadoop.hive.metastore.api.Table imported = msClient.getTable("default", importName); + Assert.assertEquals(imported.toString(), "insert_only", + imported.getParameters().get("transactional_properties")); + Path importPath = new Path(imported.getSd().getLocation()); + FileStatus[] stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter); + Assert.assertEquals(Arrays.toString(stat), 1, stat.length); + assertIsDelta(stat[0]); + List allData = stringifyValues(rows1); + allData.addAll(stringifyValues(rows1)); + allData.sort(null); + Collections.sort(allData); + List rs = runStatementOnDriver( + String.format("select a,b from %s order by a,b", importName)); + Assert.assertEquals("After import: " + rs, allData, rs); + runStatementOnDriver("drop table if exists " + importName); + + // Do insert overwrite to create some invalid deltas, and import into a non-MM table. + int[][] rows2 = {{5,6},{7,8}}; + runStatementOnDriver(String.format("insert overwrite table %s %s", + tableName, makeValuesClause(rows2))); + fs.delete(exportPath, true); + runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath)); + paths = listPathsRecursive(fs, exportPath); + verifyMmExportPaths(paths, 1); + runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " + + "TBLPROPERTIES ('transactional'='false')", importName)); + runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath)); + imported = msClient.getTable("default", importName); + Assert.assertNull(imported.toString(), imported.getParameters().get("transactional")); + Assert.assertNull(imported.toString(), + imported.getParameters().get("transactional_properties")); + importPath = new Path(imported.getSd().getLocation()); + stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter); + allData = stringifyValues(rows2); + Collections.sort(allData); + rs = runStatementOnDriver(String.format("select a,b from %s order by a,b", importName)); + Assert.assertEquals("After import: " + rs, allData, rs); + runStatementOnDriver("drop table if exists " + importName); + runStatementOnDriver("drop table if exists " + tableName); + msClient.close(); + } + + private void assertIsDelta(FileStatus stat) { + Assert.assertTrue(stat.toString(), + stat.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX)); + } + + private void verifyMmExportPaths(List paths, int deltasOrBases) { + // 1 file, 1 dir for each, for now. Plus export "data" dir. + // This could be changed to a flat file list later. + Assert.assertEquals(paths.toString(), 2 * deltasOrBases + 1, paths.size()); + // No confusing directories in export. 
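+ // (FileOperations.copyMmPath renames exported deltas/bases to export_old_*.)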
+ for (String path : paths) { + Assert.assertFalse(path, path.startsWith(AcidUtils.DELTA_PREFIX)); + Assert.assertFalse(path, path.startsWith(AcidUtils.BASE_PREFIX)); + } + } + + private List listPathsRecursive(FileSystem fs, Path path) throws IOException { + List paths = new ArrayList<>(); + LinkedList queue = new LinkedList<>(); + queue.add(path); + while (!queue.isEmpty()) { + Path next = queue.pollFirst(); + FileStatus[] stats = fs.listStatus(next, AcidUtils.hiddenFileFilter); + for (FileStatus stat : stats) { + Path child = stat.getPath(); + paths.add(child.toString()); + if (stat.isDirectory()) { + queue.add(child); + } + } + } + return paths; + } + + /** * add tests for all transitions - AC=t, AC=t, AC=f, commit (for example) * @throws Exception diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java index 13d1a5d9c1..861d9dbb2e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java @@ -477,6 +477,7 @@ public void testMMCreateFlatSource() throws Exception { } private void testMM(boolean existingTable, boolean isSourceMM) throws Exception { HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true); + hiveConf.setBoolean("mapred.input.dir.recursive", true); int[][] data = {{1,2}, {3, 4}, {5, 6}}; runStatementOnDriver("drop table if exists T"); @@ -500,9 +501,10 @@ private void testMM(boolean existingTable, boolean isSourceMM) throws Exception //verify that we are indeed doing an Acid write (import) rs = runStatementOnDriver("select INPUT__FILE__NAME from T order by INPUT__FILE__NAME"); Assert.assertEquals(3, rs.size()); - Assert.assertTrue(rs.get(0).endsWith("t/delta_0000001_0000001_0000/000000_0")); - Assert.assertTrue(rs.get(1).endsWith("t/delta_0000001_0000001_0000/000000_0")); - Assert.assertTrue(rs.get(2).endsWith("t/delta_0000001_0000001_0000/000000_0")); + for (String s : rs) { + Assert.assertTrue(s, s.contains("/delta_0000001_0000001_0000/")); + Assert.assertTrue(s, s.endsWith("/000000_0")); + } } private void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg) throws Exception{ @@ -516,6 +518,7 @@ private void checkResult(String[][] expectedResult, String query, boolean isVect @Test public void testMMExportAborted() throws Exception { HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true); + hiveConf.setBoolean("mapred.input.dir.recursive", true); int[][] data = {{1, 2}, {3, 4}, {5, 6}}; int[][] dataAbort = {{10, 2}}; runStatementOnDriver("drop table if exists T"); diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 25a63d879a..7319ba0e4b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -35,6 +35,7 @@ import org.junit.Rule; import org.junit.rules.TestName; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.util.ArrayList; @@ -43,6 +44,7 @@ import java.util.Set; public abstract class TxnCommandsBaseForTests { + private static final Logger LOG = LoggerFactory.getLogger(TxnCommandsBaseForTests.class); //bucket count for test tables; set it to 1 for easier debugging final static int BUCKET_COUNT = 2; @Rule @@ -95,6 +97,7 @@ void setUpInternal() throws Exception { 
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true); hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); + hiveConf.setBoolean("mapred.input.dir.recursive", true); TxnDbUtil.setConfValues(hiveConf); TxnDbUtil.prepDb(hiveConf); File f = new File(getWarehouseDir()); @@ -159,6 +162,7 @@ void runCleaner(HiveConf hiveConf) throws MetaException { } List runStatementOnDriver(String stmt) throws Exception { + LOG.info("Running the query: " + stmt); CommandProcessorResponse cpr = d.run(stmt); if(cpr.getResponseCode() != 0) { throw new RuntimeException(stmt + " failed: " + cpr); @@ -217,7 +221,8 @@ void assertExpectedFileSet(Set expectedFiles, String rootPath) throws Ex void checkExpected(List rs, String[][] expected, String msg, Logger LOG, boolean checkFileName) { LOG.warn(testName.getMethodName() + ": read data(" + msg + "): "); logResult(LOG, rs); - Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size()); + Assert.assertEquals(testName.getMethodName() + ": " + msg + "; " + rs, + expected.length, rs.size()); //verify data and layout for(int i = 0; i < expected.length; i++) { Assert.assertTrue("Actual line (data) " + i + " data: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); diff --git ql/src/test/queries/clientpositive/mm_bhif.q ql/src/test/queries/clientpositive/mm_bhif.q new file mode 100644 index 0000000000..f9c7f8ab84 --- /dev/null +++ ql/src/test/queries/clientpositive/mm_bhif.q @@ -0,0 +1,27 @@ +SET hive.vectorized.execution.enabled=true; +SET hive.vectorized.execution.reduce.enabled=true; +set hive.mapred.mode=nonstrict; +set hive.exec.reducers.max = 10; +set hive.map.groupby.sorted=true; +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; + +-- SORT_QUERY_RESULTS + +CREATE TABLE T1_mm(key STRING, val STRING) PARTITIONED BY (ds string) +CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE tblproperties ("transactional"="true", "transactional_properties"="insert_only"); + +LOAD DATA LOCAL INPATH '../../data/files/bucket_files/000000_0' INTO TABLE T1_mm PARTITION (ds='1'); + +INSERT OVERWRITE TABLE T1_mm PARTITION (ds='1') select key, val from T1_mm where ds = '1'; + + +set hive.fetch.task.conversion=none; + +select * from T1_mm; + +explain +select count(distinct key) from T1_mm; +select count(distinct key) from T1_mm; + +DROP TABLE T1_mm; diff --git ql/src/test/queries/clientpositive/mm_conversions.q ql/src/test/queries/clientpositive/mm_conversions.q index 14d16abbff..89beedef08 100644 --- ql/src/test/queries/clientpositive/mm_conversions.q +++ ql/src/test/queries/clientpositive/mm_conversions.q @@ -14,21 +14,42 @@ insert into table intermediate partition(p='455') select distinct key from src w insert into table intermediate partition(p='456') select distinct key from src where key is not null order by key asc limit 1; insert into table intermediate partition(p='457') select distinct key from src where key >= 100 order by key asc limit 1; +set hive.mm.allow.originals=true; +set hive.exim.test.mode=true; drop table simple_to_mm; -create table simple_to_mm(key int) stored as orc; +create table simple_to_mm(key int) stored as orc tblproperties("transactional"="false"); insert into table simple_to_mm select key from intermediate; select * from simple_to_mm s1 order by key; alter table simple_to_mm set tblproperties("transactional"="true", 
"transactional_properties"="insert_only"); +export table simple_to_mm to 'ql/test/data/exports/export0'; select * from simple_to_mm s2 order by key; +create table import_converted0_mm(key int) stored as orc tblproperties("transactional"="false"); +import table import_converted0_mm from 'ql/test/data/exports/export0'; +select * from import_converted0_mm order by key; +drop table import_converted0_mm; + insert into table simple_to_mm select key from intermediate; insert into table simple_to_mm select key from intermediate; +export table simple_to_mm to 'ql/test/data/exports/export1'; select * from simple_to_mm s3 order by key; +create table import_converted1_mm(key int) stored as orc tblproperties("transactional"="false"); +import table import_converted1_mm from 'ql/test/data/exports/export1'; +select * from import_converted1_mm order by key; +drop table import_converted1_mm; + +insert overwrite table simple_to_mm select key from intermediate; +export table simple_to_mm to 'ql/test/data/exports/export2'; +select * from simple_to_mm s4 order by key; +create table import_converted2_mm(key int) stored as orc tblproperties("transactional"="false"); +import table import_converted2_mm from 'ql/test/data/exports/export2'; +select * from import_converted2_mm order by key; +drop table import_converted2_mm; drop table simple_to_mm; drop table part_to_mm; -create table part_to_mm(key int) partitioned by (key_mm int) stored as orc; +create table part_to_mm(key int) partitioned by (key_mm int) stored as orc tblproperties("transactional"="false"); insert into table part_to_mm partition(key_mm='455') select key from intermediate; insert into table part_to_mm partition(key_mm='456') select key from intermediate; select * from part_to_mm s1 order by key, key_mm; @@ -39,4 +60,30 @@ insert into table part_to_mm partition(key_mm='457') select key from intermediat select * from part_to_mm s3 order by key, key_mm; drop table part_to_mm; + +drop table load_to_mm; +create table load_to_mm (key string, value string) tblproperties("transactional"="false"); +load data local inpath '../../data/files/kv1.txt' into table load_to_mm; +load data local inpath '../../data/files/kv1.txt' into table load_to_mm; +select count(*) from load_to_mm s1; +alter table load_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only"); +select count(*) from load_to_mm s2; +drop table load_to_mm; + + +set hive.mm.allow.originals=false; + +drop table simple_to_mm_text; +create table simple_to_mm_text(key int) stored as textfile tblproperties("transactional"="false"); +insert into table simple_to_mm_text select key from intermediate; +select * from simple_to_mm_text t1 order by key; +alter table simple_to_mm_text set tblproperties("transactional"="true", "transactional_properties"="insert_only"); +select * from simple_to_mm_text t2 order by key; +insert into table simple_to_mm_text select key from intermediate; +insert into table simple_to_mm_text select key from intermediate; +select * from simple_to_mm_text t3 order by key; +insert overwrite table simple_to_mm_text select key from intermediate; +select * from simple_to_mm_text t4 order by key; +drop table simple_to_mm_text; + drop table intermediate; diff --git ql/src/test/queries/clientpositive/mm_exim.q ql/src/test/queries/clientpositive/mm_exim.q index b0c030df68..debcc249b8 100644 --- ql/src/test/queries/clientpositive/mm_exim.q +++ ql/src/test/queries/clientpositive/mm_exim.q @@ -57,13 +57,13 @@ drop table import1_mm; drop table import2_mm; import table 
import2_mm from 'ql/test/data/exports/intermmediate_nonpart'; -desc import2_mm; +desc formatted import2_mm; select * from import2_mm order by key, p; drop table import2_mm; drop table import3_mm; import table import3_mm from 'ql/test/data/exports/intermmediate_part'; -desc import3_mm; +desc formatted import3_mm; select * from import3_mm order by key, p; drop table import3_mm; diff --git ql/src/test/results/clientpositive/llap/mm_conversions.q.out ql/src/test/results/clientpositive/llap/mm_conversions.q.out index 4754710291..618aa3ce29 100644 --- ql/src/test/results/clientpositive/llap/mm_conversions.q.out +++ ql/src/test/results/clientpositive/llap/mm_conversions.q.out @@ -41,11 +41,11 @@ PREHOOK: query: drop table simple_to_mm PREHOOK: type: DROPTABLE POSTHOOK: query: drop table simple_to_mm POSTHOOK: type: DROPTABLE -PREHOOK: query: create table simple_to_mm(key int) stored as orc +PREHOOK: query: create table simple_to_mm(key int) stored as orc tblproperties("transactional"="false") PREHOOK: type: CREATETABLE PREHOOK: Output: database:default PREHOOK: Output: default@simple_to_mm -POSTHOOK: query: create table simple_to_mm(key int) stored as orc +POSTHOOK: query: create table simple_to_mm(key int) stored as orc tblproperties("transactional"="false") POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@simple_to_mm @@ -83,6 +83,14 @@ POSTHOOK: query: alter table simple_to_mm set tblproperties("transactional"="tru POSTHOOK: type: ALTERTABLE_PROPERTIES POSTHOOK: Input: default@simple_to_mm POSTHOOK: Output: default@simple_to_mm +PREHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export0' +PREHOOK: type: EXPORT +PREHOOK: Input: default@simple_to_mm +#### A masked pattern was here #### +POSTHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export0' +POSTHOOK: type: EXPORT +POSTHOOK: Input: default@simple_to_mm +#### A masked pattern was here #### PREHOOK: query: select * from simple_to_mm s2 order by key PREHOOK: type: QUERY PREHOOK: Input: default@simple_to_mm @@ -94,6 +102,41 @@ POSTHOOK: Input: default@simple_to_mm 0 98 100 +PREHOOK: query: create table import_converted0_mm(key int) stored as orc tblproperties("transactional"="false") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@import_converted0_mm +POSTHOOK: query: create table import_converted0_mm(key int) stored as orc tblproperties("transactional"="false") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@import_converted0_mm +PREHOOK: query: import table import_converted0_mm from 'ql/test/data/exports/export0' +PREHOOK: type: IMPORT +#### A masked pattern was here #### +PREHOOK: Output: default@import_converted0_mm +POSTHOOK: query: import table import_converted0_mm from 'ql/test/data/exports/export0' +POSTHOOK: type: IMPORT +#### A masked pattern was here #### +POSTHOOK: Output: default@import_converted0_mm +PREHOOK: query: select * from import_converted0_mm order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@import_converted0_mm +#### A masked pattern was here #### +POSTHOOK: query: select * from import_converted0_mm order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@import_converted0_mm +#### A masked pattern was here #### +0 +98 +100 +PREHOOK: query: drop table import_converted0_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@import_converted0_mm +PREHOOK: Output: default@import_converted0_mm +POSTHOOK: query: drop table import_converted0_mm +POSTHOOK: 
type: DROPTABLE +POSTHOOK: Input: default@import_converted0_mm +POSTHOOK: Output: default@import_converted0_mm PREHOOK: query: insert into table simple_to_mm select key from intermediate PREHOOK: type: QUERY PREHOOK: Input: default@intermediate @@ -124,6 +167,14 @@ POSTHOOK: Input: default@intermediate@p=456 POSTHOOK: Input: default@intermediate@p=457 POSTHOOK: Output: default@simple_to_mm POSTHOOK: Lineage: simple_to_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +PREHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export1' +PREHOOK: type: EXPORT +PREHOOK: Input: default@simple_to_mm +#### A masked pattern was here #### +POSTHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export1' +POSTHOOK: type: EXPORT +POSTHOOK: Input: default@simple_to_mm +#### A masked pattern was here #### PREHOOK: query: select * from simple_to_mm s3 order by key PREHOOK: type: QUERY PREHOOK: Input: default@simple_to_mm @@ -141,6 +192,116 @@ POSTHOOK: Input: default@simple_to_mm 100 100 100 +PREHOOK: query: create table import_converted1_mm(key int) stored as orc tblproperties("transactional"="false") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@import_converted1_mm +POSTHOOK: query: create table import_converted1_mm(key int) stored as orc tblproperties("transactional"="false") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@import_converted1_mm +PREHOOK: query: import table import_converted1_mm from 'ql/test/data/exports/export1' +PREHOOK: type: IMPORT +#### A masked pattern was here #### +PREHOOK: Output: default@import_converted1_mm +POSTHOOK: query: import table import_converted1_mm from 'ql/test/data/exports/export1' +POSTHOOK: type: IMPORT +#### A masked pattern was here #### +POSTHOOK: Output: default@import_converted1_mm +PREHOOK: query: select * from import_converted1_mm order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@import_converted1_mm +#### A masked pattern was here #### +POSTHOOK: query: select * from import_converted1_mm order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@import_converted1_mm +#### A masked pattern was here #### +0 +0 +0 +98 +98 +98 +100 +100 +100 +PREHOOK: query: drop table import_converted1_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@import_converted1_mm +PREHOOK: Output: default@import_converted1_mm +POSTHOOK: query: drop table import_converted1_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@import_converted1_mm +POSTHOOK: Output: default@import_converted1_mm +PREHOOK: query: insert overwrite table simple_to_mm select key from intermediate +PREHOOK: type: QUERY +PREHOOK: Input: default@intermediate +PREHOOK: Input: default@intermediate@p=455 +PREHOOK: Input: default@intermediate@p=456 +PREHOOK: Input: default@intermediate@p=457 +PREHOOK: Output: default@simple_to_mm +POSTHOOK: query: insert overwrite table simple_to_mm select key from intermediate +POSTHOOK: type: QUERY +POSTHOOK: Input: default@intermediate +POSTHOOK: Input: default@intermediate@p=455 +POSTHOOK: Input: default@intermediate@p=456 +POSTHOOK: Input: default@intermediate@p=457 +POSTHOOK: Output: default@simple_to_mm +POSTHOOK: Lineage: simple_to_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +PREHOOK: query: export table simple_to_mm to 'ql/test/data/exports/export2' +PREHOOK: type: EXPORT +PREHOOK: Input: default@simple_to_mm +#### A masked pattern was here #### +POSTHOOK: 
query: export table simple_to_mm to 'ql/test/data/exports/export2' +POSTHOOK: type: EXPORT +POSTHOOK: Input: default@simple_to_mm +#### A masked pattern was here #### +PREHOOK: query: select * from simple_to_mm s4 order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@simple_to_mm +#### A masked pattern was here #### +POSTHOOK: query: select * from simple_to_mm s4 order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@simple_to_mm +#### A masked pattern was here #### +0 +98 +100 +PREHOOK: query: create table import_converted2_mm(key int) stored as orc tblproperties("transactional"="false") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@import_converted2_mm +POSTHOOK: query: create table import_converted2_mm(key int) stored as orc tblproperties("transactional"="false") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@import_converted2_mm +PREHOOK: query: import table import_converted2_mm from 'ql/test/data/exports/export2' +PREHOOK: type: IMPORT +#### A masked pattern was here #### +PREHOOK: Output: default@import_converted2_mm +POSTHOOK: query: import table import_converted2_mm from 'ql/test/data/exports/export2' +POSTHOOK: type: IMPORT +#### A masked pattern was here #### +POSTHOOK: Output: default@import_converted2_mm +PREHOOK: query: select * from import_converted2_mm order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@import_converted2_mm +#### A masked pattern was here #### +POSTHOOK: query: select * from import_converted2_mm order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@import_converted2_mm +#### A masked pattern was here #### +0 +98 +100 +PREHOOK: query: drop table import_converted2_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@import_converted2_mm +PREHOOK: Output: default@import_converted2_mm +POSTHOOK: query: drop table import_converted2_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@import_converted2_mm +POSTHOOK: Output: default@import_converted2_mm PREHOOK: query: drop table simple_to_mm PREHOOK: type: DROPTABLE PREHOOK: Input: default@simple_to_mm @@ -153,11 +314,11 @@ PREHOOK: query: drop table part_to_mm PREHOOK: type: DROPTABLE POSTHOOK: query: drop table part_to_mm POSTHOOK: type: DROPTABLE -PREHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) stored as orc +PREHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) stored as orc tblproperties("transactional"="false") PREHOOK: type: CREATETABLE PREHOOK: Output: database:default PREHOOK: Output: default@part_to_mm -POSTHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) stored as orc +POSTHOOK: query: create table part_to_mm(key int) partitioned by (key_mm int) stored as orc tblproperties("transactional"="false") POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@part_to_mm @@ -299,6 +460,206 @@ POSTHOOK: query: drop table part_to_mm POSTHOOK: type: DROPTABLE POSTHOOK: Input: default@part_to_mm POSTHOOK: Output: default@part_to_mm +PREHOOK: query: drop table load_to_mm +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table load_to_mm +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table load_to_mm (key string, value string) tblproperties("transactional"="false") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@load_to_mm +POSTHOOK: query: create table load_to_mm (key string, value string) tblproperties("transactional"="false") +POSTHOOK: type: 
CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@load_to_mm +PREHOOK: query: load data local inpath '../../data/files/kv1.txt' into table load_to_mm +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@load_to_mm +POSTHOOK: query: load data local inpath '../../data/files/kv1.txt' into table load_to_mm +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@load_to_mm +PREHOOK: query: load data local inpath '../../data/files/kv1.txt' into table load_to_mm +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@load_to_mm +POSTHOOK: query: load data local inpath '../../data/files/kv1.txt' into table load_to_mm +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@load_to_mm +PREHOOK: query: select count(*) from load_to_mm s1 +PREHOOK: type: QUERY +PREHOOK: Input: default@load_to_mm +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from load_to_mm s1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@load_to_mm +#### A masked pattern was here #### +1000 +PREHOOK: query: alter table load_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only") +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@load_to_mm +PREHOOK: Output: default@load_to_mm +POSTHOOK: query: alter table load_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only") +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@load_to_mm +POSTHOOK: Output: default@load_to_mm +PREHOOK: query: select count(*) from load_to_mm s2 +PREHOOK: type: QUERY +PREHOOK: Input: default@load_to_mm +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from load_to_mm s2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@load_to_mm +#### A masked pattern was here #### +1000 +PREHOOK: query: drop table load_to_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@load_to_mm +PREHOOK: Output: default@load_to_mm +POSTHOOK: query: drop table load_to_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@load_to_mm +POSTHOOK: Output: default@load_to_mm +PREHOOK: query: drop table simple_to_mm_text +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table simple_to_mm_text +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table simple_to_mm_text(key int) stored as textfile tblproperties("transactional"="false") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@simple_to_mm_text +POSTHOOK: query: create table simple_to_mm_text(key int) stored as textfile tblproperties("transactional"="false") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@simple_to_mm_text +PREHOOK: query: insert into table simple_to_mm_text select key from intermediate +PREHOOK: type: QUERY +PREHOOK: Input: default@intermediate +PREHOOK: Input: default@intermediate@p=455 +PREHOOK: Input: default@intermediate@p=456 +PREHOOK: Input: default@intermediate@p=457 +PREHOOK: Output: default@simple_to_mm_text +POSTHOOK: query: insert into table simple_to_mm_text select key from intermediate +POSTHOOK: type: QUERY +POSTHOOK: Input: default@intermediate +POSTHOOK: Input: default@intermediate@p=455 +POSTHOOK: Input: default@intermediate@p=456 +POSTHOOK: Input: default@intermediate@p=457 +POSTHOOK: Output: default@simple_to_mm_text +POSTHOOK: Lineage: simple_to_mm_text.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] 
+PREHOOK: query: select * from simple_to_mm_text t1 order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@simple_to_mm_text +#### A masked pattern was here #### +POSTHOOK: query: select * from simple_to_mm_text t1 order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@simple_to_mm_text +#### A masked pattern was here #### +0 +98 +100 +PREHOOK: query: alter table simple_to_mm_text set tblproperties("transactional"="true", "transactional_properties"="insert_only") +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@simple_to_mm_text +PREHOOK: Output: default@simple_to_mm_text +POSTHOOK: query: alter table simple_to_mm_text set tblproperties("transactional"="true", "transactional_properties"="insert_only") +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@simple_to_mm_text +POSTHOOK: Output: default@simple_to_mm_text +PREHOOK: query: select * from simple_to_mm_text t2 order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@simple_to_mm_text +#### A masked pattern was here #### +POSTHOOK: query: select * from simple_to_mm_text t2 order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@simple_to_mm_text +#### A masked pattern was here #### +0 +98 +100 +PREHOOK: query: insert into table simple_to_mm_text select key from intermediate +PREHOOK: type: QUERY +PREHOOK: Input: default@intermediate +PREHOOK: Input: default@intermediate@p=455 +PREHOOK: Input: default@intermediate@p=456 +PREHOOK: Input: default@intermediate@p=457 +PREHOOK: Output: default@simple_to_mm_text +POSTHOOK: query: insert into table simple_to_mm_text select key from intermediate +POSTHOOK: type: QUERY +POSTHOOK: Input: default@intermediate +POSTHOOK: Input: default@intermediate@p=455 +POSTHOOK: Input: default@intermediate@p=456 +POSTHOOK: Input: default@intermediate@p=457 +POSTHOOK: Output: default@simple_to_mm_text +POSTHOOK: Lineage: simple_to_mm_text.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +PREHOOK: query: insert into table simple_to_mm_text select key from intermediate +PREHOOK: type: QUERY +PREHOOK: Input: default@intermediate +PREHOOK: Input: default@intermediate@p=455 +PREHOOK: Input: default@intermediate@p=456 +PREHOOK: Input: default@intermediate@p=457 +PREHOOK: Output: default@simple_to_mm_text +POSTHOOK: query: insert into table simple_to_mm_text select key from intermediate +POSTHOOK: type: QUERY +POSTHOOK: Input: default@intermediate +POSTHOOK: Input: default@intermediate@p=455 +POSTHOOK: Input: default@intermediate@p=456 +POSTHOOK: Input: default@intermediate@p=457 +POSTHOOK: Output: default@simple_to_mm_text +POSTHOOK: Lineage: simple_to_mm_text.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +PREHOOK: query: select * from simple_to_mm_text t3 order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@simple_to_mm_text +#### A masked pattern was here #### +POSTHOOK: query: select * from simple_to_mm_text t3 order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@simple_to_mm_text +#### A masked pattern was here #### +0 +0 +0 +98 +98 +98 +100 +100 +100 +PREHOOK: query: insert overwrite table simple_to_mm_text select key from intermediate +PREHOOK: type: QUERY +PREHOOK: Input: default@intermediate +PREHOOK: Input: default@intermediate@p=455 +PREHOOK: Input: default@intermediate@p=456 +PREHOOK: Input: default@intermediate@p=457 +PREHOOK: Output: default@simple_to_mm_text +POSTHOOK: query: insert overwrite table simple_to_mm_text select key from intermediate +POSTHOOK: type: 
QUERY +POSTHOOK: Input: default@intermediate +POSTHOOK: Input: default@intermediate@p=455 +POSTHOOK: Input: default@intermediate@p=456 +POSTHOOK: Input: default@intermediate@p=457 +POSTHOOK: Output: default@simple_to_mm_text +POSTHOOK: Lineage: simple_to_mm_text.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +PREHOOK: query: select * from simple_to_mm_text t4 order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@simple_to_mm_text +#### A masked pattern was here #### +POSTHOOK: query: select * from simple_to_mm_text t4 order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@simple_to_mm_text +#### A masked pattern was here #### +0 +98 +100 +PREHOOK: query: drop table simple_to_mm_text +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@simple_to_mm_text +PREHOOK: Output: default@simple_to_mm_text +POSTHOOK: query: drop table simple_to_mm_text +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@simple_to_mm_text +POSTHOOK: Output: default@simple_to_mm_text PREHOOK: query: drop table intermediate PREHOOK: type: DROPTABLE PREHOOK: Input: default@intermediate diff --git ql/src/test/results/clientpositive/llap/mm_exim.q.out ql/src/test/results/clientpositive/llap/mm_exim.q.out index 8a43d0f85d..8dfc738fb7 100644 --- ql/src/test/results/clientpositive/llap/mm_exim.q.out +++ ql/src/test/results/clientpositive/llap/mm_exim.q.out @@ -292,14 +292,42 @@ POSTHOOK: type: IMPORT #### A masked pattern was here #### POSTHOOK: Output: database:default POSTHOOK: Output: default@import2_mm -PREHOOK: query: desc import2_mm +PREHOOK: query: desc formatted import2_mm PREHOOK: type: DESCTABLE PREHOOK: Input: default@import2_mm -POSTHOOK: query: desc import2_mm +POSTHOOK: query: desc formatted import2_mm POSTHOOK: type: DESCTABLE POSTHOOK: Input: default@import2_mm +# col_name data_type comment key int p int + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + bucketing_version 2 + numFiles 3 + numRows 6 + rawDataSize 37 + totalSize 43 + transactional true + transactional_properties insert_only +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 PREHOOK: query: select * from import2_mm order by key, p PREHOOK: type: QUERY PREHOOK: Input: default@import2_mm @@ -338,18 +366,46 @@ POSTHOOK: Output: default@import3_mm POSTHOOK: Output: default@import3_mm@p=455 POSTHOOK: Output: default@import3_mm@p=456 POSTHOOK: Output: default@import3_mm@p=457 -PREHOOK: query: desc import3_mm +PREHOOK: query: desc formatted import3_mm PREHOOK: type: DESCTABLE PREHOOK: Input: default@import3_mm -POSTHOOK: query: desc import3_mm +POSTHOOK: query: desc formatted import3_mm POSTHOOK: type: DESCTABLE POSTHOOK: Input: default@import3_mm +# col_name data_type comment key int -p int # Partition Information # col_name data_type comment p int + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + bucketing_version 2 + numFiles 3 + numPartitions 3 + numRows 6 + rawDataSize 13 + totalSize 19 + transactional true + 
transactional_properties insert_only +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 PREHOOK: query: select * from import3_mm order by key, p PREHOOK: type: QUERY PREHOOK: Input: default@import3_mm diff --git ql/src/test/results/clientpositive/mm_bhif.q.out ql/src/test/results/clientpositive/mm_bhif.q.out new file mode 100644 index 0000000000..4774007660 --- /dev/null +++ ql/src/test/results/clientpositive/mm_bhif.q.out @@ -0,0 +1,146 @@ +PREHOOK: query: CREATE TABLE T1_mm(key STRING, val STRING) PARTITIONED BY (ds string) +CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE tblproperties ("transactional"="true", "transactional_properties"="insert_only") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@T1_mm +POSTHOOK: query: CREATE TABLE T1_mm(key STRING, val STRING) PARTITIONED BY (ds string) +CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE tblproperties ("transactional"="true", "transactional_properties"="insert_only") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@T1_mm +PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/bucket_files/000000_0' INTO TABLE T1_mm PARTITION (ds='1') +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@t1_mm +POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/bucket_files/000000_0' INTO TABLE T1_mm PARTITION (ds='1') +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@t1_mm +POSTHOOK: Output: default@t1_mm@ds=1 +PREHOOK: query: INSERT OVERWRITE TABLE T1_mm PARTITION (ds='1') select key, val from T1_mm where ds = '1' +PREHOOK: type: QUERY +PREHOOK: Input: default@t1_mm +PREHOOK: Input: default@t1_mm@ds=1 +PREHOOK: Output: default@t1_mm@ds=1 +POSTHOOK: query: INSERT OVERWRITE TABLE T1_mm PARTITION (ds='1') select key, val from T1_mm where ds = '1' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1_mm +POSTHOOK: Input: default@t1_mm@ds=1 +POSTHOOK: Output: default@t1_mm@ds=1 +POSTHOOK: Lineage: t1_mm PARTITION(ds=1).key SIMPLE [(t1_mm)t1_mm.FieldSchema(name:key, type:string, comment:null), ] +POSTHOOK: Lineage: t1_mm PARTITION(ds=1).val SIMPLE [(t1_mm)t1_mm.FieldSchema(name:val, type:string, comment:null), ] +PREHOOK: query: select * from T1_mm +PREHOOK: type: QUERY +PREHOOK: Input: default@t1_mm +PREHOOK: Input: default@t1_mm@ds=1 +#### A masked pattern was here #### +POSTHOOK: query: select * from T1_mm +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1_mm +POSTHOOK: Input: default@t1_mm@ds=1 +#### A masked pattern was here #### +1 11 1 +2 12 1 +3 13 1 +7 17 1 +8 18 1 +8 28 1 +PREHOOK: query: explain +select count(distinct key) from T1_mm +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(distinct key) from T1_mm +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: t1_mm + Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: key + Statistics: Num rows: 6 
Data size: 24 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: key (type: string) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: string) + mode: partial2 + outputColumnNames: _col0 + Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count(_col0) + mode: partial2 + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(distinct key) from T1_mm +PREHOOK: type: QUERY +PREHOOK: Input: default@t1_mm +PREHOOK: Input: default@t1_mm@ds=1 +#### A masked pattern was here #### +POSTHOOK: query: select count(distinct key) from T1_mm +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t1_mm +POSTHOOK: Input: default@t1_mm@ds=1 +#### A masked pattern was here #### +5 +PREHOOK: query: DROP TABLE T1_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@t1_mm +PREHOOK: Output: default@t1_mm +POSTHOOK: query: DROP TABLE T1_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@t1_mm +POSTHOOK: Output: default@t1_mm diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java index 42d02489b4..c3d99c3926 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java @@ -132,24 +132,26 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw } if ("true".equalsIgnoreCase(transactionalValue) && !"true".equalsIgnoreCase(oldTransactionalValue)) { if(!isTransactionalPropertiesPresent) { - normazlieTransactionalPropertyDefault(newTable); + normalizeTransactionalPropertyDefault(newTable); isTransactionalPropertiesPresent = true; transactionalPropertiesValue = DEFAULT_TRANSACTIONAL_PROPERTY; } - //only need to check 
conformance if alter table enabled acid - if (!conformToAcid(newTable)) { - // INSERT_ONLY tables don't have to conform to ACID requirement like ORC or bucketing - if (transactionalPropertiesValue == null || !"insert_only".equalsIgnoreCase(transactionalPropertiesValue)) { - throw new MetaException("The table must be stored using an ACID compliant format (such as ORC): " - + Warehouse.getQualifiedName(newTable)); - } + // We only need to check conformance if the ALTER TABLE enables ACID. + // INSERT_ONLY tables don't have to conform to ACID requirements such as ORC or bucketing. + boolean isFullAcid = transactionalPropertiesValue == null + || !"insert_only".equalsIgnoreCase(transactionalPropertiesValue); + if (isFullAcid && !conformToAcid(newTable)) { + throw new MetaException("The table must be stored using an ACID compliant " + + "format (such as ORC): " + Warehouse.getQualifiedName(newTable)); } if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) { throw new MetaException(Warehouse.getQualifiedName(newTable) + " cannot be declared transactional because it's an external table"); } - validateTableStructure(context.getHandler(), newTable); + if (isFullAcid) { + validateTableStructure(context.getHandler(), newTable); + } hasValidTransactionalValue = true; } @@ -189,6 +191,7 @@ private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throw } checkSorted(newTable); } + private void checkSorted(Table newTable) throws MetaException { if(!TxnUtils.isAcidTable(newTable)) { return; @@ -309,7 +312,7 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr // normalize prop name parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString()); if(transactionalProperties == null) { - normazlieTransactionalPropertyDefault(newTable); + normalizeTransactionalPropertyDefault(newTable); } initializeTransactionalProperties(newTable); checkSorted(newTable); @@ -325,7 +328,7 @@ private void handleCreateTableTransactionalProp(PreCreateTableEvent context) thr * transactional_properties should take on the default value. Easier to make this explicit in * table definition than keep checking everywhere if it's set or not. */ - private void normazlieTransactionalPropertyDefault(Table table) { + private void normalizeTransactionalPropertyDefault(Table table) { table.getParameters().put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, DEFAULT_TRANSACTIONAL_PROPERTY);