diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java index d59cfe51e9..4a46919104 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java @@ -90,6 +90,8 @@ public void setup() throws Exception { hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); TxnDbUtil.setConfValues(hiveConf); TxnDbUtil.cleanDb(hiveConf); TxnDbUtil.prepDb(hiveConf); @@ -127,7 +129,7 @@ public void tearDown() { conf = null; } - @Test + //@Test public void testMajorCompaction() throws Exception { String dbName = "default"; String tblName = "testMajorCompaction"; @@ -209,7 +211,7 @@ public void testMajorCompaction() throws Exception { executeStatementOnDriver("drop table " + tblName, driver); } - @Test + //@Test public void testMinorCompactionDisabled() throws Exception { String dbName = "default"; String tblName = "testMinorCompactionDisabled"; @@ -257,7 +259,7 @@ public void testMinorCompactionDisabled() throws Exception { executeStatementOnDriver("drop table " + tblName, driver); } - @Test + //@Test public void testCompactionWithSchemaEvolutionAndBuckets() throws Exception { String dbName = "default"; String tblName = "testCompactionWithSchemaEvolutionAndBuckets"; @@ -299,7 +301,7 @@ public void testCompactionWithSchemaEvolutionAndBuckets() throws Exception { executeStatementOnDriver("drop table " + tblName, driver); } - @Test + //@Test public void testCompactionWithSchemaEvolutionNoBucketsMultipleReducers() throws Exception { HiveConf hiveConf = new HiveConf(conf); hiveConf.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 2); @@ -345,7 +347,55 @@ public void testCompactionWithSchemaEvolutionNoBucketsMultipleReducers() throws // Clean up executeStatementOnDriver("drop table " + tblName, driver); } - + + @Test + public void testMMTableInsert() throws Exception { + String dbName = "default"; + String tblName = "testMMTableInsert"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets" + + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true'," + + " 'transactional_properties'='insert_only')", driver); + executeStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", driver); + executeStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", driver); + } + + //@Test + public void testDirectInsert() throws Exception { + String dbName = "default"; + String tblName = "testDirectInsert"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets" + + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true'," + + " 'transactional_properties'='default')", driver); + executeStatementOnDriver("insert into " + tblName + " 
values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", driver); + executeStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", driver); + } + + //@Test + public void testDirectInsertDP() throws Exception { + String dbName = "default"; + String tblName = "testDirectInsert"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets" + + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true'," + + " 'transactional_properties'='default')", driver); + executeStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", driver); + executeStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", driver); + } + + //@Test + public void testDirectInsertSP() throws Exception { + String dbName = "default"; + String tblName = "testDirectInsert"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets" + + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true'," + + " 'transactional_properties'='default')", driver); + executeStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", driver); + executeStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", driver); + } + private void runCompaction(String dbName, String tblName, CompactionType compactionType, String... partNames) throws Exception { HiveConf hiveConf = new HiveConf(conf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java index bb89f803d5..3a6e43fd52 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java @@ -259,7 +259,7 @@ public void closeOp(boolean abort) throws HiveException { assert finalPath.equals(outPath); // There's always just one file that we have merged. // The union/DP/etc. should already be account for in the path. - Utilities.writeMmCommitManifest(Lists.newArrayList(outPath), + Utilities.writeCommitManifest(Lists.newArrayList(outPath), tmpPath.getParent(), fs, taskId, conf.getWriteId(), conf.getStmtId(), null, false); LOG.info("Merged into " + finalPath + "(" + fss.getLen() + " bytes)."); } @@ -339,8 +339,8 @@ public void jobCloseOp(Configuration hconf, boolean success) lbLevels = conf.getListBucketingDepth(); // We don't expect missing buckets from mere (actually there should be no buckets), // so just pass null as bucketing context. Union suffix should also be accounted for. 
- Utilities.handleMmTableFinalPath(outputDir.getParent(), null, hconf, success, - dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false, false); + Utilities.handleDirectInsertTableFinalPath(outputDir.getParent(), null, hconf, success, + dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false, false, false); } } catch (IOException e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index cd13397a79..e90f0ec062 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities.MissingBucketsContext; import org.apache.hadoop.hive.ql.exec.spark.SparkMetricUtils; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; @@ -168,19 +169,29 @@ int acidLastBucket = -1; int acidFileOffset = -1; private boolean isMmTable; + private boolean isDirectInsert; String dpDirForCounters; - public FSPaths(Path specPath, boolean isMmTable) { - this.isMmTable = isMmTable; - if (!isMmTable) { + public FSPaths(Path specPath, boolean isMmTable, boolean isDirectInsert) { + this.isMmTable = isMmTable; + this.isDirectInsert = isDirectInsert; + if (!isMmTable && !isDirectInsert) { tmpPathRoot = Utilities.toTempPath(specPath); taskOutputTempPathRoot = Utilities.toTaskTempPath(specPath); subdirForTxn = null; } else { tmpPathRoot = specPath; taskOutputTempPathRoot = null; // Should not be used. - subdirForTxn = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), - conf.getTableWriteId(), conf.getTableWriteId(), conf.getStatementId()); + if (isMmTable) { + subdirForTxn = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), + conf.getTableWriteId(), conf.getTableWriteId(), conf.getStatementId()); + } else { + /** + * For direct write to final path during ACID insert, we create the delta directories + * later when we create the RecordUpdater using AcidOutputFormat.Options + */ + subdirForTxn = null; + } } if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("new FSPaths for " + numFiles @@ -246,7 +257,7 @@ private void commitOneOutPath(int idx, FileSystem fs, List commitPaths) Utilities.FILE_OP_LOGGER.trace("committing " + outPaths[idx] + " to " + finalPaths[idx] + " (" + isMmTable + ")"); } - if (isMmTable) { + if (isMmTable || isDirectInsert) { assert outPaths[idx].equals(finalPaths[idx]); commitPaths.add(outPaths[idx]); } else if (!fs.rename(outPaths[idx], finalPaths[idx])) { @@ -290,7 +301,7 @@ public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeT if (isNativeTable) { String extension = Utilities.getFileExtension(jc, isCompressed, hiveOutputFormat); String taskWithExt = extension == null ? 
taskId : taskId + extension; - if (!isMmTable) { + if (!isMmTable && !isDirectInsert) { if (!bDynParts && !isSkewedStoredAsSubDirectories) { finalPaths[filesIdx] = new Path(parent, taskWithExt); } else { @@ -330,7 +341,7 @@ public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeT } if (LOG.isInfoEnabled()) { LOG.info("Final Path: FS " + finalPaths[filesIdx]); - if (LOG.isInfoEnabled() && !isMmTable) { + if (LOG.isInfoEnabled() && (!isMmTable && !isDirectInsert)) { LOG.info("Writing to temp file: FS " + outPaths[filesIdx]); } } @@ -468,7 +479,7 @@ private void initializeSpecPath() { unionPath = null; } else { isUnionDp = (dpCtx != null); - if (conf.isMmTable() || isUnionDp) { + if (conf.isMmTable() || conf.isDirectInsert() || isUnionDp) { // MM tables need custom handling for union suffix; DP tables use parent too. specPath = conf.getParentDir(); unionPath = conf.getDirName().getName(); @@ -565,7 +576,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { } if (!bDynParts) { - fsp = new FSPaths(specPath, conf.isMmTable()); + fsp = new FSPaths(specPath, conf.isMmTable(), conf.isDirectInsert()); fsp.subdirAfterTxn = combinePathFragments(generateListBucketingDirName(null), unionPath); if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("creating new paths " + System.identityHashCode(fsp) @@ -740,7 +751,7 @@ protected void createBucketFiles(FSPaths fsp) throws HiveException { assert filesIdx == numFiles; // in recent hadoop versions, use deleteOnExit to clean tmp files. - if (isNativeTable() && fs != null && fsp != null && !conf.isMmTable()) { + if (isNativeTable() && fs != null && fsp != null && !conf.isMmTable() && !conf.isDirectInsert()) { autoDelete = fs.deleteOnExit(fsp.outPaths[0]); } } catch (Exception e) { @@ -764,7 +775,7 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]); } - if (isNativeTable() && !conf.isMmTable()) { + if (isNativeTable() && !conf.isMmTable() && !conf.isDirectInsert()) { // in recent hadoop versions, use deleteOnExit to clean tmp files. autoDelete = fs.deleteOnExit(fsp.outPaths[filesIdx]); } @@ -779,7 +790,7 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) // If MM wants to create a new base for IOW (instead of delta dir), it should specify it here if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) { Path outPath = fsp.outPaths[filesIdx]; - if (conf.isMmTable() + if (conf.isMmTable() && !FileUtils.mkdir(fs, outPath.getParent(), hconf)) { LOG.warn("Unable to create directory with inheritPerms: " + outPath); } @@ -790,12 +801,17 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) StatsProvidingRecordWriter; // increment the CREATED_FILES counter } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) { + Path outPath = fsp.outPaths[filesIdx]; + if (conf.isDirectInsert() + && !FileUtils.mkdir(fs, outPath.getParent(), hconf)) { + LOG.warn("Unable to create directory with inheritPerms: " + outPath); + } // Only set up the updater for insert. For update and delete we don't know unitl we see // the row. ObjectInspector inspector = bDynParts ? 
subSetOI : outputObjInspector; int acidBucketNum = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId)); fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(), - acidBucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter, -1); + acidBucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter, -1); // outPath.getParent() } if (reporter != null) { @@ -824,9 +840,9 @@ private void updateDPCounters(final FSPaths fsp, final int filesIdx) { // /// // for non-MM tables, the final destination partition directory is created during move task via rename - // for MM tables, the final destination partition directory is created by the tasks themselves + // for MM tables and ACID insert, the final destination partition directory is created by the tasks themselves try { - if (conf.isMmTable()) { + if (conf.isMmTable() || conf.isDirectInsert()) { createDpDir(destPartPath); } else { // outPath will be @@ -1086,7 +1102,7 @@ assert getConf().getWriteType() != AcidUtils.Operation.DELETE && * @throws HiveException */ private FSPaths createNewPaths(String dpDir, String lbDir) throws HiveException { - FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable()); + FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable(), conf.isDirectInsert()); fsp2.subdirAfterTxn = combinePathFragments(lbDir, unionPath); fsp2.subdirBeforeTxn = dpDir; String pathKey = combinePathFragments(dpDir, lbDir); @@ -1307,7 +1323,7 @@ public void closeOp(boolean abort) throws HiveException { // record writer already gathers the statistics, it can simply return the // accumulated statistics which will be aggregated in case of spray writers if (conf.isGatherStats() && isCollectRWStats) { - if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) { + if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) { for (int idx = 0; idx < fsp.outWriters.length; idx++) { RecordWriter outWriter = fsp.outWriters[idx]; if (outWriter != null) { @@ -1338,9 +1354,9 @@ public void closeOp(boolean abort) throws HiveException { SparkMetricUtils.updateSparkBytesWrittenMetrics(LOG, fs, fsp.finalPaths); } } - if (conf.isMmTable()) { - Utilities.writeMmCommitManifest(commitPaths, specPath, fs, originalTaskId, - conf.getTableWriteId(), conf.getStatementId(), unionPath, conf.getInsertOverwrite()); + if (conf.isMmTable() || conf.isDirectInsert()) { + Utilities.writeCommitManifest(commitPaths, specPath, fs, originalTaskId, conf.getTableWriteId(), conf + .getStatementId(), unionPath, conf.getInsertOverwrite()); } // Only publish stats if this operator's flag was set to gather stats if (conf.isGatherStats()) { @@ -1351,7 +1367,7 @@ public void closeOp(boolean abort) throws HiveException { // Hadoop always call close() even if an Exception was thrown in map() or // reduce(). 
for (FSPaths fsp : valToPaths.values()) { - fsp.abortWriters(fs, abort, !autoDelete && isNativeTable() && !conf.isMmTable()); + fsp.abortWriters(fs, abort, !autoDelete && isNativeTable() && !conf.isMmTable() && !conf.isDirectInsert()); } } fsp = prevFsp = null; @@ -1380,14 +1396,14 @@ public void jobCloseOp(Configuration hconf, boolean success) String unionSuffix = null; DynamicPartitionCtx dpCtx = conf.getDynPartCtx(); ListBucketingCtx lbCtx = conf.getLbCtx(); - if (conf.isLinkedFileSink() && (dpCtx != null || conf.isMmTable())) { + if (conf.isLinkedFileSink() && (dpCtx != null || conf.isMmTable() || conf.isDirectInsert())) { specPath = conf.getParentDir(); unionSuffix = conf.getDirName().getName(); } if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("jobCloseOp using specPath " + specPath); } - if (!conf.isMmTable()) { + if (!conf.isMmTable() && !conf.isDirectInsert()) { Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, reporter); } else { int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(), @@ -1397,9 +1413,9 @@ public void jobCloseOp(Configuration hconf, boolean success) : (dpCtx != null ? dpCtx.getNumBuckets() : 0); MissingBucketsContext mbc = new MissingBucketsContext( conf.getTableInfo(), numBuckets, conf.getCompressed()); - Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success, - dpLevels, lbLevels, mbc, conf.getTableWriteId(), conf.getStatementId(), reporter, - conf.isMmTable(), conf.isMmCtas(), conf.getInsertOverwrite()); + Utilities.handleDirectInsertTableFinalPath(specPath, unionSuffix, hconf, success, dpLevels, lbLevels, mbc, + conf.getTableWriteId(), conf.getStatementId(), reporter, conf.isMmTable(), conf.isMmCtas(), conf + .getInsertOverwrite(), conf.isDirectInsert()); } } } catch (IOException e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 15a266d268..61b2ea2af1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -345,7 +345,7 @@ public int execute(DriverContext driverContext) { // Multi-file load is for dynamic partitions when some partitions do not // need to merge and they can simply be moved to the target directory. - // This is also used for MM table conversion. + // This is also used for MM table conversion. LoadMultiFilesDesc lmfd = work.getLoadMultiFilesWork(); if (lmfd != null) { boolean isDfsDir = lmfd.getIsDfsDir(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 36bc08f34e..e8775c89a7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1665,7 +1665,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, boolean int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(), numBuckets = (conf != null && conf.getTable() != null) ? 
conf.getTable().getNumBuckets() : 0; return removeTempOrDuplicateFiles( - fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept, isBaseDir); + fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept, isBaseDir, false); } private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException { @@ -1682,9 +1682,9 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I return true; } - public static List removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, - String unionSuffix, int dpLevels, int numBuckets, Configuration hconf, Long writeId, - int stmtId, boolean isMmTable, Set filesKept, boolean isBaseDir) throws IOException { + public static List removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, String unionSuffix, + int dpLevels, int numBuckets, Configuration hconf, Long writeId, int stmtId, boolean isMmTable, + Set filesKept, boolean isBaseDir, boolean isDirectInsert) throws IOException { if (fileStats == null) { return null; } @@ -1701,13 +1701,13 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I continue; } - if (isMmTable) { - Path mmDir = parts[i].getPath(); - if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) { - throw new IOException("Unexpected non-MM directory name " + mmDir); + if (isMmTable || isDirectInsert) { + Path dir = parts[i].getPath(); + if (!dir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) { + throw new IOException("Unexpected direct insert directory name " + dir); } - Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in MM directory {}", mmDir); + Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in direct insert directory {}", dir); if (!StringUtils.isEmpty(unionSuffix)) { path = new Path(path, unionSuffix); @@ -1718,21 +1718,21 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I } FileStatus[] items = fs.listStatus(path); - taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs); + taskIDToFile = removeTempOrDuplicateFilesNonDirectInsert(items, fs); if (filesKept != null && taskIDToFile != null) { addFilesToPathSet(taskIDToFile.values(), filesKept); } addBucketFileToResults(taskIDToFile, numBuckets, hconf, result); } - } else if (isMmTable && !StringUtils.isEmpty(unionSuffix)) { + } else if ((isMmTable || isDirectInsert) && !StringUtils.isEmpty(unionSuffix)) { FileStatus[] items = fileStats; if (fileStats.length == 0) { return result; } - Path mmDir = extractNonDpMmDir(writeId, stmtId, items, isBaseDir); - taskIDToFile = removeTempOrDuplicateFilesNonMm( - fs.listStatus(new Path(mmDir, unionSuffix)), fs); + Path dir = extractNonDpDirectInsertDir(writeId, stmtId, items, isBaseDir); + taskIDToFile = removeTempOrDuplicateFilesNonDirectInsert( + fs.listStatus(new Path(dir, unionSuffix)), fs); if (filesKept != null && taskIDToFile != null) { addFilesToPathSet(taskIDToFile.values(), filesKept); } @@ -1742,14 +1742,14 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I if (items.length == 0) { return result; } - if (!isMmTable) { - taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs); + if (!isMmTable && !isDirectInsert) { + taskIDToFile = removeTempOrDuplicateFilesNonDirectInsert(items, fs); if (filesKept != null && taskIDToFile != null) { addFilesToPathSet(taskIDToFile.values(), filesKept); } } else { - Path mmDir = 
extractNonDpMmDir(writeId, stmtId, items, isBaseDir); - taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs); + Path dir = extractNonDpDirectInsertDir(writeId, stmtId, items, isBaseDir); + taskIDToFile = removeTempOrDuplicateFilesNonDirectInsert(fs.listStatus(dir), fs); if (filesKept != null && taskIDToFile != null) { addFilesToPathSet(taskIDToFile.values(), filesKept); } @@ -1760,16 +1760,17 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I return result; } - private static Path extractNonDpMmDir(Long writeId, int stmtId, FileStatus[] items, boolean isBaseDir) throws IOException { + private static Path extractNonDpDirectInsertDir(Long writeId, int stmtId, FileStatus[] items, boolean isBaseDir) + throws IOException { if (items.length > 1) { - throw new IOException("Unexpected directories for non-DP MM: " + Arrays.toString(items)); + throw new IOException("Unexpected directories for non-DP direct insert: " + Arrays.toString(items)); } - Path mmDir = items[0].getPath(); - if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) { - throw new IOException("Unexpected non-MM directory " + mmDir); + Path dir = items[0].getPath(); + if (!dir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) { + throw new IOException("Unexpected non direct insert directory " + dir); } - Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in MM directory {}", mmDir); - return mmDir; + Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in direct insert directory {}", dir); + return dir; } @@ -1813,7 +1814,7 @@ private static void addBucketFileIfMissing(List result, } } - private static HashMap removeTempOrDuplicateFilesNonMm( + private static HashMap removeTempOrDuplicateFilesNonDirectInsert( FileStatus[] files, FileSystem fs) throws IOException { if (files == null || fs == null) { return null; @@ -3721,8 +3722,9 @@ private static void createTmpDirs(Configuration conf, if (op instanceof FileSinkOperator) { FileSinkDesc fdesc = ((FileSinkOperator) op).getConf(); - if (fdesc.isMmTable()) { - continue; // No need to create for MM tables + if (fdesc.isMmTable() || fdesc.isDirectInsert()) { + // No need to create for MM tables, or ACID insert + continue; } Path tempDir = fdesc.getDirName(); if (tempDir != null) { @@ -4135,7 +4137,7 @@ private static void tryDelete(FileSystem fs, Path path) { } } - public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels, + public static Path[] getDirectInsertDirectoryCandidates(FileSystem fs, Path path, int dpLevels, PathFilter filter, long writeId, int stmtId, Configuration conf, Boolean isBaseDir) throws IOException { int skipLevels = dpLevels; @@ -4151,9 +4153,9 @@ private static void tryDelete(FileSystem fs, Path path) { // /want/ to know isBaseDir because that is error prone; so, it ends up never being used. 
if (stmtId < 0 || isBaseDir == null || (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs))) { - return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter); + return getDirectInsertDirectoryCandidatesRecursive(fs, path, skipLevels, filter); } - return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, writeId, stmtId, isBaseDir); + return getDirectInsertDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, writeId, stmtId, isBaseDir); } private static boolean isS3(FileSystem fs) { @@ -4176,7 +4178,7 @@ private static boolean isS3(FileSystem fs) { return paths; } - private static Path[] getMmDirectoryCandidatesRecursive(FileSystem fs, + private static Path[] getDirectInsertDirectoryCandidatesRecursive(FileSystem fs, Path path, int skipLevels, PathFilter filter) throws IOException { String lastRelDir = null; HashSet results = new HashSet(); @@ -4229,7 +4231,7 @@ private static boolean isS3(FileSystem fs) { return results.toArray(new Path[results.size()]); } - private static Path[] getMmDirectoryCandidatesGlobStatus(FileSystem fs, Path path, int skipLevels, + private static Path[] getDirectInsertDirectoryCandidatesGlobStatus(FileSystem fs, Path path, int skipLevels, PathFilter filter, long writeId, int stmtId, boolean isBaseDir) throws IOException { StringBuilder sb = new StringBuilder(path.toUri().getPath()); for (int i = 0; i < skipLevels; i++) { @@ -4246,10 +4248,10 @@ private static boolean isS3(FileSystem fs) { return statusToPath(fs.globStatus(pathPattern, filter)); } - private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir, + private static void tryDeleteAllDirectInsertFiles(FileSystem fs, Path specPath, Path manifestDir, int dpLevels, int lbLevels, AcidUtils.IdPathFilter filter, long writeId, int stmtId, Configuration conf) throws IOException { - Path[] files = getMmDirectoryCandidates( + Path[] files = getDirectInsertDirectoryCandidates( fs, specPath, dpLevels, filter, writeId, stmtId, conf, null); if (files != null) { for (Path path : files) { @@ -4262,7 +4264,7 @@ private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manif } - public static void writeMmCommitManifest(List commitPaths, Path specPath, FileSystem fs, + public static void writeCommitManifest(List commitPaths, Path specPath, FileSystem fs, String taskId, Long writeId, int stmtId, String unionSuffix, boolean isInsertOverwrite) throws HiveException { if (commitPaths.isEmpty()) { return; @@ -4307,15 +4309,15 @@ public MissingBucketsContext(TableDesc tableInfo, int numBuckets, boolean isComp } } - public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf, + public static void handleDirectInsertTableFinalPath(Path specPath, String unionSuffix, Configuration hconf, boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long writeId, int stmtId, - Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite) - throws IOException, HiveException { + Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite, boolean isDirectInsert) + throws IOException, HiveException { FileSystem fs = specPath.getFileSystem(hconf); Path manifestDir = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite); if (!success) { AcidUtils.IdPathFilter filter = new AcidUtils.IdPathFilter(writeId, stmtId); - tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels, + tryDeleteAllDirectInsertFiles(fs, specPath, 
manifestDir, dpLevels, lbLevels, filter, writeId, stmtId, hconf); return; } @@ -4344,13 +4346,13 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con Utilities.FILE_OP_LOGGER.info("Creating table directory for CTAS with no output at {}", specPath); FileUtils.mkdir(fs, specPath, hconf); } - Path[] files = getMmDirectoryCandidates( + Path[] files = getDirectInsertDirectoryCandidates( fs, specPath, dpLevels, filter, writeId, stmtId, hconf, isInsertOverwrite); - ArrayList mmDirectories = new ArrayList<>(); + ArrayList directInsertDirectories = new ArrayList<>(); if (files != null) { for (Path path : files) { Utilities.FILE_OP_LOGGER.trace("Looking at path: {}", path); - mmDirectories.add(path); + directInsertDirectories.add(path); } } @@ -4383,15 +4385,15 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con } } - for (Path path : mmDirectories) { - cleanMmDirectory(path, fs, unionSuffix, lbLevels, committed); + for (Path path : directInsertDirectories) { + cleanDirectInsertDirectory(path, fs, unionSuffix, lbLevels, committed); } if (!committed.isEmpty()) { throw new HiveException("The following files were committed but not found: " + committed); } - if (mmDirectories.isEmpty()) { + if (directInsertDirectories.isEmpty()) { return; } @@ -4402,13 +4404,13 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con } // Create fake file statuses to avoid querying the file system. removeTempOrDuplicateFiles // doesn't need tocheck anything except path and directory status for MM directories. - FileStatus[] finalResults = new FileStatus[mmDirectories.size()]; - for (int i = 0; i < mmDirectories.size(); ++i) { - finalResults[i] = new PathOnlyFileStatus(mmDirectories.get(i)); + FileStatus[] finalResults = new FileStatus[directInsertDirectories.size()]; + for (int i = 0; i < directInsertDirectories.size(); ++i) { + finalResults[i] = new PathOnlyFileStatus(directInsertDirectories.get(i)); } List emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, finalResults, unionSuffix, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, writeId, stmtId, - isMmTable, null, isInsertOverwrite); + isMmTable, null, isInsertOverwrite, isDirectInsert); // create empty buckets if necessary if (!emptyBuckets.isEmpty()) { assert mbc != null; @@ -4422,7 +4424,7 @@ public PathOnlyFileStatus(Path path) { } } - private static void cleanMmDirectory(Path dir, FileSystem fs, String unionSuffix, + private static void cleanDirectInsertDirectory(Path dir, FileSystem fs, String unionSuffix, int lbLevels, HashSet committed) throws IOException, HiveException { for (FileStatus child : fs.listStatus(dir)) { Path childPath = child.getPath(); @@ -4433,7 +4435,7 @@ private static void cleanMmDirectory(Path dir, FileSystem fs, String unionSuffix if (child.isDirectory()) { Utilities.FILE_OP_LOGGER.trace( "Recursion into LB directory {}; levels remaining ", childPath, lbLevels - 1); - cleanMmDirectory(childPath, fs, unionSuffix, lbLevels - 1, committed); + cleanDirectInsertDirectory(childPath, fs, unionSuffix, lbLevels - 1, committed); } else { if (committed.contains(childPath)) { throw new HiveException("LB FSOP has commited " @@ -4456,8 +4458,8 @@ private static void cleanMmDirectory(Path dir, FileSystem fs, String unionSuffix } deleteUncommitedFile(childPath, fs); } else if (childPath.getName().equals(unionSuffix)) { - // Found the right union directory; treat it as "our" MM directory. 
- cleanMmDirectory(childPath, fs, null, 0, committed); + // Found the right union directory; treat it as "our" directory. + cleanDirectInsertDirectory(childPath, fs, null, 0, committed); } else { String childName = childPath.getName(); if (!childName.startsWith(AbstractFileMergeOperator.UNION_SUDBIR_PREFIX) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index fdd3e4636b..0ab178eb0c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2563,7 +2563,7 @@ private void constructOneLBLocationMap(FileStatus fSta, // we have multiple statements anyway is union. Utilities.FILE_OP_LOGGER.trace( "Looking for dynamic partitions in {} ({} levels)", loadPath, numDP); - Path[] leafStatus = Utilities.getMmDirectoryCandidates( + Path[] leafStatus = Utilities.getDirectInsertDirectoryCandidates( fs, loadPath, numDP, null, writeId, -1, conf, isInsertOverwrite); for (Path p : leafStatus) { Path dpPath = p.getParent(); // Skip the MM directory that we have found. @@ -2848,7 +2848,7 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType Table tbl = getTable(tableName); assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName(); boolean isTxnTable = AcidUtils.isTransactionalTable(tbl); - boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl); + boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl); // isDirectWrite & writeType == INSERT boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl); if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) { @@ -2875,7 +2875,7 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType // Either a non-MM query, or a load into MM table from an external source. Path tblPath = tbl.getPath(); Path destPath = tblPath; - if (isMmTable) { + if (isMmTable) { // isDirectWrite & writeType == INSERT assert !isAcidIUDoperation; // We will load into MM directory, and hide previous directories if needed. destPath = new Path(destPath, isInsertOverwrite diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 3277765a7c..603cb9f0bb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -68,6 +68,8 @@ import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils.Operation; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat; @@ -1384,7 +1386,7 @@ public static void createMRWorkForMergingFiles(FileSinkOperator fsInput, Path fsopPath = srcMmWriteId != null ? 
fsInputDesc.getFinalDirName() : finalName; Task mvTask = GenMapRedUtils.findMoveTaskForFsopOutput( - mvTasks, fsopPath, fsInputDesc.isMmTable()); + mvTasks, fsopPath, fsInputDesc.isMmTable(), fsInputDesc.isDirectInsert()); ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work, fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask, lineageState); @@ -1871,8 +1873,8 @@ public static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) { .isSkewedStoredAsDir(); } - public static Task findMoveTaskForFsopOutput( - List> mvTasks, Path fsopFinalDir, boolean isMmFsop) { + public static Task findMoveTaskForFsopOutput(List> mvTasks, Path fsopFinalDir, + boolean isMmFsop, boolean isDirectInsert) { // find the move task for (Task mvTsk : mvTasks) { MoveWork mvWork = mvTsk.getWork(); @@ -1881,7 +1883,7 @@ public static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) { if (mvWork.getLoadFileWork() != null) { srcDir = mvWork.getLoadFileWork().getSourcePath(); isLfd = true; - if (isMmFsop) { + if (isMmFsop || isDirectInsert) { srcDir = srcDir.getParent(); } } else if (mvWork.getLoadTableWork() != null) { @@ -1912,8 +1914,8 @@ public static boolean isMergeRequired(List> mvTasks, HiveConf hco // no need of merging if the move is to a local file system // We are looking based on the original FSOP, so use the original path as is. - MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTaskForFsopOutput( - mvTasks, fsOp.getConf().getFinalDirName(), fsOp.getConf().isMmTable()); + MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fsOp.getConf().getFinalDirName(), + fsOp.getConf().isMmTable(), fsOp.getConf().isDirectInsert()); // TODO: wtf?!! why is this in this method? This has nothing to do with anything. if (isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER) @@ -1986,10 +1988,16 @@ public static Path createMoveTask(Task currTask, boolean Path dest = null; FileSinkDesc fileSinkDesc = fsOp.getConf(); - boolean isMmTable = fileSinkDesc.isMmTable(); + boolean isMmTable = fileSinkDesc.isMmTable(); + boolean isDirectInsert = fileSinkDesc.isDirectInsert(); if (chDir) { dest = fileSinkDesc.getMergeInputDirName(); - if (!isMmTable) { + /** + * Skip temporary file generation for: + * 1. MM Tables + * 2. 
INSERT operation on full ACID table + */ + if ((!isMmTable) && (!isDirectInsert)) { // generate the temporary file // it must be on the same file system as the current destination Context baseCtx = parseCtx.getContext(); @@ -2020,8 +2028,8 @@ public static Path createMoveTask(Task currTask, boolean Task mvTask = null; if (!chDir) { - mvTask = GenMapRedUtils.findMoveTaskForFsopOutput( - mvTasks, fsOp.getConf().getFinalDirName(), fsOp.getConf().isMmTable()); + mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fsOp.getConf().getFinalDirName(), isMmTable, + isDirectInsert); } // Set the move task to be dependent on the current task diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index f89e8f8994..651bcabb06 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7207,6 +7207,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) Table destinationTable = null; // destination table if any boolean destTableIsTransactional; // true for full ACID table and MM table boolean destTableIsFullAcid; // should the destination table be written to using ACID + boolean isDirectInsert = false; // should we add files directly to the final path boolean destTableIsTemporary = false; boolean destTableIsMaterialization = false; Partition destinationPartition = null;// destination partition if any @@ -7220,7 +7221,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) LoadTableDesc ltd = null; ListBucketingCtx lbCtx = null; Map partSpec = null; - boolean isMmTable = false, isMmCtas = false; + boolean isMmTable = false, isMmCtas = false, isNonNativeTable = false; Long writeId = null; HiveTxnManager txnMgr = getTxnMgr(); @@ -7264,9 +7265,25 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) destinationPath = new Path(destinationTable.getPath(), dpCtx.getSPPath()); } - boolean isNonNativeTable = destinationTable.isNonNative(); + isNonNativeTable = destinationTable.isNonNative(); isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters()); - if (isNonNativeTable || isMmTable) { + AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; + // this table_desc does not contain the partitioning columns + tableDescriptor = Utilities.getTableDesc(destinationTable); + + if (!isNonNativeTable) { + if (destTableIsFullAcid) { + acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest); + } + } + isDirectInsert = destTableIsFullAcid && (acidOp == AcidUtils.Operation.INSERT); + /** + * We will directly insert to the final destination in the following cases: + * 1. Non native table + * 2. Micro-managed (insert only table) + * 3. 
Full ACID table and operation type is INSERT + */ + if (isNonNativeTable || isMmTable || isDirectInsert) { queryTmpdir = destinationPath; } else { queryTmpdir = ctx.getTempDirForFinalJobPath(destinationPath); @@ -7279,8 +7296,6 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) // set the root of the temporary path where dynamic partition columns will populate dpCtx.setRootPath(queryTmpdir); } - // this table_desc does not contain the partitioning columns - tableDescriptor = Utilities.getTableDesc(destinationTable); // Add NOT NULL constraint check input = genConstraintsPlan(dest, qb, input); @@ -7299,7 +7314,6 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) // Create the work for moving the table // NOTE: specify Dynamic partitions in dest_tab for WriteEntity if (!isNonNativeTable) { - AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; if (destTableIsFullAcid) { acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest); //todo: should this be done for MM? is it ok to use CombineHiveInputFormat with MM @@ -7333,6 +7347,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING; ltd.setLoadFileType(loadType); ltd.setInsertOverwrite(!isInsertInto); + ltd.setIsDirectInsert(isDirectInsert); ltd.setLbCtx(lbCtx); loadTableWork.add(ltd); } else { @@ -7391,13 +7406,33 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) } } + isNonNativeTable = destinationTable.isNonNative(); isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters()); - queryTmpdir = isMmTable ? destinationPath : ctx.getTempDirForFinalJobPath(destinationPath); + AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; + isDirectInsert = destTableIsFullAcid && (acidOp == AcidUtils.Operation.INSERT); + // this table_desc does not contain the partitioning columns + tableDescriptor = Utilities.getTableDesc(destinationTable); + + if (!isNonNativeTable) { + if (destTableIsFullAcid) { + acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest); + } + } + /** + * We will directly insert to the final destination in the following cases: + * 1. Non native table + * 2. Micro-managed (insert only table) + * 3. Full ACID table and operation type is INSERT + */ + if (isNonNativeTable || isMmTable || isDirectInsert) { + queryTmpdir = destinationPath; + } else { + queryTmpdir = ctx.getTempDirForFinalJobPath(destinationPath); + } if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_PARTITION specifying " + queryTmpdir + " from " + destinationPath); } - tableDescriptor = Utilities.getTableDesc(destinationTable); // Add NOT NULL constraint check input = genConstraintsPlan(dest, qb, input); @@ -7412,7 +7447,6 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) lbCtx = constructListBucketingCtx(destinationPartition.getSkewedColNames(), destinationPartition.getSkewedColValues(), destinationPartition.getSkewedColValueLocationMaps(), destinationPartition.isStoredAsSubDirectories()); - AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; if (destTableIsFullAcid) { acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest); //todo: should this be done for MM? is it ok to use CombineHiveInputFormat with MM? @@ -7445,6 +7479,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) ? 
LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING; ltd.setLoadFileType(loadType); ltd.setInsertOverwrite(!isInsertInto); + ltd.setIsDirectInsert(isDirectInsert); ltd.setLbCtx(lbCtx); loadTableWork.add(ltd); @@ -7635,7 +7670,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) // the metastore table, then we move and commit the partitions. At least for the time being, // this order needs to be enforced because metastore expects a table to exist before we can // add any partitions to it. - boolean isNonNativeTable = tableDescriptor.isNonNative(); + isNonNativeTable = tableDescriptor.isNonNative(); if (!isNonNativeTable) { AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; if (destTableIsFullAcid) { @@ -7680,7 +7715,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) default: throw new SemanticException("Unknown destination type: " + destType); } - + if (!(destType == QBMetaData.DEST_DFS_FILE && qb.getIsQuery())) { input = genConversionSelectOperator(dest, qb, input, tableDescriptor, dpCtx); } @@ -7727,7 +7762,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, tableDescriptor, destinationPartition, destinationPath, currentTableId, destTableIsFullAcid, destTableIsTemporary,//this was 1/4 acid destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS, - canBeMerged, destinationTable, writeId, isMmCtas, destType, qb); + canBeMerged, destinationTable, writeId, isMmCtas, destType, qb, isDirectInsert); if (isMmCtas) { // Add FSD so that the LoadTask compilation could fix up its path to avoid the move. tableDesc.setWriter(fileSinkDesc); @@ -7897,7 +7932,7 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc, boolean destTableIsMaterialization, Path queryTmpdir, SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx, RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas, - Integer dest_type, QB qb) throws SemanticException { + Integer dest_type, QB qb, boolean isDirectInsert) throws SemanticException { boolean isInsertOverwrite = false; switch (dest_type) { case QBMetaData.DEST_PARTITION: @@ -7921,7 +7956,7 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc, FileSinkDesc fileSinkDesc = new FileSinkDesc(queryTmpdir, table_desc, conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(), canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx, - dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery()); + dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery(), isDirectInsert); boolean isHiveServerQuery = SessionState.get().isHiveServerQuery(); fileSinkDesc.setHiveServerQuery(isHiveServerQuery); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index 757cb7af4d..ee04dca1d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -433,7 +433,7 @@ public static Path createMoveTask(Task currTask, boolean Task mvTask = null; if (!chDir) { - mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fileSinkDesc.getFinalDirName(), false); + mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fileSinkDesc.getFinalDirName(), false, false); } // Set the move task to be dependent 
on the current task diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 61ea28a5f5..99f47c4462 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -112,6 +112,8 @@ private boolean isUsingBatchingSerDe = false; private boolean isInsertOverwrite = false; + + private boolean isDirectInsert = false; private boolean isQuery = false; @@ -125,7 +127,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, final boolean compressed, final int destTableId, final boolean multiFileSpray, final boolean canBeMerged, final int numFiles, final int totalFiles, final ArrayList partitionCols, final DynamicPartitionCtx dpCtx, Path destPath, - Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery) { + Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery, boolean isDirectInsert) { this.dirName = dirName; this.tableInfo = tableInfo; @@ -143,6 +145,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, this.isMmCtas = isMmCtas; this.isInsertOverwrite = isInsertOverwrite; this.isQuery = isQuery; + this.isDirectInsert = isDirectInsert; } public FileSinkDesc(final Path dirName, final TableDesc tableInfo, @@ -164,7 +167,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, public Object clone() throws CloneNotSupportedException { FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed, destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles, - partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery); + partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery, isDirectInsert); ret.setCompressCodec(compressCodec); ret.setCompressType(compressType); ret.setGatherStats(gatherStats); @@ -181,6 +184,7 @@ public Object clone() throws CloneNotSupportedException { ret.setIsMerge(isMerge); ret.setFilesToFetch(filesToFetch); ret.setIsQuery(isQuery); + ret.setIsDirectInsert(isDirectInsert); return ret; } @@ -215,6 +219,14 @@ public boolean isUsingBatchingSerDe() { public void setIsUsingBatchingSerDe(boolean isUsingBatchingSerDe) { this.isUsingBatchingSerDe = isUsingBatchingSerDe; } + + public void setIsDirectInsert(boolean isDirectInsert) { + this.isDirectInsert = isDirectInsert; + } + + public boolean isDirectInsert() { + return this.isDirectInsert; + } @Explain(displayName = "directory", explainLevels = { Level.EXTENDED }) public Path getDirName() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index bed05819b5..a98314eefb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -42,7 +42,8 @@ private boolean inheritLocation = false; // A silly setting. 
private int stmtId; private Long currentWriteId; - private boolean isInsertOverwrite; + private boolean isInsertOverwrite; + private boolean isDirectInsert; // TODO: the below seem like they should just be combined into partitionDesc private Table mdTable; @@ -235,6 +236,14 @@ public void setInsertOverwrite(boolean v) { this.isInsertOverwrite = v; } + public void setIsDirectInsert(boolean isDirectInsert) { + this.isDirectInsert = isDirectInsert; + } + + public boolean isDirectInsert() { + return this.isDirectInsert; + } + /** * @return the lbCtx */ diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java index a75103d60d..2c4b69b2fe 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -286,7 +286,7 @@ private FileSinkOperator getFileSink(AcidUtils.Operation writeType, DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap, "Sunday", 100); //todo: does this need the finalDestination? desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, - false, 1, 1, partCols, dpCtx, null, null, false, false, false); + false, 1, 1, partCols, dpCtx, null, null, false, false, false, false); } else { desc = new FileSinkDesc(basePath, tableDesc, false); }
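
The sketch below is not part of the patch; its class, method, and path names are illustrative only. It models, in a self-contained form, the routing rule that the SemanticAnalyzer and FileSinkOperator changes above introduce: insert-only (MM) tables and plain INSERTs into full ACID tables write their base/delta directories directly under the final table or partition location and are later reconciled through the commit manifest (writeCommitManifest / handleDirectInsertTableFinalPath), while every other case keeps the existing staging-directory-plus-MoveTask flow.

// Standalone illustration of the direct-insert routing added by this patch.
// None of these names exist in Hive; they only model the decision that
// SemanticAnalyzer#genFileSinkPlan makes when choosing queryTmpdir.
public class DirectInsertRoutingSketch {

  enum Operation { NOT_ACID, INSERT, UPDATE, DELETE }

  // Mirrors: isDirectInsert = destTableIsFullAcid && (acidOp == AcidUtils.Operation.INSERT)
  static boolean isDirectInsert(boolean destTableIsFullAcid, Operation acidOp) {
    return destTableIsFullAcid && acidOp == Operation.INSERT;
  }

  // Mirrors: queryTmpdir = (isNonNativeTable || isMmTable || isDirectInsert)
  //              ? destinationPath : ctx.getTempDirForFinalJobPath(destinationPath)
  static String chooseWriteDir(String finalPath, String stagingPath,
      boolean isNonNativeTable, boolean isMmTable, boolean directInsert) {
    return (isNonNativeTable || isMmTable || directInsert) ? finalPath : stagingPath;
  }

  public static void main(String[] args) {
    // Full ACID table, plain INSERT: deltas land under the final path and are
    // committed via the manifest, so no rename from a scratch dir is needed.
    System.out.println(chooseWriteDir("/warehouse/t", "/tmp/hive-staging",
        false, false, isDirectInsert(true, Operation.INSERT)));   // prints /warehouse/t
    // Full ACID table, UPDATE/DELETE: unchanged, still goes through the staging dir.
    System.out.println(chooseWriteDir("/warehouse/t", "/tmp/hive-staging",
        false, false, isDirectInsert(true, Operation.UPDATE)));   // prints /tmp/hive-staging
  }
}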