diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java index 5fd0ef9161..4c63a8e822 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java @@ -107,7 +107,7 @@ protected void setUp() { db.createTable(src, cols, null, TextInputFormat.class, IgnoreKeyTextOutputFormat.class); db.loadTable(hadoopDataFile[i], src, - LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false); + LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false, false); i++; } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java index d59cfe51e9..167a9afade 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java @@ -90,6 +90,8 @@ public void setup() throws Exception { hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false); TxnDbUtil.setConfValues(hiveConf); TxnDbUtil.cleanDb(hiveConf); TxnDbUtil.prepDb(hiveConf); @@ -345,7 +347,7 @@ public void testCompactionWithSchemaEvolutionNoBucketsMultipleReducers() throws // Clean up executeStatementOnDriver("drop table " + tblName, driver); } - + private void runCompaction(String dbName, String tblName, CompactionType compactionType, String... partNames) throws Exception { HiveConf hiveConf = new HiveConf(conf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java index bb89f803d5..3a6e43fd52 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java @@ -259,7 +259,7 @@ public void closeOp(boolean abort) throws HiveException { assert finalPath.equals(outPath); // There's always just one file that we have merged. // The union/DP/etc. should already be account for in the path. - Utilities.writeMmCommitManifest(Lists.newArrayList(outPath), + Utilities.writeCommitManifest(Lists.newArrayList(outPath), tmpPath.getParent(), fs, taskId, conf.getWriteId(), conf.getStmtId(), null, false); LOG.info("Merged into " + finalPath + "(" + fss.getLen() + " bytes)."); } @@ -339,8 +339,8 @@ public void jobCloseOp(Configuration hconf, boolean success) lbLevels = conf.getListBucketingDepth(); // We don't expect missing buckets from mere (actually there should be no buckets), // so just pass null as bucketing context. Union suffix should also be accounted for. 
- Utilities.handleMmTableFinalPath(outputDir.getParent(), null, hconf, success, - dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false, false); + Utilities.handleDirectInsertTableFinalPath(outputDir.getParent(), null, hconf, success, + dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false, false, false); } } catch (IOException e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 9ad4e71482..515a2e708f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -161,6 +161,8 @@ private final String subdirForTxn; private Path taskOutputTempPathRoot; Path[] outPaths; + // The bucket files we successfully wrote to in this writer + Path[] outPathsCommitted; Path[] finalPaths; RecordWriter[] outWriters; RecordUpdater[] updaters; @@ -168,19 +170,29 @@ int acidLastBucket = -1; int acidFileOffset = -1; private boolean isMmTable; + private boolean isDirectInsert; String dpDirForCounters; - public FSPaths(Path specPath, boolean isMmTable) { - this.isMmTable = isMmTable; - if (!isMmTable) { + public FSPaths(Path specPath, boolean isMmTable, boolean isDirectInsert) { + this.isMmTable = isMmTable; + this.isDirectInsert = isDirectInsert; + if (!isMmTable && !isDirectInsert) { tmpPathRoot = Utilities.toTempPath(specPath); taskOutputTempPathRoot = Utilities.toTaskTempPath(specPath); subdirForTxn = null; } else { tmpPathRoot = specPath; taskOutputTempPathRoot = null; // Should not be used. - subdirForTxn = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), - conf.getTableWriteId(), conf.getTableWriteId(), conf.getStatementId()); + if (isMmTable) { + subdirForTxn = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), + conf.getTableWriteId(), conf.getTableWriteId(), conf.getStatementId()); + } else { + /** + * For direct write to final path during ACID insert, we create the delta directories + * later when we create the RecordUpdater using AcidOutputFormat.Options + */ + subdirForTxn = null; + } } if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("new FSPaths for " + numFiles @@ -188,6 +200,7 @@ public FSPaths(Path specPath, boolean isMmTable) { } outPaths = new Path[numFiles]; + outPathsCommitted = new Path[numFiles]; finalPaths = new Path[numFiles]; outWriters = new RecordWriter[numFiles]; updaters = new RecordUpdater[numFiles]; @@ -211,7 +224,11 @@ public void closeWriters(boolean abort) throws HiveException { try { for (int i = 0; i < updaters.length; i++) { if (updaters[i] != null) { - updaters[i].close(abort); + // Ignore 0 row files + if (isDirectInsert && (row_count.get() > 0)) { + outPathsCommitted[i] = updaters[i].getUpdatedFilePath(); + } + updaters[i].close(abort); } } } catch (IOException e) { @@ -249,7 +266,9 @@ private void commitOneOutPath(int idx, FileSystem fs, List commitPaths) if (isMmTable) { assert outPaths[idx].equals(finalPaths[idx]); commitPaths.add(outPaths[idx]); - } else if (!fs.rename(outPaths[idx], finalPaths[idx])) { + } else if (isDirectInsert && (outPathsCommitted[idx] != null)) { + commitPaths.add(outPathsCommitted[idx]); + } else if (!isDirectInsert && !fs.rename(outPaths[idx], finalPaths[idx])) { FileStatus fileStatus = FileUtils.getFileStatusOrNull(fs, finalPaths[idx]); if (fileStatus != null) { LOG.warn("Target path " + finalPaths[idx] + " with a size " + fileStatus.getLen() + " exists. 
Trying to delete it."); @@ -264,7 +283,6 @@ private void commitOneOutPath(int idx, FileSystem fs, List commitPaths) } } } - updateProgress(); } @@ -290,7 +308,7 @@ public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeT if (isNativeTable) { String extension = Utilities.getFileExtension(jc, isCompressed, hiveOutputFormat); String taskWithExt = extension == null ? taskId : taskId + extension; - if (!isMmTable) { + if (!isMmTable && !isDirectInsert) { if (!bDynParts && !isSkewedStoredAsSubDirectories) { finalPaths[filesIdx] = new Path(parent, taskWithExt); } else { @@ -307,8 +325,13 @@ public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeT if (extension != null) { taskIdPath += extension; } - - Path finalPath = new Path(buildTmpPath(), taskIdPath); + + Path finalPath; + if (isDirectInsert) { + finalPath = buildTmpPath(); + } else { + finalPath = new Path(buildTmpPath(), taskIdPath); + } // In the cases that have multi-stage insert, e.g. a "hive.skewjoin.key"-based skew join, // it can happen that we want multiple commits into the same directory from different @@ -319,7 +342,7 @@ public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeT // affects some less obscure scenario. try { FileSystem fpfs = finalPath.getFileSystem(hconf); - if (fpfs.exists(finalPath)) { + if ((!isDirectInsert) && fpfs.exists(finalPath)) { throw new RuntimeException(finalPath + " already exists"); } } catch (IOException e) { @@ -330,7 +353,7 @@ public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeT } if (LOG.isInfoEnabled()) { LOG.info("Final Path: FS " + finalPaths[filesIdx]); - if (LOG.isInfoEnabled() && !isMmTable) { + if (LOG.isInfoEnabled() && (!isMmTable && !isDirectInsert)) { LOG.info("Writing to temp file: FS " + outPaths[filesIdx]); } } @@ -468,10 +491,13 @@ private void initializeSpecPath() { unionPath = null; } else { isUnionDp = (dpCtx != null); - if (conf.isMmTable() || isUnionDp) { + if (conf.isMmTable() || isUnionDp) { // MM tables need custom handling for union suffix; DP tables use parent too. specPath = conf.getParentDir(); unionPath = conf.getDirName().getName(); + } else if (conf.isDirectInsert()) { + specPath = conf.getParentDir(); + unionPath = null; } else { // For now, keep the old logic for non-MM non-DP union case. Should probably be unified. 
specPath = conf.getDirName(); @@ -526,7 +552,11 @@ protected void initializeOp(Configuration hconf) throws HiveException { throw ex; } isCompressed = conf.getCompressed(); - parent = Utilities.toTempPath(conf.getDirName()); + if (conf.isLinkedFileSink() && conf.isDirectInsert()) { + parent = Utilities.toTempPath(conf.getFinalDirName()); + } else { + parent = Utilities.toTempPath(conf.getDirName()); + } statsFromRecordWriter = new boolean[numFiles]; serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance(); serializer.initialize(unsetNestedColumnPaths(hconf), conf.getTableInfo().getProperties()); @@ -565,7 +595,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { } if (!bDynParts) { - fsp = new FSPaths(specPath, conf.isMmTable()); + fsp = new FSPaths(specPath, conf.isMmTable(), conf.isDirectInsert()); fsp.subdirAfterTxn = combinePathFragments(generateListBucketingDirName(null), unionPath); if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("creating new paths " + System.identityHashCode(fsp) @@ -740,7 +770,7 @@ protected void createBucketFiles(FSPaths fsp) throws HiveException { assert filesIdx == numFiles; // in recent hadoop versions, use deleteOnExit to clean tmp files. - if (isNativeTable() && fs != null && fsp != null && !conf.isMmTable()) { + if (isNativeTable() && fs != null && fsp != null && !conf.isMmTable() && !conf.isDirectInsert()) { autoDelete = fs.deleteOnExit(fsp.outPaths[0]); } } catch (Exception e) { @@ -764,7 +794,7 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]); } - if (isNativeTable() && !conf.isMmTable()) { + if (isNativeTable() && !conf.isMmTable() && !conf.isDirectInsert()) { // in recent hadoop versions, use deleteOnExit to clean tmp files. autoDelete = fs.deleteOnExit(fsp.outPaths[filesIdx]); } @@ -779,7 +809,7 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) // If MM wants to create a new base for IOW (instead of delta dir), it should specify it here if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) { Path outPath = fsp.outPaths[filesIdx]; - if (conf.isMmTable() + if (conf.isMmTable() && !FileUtils.mkdir(fs, outPath.getParent(), hconf)) { LOG.warn("Unable to create directory with inheritPerms: " + outPath); } @@ -790,12 +820,18 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) StatsProvidingRecordWriter; // increment the CREATED_FILES counter } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) { + Path outPath = fsp.outPaths[filesIdx]; + if (conf.isDirectInsert() + && !FileUtils.mkdir(fs, outPath.getParent(), hconf)) { + LOG.warn("Unable to create directory with inheritPerms: " + outPath); + } // Only set up the updater for insert. For update and delete we don't know unitl we see // the row. ObjectInspector inspector = bDynParts ? 
subSetOI : outputObjInspector; int acidBucketNum = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId)); fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(), - acidBucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter, -1); + acidBucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter, -1); // outPath.getParent() + } if (reporter != null) { @@ -824,9 +860,9 @@ private void updateDPCounters(final FSPaths fsp, final int filesIdx) { // /// // for non-MM tables, the final destination partition directory is created during move task via rename - // for MM tables, the final destination partition directory is created by the tasks themselves + // for MM tables and ACID insert, the final destination partition directory is created by the tasks themselves try { - if (conf.isMmTable()) { + if (conf.isMmTable() || conf.isDirectInsert()) { createDpDir(destPartPath); } else { // outPath will be @@ -1086,7 +1122,7 @@ assert getConf().getWriteType() != AcidUtils.Operation.DELETE && * @throws HiveException */ private FSPaths createNewPaths(String dpDir, String lbDir) throws HiveException { - FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable()); + FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable(), conf.isDirectInsert()); fsp2.subdirAfterTxn = combinePathFragments(lbDir, unionPath); fsp2.subdirBeforeTxn = dpDir; String pathKey = combinePathFragments(dpDir, lbDir); @@ -1306,7 +1342,7 @@ public void closeOp(boolean abort) throws HiveException { // record writer already gathers the statistics, it can simply return the // accumulated statistics which will be aggregated in case of spray writers if (conf.isGatherStats() && isCollectRWStats) { - if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) { + if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) { for (int idx = 0; idx < fsp.outWriters.length; idx++) { RecordWriter outWriter = fsp.outWriters[idx]; if (outWriter != null) { @@ -1337,9 +1373,9 @@ public void closeOp(boolean abort) throws HiveException { SparkMetricUtils.updateSparkBytesWrittenMetrics(LOG, fs, fsp.finalPaths); } } - if (conf.isMmTable()) { - Utilities.writeMmCommitManifest(commitPaths, specPath, fs, originalTaskId, - conf.getTableWriteId(), conf.getStatementId(), unionPath, conf.getInsertOverwrite()); + if (conf.isMmTable() || conf.isDirectInsert()) { + Utilities.writeCommitManifest(commitPaths, specPath, fs, originalTaskId, conf.getTableWriteId(), conf + .getStatementId(), unionPath, conf.getInsertOverwrite()); } // Only publish stats if this operator's flag was set to gather stats if (conf.isGatherStats()) { @@ -1350,7 +1386,7 @@ public void closeOp(boolean abort) throws HiveException { // Hadoop always call close() even if an Exception was thrown in map() or // reduce(). 
for (FSPaths fsp : valToPaths.values()) { - fsp.abortWriters(fs, abort, !autoDelete && isNativeTable() && !conf.isMmTable()); + fsp.abortWriters(fs, abort, !autoDelete && isNativeTable() && !conf.isMmTable() && !conf.isDirectInsert()); } } fsp = prevFsp = null; @@ -1383,10 +1419,14 @@ public void jobCloseOp(Configuration hconf, boolean success) specPath = conf.getParentDir(); unionSuffix = conf.getDirName().getName(); } + if (conf.isLinkedFileSink() && conf.isDirectInsert()) { + specPath = conf.getParentDir(); + unionSuffix = null; + } if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("jobCloseOp using specPath " + specPath); } - if (!conf.isMmTable()) { + if (!conf.isMmTable() && !conf.isDirectInsert()) { Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, reporter); } else { int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(), @@ -1396,9 +1436,9 @@ public void jobCloseOp(Configuration hconf, boolean success) : (dpCtx != null ? dpCtx.getNumBuckets() : 0); MissingBucketsContext mbc = new MissingBucketsContext( conf.getTableInfo(), numBuckets, conf.getCompressed()); - Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success, - dpLevels, lbLevels, mbc, conf.getTableWriteId(), conf.getStatementId(), reporter, - conf.isMmTable(), conf.isMmCtas(), conf.getInsertOverwrite()); + Utilities.handleDirectInsertTableFinalPath(specPath, unionSuffix, hconf, success, dpLevels, lbLevels, mbc, + conf.getTableWriteId(), conf.getStatementId(), reporter, conf.isMmTable(), conf.isMmCtas(), conf + .getInsertOverwrite(), conf.isDirectInsert()); } } } catch (IOException e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 695d08bbe2..7b9a8f9478 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -346,7 +346,7 @@ public int execute(DriverContext driverContext) { // Multi-file load is for dynamic partitions when some partitions do not // need to merge and they can simply be moved to the target directory. - // This is also used for MM table conversion. + // This is also used for MM table conversion. LoadMultiFilesDesc lmfd = work.getLoadMultiFilesWork(); if (lmfd != null) { boolean isDfsDir = lmfd.getIsDfsDir(); @@ -422,7 +422,7 @@ public int execute(DriverContext driverContext) { db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(), work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, resetStatisticsProps(table), tbd.getWriteId(), tbd.getStmtId(), - tbd.isInsertOverwrite()); + tbd.isInsertOverwrite(), tbd.isDirectInsert()); if (work.getOutputs() != null) { DDLUtils.addIfAbsentByName(new WriteEntity(table, getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs()); @@ -524,7 +524,7 @@ private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd, work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID && !tbd.isMmTable(), resetStatisticsProps(table), tbd.getWriteId(), tbd.getStmtId(), - tbd.isInsertOverwrite()); + tbd.isInsertOverwrite(), tbd.isDirectInsert()); Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false); // See the comment inside updatePartitionBucketSortColumns. 
@@ -571,7 +571,9 @@ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd, tbd.getStmtId(), resetStatisticsProps(table), work.getLoadTableWork().getWriteType(), - tbd.isInsertOverwrite()); + tbd.isInsertOverwrite(), + tbd.isDirectInsert() + ); // publish DP columns to its subscribers if (dps != null && dps.size() > 0) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 437266355a..d1e67ad60e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1535,7 +1535,7 @@ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, boolean int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(), numBuckets = (conf != null && conf.getTable() != null) ? conf.getTable().getNumBuckets() : 0; return removeTempOrDuplicateFiles( - fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept, isBaseDir); + fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept, isBaseDir, false); } private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException { @@ -1552,9 +1552,9 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I return true; } - public static List removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, - String unionSuffix, int dpLevels, int numBuckets, Configuration hconf, Long writeId, - int stmtId, boolean isMmTable, Set filesKept, boolean isBaseDir) throws IOException { + public static List removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, String unionSuffix, + int dpLevels, int numBuckets, Configuration hconf, Long writeId, int stmtId, boolean isMmTable, + Set filesKept, boolean isBaseDir, boolean isDirectInsert) throws IOException { if (fileStats == null) { return null; } @@ -1571,13 +1571,13 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I continue; } - if (isMmTable) { - Path mmDir = parts[i].getPath(); - if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) { - throw new IOException("Unexpected non-MM directory name " + mmDir); + if (isMmTable || isDirectInsert) { + Path dir = parts[i].getPath(); + if (!dir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) { + throw new IOException("Unexpected direct insert directory name " + dir); } - Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in MM directory {}", mmDir); + Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in direct insert directory {}", dir); if (!StringUtils.isEmpty(unionSuffix)) { path = new Path(path, unionSuffix); @@ -1588,102 +1588,116 @@ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws I } FileStatus[] items = fs.listStatus(path); - taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs); + taskIDToFile = removeTempOrDuplicateFilesNonDirectInsert(items, fs); if (filesKept != null && taskIDToFile != null) { addFilesToPathSet(taskIDToFile.values(), filesKept); } - addBucketFileToResults(taskIDToFile, numBuckets, hconf, result); + addBucketFileToResults(taskIDToFile, numBuckets, hconf, result, isDirectInsert); } - } else if (isMmTable && !StringUtils.isEmpty(unionSuffix)) { + } else if ((isMmTable || isDirectInsert) && !StringUtils.isEmpty(unionSuffix)) { FileStatus[] items = fileStats; if (fileStats.length == 0) { return 
result; } - Path mmDir = extractNonDpMmDir(writeId, stmtId, items, isBaseDir); - taskIDToFile = removeTempOrDuplicateFilesNonMm( - fs.listStatus(new Path(mmDir, unionSuffix)), fs); + Path dir = extractNonDpDirectInsertDir(writeId, stmtId, items, isBaseDir); + taskIDToFile = removeTempOrDuplicateFilesNonDirectInsert( + fs.listStatus(new Path(dir, unionSuffix)), fs); if (filesKept != null && taskIDToFile != null) { addFilesToPathSet(taskIDToFile.values(), filesKept); } - addBucketFileToResults2(taskIDToFile, numBuckets, hconf, result); + addBucketFileToResults2(taskIDToFile, numBuckets, hconf, result, isDirectInsert); } else { FileStatus[] items = fileStats; if (items.length == 0) { return result; } - if (!isMmTable) { - taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs); + if (!isMmTable && !isDirectInsert) { + taskIDToFile = removeTempOrDuplicateFilesNonDirectInsert(items, fs); if (filesKept != null && taskIDToFile != null) { addFilesToPathSet(taskIDToFile.values(), filesKept); } } else { - Path mmDir = extractNonDpMmDir(writeId, stmtId, items, isBaseDir); - taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs); - if (filesKept != null && taskIDToFile != null) { + Path dir = extractNonDpDirectInsertDir(writeId, stmtId, items, isBaseDir); + PathFilter orcAcidVersionFilter = new PathFilter() { + @Override + public boolean accept(Path p) { + String name = p.getName(); + return !name.equals(AcidUtils.OrcAcidVersion.ACID_FORMAT); + } + }; + taskIDToFile = removeTempOrDuplicateFilesNonDirectInsert(fs.listStatus(dir, orcAcidVersionFilter), fs); + if (filesKept != null && taskIDToFile != null) { addFilesToPathSet(taskIDToFile.values(), filesKept); } } - addBucketFileToResults2(taskIDToFile, numBuckets, hconf, result); + addBucketFileToResults2(taskIDToFile, numBuckets, hconf, result, isDirectInsert); } return result; } - private static Path extractNonDpMmDir(Long writeId, int stmtId, FileStatus[] items, boolean isBaseDir) throws IOException { + private static Path extractNonDpDirectInsertDir(Long writeId, int stmtId, FileStatus[] items, boolean isBaseDir) + throws IOException { if (items.length > 1) { - throw new IOException("Unexpected directories for non-DP MM: " + Arrays.toString(items)); + throw new IOException("Unexpected directories for non-DP direct insert: " + Arrays.toString(items)); } - Path mmDir = items[0].getPath(); - if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) { - throw new IOException("Unexpected non-MM directory " + mmDir); + Path dir = items[0].getPath(); + if (!dir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) { + throw new IOException("Unexpected non direct insert directory " + dir); } - Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in MM directory {}", mmDir); - return mmDir; + Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in direct insert directory {}", dir); + return dir; } // TODO: not clear why two if conditions are different. Preserve the existing logic for now.
private static void addBucketFileToResults2(HashMap taskIDToFile, - int numBuckets, Configuration hconf, List result) { + int numBuckets, Configuration hconf, List result, boolean isDirectInsert) { if (MapUtils.isNotEmpty(taskIDToFile) && (numBuckets > taskIDToFile.size()) && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) { - addBucketsToResultsCommon(taskIDToFile, numBuckets, result); + addBucketsToResultsCommon(taskIDToFile, numBuckets, result, isDirectInsert); } } // TODO: not clear why two if conditions are different. Preserve the existing logic for now. private static void addBucketFileToResults(HashMap taskIDToFile, - int numBuckets, Configuration hconf, List result) { + int numBuckets, Configuration hconf, List result, boolean isDirectInsert) { // if the table is bucketed and enforce bucketing, we should check and generate all buckets if (numBuckets > 0 && taskIDToFile != null && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) { - addBucketsToResultsCommon(taskIDToFile, numBuckets, result); + addBucketsToResultsCommon(taskIDToFile, numBuckets, result, isDirectInsert); } } - private static void addBucketsToResultsCommon( - HashMap taskIDToFile, int numBuckets, List result) { + private static void addBucketsToResultsCommon(HashMap taskIDToFile, int numBuckets, + List result, boolean isDirectInsert) { String taskID1 = taskIDToFile.keySet().iterator().next(); Path bucketPath = taskIDToFile.values().iterator().next().getPath(); for (int j = 0; j < numBuckets; ++j) { - addBucketFileIfMissing(result, taskIDToFile, taskID1, bucketPath, j); + addBucketFileIfMissing(result, taskIDToFile, taskID1, bucketPath, j, isDirectInsert); } } - private static void addBucketFileIfMissing(List result, - HashMap taskIDToFile, String taskID1, Path bucketPath, int j) { + private static void addBucketFileIfMissing(List result, HashMap taskIDToFile, + String taskID1, Path bucketPath, int j, boolean isDirectInsert) { String taskID2 = replaceTaskId(taskID1, j); if (!taskIDToFile.containsKey(taskID2)) { // create empty bucket, file name should be derived from taskID2 URI bucketUri = bucketPath.toUri(); - String path2 = replaceTaskIdFromFilename(bucketUri.getPath().toString(), j); - Utilities.FILE_OP_LOGGER.trace("Creating an empty bucket file {}", path2); - result.add(new Path(bucketUri.getScheme(), bucketUri.getAuthority(), path2)); + if (isDirectInsert) { + Path missingBucketPath = AcidUtils.createBucketFile(bucketPath.getParent(), j); + Utilities.FILE_OP_LOGGER.trace("Creating an empty bucket file {}", missingBucketPath.toString()); + result.add(missingBucketPath); + } else { + String path2 = replaceTaskIdFromFilename(bucketUri.getPath().toString(), j); + Utilities.FILE_OP_LOGGER.trace("Creating an empty bucket file {}", path2); + result.add(new Path(bucketUri.getScheme(), bucketUri.getAuthority(), path2)); + } } } - private static HashMap removeTempOrDuplicateFilesNonMm( + private static HashMap removeTempOrDuplicateFilesNonDirectInsert( FileStatus[] files, FileSystem fs) throws IOException { if (files == null || fs == null) { return null; @@ -3555,8 +3569,9 @@ private static void createTmpDirs(Configuration conf, if (op instanceof FileSinkOperator) { FileSinkDesc fdesc = ((FileSinkOperator) op).getConf(); - if (fdesc.isMmTable()) { - continue; // No need to create for MM tables + if (fdesc.isMmTable() || fdesc.isDirectInsert()) { + // No need to create for MM tables, or ACID insert + continue; } Path tempDir = fdesc.getDirName(); if (tempDir != null) { 
@@ -3969,7 +3984,7 @@ private static void tryDelete(FileSystem fs, Path path) { } } - public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels, + public static Path[] getDirectInsertDirectoryCandidates(FileSystem fs, Path path, int dpLevels, PathFilter filter, long writeId, int stmtId, Configuration conf, Boolean isBaseDir) throws IOException { int skipLevels = dpLevels; @@ -3985,9 +4000,9 @@ private static void tryDelete(FileSystem fs, Path path) { // /want/ to know isBaseDir because that is error prone; so, it ends up never being used. if (stmtId < 0 || isBaseDir == null || (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs))) { - return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter); + return getDirectInsertDirectoryCandidatesRecursive(fs, path, skipLevels, filter); } - return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, writeId, stmtId, isBaseDir); + return getDirectInsertDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, writeId, stmtId, isBaseDir); } private static boolean isS3(FileSystem fs) { @@ -4010,7 +4025,7 @@ private static boolean isS3(FileSystem fs) { return paths; } - private static Path[] getMmDirectoryCandidatesRecursive(FileSystem fs, + private static Path[] getDirectInsertDirectoryCandidatesRecursive(FileSystem fs, Path path, int skipLevels, PathFilter filter) throws IOException { String lastRelDir = null; HashSet results = new HashSet(); @@ -4063,7 +4078,7 @@ private static boolean isS3(FileSystem fs) { return results.toArray(new Path[results.size()]); } - private static Path[] getMmDirectoryCandidatesGlobStatus(FileSystem fs, Path path, int skipLevels, + private static Path[] getDirectInsertDirectoryCandidatesGlobStatus(FileSystem fs, Path path, int skipLevels, PathFilter filter, long writeId, int stmtId, boolean isBaseDir) throws IOException { StringBuilder sb = new StringBuilder(path.toUri().getPath()); for (int i = 0; i < skipLevels; i++) { @@ -4080,10 +4095,10 @@ private static boolean isS3(FileSystem fs) { return statusToPath(fs.globStatus(pathPattern, filter)); } - private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir, + private static void tryDeleteAllDirectInsertFiles(FileSystem fs, Path specPath, Path manifestDir, int dpLevels, int lbLevels, AcidUtils.IdPathFilter filter, long writeId, int stmtId, Configuration conf) throws IOException { - Path[] files = getMmDirectoryCandidates( + Path[] files = getDirectInsertDirectoryCandidates( fs, specPath, dpLevels, filter, writeId, stmtId, conf, null); if (files != null) { for (Path path : files) { @@ -4096,7 +4111,7 @@ private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manif } - public static void writeMmCommitManifest(List commitPaths, Path specPath, FileSystem fs, + public static void writeCommitManifest(List commitPaths, Path specPath, FileSystem fs, String taskId, Long writeId, int stmtId, String unionSuffix, boolean isInsertOverwrite) throws HiveException { if (commitPaths.isEmpty()) { return; @@ -4141,15 +4156,15 @@ public MissingBucketsContext(TableDesc tableInfo, int numBuckets, boolean isComp } } - public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf, + public static void handleDirectInsertTableFinalPath(Path specPath, String unionSuffix, Configuration hconf, boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long writeId, int stmtId, - Reporter reporter, boolean isMmTable, boolean 
isMmCtas, boolean isInsertOverwrite) - throws IOException, HiveException { + Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite, boolean isDirectInsert) + throws IOException, HiveException { FileSystem fs = specPath.getFileSystem(hconf); Path manifestDir = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite); if (!success) { AcidUtils.IdPathFilter filter = new AcidUtils.IdPathFilter(writeId, stmtId); - tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels, + tryDeleteAllDirectInsertFiles(fs, specPath, manifestDir, dpLevels, lbLevels, filter, writeId, stmtId, hconf); return; } @@ -4178,13 +4193,13 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con Utilities.FILE_OP_LOGGER.info("Creating table directory for CTAS with no output at {}", specPath); FileUtils.mkdir(fs, specPath, hconf); } - Path[] files = getMmDirectoryCandidates( + Path[] files = getDirectInsertDirectoryCandidates( fs, specPath, dpLevels, filter, writeId, stmtId, hconf, isInsertOverwrite); - ArrayList mmDirectories = new ArrayList<>(); + ArrayList directInsertDirectories = new ArrayList<>(); if (files != null) { for (Path path : files) { Utilities.FILE_OP_LOGGER.trace("Looking at path: {}", path); - mmDirectories.add(path); + directInsertDirectories.add(path); } } @@ -4217,15 +4232,15 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con } } - for (Path path : mmDirectories) { - cleanMmDirectory(path, fs, unionSuffix, lbLevels, committed); + for (Path path : directInsertDirectories) { + cleanDirectInsertDirectory(path, fs, unionSuffix, lbLevels, committed); } if (!committed.isEmpty()) { throw new HiveException("The following files were committed but not found: " + committed); } - if (mmDirectories.isEmpty()) { + if (directInsertDirectories.isEmpty()) { return; } @@ -4236,15 +4251,15 @@ public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Con } // Create fake file statuses to avoid querying the file system. removeTempOrDuplicateFiles // doesn't need tocheck anything except path and directory status for MM directories. - FileStatus[] finalResults = new FileStatus[mmDirectories.size()]; - for (int i = 0; i < mmDirectories.size(); ++i) { - finalResults[i] = new PathOnlyFileStatus(mmDirectories.get(i)); + FileStatus[] finalResults = new FileStatus[directInsertDirectories.size()]; + for (int i = 0; i < directInsertDirectories.size(); ++i) { + finalResults[i] = new PathOnlyFileStatus(directInsertDirectories.get(i)); } List emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, finalResults, unionSuffix, dpLevels, mbc == null ? 
0 : mbc.numBuckets, hconf, writeId, stmtId, - isMmTable, null, isInsertOverwrite); + isMmTable, null, isInsertOverwrite, isDirectInsert); // create empty buckets if necessary - if (!emptyBuckets.isEmpty()) { + if (!emptyBuckets.isEmpty() && !isDirectInsert) { assert mbc != null; Utilities.createEmptyBuckets(hconf, emptyBuckets, mbc.isCompressed, mbc.tableInfo, reporter); } @@ -4256,7 +4271,7 @@ public PathOnlyFileStatus(Path path) { } } - private static void cleanMmDirectory(Path dir, FileSystem fs, String unionSuffix, + private static void cleanDirectInsertDirectory(Path dir, FileSystem fs, String unionSuffix, int lbLevels, HashSet committed) throws IOException, HiveException { for (FileStatus child : fs.listStatus(dir)) { Path childPath = child.getPath(); @@ -4267,7 +4282,7 @@ private static void cleanMmDirectory(Path dir, FileSystem fs, String unionSuffix if (child.isDirectory()) { Utilities.FILE_OP_LOGGER.trace( "Recursion into LB directory {}; levels remaining ", childPath, lbLevels - 1); - cleanMmDirectory(childPath, fs, unionSuffix, lbLevels - 1, committed); + cleanDirectInsertDirectory(childPath, fs, unionSuffix, lbLevels - 1, committed); } else { if (committed.contains(childPath)) { throw new HiveException("LB FSOP has commited " @@ -4282,7 +4297,9 @@ private static void cleanMmDirectory(Path dir, FileSystem fs, String unionSuffix if (committed.remove(childPath)) { continue; // A good file. } - deleteUncommitedFile(childPath, fs); + if (!childPath.getName().equals(AcidUtils.OrcAcidVersion.ACID_FORMAT)) { + deleteUncommitedFile(childPath, fs); + } } else if (!child.isDirectory()) { if (committed.contains(childPath)) { throw new HiveException("Union FSOP has commited " @@ -4290,8 +4307,8 @@ private static void cleanMmDirectory(Path dir, FileSystem fs, String unionSuffix } deleteUncommitedFile(childPath, fs); } else if (childPath.getName().equals(unionSuffix)) { - // Found the right union directory; treat it as "our" MM directory. - cleanMmDirectory(childPath, fs, null, 0, committed); + // Found the right union directory; treat it as "our" directory. 
+ cleanDirectInsertDirectory(childPath, fs, null, 0, committed); } else { String childName = childPath.getName(); if (!childName.startsWith(AbstractFileMergeOperator.UNION_SUDBIR_PREFIX) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 295fe7cbd0..e3ed0026db 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -2138,7 +2138,7 @@ public static boolean isRawFormatFile(Path dataFile, FileSystem fs) throws IOExc */ public static final class OrcAcidVersion { private static final String ACID_VERSION_KEY = "hive.acid.version"; - private static final String ACID_FORMAT = "_orc_acid_version"; + public static final String ACID_FORMAT = "_orc_acid_version"; private static final Charset UTF8 = Charset.forName("UTF-8"); public static final int ORC_ACID_VERSION_DEFAULT = 0; /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java index 737e6774b7..9cefd11472 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.SerDeStats; /** @@ -79,4 +80,9 @@ * @return - buffered row count */ long getBufferedRowCount(); + + /** + * Returns the path of the file this updater wrote to + */ + public Path getUpdatedFilePath(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index c4c56f8477..202f78b81c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -268,6 +268,11 @@ private String stringifyObject(Object obj, stringifyObject(buffer, obj, inspector); return buffer.toString(); } + + @Override + public Path getUpdatedFilePath() { + return null; + } } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 3fa61d3560..160485cc1b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -571,7 +571,7 @@ public void close(boolean abort) throws IOException { LOG.debug("Closing writer for path: {} acid stats: {}", path, indexBuilder.acidStats); } writer.close(); // normal close, when there are inserts. 
- } + } } else { if (options.isWritingBase()) { // With insert overwrite we need the empty file to delete the previous content of the table @@ -810,4 +810,9 @@ private int setBucket(int bucketProperty, int operation) { bucket.set(bucketProperty); return currentBucketProperty; } + + @Override + public Path getUpdatedFilePath() { + return path; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 691f3ee2e9..0b8f4cc09e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1983,7 +1983,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcidIUDoperation, boolean resetStatistics, Long writeId, - int stmtId, boolean isInsertOverwrite) throws HiveException { + int stmtId, boolean isInsertOverwrite, boolean isDirectInsert) throws HiveException { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_PARTITION); @@ -2000,7 +2000,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par Partition newTPart = loadPartitionInternal(loadPath, tbl, partSpec, oldPart, loadFileType, inheritTableSpecs, inheritLocation, isSkewedStoreAsSubdir, isSrcLocal, isAcidIUDoperation, - resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles); + resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles, isDirectInsert); AcidUtils.TableSnapshot tableSnapshot = isTxnTable ? getTableSnapshot(tbl, writeId) : null; if (tableSnapshot != null) { @@ -2071,9 +2071,9 @@ private Partition loadPartitionInternal(Path loadPath, Table tbl, Map newFiles) throws HiveException { + boolean isTxnTable, List newFiles, boolean isDirectInsert) throws HiveException { Path tblDataLocationPath = tbl.getDataLocation(); - boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters()); + boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters()); assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName(); boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl); try { @@ -2118,13 +2118,12 @@ private Partition loadPartitionInternal(Path loadPath, Table tbl, Map getValidPartitionsInPath( int numDP, int numLB, Path loadPath, Long writeId, int stmtId, - boolean isMmTable, boolean isInsertOverwrite) throws HiveException { + boolean isMmTable, boolean isInsertOverwrite, boolean isDirectInsert) throws HiveException { Set validPartitions = new HashSet(); try { FileSystem fs = loadPath.getFileSystem(conf); - if (!isMmTable) { + if (!isMmTable && !isDirectInsert) { List leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs); // Check for empty partitions for (FileStatus s : leafStatus) { @@ -2584,7 +2582,7 @@ private void constructOneLBLocationMap(FileStatus fSta, // we have multiple statements anyway is union. Utilities.FILE_OP_LOGGER.trace( "Looking for dynamic partitions in {} ({} levels)", loadPath, numDP); - Path[] leafStatus = Utilities.getMmDirectoryCandidates( + Path[] leafStatus = Utilities.getDirectInsertDirectoryCandidates( fs, loadPath, numDP, null, writeId, -1, conf, isInsertOverwrite); for (Path p : leafStatus) { Path dpPath = p.getParent(); // Skip the MM directory that we have found.
@@ -2632,7 +2630,7 @@ private void constructOneLBLocationMap(FileStatus fSta, final String tableName, final Map partSpec, final LoadFileType loadFileType, final int numDP, final int numLB, final boolean isAcid, final long writeId, final int stmtId, final boolean resetStatistics, final AcidUtils.Operation operation, - boolean isInsertOverwrite) throws HiveException { + boolean isInsertOverwrite, boolean isDirectInsert) throws HiveException { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_DYNAMIC_PARTITIONS); @@ -2640,7 +2638,7 @@ private void constructOneLBLocationMap(FileStatus fSta, // Get all valid partition paths and existing partitions for them (if any) final Table tbl = getTable(tableName); final Set validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, writeId, stmtId, - AcidUtils.isInsertOnlyTable(tbl.getParameters()), isInsertOverwrite); + AcidUtils.isInsertOnlyTable(tbl.getParameters()), isInsertOverwrite, isDirectInsert); final int partsToLoad = validPartitions.size(); final AtomicInteger partitionsLoaded = new AtomicInteger(0); @@ -2708,7 +2706,7 @@ private void constructOneLBLocationMap(FileStatus fSta, // load the partition Partition partition = loadPartitionInternal(entry.getKey(), tbl, fullPartSpec, oldPartition, loadFileType, true, false, numLB > 0, false, isAcid, - resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles); + resetStatistics, writeId, stmtId, isInsertOverwrite, isTxnTable, newFiles, isDirectInsert); // if the partition already existed before the loading, no need to add it again to the // metastore @@ -2860,7 +2858,7 @@ private void constructOneLBLocationMap(FileStatus fSta, */ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean resetStatistics, - Long writeId, int stmtId, boolean isInsertOverwrite) throws HiveException { + Long writeId, int stmtId, boolean isInsertOverwrite, boolean isDirectInsert) throws HiveException { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_TABLE); @@ -2877,7 +2875,7 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType } // Note: this assumes both paths are qualified; which they are, currently. - if (((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) || (loadFileType == LoadFileType.IGNORE)) { + if (((isMmTable || isDirectInsert || isFullAcidTable) && loadPath.equals(tbl.getPath())) || (loadFileType == LoadFileType.IGNORE)) { /** * some operations on Transactional tables (e.g. Import) write directly to the final location * and avoid the 'move' operation. Since MoveTask does other things, setting 'loadPath' to be @@ -2897,7 +2895,6 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType Path tblPath = tbl.getPath(); Path destPath = tblPath; if (isMmTable) { - assert !isAcidIUDoperation; // We will load into MM directory, and hide previous directories if needed. destPath = new Path(destPath, isInsertOverwrite ? 
AcidUtils.baseDir(writeId) : AcidUtils.deltaSubdir(writeId, writeId, stmtId)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 5d6143d6a4..089f6642fa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -68,6 +68,8 @@ import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils.Operation; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat; @@ -1382,7 +1384,7 @@ public static void createMRWorkForMergingFiles(FileSinkOperator fsInput, Path fsopPath = srcMmWriteId != null ? fsInputDesc.getFinalDirName() : finalName; Task mvTask = GenMapRedUtils.findMoveTaskForFsopOutput( - mvTasks, fsopPath, fsInputDesc.isMmTable()); + mvTasks, fsopPath, fsInputDesc.isMmTable(), fsInputDesc.isDirectInsert()); ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work, fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask, lineageState); @@ -1869,8 +1871,8 @@ public static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) { .isSkewedStoredAsDir(); } - public static Task findMoveTaskForFsopOutput( - List> mvTasks, Path fsopFinalDir, boolean isMmFsop) { + public static Task findMoveTaskForFsopOutput(List> mvTasks, Path fsopFinalDir, + boolean isMmFsop, boolean isDirectInsert) { // find the move task for (Task mvTsk : mvTasks) { MoveWork mvWork = mvTsk.getWork(); @@ -1879,7 +1881,7 @@ public static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) { if (mvWork.getLoadFileWork() != null) { srcDir = mvWork.getLoadFileWork().getSourcePath(); isLfd = true; - if (isMmFsop) { + if (isMmFsop || isDirectInsert) { srcDir = srcDir.getParent(); } } else if (mvWork.getLoadTableWork() != null) { @@ -1910,8 +1912,8 @@ public static boolean isMergeRequired(List> mvTasks, HiveConf hco // no need of merging if the move is to a local file system // We are looking based on the original FSOP, so use the original path as is. - MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTaskForFsopOutput( - mvTasks, fsOp.getConf().getFinalDirName(), fsOp.getConf().isMmTable()); + MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fsOp.getConf().getFinalDirName(), + fsOp.getConf().isMmTable(), fsOp.getConf().isDirectInsert()); // TODO: wtf?!! why is this in this method? This has nothing to do with anything. if (isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER) @@ -1984,10 +1986,16 @@ public static Path createMoveTask(Task currTask, boolean Path dest = null; FileSinkDesc fileSinkDesc = fsOp.getConf(); - boolean isMmTable = fileSinkDesc.isMmTable(); + boolean isMmTable = fileSinkDesc.isMmTable(); + boolean isDirectInsert = fileSinkDesc.isDirectInsert(); if (chDir) { dest = fileSinkDesc.getMergeInputDirName(); - if (!isMmTable) { + /** + * Skip temporary file generation for: + * 1. MM Tables + * 2. 
INSERT operation on full ACID table + */ + if ((!isMmTable) && (!isDirectInsert)) { // generate the temporary file // it must be on the same file system as the current destination Context baseCtx = parseCtx.getContext(); @@ -2018,8 +2026,8 @@ public static Path createMoveTask(Task currTask, boolean Task mvTask = null; if (!chDir) { - mvTask = GenMapRedUtils.findMoveTaskForFsopOutput( - mvTasks, fsOp.getConf().getFinalDirName(), fsOp.getConf().isMmTable()); + mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fsOp.getConf().getFinalDirName(), isMmTable, + isDirectInsert); } // Set the move task to be dependent on the current task diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index d47457857c..a2b1527396 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7349,6 +7349,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) Table destinationTable = null; // destination table if any boolean destTableIsTransactional; // true for full ACID table and MM table boolean destTableIsFullAcid; // should the destination table be written to using ACID + boolean isDirectInsert = false; // should we add files directly to the final path boolean destTableIsTemporary = false; boolean destTableIsMaterialization = false; Partition destinationPartition = null;// destination partition if any @@ -7362,7 +7363,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) LoadTableDesc ltd = null; ListBucketingCtx lbCtx = null; Map partSpec = null; - boolean isMmTable = false, isMmCtas = false; + boolean isMmTable = false, isMmCtas = false, isNonNativeTable = false; Long writeId = null; HiveTxnManager txnMgr = getTxnMgr(); @@ -7406,9 +7407,25 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) destinationPath = new Path(destinationTable.getPath(), dpCtx.getSPPath()); } - boolean isNonNativeTable = destinationTable.isNonNative(); + isNonNativeTable = destinationTable.isNonNative(); isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters()); - if (isNonNativeTable || isMmTable) { + AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; + // this table_desc does not contain the partitioning columns + tableDescriptor = Utilities.getTableDesc(destinationTable); + + if (!isNonNativeTable) { + if (destTableIsFullAcid) { + acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest); + } + } + isDirectInsert = destTableIsFullAcid && (acidOp == AcidUtils.Operation.INSERT); + /** + * We will directly insert to the final destination in the following cases: + * 1. Non native table + * 2. Micro-managed (insert only table) + * 3. 
Full ACID table and operation type is INSERT + */ + if (isNonNativeTable || isMmTable || isDirectInsert) { queryTmpdir = destinationPath; } else { queryTmpdir = ctx.getTempDirForFinalJobPath(destinationPath); } @@ -7421,8 +7438,6 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) // set the root of the temporary path where dynamic partition columns will populate dpCtx.setRootPath(queryTmpdir); } - // this table_desc does not contain the partitioning columns - tableDescriptor = Utilities.getTableDesc(destinationTable); // Add NOT NULL constraint check input = genConstraintsPlan(dest, qb, input); @@ -7452,7 +7467,6 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) // Create the work for moving the table // NOTE: specify Dynamic partitions in dest_tab for WriteEntity if (!isNonNativeTable) { - AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; if (destTableIsFullAcid) { acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest); //todo: should this be done for MM? is it ok to use CombineHiveInputFormat with MM @@ -7484,8 +7498,12 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) destinationTable.getDbName(), destinationTable.getTableName()); LoadFileType loadType = (!isInsertInto && !destTableIsTransactional) ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING; + if (isDirectInsert) { + loadType = LoadFileType.IGNORE; + } ltd.setLoadFileType(loadType); ltd.setInsertOverwrite(!isInsertInto); + ltd.setIsDirectInsert(isDirectInsert); ltd.setLbCtx(lbCtx); loadTableWork.add(ltd); } else { @@ -7544,13 +7562,33 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) } } + isNonNativeTable = destinationTable.isNonNative(); isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters()); - queryTmpdir = isMmTable ? destinationPath : ctx.getTempDirForFinalJobPath(destinationPath); + AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; + // this table_desc does not contain the partitioning columns + tableDescriptor = Utilities.getTableDesc(destinationTable); + + if (!isNonNativeTable) { + if (destTableIsFullAcid) { + acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest); + } + } + isDirectInsert = destTableIsFullAcid && (acidOp == AcidUtils.Operation.INSERT); + /** + * We will directly insert to the final destination in the following cases: + * 1. Non native table + * 2. Micro-managed (insert only table) + * 3. Full ACID table and operation type is INSERT + */ + if (isNonNativeTable || isMmTable || isDirectInsert) { + queryTmpdir = destinationPath; + } else { + queryTmpdir = ctx.getTempDirForFinalJobPath(destinationPath); + } if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_PARTITION specifying " + queryTmpdir + " from " + destinationPath); } - tableDescriptor = Utilities.getTableDesc(destinationTable); // Add NOT NULL constraint check input = genConstraintsPlan(dest, qb, input); @@ -7576,7 +7614,6 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) lbCtx = constructListBucketingCtx(destinationPartition.getSkewedColNames(), destinationPartition.getSkewedColValues(), destinationPartition.getSkewedColValueLocationMaps(), destinationPartition.isStoredAsSubDirectories()); - AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; if (destTableIsFullAcid) { acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest); //todo: should this be done for MM?
is it ok to use CombineHiveInputFormat with MM? @@ -7607,8 +7644,12 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) // deltas and base and leave them up to the cleaner to clean up LoadFileType loadType = (!isInsertInto && !destTableIsTransactional) ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING; + if (isDirectInsert) { + loadType = LoadFileType.IGNORE; + } ltd.setLoadFileType(loadType); ltd.setInsertOverwrite(!isInsertInto); + ltd.setIsDirectInsert(isDirectInsert); ltd.setLbCtx(lbCtx); loadTableWork.add(ltd); @@ -7834,7 +7875,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) // the metastore table, then we move and commit the partitions. At least for the time being, // this order needs to be enforced because metastore expects a table to exist before we can // add any partitions to it. - boolean isNonNativeTable = tableDescriptor.isNonNative(); + isNonNativeTable = tableDescriptor.isNonNative(); if (!isNonNativeTable) { AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; if (destTableIsFullAcid) { @@ -7879,7 +7920,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) default: throw new SemanticException("Unknown destination type: " + destType); } - + if (!(destType == QBMetaData.DEST_DFS_FILE && qb.getIsQuery())) { input = genConversionSelectOperator(dest, qb, input, tableDescriptor, dpCtx); } @@ -7926,7 +7967,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, tableDescriptor, destinationPartition, destinationPath, currentTableId, destTableIsFullAcid, destTableIsTemporary,//this was 1/4 acid destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS, - canBeMerged, destinationTable, writeId, isMmCtas, destType, qb); + canBeMerged, destinationTable, writeId, isMmCtas, destType, qb, isDirectInsert); if (isMmCtas) { // Add FSD so that the LoadTask compilation could fix up its path to avoid the move. 
      tableDesc.setWriter(fileSinkDesc);
@@ -8136,7 +8177,7 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc,
       boolean destTableIsMaterialization, Path queryTmpdir, SortBucketRSCtx rsCtx,
       DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx, RowSchema fsRS, boolean canBeMerged,
       Table dest_tab, Long mmWriteId, boolean isMmCtas,
-      Integer dest_type, QB qb) throws SemanticException {
+      Integer dest_type, QB qb, boolean isDirectInsert) throws SemanticException {
     boolean isInsertOverwrite = false;
     switch (dest_type) {
     case QBMetaData.DEST_PARTITION:
@@ -8160,7 +8201,7 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc,
     FileSinkDesc fileSinkDesc = new FileSinkDesc(queryTmpdir, table_desc,
         conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(),
         canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx,
-        dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery());
+        dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery(), isDirectInsert);
 
     boolean isHiveServerQuery = SessionState.get().isHiveServerQuery();
     fileSinkDesc.setHiveServerQuery(isHiveServerQuery);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 757cb7af4d..ee04dca1d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -433,7 +433,7 @@ public static Path createMoveTask(Task currTask, boolean
     Task mvTask = null;
 
     if (!chDir) {
-      mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fileSinkDesc.getFinalDirName(), false);
+      mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fileSinkDesc.getFinalDirName(), false, false);
     }
 
     // Set the move task to be dependent on the current task
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 61ea28a5f5..99f47c4462 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -112,6 +112,8 @@
   private boolean isUsingBatchingSerDe = false;
 
   private boolean isInsertOverwrite = false;
+
+  private boolean isDirectInsert = false;
 
   private boolean isQuery = false;
 
@@ -125,7 +127,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
       final boolean compressed, final int destTableId, final boolean multiFileSpray,
       final boolean canBeMerged, final int numFiles, final int totalFiles,
       final ArrayList partitionCols, final DynamicPartitionCtx dpCtx, Path destPath,
-      Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery) {
+      Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery, boolean isDirectInsert) {
 
     this.dirName = dirName;
     this.tableInfo = tableInfo;
@@ -143,6 +145,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
     this.isMmCtas = isMmCtas;
     this.isInsertOverwrite = isInsertOverwrite;
     this.isQuery = isQuery;
+    this.isDirectInsert = isDirectInsert;
   }
 
   public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -164,7 +167,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
   public Object clone() throws CloneNotSupportedException {
     FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
         destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
-        partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery);
+        partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery, isDirectInsert);
     ret.setCompressCodec(compressCodec);
     ret.setCompressType(compressType);
     ret.setGatherStats(gatherStats);
@@ -181,6 +184,7 @@ public Object clone() throws CloneNotSupportedException {
     ret.setIsMerge(isMerge);
     ret.setFilesToFetch(filesToFetch);
     ret.setIsQuery(isQuery);
+    ret.setIsDirectInsert(isDirectInsert);
     return ret;
   }
 
@@ -215,6 +219,14 @@ public boolean isUsingBatchingSerDe() {
   public void setIsUsingBatchingSerDe(boolean isUsingBatchingSerDe) {
     this.isUsingBatchingSerDe = isUsingBatchingSerDe;
   }
+
+  public void setIsDirectInsert(boolean isDirectInsert) {
+    this.isDirectInsert = isDirectInsert;
+  }
+
+  public boolean isDirectInsert() {
+    return this.isDirectInsert;
+  }
 
   @Explain(displayName = "directory", explainLevels = { Level.EXTENDED })
   public Path getDirName() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index bed05819b5..a98314eefb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -42,7 +42,8 @@
   private boolean inheritLocation = false; // A silly setting.
   private int stmtId;
   private Long currentWriteId;
-  private boolean isInsertOverwrite;
+  private boolean isInsertOverwrite;
+  private boolean isDirectInsert;
 
   // TODO: the below seem like they should just be combined into partitionDesc
   private Table mdTable;
@@ -235,6 +236,14 @@ public void setInsertOverwrite(boolean v) {
     this.isInsertOverwrite = v;
   }
 
+  public void setIsDirectInsert(boolean isDirectInsert) {
+    this.isDirectInsert = isDirectInsert;
+  }
+
+  public boolean isDirectInsert() {
+    return this.isDirectInsert;
+  }
+
   /**
    * @return the lbCtx
    */
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index 78f25856a4..c80a5d5e2e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -142,7 +142,7 @@
       db.createTable(src, cols, null, TextInputFormat.class,
          HiveIgnoreKeyTextOutputFormat.class);
       db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING,
-         true, false, false, true, null, 0, false);
+         true, false, false, true, null, 0, false, false);
       i++;
     }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index a75103d60d..f195dc22be 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -286,7 +286,7 @@ private FileSinkOperator getFileSink(AcidUtils.Operation writeType,
      DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap, "Sunday", 100);
      //todo: does this need the finalDestination?
      desc = new FileSinkDesc(basePath, tableDesc, false, 1, false,
-         false, 1, 1, partCols, dpCtx, null, null, false, false, false);
+         false, 1, 1, partCols, dpCtx, null, null, false, false, false, false);
     } else {
       desc = new FileSinkDesc(basePath, tableDesc, false);
     }
@@ -778,6 +778,11 @@ public SerDeStats getStats() {
       public long getBufferedRowCount() {
         return records.size();
       }
+
+      @Override
+      public Path getUpdatedFilePath() {
+        return null;
+      }
     };
   }
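
Reviewer note, not part of the patch: the planner-side decision is spread over several hunks above, so here is a minimal, self-contained Java sketch of the intended behaviour. The local enums and helpers are illustrative stand-ins for Hive's AcidUtils.Operation and LoadFileType, not the patched classes themselves. The idea: a write bypasses the staging directory when the destination is a non-native table, an insert-only (MM) table, or a full ACID table with an INSERT operation, and the follow-up move task is then told to leave the already-committed files in place instead of renaming them.

// Illustrative sketch only; local enums mimic Hive's AcidUtils.Operation and LoadFileType.
public final class DirectInsertSketch {

  enum Operation { NOT_ACID, INSERT, UPDATE, DELETE }
  enum LoadFileType { REPLACE_ALL, KEEP_EXISTING, IGNORE }

  /** A write skips the staging directory when any of the three conditions above holds. */
  static boolean isDirectWrite(boolean nonNativeTable, boolean insertOnlyTable,
                               boolean fullAcidTable, Operation op) {
    boolean directInsert = fullAcidTable && op == Operation.INSERT;
    return nonNativeTable || insertOnlyTable || directInsert;
  }

  /** Direct inserts make the move task ignore the committed files rather than rename them. */
  static LoadFileType loadTypeFor(boolean insertInto, boolean transactional, boolean directInsert) {
    if (directInsert) {
      return LoadFileType.IGNORE;
    }
    return (!insertInto && !transactional) ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
  }

  public static void main(String[] args) {
    // Full ACID INSERT: write straight to the delta directory, move task leaves files alone.
    boolean direct = isDirectWrite(false, false, true, Operation.INSERT);
    System.out.println(direct + " " + loadTypeFor(true, true, direct)); // prints: true IGNORE
  }
}

Under these assumptions, a full ACID INSERT resolves to a direct write with an IGNORE load type, which is the same shape as the loadType override and the queryTmpdir selection added to genFileSinkPlan in this patch.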