diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 39722a0dd2..f84f9a3643 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -761,6 +761,111 @@ public void testCheckPointingDataDumpFailureRegularCopy() throws Throwable {
         .verifyResults(new String[]{"11", "21"});
   }
 
+  @Test
+  public void testCheckPointingDataDumpFailureORCTableRegularCopy() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+        .run("CREATE TABLE t1(a int) clustered by (a) into 2 buckets" +
+            " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" +
+            " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" +
+            " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("insert into t1 values (1)")
+        .run("insert into t1 values (2)")
+        .run("insert into t1 values (3)")
+        .run("insert into t2 values (11)")
+        .run("insert into t2 values (21)")
+        .dump(primaryDbName);
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    Path metadataPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME);
+    long modifiedTimeMetadata = fs.getFileStatus(metadataPath).getModificationTime();
+    Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+    Path dbPath = new Path(dataPath, primaryDbName.toLowerCase());
+    Path tablet1Path = new Path(dbPath, "t1");
+    Path tablet2Path = new Path(dbPath, "t2");
+    //Delete dump ack and t2 data, metadata should be rewritten,
+    // data will also be rewritten for t1 and t2 as regular copy
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+    assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    FileStatus[] statuses = fs.listStatus(tablet2Path);
+    //Delete t2 data
+    fs.delete(statuses[0].getPath(), true);
+    long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime();
+    long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime();
+    long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime();
+    WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName);
+    assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    //File is copied again as we are using regular copy
+    assertTrue(modifiedTimeTable1 < fs.getFileStatus(tablet1Path).getModificationTime());
+    assertTrue(modifiedTimeTable1CopyFile < fs.listStatus(tablet1Path)[0].getModificationTime());
+    assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
+    assertTrue(modifiedTimeMetadata < fs.getFileStatus(metadataPath).getModificationTime());
+    replica.load(replicatedDbName, primaryDbName)
+        .run("use " + replicatedDbName)
+        .run("show tables ")
+        .verifyResults(new String[]{"t1", "t2", "t3"})
+        .run("select * from " + replicatedDbName + ".t1")
+        .verifyResults(new String[] {"1", "2", "3"})
+        .run("select * from " +
+            replicatedDbName + ".t2")
+        .verifyResults(new String[]{"11", "21"});
+  }
+
+  @Test
+  public void testCheckPointingDataDumpFailureORCTableDistcpCopy() throws Throwable {
+    //Distcp copy
+    List<String> dumpClause = Arrays.asList(
+        "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+        "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+        "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+        "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='" +
+            UserGroupInformation.getCurrentUser().getUserName() + "'");
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+        .run("CREATE TABLE t1(a int) clustered by (a) into 2 buckets" +
+            " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" +
+            " stored as orc TBLPROPERTIES ('transactional'='true')")
+        .run("insert into t1 values (1)")
+        .run("insert into t1 values (2)")
+        .run("insert into t1 values (3)")
+        .run("insert into t2 values (11)")
+        .run("insert into t2 values (21)")
+        .dump(primaryDbName, dumpClause);
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    Path metadataPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME);
+    long modifiedTimeMetadata = fs.getFileStatus(metadataPath).getModificationTime();
+    Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+    Path dbPath = new Path(dataPath, primaryDbName.toLowerCase());
+    Path tablet1Path = new Path(dbPath, "t1");
+    Path tablet2Path = new Path(dbPath, "t2");
+    //Delete dump ack and t2 data, metadata should be rewritten, data should be same for t1 but rewritten for t2
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+    assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    FileStatus[] statuses = fs.listStatus(tablet2Path);
+    //Delete t2 data
+    fs.delete(statuses[0].getPath(), true);
+    long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime();
+    long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime();
+    long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime();
+    //Do another dump. It should only dump table t2.
+    //Modification time of table t1 should stay the same while t2's should be greater
+    WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName, dumpClause);
+    assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertEquals(modifiedTimeTable1, fs.getFileStatus(tablet1Path).getModificationTime());
+    assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime());
+    assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
+    assertTrue(modifiedTimeMetadata < fs.getFileStatus(metadataPath).getModificationTime());
+    replica.load(replicatedDbName, primaryDbName)
+        .run("select * from " + replicatedDbName + ".t1")
+        .verifyResults(new String[] {"1", "2", "3"})
+        .run("select * from " + replicatedDbName + ".t2")
+        .verifyResults(new String[]{"11", "21"});
+  }
+
   @Test
   public void testCheckPointingWithSourceTableDataInserted() throws Throwable {
     //To force distcp copy
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 51c3b6ff61..66c3ced91a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -221,7 +221,7 @@ public int execute() {
       }
       // Copy the files from different source file systems to one destination directory
       CopyUtils copyUtils = new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs);
-      copyUtils.copyAndVerify(toPath, srcFiles, fromPath);
+      copyUtils.copyAndVerify(toPath, srcFiles, fromPath, work.isOverWrite());
 
       // If a file is copied from CM path, then need to rename them using original source file name
       // This is needed to avoid having duplicate files in target if same event is applied twice
@@ -329,10 +329,18 @@ public String getName() {
   public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
                                         HiveConf conf, boolean isAutoPurge, boolean needRecycle,
                                         boolean copyToMigratedTxnTable, boolean readSourceAsFileList) {
+    return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, isAutoPurge, needRecycle,
+        copyToMigratedTxnTable, readSourceAsFileList, false);
+  }
+
+  private static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
+                                         HiveConf conf, boolean isAutoPurge, boolean needRecycle,
+                                         boolean copyToMigratedTxnTable, boolean readSourceAsFileList,
+                                         boolean overWrite) {
     Task<?> copyTask = null;
     LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath);
     if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
-      ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false);
+      ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false, overWrite);
       rcwork.setReadSrcAsFilesList(readSourceAsFileList);
       if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION) || copyToMigratedTxnTable)) {
         rcwork.setDeleteDestIfExist(true);
@@ -360,8 +368,13 @@ public String getName() {
     return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, false);
   }
 
+  /*
+   * Invoked in the bootstrap path.
+   * Overwrite set to true
+   */
   public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
-                                        HiveConf conf, boolean readSourceAsFileList) {
-    return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, false, readSourceAsFileList);
+                                        HiveConf conf, boolean readSourceAsFileList, boolean overWrite) {
+    return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false,
+        false, readSourceAsFileList, overWrite);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 2e0af02094..254274eb80 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -640,6 +640,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
       //clear the metadata. We need to rewrite the metadata as the write id list will be changed
       //We can't reuse the previous write id as it might be invalid due to compaction
       metadataPath.getFileSystem(conf).delete(metadataPath, true);
+      work.setShouldOverwrite(true);
     }
     for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
       LOG.debug("Dumping db: " + dbName);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 1f0d70212c..b76cd46d9d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -54,6 +54,7 @@
   private transient Iterator<EximUtil.ManagedTableCopyPath> managedTableCopyPathIterator;
   private Path currentDumpPath;
   private List<String> resultValues;
+  private boolean shouldOverwrite;
 
   public static void injectNextDumpDirForTest(String dumpDir) {
     testInjectDumpDir = dumpDir;
@@ -162,11 +163,15 @@ public void setResultValues(List<String> resultValues) {
       EximUtil.ManagedTableCopyPath managedTableCopyPath = managedTableCopyPathIterator.next();
       Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
           managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(),
-          managedTableCopyPath.getTargetPath(), conf, false);
+          managedTableCopyPath.getTargetPath(), conf, false, shouldOverwrite);
       tasks.add(copyTask);
       tracker.addTask(copyTask);
       LOG.debug("added task for {}", managedTableCopyPath);
     }
     return tasks;
   }
+
+  public void setShouldOverwrite(boolean shouldOverwrite) {
+    this.shouldOverwrite = shouldOverwrite;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index b98f1f3b38..b36c4a531f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -243,7 +243,7 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc
         event.replicationSpec(),
         new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)),
         stagingDir,
-        context.hiveConf, false
+        context.hiveConf, false, false
     );
 
     Task<?> movePartitionTask = null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index bb20687f6f..6cea22c01f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -295,7 +295,8 @@ static TableLocationTuple tableLocation(ImportTableDesc tblDesc, Database parent
         + table.getCompleteName() + " with source location: "
         + dataPath.toString() + " and target location " + tgtPath.toString());
 
-    Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf, false);
+    Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf,
+        false, false);
 
     MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false);
     if (AcidUtils.isTransactionalTable(table)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 23cd128197..437072cfa4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -71,7 +71,8 @@ public CopyUtils(String distCpDoAsUser, HiveConf hiveConf, FileSystem destinatio
   // Used by replication, copy files from source to destination. It is possible source file is
   // changed/removed during copy, so double check the checksum after copy,
   // if not match, copy again from cm
-  public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFiles, Path origSrcPtah)
+  public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFiles, Path origSrcPtah,
+                            boolean overwrite)
           throws IOException, LoginException, HiveFatalException {
     UserGroupInformation proxyUser = getProxyUser();
     FileSystem sourceFs = origSrcPtah.getFileSystem(hiveConf);
@@ -80,7 +81,7 @@ public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFil
     if (!useRegularCopy) {
       srcFiles.clear();
       srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPtah, null));
-      doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy);
+      doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy, overwrite);
     } else {
       Map<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot);
       for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) {
@@ -98,7 +99,7 @@ public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFil
         }
         // Copy files with retry logic on failure or source file is dropped or changed.
-        doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, true);
+        doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, true, overwrite);
       }
     }
   }
@@ -111,7 +112,8 @@ public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFil
   private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> srcFileList,
                            Path destination, UserGroupInformation proxyUser,
-                           boolean useRegularCopy) throws IOException, LoginException, HiveFatalException {
+                           boolean useRegularCopy, boolean overwrite) throws IOException,
+                           LoginException, HiveFatalException {
     int repeat = 0;
     boolean isCopyError = false;
     List<Path> pathList = Lists.transform(srcFileList, ReplChangeManager.FileInfo::getEffectivePath);
@@ -130,7 +132,7 @@ private void doCopyRetry(FileSystem sourceFs, List<ReplChangeManager.FileInfo> s
         // if exception happens during doCopyOnce, then need to call getFilesToRetry with copy error as true in retry.
         isCopyError = true;
-        doCopyOnce(sourceFs, pathList, destination, useRegularCopy, proxyUser);
+        doCopyOnce(sourceFs, pathList, destination, useRegularCopy, proxyUser, overwrite);
         // if exception happens after doCopyOnce, then need to call getFilesToRetry with copy error as false in retry.
         isCopyError = false;
@@ -330,9 +332,10 @@ private UserGroupInformation getProxyUser() throws LoginException, IOException {
   // Copy without retry
   private void doCopyOnce(FileSystem sourceFs, List<Path> srcList, Path destination,
-                          boolean useRegularCopy, UserGroupInformation proxyUser) throws IOException {
+                          boolean useRegularCopy, UserGroupInformation proxyUser,
+                          boolean overwrite) throws IOException {
     if (useRegularCopy) {
-      doRegularCopyOnce(sourceFs, srcList, destination, proxyUser);
+      doRegularCopyOnce(sourceFs, srcList, destination, proxyUser, overwrite);
     } else {
       doDistCpCopyOnce(sourceFs, srcList, destination, proxyUser);
     }
@@ -365,7 +368,7 @@ private void doDistCpCopyOnce(FileSystem sourceFs, List<Path> srcList, Path dest
   }
 
   private void doRegularCopyOnce(FileSystem sourceFs, List<Path> srcList,
-                                 Path destination, UserGroupInformation proxyUser) throws IOException {
+                                 Path destination, UserGroupInformation proxyUser, boolean overWrite) throws IOException {
     /*
     even for regular copy we have to use the same user permissions that distCp will use since
     hive-server user might be different that the super user required to copy relevant files.
@@ -375,6 +378,10 @@ private void doRegularCopyOnce(FileSystem sourceFs, List<Path> srcList,
       final Path finalDestination = destination;
       try {
         proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> {
+          //Destination should be empty
+          if (overWrite) {
+            deleteSubDirs(destinationFs, destination);
+          }
           FileUtil
               .copy(sourceFs, paths, destinationFs, finalDestination, false, true, hiveConf);
           return true;
@@ -383,10 +390,22 @@ private void doRegularCopyOnce(FileSystem sourceFs, List<Path> srcList,
         throw new IOException(e);
       }
     } else {
+      //Destination should be empty
+      if (overWrite) {
+        deleteSubDirs(destinationFs, destination);
+      }
       FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, hiveConf);
     }
   }
 
+  private void deleteSubDirs(FileSystem fs, Path path) throws IOException {
+    //Delete the root path instead of doing a listing
+    //This is more efficient
+    fs.delete(path, true);
+    //Recreate just the root folder
+    fs.mkdirs(path);
+  }
+
   public void doCopy(Path destination, List<Path> srcPaths) throws IOException, LoginException {
     Map<FileSystem, List<Path>> map = fsToPathMap(srcPaths);
 
@@ -398,7 +417,7 @@ public void doCopy(Path destination, List<Path> srcPaths) throws IOException, Lo
             path -> new ReplChangeManager.FileInfo(sourceFs, path, null));
         doCopyOnce(sourceFs, entry.getValue(), destination,
-            regularCopy(sourceFs, fileList), proxyUser);
+            regularCopy(sourceFs, fileList), proxyUser, false);
       }
     } finally {
       if (proxyUser != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index fed0d50c00..82bf384a74 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -108,7 +108,7 @@ protected void writeFileEntry(Table table, Partition ptn, String file, Context w
         filePaths.add(fileInfo);
         FileSystem dstFs = dataPath.getFileSystem(hiveConf);
         CopyUtils copyUtils = new CopyUtils(distCpDoAsUser, hiveConf, dstFs);
-        copyUtils.copyAndVerify(dataPath, filePaths, srcDataPath);
+        copyUtils.copyAndVerify(dataPath, filePaths, srcDataPath, false);
         copyUtils.renameFileCopiedFromCmPath(dataPath, dstFs, filePaths);
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
index c631f3d6e7..dd01b21034 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
@@ -61,10 +61,17 @@
   private boolean checkDuplicateCopy = false;
 
+  private boolean overWrite = false;
+
   public ReplCopyWork(final Path srcPath, final Path destPath, boolean errorOnSrcEmpty) {
     super(srcPath, destPath, errorOnSrcEmpty);
   }
 
+  public ReplCopyWork(final Path srcPath, final Path destPath, boolean errorOnSrcEmpty, boolean overWrite) {
+    this(srcPath, destPath, errorOnSrcEmpty);
+    this.overWrite = overWrite;
+  }
+
   public void setReadSrcAsFilesList(boolean readSrcAsFilesList) {
     this.readSrcAsFilesList = readSrcAsFilesList;
   }
@@ -120,4 +127,8 @@ public boolean isNeedCheckDuplicateCopy() {
   public void setCheckDuplicateCopy(boolean flag) {
     checkDuplicateCopy = flag;
   }
+
+  public boolean isOverWrite() {
+    return overWrite;
+  }
 }