diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java index 9b8563b..e2ad8de 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java @@ -845,6 +845,7 @@ public void testIncrementalLoad() throws IOException { run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); verifyRun("SELECT * from " + dbName + "_dupe.unptned_late", unptn_data); + run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)"); run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)"); verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1); @@ -874,6 +875,8 @@ public void testIncrementalLoad() throws IOException { verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1", ptn_data_1); verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2); + verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1); + verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=2", ptn_data_2); } @Test @@ -935,8 +938,7 @@ public void testIncrementalInserts() throws IOException { verifyRun("SELECT a from " + dbName + "_dupe.unptned_late ORDER BY a", unptn_data_after_ins); - // Commenting the below verifications for the replication of insert overwrites until HIVE-15642 patch is in - //verifyRun("SELECT a from " + dbName + "_dupe.unptned", data_after_ovwrite); + verifyRun("SELECT a from " + dbName + "_dupe.unptned", data_after_ovwrite); } @Test @@ -984,8 +986,12 @@ public void testIncrementalInsertToPartition() throws IOException { verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2) ORDER BY a", ptn_data_2); String[] data_after_ovwrite = new String[] { "hundred" }; + // Insert overwrite on existing partition 
run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=2) values('" + data_after_ovwrite[0] + "')"); verifySetup("SELECT a from " + dbName + ".ptned where (b=2)", data_after_ovwrite); + // Insert overwrite on dynamic partition + run("INSERT OVERWRITE TABLE " + dbName + ".ptned partition(b=3) values('" + data_after_ovwrite[0] + "')"); + verifySetup("SELECT a from " + dbName + ".ptned where (b=3)", data_after_ovwrite); advanceDumpDir(); run("REPL DUMP " + dbName + " FROM " + replDumpId); @@ -997,8 +1003,8 @@ printOutput(); run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); - // Commenting the below verifications for the replication of insert overwrites until HIVE-15642 patch is in - //verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2)", data_after_ovwrite); + verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=2)", data_after_ovwrite); + verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=3)", data_after_ovwrite); } @Test diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 45c77a2..dec73a7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1592,6 +1592,19 @@ public Database getDatabaseCurrent() throws HiveException { return getDatabase(currentDb); } + /** + * Loads data into a partition of the named table, resolving the table from the current database. + * @param loadPath source path of the data to be loaded + * @param tableName name of the table to load into + * @param partSpec partition specification to load into + * @param replace true if existing content of the partition should be replaced + * @param inheritTableSpecs true to inherit location/inputformat/outputformat/serde details from the table spec + * @param isSkewedStoreAsSubdir true if skewed values are stored as sub-directories + * @param isSrcLocal true if the source directory is LOCAL + * @param isAcid true if this is an ACID operation + * @param hasFollowingStatsTask true if there is a following task which updates the stats, so this method need not + * @throws HiveException if the partition cannot be loaded + */ public void loadPartition(Path loadPath, String tableName, Map partSpec, boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, @@ -1620,7 +1633,11 @@ public void loadPartition(Path loadPath, String tableName, * location/inputformat/outputformat/serde 
details from table spec * @param isSrcLocal * If the source directory is LOCAL - * @param isAcid true if this is an ACID operation + * @param isAcid + * true if this is an ACID operation + * @param hasFollowingStatsTask + * true if there is a following task which updates the stats, so, this method need not update. + * @return Partition object being loaded with data */ public Partition loadPartition(Path loadPath, Table tbl, Map partSpec, boolean replace, @@ -1629,6 +1646,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Path tblDataLocationPath = tbl.getDataLocation(); try { + // Get the partition object if it already exists Partition oldPart = getPartition(tbl, partSpec, false); /** * Move files before creating the partition since down stream processes @@ -1666,15 +1684,19 @@ public Partition loadPartition(Path loadPath, Table tbl, List newFiles = null; PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin("MoveTask", "FileMoves"); + + // If config is set, table is not temporary and partition being inserted exists, capture + // the list of files added. For not yet existing partitions (insert overwrite to new partition + // or dynamic partition inserts), the add partition event will capture the list of files added. 
+ if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) { + newFiles = Collections.synchronizedList(new ArrayList()); + } + if (replace || (oldPart == null && !isAcid)) { boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(), - isSrcLocal, isAutoPurge); + isSrcLocal, isAutoPurge, newFiles); } else { - if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) { - newFiles = Collections.synchronizedList(new ArrayList()); - } - FileSystem fs = tbl.getDataLocation().getFileSystem(conf); Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles); } @@ -1682,13 +1704,17 @@ public Partition loadPartition(Path loadPath, Table tbl, Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath); alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString()); validatePartition(newTPart); - if ((null != newFiles) || replace) { + + // Generate an insert event only if inserting into an existing partition + // When inserting into a new partition, the add partition event takes care of insert event + if ((null != oldPart) && (null != newFiles)) { fireInsertEvent(tbl, partSpec, replace, newFiles); } else { - LOG.debug("No new files were created, and is not a replace. Skipping generating INSERT event."); + LOG.debug("No new files were created, and is not a replace, or we're inserting into a " + + "partition that does not exist yet. 
Skipping generating INSERT event."); } - //column stats will be inaccurate + // column stats will be inaccurate StatsSetupConst.clearColumnStatsState(newTPart.getParameters()); // recreate the partition if it existed before @@ -2035,7 +2061,7 @@ public void loadTable(Path loadPath, String tableName, boolean replace, boolean if (replace) { Path tableDest = tbl.getPath(); boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); - replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal, isAutopurge); + replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal, isAutopurge, newFiles); } else { FileSystem fs; try { @@ -3109,12 +3135,30 @@ public static void clearDestForSubDirSrc(final HiveConf conf, Path dest, } } + // List the new files in destination path which gets copied from source. + public static void listNewFilesRecursively(final FileSystem destFs, Path dest, + List newFiles) throws HiveException { + try { + for (FileStatus fileStatus : destFs.listStatus(dest, FileUtils.HIDDEN_FILES_PATH_FILTER)) { + if (fileStatus.isDirectory()) { + // If it is a sub-directory, then recursively list the files. + listNewFilesRecursively(destFs, fileStatus.getPath(), newFiles); + } else { + newFiles.add(fileStatus.getPath()); + } + } + } catch (IOException e) { + LOG.error("Failed to get source file statuses", e); + throw new HiveException(e.getMessage(), e); + } + } + //it is assumed that parent directory of the destf should already exist when this //method is called. when the replace value is true, this method works a little different //from mv command if the destf is a directory, it replaces the destf instead of moving under //the destf. 
in this case, the replaced destf still preserves the original destf's permission - public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, - boolean replace, boolean isSrcLocal) throws HiveException { + public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, boolean replace, + boolean isSrcLocal) throws HiveException { final FileSystem srcFs, destFs; try { destFs = destf.getFileSystem(conf); @@ -3125,7 +3169,7 @@ public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, try { srcFs = srcf.getFileSystem(conf); } catch (IOException e) { - LOG.error("Failed to get dest fs", e); + LOG.error("Failed to get src fs", e); throw new HiveException(e.getMessage(), e); } @@ -3406,9 +3450,11 @@ private static void moveAcidDeltaFiles(String deltaFileType, PathFilter pathFilt * When set to true files which needs to be deleted are not moved to Trash * @param isSrcLocal * If the source directory is LOCAL + * @param newFiles + * Output the list of new files replaced in the destination path */ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf, - boolean isSrcLocal, boolean purge) throws HiveException { + boolean isSrcLocal, boolean purge, List newFiles) throws HiveException { try { FileSystem destFs = destf.getFileSystem(conf); @@ -3478,11 +3524,23 @@ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal)) { throw new IOException("Error moving: " + srcf + " into: " + destf); } - } else { // its either a file or glob + + // Add file paths of the files that will be moved to the destination if the caller needs it + if (null != newFiles) { + listNewFilesRecursively(destFs, destf, newFiles); + } + } else { + // its either a file or glob for (FileStatus src : srcs) { - if (!moveFile(conf, src.getPath(), new Path(destf, src.getPath().getName()), true, isSrcLocal)) { + Path destFile = 
new Path(destf, src.getPath().getName()); + if (!moveFile(conf, src.getPath(), destFile, true, isSrcLocal)) { throw new IOException("Error moving: " + srcf + " into: " + destf); } + + // Add file paths of the files that will be moved to the destination if the caller needs it + if (null != newFiles) { + newFiles.add(destFile); + } } } } catch (IOException e) {