diff --git common/src/java/org/apache/hadoop/hive/common/JavaUtils.java common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
index f4ebd3bd7a..49cb8e0af9 100644
--- common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
+++ common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
@@ -38,7 +38,8 @@
  * Hive.
  */
 public final class JavaUtils {
-
+
+  public static final String BASE_PREFIX = "base";
   public static final String DELTA_PREFIX = "delta";
   public static final String DELTA_DIGITS = "%07d";
   public static final int DELTA_DIGITS_LEN = 7;
@@ -167,8 +168,8 @@ private JavaUtils() {
 
   public static Long extractTxnId(Path file) {
     String fileName = file.getName();
-    String[] parts = fileName.split("_", 4); // e.g. delta_0000001_0000001_0000
-    if (parts.length < 4 || !DELTA_PREFIX.equals(parts[0])) {
+    String[] parts = fileName.split("_", 4); // e.g. delta_0000001_0000001_0000 or base_0000022
+    if (parts.length < 2 || !(DELTA_PREFIX.equals(parts[0]) || BASE_PREFIX.equals(parts[0]))) {
       LOG.debug("Cannot extract transaction ID for a MM table: " + file
           + " (" + Arrays.toString(parts) + ")");
       return null;
@@ -185,20 +186,31 @@ public static Long extractTxnId(Path file) {
   }
 
   public static class IdPathFilter implements PathFilter {
-    private final String mmDirName;
+    private String mmDirName;
     private final boolean isMatch, isIgnoreTemp, isPrefix;
+
     public IdPathFilter(long writeId, int stmtId, boolean isMatch) {
-      this(writeId, stmtId, isMatch, false);
+      this(writeId, stmtId, isMatch, false, false);
     }
     public IdPathFilter(long writeId, int stmtId, boolean isMatch, boolean isIgnoreTemp) {
-      String mmDirName = DELTA_PREFIX + "_" + String.format(DELTA_DIGITS, writeId) + "_" +
-          String.format(DELTA_DIGITS, writeId) + "_";
-      if (stmtId >= 0) {
-        mmDirName += String.format(STATEMENT_DIGITS, stmtId);
-        isPrefix = false;
+      this(writeId, stmtId, isMatch, isIgnoreTemp, false);
+    }
+    public IdPathFilter(long writeId, int stmtId, boolean isMatch, boolean isIgnoreTemp, boolean isBaseDir) {
+      String mmDirName = null;
+      if (!isBaseDir) {
+        mmDirName = DELTA_PREFIX + "_" + String.format(DELTA_DIGITS, writeId) + "_" +
+            String.format(DELTA_DIGITS, writeId) + "_";
+        if (stmtId >= 0) {
+          mmDirName += String.format(STATEMENT_DIGITS, stmtId);
+          isPrefix = false;
+        } else {
+          isPrefix = true;
+        }
       } else {
-        isPrefix = true;
+        mmDirName = BASE_PREFIX + "_" + String.format(DELTA_DIGITS, writeId);
+        isPrefix = false;
       }
+      this.mmDirName = mmDirName;
       this.isMatch = isMatch;
       this.isIgnoreTemp = isIgnoreTemp;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index b163a1e265..a77b055966 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -260,7 +260,7 @@ public void closeOp(boolean abort) throws HiveException {
         // There's always just one file that we have merged.
         // The union/DP/etc. should already be account for in the path.
         Utilities.writeMmCommitManifest(Lists.newArrayList(outPath),
-            tmpPath.getParent(), fs, taskId, conf.getTxnId(), conf.getStmtId(), null);
+            tmpPath.getParent(), fs, taskId, conf.getTxnId(), conf.getStmtId(), null, false); // TODO: MMIOW check ppath
         LOG.info("Merged into " + finalPath + "(" + fss.getLen() + " bytes).");
       }
     }
@@ -340,7 +340,7 @@ public void jobCloseOp(Configuration hconf, boolean success)
         // We don't expect missing buckets from mere (actually there should be no buckets),
         // so just pass null as bucketing context. Union suffix should also be accounted for.
         Utilities.handleMmTableFinalPath(outputDir.getParent(), null, hconf, success,
-            dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false);
+            dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false, false); // TODO: MMIOW check ppath
       }
     } catch (IOException e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 2331498781..3e62c93eea 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -338,7 +338,7 @@ public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeT
       }
       outPaths[filesIdx] = getTaskOutPath(taskId);
     } else {
-      String subdirPath = AcidUtils.deltaSubdir(txnId, txnId, stmtId);
+      String subdirPath = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), txnId, txnId, stmtId);
       if (unionPath != null) {
         // Create the union directory inside the MM directory.
         subdirPath += Path.SEPARATOR + unionPath;
@@ -1325,7 +1325,7 @@ public void closeOp(boolean abort) throws HiveException {
       }
       if (conf.isMmTable()) {
         Utilities.writeMmCommitManifest(
-            commitPaths, specPath, fs, taskId, conf.getTransactionId(), conf.getStatementId(), unionPath);
+            commitPaths, specPath, fs, taskId, conf.getTransactionId(), conf.getStatementId(), unionPath, conf.getInsertOverwrite());
       }
       // Only publish stats if this operator's flag was set to gather stats
       if (conf.isGatherStats()) {
@@ -1384,7 +1384,7 @@ public void jobCloseOp(Configuration hconf, boolean success)
             conf.getTableInfo(), numBuckets, conf.getCompressed());
         Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success, dpLevels,
             lbLevels, mbc, conf.getTransactionId(), conf.getStatementId(), reporter,
-            conf.isMmTable(), conf.isMmCtas());
+            conf.isMmTable(), conf.isMmCtas(), conf.getInsertOverwrite());
       }
     }
   } catch (IOException e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
index 65b2f87357..8732348a1b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
@@ -234,7 +234,7 @@ private void mvFileToFinalPath(Path specPath, Configuration hconf,
       Utilities.FILE_OP_LOGGER.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath + "(spec " + specPath + ")");
       Utilities.rename(fs, tmpPath, intermediatePath);
       // Step2: remove any tmp file or double-committed output files
-      Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);
+      Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, false);
       // Step3: move to the file destination
       Utilities.FILE_OP_LOGGER.info("Moving tmp dir: " + intermediatePath + " to: " + specPath);
       Utilities.renameOrMoveFiles(fs, intermediatePath, specPath);
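Note on the JavaUtils change above: extractTxnId previously recognized only delta_* directory
names; with BASE_PREFIX it now also accepts the base_* names produced by insert overwrite. A
minimal standalone sketch of the accepted-name check (illustrative only, not the Hive method;
constants mirror the patched values):

    // Hypothetical sketch of the name check in JavaUtils.extractTxnId.
    static Long extractWriteId(String dirName) {
      String[] parts = dirName.split("_", 4);
      if (parts.length < 2 || !("delta".equals(parts[0]) || "base".equals(parts[0]))) {
        return null; // neither an MM delta nor an MM base directory
      }
      return Long.parseLong(parts[1]); // "base_0000022" -> 22, "delta_0000001_0000001_0000" -> 1
    }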
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index b78c930cf5..57481c1cc3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1464,7 +1464,7 @@ public static void mvFileToFinalPath(Path specPath, Configuration hconf,
       perfLogger.PerfLogBegin("FileSinkOperator", "RemoveTempOrDuplicateFiles");
       // remove any tmp file or double-committed output files
       List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
-          fs, statuses, dpCtx, conf, hconf, filesKept);
+          fs, statuses, dpCtx, conf, hconf, filesKept, false);
       perfLogger.PerfLogEnd("FileSinkOperator", "RemoveTempOrDuplicateFiles");
       // create empty buckets if necessary
       if (!emptyBuckets.isEmpty()) {
@@ -1552,37 +1552,38 @@ private static void addFilesToPathSet(Collection<FileStatus> files, Set<Path> fi
   /**
    * Remove all temporary files and duplicate (double-committed) files from a given directory.
    */
-  public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws IOException {
-    removeTempOrDuplicateFiles(fs, path, null, null, null);
+  public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, boolean isBaseDir) throws IOException {
+    removeTempOrDuplicateFiles(fs, path, null, null, null, isBaseDir);
   }
 
   public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, Path path,
-      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
+      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, boolean isBaseDir) throws IOException {
     if (path == null) {
       return null;
     }
     FileStatus[] stats = HiveStatsUtils.getFileStatusRecurse(path,
         ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
-    return removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf);
+    return removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf, isBaseDir);
   }
 
   public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
-      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
-    return removeTempOrDuplicateFiles(fs, fileStats, dpCtx, conf, hconf, null);
+      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, boolean isBaseDir) throws IOException {
+    return removeTempOrDuplicateFiles(fs, fileStats, dpCtx, conf, hconf, null, isBaseDir);
   }
 
   /**
    * Remove all temporary files and duplicate (double-committed) files from a given directory.
+   * @param isBaseDir whether the write being committed targets a base (IOW) or a delta directory
    *
    * @return a list of path names corresponding to should-be-created empty buckets.
    */
   public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
-      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, Set<Path> filesKept)
+      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, Set<Path> filesKept, boolean isBaseDir)
       throws IOException {
     int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
         numBuckets = (conf != null && conf.getTable() != null) ? conf.getTable().getNumBuckets() : 0;
     return removeTempOrDuplicateFiles(
-        fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept);
+        fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept, isBaseDir);
   }
 
   private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException {
@@ -1599,7 +1600,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I
 
   public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
       String unionSuffix, int dpLevels, int numBuckets, Configuration hconf, Long txnId,
-      int stmtId, boolean isMmTable, Set<Path> filesKept) throws IOException {
+      int stmtId, boolean isMmTable, Set<Path> filesKept, boolean isBaseDir) throws IOException {
     if (fileStats == null) {
       return null;
     }
@@ -1618,7 +1619,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I
 
         if (isMmTable) {
           Path mmDir = parts[i].getPath();
-          if (!mmDir.getName().equals(AcidUtils.deltaSubdir(txnId, txnId, stmtId))) {
+          if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, txnId, txnId, stmtId))) {
             throw new IOException("Unexpected non-MM directory name " + mmDir);
           }
@@ -1642,7 +1643,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I
       if (fileStats.length == 0) {
         return result;
       }
-      Path mmDir = extractNonDpMmDir(txnId, stmtId, items);
+      Path mmDir = extractNonDpMmDir(txnId, stmtId, items, isBaseDir);
       taskIDToFile = removeTempOrDuplicateFilesNonMm(
           fs.listStatus(new Path(mmDir, unionSuffix)), fs);
       if (filesKept != null && taskIDToFile != null) {
@@ -1660,7 +1661,7 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I
         addFilesToPathSet(taskIDToFile.values(), filesKept);
       }
     } else {
-      Path mmDir = extractNonDpMmDir(txnId, stmtId, items);
+      Path mmDir = extractNonDpMmDir(txnId, stmtId, items, isBaseDir);
       taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs);
       if (filesKept != null && taskIDToFile != null) {
         addFilesToPathSet(taskIDToFile.values(), filesKept);
@@ -1672,12 +1673,12 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I
     return result;
   }
 
-  private static Path extractNonDpMmDir(Long txnId, int stmtId, FileStatus[] items) throws IOException {
+  private static Path extractNonDpMmDir(Long txnId, int stmtId, FileStatus[] items, boolean isBaseDir) throws IOException {
     if (items.length > 1) {
      throw new IOException("Unexpected directories for non-DP MM: " + Arrays.toString(items));
     }
     Path mmDir = items[0].getPath();
-    if (!mmDir.getName().equals(AcidUtils.deltaSubdir(txnId, txnId, stmtId))) {
+    if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, txnId, txnId, stmtId))) {
       throw new IOException("Unexpected non-MM directory " + mmDir);
     }
     Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in MM directory {}", mmDir);
@@ -4014,7 +4015,7 @@ private static void tryDelete(FileSystem fs, Path path) {
   }
 
   public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels,
-      int lbLevels, PathFilter filter, long txnId, int stmtId, Configuration conf)
+      int lbLevels, PathFilter filter, long txnId, int stmtId, Configuration conf, boolean isBaseDir)
       throws IOException {
     int skipLevels = dpLevels + lbLevels;
     if (filter == null) {
@@ -4029,7 +4030,7 @@ private static void tryDelete(FileSystem fs, Path path) {
         || (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs))) {
       return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter);
     }
-    return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, txnId, stmtId);
+    return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, txnId, stmtId, isBaseDir);
   }
 
   private static boolean isS3(FileSystem fs) {
@@ -4098,7 +4099,7 @@ private static boolean isS3(FileSystem fs) {
   }
 
   private static Path[] getMmDirectoryCandidatesGlobStatus(FileSystem fs,
-      Path path, int skipLevels, PathFilter filter, long txnId, int stmtId) throws IOException {
+      Path path, int skipLevels, PathFilter filter, long txnId, int stmtId, boolean isBaseDir) throws IOException {
     StringBuilder sb = new StringBuilder(path.toUri().getPath());
     for (int i = 0; i < skipLevels; i++) {
       sb.append(Path.SEPARATOR).append('*');
@@ -4108,7 +4109,7 @@ private static boolean isS3(FileSystem fs) {
       // sb.append(Path.SEPARATOR).append(AcidUtils.deltaSubdir(txnId, txnId)).append("_*");
       throw new AssertionError("GlobStatus should not be called without a statement ID");
     } else {
-      sb.append(Path.SEPARATOR).append(AcidUtils.deltaSubdir(txnId, txnId, stmtId));
+      sb.append(Path.SEPARATOR).append(AcidUtils.baseOrDeltaSubdir(isBaseDir, txnId, txnId, stmtId));
     }
     Path pathPattern = new Path(path, sb.toString());
     return statusToPath(fs.globStatus(pathPattern, filter));
@@ -4116,9 +4117,9 @@ private static boolean isS3(FileSystem fs) {
 
   private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir,
       int dpLevels, int lbLevels, JavaUtils.IdPathFilter filter,
-      long txnId, int stmtId, Configuration conf) throws IOException {
+      long txnId, int stmtId, Configuration conf, boolean isBaseDir) throws IOException {
     Path[] files = getMmDirectoryCandidates(
-        fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, conf);
+        fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, conf, isBaseDir);
     if (files != null) {
       for (Path path : files) {
         Utilities.FILE_OP_LOGGER.info("Deleting {} on failure", path);
@@ -4131,12 +4132,12 @@ private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manif
 
   public static void writeMmCommitManifest(List<Path> commitPaths, Path specPath, FileSystem fs,
-      String taskId, Long txnId, int stmtId, String unionSuffix) throws HiveException {
+      String taskId, Long txnId, int stmtId, String unionSuffix, boolean isInsertOverwrite) throws HiveException {
     if (CollectionUtils.isEmpty(commitPaths)) {
       return;
     }
     // We assume one FSOP per task (per specPath), so we create it in specPath.
-    Path manifestPath = getManifestDir(specPath, txnId, stmtId, unionSuffix);
+    Path manifestPath = getManifestDir(specPath, txnId, stmtId, unionSuffix, isInsertOverwrite);
     manifestPath = new Path(manifestPath, taskId + MANIFEST_EXTENSION);
     Utilities.FILE_OP_LOGGER.info("Writing manifest to {} with {}", manifestPath, commitPaths);
     try {
@@ -4155,8 +4156,10 @@ public static void writeMmCommitManifest(List<Path> commitPaths, Path specPath,
     }
   }
 
-  private static Path getManifestDir(Path specPath, long txnId, int stmtId, String unionSuffix) {
-    Path manifestPath = new Path(specPath, "_tmp." + AcidUtils.deltaSubdir(txnId, txnId, stmtId));
+  private static Path getManifestDir(Path specPath, long txnId, int stmtId, String unionSuffix, boolean isInsertOverwrite) {
+    Path manifestPath = new Path(specPath, "_tmp." +
+        AcidUtils.baseOrDeltaSubdir(isInsertOverwrite, txnId, txnId, stmtId));
+
     return (unionSuffix == null) ? manifestPath : new Path(manifestPath, unionSuffix);
   }
 
@@ -4173,13 +4176,13 @@ public MissingBucketsContext(TableDesc tableInfo, int numBuckets, boolean isComp
   public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf,
       boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long txnId, int stmtId,
-      Reporter reporter, boolean isMmTable, boolean isMmCtas) throws IOException, HiveException {
+      Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite) throws IOException, HiveException {
     FileSystem fs = specPath.getFileSystem(hconf);
-    Path manifestDir = getManifestDir(specPath, txnId, stmtId, unionSuffix);
+    Path manifestDir = getManifestDir(specPath, txnId, stmtId, unionSuffix, isInsertOverwrite);
     if (!success) {
-      JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true);
+      JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true, false, isInsertOverwrite);
       tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels,
-          filter, txnId, stmtId, hconf);
+          filter, txnId, stmtId, hconf, isInsertOverwrite);
       return;
     }
@@ -4202,13 +4205,13 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con
     }
 
     Utilities.FILE_OP_LOGGER.debug("Looking for files in: {}", specPath);
-    JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true);
+    JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true, false, isInsertOverwrite);
     if (isMmCtas && !fs.exists(specPath)) {
       Utilities.FILE_OP_LOGGER.info("Creating table directory for CTAS with no output at {}", specPath);
       FileUtils.mkdir(fs, specPath, hconf);
     }
     Path[] files = getMmDirectoryCandidates(
-        fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, hconf);
+        fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, hconf, isInsertOverwrite);
     ArrayList<Path> mmDirectories = new ArrayList<>();
     if (files != null) {
       for (Path path : files) {
@@ -4265,7 +4268,7 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con
     }
     List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, finalResults,
         unionSuffix, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, txnId, stmtId,
-        isMmTable, null);
+        isMmTable, null, isInsertOverwrite);
     // create empty buckets if necessary
     if (!emptyBuckets.isEmpty()) {
       assert mbc != null;
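The manifest location in getManifestDir above now tracks the final directory name. A hedged
sketch of the resulting names (the helper is illustrative, not Hive code; the digit widths
mirror DELTA_DIGITS = "%07d" and the 4-digit statement suffix seen in the examples):

    // Illustrative only: how the "_tmp." manifest directory name is derived.
    static String manifestDirName(boolean isInsertOverwrite, long writeId, int stmtId) {
      String subdir = isInsertOverwrite
          ? String.format("base_%07d", writeId)                             // _tmp.base_0000007
          : String.format("delta_%07d_%07d_%04d", writeId, writeId, stmtId); // _tmp.delta_0000007_0000007_0000
      return "_tmp." + subdir;
    }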
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 4c0b71f65f..6018104cf5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -204,6 +204,19 @@ public static String deleteDeltaSubdir(long min, long max, int statementId) {
   public static String baseDir(long txnId) {
     return BASE_PREFIX + String.format(DELTA_DIGITS, txnId);
   }
+
+  /**
+   * Return a base or delta directory name for the given write,
+   * depending on whether a base directory is required (insert overwrite).
+   */
+  public static String baseOrDeltaSubdir(boolean baseDirRequired, long min, long max, int statementId) {
+    if (!baseDirRequired) {
+      return deltaSubdir(min, max, statementId);
+    } else {
+      return baseDir(min);
+    }
+  }
+
   /**
    * Create a filename for a bucket file.
   * @param directory the partition directory
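For concreteness, assuming the existing AcidUtils constants (DELTA_DIGITS = "%07d", and a
BASE_PREFIX that already includes the underscore), the new helper picks between the two
layouts; the values below are hypothetical:

    // Illustrative calls (write ID 7, statement 0):
    AcidUtils.baseOrDeltaSubdir(false, 7, 7, 0); // "delta_0000007_0000007_0000"
    AcidUtils.baseOrDeltaSubdir(true, 7, 7, 0);  // "base_0000007"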
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 6a1dc729f3..3a4972dcdb 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -549,7 +549,14 @@ private static void processForWriteIds(Path dir, JobConf conf,
     } else if (!hadAcidState) {
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(currDir, conf, validTxnList, Ref.from(false), true, null);
       hadAcidState = true;
-      // TODO [MM gap]: for IOW, we also need to count in base dir, if any
+
+      // Add the base directory, if any; it is created by insert overwrite.
+      Path base = dirInfo.getBaseDirectory();
+      if (base != null) {
+        finalPaths.add(base);
+      }
+
+      // Add the parsed delta directories.
       for (AcidUtils.ParsedDelta delta : dirInfo.getCurrentDirectories()) {
         Utilities.FILE_OP_LOGGER.debug("Adding input " + delta.getPath());
         finalPaths.add(delta.getPath());
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 70656feea7..48ac53fea2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2055,7 +2055,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     // where this is used; we always want to load everything; also the only case where
     // we have multiple statements anyway is union.
     Path[] leafStatus = Utilities.getMmDirectoryCandidates(
-        fs, loadPath, numDP, numLB, null, txnId, -1, conf);
+        fs, loadPath, numDP, numLB, null, txnId, -1, conf, false);
     for (Path p : leafStatus) {
       Path dpPath = p.getParent(); // Skip the MM directory that we have found.
       for (int i = 0; i < numLB; ++i) {
@@ -2246,9 +2246,9 @@ public Void call() throws Exception {
    * @param isAcid true if this is an ACID based write
    */
   public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
-                        boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask,
-                        Long txnId, int stmtId, boolean isMmTable) throws HiveException {
-
+      boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask,
+      Long txnId, int stmtId, boolean isMmTable) throws HiveException {
+
     List<Path> newFiles = null;
     Table tbl = getTable(tableName);
     HiveConf sessionConf = SessionState.getSessionConf();
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 549b38d91f..f23efc0771 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6782,7 +6782,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
     Integer dest_type = qbm.getDestTypeForAlias(dest);
 
     Table dest_tab = null; // destination table if any
-    boolean destTableIsAcid = false; // should the destination table be written to using ACID
+    boolean destTableIsAcid = false;     // true for full ACID tables and MM (insert-only) tables
+    boolean destTableIsFullAcid = false; // true only for full ACID tables
     boolean destTableIsTemporary = false;
     boolean destTableIsMaterialization = false;
     Partition dest_part = null;// destination partition if any
@@ -6803,7 +6804,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
     case QBMetaData.DEST_TABLE: {
 
       dest_tab = qbm.getDestTableForAlias(dest);
-      destTableIsAcid = AcidUtils.isFullAcidTable(dest_tab);
+      destTableIsAcid = AcidUtils.isAcidTable(dest_tab);
+      destTableIsFullAcid = AcidUtils.isFullAcidTable(dest_tab);
       destTableIsTemporary = dest_tab.isTemporary();
 
       // Is the user trying to insert into a external tables
@@ -6853,7 +6855,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       // NOTE: specify Dynamic partitions in dest_tab for WriteEntity
       if (!isNonNativeTable) {
         AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
-        if (destTableIsAcid) {
+        if (destTableIsFullAcid) {
           acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
           checkAcidConstraints(qb, table_desc, dest_tab);
         }
@@ -6872,7 +6874,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
       // deltas and base and leave them up to the cleaner to clean up
       LoadFileType loadType = (!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
-          dest_tab.getTableName()) && !destTableIsAcid)
+          dest_tab.getTableName()) && !destTableIsAcid) // TODO: MMIOW check if this works
           ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
       ltd.setLoadFileType(loadType);
       ltd.setLbCtx(lbCtx);
@@ -6896,7 +6898,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
 
       dest_part = qbm.getDestPartitionForAlias(dest);
       dest_tab = dest_part.getTable();
-      destTableIsAcid = AcidUtils.isFullAcidTable(dest_tab);
+      destTableIsAcid = AcidUtils.isAcidTable(dest_tab);
+      destTableIsFullAcid = AcidUtils.isFullAcidTable(dest_tab);
 
       checkExternalTable(dest_tab);
 
@@ -6929,7 +6932,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
           dest_part.getSkewedColValues(), dest_part.getSkewedColValueLocationMaps(),
           dest_part.isStoredAsSubDirectories(), conf);
       AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
-      if (destTableIsAcid) {
+      if (destTableIsFullAcid) {
         acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
         checkAcidConstraints(qb, table_desc, dest_tab);
       }
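The SemanticAnalyzer change above splits the old destTableIsAcid flag in two. A hedged sketch
of the resulting classification (the helpers are hypothetical; Hive's real checks live in
AcidUtils.isAcidTable/isFullAcidTable; the property names match the test tables created below
with 'transactional'='true' and 'transactional_properties'='insert_only'):

    // Hypothetical helpers over table properties, for illustration only.
    static boolean isAcid(java.util.Map<String, String> props) {
      return "true".equalsIgnoreCase(props.get("transactional"));
    }
    static boolean isFullAcid(java.util.Map<String, String> props) {
      return isAcid(props) && !"insert_only".equalsIgnoreCase(props.get("transactional_properties"));
    }
    // An MM (insert-only) table is acid but not full acid, so it keeps the ACID
    // insert-overwrite semantics (KEEP_EXISTING) without the full ACID writer.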
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 2faf098f6a..7094e702a7 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -94,7 +94,8 @@
     NONACIDPART("nonAcidPart", "p"),
     NONACIDPART2("nonAcidPart2", "p2"),
     ACIDNESTEDPART("acidNestedPart", "p,q"),
-    MMTBL("mmTbl");
+    MMTBL("mmTbl"),
+    MMTBLPART("mmTblPart", "p");
 
     private final String name;
     private final String partitionColumns;
@@ -156,6 +157,7 @@ protected void setUpWithTableProperties(String tableProperties) throws Exception
         "(a int, b int) partitioned by (p int, q int) clustered by (a) into " + BUCKET_COUNT +
         " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
     runStatementOnDriver("create table " + Table.MMTBL + "(a int, b int) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
+    runStatementOnDriver("create table " + Table.MMTBLPART + "(a int, b int) partitioned by (p string) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
   }
 
   protected void dropTables() throws Exception {
@@ -1993,6 +1995,256 @@ public void testInsertOverwrite2() throws Exception {
     Assert.assertEquals(stringifyValues(resultData), rs);
   }
 
+  /**
+   * Test a scenario, on a micro-managed table, where an IOW comes in
+   * after a MAJOR compaction, and then a MINOR compaction is initiated.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testInsertOverwriteForMmTable() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert two rows into the MM table.
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(1,2)");
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(3,4)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 delta dirs in the location.
+    Assert.assertEquals(2, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
+    }
+
+    // 2. Perform a major compaction.
+    runStatementOnDriver("alter table " + Table.MMTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should still be only the 2 delta dirs and no base dir:
+    // regular compaction doesn't touch valid subdirectories of MM tables.
+    Assert.assertEquals(2, status.length);
+    boolean sawBase = false;
+    int deltaCount = 0;
+    for (int i = 0; i < status.length; i++) {
+      String dirName = status[i].getPath().getName();
+      if (dirName.matches("delta_.*")) {
+        deltaCount++;
+      } else {
+        sawBase = true;
+        Assert.assertTrue(dirName.matches("base_.*"));
+      }
+    }
+    Assert.assertEquals(2, deltaCount);
+    Assert.assertFalse(sawBase);
+    // Verify query result.
+    int[][] resultData = new int[][] {{1,2},{3,4}};
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.MMTBL + " order by a");
+    Assert.assertEquals(stringifyValues(resultData), rs);
+
+    // 3. INSERT OVERWRITE.
+    // Prepare data for the source table.
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(5,6),(7,8)");
+    // Insert overwrite the MM table from the source table.
+    runStatementOnDriver("insert overwrite table " + Table.MMTBL + " select a,b from " + Table.NONACIDORCTBL);
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 delta dirs, plus 1 base dir in the location.
+    Assert.assertEquals(3, status.length);
+    int baseCount = 0;
+    deltaCount = 0;
+    for (int i = 0; i < status.length; i++) {
+      String dirName = status[i].getPath().getName();
+      if (dirName.matches("delta_.*")) {
+        deltaCount++;
+      } else {
+        baseCount++;
+      }
+    }
+    Assert.assertEquals(2, deltaCount);
+    Assert.assertEquals(1, baseCount);
+
+    // Verify query result.
+    resultData = new int[][] {{5,6},{7,8}};
+    rs = runStatementOnDriver("select a,b from " + Table.MMTBL + " order by a");
+    Assert.assertEquals(stringifyValues(resultData), rs);
+
+    // 4. Perform a minor compaction. Nothing should change:
+    // both deltas and the base dir should keep the same names.
+    // Re-verify directory layout and query result using the same logic as above.
+    runStatementOnDriver("alter table " + Table.MMTBL + " compact 'MINOR'");
+    runWorker(hiveConf);
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 delta dirs, plus 1 base dir in the location.
+    Assert.assertEquals(3, status.length);
+    baseCount = 0;
+    deltaCount = 0;
+    for (int i = 0; i < status.length; i++) {
+      String dirName = status[i].getPath().getName();
+      if (dirName.matches("delta_.*")) {
+        deltaCount++;
+      } else {
+        Assert.assertTrue(dirName.matches("base_.*"));
+        baseCount++;
+      }
+    }
+    Assert.assertEquals(2, deltaCount);
+    Assert.assertEquals(1, baseCount);
+
+    // Verify query result.
+    rs = runStatementOnDriver("select a,b from " + Table.MMTBL + " order by a");
+    Assert.assertEquals(stringifyValues(resultData), rs);
+
+    // 5. Run Cleaner. It should remove the 2 obsolete delta dirs.
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_xxxxxxx.
+    // The delta dirs should have been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+    // Verify query result.
+    rs = runStatementOnDriver("select a,b from " + Table.MMTBL + " order by a");
+    Assert.assertEquals(stringifyValues(resultData), rs);
+  }
+
+  /**
+   * Test a scenario, on a partitioned micro-managed table, where an IOW comes in
+   * before a MAJOR compaction happens.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testInsertOverwriteForPartitionedMmTable() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert two rows into each partition of the partitioned MM table.
+    int[][] valuesOdd = {{5,6},{7,8}};
+    int[][] valuesEven = {{2,1},{4,3}};
+    runStatementOnDriver("insert into " + Table.MMTBLPART + " PARTITION(p='odd') " + makeValuesClause(valuesOdd));
+    runStatementOnDriver("insert into " + Table.MMTBLPART + " PARTITION(p='even') " + makeValuesClause(valuesEven));
+
+    // Verify dirs.
+    String[] pStrings = {"/p=odd", "/p=even"};
+
+    for (int i = 0; i < pStrings.length; i++) {
+      status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+          (Table.MMTBLPART).toString().toLowerCase() + pStrings[i]), FileUtils.STAGING_DIR_PATH_FILTER);
+      // There should be 1 delta dir per partition location.
+      Assert.assertEquals(1, status.length);
+      Assert.assertTrue(status[0].getPath().getName().matches("delta_.*"));
+    }
+
+    // 2. INSERT OVERWRITE.
+    // Prepare data for the source table.
+    int[][] newValsOdd = {{5,5},{11,11}};
+    int[][] newValsEven = {{2,2}};
+
+    runStatementOnDriver("insert into " + Table.NONACIDPART + " PARTITION(p='odd') " + makeValuesClause(newValsOdd));
+    runStatementOnDriver("insert into " + Table.NONACIDPART + " PARTITION(p='even') " + makeValuesClause(newValsEven));
+
+    // Insert overwrite the MM table from the source table.
+    List<String> rs = null;
+    String s = "insert overwrite table " + Table.MMTBLPART + " PARTITION(p='odd') " +
+        " select a,b from " + Table.NONACIDPART + " where " + Table.NONACIDPART + ".p='odd'";
+    rs = runStatementOnDriver("explain formatted " + s);
+    LOG.info("Explain formatted: " + rs.toString());
+    runStatementOnDriver(s);
+
+    s = "insert overwrite table " + Table.MMTBLPART + " PARTITION(p='even') " +
+        " select a,b from " + Table.NONACIDPART + " where " + Table.NONACIDPART + ".p='even'";
+    runStatementOnDriver(s);
+
+    // Verify resulting dirs.
+    boolean sawBase = false;
+    String[] baseDirs = {"", ""};
+    int deltaCount = 0;
+    for (int h = 0; h < pStrings.length; h++) {
+      status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+          (Table.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER);
+      // There should be 1 delta dir, plus a base dir in each partition location.
+      Assert.assertEquals(2, status.length);
+
+      for (int i = 0; i < status.length; i++) {
+        String dirName = status[i].getPath().getName();
+        if (dirName.matches("delta_.*")) {
+          deltaCount++;
+        } else {
+          sawBase = true;
+          baseDirs[h] = dirName;
+          Assert.assertTrue(baseDirs[h].matches("base_.*"));
+        }
+      }
+      Assert.assertEquals(1, deltaCount);
+      Assert.assertTrue(sawBase);
+      deltaCount = 0;
+      sawBase = false;
+    }
+
+    // Verify query result.
+    rs = runStatementOnDriver("select a,b from " + Table.MMTBLPART + " where p='even' order by a");
+    int[][] rExpectedEven = new int[][] {{2,2}};
+    Assert.assertEquals(stringifyValues(rExpectedEven), rs);
+
+    rs = runStatementOnDriver("select a,b from " + Table.MMTBLPART + " where p='odd' order by a");
+    int[][] rExpectedOdd = new int[][] {{5,5},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpectedOdd), rs);
+
+    // 3. Perform a major compaction. Nothing should change:
+    // the deltas and base dirs should keep the same names.
+    // Re-verify directory layout and query result using the same logic as above.
+    runStatementOnDriver("alter table " + Table.MMTBLPART + " PARTITION(p='odd') " + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    runStatementOnDriver("alter table " + Table.MMTBLPART + " PARTITION(p='even') " + " compact 'MAJOR'");
+    runWorker(hiveConf);
+
+    for (int h = 0; h < pStrings.length; h++) {
+      status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+          (Table.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER);
+      // There should be 1 delta dir, plus a base dir in each partition location.
+      Assert.assertEquals(2, status.length);
+      sawBase = false;
+      deltaCount = 0;
+      for (int i = 0; i < status.length; i++) {
+        String dirName = status[i].getPath().getName();
+        if (dirName.matches("delta_.*")) {
+          deltaCount++;
+        } else {
+          sawBase = true;
+          Assert.assertTrue(dirName.matches("base_.*"));
+          Assert.assertEquals(baseDirs[h], dirName);
+        }
+      }
+      Assert.assertEquals(1, deltaCount);
+      Assert.assertTrue(sawBase);
+      deltaCount = 0;
+      sawBase = false;
+    }
+
+    // Verify query result.
+    rs = runStatementOnDriver("select a,b from " + Table.MMTBLPART + " order by a");
+    int[][] rExpected = new int[][] {{2,2},{5,5},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpected), rs);
+
+    // 4. Run Cleaner. It should remove the obsolete delta dir in each partition.
+    runCleaner(hiveConf);
+
+    // There should be only 1 directory left per partition: base_xxxxxxx.
+    // The delta dirs should have been cleaned up.
+    for (int h = 0; h < pStrings.length; h++) {
+      status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+          (Table.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER);
+      Assert.assertEquals(1, status.length);
+      Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+      Assert.assertEquals(baseDirs[h], status[0].getPath().getName());
+    }
+    // Verify query result.
+    rs = runStatementOnDriver("select a,b from " + Table.MMTBLPART + " order by a");
+    Assert.assertEquals(stringifyValues(rExpected), rs);
+  }
+
   /**
    * Test compaction for Micro-managed table
    * 1. Regular compaction shouldn't impact any valid subdirectories of MM tables
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index 9a22c54b12..61f5d1aa02 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -218,7 +218,7 @@ public void testRemoveTempOrDuplicateFilesOnMrWithDp() throws Exception {
     Path tempDirPath = setupTempDirWithSingleOutputFile(hconf);
     FileSinkDesc conf = getFileSinkDesc(tempDirPath);
 
-    List<Path> paths = Utilities.removeTempOrDuplicateFiles(localFs, tempDirPath, dpCtx, conf, hconf);
+    List<Path> paths = Utilities.removeTempOrDuplicateFiles(localFs, tempDirPath, dpCtx, conf, hconf, false);
 
     String expectedScheme = tempDirPath.toUri().getScheme();
    String expectedAuthority = tempDirPath.toUri().getAuthority();