diff --git common/src/java/org/apache/hadoop/hive/common/JavaUtils.java common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
index f4ebd3bd7a..8b1bbaa2d5 100644
--- common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
+++ common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
@@ -39,6 +39,7 @@
  */
 public final class JavaUtils {
+  public static final String BASE_PREFIX = "base";
   public static final String DELTA_PREFIX = "delta";
   public static final String DELTA_DIGITS = "%07d";
   public static final int DELTA_DIGITS_LEN = 7;
@@ -167,8 +168,8 @@ private JavaUtils() {
   public static Long extractTxnId(Path file) {
     String fileName = file.getName();
-    String[] parts = fileName.split("_", 4); // e.g. delta_0000001_0000001_0000
-    if (parts.length < 4 || !DELTA_PREFIX.equals(parts[0])) {
+    String[] parts = fileName.split("_", 4); // e.g. delta_0000001_0000001_0000 or base_0000022
+    if (parts.length < 2 || !(DELTA_PREFIX.equals(parts[0]) || BASE_PREFIX.equals(parts[0]))) {
       LOG.debug("Cannot extract transaction ID for a MM table: " + file + " (" + Arrays.toString(parts) + ")");
       return null;
@@ -185,20 +186,31 @@ public static Long extractTxnId(Path file) {
   }
   public static class IdPathFilter implements PathFilter {
-    private final String mmDirName;
+    private String mmDirName;
     private final boolean isMatch, isIgnoreTemp, isPrefix;
+
     public IdPathFilter(long writeId, int stmtId, boolean isMatch) {
-      this(writeId, stmtId, isMatch, false);
+      this(writeId, stmtId, isMatch, false, false);
     }
     public IdPathFilter(long writeId, int stmtId, boolean isMatch, boolean isIgnoreTemp) {
-      String mmDirName = DELTA_PREFIX + "_" + String.format(DELTA_DIGITS, writeId) + "_" +
-          String.format(DELTA_DIGITS, writeId) + "_";
-      if (stmtId >= 0) {
-        mmDirName += String.format(STATEMENT_DIGITS, stmtId);
-        isPrefix = false;
+      this(writeId, stmtId, isMatch, isIgnoreTemp, false);
+    }
+    public IdPathFilter(long writeId, int stmtId, boolean isMatch, boolean isIgnoreTemp, boolean isBaseDir) {
+      String mmDirName = null;
+      if (!isBaseDir) {
+        mmDirName = DELTA_PREFIX + "_" + String.format(DELTA_DIGITS, writeId) + "_" +
+            String.format(DELTA_DIGITS, writeId) + "_";
+        if (stmtId >= 0) {
+          mmDirName += String.format(STATEMENT_DIGITS, stmtId);
+          isPrefix = false;
+        } else {
+          isPrefix = true;
+        }
       } else {
-        isPrefix = true;
+        mmDirName = BASE_PREFIX + "_" + String.format(DELTA_DIGITS, writeId);
+        isPrefix = false;
       }
+      this.mmDirName = mmDirName;
       this.isMatch = isMatch;
       this.isIgnoreTemp = isIgnoreTemp;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index b163a1e265..a77b055966 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -260,7 +260,7 @@ public void closeOp(boolean abort) throws HiveException {
       // There's always just one file that we have merged.
       // The union/DP/etc. should already be account for in the path.
Utilities.writeMmCommitManifest(Lists.newArrayList(outPath), - tmpPath.getParent(), fs, taskId, conf.getTxnId(), conf.getStmtId(), null); + tmpPath.getParent(), fs, taskId, conf.getTxnId(), conf.getStmtId(), null, false); // TODO: MMIOW check ppath LOG.info("Merged into " + finalPath + "(" + fss.getLen() + " bytes)."); } } @@ -340,7 +340,7 @@ public void jobCloseOp(Configuration hconf, boolean success) // We don't expect missing buckets from mere (actually there should be no buckets), // so just pass null as bucketing context. Union suffix should also be accounted for. Utilities.handleMmTableFinalPath(outputDir.getParent(), null, hconf, success, - dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false); + dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false, false); // TODO: MMIOW check ppath } } catch (IOException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 2331498781..3e62c93eea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -338,7 +338,7 @@ public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeT } outPaths[filesIdx] = getTaskOutPath(taskId); } else { - String subdirPath = AcidUtils.deltaSubdir(txnId, txnId, stmtId); + String subdirPath = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), txnId, txnId, stmtId); if (unionPath != null) { // Create the union directory inside the MM directory. subdirPath += Path.SEPARATOR + unionPath; @@ -1325,7 +1325,7 @@ public void closeOp(boolean abort) throws HiveException { } if (conf.isMmTable()) { Utilities.writeMmCommitManifest( - commitPaths, specPath, fs, taskId, conf.getTransactionId(), conf.getStatementId(), unionPath); + commitPaths, specPath, fs, taskId, conf.getTransactionId(), conf.getStatementId(), unionPath, conf.getInsertOverwrite()); } // Only publish stats if this operator's flag was set to gather stats if (conf.isGatherStats()) { @@ -1384,7 +1384,7 @@ public void jobCloseOp(Configuration hconf, boolean success) conf.getTableInfo(), numBuckets, conf.getCompressed()); Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success, dpLevels, lbLevels, mbc, conf.getTransactionId(), conf.getStatementId(), reporter, - conf.isMmTable(), conf.isMmCtas()); + conf.isMmTable(), conf.isMmCtas(), conf.getInsertOverwrite()); } } } catch (IOException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java index 65b2f87357..8732348a1b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java @@ -234,7 +234,7 @@ private void mvFileToFinalPath(Path specPath, Configuration hconf, Utilities.FILE_OP_LOGGER.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath + "(spec " + specPath + ")"); Utilities.rename(fs, tmpPath, intermediatePath); // Step2: remove any tmp file or double-committed output files - Utilities.removeTempOrDuplicateFiles(fs, intermediatePath); + Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, false); // Step3: move to the file destination Utilities.FILE_OP_LOGGER.info("Moving tmp dir: " + intermediatePath + " to: " + specPath); Utilities.renameOrMoveFiles(fs, intermediatePath, specPath); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java 
ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index b78c930cf5..dc6c45de20 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1464,7 +1464,7 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf, perfLogger.PerfLogBegin("FileSinkOperator", "RemoveTempOrDuplicateFiles"); // remove any tmp file or double-committed output files List emptyBuckets = Utilities.removeTempOrDuplicateFiles( - fs, statuses, dpCtx, conf, hconf, filesKept); + fs, statuses, dpCtx, conf, hconf, filesKept, false); perfLogger.PerfLogEnd("FileSinkOperator", "RemoveTempOrDuplicateFiles"); // create empty buckets if necessary if (!emptyBuckets.isEmpty()) { @@ -1552,23 +1552,23 @@ private static void addFilesToPathSet(Collection files, Set fi /** * Remove all temporary files and duplicate (double-committed) files from a given directory. */ - public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws IOException { - removeTempOrDuplicateFiles(fs, path, null,null,null); + public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, boolean isBaseDir) throws IOException { + removeTempOrDuplicateFiles(fs, path, null,null,null, isBaseDir); } public static List removeTempOrDuplicateFiles(FileSystem fs, Path path, - DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException { + DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, boolean isBaseDir) throws IOException { if (path == null) { return null; } FileStatus[] stats = HiveStatsUtils.getFileStatusRecurse(path, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs); - return removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf); + return removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf, isBaseDir); } public static List removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, - DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException { - return removeTempOrDuplicateFiles(fs, fileStats, dpCtx, conf, hconf, null); + DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, boolean isBaseDir) throws IOException { + return removeTempOrDuplicateFiles(fs, fileStats, dpCtx, conf, hconf, null, isBaseDir); } /** @@ -1577,12 +1577,12 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws I * @return a list of path names corresponding to should-be-created empty buckets. */ public static List removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, - DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, Set filesKept) + DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, Set filesKept, boolean isBaseDir) throws IOException { int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(), numBuckets = (conf != null && conf.getTable() != null) ? 
conf.getTable().getNumBuckets() : 0; return removeTempOrDuplicateFiles( - fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept); + fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept, isBaseDir); } private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException { @@ -1599,7 +1599,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I public static List removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, String unionSuffix, int dpLevels, int numBuckets, Configuration hconf, Long txnId, - int stmtId, boolean isMmTable, Set filesKept) throws IOException { + int stmtId, boolean isMmTable, Set filesKept, boolean isBaseDir) throws IOException { if (fileStats == null) { return null; } @@ -1618,7 +1618,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I if (isMmTable) { Path mmDir = parts[i].getPath(); - if (!mmDir.getName().equals(AcidUtils.deltaSubdir(txnId, txnId, stmtId))) { + if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, txnId, txnId, stmtId))) { throw new IOException("Unexpected non-MM directory name " + mmDir); } @@ -1642,7 +1642,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I if (fileStats.length == 0) { return result; } - Path mmDir = extractNonDpMmDir(txnId, stmtId, items); + Path mmDir = extractNonDpMmDir(txnId, stmtId, items, isBaseDir); taskIDToFile = removeTempOrDuplicateFilesNonMm( fs.listStatus(new Path(mmDir, unionSuffix)), fs); if (filesKept != null && taskIDToFile != null) { @@ -1660,7 +1660,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I addFilesToPathSet(taskIDToFile.values(), filesKept); } } else { - Path mmDir = extractNonDpMmDir(txnId, stmtId, items); + Path mmDir = extractNonDpMmDir(txnId, stmtId, items, isBaseDir); taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs); if (filesKept != null && taskIDToFile != null) { addFilesToPathSet(taskIDToFile.values(), filesKept); @@ -1672,12 +1672,12 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I return result; } - private static Path extractNonDpMmDir(Long txnId, int stmtId, FileStatus[] items) throws IOException { + private static Path extractNonDpMmDir(Long txnId, int stmtId, FileStatus[] items, boolean isBaseDir) throws IOException { if (items.length > 1) { throw new IOException("Unexpected directories for non-DP MM: " + Arrays.toString(items)); } Path mmDir = items[0].getPath(); - if (!mmDir.getName().equals(AcidUtils.deltaSubdir(txnId, txnId, stmtId))) { + if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, txnId, txnId, stmtId))) { throw new IOException("Unexpected non-MM directory " + mmDir); } Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in MM directory {}", mmDir); @@ -4014,7 +4014,7 @@ private static void tryDelete(FileSystem fs, Path path) { } public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels, - int lbLevels, PathFilter filter, long txnId, int stmtId, Configuration conf) + int lbLevels, PathFilter filter, long txnId, int stmtId, Configuration conf, boolean isBaseDir) throws IOException { int skipLevels = dpLevels + lbLevels; if (filter == null) { @@ -4029,7 +4029,7 @@ private static void tryDelete(FileSystem fs, Path path) { || (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs))) { return 
getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter); } - return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, txnId, stmtId); + return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, txnId, stmtId, isBaseDir); } private static boolean isS3(FileSystem fs) { @@ -4098,7 +4098,7 @@ private static boolean isS3(FileSystem fs) { } private static Path[] getMmDirectoryCandidatesGlobStatus(FileSystem fs, - Path path, int skipLevels, PathFilter filter, long txnId, int stmtId) throws IOException { + Path path, int skipLevels, PathFilter filter, long txnId, int stmtId, boolean isBaseDir) throws IOException { StringBuilder sb = new StringBuilder(path.toUri().getPath()); for (int i = 0; i < skipLevels; i++) { sb.append(Path.SEPARATOR).append('*'); @@ -4108,7 +4108,7 @@ private static boolean isS3(FileSystem fs) { // sb.append(Path.SEPARATOR).append(AcidUtils.deltaSubdir(txnId, txnId)).append("_*"); throw new AssertionError("GlobStatus should not be called without a statement ID"); } else { - sb.append(Path.SEPARATOR).append(AcidUtils.deltaSubdir(txnId, txnId, stmtId)); + sb.append(Path.SEPARATOR).append(AcidUtils.baseOrDeltaSubdir(isBaseDir, txnId, txnId, stmtId)); } Path pathPattern = new Path(path, sb.toString()); return statusToPath(fs.globStatus(pathPattern, filter)); @@ -4116,9 +4116,9 @@ private static boolean isS3(FileSystem fs) { private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir, int dpLevels, int lbLevels, JavaUtils.IdPathFilter filter, - long txnId, int stmtId, Configuration conf) throws IOException { + long txnId, int stmtId, Configuration conf, boolean isBaseDir) throws IOException { Path[] files = getMmDirectoryCandidates( - fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, conf); + fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, conf, isBaseDir); if (files != null) { for (Path path : files) { Utilities.FILE_OP_LOGGER.info("Deleting {} on failure", path); @@ -4131,12 +4131,12 @@ private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manif public static void writeMmCommitManifest(List commitPaths, Path specPath, FileSystem fs, - String taskId, Long txnId, int stmtId, String unionSuffix) throws HiveException { + String taskId, Long txnId, int stmtId, String unionSuffix, boolean isInsertOverwrite) throws HiveException { if (CollectionUtils.isEmpty(commitPaths)) { return; } // We assume one FSOP per task (per specPath), so we create it in specPath. - Path manifestPath = getManifestDir(specPath, txnId, stmtId, unionSuffix); + Path manifestPath = getManifestDir(specPath, txnId, stmtId, unionSuffix, isInsertOverwrite); manifestPath = new Path(manifestPath, taskId + MANIFEST_EXTENSION); Utilities.FILE_OP_LOGGER.info("Writing manifest to {} with {}", manifestPath, commitPaths); try { @@ -4155,8 +4155,10 @@ public static void writeMmCommitManifest(List commitPaths, Path specPath, } } - private static Path getManifestDir(Path specPath, long txnId, int stmtId, String unionSuffix) { - Path manifestPath = new Path(specPath, "_tmp." + AcidUtils.deltaSubdir(txnId, txnId, stmtId)); + private static Path getManifestDir(Path specPath, long txnId, int stmtId, String unionSuffix, boolean isInsertOverwrite) { + Path manifestPath = new Path(specPath, "_tmp." + + AcidUtils.baseOrDeltaSubdir(isInsertOverwrite, txnId, txnId, stmtId)); + return (unionSuffix == null) ? 
        manifestPath : new Path(manifestPath, unionSuffix);
   }
@@ -4173,13 +4175,13 @@ public MissingBucketsContext(TableDesc tableInfo, int numBuckets, boolean isComp
   public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf,
       boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long txnId, int stmtId,
-      Reporter reporter, boolean isMmTable, boolean isMmCtas) throws IOException, HiveException {
+      Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite) throws IOException, HiveException {
     FileSystem fs = specPath.getFileSystem(hconf);
-    Path manifestDir = getManifestDir(specPath, txnId, stmtId, unionSuffix);
+    Path manifestDir = getManifestDir(specPath, txnId, stmtId, unionSuffix, isInsertOverwrite);
     if (!success) {
       JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true);
       tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels,
-          filter, txnId, stmtId, hconf);
+          filter, txnId, stmtId, hconf, isInsertOverwrite);
       return;
     }
@@ -4202,13 +4204,13 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con
     }
     Utilities.FILE_OP_LOGGER.debug("Looking for files in: {}", specPath);
-    JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true);
+    JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true, false, isInsertOverwrite);
     if (isMmCtas && !fs.exists(specPath)) {
       Utilities.FILE_OP_LOGGER.info("Creating table directory for CTAS with no output at {}", specPath);
       FileUtils.mkdir(fs, specPath, hconf);
     }
     Path[] files = getMmDirectoryCandidates(
-        fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, hconf);
+        fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, hconf, isInsertOverwrite);
     ArrayList mmDirectories = new ArrayList<>();
     if (files != null) {
       for (Path path : files) {
@@ -4265,7 +4267,7 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con
     }
     List emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, finalResults,
         unionSuffix, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, txnId, stmtId,
-        isMmTable, null);
+        isMmTable, null, isInsertOverwrite);
     // create empty buckets if necessary
     if (!emptyBuckets.isEmpty()) {
       assert mbc != null;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 4c0b71f65f..8acf7c5fe7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -204,6 +204,19 @@ public static String deleteDeltaSubdir(long min, long max, int statementId) {
   public static String baseDir(long txnId) {
     return BASE_PREFIX + String.format(DELTA_DIGITS, txnId);
   }
+
+  /**
+   * Return a base or delta directory string
+   * according to the given "baseDirRequired".
+   */
+  public static String baseOrDeltaSubdir(boolean baseDirRequired, long min, long max, int statementId) {
+    if (!baseDirRequired) {
+      return deltaSubdir(min, max, statementId);
+    } else {
+      return baseDir(min);
+    }
+  }
+
   /**
    * Create a filename for a bucket file.
   * @param directory the partition directory
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 6a1dc729f3..31e605808f 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -549,7 +549,14 @@ private static void processForWriteIds(Path dir, JobConf conf,
     } else if (!hadAcidState) {
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(currDir, conf, validTxnList, Ref.from(false), true, null);
       hadAcidState = true;
-      // TODO [MM gap]: for IOW, we also need to count in base dir, if any
+
+      // Find the base, created for IOW.
+      Path base = dirInfo.getBaseDirectory();
+      if (base != null) {
+        finalPaths.add(base);
+      }
+
+      // Find the parsed delta files.
       for (AcidUtils.ParsedDelta delta : dirInfo.getCurrentDirectories()) {
         Utilities.FILE_OP_LOGGER.debug("Adding input " + delta.getPath());
         finalPaths.add(delta.getPath());
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 70656feea7..afcd269592 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2055,7 +2055,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       // where this is used; we always want to load everything; also the only case where
       // we have multiple statements anyway is union.
       Path[] leafStatus = Utilities.getMmDirectoryCandidates(
-          fs, loadPath, numDP, numLB, null, txnId, -1, conf);
+          fs, loadPath, numDP, numLB, null, txnId, -1, conf, false);
       for (Path p : leafStatus) {
         Path dpPath = p.getParent(); // Skip the MM directory that we have found.
         for (int i = 0; i < numLB; ++i) {
@@ -2246,8 +2246,8 @@ public Void call() throws Exception {
    * @param isAcid true if this is an ACID based write
    */
   public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
-      boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask,
-      Long txnId, int stmtId, boolean isMmTable) throws HiveException {
+      boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask,
+      Long txnId, int stmtId, boolean isMmTable) throws HiveException {
     List newFiles = null;
     Table tbl = getTable(tableName);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 549b38d91f..684f587df1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6782,7 +6782,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
     Integer dest_type = qbm.getDestTypeForAlias(dest);
     Table dest_tab = null; // destination table if any
-    boolean destTableIsAcid = false; // should the destination table be written to using ACID
+    boolean destTableIsAcid = false; // true for full ACID table and MM table
+    boolean destTableIsFullAcid = false; // should the destination table be written to using ACID
     boolean destTableIsTemporary = false;
     boolean destTableIsMaterialization = false;
     Partition dest_part = null;// destination partition if any
@@ -6803,7 +6804,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
     case QBMetaData.DEST_TABLE: {
       dest_tab = qbm.getDestTableForAlias(dest);
-      destTableIsAcid = AcidUtils.isFullAcidTable(dest_tab);
+      destTableIsAcid =
AcidUtils.isAcidTable(dest_tab); + destTableIsFullAcid = AcidUtils.isFullAcidTable(dest_tab); destTableIsTemporary = dest_tab.isTemporary(); // Is the user trying to insert into a external tables @@ -6853,7 +6855,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) // NOTE: specify Dynamic partitions in dest_tab for WriteEntity if (!isNonNativeTable) { AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; - if (destTableIsAcid) { + if (destTableIsFullAcid) { acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); checkAcidConstraints(qb, table_desc, dest_tab); } @@ -6872,7 +6874,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old // deltas and base and leave them up to the cleaner to clean up LoadFileType loadType = (!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), - dest_tab.getTableName()) && !destTableIsAcid) + dest_tab.getTableName()) && !destTableIsAcid) // TODO: MMIOW check if this works.. ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING; ltd.setLoadFileType(loadType); ltd.setLbCtx(lbCtx); @@ -6896,7 +6898,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) dest_part = qbm.getDestPartitionForAlias(dest); dest_tab = dest_part.getTable(); - destTableIsAcid = AcidUtils.isFullAcidTable(dest_tab); + destTableIsAcid = AcidUtils.isAcidTable(dest_tab); + destTableIsFullAcid = AcidUtils.isFullAcidTable(dest_tab); checkExternalTable(dest_tab); @@ -6929,7 +6932,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) dest_part.getSkewedColValues(), dest_part.getSkewedColValueLocationMaps(), dest_part.isStoredAsSubDirectories(), conf); AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; - if (destTableIsAcid) { + if (destTableIsFullAcid) { acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); checkAcidConstraints(qb, table_desc, dest_tab); } @@ -7121,7 +7124,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, table_desc, dest_part, dest_path, currentTableId, destTableIsAcid, destTableIsTemporary, destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS, - canBeMerged, dest_tab, txnId, isMmCtas, dest_type); + canBeMerged, dest_tab, txnId, isMmCtas, dest_type, qb); if (isMmCtas) { // Add FSD so that the LoadTask compilation could fix up its path to avoid the move. tableDesc.setWriter(fileSinkDesc); @@ -7230,7 +7233,7 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc, boolean destTableIsMaterialization, Path queryTmpdir, SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx, RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas, - Integer dest_type) throws SemanticException { + Integer dest_type, QB qb) throws SemanticException { boolean isInsertOverwrite = false; switch (dest_type) { case QBMetaData.DEST_PARTITION: diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java new file mode 100644 index 0000000000..c2d30c426e --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java @@ -0,0 +1,558 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import java.io.File; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.txn.AcidCompactionHistoryService; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.BucketCodec; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService; +import org.apache.hadoop.hive.ql.txn.compactor.Cleaner; +import org.apache.hadoop.hive.ql.txn.compactor.Initiator; +import org.apache.hadoop.hive.ql.txn.compactor.Worker; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests here are for micro-managed tables: + * specifically INSERT OVERWRITE statements and Major/Minor Compactions. 
+ */ +public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests { + static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommandsForMmTable.class); + protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + + File.separator + TestTxnCommands.class.getCanonicalName() + + "-" + System.currentTimeMillis() + ).getPath().replaceAll("\\\\", "/"); + protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; + @Override + String getTestDataDir() { + return TEST_DATA_DIR; + } + + /** + * Test compaction for Micro-managed table + * 1. Regular compaction shouldn't impact any valid subdirectories of MM tables + * 2. Compactions will only remove subdirectories for aborted transactions of MM tables, if any + * @throws Exception + */ + @Test + public void testMmTableCompaction() throws Exception { + // 1. Insert some rows into MM table + runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(1,2)"); + runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(3,4)"); + // There should be 2 delta directories + verifyDirAndResult(2); + + // 2. Perform a MINOR compaction. Since nothing was aborted, subdirs should stay. + runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'"); + runWorker(hiveConf); + verifyDirAndResult(2); + + // 3. Let a transaction be aborted + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); + runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(5,6)"); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); + // There should be 3 delta directories. The new one is the aborted one. + verifyDirAndResult(3); + + // 4. Perform a MINOR compaction again. This time it will remove the subdir for aborted transaction. + runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'"); + runWorker(hiveConf); + // The worker should remove the subdir for aborted transaction + verifyDirAndResult(2); + + // 5. Run Cleaner. Shouldn't impact anything. + runCleaner(hiveConf); + verifyDirAndResult(2); + } + + /** + * Test a scenario, on a micro-managed table, where an IOW comes in + * after a MAJOR compaction, and then a MINOR compaction is initiated. + * + * @throws Exception + */ + @Test + public void testInsertOverwriteForMmTable() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. Insert two rows to an MM table + runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(1,2)"); + runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(3,4)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs in the location + Assert.assertEquals(2, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("delta_.*")); + } + + // 2. Perform a major compaction. + runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs. 
+ Assert.assertEquals(2, status.length); + boolean sawBase = false; + int deltaCount = 0; + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + sawBase = true; + Assert.assertTrue(dirName.matches("base_.*")); + } + } + Assert.assertEquals(2, deltaCount); + Assert.assertFalse(sawBase); + // Verify query result + int [][] resultData = new int[][] {{1,2},{3,4}}; + List rs = runStatementOnDriver("select a,b from " + Table.MMTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 3. INSERT OVERWRITE + // Prepare data for the source table + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(5,6),(7,8)"); + // Insert overwrite MM table from source table + runStatementOnDriver("insert overwrite table " + Table.MMTBL + " select a,b from " + Table.NONACIDORCTBL); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs, plus 1 base dir in the location + Assert.assertEquals(3, status.length); + int baseCount = 0; + deltaCount = 0; + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + baseCount++; + } + } + Assert.assertEquals(2, deltaCount); + Assert.assertEquals(1, baseCount); + + // Verify query result + resultData = new int[][] {{5,6},{7,8}}; + rs = runStatementOnDriver("select a,b from " + Table.MMTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 4. Perform a minor compaction. Nothing should change. + // Both deltas and the base dir should have the same name. + // Re-verify directory layout and query result by using the same logic as above + runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'"); + runWorker(hiveConf); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs, plus 1 base dir in the location + Assert.assertEquals(3, status.length); + baseCount = 0; + deltaCount = 0; + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + Assert.assertTrue(dirName.matches("base_.*")); + baseCount++; + } + } + Assert.assertEquals(2, deltaCount); + Assert.assertEquals(1, baseCount); + + // Verify query result + rs = runStatementOnDriver("select a,b from " + Table.MMTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 5. Run Cleaner. It should remove the 2 delta dirs and 1 old base dir. + runCleaner(hiveConf); + // There should be only 1 directory left: base_xxxxxxx. + // The delta dirs should have been cleaned up. + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); + // Verify query result + rs = runStatementOnDriver("select a,b from " + Table.MMTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + } + + /** + * Test a scenario, on a partitioned micro-managed table, that an IOW comes in + * before a MAJOR compaction happens. 
+ * + * @throws Exception + */ + @Test + public void testInsertOverwriteForPartitionedMmTable() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. Insert two rows to a partitioned MM table. + int[][] valuesOdd = {{5,6},{7,8}}; + int[][] valuesEven = {{2,1},{4,3}}; + runStatementOnDriver("insert into " + Table.MMTBLPART + " PARTITION(p='odd') " + makeValuesClause(valuesOdd)); + runStatementOnDriver("insert into " + Table.MMTBLPART + " PARTITION(p='even') " + makeValuesClause(valuesEven)); + + // Verify dirs + String[] pStrings = {"/p=odd", "/p=even"}; + + for(int i=0; i < pStrings.length; i++) { + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBLPART).toString().toLowerCase() + pStrings[i]), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 1 delta dir per partition location + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("delta_.*")); + } + + // 2. INSERT OVERWRITE + // Prepare data for the source table + int[][] newValsOdd = {{5,5},{11,11}}; + int[][] newValsEven = {{2,2}}; + + runStatementOnDriver("insert into " + Table.NONACIDPART + " PARTITION(p='odd') " + makeValuesClause(newValsOdd)); + runStatementOnDriver("insert into " + Table.NONACIDPART + " PARTITION(p='even') " + makeValuesClause(newValsEven)); + + // Insert overwrite MM table from source table + List rs = null; + String s = "insert overwrite table " + Table.MMTBLPART + " PARTITION(p='odd') " + + " select a,b from " + Table.NONACIDPART + " where " + Table.NONACIDPART + ".p='odd'"; + rs = runStatementOnDriver("explain formatted " + s); + LOG.info("Explain formatted: " + rs.toString()); + runStatementOnDriver(s); + + s = "insert overwrite table " + Table.MMTBLPART + " PARTITION(p='even') " + + " select a,b from " + Table.NONACIDPART + " where " + Table.NONACIDPART + ".p='even'"; + runStatementOnDriver(s); + + // Verify resulting dirs. + boolean sawBase = false; + String[] baseDirs = {"", ""}; + int deltaCount = 0; + for(int h=0; h < pStrings.length; h++) { + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 1 delta dir, plus a base dir in the location + Assert.assertEquals(2, status.length); + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + sawBase = true; + baseDirs[h] = dirName; + Assert.assertTrue(baseDirs[i].matches("base_.*")); + } + } + Assert.assertEquals(1, deltaCount); + Assert.assertTrue(sawBase); + deltaCount = 0; + sawBase = false; + } + + // Verify query result + rs = runStatementOnDriver("select a,b from " + Table.MMTBLPART + " where p='even' order by a"); + int [][] rExpectedEven = new int[][] {{2,2}}; + Assert.assertEquals(stringifyValues(rExpectedEven), rs); + + rs = runStatementOnDriver("select a,b from " + Table.MMTBLPART + " where p='odd' order by a"); + int [][] rExpectedOdd = new int[][] {{5,5},{11,11}}; + Assert.assertEquals(stringifyValues(rExpectedOdd), rs); + + // 3. Perform a major compaction. Nothing should change. + // Both deltas and base dirs should have the same name. 
+ // Re-verify directory layout and query result by using the same logic as above + runStatementOnDriver("alter table "+ Table.MMTBLPART + " PARTITION(p='odd') " + " compact 'MAJOR'" ); + runWorker(hiveConf); + runStatementOnDriver("alter table "+ Table.MMTBLPART + " PARTITION(p='even') " + " compact 'MAJOR'" ); + runWorker(hiveConf); + + for(int h=0; h < pStrings.length; h++) { + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs, plus a base dir in the location + Assert.assertEquals(2, status.length); + sawBase = false; + deltaCount = 0; + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + sawBase = true; + Assert.assertTrue(dirName.matches("base_.*")); + Assert.assertEquals(baseDirs[h], dirName); + } + } + Assert.assertEquals(1, deltaCount); + Assert.assertTrue(sawBase); + deltaCount = 0; + sawBase = false; + } + + // Verify query result + rs = runStatementOnDriver("select a,b from " + Table.MMTBLPART + " order by a"); + int[][] rExpected = new int[][] {{2,2},{5,5},{11,11}}; + Assert.assertEquals(stringifyValues(rExpected), rs); + + // 4. Run Cleaner. It should remove the 2 delta dirs. + runCleaner(hiveConf); + + // There should be only 1 directory left: base_xxxxxxx. + // The delta dirs should have been cleaned up. + for(int h=0; h < pStrings.length; h++) { + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); + Assert.assertEquals(baseDirs[h], status[0].getPath().getName()); + } + // Verify query result + rs = runStatementOnDriver("select a,b from " + Table.MMTBLPART + " order by a"); + Assert.assertEquals(stringifyValues(rExpected), rs); + } + + /** + * Test a scenario, on a dynamically partitioned micro-managed table, that an IOW comes in + * before a MAJOR compaction happens. + * + * @throws Exception + */ + @Test + public void testInsertOverwriteWithDynamicPartition() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. Insert two rows to a partitioned MM table. + int[][] valuesOdd = {{5,6},{7,8}}; + int[][] valuesEven = {{2,1},{4,3}}; + runStatementOnDriver("insert into " + Table.MMTBLPART + " PARTITION(p='odd') " + makeValuesClause(valuesOdd)); + runStatementOnDriver("insert into " + Table.MMTBLPART + " PARTITION(p='even') " + makeValuesClause(valuesEven)); + + // Verify dirs + String[] pStrings = {"/p=odd", "/p=even"}; + + for(int i=0; i < pStrings.length; i++) { + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBLPART).toString().toLowerCase() + pStrings[i]), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 1 delta dir per partition location + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("delta_.*")); + } + + // 2. 
INSERT OVERWRITE + // Prepare data for the source table + int[][] newValsOdd = {{5,5},{11,11}}; + int[][] newValsEven = {{2,2}}; + + runStatementOnDriver("insert into " + Table.NONACIDPART + " PARTITION(p='odd') " + makeValuesClause(newValsOdd)); + runStatementOnDriver("insert into " + Table.NONACIDPART + " PARTITION(p='even') " + makeValuesClause(newValsEven)); + + runStatementOnDriver("insert overwrite table " + Table.MMTBLPART + " partition(p) select a,b,p from " + Table.NONACIDPART); + + // Verify resulting dirs. + boolean sawBase = false; + String[] baseDirs = {"", ""}; + int deltaCount = 0; + for(int h=0; h < pStrings.length; h++) { + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 1 delta dir, plus a base dir in the location + Assert.assertEquals(2, status.length); // steve + + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + sawBase = true; + baseDirs[h] = dirName; + Assert.assertTrue(baseDirs[i].matches("base_.*")); + } + } + Assert.assertEquals(1, deltaCount); + Assert.assertTrue(sawBase); + deltaCount = 0; + sawBase = false; + } + + // Verify query result + List rs = null; + rs = runStatementOnDriver("select a,b from " + Table.MMTBLPART + " where p='even' order by a"); + int [][] rExpectedEven = new int[][] {{2,2}}; + Assert.assertEquals(stringifyValues(rExpectedEven), rs); + + rs = runStatementOnDriver("select a,b from " + Table.MMTBLPART + " where p='odd' order by a"); + int [][] rExpectedOdd = new int[][] {{5,5},{11,11}}; + Assert.assertEquals(stringifyValues(rExpectedOdd), rs); + + // Verify query result + rs = runStatementOnDriver("select a,b from " + Table.MMTBLPART + " order by a"); + int[][] rExpected = new int[][] {{2,2},{5,5},{11,11}}; + Assert.assertEquals(stringifyValues(rExpected), rs); + } + + @Test + public void testInsertOverwriteWithUnionAll() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. Insert two rows to an MM table + runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(1,2)"); + runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(3,4)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs in the location + Assert.assertEquals(2, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("delta_.*")); + } + + // 2. Insert Overwrite. + int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}}; + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + TestTxnCommands2.makeValuesClause(values)); + + runStatementOnDriver("insert overwrite table " + Table.MMTBL + " select a,b from " + Table.NONACIDORCTBL + " where a between 1 and 3 union all select a,b from " + Table.NONACIDORCTBL + " where a between 5 and 7"); + + // Verify resulting dirs. 
+ boolean sawBase = false; + String baseDir = ""; + int deltaCount = 0; + + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs, plus a base dir in the location + Assert.assertEquals(3, status.length); + + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + sawBase = true; + baseDir = dirName; + Assert.assertTrue(baseDir.matches("base_.*")); + } + } + Assert.assertEquals(2, deltaCount); + Assert.assertTrue(sawBase); + + List rs = runStatementOnDriver("select a,b from " + Table.MMTBL + " order by a,b"); + int[][] rExpected = new int[][] {{1,2},{2,4},{5,6},{6,8}}; + Assert.assertEquals(stringifyValues(rExpected), rs); + + // 4. Perform a major compaction. + runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + + // 5. Run Cleaner. It should remove the 2 delta dirs. + runCleaner(hiveConf); + + rs = runStatementOnDriver("select a,b from " + Table.MMTBL + " order by a,b"); + Assert.assertEquals(stringifyValues(rExpected), rs); + + // Verify resulting dirs. + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be one base dir in the location + Assert.assertEquals(1, status.length); + + sawBase = false; + deltaCount = 0; + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + sawBase = true; + baseDir = dirName; + Assert.assertTrue(baseDir.matches("base_.*")); + } + } + Assert.assertEquals(0, deltaCount); + Assert.assertTrue(sawBase); + + rs = runStatementOnDriver("select a,b from " + Table.MMTBL + " order by a,b"); + Assert.assertEquals(stringifyValues(rExpected), rs); + } + + private void verifyDirAndResult(int expectedDeltas) throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + // Verify the content of subdirs + FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + int sawDeltaTimes = 0; + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("delta_.*")); + sawDeltaTimes++; + FileStatus[] files = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, files.length); + Assert.assertTrue(files[0].getPath().getName().equals("000000_0")); + } + Assert.assertEquals(expectedDeltas, sawDeltaTimes); + + // Verify query result + int [][] resultData = new int[][] {{1,2}, {3,4}}; + List rs = runStatementOnDriver("select a,b from " + Table.MMTBL); + Assert.assertEquals(stringifyValues(resultData), rs); + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 3e4f6f6675..b63eaade21 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import 
org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -53,15 +54,27 @@ ACIDTBL2("acidTbl2"), NONACIDORCTBL("nonAcidOrcTbl"), NONACIDORCTBL2("nonAcidOrcTbl2"), - NONACIDNONBUCKET("nonAcidNonBucket"); + NONACIDNONBUCKET("nonAcidNonBucket"), + NONACIDPART("nonAcidPart", "p"), + MMTBL("mmTbl"), + MMTBL2("mmTbl2"), + MMTBLPART("mmTblPart","p"); final String name; + final String partitionColumns; @Override public String toString() { return name; } + String getPartitionColumns() { + return partitionColumns; + } Table(String name) { + this(name, null); + } + Table(String name, String partitionColumns) { this.name = name; + this.partitionColumns = partitionColumns; } } @@ -75,6 +88,7 @@ void setUpInternal() throws Exception { hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir()); hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); hiveConf .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, @@ -99,6 +113,10 @@ void setUpInternal() throws Exception { runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); runStatementOnDriver("create temporary table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + "(a int, b int) stored as orc"); + runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create table " + Table.MMTBL + "(a int, b int) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')"); + runStatementOnDriver("create table " + Table.MMTBL2 + "(a int, b int) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')"); + runStatementOnDriver("create table " + Table.MMTBLPART + "(a int, b int) partitioned by (p string) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')"); } private void dropTables() throws Exception { for(TxnCommandsBaseForTests.Table t : TxnCommandsBaseForTests.Table.values()) { @@ -130,10 +148,19 @@ String getWarehouseDir() { List stringifyValues(int[][] rowsIn) { return TestTxnCommands2.stringifyValues(rowsIn); } + String makeValuesClause(int[][] rows) { return TestTxnCommands2.makeValuesClause(rows); } + void runWorker(HiveConf hiveConf) throws MetaException { + TestTxnCommands2.runWorker(hiveConf); + } + + void runCleaner(HiveConf hiveConf) throws MetaException { + TestTxnCommands2.runCleaner(hiveConf); + } + List runStatementOnDriver(String stmt) throws Exception { CommandProcessorResponse cpr = d.run(stmt); if(cpr.getResponseCode() != 0) { diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java index 9a22c54b12..61f5d1aa02 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java @@ -218,7 +218,7 @@ public void testRemoveTempOrDuplicateFilesOnMrWithDp() throws Exception { Path tempDirPath = setupTempDirWithSingleOutputFile(hconf); FileSinkDesc conf = 
        getFileSinkDesc(tempDirPath);
-    List paths = Utilities.removeTempOrDuplicateFiles(localFs, tempDirPath, dpCtx, conf, hconf);
+    List paths = Utilities.removeTempOrDuplicateFiles(localFs, tempDirPath, dpCtx, conf, hconf, false);
     String expectedScheme = tempDirPath.toUri().getScheme();
     String expectedAuthority = tempDirPath.toUri().getAuthority();
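
A minimal usage sketch (not part of the patch) of what the new AcidUtils.baseOrDeltaSubdir() helper produces for an MM write. It assumes the patched hive-exec classes are on the classpath; the writeId, stmtId and warehouse path below are made-up example values, and the literal names follow from DELTA_DIGITS ("%07d") and STATEMENT_DIGITS ("%04d"). FileSinkOperator.initializeBucketPaths() now makes the same choice based on conf.getInsertOverwrite().

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class MmIowNamingSketch {
  public static void main(String[] args) {
    long writeId = 22L;  // example write/transaction id
    int stmtId = 0;      // example statement id

    // A plain INSERT into an MM table keeps writing delta directories.
    String delta = AcidUtils.baseOrDeltaSubdir(false, writeId, writeId, stmtId);
    System.out.println(delta); // delta_0000022_0000022_0000

    // INSERT OVERWRITE asks for a base directory instead; stmtId is not part of the name.
    String base = AcidUtils.baseOrDeltaSubdir(true, writeId, writeId, stmtId);
    System.out.println(base); // base_0000022

    // Utilities.getManifestDir() builds the temporary manifest location the same way,
    // so an overwrite commits through _tmp.base_0000022 rather than _tmp.delta_... .
    Path specPath = new Path("/warehouse/mmtbl"); // hypothetical table location
    System.out.println(new Path(specPath, "_tmp." + base)); // /warehouse/mmtbl/_tmp.base_0000022
  }
}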
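
A second small sketch, under the same classpath assumption, of the JavaUtils.extractTxnId() change: with BASE_PREFIX accepted, an IOW base directory should now yield its write id instead of the "Cannot extract transaction ID" debug message and a null return. The value shown for the base case is inferred from the existing delta parsing, which is not visible in this hunk.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;

public class ExtractTxnIdSketch {
  public static void main(String[] args) {
    // Delta directory written by a plain INSERT into an MM table.
    Long fromDelta = JavaUtils.extractTxnId(new Path("/warehouse/mmtbl/delta_0000001_0000001_0000"));
    System.out.println(fromDelta); // 1

    // Base directory written by INSERT OVERWRITE; before this patch this returned null.
    Long fromBase = JavaUtils.extractTxnId(new Path("/warehouse/mmtbl/base_0000022"));
    System.out.println(fromBase); // expected: 22
  }
}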