diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 39722a0dd2..f84f9a3643 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -761,6 +761,111 @@ public void testCheckPointingDataDumpFailureRegularCopy() throws Throwable {
         .verifyResults(new String[]{"11", "21"});
   }
 
+  @Test
+  public void testCheckPointingDataDumpFailureORCTableRegularCopy() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a int) clustered by (a) into 2 buckets"
+                    + " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets"
+                    + " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets"
+                    + " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (2)")
+            .run("insert into t1 values (3)")
+            .run("insert into t2 values (11)")
+            .run("insert into t2 values (21)")
+            .dump(primaryDbName);
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    Path metadataPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME);
+    long modifiedTimeMetadata = fs.getFileStatus(metadataPath).getModificationTime();
+    Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+    Path dbPath = new Path(dataPath, primaryDbName.toLowerCase());
+    Path tablet1Path = new Path(dbPath, "t1");
+    Path tablet2Path = new Path(dbPath, "t2");
+    //Delete dump ack and t2 data, metadata should be rewritten,
+    // data will also be rewritten for t1 and t2 as regular copy
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+    assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    FileStatus[] statuses = fs.listStatus(tablet2Path);
+    //Delete t2 data
+    fs.delete(statuses[0].getPath(), true);
+    long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime();
+    long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime();
+    long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime();
+    WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName);
+    assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    //File is copied again as we are using regular copy
+    assertTrue(modifiedTimeTable1 < fs.getFileStatus(tablet1Path).getModificationTime());
+    assertTrue(modifiedTimeTable1CopyFile < fs.listStatus(tablet1Path)[0].getModificationTime());
+    assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
+    assertTrue(modifiedTimeMetadata < fs.getFileStatus(metadataPath).getModificationTime());
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables ")
+            .verifyResults(new String[]{"t1", "t2", "t3"})
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "2", "3"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[]{"11", "21"});
+  }
+
+  @Test
+  public void testCheckPointingDataDumpFailureORCTableDistcpCopy() throws Throwable {
+    //Distcp copy
+    List<String> dumpClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+            "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+                    + UserGroupInformation.getCurrentUser().getUserName() + "'");
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a int) clustered by (a) into 2 buckets"
+                    + " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets"
+                    + " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (2)")
+            .run("insert into t1 values (3)")
+            .run("insert into t2 values (11)")
+            .run("insert into t2 values (21)")
+            .dump(primaryDbName, dumpClause);
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    Path metadataPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME);
+    long modifiedTimeMetadata = fs.getFileStatus(metadataPath).getModificationTime();
+    Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+    Path dbPath = new Path(dataPath, primaryDbName.toLowerCase());
+    Path tablet1Path = new Path(dbPath, "t1");
+    Path tablet2Path = new Path(dbPath, "t2");
+    //Delete dump ack and t2 data, metadata should be rewritten, data should be same for t1 but rewritten for t2
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+    assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    FileStatus[] statuses = fs.listStatus(tablet2Path);
+    //Delete t2 data
+    fs.delete(statuses[0].getPath(), true);
+    long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime();
+    long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime();
+    long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime();
+    //Do another dump. It should only dump table t2. Modification time of table t1 should be same while t2 is greater
+    WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName, dumpClause);
+    assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertEquals(modifiedTimeTable1, fs.getFileStatus(tablet1Path).getModificationTime());
+    assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime());
+    assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
+    assertTrue(modifiedTimeMetadata < fs.getFileStatus(metadataPath).getModificationTime());
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "2", "3"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[]{"11", "21"});
+  }
+
   @Test
   public void testCheckPointingWithSourceTableDataInserted() throws Throwable {
     //To force distcp copy
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 51c3b6ff61..984166dc12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -221,7 +221,7 @@ public int execute() {
       }
       // Copy the files from different source file systems to one destination directory
       CopyUtils copyUtils = new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs);
-      copyUtils.copyAndVerify(toPath, srcFiles, fromPath);
+      copyUtils.copyAndVerify(toPath, srcFiles, fromPath, true);
 
       // If a file is copied from CM path, then need to rename them using original source file name
       // This is needed to avoid having duplicate files in target if same event is applied twice
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 23cd128197..ea020792ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -71,7 +71,8 @@ public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinatio
   // Used by replication, copy files from source to destination. It is possible source file is
   // changed/removed during copy, so double check the checksum after copy,
   // if not match, copy again from cm
-  public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFiles, Path origSrcPtah)
+  public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFiles, Path origSrcPtah,
+                            boolean overwrite)
           throws IOException, LoginException, HiveFatalException {
     UserGroupInformation proxyUser = getProxyUser();
     FileSystem sourceFs = origSrcPtah.getFileSystem(hiveConf);
@@ -80,7 +81,7 @@ public void copyAndVerify(Path destRoot, List srcFil
       if (!useRegularCopy) {
         srcFiles.clear();
         srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPtah, null));
-        doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy);
+        doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy, overwrite);
       } else {
         Map<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot);
         for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) {
@@ -98,7 +99,7 @@ public void copyAndVerify(Path destRoot, List srcFil
           }
 
           // Copy files with retry logic on failure or source file is dropped or changed.
-          doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, true);
+          doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, true, overwrite);
         }
       }
     }
@@ -111,7 +112,8 @@ public void copyAndVerify(Path destRoot, List srcFil
 
   private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> srcFileList,
                            Path destination, UserGroupInformation proxyUser,
-                           boolean useRegularCopy) throws IOException, LoginException, HiveFatalException {
+                           boolean useRegularCopy, boolean overwrite) throws IOException,
+          LoginException, HiveFatalException {
     int repeat = 0;
     boolean isCopyError = false;
     List<Path> pathList = Lists.transform(srcFileList, ReplChangeManager.FileInfo::getEffectivePath);
@@ -130,7 +132,7 @@ private void doCopyRetry(FileSystem sourceFs, List s
 
         // if exception happens during doCopyOnce, then need to call getFilesToRetry with copy error as true in retry.
         isCopyError = true;
-        doCopyOnce(sourceFs, pathList, destination, useRegularCopy, proxyUser);
+        doCopyOnce(sourceFs, pathList, destination, useRegularCopy, proxyUser, overwrite);
 
         // if exception happens after doCopyOnce, then need to call getFilesToRetry with copy error as false in retry.
         isCopyError = false;
@@ -330,9 +332,10 @@ private UserGroupInformation getProxyUser() throws LoginException, IOException {
 
   // Copy without retry
   private void doCopyOnce(FileSystem sourceFs, List<Path> srcList, Path destination,
-                          boolean useRegularCopy, UserGroupInformation proxyUser) throws IOException {
+                          boolean useRegularCopy, UserGroupInformation proxyUser,
+                          boolean overwrite) throws IOException {
     if (useRegularCopy) {
-      doRegularCopyOnce(sourceFs, srcList, destination, proxyUser);
+      doRegularCopyOnce(sourceFs, srcList, destination, proxyUser, overwrite);
     } else {
       doDistCpCopyOnce(sourceFs, srcList, destination, proxyUser);
     }
@@ -365,7 +368,7 @@ private void doDistCpCopyOnce(FileSystem sourceFs, List srcList, Path dest
   }
 
   private void doRegularCopyOnce(FileSystem sourceFs, List<Path> srcList,
-                                 Path destination, UserGroupInformation proxyUser) throws IOException {
+                                 Path destination, UserGroupInformation proxyUser, boolean overWrite) throws IOException {
     /*
     even for regular copy we have to use the same user permissions that distCp will use since
     hive-server user might be different that the super user required to copy relevant files.
@@ -375,6 +378,12 @@ private void doRegularCopyOnce(FileSystem sourceFs, List srcList,
       final Path finalDestination = destination;
       try {
         proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> {
+          //Destination should be empty
+          if (overWrite) {
+            destinationFs.delete(destination, true);
+            //Destination folder should be created
+            destinationFs.mkdirs(destination);
+          }
           FileUtil
               .copy(sourceFs, paths, destinationFs, finalDestination, false, true, hiveConf);
           return true;
@@ -383,6 +392,12 @@ private void doRegularCopyOnce(FileSystem sourceFs, List srcList,
         throw new IOException(e);
       }
     } else {
+      //Destination should be empty
+      if (overWrite) {
+        destinationFs.delete(destination, true);
+        //Destination folder should be created
+        destinationFs.mkdirs(destination);
+      }
       FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, hiveConf);
     }
   }
@@ -398,7 +413,7 @@ public void doCopy(Path destination, List srcPaths) throws IOException, Lo
             path -> new ReplChangeManager.FileInfo(sourceFs, path, null));
 
         doCopyOnce(sourceFs, entry.getValue(), destination,
-                   regularCopy(sourceFs, fileList), proxyUser);
+                   regularCopy(sourceFs, fileList), proxyUser, false);
       }
     } finally {
       if (proxyUser != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index fed0d50c00..82bf384a74 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -108,7 +108,7 @@ protected void writeFileEntry(Table table, Partition ptn, String file, Context w
       filePaths.add(fileInfo);
      FileSystem dstFs = dataPath.getFileSystem(hiveConf);
      CopyUtils copyUtils = new CopyUtils(distCpDoAsUser, hiveConf, dstFs);
-      copyUtils.copyAndVerify(dataPath, filePaths, srcDataPath);
+      copyUtils.copyAndVerify(dataPath, filePaths, srcDataPath, false);
      copyUtils.renameFileCopiedFromCmPath(dataPath, dstFs, filePaths);
     }
   }
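
The behavioural core of the patch is the new overwrite flag threaded from copyAndVerify down to doRegularCopyOnce: when set, the destination directory is deleted and recreated before the regular copy runs, so a resumed (checkpointed) dump does not mix stale files from an earlier failed attempt with freshly copied ones. The standalone sketch below illustrates that delete-and-recreate-before-copy idea with plain Hadoop FileSystem/FileUtil calls; the class name, helper name, and paths are hypothetical, and this is not the patched CopyUtils code.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class OverwriteCopySketch {

  // Copy the given source paths into 'destination'; when 'overwrite' is set, first wipe and
  // recreate the destination directory so nothing from an earlier, partial copy survives.
  static void copyWithOptionalOverwrite(FileSystem srcFs, Path[] sources, FileSystem dstFs,
      Path destination, boolean overwrite, Configuration conf) throws IOException {
    if (overwrite) {
      dstFs.delete(destination, true);   // destination should be empty
      dstFs.mkdirs(destination);         // destination folder must exist before the copy
    }
    // Regular (non-distcp) copy: keep the sources, overwrite files already present at the target.
    FileUtil.copy(srcFs, sources, dstFs, destination, false, true, conf);
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path srcDir = new Path("/tmp/repl-sketch/src/t1");       // hypothetical source data directory
    Path dstDir = new Path("/tmp/repl-sketch/dump/data/t1"); // hypothetical dump data directory
    fs.mkdirs(srcDir);
    Path srcFile = new Path(srcDir, "bucket_00000");         // hypothetical data file
    fs.createNewFile(srcFile);
    copyWithOptionalOverwrite(fs, new Path[]{srcFile}, fs, dstDir, true, conf);
    System.out.println("copied into " + dstDir + ", exists = " + fs.exists(dstDir));
  }
}

As the diff shows, only the regular-copy path gets this treatment; doDistCpCopyOnce is left unchanged, which lines up with the distcp test expecting t1's files to keep their modification time on the re-dump while the regular-copy test expects them to be copied again.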