diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 39722a0dd2..f84f9a3643 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -761,6 +761,111 @@ public void testCheckPointingDataDumpFailureRegularCopy() throws Throwable {
         .verifyResults(new String[]{"11", "21"});
   }
 
+  @Test
+  public void testCheckPointingDataDumpFailureORCTableRegularCopy() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a int) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (2)")
+            .run("insert into t1 values (3)")
+            .run("insert into t2 values (11)")
+            .run("insert into t2 values (21)")
+            .dump(primaryDbName);
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    Path metadataPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME);
+    long modifiedTimeMetadata = fs.getFileStatus(metadataPath).getModificationTime();
+    Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+    Path dbPath = new Path(dataPath, primaryDbName.toLowerCase());
+    Path tablet1Path = new Path(dbPath, "t1");
+    Path tablet2Path = new Path(dbPath, "t2");
+    //Delete the dump ack and the t2 data. Metadata should be rewritten, and
+    //data should be rewritten for both t1 and t2 since this is a regular copy.
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+    assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    FileStatus[] statuses = fs.listStatus(tablet2Path);
+    //Delete t2 data
+    fs.delete(statuses[0].getPath(), true);
+    long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime();
+    long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime();
+    long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime();
+    WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName);
+    assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    //Files are copied again since we are using regular copy
+    assertTrue(modifiedTimeTable1 < fs.getFileStatus(tablet1Path).getModificationTime());
+    assertTrue(modifiedTimeTable1CopyFile < fs.listStatus(tablet1Path)[0].getModificationTime());
+    assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
+    assertTrue(modifiedTimeMetadata < fs.getFileStatus(metadataPath).getModificationTime());
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2", "t3"})
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[]{"1", "2", "3"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[]{"11", "21"});
+  }
+
+  @Test
+  public void testCheckPointingDataDumpFailureORCTableDistcpCopy() throws Throwable {
+    //Distcp copy
+    List<String> dumpClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+            "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" +
+                    UserGroupInformation.getCurrentUser().getUserName() + "'");
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a int) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (2)")
+            .run("insert into t1 values (3)")
+            .run("insert into t2 values (11)")
+            .run("insert into t2 values (21)")
+            .dump(primaryDbName, dumpClause);
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    Path metadataPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME);
+    long modifiedTimeMetadata = fs.getFileStatus(metadataPath).getModificationTime();
+    Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+    Path dbPath = new Path(dataPath, primaryDbName.toLowerCase());
+    Path tablet1Path = new Path(dbPath, "t1");
+    Path tablet2Path = new Path(dbPath, "t2");
+    //Delete the dump ack and the t2 data. Metadata should be rewritten; t1 data should stay the same, t2 data should be rewritten.
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+    assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    FileStatus[] statuses = fs.listStatus(tablet2Path);
+    //Delete t2 data
+    fs.delete(statuses[0].getPath(), true);
+    long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime();
+    long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime();
+    long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime();
+    //Do another dump. It should only dump table t2. Modification time of table t1 should stay the same, while t2's should be greater.
+    WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName, dumpClause);
+    assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertEquals(modifiedTimeTable1, fs.getFileStatus(tablet1Path).getModificationTime());
+    assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime());
+    assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
+    assertTrue(modifiedTimeMetadata < fs.getFileStatus(metadataPath).getModificationTime());
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[]{"1", "2", "3"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[]{"11", "21"});
+  }
+
   @Test
   public void testCheckPointingWithSourceTableDataInserted() throws Throwable {
     //To force distcp copy
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 23cd128197..609f309a0f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -375,6 +375,10 @@ private void doRegularCopyOnce(FileSystem sourceFs, List<Path> srcList,
       final Path finalDestination = destination;
       try {
         proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> {
+          //Destination should be empty; clear any leftovers from a previous failed attempt
+          destinationFs.delete(destination, true);
+          //Recreate the destination folder
+          destinationFs.mkdirs(destination);
           FileUtil
               .copy(sourceFs, paths, destinationFs, finalDestination, false, true, hiveConf);
           return true;
@@ -383,6 +387,10 @@ private void doRegularCopyOnce(FileSystem sourceFs, List<Path> srcList,
         throw new IOException(e);
       }
     } else {
+      //Destination should be empty; clear any leftovers from a previous failed attempt
+      destinationFs.delete(destination, true);
+      //Recreate the destination folder
+      destinationFs.mkdirs(destination);
       FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, hiveConf);
     }
   }
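
Note on the CopyUtils hunks: deleting and recreating the destination before FileUtil.copy appears intended to make a retried regular copy idempotent. Relying on overwrite=true alone would replace files that still exist at the source, but would not remove partial or stale files that a failed earlier attempt left behind; the checkpointing tests above catch exactly this through the modification-time assertions. Below is a minimal, self-contained sketch of this clean-then-copy pattern against plain Hadoop APIs; the class name and paths are hypothetical, and this is an illustration of the pattern, not the Hive implementation.

    // CleanDestinationCopySketch: hypothetical standalone example of the
    // clean-then-copy pattern from the CopyUtils change. Paths are made up.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public class CleanDestinationCopySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path source = new Path("/tmp/repl/staging/t2");        // hypothetical source dir
        Path destination = new Path("/tmp/repl/dump/data/t2"); // hypothetical destination dir
        FileSystem sourceFs = source.getFileSystem(conf);
        FileSystem destinationFs = destination.getFileSystem(conf);

        // A dump that failed mid-copy can leave partial files under the destination.
        // Recursively deleting and recreating it guarantees the retry starts from an
        // empty directory, so the copy below cannot mix stale partials with new files.
        destinationFs.delete(destination, true); // returns false (no-op) if the path is absent
        destinationFs.mkdirs(destination);

        Path[] sources = FileUtil.stat2Paths(sourceFs.listStatus(source));
        // Same FileUtil.copy overload the patch uses: deleteSource=false, overwrite=true.
        FileUtil.copy(sourceFs, sources, destinationFs, destination, false, true, conf);
      }
    }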