commit 972067d33b890e702f350997d00485af1ef3e900 Author: Sahil Takiar Date: Thu Apr 26 13:40:59 2018 -0500 HIVE-16295: Add support for using Hadoop's S3A OutputCommitter diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index b12a7a4d40..85ef77f392 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4411,7 +4411,13 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "This parameter enables a number of optimizations when running on blobstores:\n" + "(1) If hive.blobstore.use.blobstore.as.scratchdir is false, force the last Hive job to write to the blobstore.\n" + "This is a performance optimization that forces the final FileSinkOperator to write to the blobstore.\n" + - "See HIVE-15121 for details."); + "See HIVE-15121 for details."), + + HIVE_BLOBSTORE_USE_OUTPUTCOMMITTER("hive.blobstore.use.output-committer", false, "Whether to " + + "use a custom PathOutputCommitter to commit data. For all the URIs specified in " + + "hive.blobstore.supported.schemes, Hive will honor the config " + + "mapreduce.outputcommitter.factory.scheme.[uri-scheme]. This overrides the behavior " + + "described in hive.blobstore.optimizations.enabled. See HIVE-16295 for details."); public final String varname; public final String altName; diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java index 9b50fd4f30..a8f5d0162c 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.ql.DriverFactory; import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.exec.HiveDataCommitter; import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo; import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo; @@ -107,7 +108,7 @@ protected void setUp() { db.createTable(src, cols, null, TextInputFormat.class, IgnoreKeyTextOutputFormat.class); db.loadTable(hadoopDataFile[i], src, - LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false); + LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false, new HiveDataCommitter()); i++; } diff --git a/ql/pom.xml b/ql/pom.xml index fedb5f1f80..1b6c63792a 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -206,6 +206,11 @@ ${hadoop.version} true + + org.apache.hadoop + hadoop-aws + ${hadoop.version} + org.apache.hadoop hadoop-yarn-registry diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DataCommitter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DataCommitter.java new file mode 100644 index 0000000000..89af85e74f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DataCommitter.java @@ -0,0 +1,38 @@ +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.Hive; +import 
org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; + +import java.util.List; + + +/** + * Defines how Hive will commit data to its final directory. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface DataCommitter { + + void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir, HiveConf conf, + SessionState.LogHelper console) throws HiveException; + + boolean moveFile(final HiveConf conf, Path srcf, final Path destf, boolean replace, + boolean isSrcLocal) throws HiveException; + + void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs, + boolean isSrcLocal, boolean isAcidIUD, + boolean isOverwrite, List newFiles, boolean isBucketed, + boolean isFullAcidTable) throws HiveException; + + void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf, + boolean isSrcLocal, boolean purge, List newFiles, + PathFilter deletePathFilter, + boolean isNeedRecycle, Hive hive) throws HiveException; +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 9c57eff2e8..96e97bd527 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -18,20 +18,6 @@ package org.apache.hadoop.hive.ql.exec; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE; - -import java.io.IOException; -import java.io.Serializable; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; @@ -83,17 +69,21 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hive.common.util.HiveStringUtils; -import org.apache.hive.common.util.Murmur3; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -104,6 +94,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE; + /** * File Sink operator implementation. **/ @@ -147,6 +138,8 @@ private transient boolean isInsertOverwrite; private transient String counterGroup; private transient BiFunction hashFunc; + private transient PathOutputCommitter pathOutputCommitter; + private TaskAttemptContext taskAttemptContext; /** * Counters. 
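
To make the new code path concrete, here is a minimal configuration sketch for enabling it. The factory property name follows the mapreduce.outputcommitter.factory.scheme.[uri-scheme] convention referenced in the config text above; the S3A factory class and the fs.s3a.committer.name setting are the values documented for Hadoop's S3A committers and are assumptions here, not something this patch pins down.

    import org.apache.hadoop.hive.conf.HiveConf;

    public class S3ACommitterConfigSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Enable the PathOutputCommitter-based commit path added by this patch.
        conf.setBoolVar(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_OUTPUTCOMMITTER, true);
        // Bind the s3a scheme to a committer factory; Hive honors this property for
        // every scheme listed in hive.blobstore.supported.schemes.
        conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory"); // assumed S3A factory class
        // Choose which S3A committer that factory creates (assumed property and value).
        conf.set("fs.s3a.committer.name", "directory");
      }
    }
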
*/ @@ -248,7 +241,7 @@ private void commitOneOutPath(int idx, FileSystem fs, List commitPaths) } FileUtils.mkdir(fs, finalPaths[idx].getParent(), hconf); } - if(outPaths[idx] != null && fs.exists(outPaths[idx])) { + if(pathOutputCommitter == null && outPaths[idx] != null && fs.exists(outPaths[idx])) { if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("committing " + outPaths[idx] + " to " + finalPaths[idx] + " (" + isMmTable + ")"); @@ -272,6 +265,13 @@ private void commitOneOutPath(int idx, FileSystem fs, List commitPaths) } } + if (pathOutputCommitter != null && outPaths[idx] != null && + outPaths[idx].getFileSystem(hconf).exists(outPaths[idx])) { + if (pathOutputCommitter.needsTaskCommit(taskAttemptContext)) { + pathOutputCommitter.commitTask(taskAttemptContext); + } + } + updateProgress(); } @@ -293,7 +293,7 @@ public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws Hi } public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeTable, - boolean isSkewedStoredAsSubDirectories) { + boolean isSkewedStoredAsSubDirectories) throws IOException { if (isNativeTable) { String extension = Utilities.getFileExtension(jc, isCompressed, hiveOutputFormat); String taskWithExt = extension == null ? taskId : taskId + extension; @@ -303,7 +303,11 @@ public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeT } else { finalPaths[filesIdx] = new Path(buildTmpPath(), taskWithExt); } - outPaths[filesIdx] = new Path(buildTaskOutputTempPath(), Utilities.toTempPath(taskId)); + if (pathOutputCommitter != null) { + outPaths[filesIdx] = getPathOutputCommitterPath(taskId); + } else { + outPaths[filesIdx] = new Path(buildTaskOutputTempPath(), Utilities.toTempPath(taskId)); + } } else { String taskIdPath = taskId; if (conf.isMerge()) { @@ -430,7 +434,7 @@ private void initializeSpecPath() { // 'Parent' boolean isLinked = conf.isLinkedFileSink(); if (!isLinked) { - // Simple case - no union. + // Simple case - no union. specPath = conf.getDirName(); unionPath = null; } else { @@ -501,6 +505,13 @@ protected void initializeOp(Configuration hconf) throws HiveException { destTablePath = conf.getDestPath(); isInsertOverwrite = conf.getInsertOverwrite(); counterGroup = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP); + + if (conf.getHasOutputCommitter()) { + taskAttemptContext = createTaskAttemptContext(); + pathOutputCommitter = createPathOutputCommitter(); + pathOutputCommitter.setupTask(taskAttemptContext); + } + if (LOG.isInfoEnabled()) { LOG.info("Using serializer : " + serializer + " and formatter : " + hiveOutputFormat + (isCompressed ? 
" with compression" : "")); @@ -1579,4 +1590,22 @@ private boolean isNativeTable() { return !conf.getTableInfo().isNonNative(); } + private PathOutputCommitter createPathOutputCommitter() throws IOException { + return PathOutputCommitterFactory.createCommitter(new Path(conf.getTargetDirName()), + taskAttemptContext); + } + + private TaskAttemptContextImpl createTaskAttemptContext() { + TaskAttemptID origId = MapredContext.get().getTaskAttemptID(); + + TaskAttemptID taskAttemptID = new TaskAttemptID(org.apache.commons.lang.StringUtils.EMPTY, 0, + origId.getTaskType(), origId.getTaskID().getId(), origId.getId()); + + return new TaskAttemptContextImpl(hconf, taskAttemptID); + } + + private Path getPathOutputCommitterPath(String taskId) throws IOException { + return new Path(pathOutputCommitter.getWorkPath(), + taskId + "-" + hconf.get(ConfVars.HIVEQUERYID.varname)); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveDataCommitter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveDataCommitter.java new file mode 100644 index 0000000000..af3d43a2c8 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveDataCommitter.java @@ -0,0 +1,704 @@ +package org.apache.hadoop.hive.ql.exec; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.apache.commons.io.FilenameUtils; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.ObjectPair; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.io.HdfsUtils; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * A {@link DataCommitter} that commits Hive data using a {@link FileSystem}. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class HiveDataCommitter implements DataCommitter { + + private static final Logger LOG = LoggerFactory.getLogger(DataCommitter.class.getName()); + + @Override + public void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir, HiveConf conf, + SessionState.LogHelper console) throws HiveException { + try { + String mesg = "Moving data to " + (isDfsDir ? 
"" : "local ") + "directory " + + targetPath.toString(); + String mesg_detail = " from " + sourcePath.toString(); + console.printInfo(mesg, mesg_detail); + + FileSystem fs = sourcePath.getFileSystem(conf); + if (isDfsDir) { + moveFileInDfs(sourcePath, targetPath, conf); + } else { + // This is a local file + FileSystem dstFs = FileSystem.getLocal(conf); + moveFileFromDfsToLocal(sourcePath, targetPath, fs, dstFs, conf); + } + } catch (Exception e) { + throw new HiveException("Unable to move source " + sourcePath + " to destination " + + targetPath, e); + } + } + + private void moveFileInDfs(Path sourcePath, Path targetPath, HiveConf conf) + throws HiveException, IOException { + final FileSystem srcFs, tgtFs; + try { + tgtFs = targetPath.getFileSystem(conf); + } catch (IOException e) { + LOG.error("Failed to get dest fs", e); + throw new HiveException(e.getMessage(), e); + } + try { + srcFs = sourcePath.getFileSystem(conf); + } catch (IOException e) { + LOG.error("Failed to get src fs", e); + throw new HiveException(e.getMessage(), e); + } + + // if source exists, rename. Otherwise, create a empty directory + if (srcFs.exists(sourcePath)) { + Path deletePath = null; + // If it multiple level of folder are there fs.rename is failing so first + // create the targetpath.getParent() if it not exist + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) { + deletePath = createTargetPath(targetPath, tgtFs); + } + Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false); + if (!moveFile(conf, sourcePath, targetPath, true, false)) { + try { + if (deletePath != null) { + tgtFs.delete(deletePath, true); + } + } catch (IOException e) { + LOG.info("Unable to delete the path created for facilitating rename: {}", + deletePath); + } + throw new HiveException("Unable to rename: " + sourcePath + + " to: " + targetPath); + } + } else if (!tgtFs.mkdirs(targetPath)) { + throw new HiveException("Unable to make directory: " + targetPath); + } + } + + private void moveFileFromDfsToLocal(Path sourcePath, Path targetPath, FileSystem fs, + FileSystem dstFs, HiveConf conf) throws HiveException, IOException { + // RawLocalFileSystem seems not able to get the right permissions for a local file, it + // always returns hdfs default permission (00666). So we can not overwrite a directory + // by deleting and recreating the directory and restoring its permissions. We should + // delete all its files and subdirectories instead. 
+ if (dstFs.exists(targetPath)) { + if (dstFs.isDirectory(targetPath)) { + FileStatus[] destFiles = dstFs.listStatus(targetPath); + for (FileStatus destFile : destFiles) { + if (!dstFs.delete(destFile.getPath(), true)) { + throw new IOException("Unable to clean the destination directory: " + targetPath); + } + } + } else { + throw new HiveException("Target " + targetPath + " is not a local directory."); + } + } else { + if (!FileUtils.mkdir(dstFs, targetPath, conf)) { + throw new HiveException("Failed to create local target directory " + targetPath); + } + } + + if (fs.exists(sourcePath)) { + FileStatus[] srcs = fs.listStatus(sourcePath, FileUtils.HIDDEN_FILES_PATH_FILTER); + for (FileStatus status : srcs) { + fs.copyToLocalFile(status.getPath(), targetPath); + } + } + } + + private Path createTargetPath(Path targetPath, FileSystem fs) throws IOException { + Path deletePath = null; + Path mkDirPath = targetPath.getParent(); + if (mkDirPath != null && !fs.exists(mkDirPath)) { + Path actualPath = mkDirPath; + // targetPath path is /x/y/z/1/2/3 here /x/y/z is present in the file system + // create the structure till /x/y/z/1/2 to work rename for multilevel directory + // and if rename fails delete the path /x/y/z/1 + // If targetPath have multilevel directories like /x/y/z/1/2/3 , /x/y/z/1/2/4 + // the renaming of the directories are not atomic the execution will happen one + // by one + while (actualPath != null && !fs.exists(actualPath)) { + deletePath = actualPath; + actualPath = actualPath.getParent(); + } + fs.mkdirs(mkDirPath); + } + return deletePath; + } + + //it is assumed that parent directory of the destf should already exist when this + //method is called. when the replace value is true, this method works a little different + //from mv command if the destf is a directory, it replaces the destf instead of moving under + //the destf. in this case, the replaced destf still preserves the original destf's permission + @Override + public boolean moveFile(HiveConf conf, Path srcf, Path destf, boolean replace, + boolean isSrcLocal) throws HiveException { + final FileSystem srcFs, destFs; + try { + destFs = destf.getFileSystem(conf); + } catch (IOException e) { + LOG.error("Failed to get dest fs", e); + throw new HiveException(e.getMessage(), e); + } + try { + srcFs = srcf.getFileSystem(conf); + } catch (IOException e) { + LOG.error("Failed to get src fs", e); + throw new HiveException(e.getMessage(), e); + } + + HdfsUtils.HadoopFileStatus destStatus = null; + + // If source path is a subdirectory of the destination path (or the other way around): + // ex: INSERT OVERWRITE DIRECTORY 'target/warehouse/dest4.out' SELECT src.value WHERE src.key >= 300; + // where the staging directory is a subdirectory of the destination directory + // (1) Do not delete the dest dir before doing the move operation. + // (2) It is assumed that subdir and dir are in same encryption zone. + // (3) Move individual files from scr dir to dest dir. 
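
Stepping back to the FileSinkOperator changes earlier in the diff: when the committer is configured, each task follows the standard PathOutputCommitter protocol instead of writing to Hive's _task_tmp directory. A condensed, stand-alone sketch of that per-task flow; the target directory and output file name used here are illustrative, not taken from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
    import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    import java.io.IOException;

    public class TaskCommitSketch {
      // Mirrors the FileSinkOperator flow: create a committer for the target directory,
      // set up the task, write under the committer's work path, then commit the attempt.
      static void writeAndCommit(Configuration conf, Path targetDir, TaskAttemptID id)
          throws IOException {
        TaskAttemptContext ctx = new TaskAttemptContextImpl(conf, id);
        PathOutputCommitter committer = PathOutputCommitterFactory.createCommitter(targetDir, ctx);
        committer.setupTask(ctx);

        // Task output goes under the committer's work path instead of a Hive temp dir.
        Path workFile = new Path(committer.getWorkPath(), "000000_0");
        // ... write rows to workFile with the usual RecordWriter ...

        if (committer.needsTaskCommit(ctx)) {
          committer.commitTask(ctx); // e.g. an S3A committer stages the task's files here
        }
      }
    }
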
+ boolean srcIsSubDirOfDest = Hive.isSubDir(srcf, destf, srcFs, destFs, isSrcLocal), + destIsSubDirOfSrc = Hive.isSubDir(destf, srcf, destFs, srcFs, false); + final String msg = "Unable to move source " + srcf + " to destination " + destf; + try { + if (replace) { + try{ + destStatus = new HdfsUtils.HadoopFileStatus(conf, destFs, destf); + //if destf is an existing directory: + //if replace is true, delete followed by rename(mv) is equivalent to replace + //if replace is false, rename (mv) actually move the src under dest dir + //if destf is an existing file, rename is actually a replace, and do not need + // to delete the file first + if (replace && !srcIsSubDirOfDest) { + destFs.delete(destf, true); + LOG.debug("The path " + destf.toString() + " is deleted"); + } + } catch (FileNotFoundException ignore) { + } + } + final HdfsUtils.HadoopFileStatus desiredStatus = destStatus; + final SessionState parentSession = SessionState.get(); + if (isSrcLocal) { + // For local src file, copy to hdfs + destFs.copyFromLocalFile(srcf, destf); + return true; + } else { + if (needToCopy(srcf, destf, srcFs, destFs)) { + //copy if across file system or encryption zones. + LOG.debug("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different."); + return FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf, + true, // delete source + replace, // overwrite destination + conf); + } else { + if (srcIsSubDirOfDest || destIsSubDirOfSrc) { + FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER); + + List> futures = new LinkedList<>(); + final ExecutorService pool = conf.getInt(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? + Executors.newFixedThreadPool(conf.getInt(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null; + if (destIsSubDirOfSrc && !destFs.exists(destf)) { + if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { + Utilities.FILE_OP_LOGGER.trace("Creating " + destf); + } + destFs.mkdirs(destf); + } + /* Move files one by one because source is a subdirectory of destination */ + for (final FileStatus srcStatus : srcs) { + + final Path destFile = new Path(destf, srcStatus.getPath().getName()); + + final String poolMsg = + "Unable to move source " + srcStatus.getPath() + " to destination " + destFile; + + if (null == pool) { + boolean success = false; + if (destFs instanceof DistributedFileSystem) { + ((DistributedFileSystem)destFs).rename(srcStatus.getPath(), destFile, Options.Rename.OVERWRITE); + success = true; + } else { + destFs.delete(destFile, false); + success = destFs.rename(srcStatus.getPath(), destFile); + } + if(!success) { + throw new IOException("rename for src path: " + srcStatus.getPath() + " to dest:" + + destf + " returned false"); + } + } else { + futures.add(pool.submit(new Callable() { + @Override + public Void call() throws HiveException { + SessionState.setCurrentSessionState(parentSession); + final String group = srcStatus.getGroup(); + try { + boolean success = false; + if (destFs instanceof DistributedFileSystem) { + ((DistributedFileSystem)destFs).rename(srcStatus.getPath(), destFile, Options.Rename.OVERWRITE); + success = true; + } else { + destFs.delete(destFile, false); + success = destFs.rename(srcStatus.getPath(), destFile); + } + if (!success) { + throw new IOException( + "rename for src path: " + srcStatus.getPath() + " to dest path:" + + destFile + " returned false"); + } + } 
catch (Exception e) { + throw Hive.getHiveException(e, poolMsg); + } + return null; + } + })); + } + } + if (null != pool) { + pool.shutdown(); + for (Future future : futures) { + try { + future.get(); + } catch (Exception e) { + throw handlePoolException(pool, e); + } + } + } + return true; + } else { + if (destFs.rename(srcf, destf)) { + return true; + } + return false; + } + } + } + } catch (Exception e) { + throw Hive.getHiveException(e, msg); + } + } + + /** + * Copy files. This handles building the mapping for buckets and such between the source and + * destination + * @param conf Configuration object + * @param srcf source directory, if bucketed should contain bucket files + * @param destf directory to move files into + * @param fs Filesystem + * @param isSrcLocal true if source is on local file system + * @param isAcidIUD true if this is an ACID based Insert/Update/Delete + * @param isOverwrite if true, then overwrite if destination file exist, else add a duplicate copy + * @param newFiles if this is non-null, a list of files that were created as a result of this + * move will be returned. + * @throws HiveException + */ + @Override + public void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs, boolean isSrcLocal, + boolean isAcidIUD, boolean isOverwrite, List newFiles, + boolean isBucketed, boolean isFullAcidTable) throws HiveException { + try { + // create the destination if it does not exist + if (!fs.exists(destf)) { + FileUtils.mkdir(fs, destf, conf); + } + } catch (IOException e) { + throw new HiveException( + "copyFiles: error while checking/creating destination directory!!!", + e); + } + + FileStatus[] srcs; + FileSystem srcFs; + try { + srcFs = srcf.getFileSystem(conf); + srcs = srcFs.globStatus(srcf); + } catch (IOException e) { + LOG.error(StringUtils.stringifyException(e)); + throw new HiveException("addFiles: filesystem error in check phase. " + e.getMessage(), e); + } + if (srcs == null) { + LOG.info("No sources specified to move: " + srcf); + return; + // srcs = new FileStatus[0]; Why is this needed? + } + + // If we're moving files around for an ACID write then the rules and paths are all different. + // You can blame this on Owen. + if (isAcidIUD) { + Hive.moveAcidFiles(srcFs, srcs, destf, newFiles); + } else { + // For ACID non-bucketed case, the filenames have to be in the format consistent with INSERT/UPDATE/DELETE Ops, + // i.e, like 000000_0, 000001_0_copy_1, 000002_0.gz etc. + // The extension is only maintained for files which are compressed. + copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, isOverwrite, + newFiles, isFullAcidTable && !isBucketed); + } + } + + private void copyFiles(HiveConf conf, FileSystem destFs, FileStatus[] srcs, + FileSystem srcFs, Path destf, boolean isSrcLocal, + boolean isOverwrite, List newFiles, + boolean acidRename) throws HiveException { + final HdfsUtils.HadoopFileStatus fullDestStatus; + try { + fullDestStatus = new HdfsUtils.HadoopFileStatus(conf, destFs, destf); + } catch (IOException e1) { + throw new HiveException(e1); + } + + if (!fullDestStatus.getFileStatus().isDirectory()) { + throw new HiveException(destf + " is not a directory."); + } + final List>> futures = new LinkedList<>(); + final ExecutorService pool = conf.getInt(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? 
+ Executors.newFixedThreadPool(conf.getInt(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null; + // For ACID non-bucketed case, the filenames have to be in the format consistent with INSERT/UPDATE/DELETE Ops, + // i.e, like 000000_0, 000001_0_copy_1, 000002_0.gz etc. + // The extension is only maintained for files which are compressed. + int taskId = 0; + // Sort the files + Arrays.sort(srcs); + for (FileStatus src : srcs) { + FileStatus[] files; + if (src.isDirectory()) { + try { + files = srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); + } catch (IOException e) { + if (null != pool) { + pool.shutdownNow(); + } + throw new HiveException(e); + } + } else { + files = new FileStatus[] {src}; + } + + final SessionState parentSession = SessionState.get(); + // Sort the files + Arrays.sort(files); + for (final FileStatus srcFile : files) { + final Path srcP = srcFile.getPath(); + final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs); + + final boolean isRenameAllowed = !needToCopy && !isSrcLocal; + + final String msg = "Unable to move source " + srcP + " to destination " + destf; + + // If we do a rename for a non-local file, we will be transfering the original + // file permissions from source to the destination. Else, in case of mvFile() where we + // copy from source to destination, we will inherit the destination's parent group ownership. + if (null == pool) { + try { + Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isOverwrite, + isRenameAllowed, + acidRename ? taskId++ : -1); + + if (null != newFiles) { + newFiles.add(destPath); + } + } catch (Exception e) { + throw Hive.getHiveException(e, msg, "Failed to move: {}"); + } + } else { + // future only takes final or seemingly final values. Make a final copy of taskId + final int finalTaskId = acidRename ? taskId++ : -1; + futures.add(pool.submit(new Callable>() { + @Override + public ObjectPair call() throws HiveException { + SessionState.setCurrentSessionState(parentSession); + + try { + Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isOverwrite, + isRenameAllowed, finalTaskId); + + if (null != newFiles) { + newFiles.add(destPath); + } + return ObjectPair.create(srcP, destPath); + } catch (Exception e) { + throw Hive.getHiveException(e, msg); + } + } + })); + } + } + } + if (null != pool) { + pool.shutdown(); + for (Future> future : futures) { + try { + ObjectPair pair = future.get(); + LOG.debug("Moved src: {}, to dest: {}", pair.getFirst().toString(), pair.getSecond().toString()); + } catch (Exception e) { + throw handlePoolException(pool, e); + } + } + } + } + + /** + * Replaces files in the partition with new data set specified by srcf. Works + * by renaming directory of srcf to the destination file. + * srcf, destf, and tmppath should resident in the same DFS, but the oldPath can be in a + * different DFS. + * + * @param tablePath path of the table. Used to identify permission inheritance. + * @param srcf + * Source directory to be renamed to tmppath. It should be a + * leaf directory where the final data files reside. However it + * could potentially contain subdirectories as well. + * @param destf + * The directory where the final data needs to go + * @param oldPath + * The directory where the old data location, need to be cleaned up. Most of time, will be the same + * as destf, unless its across FileSystem boundaries. 
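
Both copyFiles and the subdirectory branch of moveFile above push the per-file renames through a bounded daemon thread pool sized by HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT. A stripped-down sketch of that pattern, with the Hive session and permission plumbing omitted:

    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelMoveSketch {
      static void moveAll(FileSystem fs, FileStatus[] srcs, Path destDir, int threads)
          throws Exception {
        ExecutorService pool = threads > 0
            ? Executors.newFixedThreadPool(threads,
                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build())
            : null;
        List<Future<Void>> futures = new LinkedList<>();
        for (FileStatus src : srcs) {
          final Path dest = new Path(destDir, src.getPath().getName());
          final Path srcPath = src.getPath();
          if (pool == null) {
            rename(fs, srcPath, dest); // serial fallback when the pool is disabled
          } else {
            futures.add(pool.submit(new Callable<Void>() {
              @Override
              public Void call() throws IOException {
                rename(fs, srcPath, dest);
                return null;
              }
            }));
          }
        }
        if (pool != null) {
          pool.shutdown();
          for (Future<Void> f : futures) {
            f.get(); // surface the first failure, as the real code does
          }
        }
      }

      private static void rename(FileSystem fs, Path src, Path dest) throws IOException {
        if (!fs.rename(src, dest)) {
          throw new IOException("rename for src path: " + src + " to dest: " + dest + " returned false");
        }
      }
    }
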
+ * @param purge + * When set to true files which needs to be deleted are not moved to Trash + * @param isSrcLocal + * If the source directory is LOCAL + * @param newFiles + * Output the list of new files replaced in the destination path + */ + @Override + public void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf, + boolean isSrcLocal, boolean purge, List newFiles, + PathFilter deletePathFilter, + boolean isNeedRecycle, Hive hive) throws HiveException { + try { + + FileSystem destFs = destf.getFileSystem(conf); + // check if srcf contains nested sub-directories + FileStatus[] srcs; + FileSystem srcFs; + try { + srcFs = srcf.getFileSystem(conf); + srcs = srcFs.globStatus(srcf); + } catch (IOException e) { + throw new HiveException("Getting globStatus " + srcf.toString(), e); + } + if (srcs == null) { + LOG.info("No sources specified to move: " + srcf); + return; + } + + if (oldPath != null) { + hive.deleteOldPathForReplace(destf, oldPath, conf, purge, deletePathFilter, + isNeedRecycle); + } + + // first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates + // destf + boolean destfExist = FileUtils.mkdir(destFs, destf, conf); + if(!destfExist) { + throw new IOException("Directory " + destf.toString() + + " does not exist and could not be created."); + } + + // Two cases: + // 1. srcs has only a src directory, if rename src directory to destf, we also need to + // Copy/move each file under the source directory to avoid to delete the destination + // directory if it is the root of an HDFS encryption zone. + // 2. srcs must be a list of files -- ensured by LoadSemanticAnalyzer + // in both cases, we move the file under destf + if (srcs.length == 1 && srcs[0].isDirectory()) { + if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal)) { + throw new IOException("Error moving: " + srcf + " into: " + destf); + } + + // Add file paths of the files that will be moved to the destination if the caller needs it + if (null != newFiles) { + listNewFilesRecursively(destFs, destf, newFiles); + } + } else { + // its either a file or glob + for (FileStatus src : srcs) { + Path destFile = new Path(destf, src.getPath().getName()); + if (!moveFile(conf, src.getPath(), destFile, true, isSrcLocal)) { + throw new IOException("Error moving: " + srcf + " into: " + destf); + } + + // Add file paths of the files that will be moved to the destination if the caller needs it + if (null != newFiles) { + newFiles.add(destFile); + } + } + } + } catch (IOException e) { + throw new HiveException(e.getMessage(), e); + } + } + + /** + *

+   * <p>
+   * Moves a file from one {@link Path} to another. If {@code isRenameAllowed} is true then the
+   * {@link FileSystem#rename(Path, Path)} method is used to move the file. If it is false then the data is copied: if
+   * {@code isSrcLocal} is true then the {@link FileSystem#copyFromLocalFile(Path, Path)} method is used, else
+   * {@link FileUtils#copy(FileSystem, Path, FileSystem, Path, boolean, boolean, HiveConf)} is used.
+   * </p>
+   *
+   * <p>
+   * If the destination file already exists, then {@code _copy_[counter]} is appended to the file name, where counter
+   * is an integer starting from 1.
+   * </p>
+   *
+ * + * @param conf the {@link HiveConf} to use if copying data + * @param sourceFs the {@link FileSystem} where the source file exists + * @param sourcePath the {@link Path} to move + * @param destFs the {@link FileSystem} to move the file to + * @param destDirPath the {@link Path} to move the file to + * @param isSrcLocal if the source file is on the local filesystem + * @param isOverwrite if true, then overwrite destination file if exist else make a duplicate copy + * @param isRenameAllowed true if the data should be renamed and not copied, false otherwise + * + * @return the {@link Path} the source file was moved to + * + * @throws IOException if there was an issue moving the file + */ + private static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath, FileSystem destFs, + Path destDirPath, boolean isSrcLocal, boolean isOverwrite, + boolean isRenameAllowed, int taskId) throws IOException { + + // Strip off the file type, if any so we don't make: + // 000000_0.gz -> 000000_0.gz_copy_1 + final String fullname = sourcePath.getName(); + final String name; + if (taskId == -1) { // non-acid + name = FilenameUtils.getBaseName(sourcePath.getName()); + } else { // acid + name = getPathName(taskId); + } + final String type = FilenameUtils.getExtension(sourcePath.getName()); + + // Incase of ACID, the file is ORC so the extension is not relevant and should not be inherited. + Path destFilePath = new Path(destDirPath, taskId == -1 ? fullname : name); + + /* + * The below loop may perform bad when the destination file already exists and it has too many _copy_ + * files as well. A desired approach was to call listFiles() and get a complete list of files from + * the destination, and check whether the file exists or not on that list. However, millions of files + * could live on the destination directory, and on concurrent situations, this can cause OOM problems. + * + * I'll leave the below loop for now until a better approach is found. + */ + for (int counter = 1; destFs.exists(destFilePath); counter++) { + if (isOverwrite) { + destFs.delete(destFilePath, false); + break; + } + destFilePath = new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) + + ((taskId == -1 && !type.isEmpty()) ? "." + type : "")); + } + + if (isRenameAllowed) { + destFs.rename(sourcePath, destFilePath); + } else if (isSrcLocal) { + destFs.copyFromLocalFile(sourcePath, destFilePath); + } else { + FileUtils.copy(sourceFs, sourcePath, destFs, destFilePath, + true, // delete source + false, // overwrite destination + conf); + } + return destFilePath; + } + + /** + * If moving across different FileSystems or differnent encryption zone, need to do a File copy instead of rename. + * TODO- consider if need to do this for different file authority. 
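
The collision rule described in the mvFile javadoc above is easy to miss inside the loop, so here is a trimmed sketch of just that naming logic. The "_copy_" literal stands in for Utilities.COPY_KEYWORD, and the base name and extension would normally come from FilenameUtils.

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;

    public class CopySuffixSketch {
      // Probe the destination until a free name is found, or delete the existing file
      // when overwrite is requested. For 000000_0.gz this yields 000000_0_copy_1.gz, etc.
      static Path chooseDestFile(FileSystem destFs, Path destDir, String baseName,
          String extension, boolean isOverwrite) throws IOException {
        Path destFilePath =
            new Path(destDir, extension.isEmpty() ? baseName : baseName + "." + extension);
        for (int counter = 1; destFs.exists(destFilePath); counter++) {
          if (isOverwrite) {
            destFs.delete(destFilePath, false);
            break;
          }
          destFilePath = new Path(destDir, baseName + "_copy_" + counter
              + (extension.isEmpty() ? "" : "." + extension));
        }
        return destFilePath;
      }
    }
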
+ * @throws HiveException + */ + static private boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs) + throws HiveException { + //Check if different FileSystems + if (!FileUtils.equalsFileSystem(srcFs, destFs)) { + return true; + } + + //Check if different encryption zones + HadoopShims.HdfsEncryptionShim srcHdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(srcFs); + HadoopShims.HdfsEncryptionShim destHdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(destFs); + try { + return srcHdfsEncryptionShim != null + && destHdfsEncryptionShim != null + && (srcHdfsEncryptionShim.isPathEncrypted(srcf) || destHdfsEncryptionShim.isPathEncrypted(destf)) + && !srcHdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf, destHdfsEncryptionShim); + } catch (IOException e) { + throw new HiveException(e); + } + } + + static private HiveException handlePoolException(ExecutorService pool, Exception e) { + HiveException he = null; + + if (e instanceof HiveException) { + he = (HiveException) e; + if (he.getCanonicalErrorMsg() != ErrorMsg.GENERIC_ERROR) { + if (he.getCanonicalErrorMsg() == ErrorMsg.UNRESOLVED_RT_EXCEPTION) { + LOG.error("Failed to move: {}", he.getMessage()); + } else { + LOG.error("Failed to move: {}", he.getRemoteErrorMsg()); + } + } + } else { + LOG.error("Failed to move: {}", e.getMessage()); + he = new HiveException(e.getCause()); + } + pool.shutdownNow(); + return he; + } + + // List the new files in destination path which gets copied from source. + private static void listNewFilesRecursively(final FileSystem destFs, Path dest, + List newFiles) throws HiveException { + try { + for (FileStatus fileStatus : destFs.listStatus(dest, FileUtils.HIDDEN_FILES_PATH_FILTER)) { + if (fileStatus.isDirectory()) { + // If it is a sub-directory, then recursively list the files. 
+ listNewFilesRecursively(destFs, fileStatus.getPath(), newFiles); + } else { + newFiles.add(fileStatus.getPath()); + } + } + } catch (IOException e) { + LOG.error("Failed to get source file statuses", e); + throw new HiveException(e.getMessage(), e); + } + } + + private static String getPathName(int taskId) { + return Utilities.replaceTaskId("000000", taskId) + "_0"; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java index 09cbf32f9c..683db2a250 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -73,6 +74,7 @@ public static void close() { private final List udfs; private Reporter reporter; + private TaskAttemptID taskAttemptID; protected MapredContext(boolean isMap, JobConf jobConf) { this.isMap = isMap; @@ -105,6 +107,14 @@ public void setReporter(Reporter reporter) { this.reporter = reporter; } + public TaskAttemptID getTaskAttemptID() { + return this.taskAttemptID; + } + + public void setTaskAttemptID(TaskAttemptID taskAttemptID) { + this.taskAttemptID = taskAttemptID; + } + private void registerCloseable(Closeable closeable) { udfs.add(closeable); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index dbda5fdef4..d38ea67f7b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -85,122 +85,6 @@ public MoveTask() { super(); } - private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir) - throws HiveException { - try { - String mesg = "Moving data to " + (isDfsDir ? "" : "local ") + "directory " - + targetPath.toString(); - String mesg_detail = " from " + sourcePath.toString(); - console.printInfo(mesg, mesg_detail); - - FileSystem fs = sourcePath.getFileSystem(conf); - if (isDfsDir) { - moveFileInDfs (sourcePath, targetPath, conf); - } else { - // This is a local file - FileSystem dstFs = FileSystem.getLocal(conf); - moveFileFromDfsToLocal(sourcePath, targetPath, fs, dstFs); - } - } catch (Exception e) { - throw new HiveException("Unable to move source " + sourcePath + " to destination " - + targetPath, e); - } - } - - private void moveFileInDfs (Path sourcePath, Path targetPath, HiveConf conf) - throws HiveException, IOException { - - final FileSystem srcFs, tgtFs; - try { - tgtFs = targetPath.getFileSystem(conf); - } catch (IOException e) { - LOG.error("Failed to get dest fs", e); - throw new HiveException(e.getMessage(), e); - } - try { - srcFs = sourcePath.getFileSystem(conf); - } catch (IOException e) { - LOG.error("Failed to get src fs", e); - throw new HiveException(e.getMessage(), e); - } - - // if source exists, rename. 
Otherwise, create a empty directory - if (srcFs.exists(sourcePath)) { - Path deletePath = null; - // If it multiple level of folder are there fs.rename is failing so first - // create the targetpath.getParent() if it not exist - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) { - deletePath = createTargetPath(targetPath, tgtFs); - } - Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false); - if (!Hive.moveFile(conf, sourcePath, targetPath, true, false)) { - try { - if (deletePath != null) { - tgtFs.delete(deletePath, true); - } - } catch (IOException e) { - LOG.info("Unable to delete the path created for facilitating rename: {}", - deletePath); - } - throw new HiveException("Unable to rename: " + sourcePath - + " to: " + targetPath); - } - } else if (!tgtFs.mkdirs(targetPath)) { - throw new HiveException("Unable to make directory: " + targetPath); - } - } - - private void moveFileFromDfsToLocal(Path sourcePath, Path targetPath, FileSystem fs, - FileSystem dstFs) throws HiveException, IOException { - // RawLocalFileSystem seems not able to get the right permissions for a local file, it - // always returns hdfs default permission (00666). So we can not overwrite a directory - // by deleting and recreating the directory and restoring its permissions. We should - // delete all its files and subdirectories instead. - if (dstFs.exists(targetPath)) { - if (dstFs.isDirectory(targetPath)) { - FileStatus[] destFiles = dstFs.listStatus(targetPath); - for (FileStatus destFile : destFiles) { - if (!dstFs.delete(destFile.getPath(), true)) { - throw new IOException("Unable to clean the destination directory: " + targetPath); - } - } - } else { - throw new HiveException("Target " + targetPath + " is not a local directory."); - } - } else { - if (!FileUtils.mkdir(dstFs, targetPath, conf)) { - throw new HiveException("Failed to create local target directory " + targetPath); - } - } - - if (fs.exists(sourcePath)) { - FileStatus[] srcs = fs.listStatus(sourcePath, FileUtils.HIDDEN_FILES_PATH_FILTER); - for (FileStatus status : srcs) { - fs.copyToLocalFile(status.getPath(), targetPath); - } - } - } - - private Path createTargetPath(Path targetPath, FileSystem fs) throws IOException { - Path deletePath = null; - Path mkDirPath = targetPath.getParent(); - if (mkDirPath != null && !fs.exists(mkDirPath)) { - Path actualPath = mkDirPath; - // targetPath path is /x/y/z/1/2/3 here /x/y/z is present in the file system - // create the structure till /x/y/z/1/2 to work rename for multilevel directory - // and if rename fails delete the path /x/y/z/1 - // If targetPath have multilevel directories like /x/y/z/1/2/3 , /x/y/z/1/2/4 - // the renaming of the directories are not atomic the execution will happen one - // by one - while (actualPath != null && !fs.exists(actualPath)) { - deletePath = actualPath; - actualPath = actualPath.getParent(); - } - fs.mkdirs(mkDirPath); - } - return deletePath; - } - // Release all the locks acquired for this object // This becomes important for multi-table inserts when one branch may take much more // time than the others. It is better to release the lock for this particular insert. 
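
The MapredContext task-attempt plumbing above, together with the ExecMapper/ExecReducer changes later in the diff, exists so FileSinkOperator can rebuild a mapreduce-API TaskAttemptContext inside a running task. A self-contained sketch of that reconstruction, mirroring createTaskAttemptContext; the "mapred.task.id" key is the one the patch reads, and blanking the jtIdentifier matches what the patch does.

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class TaskAttemptContextSketch {
      // ExecMapper/ExecReducer parse the attempt id from the job conf and hand it to
      // MapredContext; FileSinkOperator then rebuilds a mapreduce-API context from it.
      static TaskAttemptContext contextFor(JobConf job) {
        TaskAttemptID orig = TaskAttemptID.forName(job.get("mapred.task.id"));
        TaskAttemptID normalized = new TaskAttemptID(StringUtils.EMPTY, 0,
            orig.getTaskType(), orig.getTaskID().getId(), orig.getId());
        return new TaskAttemptContextImpl(job, normalized);
      }
    }
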
@@ -282,6 +166,15 @@ public int execute(DriverContext driverContext) { } Hive db = getHive(); + + DataCommitter dataCommitter; + if (work.getPathOutputCommitterWork() != null) { + dataCommitter = new PathOutputCommitterDataCommitter(work.getPathOutputCommitterWork() + .getJobContext(), work.getPathOutputCommitterWork().createPathOutputCommitter()); + } else { + dataCommitter = new HiveDataCommitter(); + } + // Do any hive related operations like moving tables and files // to appropriate locations LoadFileDesc lfd = work.getLoadFileWork(); @@ -306,7 +199,7 @@ public int execute(DriverContext driverContext) { } } else { - moveFile(sourcePath, targetPath, lfd.getIsDfsDir()); + dataCommitter.moveFile(sourcePath, targetPath, lfd.getIsDfsDir(), conf, console); } } } @@ -327,7 +220,7 @@ public int execute(DriverContext driverContext) { destFs.mkdirs(destPath.getParent()); } Utilities.FILE_OP_LOGGER.debug("MoveTask moving (multi-file) " + srcPath + " to " + destPath); - moveFile(srcPath, destPath, isDfsDir); + dataCommitter.moveFile(srcPath, destPath, isDfsDir, conf, console); } else { if (!destFs.exists(destPath)) { destFs.mkdirs(destPath); @@ -339,7 +232,7 @@ public int execute(DriverContext driverContext) { Path childSrc = child.getPath(); Path childDest = new Path(destPath, filePrefix + childSrc.getName()); Utilities.FILE_OP_LOGGER.debug("MoveTask moving (multi-file) " + childSrc + " to " + childDest); - moveFile(childSrc, childDest, isDfsDir); + dataCommitter.moveFile(childSrc, childDest, isDfsDir, conf, console); } } else { Utilities.FILE_OP_LOGGER.debug("MoveTask skipping empty directory (multi-file) " + srcPath); @@ -372,7 +265,7 @@ public int execute(DriverContext driverContext) { } db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(), work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(), - tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite()); + tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite(), dataCommitter); if (work.getOutputs() != null) { DDLTask.addIfAbsentByName(new WriteEntity(table, getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs()); @@ -386,9 +279,9 @@ public int execute(DriverContext driverContext) { // deal with dynamic partitions DynamicPartitionCtx dpCtx = tbd.getDPCtx(); if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions - dc = handleDynParts(db, table, tbd, ti, dpCtx); + dc = handleDynParts(db, table, tbd, ti, dpCtx, dataCommitter); } else { // static partitions - dc = handleStaticParts(db, table, tbd, ti); + dc = handleStaticParts(db, table, tbd, ti, dataCommitter); } } if (dc != null) { @@ -460,7 +353,8 @@ public void logMessage(LoadTableDesc tbd) { } private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd, - TaskInformation ti) throws HiveException, IOException, InvalidOperationException { + TaskInformation ti, DataCommitter dataCommitter) throws HiveException, IOException, + InvalidOperationException { List partVals = MetaStoreUtils.getPvals(table.getPartCols(), tbd.getPartitionSpec()); db.validatePartitionNameCharacters(partVals); if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { @@ -474,7 +368,7 @@ private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd, work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID && !tbd.isMmTable(), hasFollowingStatsTask(), - tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite()); + tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite(), 
dataCommitter); Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false); // See the comment inside updatePartitionBucketSortColumns. @@ -493,7 +387,7 @@ private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd, } private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd, - TaskInformation ti, DynamicPartitionCtx dpCtx) throws HiveException, + TaskInformation ti, DynamicPartitionCtx dpCtx, DataCommitter dataCommitter) throws HiveException, IOException, InvalidOperationException { DataContainer dc; List> dps = Utilities.getFullDPSpecs(conf, dpCtx); @@ -521,7 +415,8 @@ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd, tbd.getStmtId(), hasFollowingStatsTask(), work.getLoadTableWork().getWriteType(), - tbd.isInsertOverwrite()); + tbd.isInsertOverwrite(), + dataCommitter); // publish DP columns to its subscribers if (dps != null && dps.size() > 0) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterDataCommitter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterDataCommitter.java new file mode 100644 index 0000000000..be849547cf --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterDataCommitter.java @@ -0,0 +1,64 @@ +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; + +import java.io.IOException; +import java.util.List; + +/** + * A {@link DataCommitter} that commits Hive data using a {@link PathOutputCommitter}. 
+ */ +class PathOutputCommitterDataCommitter implements DataCommitter { + + private final JobContext jobContext; + private final PathOutputCommitter pathOutputCommitter; + + PathOutputCommitterDataCommitter(JobContext jobContext, + PathOutputCommitter pathOutputCommitter) { + this.jobContext = jobContext; + this.pathOutputCommitter = pathOutputCommitter; + } + + @Override + public void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir, HiveConf conf, + SessionState.LogHelper console) throws HiveException { + commitJob(); + } + + @Override + public boolean moveFile(HiveConf conf, Path srcf, Path destf, boolean replace, + boolean isSrcLocal) throws HiveException { + commitJob(); + return true; + } + + @Override + public void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs, boolean isSrcLocal, + boolean isAcidIUD, boolean isOverwrite, List newFiles, + boolean isBucketed, boolean isFullAcidTable) throws HiveException { + commitJob(); + } + + @Override + public void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf, + boolean isSrcLocal, boolean purge, List newFiles, + PathFilter deletePathFilter, boolean isNeedRecycle, Hive hive) throws HiveException { + commitJob(); + } + + private void commitJob() throws HiveException { + try { + this.pathOutputCommitter.commitJob(this.jobContext); + } catch (IOException e) { + throw new HiveException(e); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterSetupTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterSetupTask.java new file mode 100644 index 0000000000..dc1c62bf3d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterSetupTask.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.plan.PathOutputCommitterWork; +import org.apache.hadoop.hive.ql.plan.api.StageType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PathOutputCommitterSetupTask extends Task { + + private static final Logger LOG = LoggerFactory.getLogger(PathOutputCommitterSetupTask.class); + + private static final long serialVersionUID = -8867710739987754989L; + + @Override + protected int execute(DriverContext driverContext) { + try { + LOG.info("Running setupJob for Path Output Committer " + + work.getPathOutputCommitterClass().getName()); + work.createPathOutputCommitter().setupJob(getWork().getJobContext()); + } catch (Exception e) { + LOG.error("Failed run setupJob for Path Output Committer " + + work.getPathOutputCommitterClass().getName(), e); + setException(e); + return 1; + } + return 0; + } + + @Override + public StageType getType() { + return null; + } + + @Override + public String getName() { + return null; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 3a107b7e81..51562f3c42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.io.merge.MergeFileTask; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; +import org.apache.hadoop.hive.ql.plan.PathOutputCommitterWork; import org.apache.hadoop.hive.ql.plan.StatsWork; import org.apache.hadoop.hive.ql.plan.ConditionalWork; import org.apache.hadoop.hive.ql.plan.CopyWork; @@ -113,6 +114,7 @@ public TaskTuple(Class workClass, Class> taskClass) { taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class)); taskvec.add(new TaskTuple(ExportWork.class, ExportTask.class)); taskvec.add(new TaskTuple(ReplTxnWork.class, ReplTxnTask.class)); + taskvec.add(new TaskTuple<>(PathOutputCommitterWork.class, PathOutputCommitterSetupTask.class)); } private static ThreadLocal tid = new ThreadLocal() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java index 99b33a3aad..1a60f3e46f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.mapred.TaskAttemptID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -73,6 +74,8 @@ @Override public void configure(JobConf job) { + TaskAttemptID taskAttemptID = TaskAttemptID.forName(job.get("mapred.task.id")); + execContext = new ExecMapperContext(job); // Allocate the bean at the beginning - try { @@ -110,6 +113,7 @@ public void configure(JobConf job) { execContext.setLocalWork(localWork); MapredContext.init(true, new JobConf(jc)); + MapredContext.get().setTaskAttemptID(taskAttemptID); mo.passExecContext(execContext); mo.initializeLocalWork(jc); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java index 829006d375..c0b9e34a87 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.List; +import org.apache.hadoop.mapred.TaskAttemptID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.MapredContext; @@ -90,6 +91,8 @@ @Override public void configure(JobConf job) { + TaskAttemptID taskAttemptID = TaskAttemptID.forName(job.get("mapred.task.id")); + rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector keyObjectInspector; @@ -141,6 +144,7 @@ public void configure(JobConf job) { } MapredContext.init(false, new JobConf(jc)); + MapredContext.get().setTaskAttemptID(taskAttemptID); // initialize reduce operator tree try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index e8554f9ce5..ec2d00f0c6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -169,9 +169,11 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; +import org.apache.hadoop.hive.ql.exec.DataCommitter; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.FunctionTask; import org.apache.hadoop.hive.ql.exec.FunctionUtils; +import org.apache.hadoop.hive.ql.exec.HiveDataCommitter; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -1647,7 +1649,7 @@ public Database getDatabaseCurrent() throws HiveException { public Partition loadPartition(Path loadPath, Table tbl, Map partSpec, LoadFileType loadFileType, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long writeId, - int stmtId, boolean isInsertOverwrite) throws HiveException { + int stmtId, boolean isInsertOverwrite, DataCommitter dataCommitter) throws HiveException { Path tblDataLocationPath = tbl.getDataLocation(); boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters()); assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName(); @@ -1737,13 +1739,13 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new // base_x. 
(there is Insert Overwrite and Load Data Overwrite) boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); - replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal, - isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary()); + dataCommitter.replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), + isSrcLocal, isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary(), this); } else { FileSystem fs = tbl.getDataLocation().getFileSystem(conf); - copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, - (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles, - tbl.getNumBuckets() > 0, isFullAcidTable); + dataCommitter.copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, + (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles, + tbl.getNumBuckets() > 0, isFullAcidTable); } } perfLogger.PerfLogEnd("MoveTask", "FileMoves"); @@ -2120,7 +2122,7 @@ private void constructOneLBLocationMap(FileStatus fSta, final String tableName, final Map partSpec, final LoadFileType loadFileType, final int numDP, final int numLB, final boolean isAcid, final long writeId, final int stmtId, final boolean hasFollowingStatsTask, final AcidUtils.Operation operation, - boolean isInsertOverwrite) throws HiveException { + boolean isInsertOverwrite, DataCommitter dataCommitter) throws HiveException { final Map, Partition> partitionsMap = Collections.synchronizedMap(new LinkedHashMap, Partition>()); @@ -2169,7 +2171,7 @@ public Void call() throws Exception { // load the partition Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, loadFileType, true, numLB > 0, false, isAcid, hasFollowingStatsTask, writeId, stmtId, - isInsertOverwrite); + isInsertOverwrite, dataCommitter); partitionsMap.put(fullPartSpec, newPartition); if (inPlaceEligible) { @@ -2263,7 +2265,8 @@ public Void call() throws Exception { */ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, - Long writeId, int stmtId, boolean isInsertOverwrite) throws HiveException { + Long writeId, int stmtId, boolean isInsertOverwrite, DataCommitter committer) + throws HiveException { List newFiles = null; Table tbl = getTable(tableName); assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName(); @@ -2304,12 +2307,12 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType if (loadFileType == LoadFileType.REPLACE_ALL && !isTxnTable) { //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361 boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); - replaceFiles(tblPath, loadPath, destPath, tblPath, conf, isSrcLocal, isAutopurge, - newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary()); + committer.replaceFiles(tblPath, loadPath, destPath, tblPath, conf, isSrcLocal, isAutopurge, + newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary(), this); } else { try { FileSystem fs = tbl.getDataLocation().getFileSystem(conf); - copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, + committer.copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles, tbl.getNumBuckets() > 0 ? 
true : false, isFullAcidTable); } catch (IOException e) { @@ -3257,109 +3260,8 @@ public PrincipalPrivilegeSet get_privilege_set(HiveObjectType objectType, } } - private static void copyFiles(final HiveConf conf, final FileSystem destFs, - FileStatus[] srcs, final FileSystem srcFs, final Path destf, - final boolean isSrcLocal, boolean isOverwrite, - final List newFiles, boolean acidRename) throws HiveException { - - final HdfsUtils.HadoopFileStatus fullDestStatus; - try { - fullDestStatus = new HdfsUtils.HadoopFileStatus(conf, destFs, destf); - } catch (IOException e1) { - throw new HiveException(e1); - } - - if (!fullDestStatus.getFileStatus().isDirectory()) { - throw new HiveException(destf + " is not a directory."); - } - final List>> futures = new LinkedList<>(); - final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? - Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null; - // For ACID non-bucketed case, the filenames have to be in the format consistent with INSERT/UPDATE/DELETE Ops, - // i.e, like 000000_0, 000001_0_copy_1, 000002_0.gz etc. - // The extension is only maintained for files which are compressed. - int taskId = 0; - // Sort the files - Arrays.sort(srcs); - for (FileStatus src : srcs) { - FileStatus[] files; - if (src.isDirectory()) { - try { - files = srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - } catch (IOException e) { - if (null != pool) { - pool.shutdownNow(); - } - throw new HiveException(e); - } - } else { - files = new FileStatus[] {src}; - } - - final SessionState parentSession = SessionState.get(); - // Sort the files - Arrays.sort(files); - for (final FileStatus srcFile : files) { - final Path srcP = srcFile.getPath(); - final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs); - - final boolean isRenameAllowed = !needToCopy && !isSrcLocal; - - final String msg = "Unable to move source " + srcP + " to destination " + destf; - - // If we do a rename for a non-local file, we will be transfering the original - // file permissions from source to the destination. Else, in case of mvFile() where we - // copy from source to destination, we will inherit the destination's parent group ownership. - if (null == pool) { - try { - Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isOverwrite, isRenameAllowed, - acidRename ? taskId++ : -1); - - if (null != newFiles) { - newFiles.add(destPath); - } - } catch (Exception e) { - throw getHiveException(e, msg, "Failed to move: {}"); - } - } else { - // future only takes final or seemingly final values. Make a final copy of taskId - final int finalTaskId = acidRename ? 
taskId++ : -1; - futures.add(pool.submit(new Callable>() { - @Override - public ObjectPair call() throws HiveException { - SessionState.setCurrentSessionState(parentSession); - - try { - Path destPath = - mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isOverwrite, isRenameAllowed, finalTaskId); - - if (null != newFiles) { - newFiles.add(destPath); - } - return ObjectPair.create(srcP, destPath); - } catch (Exception e) { - throw getHiveException(e, msg); - } - } - })); - } - } - } - if (null != pool) { - pool.shutdown(); - for (Future> future : futures) { - try { - ObjectPair pair = future.get(); - LOG.debug("Moved src: {}, to dest: {}", pair.getFirst().toString(), pair.getSecond().toString()); - } catch (Exception e) { - throw handlePoolException(pool, e); - } - } - } - } - - private static boolean isSubDir(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs, boolean isSrcLocal) { + public static boolean isSubDir(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs, + boolean isSrcLocal) { if (srcf == null) { LOG.debug("The source path is null for isSubDir method."); return false; @@ -3402,84 +3304,6 @@ private static Path getQualifiedPathWithoutSchemeAndAuthority(Path srcf, FileSys return ShimLoader.getHadoopShims().getPathWithoutSchemeAndAuthority(path); } - private static String getPathName(int taskId) { - return Utilities.replaceTaskId("000000", taskId) + "_0"; - } - - /** - *
<p>
- * Moves a file from one {@link Path} to another. If {@code isRenameAllowed} is true then the - * {@link FileSystem#rename(Path, Path)} method is used to move the file. If its false then the data is copied, if - * {@code isSrcLocal} is true then the {@link FileSystem#copyFromLocalFile(Path, Path)} method is used, else - * {@link FileUtils#copy(FileSystem, Path, FileSystem, Path, boolean, boolean, HiveConf)} is used. - *
</p>
- * - *
<p>
- * If the destination file already exists, then {@code _copy_[counter]} is appended to the file name, where counter - * is an integer starting from 1. - *
</p>
- * - * @param conf the {@link HiveConf} to use if copying data - * @param sourceFs the {@link FileSystem} where the source file exists - * @param sourcePath the {@link Path} to move - * @param destFs the {@link FileSystem} to move the file to - * @param destDirPath the {@link Path} to move the file to - * @param isSrcLocal if the source file is on the local filesystem - * @param isOverwrite if true, then overwrite destination file if exist else make a duplicate copy - * @param isRenameAllowed true if the data should be renamed and not copied, false otherwise - * - * @return the {@link Path} the source file was moved to - * - * @throws IOException if there was an issue moving the file - */ - private static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath, FileSystem destFs, Path destDirPath, - boolean isSrcLocal, boolean isOverwrite, boolean isRenameAllowed, - int taskId) throws IOException { - - // Strip off the file type, if any so we don't make: - // 000000_0.gz -> 000000_0.gz_copy_1 - final String fullname = sourcePath.getName(); - final String name; - if (taskId == -1) { // non-acid - name = FilenameUtils.getBaseName(sourcePath.getName()); - } else { // acid - name = getPathName(taskId); - } - final String type = FilenameUtils.getExtension(sourcePath.getName()); - - // Incase of ACID, the file is ORC so the extension is not relevant and should not be inherited. - Path destFilePath = new Path(destDirPath, taskId == -1 ? fullname : name); - - /* - * The below loop may perform bad when the destination file already exists and it has too many _copy_ - * files as well. A desired approach was to call listFiles() and get a complete list of files from - * the destination, and check whether the file exists or not on that list. However, millions of files - * could live on the destination directory, and on concurrent situations, this can cause OOM problems. - * - * I'll leave the below loop for now until a better approach is found. - */ - for (int counter = 1; destFs.exists(destFilePath); counter++) { - if (isOverwrite) { - destFs.delete(destFilePath, false); - break; - } - destFilePath = new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) + - ((taskId == -1 && !type.isEmpty()) ? "." + type : "")); - } - - if (isRenameAllowed) { - destFs.rename(sourcePath, destFilePath); - } else if (isSrcLocal) { - destFs.copyFromLocalFile(sourcePath, destFilePath); - } else { - FileUtils.copy(sourceFs, sourcePath, destFs, destFilePath, - true, // delete source - false, // overwrite destination - conf); - } - return destFilePath; - } - // Clears the dest dir when src is sub-dir of dest. public static void clearDestForSubDirSrc(final HiveConf conf, Path dest, Path src, boolean isSrcLocal) throws IOException { @@ -3505,24 +3329,6 @@ public static void clearDestForSubDirSrc(final HiveConf conf, Path dest, } } - // List the new files in destination path which gets copied from source. - public static void listNewFilesRecursively(final FileSystem destFs, Path dest, - List newFiles) throws HiveException { - try { - for (FileStatus fileStatus : destFs.listStatus(dest, FileUtils.HIDDEN_FILES_PATH_FILTER)) { - if (fileStatus.isDirectory()) { - // If it is a sub-directory, then recursively list the files. 
- listNewFilesRecursively(destFs, fileStatus.getPath(), newFiles); - } else { - newFiles.add(fileStatus.getPath()); - } - } - } catch (IOException e) { - LOG.error("Failed to get source file statuses", e); - throw new HiveException(e.getMessage(), e); - } - } - /** * Recycles the files recursively from the input path to the cmroot directory either by copying or moving it. * @@ -3539,179 +3345,11 @@ public void recycleDirToCmPath(Path dataPath, boolean isPurge) throws HiveExcept } } - //it is assumed that parent directory of the destf should already exist when this - //method is called. when the replace value is true, this method works a little different - //from mv command if the destf is a directory, it replaces the destf instead of moving under - //the destf. in this case, the replaced destf still preserves the original destf's permission - public static boolean moveFile(final HiveConf conf, Path srcf, final Path destf, boolean replace, - boolean isSrcLocal) throws HiveException { - final FileSystem srcFs, destFs; - try { - destFs = destf.getFileSystem(conf); - } catch (IOException e) { - LOG.error("Failed to get dest fs", e); - throw new HiveException(e.getMessage(), e); - } - try { - srcFs = srcf.getFileSystem(conf); - } catch (IOException e) { - LOG.error("Failed to get src fs", e); - throw new HiveException(e.getMessage(), e); - } - - HdfsUtils.HadoopFileStatus destStatus = null; - - // If source path is a subdirectory of the destination path (or the other way around): - // ex: INSERT OVERWRITE DIRECTORY 'target/warehouse/dest4.out' SELECT src.value WHERE src.key >= 300; - // where the staging directory is a subdirectory of the destination directory - // (1) Do not delete the dest dir before doing the move operation. - // (2) It is assumed that subdir and dir are in same encryption zone. - // (3) Move individual files from scr dir to dest dir. - boolean srcIsSubDirOfDest = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal), - destIsSubDirOfSrc = isSubDir(destf, srcf, destFs, srcFs, false); - final String msg = "Unable to move source " + srcf + " to destination " + destf; - try { - if (replace) { - try{ - destStatus = new HdfsUtils.HadoopFileStatus(conf, destFs, destf); - //if destf is an existing directory: - //if replace is true, delete followed by rename(mv) is equivalent to replace - //if replace is false, rename (mv) actually move the src under dest dir - //if destf is an existing file, rename is actually a replace, and do not need - // to delete the file first - if (replace && !srcIsSubDirOfDest) { - destFs.delete(destf, true); - LOG.debug("The path " + destf.toString() + " is deleted"); - } - } catch (FileNotFoundException ignore) { - } - } - final HdfsUtils.HadoopFileStatus desiredStatus = destStatus; - final SessionState parentSession = SessionState.get(); - if (isSrcLocal) { - // For local src file, copy to hdfs - destFs.copyFromLocalFile(srcf, destf); - return true; - } else { - if (needToCopy(srcf, destf, srcFs, destFs)) { - //copy if across file system or encryption zones. 
- LOG.debug("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different."); - return FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf, - true, // delete source - replace, // overwrite destination - conf); - } else { - if (srcIsSubDirOfDest || destIsSubDirOfSrc) { - FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER); - - List> futures = new LinkedList<>(); - final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? - Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null; - if (destIsSubDirOfSrc && !destFs.exists(destf)) { - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Creating " + destf); - } - destFs.mkdirs(destf); - } - /* Move files one by one because source is a subdirectory of destination */ - for (final FileStatus srcStatus : srcs) { - - final Path destFile = new Path(destf, srcStatus.getPath().getName()); - - final String poolMsg = - "Unable to move source " + srcStatus.getPath() + " to destination " + destFile; - - if (null == pool) { - boolean success = false; - if (destFs instanceof DistributedFileSystem) { - ((DistributedFileSystem)destFs).rename(srcStatus.getPath(), destFile, Options.Rename.OVERWRITE); - success = true; - } else { - destFs.delete(destFile, false); - success = destFs.rename(srcStatus.getPath(), destFile); - } - if(!success) { - throw new IOException("rename for src path: " + srcStatus.getPath() + " to dest:" - + destf + " returned false"); - } - } else { - futures.add(pool.submit(new Callable() { - @Override - public Void call() throws HiveException { - SessionState.setCurrentSessionState(parentSession); - final String group = srcStatus.getGroup(); - try { - boolean success = false; - if (destFs instanceof DistributedFileSystem) { - ((DistributedFileSystem)destFs).rename(srcStatus.getPath(), destFile, Options.Rename.OVERWRITE); - success = true; - } else { - destFs.delete(destFile, false); - success = destFs.rename(srcStatus.getPath(), destFile); - } - if (!success) { - throw new IOException( - "rename for src path: " + srcStatus.getPath() + " to dest path:" - + destFile + " returned false"); - } - } catch (Exception e) { - throw getHiveException(e, poolMsg); - } - return null; - } - })); - } - } - if (null != pool) { - pool.shutdown(); - for (Future future : futures) { - try { - future.get(); - } catch (Exception e) { - throw handlePoolException(pool, e); - } - } - } - return true; - } else { - if (destFs.rename(srcf, destf)) { - return true; - } - return false; - } - } - } - } catch (Exception e) { - throw getHiveException(e, msg); - } - } - - static private HiveException getHiveException(Exception e, String msg) { + static public HiveException getHiveException(Exception e, String msg) { return getHiveException(e, msg, null); } - static private HiveException handlePoolException(ExecutorService pool, Exception e) { - HiveException he = null; - - if (e instanceof HiveException) { - he = (HiveException) e; - if (he.getCanonicalErrorMsg() != ErrorMsg.GENERIC_ERROR) { - if (he.getCanonicalErrorMsg() == ErrorMsg.UNRESOLVED_RT_EXCEPTION) { - LOG.error("Failed to move: {}", he.getMessage()); - } else { - LOG.error("Failed to move: {}", he.getRemoteErrorMsg()); - } - } - } else { - LOG.error("Failed to move: {}", e.getMessage()); - he = new HiveException(e.getCause()); - } - 
pool.shutdownNow(); - return he; - } - - static private HiveException getHiveException(Exception e, String msg, String logMsg) { + static public HiveException getHiveException(Exception e, String msg, String logMsg) { // The message from remote exception includes the entire stack. The error thrown from // hive based on the remote exception needs only the first line. String hiveErrMsg = null; @@ -3737,87 +3375,6 @@ static private HiveException getHiveException(Exception e, String msg, String lo } } - /** - * If moving across different FileSystems or differnent encryption zone, need to do a File copy instead of rename. - * TODO- consider if need to do this for different file authority. - * @throws HiveException - */ - static protected boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs) throws HiveException { - //Check if different FileSystems - if (!FileUtils.equalsFileSystem(srcFs, destFs)) { - return true; - } - - //Check if different encryption zones - HadoopShims.HdfsEncryptionShim srcHdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(srcFs); - HadoopShims.HdfsEncryptionShim destHdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim(destFs); - try { - return srcHdfsEncryptionShim != null - && destHdfsEncryptionShim != null - && (srcHdfsEncryptionShim.isPathEncrypted(srcf) || destHdfsEncryptionShim.isPathEncrypted(destf)) - && !srcHdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf, destHdfsEncryptionShim); - } catch (IOException e) { - throw new HiveException(e); - } - } - - /** - * Copy files. This handles building the mapping for buckets and such between the source and - * destination - * @param conf Configuration object - * @param srcf source directory, if bucketed should contain bucket files - * @param destf directory to move files into - * @param fs Filesystem - * @param isSrcLocal true if source is on local file system - * @param isAcidIUD true if this is an ACID based Insert/Update/Delete - * @param isOverwrite if true, then overwrite if destination file exist, else add a duplicate copy - * @param newFiles if this is non-null, a list of files that were created as a result of this - * move will be returned. - * @throws HiveException - */ - static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs, - boolean isSrcLocal, boolean isAcidIUD, - boolean isOverwrite, List newFiles, boolean isBucketed, - boolean isFullAcidTable) throws HiveException { - try { - // create the destination if it does not exist - if (!fs.exists(destf)) { - FileUtils.mkdir(fs, destf, conf); - } - } catch (IOException e) { - throw new HiveException( - "copyFiles: error while checking/creating destination directory!!!", - e); - } - - FileStatus[] srcs; - FileSystem srcFs; - try { - srcFs = srcf.getFileSystem(conf); - srcs = srcFs.globStatus(srcf); - } catch (IOException e) { - LOG.error(StringUtils.stringifyException(e)); - throw new HiveException("addFiles: filesystem error in check phase. " + e.getMessage(), e); - } - if (srcs == null) { - LOG.info("No sources specified to move: " + srcf); - return; - // srcs = new FileStatus[0]; Why is this needed? - } - - // If we're moving files around for an ACID write then the rules and paths are all different. - // You can blame this on Owen. - if (isAcidIUD) { - moveAcidFiles(srcFs, srcs, destf, newFiles); - } else { - // For ACID non-bucketed case, the filenames have to be in the format consistent with INSERT/UPDATE/DELETE Ops, - // i.e, like 000000_0, 000001_0_copy_1, 000002_0.gz etc. 
- // The extension is only maintained for files which are compressed. - copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, isOverwrite, - newFiles, isFullAcidTable && !isBucketed); - } - } - public static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, List newFiles) throws HiveException { // The layout for ACID files is table|partname/base|delta|delete_delta/bucket @@ -3948,96 +3505,7 @@ private static void moveAcidFiles(String deltaFileType, PathFilter pathFilter, F } } - /** - * Replaces files in the partition with new data set specified by srcf. Works - * by renaming directory of srcf to the destination file. - * srcf, destf, and tmppath should resident in the same DFS, but the oldPath can be in a - * different DFS. - * - * @param tablePath path of the table. Used to identify permission inheritance. - * @param srcf - * Source directory to be renamed to tmppath. It should be a - * leaf directory where the final data files reside. However it - * could potentially contain subdirectories as well. - * @param destf - * The directory where the final data needs to go - * @param oldPath - * The directory where the old data location, need to be cleaned up. Most of time, will be the same - * as destf, unless its across FileSystem boundaries. - * @param purge - * When set to true files which needs to be deleted are not moved to Trash - * @param isSrcLocal - * If the source directory is LOCAL - * @param newFiles - * Output the list of new files replaced in the destination path - */ - protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf, - boolean isSrcLocal, boolean purge, List newFiles, PathFilter deletePathFilter, - boolean isNeedRecycle) throws HiveException { - try { - - FileSystem destFs = destf.getFileSystem(conf); - // check if srcf contains nested sub-directories - FileStatus[] srcs; - FileSystem srcFs; - try { - srcFs = srcf.getFileSystem(conf); - srcs = srcFs.globStatus(srcf); - } catch (IOException e) { - throw new HiveException("Getting globStatus " + srcf.toString(), e); - } - if (srcs == null) { - LOG.info("No sources specified to move: " + srcf); - return; - } - - if (oldPath != null) { - deleteOldPathForReplace(destf, oldPath, conf, purge, deletePathFilter, isNeedRecycle); - } - - // first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates - // destf - boolean destfExist = FileUtils.mkdir(destFs, destf, conf); - if(!destfExist) { - throw new IOException("Directory " + destf.toString() - + " does not exist and could not be created."); - } - - // Two cases: - // 1. srcs has only a src directory, if rename src directory to destf, we also need to - // Copy/move each file under the source directory to avoid to delete the destination - // directory if it is the root of an HDFS encryption zone. - // 2. 
srcs must be a list of files -- ensured by LoadSemanticAnalyzer - // in both cases, we move the file under destf - if (srcs.length == 1 && srcs[0].isDirectory()) { - if (!moveFile(conf, srcs[0].getPath(), destf, true, isSrcLocal)) { - throw new IOException("Error moving: " + srcf + " into: " + destf); - } - - // Add file paths of the files that will be moved to the destination if the caller needs it - if (null != newFiles) { - listNewFilesRecursively(destFs, destf, newFiles); - } - } else { - // its either a file or glob - for (FileStatus src : srcs) { - Path destFile = new Path(destf, src.getPath().getName()); - if (!moveFile(conf, src.getPath(), destFile, true, isSrcLocal)) { - throw new IOException("Error moving: " + srcf + " into: " + destf); - } - - // Add file paths of the files that will be moved to the destination if the caller needs it - if (null != newFiles) { - newFiles.add(destFile); - } - } - } - } catch (IOException e) { - throw new HiveException(e.getMessage(), e); - } - } - - private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, boolean purge, + public void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, boolean purge, PathFilter pathFilter, boolean isNeedRecycle) throws HiveException { Utilities.FILE_OP_LOGGER.debug("Deleting old paths for replace in " + destPath + " and old path " + oldPath); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PathOutputCommitterResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PathOutputCommitterResolver.java new file mode 100644 index 0000000000..e4d3818ae3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PathOutputCommitterResolver.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer.physical; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; + +import org.apache.hadoop.hive.common.BlobStorageUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.MoveTask; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.PathOutputCommitterSetupTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; +import org.apache.hadoop.hive.ql.exec.spark.SparkTask; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.TaskGraphWalker; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.MoveWork; +import org.apache.hadoop.hive.ql.plan.PathOutputCommitterWork; + +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; +import java.util.stream.Collectors; + + +public class PathOutputCommitterResolver implements PhysicalPlanResolver { + + private static final Logger LOG = LoggerFactory.getLogger(PathOutputCommitterResolver.class); + + // Useful for testing + private static final String HIVE_BLOBSTORE_COMMIT_DISABLE_EXPLAIN = "hive.blobstore.commit." 
+ + "disable.explain"; + + private final Map, Collection> taskToFsOps = new HashMap<>(); + private final List> mvTasks = new ArrayList<>(); + private HiveConf hconf; + + @Override + public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { + this.hconf = pctx.getConf(); + + // Collect all MoveTasks and FSOPs + TaskGraphWalker graphWalker = new TaskGraphWalker(new PathOutputCommitterDispatcher()); + List rootTasks = new ArrayList<>(pctx.getRootTasks()); + graphWalker.startWalking(rootTasks, null); + + // Find MoveTasks with no child MoveTask + List> sinkMoveTasks = mvTasks.stream() + .filter(mvTask -> !containsChildTask(mvTask.getChildTasks(), MoveTask.class)) + .collect(Collectors.toList()); + + // Iterate through each FSOP + for (Map.Entry, Collection> entry : taskToFsOps.entrySet()) { + for (FileSinkOperator fsOp : entry.getValue()) { + try { + processFsOp(entry.getKey(), fsOp, sinkMoveTasks); + } catch (HiveException | MetaException e) { + throw new SemanticException(e); + } + } + } + + return pctx; + } + + private boolean containsChildTask(List> mvTasks, Class + taskClass) { + if (mvTasks == null) { + return false; + } + boolean containsChildTask = false; + for (Task mvTask : mvTasks) { + if (taskClass.isInstance(mvTask)) { + return true; + } + containsChildTask = containsChildTask(mvTask.getChildTasks(), taskClass); + } + return containsChildTask; + } + + private class PathOutputCommitterDispatcher implements Dispatcher { + + @Override + public Object dispatch(Node nd, Stack stack, + Object... nodeOutputs) throws SemanticException { + + Task task = (Task) nd; + Collection fsOps = getAllFsOps(task); + if (!fsOps.isEmpty()) { + taskToFsOps.put((Task) nd, fsOps); + } + if (nd instanceof MoveTask) { + mvTasks.add((MoveTask) nd); + } + return null; + } + } + + private Collection getAllFsOps(Task task) { + Collection> fsOps = new ArrayList<>(); + if (task instanceof MapRedTask) { + fsOps.addAll(((MapRedTask) task).getWork().getAllOperators()); + } else if (task instanceof SparkTask) { + for (BaseWork work : ((SparkTask) task).getWork().getAllWork()) { + fsOps.addAll(work.getAllOperators()); + } + } + return fsOps.stream() + .filter(FileSinkOperator.class::isInstance) + .map(FileSinkOperator.class::cast) + .collect(Collectors.toList()); + } + + private void processFsOp(Task task, FileSinkOperator fsOp, + List> sinkMoveTasks) throws HiveException, MetaException { + FileSinkDesc fileSinkDesc = fsOp.getConf(); + + // Get the MoveTask that will process the output of the fsOp + Task mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(sinkMoveTasks, + fileSinkDesc.getFinalDirName(), fileSinkDesc.isMmTable()); + + if (mvTask != null) { + + MoveWork mvWork = mvTask.getWork(); + + // Don't process the mvTask if it requires committing data for DP queries + if (mvWork.getLoadMultiFilesWork() == null && (mvWork.getLoadTableWork() == null || mvWork + .getLoadTableWork().getDPCtx() == null)) { + + // The final output path we will commit data to + Path outputPath = null; + + // Instead of picking between load table work and load file work, throw an exception if + // they are both set (this should never happen) + if (mvWork.getLoadTableWork() != null && mvWork.getLoadFileWork() != null) { + throw new IllegalArgumentException("Load Table Work and Load File Work cannot both be " + + "set"); + } + + // If there is a load file work, get its output path + if (mvWork.getLoadFileWork() != null) { + outputPath = getLoadFileOutputPath(mvWork); + } + + // If there is a load table work, get is 
output path + if (mvTask.getWork().getLoadTableWork() != null) { + outputPath = getLoadTableOutputPath(mvWork); + } + if (outputPath != null) { + if (BlobStorageUtils.isBlobStoragePath(hconf, outputPath)) { + + // All s3a specific logic should be place in the method below, for all filesystems or + // output committer implementations, a similar pattern should be followed + if ("s3a".equals(outputPath.toUri().getScheme())) { + setupS3aOutputCommitter(mvWork); + } + + PathOutputCommitterWork setupWork = createPathOutputCommitterWork(outputPath); + mvWork.setPathOutputCommitterWork(setupWork); + + fileSinkDesc.setHasOutputCommitter(true); + fileSinkDesc.setTargetDirName(outputPath.toString()); + + LOG.info("Using Output Committer " + setupWork.getPathOutputCommitterClass() + + " for MoveTask: " + mvTask + ", FileSinkOperator: " + fsOp + " and output " + + "path: " + outputPath); + + if (hconf.getBoolean(HIVE_BLOBSTORE_COMMIT_DISABLE_EXPLAIN, false)) { + PathOutputCommitterSetupTask setupTask = new PathOutputCommitterSetupTask(); + setupTask.setWork(setupWork); + setupTask.executeTask(null); + } else { + task.addDependentTask(TaskFactory.get(setupWork)); + } + } + } + } + } + } + + private PathOutputCommitterWork createPathOutputCommitterWork(Path outputPath) { + JobID jobID = new JobID(); + TaskAttemptContext taskAttemptContext = createTaskAttemptContext(jobID); + JobContext jobContext = new JobContextImpl(hconf, jobID); + + return new PathOutputCommitterWork(outputPath.toString(), + jobContext, taskAttemptContext); + } + + /** + * Setups any necessary configuration specific to a s3a. All s3a specific logic should be + * encapsulated within this method. + */ + private void setupS3aOutputCommitter(MoveWork mvWork) { + if (mvWork.getLoadTableWork() != null && mvWork.getLoadTableWork() + .isInsertOverwrite()) { + hconf.set(CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CommitConstants. 
+ CONFLICT_MODE_REPLACE); + } else { + hconf.set(CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CommitConstants + .CONFLICT_MODE_APPEND); + } + + // We set this to false because its better for Hive to have more control over the file + // names given the that the JobID we are creating has no meaning + hconf.setBoolean(CommitConstants.FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, false); + } + + // Not sure if this is the best way to do this, somewhat copied from TaskCompiler, which uses + // similar logic to get the default location for the target table in a CTAS query + // don't think will work if a custom location is specified + // works because you can't specify a custom location when auto-creating a partition + private Path getDefaultPartitionPath(Path tablePath, Map partitionSpec) + throws MetaException { + Warehouse wh = new Warehouse(hconf); + return wh.getPartitionPath(tablePath, partitionSpec); + } + + private TaskAttemptContext createTaskAttemptContext(JobID jobID) { + return new TaskAttemptContextImpl(hconf, + new TaskAttemptID(jobID.getJtIdentifier(), jobID.getId(), TaskType.JOB_SETUP, 0, 0)); + } + + private Path getLoadFileOutputPath(MoveWork mvWork) { + return mvWork.getLoadFileWork().getTargetDir(); + } + + private Path getLoadTableOutputPath(MoveWork mvWork) throws HiveException, MetaException { + if (mvWork.getLoadTableWork().getPartitionSpec() != null && + !mvWork.getLoadTableWork().getPartitionSpec().isEmpty()) { + return getLoadPartitionOutputPath(mvWork); + } else { + // should probably replace this with Hive.getTable().getDataLocation() + return new Path(mvWork.getLoadTableWork().getTable().getProperties() + .getProperty("location")); // INSERT INTO ... VALUES (...) + } + } + + private Path getLoadPartitionOutputPath(MoveWork mvWork) throws HiveException, MetaException { + Hive db = Hive.get(); + Partition partition = db.getPartition(db.getTable(mvWork.getLoadTableWork() + .getTable().getTableName()), + mvWork.getLoadTableWork().getPartitionSpec(), false); + if (partition != null) { + return partition.getDataLocation(); + } else { + return getDefaultPartitionPath(db.getTable(mvWork.getLoadTableWork() + .getTable().getTableName()).getDataLocation(), mvWork + .getLoadTableWork().getPartitionSpec()); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java index d508d02ed1..7c07bc985c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java @@ -98,6 +98,10 @@ private void initialize(HiveConf hiveConf) { if (pctx.getContext().getExplainAnalyze() != null) { resolvers.add(new AnnotateRunTimeStatsOptimizer()); } + + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_OUTPUTCOMMITTER)) { + resolvers.add(new PathOutputCommitterResolver()); + } } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 0a76ffa28b..b13bb3ae80 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -70,6 +70,7 @@ import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer; import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; 
+import org.apache.hadoop.hive.ql.optimizer.physical.PathOutputCommitterResolver; import org.apache.hadoop.hive.ql.optimizer.physical.SparkCrossProductCheck; import org.apache.hadoop.hive.ql.optimizer.physical.SparkDynamicPartitionPruningResolver; import org.apache.hadoop.hive.ql.optimizer.physical.SparkMapJoinResolver; @@ -613,6 +614,12 @@ protected void optimizeTaskPlan(List> rootTasks, Pa new AnnotateRunTimeStatsOptimizer().resolve(physicalCtx); } + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_OUTPUTCOMMITTER)) { + new PathOutputCommitterResolver().resolve(physicalCtx); + } else { + LOG.debug("Skipping S3A commit optimizer"); + } + PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_TASK_TREE); return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 1d054688ee..8323fa150f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.plan; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -28,6 +29,9 @@ import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; /** * FileSinkDesc. @@ -63,6 +67,8 @@ private DynamicPartitionCtx dpCtx; private String staticSpec; // static partition spec ends with a '/' private boolean gatherStats; + private String targetDirName; + private boolean hasOutputCommitter; // Consider a query like: // insert overwrite table T3 select ... 
from T1 join T2 on T1.key = T2.key; @@ -583,6 +589,14 @@ public boolean getInsertOverwrite() { return isInsertOverwrite; } + public String getTargetDirName() { + return this.targetDirName; + } + + public void setTargetDirName(String targetDirName) { + this.targetDirName = targetDirName; + } + @Override public boolean isSame(OperatorDesc other) { if (getClass().getName().equals(other.getClass().getName())) { @@ -601,4 +615,11 @@ public boolean isSame(OperatorDesc other) { return false; } + public void setHasOutputCommitter(boolean hasOutputCommitter) { + this.hasOutputCommitter = hasOutputCommitter; + } + + public boolean getHasOutputCommitter() { + return this.hasOutputCommitter; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java index 9a1e3a1af5..aabb1bef0b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java @@ -38,6 +38,7 @@ private LoadTableDesc loadTableWork; private LoadFileDesc loadFileWork; private LoadMultiFilesDesc loadMultiFilesWork; + private PathOutputCommitterWork pathOutputCommitterWork; private boolean checkFileFormat; private boolean srcLocal; @@ -153,5 +154,13 @@ public boolean isSrcLocal() { public void setSrcLocal(boolean srcLocal) { this.srcLocal = srcLocal; } - + + public PathOutputCommitterWork getPathOutputCommitterWork() { + return this.pathOutputCommitterWork; + } + + public void setPathOutputCommitterWork( + PathOutputCommitterWork pathOutputCommitterWork) { + this.pathOutputCommitterWork = pathOutputCommitterWork; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PathOutputCommitterWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PathOutputCommitterWork.java new file mode 100644 index 0000000000..4caed55848 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PathOutputCommitterWork.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.plan; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory; + +import java.io.IOException; +import java.io.Serializable; + + +@Explain(displayName = "Path Output Committer Setup Work", explainLevels = {Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED}) +public class PathOutputCommitterWork implements Serializable { + + private static final long serialVersionUID = -6333040835478371176L; + + private String outputPath; + private transient JobContext jobContext; + private transient TaskAttemptContext taskAttemptContext; + + public PathOutputCommitterWork(String outputPath, JobContext jobContext, + TaskAttemptContext taskAttemptContext) { + this.outputPath = outputPath; + this.jobContext = jobContext; + this.taskAttemptContext = taskAttemptContext; + } + + @Explain(displayName = "Path Output Committer Factory") + public Class getPathOutputCommitterClass() { + return PathOutputCommitterFactory.getCommitterFactory(new Path(this.outputPath), this.jobContext + .getConfiguration()).getClass(); + } + + @Explain(displayName = "Path Output Path") + public String getOutputPath() { + return this.outputPath; + } + + public JobContext getJobContext() { + return this.jobContext; + } + + public PathOutputCommitter createPathOutputCommitter() throws IOException { + return PathOutputCommitterFactory.createCommitter(new Path(this.outputPath), + this.taskAttemptContext); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java index e108684660..6fbf19085c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java @@ -142,7 +142,7 @@ db.createTable(src, cols, null, TextInputFormat.class, HiveIgnoreKeyTextOutputFormat.class); db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING, - true, false, false, false, null, 0, false); + true, false, false, false, null, 0, false, new HiveDataCommitter()); i++; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java index a20a2ae3ce..f5a279b2e7 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.HiveDataCommitter; import org.apache.hadoop.hive.ql.session.SessionState; import org.junit.BeforeClass; import org.junit.Rule; @@ -83,7 +84,8 @@ public void testRenameNewFilesOnSameFileSystem() throws IOException { FileSystem targetFs = targetPath.getFileSystem(hiveConf); try { - Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false,null, false, false); + new HiveDataCommitter().copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, + false, null, false, false); } catch (HiveException e) { e.printStackTrace(); assertTrue("Hive.copyFiles() threw an unexpected exception.", false); @@ -107,7 +109,7 @@ public void testRenameExistingFilesOnSameFileSystem() throws IOException { FileSystem 
targetFs = targetPath.getFileSystem(hiveConf); try { - Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null, false, false); + new HiveDataCommitter().copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null, false, false); } catch (HiveException e) { e.printStackTrace(); assertTrue("Hive.copyFiles() threw an unexpected exception.", false); @@ -127,7 +129,7 @@ public void testRenameExistingFilesOnSameFileSystem() throws IOException { sourceFolder.newFile("000001_0.gz"); try { - Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null, false, false); + new HiveDataCommitter().copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null, false, false); } catch (HiveException e) { e.printStackTrace(); assertTrue("Hive.copyFiles() threw an unexpected exception.", false); @@ -158,7 +160,7 @@ public void testCopyNewFilesOnDifferentFileSystem() throws IOException { Mockito.when(spyTargetFs.getUri()).thenReturn(URI.create("hdfs://" + targetPath.toUri().getPath())); try { - Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false); + new HiveDataCommitter().copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false); } catch (HiveException e) { e.printStackTrace(); assertTrue("Hive.copyFiles() threw an unexpected exception.", false); @@ -185,7 +187,7 @@ public void testCopyExistingFilesOnDifferentFileSystem() throws IOException { Mockito.when(spyTargetFs.getUri()).thenReturn(URI.create("hdfs://" + targetPath.toUri().getPath())); try { - Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false); + new HiveDataCommitter().copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false); } catch (HiveException e) { e.printStackTrace(); assertTrue("Hive.copyFiles() threw an unexpected exception.", false); @@ -205,7 +207,7 @@ public void testCopyExistingFilesOnDifferentFileSystem() throws IOException { sourceFolder.newFile("000001_0.gz"); try { - Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false); + new HiveDataCommitter().copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false); } catch (HiveException e) { e.printStackTrace(); assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
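
For context, a minimal sketch of the configuration a deployment would need before the PathOutputCommitterResolver added above takes effect. HiveConf.ConfVars.HIVE_BLOBSTORE_USE_OUTPUTCOMMITTER is the gate checked in PhysicalOptimizer and SparkCompiler; the mapreduce.outputcommitter.factory.scheme.s3a key, the S3ACommitterFactory class name, and fs.s3a.committer.name are standard Hadoop S3A committer settings assumed here rather than anything defined by this patch.

import org.apache.hadoop.hive.conf.HiveConf;

public class EnableBlobstoreCommitterSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // Gate checked by PhysicalOptimizer and SparkCompiler before adding
    // PathOutputCommitterResolver to the physical optimizer chain.
    conf.setBoolVar(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_OUTPUTCOMMITTER, true);

    // Hadoop-side settings (assumed, not defined by this patch): bind the s3a
    // scheme to the S3A committer factory so PathOutputCommitterFactory hands
    // back an S3A committer, and pick the staging "directory" committer.
    conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
    conf.set("fs.s3a.committer.name", "directory");

    System.out.println("Blobstore output committer enabled: "
        + conf.getBoolVar(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_OUTPUTCOMMITTER));
  }
}

Equivalent hive-site.xml / core-site.xml entries would work the same way; the resolver itself only fires when the boolean is set and the FileSinkOperator's final path is a blobstore path per BlobStorageUtils.isBlobStoragePath.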
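
And a sketch of what the PathOutputCommitterWork / PathOutputCommitterSetupTask path boils down to at runtime: build a synthetic JobID and a JOB_SETUP task attempt, ask PathOutputCommitterFactory which factory owns the output path, and drive the resulting PathOutputCommitter through setupJob/commitJob. The bucket path and the factory mapping below are assumptions for illustration; running it for real needs hadoop-aws on the classpath and valid S3 credentials.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import java.io.IOException;

public class CommitterResolutionSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Assumed mapping; without it the factory falls back to the default
    // FileOutputCommitter-based factory.
    conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");

    // Hypothetical output path standing in for a Hive table location.
    Path outputPath = new Path("s3a://example-bucket/warehouse/example_table");

    // Same construction as PathOutputCommitterWork: a synthetic JobID and a
    // JOB_SETUP task attempt, since Hive has no real MR job at this point.
    JobID jobID = new JobID();
    TaskAttemptID attemptID =
        new TaskAttemptID(jobID.getJtIdentifier(), jobID.getId(), TaskType.JOB_SETUP, 0, 0);
    TaskAttemptContextImpl attemptContext = new TaskAttemptContextImpl(conf, attemptID);
    JobContextImpl jobContext = new JobContextImpl(conf, jobID);

    // The factory class the resolver would report in EXPLAIN output.
    System.out.println("Factory: "
        + PathOutputCommitterFactory.getCommitterFactory(outputPath, conf).getClass().getName());

    // What PathOutputCommitterSetupTask does with the work object.
    PathOutputCommitter committer =
        PathOutputCommitterFactory.createCommitter(outputPath, attemptContext);
    committer.setupJob(jobContext);
    // ... tasks write and commit their output here ...
    committer.commitJob(jobContext);
  }
}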