diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 1802f37..9bfd171 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -117,7 +117,7 @@ private void moveFileInDfs (Path sourcePath, Path targetPath, FileSystem fs) deletePath = createTargetPath(targetPath, fs); } Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false); - if (!Hive.moveFile(conf, sourcePath, targetPath, true, false)) { + if (!Hive.moveFile(conf, sourcePath, targetPath, true, false, null)) { try { if (deletePath != null) { fs.delete(deletePath, true); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index c5b3517..c698c22 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1602,13 +1602,26 @@ public void loadPartition(Path loadPath, String tableName, * If the source directory is LOCAL * @param isAcid true if this is an ACID operation */ - public Partition loadPartition(Path loadPath, Table tbl, - Map partSpec, boolean replace, - boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, + /** + * @param loadPath + * @param tbl + * @param partSpec + * @param replace + * @param inheritTableSpecs + * @param isSkewedStoreAsSubdir + * @param isSrcLocal + * @param isAcid + * @param hasFollowingStatsTask + * @return + * @throws HiveException + */ + public Partition loadPartition(Path loadPath, Table tbl, Map partSpec, + boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException { Path tblDataLocationPath = tbl.getDataLocation(); try { + // Get the partition object if it already exists Partition oldPart = getPartition(tbl, partSpec, false); /** * Move files before creating the partition since down stream processes @@ -1646,28 +1659,38 @@ public Partition loadPartition(Path loadPath, Table tbl, List newFiles = null; PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin("MoveTask", "FileMoves"); + + // If config is set, table is not temporary and partition being inserted to exists, capture + // the list of files added. For not yet existing partitions (insert overwrite to new partition + // or dynamic partition inserts), the add partition event will capture the list of files + // added. + if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) { + newFiles = Collections.synchronizedList(new ArrayList()); + } + if (replace || (oldPart == null && !isAcid)) { - replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(), - isSrcLocal); + replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(), isSrcLocal, + newFiles); } else { - if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) { - newFiles = Collections.synchronizedList(new ArrayList()); - } - FileSystem fs = tbl.getDataLocation().getFileSystem(conf); Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles); } + perfLogger.PerfLogEnd("MoveTask", "FileMoves"); Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath); alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString()); validatePartition(newTPart); - if ((null != newFiles) || replace) { + + // Generate an insert event only if inserting into an existing partition + // When inserting into a new partition, the add partition event takes care of insert event + if ((oldPart != null) && (newFiles != null)) { fireInsertEvent(tbl, partSpec, newFiles); } else { - LOG.debug("No new files were created, and is not a replace. Skipping generating INSERT event."); + LOG.debug("No new files were created, and is not a replace, or we're inserting into a " + + "partition that does not exist yet. Skipping generating INSERT event."); } - //column stats will be inaccurate + // column stats will be inaccurate StatsSetupConst.clearColumnStatsState(newTPart.getParameters()); // recreate the partition if it existed before @@ -2013,7 +2036,7 @@ public void loadTable(Path loadPath, String tableName, boolean replace, boolean } if (replace) { Path tableDest = tbl.getPath(); - replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal); + replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal, null); } else { FileSystem fs; try { @@ -3087,8 +3110,8 @@ public static void clearDestForSubDirSrc(final HiveConf conf, Path dest, //method is called. when the replace value is true, this method works a little different //from mv command if the destf is a directory, it replaces the destf instead of moving under //the destf. in this case, the replaced destf still preserves the original destf's permission - public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, - boolean replace, boolean isSrcLocal) throws HiveException { + public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, boolean replace, + boolean isSrcLocal, List newFiles) throws HiveException { final FileSystem srcFs, destFs; try { destFs = destf.getFileSystem(conf); @@ -3103,6 +3126,22 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, throw new HiveException(e.getMessage(), e); } + // Add file paths of the files that will be copied to the destination if the caller needs it + if (newFiles != null) { + List results = new ArrayList(); + try { + for (FileStatus fileStatus : srcFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER)) { + FileUtils.listStatusRecursively(srcFs, fileStatus, results); + } + for (FileStatus fileStatus : results) { + newFiles.add(new Path(destf, fileStatus.getPath().getName())); + } + } catch (IOException e) { + LOG.error("Failed to get source file statuses", e); + throw new HiveException(e.getMessage(), e); + } + } + //needed for perm inheritance. final boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); @@ -3401,9 +3440,10 @@ private static void moveAcidDeltaFiles(String deltaFileType, PathFilter pathFilt * as destf, unless its across FileSystem boundaries. * @param isSrcLocal * If the source directory is LOCAL + * @param newFiles */ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf, - boolean isSrcLocal) throws HiveException { + boolean isSrcLocal, List newFiles) throws HiveException { try { FileSystem destFs = destf.getFileSystem(conf); @@ -3472,12 +3512,14 @@ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, // 2. srcs must be a list of files -- ensured by LoadSemanticAnalyzer // in both cases, we move the file under destf if (srcs.length == 1 && srcs[0].isDirectory()) { - if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal)) { + if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal, newFiles)) { throw new IOException("Error moving: " + srcf + " into: " + destf); } - } else { // its either a file or glob + } else { + // its either a file or glob for (FileStatus src : srcs) { - if (!moveFile(conf, src.getPath(), new Path(destf, src.getPath().getName()), true, isSrcLocal)) { + if (!moveFile(conf, src.getPath(), new Path(destf, src.getPath().getName()), true, + isSrcLocal, newFiles)) { throw new IOException("Error moving: " + srcf + " into: " + destf); } }