diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index 2557121f2d..bd7e8f5ea6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import javax.security.auth.login.LoginException; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.security.PrivilegedExceptionAction; @@ -98,19 +99,40 @@ private void doCopyRetry(FileSystem sourceFs, List s while (!pathList.isEmpty() && (repeat < MAX_COPY_RETRY)) { LOG.info("Attempt: " + (repeat+1) + ". Copying files: " + pathList); try { + // if it is retrying after copy error, first regenerate the path list. + if (isCopyError) { + pathList = getFilesToRetry(sourceFs, srcFileList, destinationFs, destination, true); + } isCopyError = false; doCopyOnce(sourceFs, pathList, destinationFs, destination, useRegularCopy); + pathList = getFilesToRetry(sourceFs, srcFileList, destinationFs, destination, false); } catch (IOException e) { + if (repeat >= (MAX_COPY_RETRY - 1)) { + //no need to wait in the last iteration + break; + } // If copy fails, fall through the retry logic + LOG.info("file operation failed with error : " + e.getMessage()); + + if (!(e instanceof FileNotFoundException)) { + LOG.info(" sleep for 100 milliseconds before retry " + (repeat+1)); + try { + Thread.sleep(100); + } catch (InterruptedException timerEx) { + LOG.info("sleep interrupted", timerEx.getMessage()); + } + // looks like some network outage, reset the file system object and retry. 
+ sourceFs = pathList.get(0).getFileSystem(hiveConf); + destinationFs = destination.getFileSystem(hiveConf); + } isCopyError = true; } - pathList = getFilesToRetry(sourceFs, srcFileList, destinationFs, destination, isCopyError); repeat++; } // If still files remains to be copied due to failure/checksum mismatch after several attempts, then throw error if (!pathList.isEmpty()) { - LOG.error("File copy failed even after several attempts. Files list: " + srcFileList); + LOG.error("File copy failed even after several attempts. Files list: " + pathList); throw new IOException("File copy failed even after several attempts."); } } @@ -150,23 +172,24 @@ private void doCopyRetry(FileSystem sourceFs, List s } } else { // If destination file is missing, then retry copy - if (sourceFs.exists(srcPath)) { + if (srcFile.isUseSourcePath() && sourceFs.exists(srcPath)) { // If checksum does not match, likely the file is changed/removed, retry from CM path if (isSourceFileMismatch(sourceFs, srcFile)) { srcFile.setIsUseSourcePath(false); } } else { - if (srcFile.isUseSourcePath()) { - // Source file missing, then try with CM path - srcFile.setIsUseSourcePath(false); - } else { - // CM path itself is missing, cannot recover from this error - LOG.error("File Copy Failed. Both source and CM files are missing from source. " - + "Missing Source File: " + srcFile.getSourcePath() + ", CM File: " + srcFile.getCmPath() + ". " - + "Try setting higher value for hive.repl.cm.retain in source warehouse. " - + "Also, bootstrap the system again to get back the consistent replicated state."); - throw new IOException("Both source and CM path are missing from source."); - } + // The file copy might have failed for network issue. So retry with cm path again if the + // original file is missing. 
+ srcFile.setIsUseSourcePath(false); + } + + if (!srcFile.isUseSourcePath() && !sourceFs.exists(srcFile.getCmPath())) { + // CM path itself is missing, cannot recover from this error + LOG.error("File Copy Failed. Both source and CM files are missing from source. " + + "Missing Source File: " + srcFile.getSourcePath() + ", CM File: " + srcFile.getCmPath() + ". " + + "Try setting higher value for hive.repl.cm.retain in source warehouse. " + + "Also, bootstrap the system again to get back the consistent replicated state."); + throw new IOException("Both source and CM path are missing from source."); } } srcPath = srcFile.getEffectivePath(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index 085f4a1cb1..2bf47f8b7f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -53,8 +53,9 @@ private final Path exportRootDataDir; private final String distCpDoAsUser; private HiveConf hiveConf; - private final FileSystem dataFileSystem, exportFileSystem; + private FileSystem exportFileSystem, dataFileSystem; private final MmContext mmCtx; + public static final int MAX_IO_ERROR_RETRY = 5; public FileOperations(List dataPathList, Path exportRootDataDir, String distCpDoAsUser, HiveConf hiveConf, MmContext mmCtx) throws IOException { @@ -136,18 +137,41 @@ private void copyMmPath() throws LoginException, IOException { } } - - /** * This needs the root data directory to which the data needs to be exported to. * The data export here is a list of files either in table/partition that are written to the _files * in the exportRootDataDir provided. */ private void exportFilesAsList() throws SemanticException, IOException { - // This is only called for replication that handles MM tables; no need for mmCtx. 
- try (BufferedWriter writer = writer()) { - for (Path dataPath : dataPathList) { - writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); + if (dataPathList.isEmpty()) { + return; + } + boolean done = false; + int repeat = 0; + while (!done) { + // This is only called for replication that handles MM tables; no need for mmCtx. + try (BufferedWriter writer = writer()) { + for (Path dataPath : dataPathList) { + writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); + } + done = true; + } catch (IOException e) { + repeat++; + logger.info("writeFilesList failed with exception : " + e.getMessage() + " for " + repeat + " time"); + if (repeat >= MAX_IO_ERROR_RETRY) { + logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed"); + throw e; + } + try { + logger.info(" sleep for 100 milliseconds before retry " + (repeat)); + Thread.sleep(100); + } catch (InterruptedException timerEx) { + logger.info("thread sleep interrupted", timerEx.getMessage()); + } + // in case of io error, reset the file system object + dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf); + exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); + exportFileSystem.deleteOnExit(new Path(exportRootDataDir, EximUtil.FILES_NAME)); } } }