diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index 88d6a7a..f6f0c8d 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -63,6 +63,46 @@ COPY } + public static class FileInfo { + FileSystem fs; + Path sourcePath; + Path cmPath; + String checkSum; + boolean useSourcePath; + public FileInfo(FileSystem fs, Path sourcePath, Path cmPath, String checkSum, boolean useSourcePath) { + this.fs = fs; + this.sourcePath = sourcePath; + this.cmPath = cmPath; + this.checkSum = checkSum; + this.useSourcePath = useSourcePath; + } + public FileSystem getFs() { + return fs; + } + public Path getSourcePath() { + return sourcePath; + } + public Path getCmPath() { + return cmPath; + } + public String getCheckSum() { + return checkSum; + } + public boolean isUseSourcePath() { + return useSourcePath; + } + public void setIsUseSourcePath(boolean useSourcePath) { + this.useSourcePath = useSourcePath; + } + public Path getEffectivePath() { + if (useSourcePath) { + return sourcePath; + } else { + return cmPath; + } + } + } + public static ReplChangeManager getInstance(HiveConf hiveConf) throws MetaException { if (instance == null) { instance = new ReplChangeManager(hiveConf); @@ -258,25 +298,25 @@ static Path getCMPath(Configuration conf, String checkSum) throws IOException, M * @param src Original file location * @param checksumString Checksum of the original file * @param hiveConf - * @return Corresponding FileStatus object + * @return Corresponding FileInfo object */ - static public FileStatus getFileStatus(Path src, String checksumString, + static public FileInfo getFileInfo(Path src, String checksumString, HiveConf hiveConf) throws MetaException { try { FileSystem srcFs = src.getFileSystem(hiveConf); if (checksumString == null) { - return srcFs.getFileStatus(src); + return new FileInfo(srcFs, src, null, null, true); } if (!srcFs.exists(src)) { - return srcFs.getFileStatus(getCMPath(hiveConf, checksumString)); + return new FileInfo(srcFs, src, getCMPath(hiveConf, checksumString), checksumString, false); } String currentChecksumString = checksumFor(src, srcFs); if (currentChecksumString == null || checksumString.equals(currentChecksumString)) { - return srcFs.getFileStatus(src); + return new FileInfo(srcFs, src, getCMPath(hiveConf, checksumString), checksumString, true); } else { - return srcFs.getFileStatus(getCMPath(hiveConf, checksumString)); + return new FileInfo(srcFs, src, getCMPath(hiveConf, checksumString), checksumString, false); } } catch (IOException e) { throw new MetaException(StringUtils.stringifyException(e)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 54746d3..04ab904 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.calcite.util.Pair; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -76,11 +77,10 @@ protected int execute(DriverContext driverContext) { // be a CM uri in the from path. if (ReplChangeManager.isCMFileUri(fromPath, srcFs)) { String[] result = ReplChangeManager.getFileWithChksumFromURI(fromPath.toString()); - Path sourcePath = ReplChangeManager - .getFileStatus(new Path(result[0]), result[1], conf) - .getPath(); + ReplChangeManager.FileInfo sourceInfo = ReplChangeManager + .getFileInfo(new Path(result[0]), result[1], conf); if (FileUtils.copy( - sourcePath.getFileSystem(conf), sourcePath, + sourceInfo.getFs(), sourceInfo.getSourcePath(), dstFs, toPath, false, false, conf)) { return 0; } else { @@ -90,13 +90,13 @@ protected int execute(DriverContext driverContext) { } } - List srcPaths = new ArrayList<>(); + List srcFiles = new ArrayList<>(); if (rwork.readSrcAsFilesList()) { // This flow is usually taken for REPL LOAD // Our input is the result of a _files listing, we should expand out _files. - srcPaths = filesInFileListing(srcFs, fromPath); - LOG.debug("ReplCopyTask _files contains:" + (srcPaths == null ? "null" : srcPaths.size())); - if ((srcPaths == null) || (srcPaths.isEmpty())) { + srcFiles = filesInFileListing(srcFs, fromPath); + LOG.debug("ReplCopyTask _files contains:" + (srcFiles == null ? "null" : srcFiles.size())); + if ((srcFiles == null) || (srcFiles.isEmpty())) { if (work.isErrorOnSrcEmpty()) { console.printError("No _files entry found on source: " + fromPath.toString()); return 5; @@ -120,17 +120,18 @@ protected int execute(DriverContext driverContext) { for (FileStatus oneSrc : srcs) { console.printInfo("Copying file: " + oneSrc.getPath().toString()); LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath); - srcPaths.add(oneSrc.getPath()); + srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf), + oneSrc.getPath(), null, null, true)); } } - LOG.debug("ReplCopyTask numFiles: {}", srcPaths.size()); + LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size()); if (!FileUtils.mkdir(dstFs, toPath, conf)) { console.printError("Cannot make target directory: " + toPath.toString()); return 2; } // Copy the files from different source file systems to one destination directory - new CopyUtils(rwork.distCpDoAsUser(), conf).doCopy(toPath, srcPaths); + new CopyUtils(rwork.distCpDoAsUser(), conf).copyAndVerify(toPath, srcFiles); return 0; } catch (Exception e) { @@ -140,7 +141,7 @@ protected int execute(DriverContext driverContext) { } } - private List filesInFileListing(FileSystem fs, Path dataPath) + private List filesInFileListing(FileSystem fs, Path dataPath) throws IOException { Path fileListing = new Path(dataPath, EximUtil.FILES_NAME); LOG.debug("ReplCopyTask filesInFileListing() reading " + fileListing.toUri()); @@ -150,7 +151,7 @@ protected int execute(DriverContext driverContext) { // On success, but with nothing to return, we can return an empty list. } - List filePaths = new ArrayList<>(); + List filePaths = new ArrayList<>(); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(fileListing))); // TODO : verify if skipping charset here is okay @@ -160,9 +161,8 @@ protected int execute(DriverContext driverContext) { String[] fileWithChksum = ReplChangeManager.getFileWithChksumFromURI(line); try { - Path f = ReplChangeManager - .getFileStatus(new Path(fileWithChksum[0]), fileWithChksum[1], conf) - .getPath(); + ReplChangeManager.FileInfo f = ReplChangeManager + .getFileInfo(new Path(fileWithChksum[0]), fileWithChksum[1], conf); filePaths.add(f); } catch (MetaException e) { // issue warning for missing file and throw exception diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index 28e7bcb..02bb3e5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -24,10 +24,15 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.shims.ShimLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Function; +import com.google.common.collect.Lists; + +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -43,6 +48,7 @@ private final long maxNumberOfFiles; private final boolean hiveInTest; private final String copyAsUser; + static public int MAX_COPY_RETRY = 3; public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) { this.hiveConf = hiveConf; @@ -52,25 +58,104 @@ public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) { this.copyAsUser = distCpDoAsUser; } + // Used by replication, copy files from source to destination. It is possible source file is changed/removed + // during copy, so double check the checksum after copy, if not match, copy again from cm + public void copyAndVerify(Path destination, List srcFiles) throws IOException { + Map> map = fsToFileMap(srcFiles); + FileSystem destinationFs = destination.getFileSystem(hiveConf); + + for (Map.Entry> entry : map.entrySet()) { + FileSystem sourceFs = entry.getKey(); + Function fileToPath = new Function() { + public Path apply(ReplChangeManager.FileInfo fileInfo) { return fileInfo.getEffectivePath(); } + }; + + boolean useRegularCopy = false; + List fileInfoList = entry.getValue(); + if (regularCopy(destinationFs, sourceFs, fileInfoList)) { + useRegularCopy = true; + } + + List pathList = Lists.transform(fileInfoList, fileToPath); + + doCopyRetry(sourceFs, pathList, destinationFs, destination, copyAsUser, hiveConf, useRegularCopy); + pathList = new ArrayList(); + + // Verify checksum + for (ReplChangeManager.FileInfo srcFile : srcFiles) { + Path srcPath = srcFile.getEffectivePath(); + String sourceChecksumString = srcFile.getCheckSum(); + if (sourceChecksumString != null) { + String verifySourceChecksumString = null; + try { + verifySourceChecksumString = ReplChangeManager.checksumFor(srcPath, sourceFs); + } catch (FileNotFoundException e) { + if (!srcFile.isUseSourcePath()) { + // If we already use CM path, that means the CM path is expired, throw exception + // as there is no remedy + throw e; + } + } + if (srcFile.isUseSourcePath() && (verifySourceChecksumString == null + || !sourceChecksumString.equals(verifySourceChecksumString)) && srcFile.getCmPath() != null) { + // If checksum does not match, likely the file is changed/removed, copy again from cm + pathList.add(srcFile.getCmPath()); + } + } + } + if (!pathList.isEmpty()) { + doCopyRetry(sourceFs, pathList, destinationFs, destination, copyAsUser, hiveConf, useRegularCopy); + } + } + } + + private void doCopyRetry(FileSystem sourceFs, List pathList, FileSystem destinationFs, Path destination, + String copyAsUser, HiveConf hiveConf, boolean useRegularCopy) throws IOException { + int repeat = 0; + while (repeat < MAX_COPY_RETRY) { + try { + doCopyOnce(sourceFs, pathList, destinationFs, destination, copyAsUser, hiveConf, useRegularCopy); + break; + } catch (FileNotFoundException e) { + // if source file does not exist, don't retry + break; + } catch (IOException e) { + destinationFs.delete(destination, true); + } + repeat++; + } + } + + // Copy without retry + private void doCopyOnce(FileSystem sourceFs, List pathList, FileSystem destinationFs, Path destination, + String copyAsUser, HiveConf hiveConf, boolean useRegularCopy) throws IOException { + if (useRegularCopy) { + Path[] paths = pathList.toArray(new Path[] {}); + FileUtil.copy(sourceFs, paths, destinationFs, destination, false, true, hiveConf); + } else { + FileUtils.distCp( + sourceFs, // source file system + pathList, // list of source paths + destination, + false, + copyAsUser, + hiveConf, + ShimLoader.getHadoopShims() + ); + } + } + public void doCopy(Path destination, List srcPaths) throws IOException { - Map> map = fsToFileMap(srcPaths); + Map> map = fsToPathMap(srcPaths); FileSystem destinationFs = destination.getFileSystem(hiveConf); for (Map.Entry> entry : map.entrySet()) { - if (regularCopy(destinationFs, entry)) { - Path[] paths = entry.getValue().toArray(new Path[] {}); - FileUtil.copy(entry.getKey(), paths, destinationFs, destination, false, true, hiveConf); - } else { - FileUtils.distCp( - entry.getKey(), // source file system - entry.getValue(), // list of source paths - destination, - false, - copyAsUser, - hiveConf, - ShimLoader.getHadoopShims() - ); - } + List fileList = Lists.transform(entry.getValue(), new Function() { + public ReplChangeManager.FileInfo apply(Path path) { return new ReplChangeManager.FileInfo(entry.getKey(), + path, null, null, true);} + }); + doCopyRetry(entry.getKey(), entry.getValue(), destinationFs, destination, copyAsUser, hiveConf, + regularCopy(destinationFs, entry.getKey(), fileList)); } } @@ -81,12 +166,11 @@ public void doCopy(Path destination, List srcPaths) throws IOException { 3. aggregate fileSize of all source Paths(can be directory / file) is less than configured size. 4. number of files of all source Paths(can be directory / file) is less than configured size. */ - private boolean regularCopy(FileSystem destinationFs, Map.Entry> entry) + private boolean regularCopy(FileSystem destinationFs, FileSystem sourceFs, List fileList) throws IOException { if (hiveInTest) { return true; } - FileSystem sourceFs = entry.getKey(); if (isLocal(sourceFs) || isLocal(destinationFs)) { return true; } @@ -97,8 +181,17 @@ private boolean regularCopy(FileSystem destinationFs, Map.Entry> fsToFileMap(List srcPaths) throws IOException { + private Map> fsToPathMap(List srcPaths) throws IOException { Map> result = new HashMap<>(); for (Path path : srcPaths) { FileSystem fileSystem = path.getFileSystem(hiveConf); @@ -133,4 +226,17 @@ private boolean isLocal(FileSystem fs) { } return result; } + + private Map> fsToFileMap( + List srcFiles) throws IOException { + Map> result = new HashMap<>(); + for (ReplChangeManager.FileInfo file : srcFiles) { + FileSystem fileSystem = file.getFs(); + if (!result.containsKey(fileSystem)) { + result.put(fileSystem, new ArrayList<>()); + } + result.get(fileSystem).add(file); + } + return result; + } } diff --git a/ql/src/test/scripts/testgrep_win.bat b/ql/src/test/scripts/testgrep_win.bat