diff --git common/src/java/org/apache/hadoop/hive/common/JavaUtils.java common/src/java/org/apache/hadoop/hive/common/JavaUtils.java index f4ebd3bd7a..8b1bbaa2d5 100644 --- common/src/java/org/apache/hadoop/hive/common/JavaUtils.java +++ common/src/java/org/apache/hadoop/hive/common/JavaUtils.java @@ -39,6 +39,7 @@ */ public final class JavaUtils { + public static final String BASE_PREFIX = "base"; public static final String DELTA_PREFIX = "delta"; public static final String DELTA_DIGITS = "%07d"; public static final int DELTA_DIGITS_LEN = 7; @@ -167,8 +168,8 @@ private JavaUtils() { public static Long extractTxnId(Path file) { String fileName = file.getName(); - String[] parts = fileName.split("_", 4); // e.g. delta_0000001_0000001_0000 - if (parts.length < 4 || !DELTA_PREFIX.equals(parts[0])) { + String[] parts = fileName.split("_", 4); // e.g. delta_0000001_0000001_0000 or base_0000022 + if (parts.length < 2 || !(DELTA_PREFIX.equals(parts[0]) || BASE_PREFIX.equals(parts[0]))) { LOG.debug("Cannot extract transaction ID for a MM table: " + file + " (" + Arrays.toString(parts) + ")"); return null; @@ -185,20 +186,31 @@ public static Long extractTxnId(Path file) { } public static class IdPathFilter implements PathFilter { - private final String mmDirName; + private String mmDirName; private final boolean isMatch, isIgnoreTemp, isPrefix; + public IdPathFilter(long writeId, int stmtId, boolean isMatch) { - this(writeId, stmtId, isMatch, false); + this(writeId, stmtId, isMatch, false, false); } public IdPathFilter(long writeId, int stmtId, boolean isMatch, boolean isIgnoreTemp) { - String mmDirName = DELTA_PREFIX + "_" + String.format(DELTA_DIGITS, writeId) + "_" + - String.format(DELTA_DIGITS, writeId) + "_"; - if (stmtId >= 0) { - mmDirName += String.format(STATEMENT_DIGITS, stmtId); - isPrefix = false; + this(writeId, stmtId, isMatch, isIgnoreTemp, false); + } + public IdPathFilter(long writeId, int stmtId, boolean isMatch, boolean isIgnoreTemp, boolean isBaseDir) { + String mmDirName = null; + if (!isBaseDir) { + mmDirName = DELTA_PREFIX + "_" + String.format(DELTA_DIGITS, writeId) + "_" + + String.format(DELTA_DIGITS, writeId) + "_"; + if (stmtId >= 0) { + mmDirName += String.format(STATEMENT_DIGITS, stmtId); + isPrefix = false; + } else { + isPrefix = true; + } } else { - isPrefix = true; + mmDirName = BASE_PREFIX + "_" + String.format(DELTA_DIGITS, writeId); + isPrefix = false; } + this.mmDirName = mmDirName; this.isMatch = isMatch; this.isIgnoreTemp = isIgnoreTemp; diff --git data/conf/hive-log4j2.properties data/conf/hive-log4j2.properties index e5bb166a35..dbc76da40f 100644 --- data/conf/hive-log4j2.properties +++ data/conf/hive-log4j2.properties @@ -19,7 +19,7 @@ name = HiveLog4j2Test packages = org.apache.hadoop.hive.ql.log # list of properties -property.hive.log.level = DEBUG +property.hive.log.level = ALL property.hive.root.logger = DRFA property.hive.log.dir = ${sys:test.tmp.dir}/log property.hive.log.file = hive.log diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java index b163a1e265..4f6429b72b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java @@ -260,7 +260,7 @@ public void closeOp(boolean abort) throws HiveException { // There's always just one file that we have merged. // The union/DP/etc. should already be accounted for in the path.
Utilities.writeMmCommitManifest(Lists.newArrayList(outPath), - tmpPath.getParent(), fs, taskId, conf.getTxnId(), conf.getStmtId(), null); + tmpPath.getParent(), fs, taskId, conf.getTxnId(), conf.getStmtId(), null, false); LOG.info("Merged into " + finalPath + "(" + fss.getLen() + " bytes)."); } } @@ -340,7 +340,7 @@ public void jobCloseOp(Configuration hconf, boolean success) // We don't expect missing buckets from merge (actually there should be no buckets), // so just pass null as bucketing context. Union suffix should also be accounted for. Utilities.handleMmTableFinalPath(outputDir.getParent(), null, hconf, success, - dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false); + dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false, false); // TODO: MMIOW check ppath } } catch (IOException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index b4989f1509..219d1adf25 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -337,7 +337,7 @@ public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeT } outPaths[filesIdx] = getTaskOutPath(taskId); } else { - String subdirPath = AcidUtils.deltaSubdir(txnId, txnId, stmtId); + String subdirPath = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), txnId, txnId, stmtId); if (unionPath != null) { // Create the union directory inside the MM directory. subdirPath += Path.SEPARATOR + unionPath; @@ -1324,7 +1324,7 @@ public void closeOp(boolean abort) throws HiveException { } if (conf.isMmTable()) { Utilities.writeMmCommitManifest( - commitPaths, specPath, fs, taskId, conf.getTransactionId(), conf.getStatementId(), unionPath); + commitPaths, specPath, fs, taskId, conf.getTransactionId(), conf.getStatementId(), unionPath, conf.getInsertOverwrite()); } // Only publish stats if this operator's flag was set to gather stats if (conf.isGatherStats()) { @@ -1383,7 +1383,7 @@ public void jobCloseOp(Configuration hconf, boolean success) conf.getTableInfo(), numBuckets, conf.getCompressed()); Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success, dpLevels, lbLevels, mbc, conf.getTransactionId(), conf.getStatementId(), reporter, - conf.isMmTable(), conf.isMmCtas()); + conf.isMmTable(), conf.isMmCtas(), conf.getInsertOverwrite()); } } } catch (IOException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java index 65b2f87357..8732348a1b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java @@ -234,7 +234,7 @@ private void mvFileToFinalPath(Path specPath, Configuration hconf, Utilities.FILE_OP_LOGGER.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath + "(spec " + specPath + ")"); Utilities.rename(fs, tmpPath, intermediatePath); // Step2: remove any tmp file or double-committed output files - Utilities.removeTempOrDuplicateFiles(fs, intermediatePath); + Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, false); // Step3: move to the file destination Utilities.FILE_OP_LOGGER.info("Moving tmp dir: " + intermediatePath + " to: " + specPath); Utilities.renameOrMoveFiles(fs, intermediatePath, specPath); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 29c33f98c0..7ef979da1f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1502,7 +1502,7 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf, perfLogger.PerfLogBegin("FileSinkOperator", "RemoveTempOrDuplicateFiles"); // remove any tmp file or double-committed output files List emptyBuckets = Utilities.removeTempOrDuplicateFiles( - fs, statuses, dpCtx, conf, hconf, filesKept); + fs, statuses, dpCtx, conf, hconf, filesKept, false); perfLogger.PerfLogEnd("FileSinkOperator", "RemoveTempOrDuplicateFiles"); // create empty buckets if necessary if (!emptyBuckets.isEmpty()) { @@ -1593,23 +1593,23 @@ private static void addFilesToPathSet(Collection files, Set fi /** * Remove all temporary files and duplicate (double-committed) files from a given directory. */ - public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws IOException { - removeTempOrDuplicateFiles(fs, path, null,null,null); + public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, boolean isBaseDir) throws IOException { + removeTempOrDuplicateFiles(fs, path, null,null,null, isBaseDir); } public static List removeTempOrDuplicateFiles(FileSystem fs, Path path, - DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException { + DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, boolean isBaseDir) throws IOException { if (path == null) { return null; } FileStatus[] stats = HiveStatsUtils.getFileStatusRecurse(path, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs); - return removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf); + return removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf, isBaseDir); } public static List removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, - DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException { - return removeTempOrDuplicateFiles(fs, fileStats, dpCtx, conf, hconf, null); + DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, boolean isBaseDir) throws IOException { + return removeTempOrDuplicateFiles(fs, fileStats, dpCtx, conf, hconf, null, isBaseDir); } /** @@ -1618,12 +1618,12 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I * @return a list of path names corresponding to should-be-created empty buckets. */ public static List removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, - DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, Set filesKept) + DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, Set filesKept, boolean isBaseDir) throws IOException { int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(), numBuckets = (conf != null && conf.getTable() != null) ? 
conf.getTable().getNumBuckets() : 0; return removeTempOrDuplicateFiles( - fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept); + fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept, isBaseDir); } private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException { @@ -1642,7 +1642,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I public static List removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, String unionSuffix, int dpLevels, int numBuckets, Configuration hconf, Long txnId, - int stmtId, boolean isMmTable, Set filesKept) throws IOException { + int stmtId, boolean isMmTable, Set filesKept, boolean isBaseDir) throws IOException { if (fileStats == null) { return null; } @@ -1661,7 +1661,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I if (isMmTable) { Path mmDir = parts[i].getPath(); - if (!mmDir.getName().equals(AcidUtils.deltaSubdir(txnId, txnId, stmtId))) { + if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, txnId, txnId, stmtId))) { throw new IOException("Unexpected non-MM directory name " + mmDir); } @@ -1685,7 +1685,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I if (fileStats.length == 0) { return result; } - Path mmDir = extractNonDpMmDir(txnId, stmtId, items); + Path mmDir = extractNonDpMmDir(txnId, stmtId, items, isBaseDir); taskIDToFile = removeTempOrDuplicateFilesNonMm( fs.listStatus(new Path(mmDir, unionSuffix)), fs); if (filesKept != null && taskIDToFile != null) { @@ -1703,7 +1703,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I addFilesToPathSet(taskIDToFile.values(), filesKept); } } else { - Path mmDir = extractNonDpMmDir(txnId, stmtId, items); + Path mmDir = extractNonDpMmDir(txnId, stmtId, items, isBaseDir); taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs); if (filesKept != null && taskIDToFile != null) { addFilesToPathSet(taskIDToFile.values(), filesKept); @@ -1715,12 +1715,12 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I return result; } - private static Path extractNonDpMmDir(Long txnId, int stmtId, FileStatus[] items) throws IOException { + private static Path extractNonDpMmDir(Long txnId, int stmtId, FileStatus[] items, boolean isBaseDir) throws IOException { if (items.length > 1) { throw new IOException("Unexpected directories for non-DP MM: " + Arrays.toString(items)); } Path mmDir = items[0].getPath(); - if (!mmDir.getName().equals(AcidUtils.deltaSubdir(txnId, txnId, stmtId))) { + if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, txnId, txnId, stmtId))) { throw new IOException("Unexpected non-MM directory " + mmDir); } Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in MM directory {}", mmDir); @@ -4057,7 +4057,7 @@ private static void tryDelete(FileSystem fs, Path path) { } public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels, - int lbLevels, PathFilter filter, long txnId, int stmtId, Configuration conf) + int lbLevels, PathFilter filter, long txnId, int stmtId, Configuration conf, boolean isBaseDir) throws IOException { int skipLevels = dpLevels + lbLevels; if (filter == null) { @@ -4072,7 +4072,7 @@ private static void tryDelete(FileSystem fs, Path path) { || (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs))) { return 
getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter); } - return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, txnId, stmtId); + return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, txnId, stmtId, isBaseDir); } private static boolean isS3(FileSystem fs) { @@ -4149,7 +4149,7 @@ private static boolean isS3(FileSystem fs) { } private static Path[] getMmDirectoryCandidatesGlobStatus(FileSystem fs, - Path path, int skipLevels, PathFilter filter, long txnId, int stmtId) throws IOException { + Path path, int skipLevels, PathFilter filter, long txnId, int stmtId, boolean isBaseDir) throws IOException { StringBuilder sb = new StringBuilder(path.toUri().getPath()); for (int i = 0; i < skipLevels; i++) { sb.append(Path.SEPARATOR).append('*'); @@ -4159,7 +4159,7 @@ private static boolean isS3(FileSystem fs) { // sb.append(Path.SEPARATOR).append(AcidUtils.deltaSubdir(txnId, txnId)).append("_*"); throw new AssertionError("GlobStatus should not be called without a statement ID"); } else { - sb.append(Path.SEPARATOR).append(AcidUtils.deltaSubdir(txnId, txnId, stmtId)); + sb.append(Path.SEPARATOR).append(AcidUtils.baseOrDeltaSubdir(isBaseDir, txnId, txnId, stmtId)); } Path pathPattern = new Path(path, sb.toString()); return statusToPath(fs.globStatus(pathPattern, filter)); @@ -4167,9 +4167,9 @@ private static boolean isS3(FileSystem fs) { private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir, int dpLevels, int lbLevels, JavaUtils.IdPathFilter filter, - long txnId, int stmtId, Configuration conf) throws IOException { + long txnId, int stmtId, Configuration conf, boolean isBaseDir) throws IOException { Path[] files = getMmDirectoryCandidates( - fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, conf); + fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, conf, isBaseDir); if (files != null) { for (Path path : files) { Utilities.FILE_OP_LOGGER.info("Deleting {} on failure", path); @@ -4182,12 +4182,12 @@ private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manif public static void writeMmCommitManifest(List commitPaths, Path specPath, FileSystem fs, - String taskId, Long txnId, int stmtId, String unionSuffix) throws HiveException { + String taskId, Long txnId, int stmtId, String unionSuffix, boolean isInsertOverwrite) throws HiveException { if (commitPaths.isEmpty()) { return; } // We assume one FSOP per task (per specPath), so we create it in specPath. - Path manifestPath = getManifestDir(specPath, txnId, stmtId, unionSuffix); + Path manifestPath = getManifestDir(specPath, txnId, stmtId, unionSuffix, isInsertOverwrite); manifestPath = new Path(manifestPath, taskId + MANIFEST_EXTENSION); Utilities.FILE_OP_LOGGER.info("Writing manifest to {} with {}", manifestPath, commitPaths); try { @@ -4206,8 +4206,10 @@ public static void writeMmCommitManifest(List commitPaths, Path specPath, } } - private static Path getManifestDir(Path specPath, long txnId, int stmtId, String unionSuffix) { - Path manifestPath = new Path(specPath, "_tmp." + AcidUtils.deltaSubdir(txnId, txnId, stmtId)); + private static Path getManifestDir(Path specPath, long txnId, int stmtId, String unionSuffix, boolean isInsertOverwrite) { + Path manifestPath = new Path(specPath, "_tmp." + + AcidUtils.baseOrDeltaSubdir(isInsertOverwrite, txnId, txnId, stmtId)); + return (unionSuffix == null) ? 
manifestPath : new Path(manifestPath, unionSuffix); } @@ -4224,13 +4226,13 @@ public MissingBucketsContext(TableDesc tableInfo, int numBuckets, boolean isComp public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf, boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long txnId, int stmtId, - Reporter reporter, boolean isMmTable, boolean isMmCtas) throws IOException, HiveException { + Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite) throws IOException, HiveException { FileSystem fs = specPath.getFileSystem(hconf); - Path manifestDir = getManifestDir(specPath, txnId, stmtId, unionSuffix); + Path manifestDir = getManifestDir(specPath, txnId, stmtId, unionSuffix, isInsertOverwrite); if (!success) { JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true); tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels, - filter, txnId, stmtId, hconf); + filter, txnId, stmtId, hconf, isInsertOverwrite); return; } @@ -4253,13 +4255,13 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con } Utilities.FILE_OP_LOGGER.debug("Looking for files in: {}", specPath); - JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true); + JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true, false, isInsertOverwrite); if (isMmCtas && !fs.exists(specPath)) { Utilities.FILE_OP_LOGGER.info("Creating table directory for CTAS with no output at {}", specPath); FileUtils.mkdir(fs, specPath, hconf); } Path[] files = getMmDirectoryCandidates( - fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, hconf); + fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, hconf, isInsertOverwrite); ArrayList mmDirectories = new ArrayList<>(); if (files != null) { for (Path path : files) { @@ -4320,7 +4322,7 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con } List emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, finalResults, unionSuffix, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, txnId, stmtId, - isMmTable, null); + isMmTable, null, isInsertOverwrite); // create empty buckets if necessary if (!emptyBuckets.isEmpty()) { assert mbc != null; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 4c0b71f65f..6d0008200d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -204,6 +204,19 @@ public static String deleteDeltaSubdir(long min, long max, int statementId) { public static String baseDir(long txnId) { return BASE_PREFIX + String.format(DELTA_DIGITS, txnId); } + + /** + * Return a base or delta directory string + * according to the given "baseDirRequired". + */ + public static String baseOrDeltaSubdir(boolean baseDirRequired, long min, long max, int statementId) { + if (!baseDirRequired) { + return deltaSubdir(min, max, statementId); + } else { + return baseDir(min); + } + } + /** * Create a filename for a bucket file. 
* @param directory the partition directory @@ -1211,7 +1224,19 @@ public static boolean isAcidTable(CreateTableDesc table) { public static boolean isFullAcidTable(Table table) { return isAcidTable(table) && !AcidUtils.isInsertOnlyTable(table.getParameters()); } - + + public static boolean isFullAcidTable(CreateTableDesc td) { + if (td == null || td.getTblProps() == null) { + return false; + } + String tableIsTransactional = td.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if (tableIsTransactional == null) { + tableIsTransactional = td.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); + } + return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true") && + !AcidUtils.isInsertOnlyTable(td.getTblProps()); + } + /** * Sets the acidOperationalProperties in the configuration object argument. * @param conf Mutable configuration object diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 6a1dc729f3..31e605808f 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -549,7 +549,14 @@ private static void processForWriteIds(Path dir, JobConf conf, } else if (!hadAcidState) { AcidUtils.Directory dirInfo = AcidUtils.getAcidState(currDir, conf, validTxnList, Ref.from(false), true, null); hadAcidState = true; - // TODO [MM gap]: for IOW, we also need to count in base dir, if any + + // Find the base, created for IOW. + Path base = dirInfo.getBaseDirectory(); + if (base != null) { + finalPaths.add(base); + } + + // Find the parsed delta files. for (AcidUtils.ParsedDelta delta : dirInfo.getCurrentDirectories()) { Utilities.FILE_OP_LOGGER.debug("Adding input " + delta.getPath()); finalPaths.add(delta.getPath()); diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 3e9fff195f..fdd4555123 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1774,12 +1774,6 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par Utilities.FILE_OP_LOGGER.trace("maybe deleting stuff from " + oldPartPath + " (new " + newPartPath + ") for replace"); } - if ((loadFileType == LoadFileType.REPLACE_ALL) && oldPartPath != null) { - boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); - deleteOldPathForReplace(newPartPath, oldPartPath, getConf(), isAutoPurge, - new JavaUtils.IdPathFilter(txnId, stmtId, false, true), true, - tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0); - } } else { // Either a non-MM query, or a load into MM table from an external source. 
PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER; @@ -1796,7 +1790,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par if ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcid)) { boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal, isAutoPurge, newFiles, filter, isMmTableWrite); } else { FileSystem fs = tbl.getDataLocation().getFileSystem(conf); copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcid, @@ -2083,7 +2077,7 @@ private void constructOneLBLocationMap(FileStatus fSta, // where this is used; we always want to load everything; also the only case where // we have multiple statements anyway is union. Path[] leafStatus = Utilities.getMmDirectoryCandidates( - fs, loadPath, numDP, numLB, null, txnId, -1, conf); + fs, loadPath, numDP, numLB, null, txnId, -1, conf, false); for (Path p : leafStatus) { Path dpPath = p.getParent(); // Skip the MM directory that we have found. for (int i = 0; i < numLB; ++i) { @@ -2286,13 +2280,6 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType // Note: this assumes both paths are qualified; which they are, currently. if (isMmTable && loadPath.equals(tbl.getPath())) { Utilities.FILE_OP_LOGGER.debug("not moving " + loadPath + " to " + tbl.getPath()); - if (loadFileType == LoadFileType.REPLACE_ALL) { - Path tableDest = tbl.getPath(); - boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); - deleteOldPathForReplace(tableDest, tableDest, sessionConf, isAutopurge, - new JavaUtils.IdPathFilter(txnId, stmtId, false, true), true, - tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0); - } newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId); } else { // Either a non-MM query, or a load into MM table from an external source. @@ -2309,7 +2296,7 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType if (loadFileType == LoadFileType.REPLACE_ALL) { boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); replaceFiles(tblPath, loadPath, destPath, tblPath, sessionConf, isSrcLocal, isAutopurge, newFiles, filter, isMmTable); } else { try { FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf); @@ -3835,7 +3822,7 @@ private static void moveAcidFiles(String deltaFileType, PathFilter pathFilter, F */ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf, boolean isSrcLocal, boolean purge, List<Path> newFiles, PathFilter deletePathFilter, - boolean isMmTable) throws HiveException { + boolean isMmTableOverwrite) throws HiveException { try { FileSystem destFs = destf.getFileSystem(conf); @@ -3856,7 +3843,7 @@ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, if (oldPath != null) { // Note: we assume lbLevels is 0 here. Same as old code for non-MM. // For MM tables, this can only be a LOAD command. Does LOAD even support LB?
- deleteOldPathForReplace(destf, oldPath, conf, purge, deletePathFilter, isMmTable, 0); + deleteOldPathForReplace(destf, oldPath, conf, purge, deletePathFilter, isMmTableOverwrite, 0); } // first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates @@ -3902,7 +3889,7 @@ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, } private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, boolean purge, - PathFilter pathFilter, boolean isMmTable, int lbLevels) throws HiveException { + PathFilter pathFilter, boolean isMmTableOverwrite, int lbLevels) throws HiveException { Utilities.FILE_OP_LOGGER.debug("Deleting old paths for replace in " + destPath + " and old path " + oldPath); boolean isOldPathUnderDestf = false; @@ -3914,32 +3901,13 @@ private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is // not the destf or its subdir? isOldPathUnderDestf = isSubDir(oldPath, destPath, oldFs, destFs, false); - if (isOldPathUnderDestf || isMmTable) { - if (lbLevels == 0 || !isMmTable) { + if (isOldPathUnderDestf || isMmTableOverwrite) { + if (lbLevels == 0 || !isMmTableOverwrite) { cleanUpOneDirectoryForReplace(oldPath, oldFs, pathFilter, conf, purge); - } else { - // We need to clean up different MM IDs from each LB directory separately. - // Avoid temporary directories in the immediate table/part dir. - // Note: we could just find directories with any MM directories inside? - // the rest doesn't have to be cleaned up. Play it safe. - String mask = "[^._]*"; - for (int i = 0; i < lbLevels - 1; ++i) { - mask += Path.SEPARATOR + "*"; - } - Path glob = new Path(oldPath, mask); - FileStatus[] lbDirs = oldFs.globStatus(glob); - for (FileStatus lbDir : lbDirs) { - Path lbPath = lbDir.getPath(); - if (!lbDir.isDirectory()) { - throw new HiveException("Unexpected path during overwrite: " + lbPath); - } - Utilities.FILE_OP_LOGGER.info("Cleaning up LB directory " + lbPath); - cleanUpOneDirectoryForReplace(lbPath, oldFs, pathFilter, conf, purge); - } } } } catch (IOException e) { - if (isOldPathUnderDestf || isMmTable) { + if (isOldPathUnderDestf || isMmTableOverwrite) { // if oldPath is a subdir of destf but it could not be cleaned throw new HiveException("Directory " + oldPath.toString() + " could not be cleaned up.", e); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java index ab71073560..f0083f2066 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java @@ -64,6 +64,7 @@ private final Set<String> destCubes; private final Set<String> destGroupingSets; private final Map<String, ASTNode> destToHaving; + private final Map<String, Boolean> destToOpType; // insertIntoTables/insertOverwriteTables map a table's fullName to its ast; private final Map<String, ASTNode> insertIntoTables; private final Map<String, ASTNode> insertOverwriteTables; @@ -135,6 +136,7 @@ public QBParseInfo(String alias, boolean isSubQ) { destToSortby = new HashMap<String, ASTNode>(); destToOrderby = new HashMap<String, ASTNode>(); destToLimit = new HashMap<String, SimpleEntry<Integer, Integer>>(); + destToOpType = new HashMap<>(); insertIntoTables = new HashMap<String, ASTNode>(); insertOverwriteTables = new HashMap<String, ASTNode>(); destRollups = new HashSet<String>(); @@ -155,7 +157,7 @@ public QBParseInfo(String alias, boolean isSubQ) { } /* * If a QB is such that the aggregation expressions need to be handled by * the Windowing PTF; we invoke this function to clear the AggExprs on the dest.
*/ @@ -180,6 +182,18 @@ public void addAggregationExprsForClause(String clause, public void addInsertIntoTable(String fullName, ASTNode ast) { insertIntoTables.put(fullName.toLowerCase(), ast); } + + public void setDestToOpType(String clause, boolean value) { + destToOpType.put(clause, value); + } + + public boolean isDestToOpTypeInsertOverwrite(String clause) { + if (destToOpType.containsKey(clause)) { + return destToOpType.get(clause); + } else { + return false; + } + } /** * See also {@link #getInsertOverwriteTables()} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index b323edeb74..5dd3583215 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -1506,6 +1506,7 @@ public boolean doPhase1(ASTNode ast, QB qb, Phase1Ctx ctx_1, PlannerContext plan String fullTableName = getUnescapedName((ASTNode) ast.getChild(0).getChild(0), SessionState.get().getCurrentDatabase()); qbp.getInsertOverwriteTables().put(fullTableName.toLowerCase(), ast); + qbp.setDestToOpType(ctx_1.dest, true); } } } @@ -6781,7 +6782,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) Integer dest_type = qbm.getDestTypeForAlias(dest); Table dest_tab = null; // destination table if any - boolean destTableIsAcid = false; // should the destination table be written to using ACID + boolean destTableIsAcid = false; // true for full ACID table and MM table + boolean destTableIsFullAcid = false; // should the destination table be written to using ACID boolean destTableIsTemporary = false; boolean destTableIsMaterialization = false; Partition dest_part = null;// destination partition if any @@ -6802,7 +6804,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) case QBMetaData.DEST_TABLE: { dest_tab = qbm.getDestTableForAlias(dest); - destTableIsAcid = AcidUtils.isFullAcidTable(dest_tab); + destTableIsAcid = AcidUtils.isAcidTable(dest_tab); + destTableIsFullAcid = AcidUtils.isFullAcidTable(dest_tab); destTableIsTemporary = dest_tab.isTemporary(); // Is the user trying to insert into a external tables @@ -6852,7 +6855,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) // NOTE: specify Dynamic partitions in dest_tab for WriteEntity if (!isNonNativeTable) { AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; - if (destTableIsAcid) { + if (destTableIsFullAcid) { acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); checkAcidConstraints(qb, table_desc, dest_tab); } @@ -6895,7 +6898,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) dest_part = qbm.getDestPartitionForAlias(dest); dest_tab = dest_part.getTable(); - destTableIsAcid = AcidUtils.isFullAcidTable(dest_tab); + destTableIsAcid = AcidUtils.isAcidTable(dest_tab); + destTableIsFullAcid = AcidUtils.isFullAcidTable(dest_tab); checkExternalTable(dest_tab); @@ -6928,7 +6932,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) dest_part.getSkewedColValues(), dest_part.getSkewedColValueLocationMaps(), dest_part.isStoredAsSubDirectories(), conf); AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; - if (destTableIsAcid) { + if (destTableIsFullAcid) { acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); checkAcidConstraints(qb, table_desc, dest_tab); } @@ -6945,7 +6949,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) // 
For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old // deltas and base and leave them up to the cleaner to clean up LoadFileType loadType = (!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), - dest_tab.getTableName()) && !destTableIsAcid) + dest_tab.getTableName()) && !destTableIsAcid) // Both Full-acid and MM tables are excluded. ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING; ltd.setLoadFileType(loadType); ltd.setLbCtx(lbCtx); @@ -7019,6 +7023,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) } destTableIsAcid = tblDesc != null && AcidUtils.isAcidTable(tblDesc); + destTableIsFullAcid = tblDesc != null && AcidUtils.isFullAcidTable(tblDesc); boolean isDestTempFile = true; if (!ctx.isMRTmpFileURI(dest_path.toUri().toString())) { @@ -7109,7 +7114,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) (dest_tab.getSortCols() != null && dest_tab.getSortCols().size() > 0))); // If this table is working with ACID semantics, turn off merging - canBeMerged &= !destTableIsAcid; + canBeMerged &= !destTableIsFullAcid; // Generate the partition columns from the parent input if (dest_type.intValue() == QBMetaData.DEST_TABLE @@ -7120,7 +7125,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, table_desc, dest_part, dest_path, currentTableId, destTableIsAcid, destTableIsTemporary, destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS, - canBeMerged, dest_tab, txnId, isMmCtas, dest_type); + canBeMerged, dest_tab, txnId, isMmCtas, dest_type, qb); if (isMmCtas) { // Add FSD so that the LoadTask compilation could fix up its path to avoid the move. tableDesc.setWriter(fileSinkDesc); @@ -7231,7 +7236,7 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc, boolean destTableIsMaterialization, Path queryTmpdir, SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx, RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas, - Integer dest_type) throws SemanticException { + Integer dest_type, QB qb) throws SemanticException { boolean isInsertOverwrite = false; switch (dest_type) { case QBMetaData.DEST_PARTITION: @@ -7240,7 +7245,8 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc, //INSERT [OVERWRITE] path String destTableFullName = dest_tab.getCompleteName().replace('@', '.'); Map<String, ASTNode> iowMap = qb.getParseInfo().getInsertOverwriteTables(); - if (iowMap.containsKey(destTableFullName)) { + if (iowMap.containsKey(destTableFullName) && + qb.getParseInfo().isDestToOpTypeInsertOverwrite(dest)) { isInsertOverwrite = true; } break; diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java new file mode 100644 index 0000000000..4eaf387fd0 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java @@ -0,0 +1,614 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import java.io.File; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.txn.AcidCompactionHistoryService; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.BucketCodec; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService; +import org.apache.hadoop.hive.ql.txn.compactor.Cleaner; +import org.apache.hadoop.hive.ql.txn.compactor.Initiator; +import org.apache.hadoop.hive.ql.txn.compactor.Worker; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests here are for micro-managed tables: + * specifically INSERT OVERWRITE statements and Major/Minor Compactions. 
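+ * An INSERT OVERWRITE on an MM table is expected to write a base_N directory instead of a + * delta_N_N_NNNN one; the directory-layout assertions in these tests check exactly that.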
+ */ +public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests { + static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommandsForMmTable.class); + protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestTxnCommands.class.getCanonicalName() + + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; + @Override + String getTestDataDir() { + return TEST_DATA_DIR; + } + + enum TableExtended { + NONACIDPART("nonAcidPart", "p"), + MMTBL("mmTbl"), + MMTBL2("mmTbl2"), + MMTBLPART("mmTblPart","p"); + + final String name; + final String partitionColumns; + @Override + public String toString() { + return name; + } + String getPartitionColumns() { + return partitionColumns; + } + TableExtended(String name) { + this(name, null); + } + TableExtended(String name, String partitionColumns) { + this.name = name; + this.partitionColumns = partitionColumns; + } + } + + @Override + @Before + public void setUp() throws Exception { + super.setUpInternal(); + setUpInternalExtended(false); + } + + void setUpInternalExtended(boolean isOrcFormat) throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING, true); + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); + hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "true"); + hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + + runStatementOnDriver("create table " + TableExtended.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')"); + if (!isOrcFormat) { + runStatementOnDriver("create table " + TableExtended.MMTBL + "(a int, b int) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')"); + runStatementOnDriver("create table " + TableExtended.MMTBL2 + "(a int, b int) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')"); + runStatementOnDriver("create table " + TableExtended.MMTBLPART + "(a int, b int) partitioned by (p string) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')"); + } else { + runStatementOnDriver("create table " + TableExtended.MMTBL + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')"); + runStatementOnDriver("create table " + TableExtended.MMTBL2 + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')"); + runStatementOnDriver("create table " + TableExtended.MMTBLPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')"); + } + } + protected void dropTables() throws Exception { + super.dropTables(); + for(TestTxnCommandsForMmTable.TableExtended t : TestTxnCommandsForMmTable.TableExtended.values()) { + runStatementOnDriver("drop table if exists " + t); + } + } + /** + * Test compaction for Micro-managed table + * 1. Regular compaction shouldn't impact any valid subdirectories of MM tables + * 2. 
Compactions will only remove subdirectories for aborted transactions of MM tables, if any + * @throws Exception + */ + @Test + public void testMmTableCompaction() throws Exception { + // 1. Insert some rows into MM table + runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)"); + runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)"); + // There should be 2 delta directories + verifyDirAndResult(2); + + // 2. Perform a MINOR compaction. Since nothing was aborted, subdirs should stay. + runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'"); + runWorker(hiveConf); + verifyDirAndResult(2); + + // 3. Let a transaction be aborted + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); + runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(5,6)"); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); + // There should be 3 delta directories. The new one is the aborted one. + verifyDirAndResult(3); + + // 4. Perform a MINOR compaction again. This time it will remove the subdir for aborted transaction. + runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'"); + runWorker(hiveConf); + // The worker should remove the subdir for aborted transaction + verifyDirAndResult(2); + + // 5. Run Cleaner. Shouldn't impact anything. + runCleaner(hiveConf); + verifyDirAndResult(2); + } + + /** + * Test a scenario, on a micro-managed table, where an IOW comes in + * after a MAJOR compaction, and then a MINOR compaction is initiated. + * + * @throws Exception + */ + @Test + public void testInsertOverwriteForMmTable() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. Insert two rows to an MM table + runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)"); + runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs in the location + Assert.assertEquals(2, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("delta_.*")); + } + + // 2. Perform a major compaction. + runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs. + Assert.assertEquals(2, status.length); + boolean sawBase = false; + int deltaCount = 0; + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + sawBase = true; + Assert.assertTrue(dirName.matches("base_.*")); + } + } + Assert.assertEquals(2, deltaCount); + Assert.assertFalse(sawBase); + // Verify query result + int [][] resultData = new int[][] {{1,2},{3,4}}; + List rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 3. 
INSERT OVERWRITE + // Prepare data for the source table + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(5,6),(7,8)"); + // Insert overwrite MM table from source table + runStatementOnDriver("insert overwrite table " + TableExtended.MMTBL + " select a,b from " + Table.NONACIDORCTBL); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs, plus 1 base dir in the location + Assert.assertEquals(3, status.length); + int baseCount = 0; + deltaCount = 0; + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + baseCount++; + } + } + Assert.assertEquals(2, deltaCount); + Assert.assertEquals(1, baseCount); + + // Verify query result + resultData = new int[][] {{5,6},{7,8}}; + rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 4. Perform a minor compaction. Nothing should change. + // Both deltas and the base dir should have the same name. + // Re-verify directory layout and query result by using the same logic as above + runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'"); + runWorker(hiveConf); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs, plus 1 base dir in the location + Assert.assertEquals(3, status.length); + baseCount = 0; + deltaCount = 0; + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + Assert.assertTrue(dirName.matches("base_.*")); + baseCount++; + } + } + Assert.assertEquals(2, deltaCount); + Assert.assertEquals(1, baseCount); + + // Verify query result + rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 5. Run Cleaner. It should remove the 2 delta dirs. + runCleaner(hiveConf); + // There should be only 1 directory left: base_xxxxxxx. + // The delta dirs should have been cleaned up. + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); + // Verify query result + rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b"); + Assert.assertEquals(stringifyValues(resultData), rs); + } + + /** + * Test a scenario, on a partitioned micro-managed table, where an IOW comes in + * before a MAJOR compaction happens. + * + * @throws Exception + */ + @Test + public void testInsertOverwriteForPartitionedMmTable() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. Insert two rows to a partitioned MM table.
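+ // (Two rows go to each of the two static partitions, so each partition dir should end up with exactly one delta dir.)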
+ int[][] valuesOdd = {{5,6},{7,8}}; + int[][] valuesEven = {{2,1},{4,3}}; + runStatementOnDriver("insert into " + TableExtended.MMTBLPART + " PARTITION(p='odd') " + makeValuesClause(valuesOdd)); + runStatementOnDriver("insert into " + TableExtended.MMTBLPART + " PARTITION(p='even') " + makeValuesClause(valuesEven)); + + // Verify dirs + String[] pStrings = {"/p=odd", "/p=even"}; + + for(int i=0; i < pStrings.length; i++) { + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[i]), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 1 delta dir per partition location + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("delta_.*")); + } + + // 2. INSERT OVERWRITE + // Prepare data for the source table + int[][] newValsOdd = {{5,5},{11,11}}; + int[][] newValsEven = {{2,2}}; + + runStatementOnDriver("insert into " + TableExtended.NONACIDPART + " PARTITION(p='odd') " + makeValuesClause(newValsOdd)); + runStatementOnDriver("insert into " + TableExtended.NONACIDPART + " PARTITION(p='even') " + makeValuesClause(newValsEven)); + + // Insert overwrite MM table from source table + List<String> rs = null; + String s = "insert overwrite table " + TableExtended.MMTBLPART + " PARTITION(p='odd') " + + " select a,b from " + TableExtended.NONACIDPART + " where " + TableExtended.NONACIDPART + ".p='odd'"; + rs = runStatementOnDriver("explain formatted " + s); + LOG.info("Explain formatted: " + rs.toString()); + runStatementOnDriver(s); + + s = "insert overwrite table " + TableExtended.MMTBLPART + " PARTITION(p='even') " + + " select a,b from " + TableExtended.NONACIDPART + " where " + TableExtended.NONACIDPART + ".p='even'"; + runStatementOnDriver(s); + + // Verify resulting dirs. + boolean sawBase = false; + String[] baseDirs = {"", ""}; + int deltaCount = 0; + for(int h=0; h < pStrings.length; h++) { + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 1 delta dir, plus a base dir in the location + Assert.assertEquals(2, status.length); + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + sawBase = true; + baseDirs[h] = dirName; + Assert.assertTrue(baseDirs[h].matches("base_.*")); + } + } + Assert.assertEquals(1, deltaCount); + Assert.assertTrue(sawBase); + deltaCount = 0; + sawBase = false; + } + + // Verify query result + rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " where p='even' order by a,b"); + int [][] rExpectedEven = new int[][] {{2,2}}; + Assert.assertEquals(stringifyValues(rExpectedEven), rs); + + rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " where p='odd' order by a,b"); + int [][] rExpectedOdd = new int[][] {{5,5},{11,11}}; + Assert.assertEquals(stringifyValues(rExpectedOdd), rs); + + // 3. Perform a major compaction. Nothing should change. + // Both deltas and base dirs should have the same name.
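+ // (The base dir written by the IOW already holds the latest contents, so the compactor should have nothing new to produce.)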
+ // Re-verify directory layout and query result by using the same logic as above + runStatementOnDriver("alter table "+ TableExtended.MMTBLPART + " PARTITION(p='odd') " + " compact 'MAJOR'" ); + runWorker(hiveConf); + runStatementOnDriver("alter table "+ TableExtended.MMTBLPART + " PARTITION(p='even') " + " compact 'MAJOR'" ); + runWorker(hiveConf); + + for(int h=0; h < pStrings.length; h++) { + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 1 delta dir, plus a base dir in each partition location + Assert.assertEquals(2, status.length); + sawBase = false; + deltaCount = 0; + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + sawBase = true; + Assert.assertTrue("BASE ERROR: " + dirName, dirName.matches("base_.*")); + Assert.assertEquals(baseDirs[h], dirName); + } + } + Assert.assertEquals(1, deltaCount); + Assert.assertTrue(sawBase); + deltaCount = 0; + sawBase = false; + } + + // Verify query result + rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " order by a,b"); + int[][] rExpected = new int[][] {{2,2},{5,5},{11,11}}; + Assert.assertEquals(stringifyValues(rExpected), rs); + + // 4. Run Cleaner. It should remove the 2 delta dirs. + runCleaner(hiveConf); + + // There should be only 1 directory left: base_xxxxxxx. + // The delta dirs should have been cleaned up. + for(int h=0; h < pStrings.length; h++) { + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); + Assert.assertEquals(baseDirs[h], status[0].getPath().getName()); + } + // Verify query result + rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " order by a,b"); + Assert.assertEquals(stringifyValues(rExpected), rs); + } + + /** + * Test a scenario, on a dynamically partitioned micro-managed table, where an IOW comes in + * before a MAJOR compaction happens. + * + * @throws Exception + */ + @Test + public void testInsertOverwriteWithDynamicPartition() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. Insert two rows to a partitioned MM table. + int[][] valuesOdd = {{5,6},{7,8}}; + int[][] valuesEven = {{2,1},{4,3}}; + runStatementOnDriver("insert into " + TableExtended.MMTBLPART + " PARTITION(p='odd') " + makeValuesClause(valuesOdd)); + runStatementOnDriver("insert into " + TableExtended.MMTBLPART + " PARTITION(p='even') " + makeValuesClause(valuesEven)); + + // Verify dirs + String[] pStrings = {"/p=odd", "/p=even"}; + + for(int i=0; i < pStrings.length; i++) { + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[i]), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 1 delta dir per partition location + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("delta_.*")); + } + + // 2.
INSERT OVERWRITE + // Prepare data for the source table + int[][] newValsOdd = {{5,5},{11,11}}; + int[][] newValsEven = {{2,2}}; + + runStatementOnDriver("insert into " + TableExtended.NONACIDPART + " PARTITION(p='odd') " + makeValuesClause(newValsOdd)); + runStatementOnDriver("insert into " + TableExtended.NONACIDPART + " PARTITION(p='even') " + makeValuesClause(newValsEven)); + + runStatementOnDriver("insert overwrite table " + TableExtended.MMTBLPART + " partition(p) select a,b,p from " + TableExtended.NONACIDPART); + + // Verify resulting dirs. + boolean sawBase = false; + String[] baseDirs = {"", ""}; + int deltaCount = 0; + for(int h=0; h < pStrings.length; h++) { + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 1 delta dir, plus a base dir in the location + Assert.assertEquals(2, status.length); + + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + sawBase = true; + baseDirs[h] = dirName; + Assert.assertTrue(baseDirs[h].matches("base_.*")); + } + } + Assert.assertEquals(1, deltaCount); + Assert.assertTrue(sawBase); + deltaCount = 0; + sawBase = false; + } + + // Verify query result + List<String> rs = null; + rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " where p='even' order by a,b"); + int [][] rExpectedEven = new int[][] {{2,2}}; + Assert.assertEquals(stringifyValues(rExpectedEven), rs); + + rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " where p='odd' order by a,b"); + int [][] rExpectedOdd = new int[][] {{5,5},{11,11}}; + Assert.assertEquals(stringifyValues(rExpectedOdd), rs); + + // Verify query result + rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " order by a,b"); + int[][] rExpected = new int[][] {{2,2},{5,5},{11,11}}; + Assert.assertEquals(stringifyValues(rExpected), rs); + } + + @Test + public void testInsertOverwriteWithUnionAll() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. Insert two rows to an MM table + runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)"); + runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs in the location + Assert.assertEquals(2, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("delta_.*")); + } + + // 2. Insert Overwrite. + int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}}; + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + TestTxnCommands2.makeValuesClause(values)); + + runStatementOnDriver("insert overwrite table " + TableExtended.MMTBL + " select a,b from " + Table.NONACIDORCTBL + " where a between 1 and 3 union all select a,b from " + Table.NONACIDORCTBL + " where a between 5 and 7"); + + // Verify resulting dirs.
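+ // (The union-all insert overwrite should have produced one new base dir alongside the two pre-existing delta dirs.)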
+    boolean sawBase = false;
+    String baseDir = "";
+    int deltaCount = 0;
+
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 delta dirs, plus a base dir in the location
+    Assert.assertEquals(3, status.length);
+
+    for (int i = 0; i < status.length; i++) {
+      String dirName = status[i].getPath().getName();
+      if (dirName.matches("delta_.*")) {
+        deltaCount++;
+      } else {
+        sawBase = true;
+        baseDir = dirName;
+        Assert.assertTrue(baseDir.matches("base_.*"));
+      }
+    }
+    Assert.assertEquals(2, deltaCount);
+    Assert.assertTrue(sawBase);
+
+    List rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
+    int[][] rExpected = new int[][] {{1,2},{2,4},{5,6},{6,8}};
+    Assert.assertEquals(stringifyValues(rExpected), rs);
+
+    // 4. Perform a major compaction.
+    runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+
+    // 5. Run Cleaner. It should remove the 2 delta dirs.
+    runCleaner(hiveConf);
+
+    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(rExpected), rs);
+
+    // Verify resulting dirs.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be one base dir in the location
+    Assert.assertEquals(1, status.length);
+
+    sawBase = false;
+    deltaCount = 0;
+    for (int i = 0; i < status.length; i++) {
+      String dirName = status[i].getPath().getName();
+      if (dirName.matches("delta_.*")) {
+        deltaCount++;
+      } else {
+        sawBase = true;
+        baseDir = dirName;
+        Assert.assertTrue(baseDir.matches("base_.*"));
+      }
+    }
+    Assert.assertEquals(0, deltaCount);
+    Assert.assertTrue(sawBase);
+
+    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(rExpected), rs);
+  }
+
+  private void verifyDirAndResult(int expectedDeltas) throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    // Verify the content of subdirs
+    FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    int sawDeltaTimes = 0;
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
+      sawDeltaTimes++;
+      FileStatus[] files = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+      Assert.assertEquals(1, files.length);
+      Assert.assertEquals("000000_0", files[0].getPath().getName());
+    }
+    Assert.assertEquals(expectedDeltas, sawDeltaTimes);
+
+    // Verify query result
+    int [][] resultData = new int[][] {{1,2}, {3,4}};
+    List rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(resultData), rs);
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForOrcMmTable.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForOrcMmTable.java
new file mode 100644
index 0000000000..1a701752f5
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForOrcMmTable.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Same as TestTxnCommandsForMmTable, but runs the same tests against micro-managed (MM)
+ * tables set up with the ORC file format, via setUpInternalExtended(true).
+ */
+public class TestTxnCommandsForOrcMmTable extends TestTxnCommandsForMmTable {
+
+  public TestTxnCommandsForOrcMmTable() {
+    super();
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    setUpInternal();
+    setUpInternalExtended(true);
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 8737369c39..8e1fd1afd1 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -69,7 +70,7 @@ public String toString() {
   public void setUp() throws Exception {
     setUpInternal();
   }
-  void setUpInternal() throws Exception {
+  protected void setUpInternal() throws Exception {
     hiveConf = new HiveConf(this.getClass());
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
@@ -101,7 +102,7 @@ void setUpInternal() throws Exception {
     runStatementOnDriver("create temporary table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + "(a int, b int) stored as orc");
   }
-  private void dropTables() throws Exception {
+  protected void dropTables() throws Exception {
     for(TxnCommandsBaseForTests.Table t : TxnCommandsBaseForTests.Table.values()) {
       runStatementOnDriver("drop table if exists " + t);
     }
@@ -134,6 +135,14 @@ String getWarehouseDir() {
   String makeValuesClause(int[][] rows) {
     return TestTxnCommands2.makeValuesClause(rows);
   }
+
+  void runWorker(HiveConf hiveConf) throws MetaException {
+    TestTxnCommands2.runWorker(hiveConf);
+  }
+
+  void runCleaner(HiveConf hiveConf) throws MetaException {
+    TestTxnCommands2.runCleaner(hiveConf);
+  }

   List runStatementOnDriver(String stmt) throws Exception {
     CommandProcessorResponse cpr = d.run(stmt);
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index 9a22c54b12..61f5d1aa02 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -218,7 +218,7 @@ public void testRemoveTempOrDuplicateFilesOnMrWithDp() throws Exception {
     Path tempDirPath = setupTempDirWithSingleOutputFile(hconf);
     FileSinkDesc conf = getFileSinkDesc(tempDirPath);
-    List paths = Utilities.removeTempOrDuplicateFiles(localFs, tempDirPath, dpCtx, conf, hconf);
+    List paths = Utilities.removeTempOrDuplicateFiles(localFs, tempDirPath, dpCtx, conf, hconf, false);
     String expectedScheme = tempDirPath.toUri().getScheme();
     String expectedAuthority = tempDirPath.toUri().getAuthority();
diff --git ql/src/test/results/clientpositive/llap/mm_all.q.out ql/src/test/results/clientpositive/llap/mm_all.q.out
index cfbe659634..03c129347f 100644
--- ql/src/test/results/clientpositive/llap/mm_all.q.out
+++ ql/src/test/results/clientpositive/llap/mm_all.q.out
@@ -1365,6 +1365,12 @@ POSTHOOK: query: select * from multi0_2_mm order by key, key2
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@multi0_2_mm
 #### A masked pattern was here ####
+455 97
+455 98
+456 0
+456 10
+457 100
+457 103
 PREHOOK: query: from intermediate
 insert into table multi0_1_mm select p, key
 insert overwrite table multi0_2_mm select key, p
@@ -1417,6 +1423,12 @@ POSTHOOK: query: select * from multi0_2_mm order by key, key2
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@multi0_2_mm
 #### A masked pattern was here ####
+0 456
+10 456
+97 455
+98 455
+100 457
+103 457
 PREHOOK: query: drop table multi0_1_mm
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@multi0_1_mm
@@ -1541,17 +1553,11 @@ POSTHOOK: Input: default@multi1_mm@p=2
 100 457 2
 103 457 1
 103 457 2
-455 97 1
 455 97 2
-455 98 1
 455 98 2
-456 0 1
 456 0 2
-456 10 1
 456 10 2
-457 100 1
 457 100 2
-457 103 1
 457 103 2
 PREHOOK: query: from intermediate
 insert into table multi1_mm partition(p) select p, key, p
@@ -1621,22 +1627,16 @@ POSTHOOK: Input: default@multi1_mm@p=457
 103 457 1
 103 457 1
 103 457 2
-455 97 1
 455 97 2
 455 97 455
-455 98 1
 455 98 2
 455 98 455
-456 0 1
 456 0 2
 456 0 456
-456 10 1
 456 10 2
 456 10 456
-457 100 1
 457 100 2
 457 100 457
-457 103 1
 457 103 2
 457 103 457
 PREHOOK: query: from intermediate
@@ -1705,27 +1705,21 @@ POSTHOOK: Input: default@multi1_mm@p=457
 103 457 1
 103 457 2
 455 97 1
-455 97 1
 455 97 2
 455 97 455
 455 98 1
-455 98 1
 455 98 2
 455 98 455
 456 0 1
-456 0 1
 456 0 2
 456 0 456
 456 10 1
-456 10 1
 456 10 2
 456 10 456
 457 100 1
-457 100 1
 457 100 2
 457 100 457
 457 103 1
-457 103 1
 457 103 2
 457 103 457
 PREHOOK: query: drop table multi1_mm
diff --git ql/src/test/results/clientpositive/mm_all.q.out ql/src/test/results/clientpositive/mm_all.q.out
index 5ad5957c5d..490c67f2ed 100644
--- ql/src/test/results/clientpositive/mm_all.q.out
+++ ql/src/test/results/clientpositive/mm_all.q.out
@@ -1384,6 +1384,12 @@ POSTHOOK: query: select * from multi0_2_mm order by key, key2
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@multi0_2_mm
 #### A masked pattern was here ####
+455 97
+455 98
+456 0
+456 10
+457 100
+457 103
 PREHOOK: query: from intermediate
 insert into table multi0_1_mm select p, key
 insert overwrite table multi0_2_mm select key, p
@@ -1436,6 +1442,12 @@ POSTHOOK: query: select * from multi0_2_mm order by key, key2
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@multi0_2_mm
 #### A masked pattern was here ####
+0 456
+10 456
+97 455
+98 455
+100 457
+103 457
 PREHOOK: query: drop table multi0_1_mm
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@multi0_1_mm
@@ -1560,17 +1572,11 @@ POSTHOOK: Input: default@multi1_mm@p=2
 100 457 2
 103 457 1
 103 457 2
-455 97 1
 455 97 2
-455 98 1
 455 98 2
-456 0 1
 456 0 2
-456 10 1
 456 10 2
-457 100 1
 457 100 2
-457 103 1
 457 103 2
 PREHOOK: query: from intermediate
 insert into table multi1_mm partition(p) select p, key, p
@@ -1640,22 +1646,16 @@ POSTHOOK: Input: default@multi1_mm@p=457
 103 457 1
 103 457 1
 103 457 2
-455 97 1
 455 97 2
 455 97 455
-455 98 1
 455 98 2
 455 98 455
-456 0 1
 456 0 2
 456 0 456
-456 10 1
 456 10 2
 456 10 456
-457 100 1
 457 100 2
 457 100 457
-457 103 1
 457 103 2
 457 103 457
 PREHOOK: query: from intermediate
@@ -1724,27 +1724,21 @@ POSTHOOK: Input: default@multi1_mm@p=457
 103 457 1
 103 457 2
 455 97 1
-455 97 1
 455 97 2
 455 97 455
 455 98 1
-455 98 1
 455 98 2
 455 98 455
 456 0 1
-456 0 1
 456 0 2
 456 0 456
 456 10 1
-456 10 1
 456 10 2
 456 10 456
 457 100 1
-457 100 1
 457 100 2
 457 100 457
 457 103 1
-457 103 1
 457 103 2
 457 103 457
 PREHOOK: query: drop table multi1_mm