diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java index 724752bb49..ec2f9f0ac8 100644 --- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java @@ -70,6 +70,8 @@ public final class FileUtils { private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class.getName()); private static final Random random = new Random(); + public static final int MAX_IO_ERROR_RETRY = 5; + public static final int IO_ERROR_SLEEP_TIME = 100; public static final PathFilter HIDDEN_FILES_PATH_FILTER = new PathFilter() { @Override @@ -1054,4 +1056,13 @@ public static void readFully(InputStream stream, int length, ByteBuffer bb) thro bb.position(bb.position() + fullLen); } } + + /** + * Returns the incremented sleep time in milli seconds. + * @param repeatNum number of retry done so far. + */ + public static int getSleepTime(int repeatNum) { + return IO_ERROR_SLEEP_TIME * (int)(Math.pow(2.0, repeatNum)); + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index 2557121f2d..79b4652404 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import javax.security.auth.login.LoginException; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.security.PrivilegedExceptionAction; @@ -96,21 +97,54 @@ private void doCopyRetry(FileSystem sourceFs, List s boolean isCopyError = false; List pathList = Lists.transform(srcFileList, ReplChangeManager.FileInfo::getEffectivePath); while (!pathList.isEmpty() && (repeat < MAX_COPY_RETRY)) { - LOG.info("Attempt: " + (repeat+1) + ". Copying files: " + pathList); try { - isCopyError = false; + // if its retrying, first regenerate the path list. + if (repeat > 0) { + pathList = getFilesToRetry(sourceFs, srcFileList, destinationFs, destination, isCopyError); + if (pathList.isEmpty()) { + // all files were copied successfully in last try. So can break from here. + break; + } + } + + LOG.info("Attempt: " + (repeat+1) + ". Copying files: " + pathList); + + // if exception happens during doCopyOnce, then need to call getFilesToRetry with copy error as true in retry. + isCopyError = true; doCopyOnce(sourceFs, pathList, destinationFs, destination, useRegularCopy); + + // if exception happens after doCopyOnce, then need to call getFilesToRetry with copy error as false in retry. + isCopyError = false; } catch (IOException e) { // If copy fails, fall through the retry logic - isCopyError = true; + LOG.info("file operation failed", e); + + if (repeat >= (MAX_COPY_RETRY - 1)) { + //no need to wait in the last iteration + break; + } + + if (!(e instanceof FileNotFoundException)) { + int sleepTime = FileUtils.getSleepTime(repeat); + LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (repeat+1)); + try { + Thread.sleep(sleepTime); + } catch (InterruptedException timerEx) { + LOG.info("sleep interrupted", timerEx.getMessage()); + } + + // looks like some network outrage, reset the file system object and retry. + FileSystem.closeAllForUGI(Utils.getUGI()); + sourceFs = pathList.get(0).getFileSystem(hiveConf); + destinationFs = destination.getFileSystem(hiveConf); + } } - pathList = getFilesToRetry(sourceFs, srcFileList, destinationFs, destination, isCopyError); repeat++; } // If still files remains to be copied due to failure/checksum mismatch after several attempts, then throw error if (!pathList.isEmpty()) { - LOG.error("File copy failed even after several attempts. Files list: " + srcFileList); + LOG.error("File copy failed even after several attempts. Files list: " + pathList); throw new IOException("File copy failed even after several attempts."); } } @@ -148,27 +182,11 @@ private void doCopyRetry(FileSystem sourceFs, List s continue; } } - } else { - // If destination file is missing, then retry copy - if (sourceFs.exists(srcPath)) { - // If checksum does not match, likely the file is changed/removed, retry from CM path - if (isSourceFileMismatch(sourceFs, srcFile)) { - srcFile.setIsUseSourcePath(false); - } - } else { - if (srcFile.isUseSourcePath()) { - // Source file missing, then try with CM path - srcFile.setIsUseSourcePath(false); - } else { - // CM path itself is missing, cannot recover from this error - LOG.error("File Copy Failed. Both source and CM files are missing from source. " - + "Missing Source File: " + srcFile.getSourcePath() + ", CM File: " + srcFile.getCmPath() + ". " - + "Try setting higher value for hive.repl.cm.retain in source warehouse. " - + "Also, bootstrap the system again to get back the consistent replicated state."); - throw new IOException("Both source and CM path are missing from source."); - } - } + } else if (isSourceFileMismatch(sourceFs, srcFile)) { + // If checksum does not match, likely the file is changed/removed, retry from CM path + srcFile.setIsUseSourcePath(false); } + srcPath = srcFile.getEffectivePath(); if (null == srcPath) { // This case possible if CM path is not enabled. @@ -176,6 +194,16 @@ private void doCopyRetry(FileSystem sourceFs, List s + "Source File: " + srcFile.getSourcePath()); throw new IOException("File copy failed and likely source file is deleted or modified."); } + + if (!srcFile.isUseSourcePath() && !sourceFs.exists(srcFile.getCmPath())) { + // CM path itself is missing, cannot recover from this error + LOG.error("File Copy Failed. Both source and CM files are missing from source. " + + "Missing Source File: " + srcFile.getSourcePath() + ", CM File: " + srcFile.getCmPath() + ". " + + "Try setting higher value for hive.repl.cm.retain in source warehouse. " + + "Also, bootstrap the system again to get back the consistent replicated state."); + throw new IOException("Both source and CM path are missing from source."); + } + pathList.add(srcPath); } return pathList; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index 085f4a1cb1..c923121cd2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; @@ -42,6 +43,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; +import org.apache.hadoop.hive.shims.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +55,7 @@ private final Path exportRootDataDir; private final String distCpDoAsUser; private HiveConf hiveConf; - private final FileSystem dataFileSystem, exportFileSystem; + private FileSystem exportFileSystem, dataFileSystem; private final MmContext mmCtx; public FileOperations(List dataPathList, Path exportRootDataDir, String distCpDoAsUser, @@ -136,18 +138,48 @@ private void copyMmPath() throws LoginException, IOException { } } - - /** * This needs the root data directory to which the data needs to be exported to. * The data export here is a list of files either in table/partition that are written to the _files * in the exportRootDataDir provided. */ - private void exportFilesAsList() throws SemanticException, IOException { - // This is only called for replication that handles MM tables; no need for mmCtx. - try (BufferedWriter writer = writer()) { - for (Path dataPath : dataPathList) { - writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); + private void exportFilesAsList() throws SemanticException, IOException, LoginException { + if (dataPathList.isEmpty()) { + return; + } + boolean done = false; + int repeat = 0; + while (!done) { + // This is only called for replication that handles MM tables; no need for mmCtx. + try (BufferedWriter writer = writer()) { + for (Path dataPath : dataPathList) { + writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); + } + done = true; + } catch (IOException e) { + repeat++; + logger.info("writeFilesList failed", e); + if (repeat >= FileUtils.MAX_IO_ERROR_RETRY) { + logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed"); + throw e; + } + + int sleepTime = FileUtils.getSleepTime(repeat - 1); + logger.info(" sleep for {} milliseconds for retry num {} ", sleepTime , repeat); + try { + Thread.sleep(sleepTime); + } catch (InterruptedException timerEx) { + logger.info("thread sleep interrupted", timerEx.getMessage()); + } + + // in case of io error, reset the file system object + FileSystem.closeAllForUGI(Utils.getUGI()); + dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf); + exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); + Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME); + if (exportFileSystem.exists(exportPath)) { + exportFileSystem.delete(exportPath, true); + } } } }