commit f672c7c943d395d550d965d91ee0eb9e75bfb328
Author: Sahil Takiar
Date:   Thu Apr 26 13:40:59 2018 -0500

    HIVE-16295: Add support for using Hadoop's S3A OutputCommitter

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f137d6c920..e7570b9386 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4337,7 +4337,13 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         "This parameter enables a number of optimizations when running on blobstores:\n" +
         "(1) If hive.blobstore.use.blobstore.as.scratchdir is false, force the last Hive job to write to the blobstore.\n" +
         "This is a performance optimization that forces the final FileSinkOperator to write to the blobstore.\n" +
-        "See HIVE-15121 for details.");
+        "See HIVE-15121 for details."),
+
+    HIVE_BLOBSTORE_USE_OUTPUTCOMMITTER("hive.blobstore.use.output-committer", false, "Whether to " +
+        "use a custom PathOutputCommitter to commit data. For all the URIs specified in " +
+        "hive.blobstore.supported.schemes, Hive will honor the config " +
+        "mapreduce.outputcommitter.factory.scheme.[uri-scheme]. This overrides the behavior " +
+        "described in hive.blobstore.optimizations.enabled. See HIVE-16295 for details.");

   public final String varname;
   public final String altName;
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
index 9b50fd4f30..76e32f4965 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
@@ -107,7 +107,7 @@ protected void setUp() {
       db.createTable(src, cols, null, TextInputFormat.class,
           IgnoreKeyTextOutputFormat.class);
       db.loadTable(hadoopDataFile[i], src,
-          LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false);
+          LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false, true);
       i++;
     }
diff --git a/ql/pom.xml b/ql/pom.xml
index 867a38aa68..826e167d0f 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -221,6 +221,11 @@
       <version>${hadoop.version}</version>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-registry</artifactId>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 01a5b4c9c3..302e5b8b8a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -18,20 +18,6 @@
 package org.apache.hadoop.hive.ql.exec;

-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
 import com.google.common.collect.Lists;

 import org.apache.hadoop.conf.Configuration;
@@ -82,17 +68,21 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hive.common.util.HiveStringUtils;
-import org.apache.hive.common.util.Murmur3;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -103,6 +93,7 @@
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
+
 /**
  * File Sink operator implementation.
  **/
@@ -146,6 +137,8 @@
   private transient boolean isInsertOverwrite;
   private transient String counterGroup;
   private transient BiFunction hashFunc;
+  private transient PathOutputCommitter pathOutputCommitter;
+  private TaskAttemptContext taskAttemptContext;
   /**
    * Counters.
    */
@@ -247,7 +240,7 @@ private void commitOneOutPath(int idx, FileSystem fs, List commitPaths)
       }
       FileUtils.mkdir(fs, finalPaths[idx].getParent(), hconf);
     }
-    if(outPaths[idx] != null && fs.exists(outPaths[idx])) {
+    if(pathOutputCommitter == null && outPaths[idx] != null && fs.exists(outPaths[idx])) {
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("committing " + outPaths[idx] + " to " + finalPaths[idx]
             + " (" + isMmTable + ")");
@@ -271,6 +264,13 @@
       }
     }

+    if (pathOutputCommitter != null && outPaths[idx] != null &&
+        outPaths[idx].getFileSystem(hconf).exists(outPaths[idx])) {
+      if (pathOutputCommitter.needsTaskCommit(taskAttemptContext)) {
+        pathOutputCommitter.commitTask(taskAttemptContext);
+      }
+    }
+
     updateProgress();
   }
@@ -292,7 +292,7 @@ public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws Hi
   }

   public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeTable,
-      boolean isSkewedStoredAsSubDirectories) {
+      boolean isSkewedStoredAsSubDirectories) throws IOException {
     if (isNativeTable) {
       String extension = Utilities.getFileExtension(jc, isCompressed, hiveOutputFormat);
       String taskWithExt = extension == null ? taskId : taskId + extension;
@@ -302,7 +302,11 @@
       } else {
         finalPaths[filesIdx] = new Path(buildTmpPath(), taskWithExt);
       }
-      outPaths[filesIdx] = new Path(buildTaskOutputTempPath(), Utilities.toTempPath(taskId));
+      if (pathOutputCommitter != null) {
+        outPaths[filesIdx] = getPathOutputCommitterPath(taskId);
+      } else {
+        outPaths[filesIdx] = new Path(buildTaskOutputTempPath(), Utilities.toTempPath(taskId));
+      }
     } else {
       String taskIdPath = taskId;
       if (conf.isMerge()) {
@@ -429,7 +433,7 @@ private void initializeSpecPath() {
     // 'Parent'
     boolean isLinked = conf.isLinkedFileSink();
     if (!isLinked) {
-      // Simple case - no union.
+      // Simple case - no union.
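/*
 * Illustrative aside (not part of the patch): the new pathOutputCommitter/taskAttemptContext
 * fields above are driven by Hadoop's PathOutputCommitterFactory, the same API this commit calls
 * in FileSinkOperator#createPathOutputCommitter. The sketch below shows, under stated assumptions,
 * how a committer would be resolved for an s3a:// target once hive.blobstore.use.output-committer
 * is enabled and mapreduce.outputcommitter.factory.scheme.s3a is bound to a factory. The factory
 * class name and bucket path are assumptions about a typical hadoop-aws deployment, not values
 * taken from this commit; actually running it needs hadoop-aws on the classpath and S3 credentials.
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class CommitterLookupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The Hive flag added by this patch; read by Hive's planner, not by the factory lookup below.
    conf.setBoolean("hive.blobstore.use.output-committer", true);
    // Hadoop side: bind the s3a scheme to a committer factory (assumed class name).
    conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");

    Path target = new Path("s3a://example-bucket/warehouse/t1");  // hypothetical FSOP target dir
    TaskAttemptContextImpl ctx = new TaskAttemptContextImpl(conf,
        new TaskAttemptID("", 0, TaskType.MAP, 0, 0));

    // Same lookup FileSinkOperator#createPathOutputCommitter performs for conf.getTargetDirName().
    PathOutputCommitter committer = PathOutputCommitterFactory.createCommitter(target, ctx);
    System.out.println("Resolved committer: " + committer.getClass().getName());
    // Task output is written under the committer's work path instead of Hive's temp task path.
    System.out.println("Task work path: " + committer.getWorkPath());
  }
}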
specPath = conf.getDirName(); unionPath = null; } else { @@ -500,6 +504,13 @@ protected void initializeOp(Configuration hconf) throws HiveException { destTablePath = conf.getDestPath(); isInsertOverwrite = conf.getInsertOverwrite(); counterGroup = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP); + + if (conf.getHasOutputCommitter()) { + taskAttemptContext = createTaskAttemptContext(); + pathOutputCommitter = createPathOutputCommitter(); + pathOutputCommitter.setupTask(taskAttemptContext); + } + if (LOG.isInfoEnabled()) { LOG.info("Using serializer : " + serializer + " and formatter : " + hiveOutputFormat + (isCompressed ? " with compression" : "")); @@ -1569,4 +1580,22 @@ private boolean isNativeTable() { return !conf.getTableInfo().isNonNative(); } + private PathOutputCommitter createPathOutputCommitter() throws IOException { + return PathOutputCommitterFactory.createCommitter(new Path(conf.getTargetDirName()), + taskAttemptContext); + } + + private TaskAttemptContextImpl createTaskAttemptContext() { + TaskAttemptID origId = MapredContext.get().getTaskAttemptID(); + + TaskAttemptID taskAttemptID = new TaskAttemptID(org.apache.commons.lang.StringUtils.EMPTY, 0, + origId.getTaskType(), origId.getTaskID().getId(), origId.getId()); + + return new TaskAttemptContextImpl(hconf, taskAttemptID); + } + + private Path getPathOutputCommitterPath(String taskId) throws IOException { + return new Path(pathOutputCommitter.getWorkPath(), + taskId + "-" + hconf.get(ConfVars.HIVEQUERYID.varname)); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java index 09cbf32f9c..683db2a250 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -73,6 +74,7 @@ public static void close() { private final List udfs; private Reporter reporter; + private TaskAttemptID taskAttemptID; protected MapredContext(boolean isMap, JobConf jobConf) { this.isMap = isMap; @@ -105,6 +107,14 @@ public void setReporter(Reporter reporter) { this.reporter = reporter; } + public TaskAttemptID getTaskAttemptID() { + return this.taskAttemptID; + } + + public void setTaskAttemptID(TaskAttemptID taskAttemptID) { + this.taskAttemptID = taskAttemptID; + } + private void registerCloseable(Closeable closeable) { udfs.add(closeable); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index dbda5fdef4..1d074e4f54 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -132,18 +132,20 @@ private void moveFileInDfs (Path sourcePath, Path targetPath, HiveConf conf) if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) { deletePath = createTargetPath(targetPath, tgtFs); } - Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false); - if (!Hive.moveFile(conf, sourcePath, targetPath, true, false)) { - try { - if (deletePath != null) { - tgtFs.delete(deletePath, true); + if (work.getPathOutputCommitterWork() == null) { + Hive.clearDestForSubDirSrc(conf, targetPath, 
sourcePath, false); + if (!Hive.moveFile(conf, sourcePath, targetPath, true, false)) { + try { + if (deletePath != null) { + tgtFs.delete(deletePath, true); + } + } catch (IOException e) { + LOG.info("Unable to delete the path created for facilitating rename: {}", + deletePath); } - } catch (IOException e) { - LOG.info("Unable to delete the path created for facilitating rename: {}", - deletePath); + throw new HiveException("Unable to rename: " + sourcePath + + " to: " + targetPath); } - throw new HiveException("Unable to rename: " + sourcePath - + " to: " + targetPath); } } else if (!tgtFs.mkdirs(targetPath)) { throw new HiveException("Unable to make directory: " + targetPath); @@ -282,6 +284,11 @@ public int execute(DriverContext driverContext) { } Hive db = getHive(); + if (work.getPathOutputCommitterWork() != null) { + work.getPathOutputCommitterWork().createPathOutputCommitter().commitJob( + work.getPathOutputCommitterWork().getJobContext()); + } + // Do any hive related operations like moving tables and files // to appropriate locations LoadFileDesc lfd = work.getLoadFileWork(); @@ -372,7 +379,7 @@ public int execute(DriverContext driverContext) { } db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(), work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(), - tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite()); + tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite(), work.getPathOutputCommitterWork() == null); if (work.getOutputs() != null) { DDLTask.addIfAbsentByName(new WriteEntity(table, getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs()); @@ -474,7 +481,7 @@ private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd, work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID && !tbd.isMmTable(), hasFollowingStatsTask(), - tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite()); + tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite(), work.getPathOutputCommitterWork() == null); Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false); // See the comment inside updatePartitionBucketSortColumns. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterSetupTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterSetupTask.java new file mode 100644 index 0000000000..dc1c62bf3d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterSetupTask.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.plan.PathOutputCommitterWork; +import org.apache.hadoop.hive.ql.plan.api.StageType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PathOutputCommitterSetupTask extends Task { + + private static final Logger LOG = LoggerFactory.getLogger(PathOutputCommitterSetupTask.class); + + private static final long serialVersionUID = -8867710739987754989L; + + @Override + protected int execute(DriverContext driverContext) { + try { + LOG.info("Running setupJob for Path Output Committer " + + work.getPathOutputCommitterClass().getName()); + work.createPathOutputCommitter().setupJob(getWork().getJobContext()); + } catch (Exception e) { + LOG.error("Failed run setupJob for Path Output Committer " + + work.getPathOutputCommitterClass().getName(), e); + setException(e); + return 1; + } + return 0; + } + + @Override + public StageType getType() { + return null; + } + + @Override + public String getName() { + return null; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 2da6b0f1fe..41678ff7ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.io.merge.MergeFileTask; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; +import org.apache.hadoop.hive.ql.plan.PathOutputCommitterWork; import org.apache.hadoop.hive.ql.plan.StatsWork; import org.apache.hadoop.hive.ql.plan.ConditionalWork; import org.apache.hadoop.hive.ql.plan.CopyWork; @@ -112,6 +113,7 @@ public TaskTuple(Class workClass, Class> taskClass) { taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class)); taskvec.add(new TaskTuple(ExportWork.class, ExportTask.class)); taskvec.add(new TaskTuple(ReplTxnWork.class, ReplTxnTask.class)); + taskvec.add(new TaskTuple<>(PathOutputCommitterWork.class, PathOutputCommitterSetupTask.class)); } private static ThreadLocal tid = new ThreadLocal() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java index 99b33a3aad..1a60f3e46f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.mapred.TaskAttemptID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -73,6 +74,8 @@ @Override public void configure(JobConf job) { + TaskAttemptID taskAttemptID = TaskAttemptID.forName(job.get("mapred.task.id")); + execContext = new ExecMapperContext(job); // Allocate the bean at the beginning - try { @@ -110,6 +113,7 @@ public void configure(JobConf job) { execContext.setLocalWork(localWork); MapredContext.init(true, new JobConf(jc)); + MapredContext.get().setTaskAttemptID(taskAttemptID); mo.passExecContext(execContext); mo.initializeLocalWork(jc); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java index 829006d375..c0b9e34a87 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.List; +import org.apache.hadoop.mapred.TaskAttemptID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.MapredContext; @@ -90,6 +91,8 @@ @Override public void configure(JobConf job) { + TaskAttemptID taskAttemptID = TaskAttemptID.forName(job.get("mapred.task.id")); + rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; ObjectInspector keyObjectInspector; @@ -141,6 +144,7 @@ public void configure(JobConf job) { } MapredContext.init(false, new JobConf(jc)); + MapredContext.get().setTaskAttemptID(taskAttemptID); // initialize reduce operator tree try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index f6608eb827..0c9b686730 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1647,7 +1647,7 @@ public Database getDatabaseCurrent() throws HiveException { public Partition loadPartition(Path loadPath, Table tbl, Map partSpec, LoadFileType loadFileType, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long writeId, - int stmtId, boolean isInsertOverwrite) throws HiveException { + int stmtId, boolean isInsertOverwrite, boolean commitData) throws HiveException { Path tblDataLocationPath = tbl.getDataLocation(); boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters()); assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName(); @@ -1736,14 +1736,18 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par if (!isTxnTable && ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcidIUDoperation))) { //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new // base_x. 
(there is Insert Overwrite and Load Data Overwrite) - boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); - replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal, - isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary()); + if (commitData) { + boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); + replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal, + isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary()); + } } else { - FileSystem fs = tbl.getDataLocation().getFileSystem(conf); - copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, - (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles, - tbl.getNumBuckets() > 0, isFullAcidTable); + if (commitData) { + FileSystem fs = tbl.getDataLocation().getFileSystem(conf); + copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, + (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles, + tbl.getNumBuckets() > 0, isFullAcidTable); + } } } perfLogger.PerfLogEnd("MoveTask", "FileMoves"); @@ -2169,7 +2173,7 @@ public Void call() throws Exception { // load the partition Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, loadFileType, true, numLB > 0, false, isAcid, hasFollowingStatsTask, writeId, stmtId, - isInsertOverwrite); + isInsertOverwrite, true); partitionsMap.put(fullPartSpec, newPartition); if (inPlaceEligible) { @@ -2263,7 +2267,7 @@ public Void call() throws Exception { */ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, - Long writeId, int stmtId, boolean isInsertOverwrite) throws HiveException { + Long writeId, int stmtId, boolean isInsertOverwrite, boolean commitData) throws HiveException { List newFiles = null; Table tbl = getTable(tableName); assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName(); @@ -2308,10 +2312,12 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary()); } else { try { - FileSystem fs = tbl.getDataLocation().getFileSystem(conf); - copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, - loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles, - tbl.getNumBuckets() > 0 ? true : false, isFullAcidTable); + if (commitData) { + FileSystem fs = tbl.getDataLocation().getFileSystem(conf); + copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, + loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles, + tbl.getNumBuckets() > 0 ? true : false, isFullAcidTable); + } } catch (IOException e) { throw new HiveException("addFiles: filesystem error in check phase", e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PathOutputCommitterResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PathOutputCommitterResolver.java new file mode 100644 index 0000000000..a91d285f3d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PathOutputCommitterResolver.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.physical; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; + +import org.apache.hadoop.hive.common.BlobStorageUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.MoveTask; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.PathOutputCommitterSetupTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; +import org.apache.hadoop.hive.ql.exec.spark.SparkTask; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.TaskGraphWalker; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.MoveWork; +import org.apache.hadoop.hive.ql.plan.PathOutputCommitterWork; + +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; +import java.util.stream.Collectors; + + +public class PathOutputCommitterResolver implements PhysicalPlanResolver { + + private static final Logger LOG = LoggerFactory.getLogger(PathOutputCommitterResolver.class); + + // Useful for testing + private static final String HIVE_BLOBSTORE_COMMIT_DISABLE_EXPLAIN = "hive.blobstore.commit." 
+ + "disable.explain"; + + private final Map, Collection> taskToFsOps = new HashMap<>(); + private final List> mvTasks = new ArrayList<>(); + private HiveConf hconf; + + @Override + public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { + this.hconf = pctx.getConf(); + + // Collect all MoveTasks and FSOPs + TaskGraphWalker graphWalker = new TaskGraphWalker(new S3ACommitDispatcher()); + List rootTasks = new ArrayList<>(pctx.getRootTasks()); + graphWalker.startWalking(rootTasks, null); + + // Find MoveTasks with no child MoveTask + List> sinkMoveTasks = mvTasks.stream() + .filter(mvTask -> !containsChildTask(mvTask.getChildTasks(), MoveTask.class)) + .collect(Collectors.toList()); + + // Iterate through each FSOP + for (Map.Entry, Collection> entry : taskToFsOps.entrySet()) { + for (FileSinkOperator fsOp : entry.getValue()) { + try { + processFsOp(entry.getKey(), fsOp, sinkMoveTasks); + } catch (HiveException | MetaException e) { + throw new SemanticException(e); + } + } + } + + return pctx; + } + + private boolean containsChildTask(List> mvTasks, Class + taskClass) { + if (mvTasks == null) { + return false; + } + boolean containsChildTask = false; + for (Task mvTask : mvTasks) { + if (taskClass.isInstance(mvTask)) { + return true; + } + containsChildTask = containsChildTask(mvTask.getChildTasks(), taskClass); + } + return containsChildTask; + } + + private class S3ACommitDispatcher implements Dispatcher { + + @Override + public Object dispatch(Node nd, Stack stack, + Object... nodeOutputs) throws SemanticException { + + Task task = (Task) nd; + Collection fsOps = getAllFsOps(task); + if (!fsOps.isEmpty()) { + taskToFsOps.put((Task) nd, fsOps); + } + if (nd instanceof MoveTask) { + mvTasks.add((MoveTask) nd); + } + return null; + } + } + + private Collection getAllFsOps(Task task) { + Collection> fsOps = new ArrayList<>(); + if (task instanceof MapRedTask) { + fsOps.addAll(((MapRedTask) task).getWork().getAllOperators()); + } else if (task instanceof SparkTask) { + for (BaseWork work : ((SparkTask) task).getWork().getAllWork()) { + fsOps.addAll(work.getAllOperators()); + } + } + return fsOps.stream() + .filter(FileSinkOperator.class::isInstance) + .map(FileSinkOperator.class::cast) + .collect(Collectors.toList()); + } + + private void processFsOp(Task task, FileSinkOperator fsOp, + List> sinkMoveTasks) throws HiveException, MetaException { + FileSinkDesc fileSinkDesc = fsOp.getConf(); + + // Get the MoveTask that will process the output of the fsOp + Task mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(sinkMoveTasks, + fileSinkDesc.getFinalDirName(), fileSinkDesc.isMmTable()); + + if (mvTask != null) { + + MoveWork mvWork = mvTask.getWork(); + + // Don't process the mvTask if it requires committing data for DP queries + if (mvWork.getLoadMultiFilesWork() != null && (mvWork.getLoadTableWork() == null || mvWork + .getLoadTableWork().getDPCtx() == null)) { + + // The final output path we will commit data to + Path outputPath = null; + + // Instead of picking between load table work and load file work, throw an exception if + // they are both set (this should never happen) + if (mvWork.getLoadTableWork() != null && mvWork.getLoadFileWork() != null) { + throw new IllegalArgumentException("Load Table Work and Load File Work cannot both be " + + "set"); + } + + // If there is a load file work, get its output path + if (mvWork.getLoadFileWork() != null) { + outputPath = getLoadFileOutputPath(mvWork); + } + + // If there is a load table work, get is output path + if 
(mvTask.getWork().getLoadTableWork() != null) { + outputPath = getLoadTableOutputPath(mvWork); + } + if (outputPath != null) { + if (BlobStorageUtils.isBlobStoragePath(hconf, outputPath)) { + + if ("s3a".equals(outputPath.toUri().getScheme())) { + setupS3aOutputCommitter(mvWork); + } + + PathOutputCommitterWork setupWork = createPathOutputCommitterWork(outputPath); + mvWork.setPathOutputCommitterWork(setupWork); + + fileSinkDesc.setHasOutputCommitter(true); + fileSinkDesc.setTargetDirName(outputPath.toString()); + + LOG.info("Using Output Committer " + setupWork.getPathOutputCommitterClass() + + " for MoveTask: " + mvTask + ", FileSinkOperator: " + fsOp + " and output " + + "path: " + outputPath); + + if (hconf.getBoolean(HIVE_BLOBSTORE_COMMIT_DISABLE_EXPLAIN, false)) { + PathOutputCommitterSetupTask setupTask = new PathOutputCommitterSetupTask(); + setupTask.setWork(setupWork); + setupTask.executeTask(null); + } else { + task.addDependentTask(TaskFactory.get(setupWork)); + } + } + } + } + } + } + + private PathOutputCommitterWork createPathOutputCommitterWork(Path outputPath) { + JobID jobID = new JobID(); + TaskAttemptContext taskAttemptContext = createTaskAttemptContext(jobID); + JobContext jobContext = new JobContextImpl(hconf, jobID); + + return new PathOutputCommitterWork(outputPath.toString(), + jobContext, taskAttemptContext); + } + + private void setupS3aOutputCommitter(MoveWork mvWork) { + if (mvWork.getLoadTableWork() != null && mvWork.getLoadTableWork() + .isInsertOverwrite()) { + hconf.set(CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CommitConstants. + CONFLICT_MODE_REPLACE); + } else { + hconf.set(CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CommitConstants + .CONFLICT_MODE_APPEND); + } + + // We set this to false because its better for Hive to have more control over the file + // names given the that the JobID we are creating has no meaning + hconf.setBoolean(CommitConstants.FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, false); + } + + // Not sure if this is the best way to do this, somewhat copied from TaskCompiler, which uses + // similar logic to get the default location for the target table in a CTAS query + // don't think will work if a custom location is specified + // works because you can't specify a custom location when auto-creating a partition + private Path getDefaultPartitionPath(Path tablePath, Map partitionSpec) + throws MetaException { + Warehouse wh = new Warehouse(hconf); + return wh.getPartitionPath(tablePath, partitionSpec); + } + + private TaskAttemptContext createTaskAttemptContext(JobID jobID) { + return new TaskAttemptContextImpl(hconf, + new TaskAttemptID(jobID.getJtIdentifier(), jobID.getId(), TaskType.JOB_SETUP, 0, 0)); + } + + private Path getLoadFileOutputPath(MoveWork mvWork) { + return mvWork.getLoadFileWork().getTargetDir(); + } + + private Path getLoadTableOutputPath(MoveWork mvWork) throws HiveException, MetaException { + if (mvWork.getLoadTableWork().getPartitionSpec() != null && + !mvWork.getLoadTableWork().getPartitionSpec().isEmpty()) { + return getLoadPartitionOutputPath(mvWork); + } else { + // should probably replace this with Hive.getTable().getDataLocation() + return new Path(mvWork.getLoadTableWork().getTable().getProperties() + .getProperty("location")); // INSERT INTO ... VALUES (...) 
+ } + } + + private Path getLoadPartitionOutputPath(MoveWork mvWork) throws HiveException, MetaException { + Hive db = Hive.get(); + Partition partition = db.getPartition(db.getTable(mvWork.getLoadTableWork() + .getTable().getTableName()), + mvWork.getLoadTableWork().getPartitionSpec(), false); + if (partition != null) { + return partition.getDataLocation(); + } else { + return getDefaultPartitionPath(db.getTable(mvWork.getLoadTableWork() + .getTable().getTableName()).getDataLocation(), mvWork + .getLoadTableWork().getPartitionSpec()); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java index d508d02ed1..7c07bc985c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java @@ -98,6 +98,10 @@ private void initialize(HiveConf hiveConf) { if (pctx.getContext().getExplainAnalyze() != null) { resolvers.add(new AnnotateRunTimeStatsOptimizer()); } + + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_OUTPUTCOMMITTER)) { + resolvers.add(new PathOutputCommitterResolver()); + } } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 0a76ffa28b..b13bb3ae80 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -70,6 +70,7 @@ import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer; import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; +import org.apache.hadoop.hive.ql.optimizer.physical.PathOutputCommitterResolver; import org.apache.hadoop.hive.ql.optimizer.physical.SparkCrossProductCheck; import org.apache.hadoop.hive.ql.optimizer.physical.SparkDynamicPartitionPruningResolver; import org.apache.hadoop.hive.ql.optimizer.physical.SparkMapJoinResolver; @@ -613,6 +614,12 @@ protected void optimizeTaskPlan(List> rootTasks, Pa new AnnotateRunTimeStatsOptimizer().resolve(physicalCtx); } + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_OUTPUTCOMMITTER)) { + new PathOutputCommitterResolver().resolve(physicalCtx); + } else { + LOG.debug("Skipping S3A commit optimizer"); + } + PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_TASK_TREE); return; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index fcb6de7d08..3cf0de6a34 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.plan; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -28,6 +29,9 @@ import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; /** * FileSinkDesc. 
@@ -63,6 +67,8 @@ private DynamicPartitionCtx dpCtx; private String staticSpec; // static partition spec ends with a '/' private boolean gatherStats; + private String targetDirName; + private boolean hasOutputCommitter; // Consider a query like: // insert overwrite table T3 select ... from T1 join T2 on T1.key = T2.key; @@ -583,6 +589,14 @@ public boolean getInsertOverwrite() { return isInsertOverwrite; } + public String getTargetDirName() { + return this.targetDirName; + } + + public void setTargetDirName(String targetDirName) { + this.targetDirName = targetDirName; + } + @Override public boolean isSame(OperatorDesc other) { if (getClass().getName().equals(other.getClass().getName())) { @@ -601,4 +615,11 @@ public boolean isSame(OperatorDesc other) { return false; } + public void setHasOutputCommitter(boolean hasOutputCommitter) { + this.hasOutputCommitter = hasOutputCommitter; + } + + public boolean getHasOutputCommitter() { + return this.hasOutputCommitter; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java index 9a1e3a1af5..aabb1bef0b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java @@ -38,6 +38,7 @@ private LoadTableDesc loadTableWork; private LoadFileDesc loadFileWork; private LoadMultiFilesDesc loadMultiFilesWork; + private PathOutputCommitterWork pathOutputCommitterWork; private boolean checkFileFormat; private boolean srcLocal; @@ -153,5 +154,13 @@ public boolean isSrcLocal() { public void setSrcLocal(boolean srcLocal) { this.srcLocal = srcLocal; } - + + public PathOutputCommitterWork getPathOutputCommitterWork() { + return this.pathOutputCommitterWork; + } + + public void setPathOutputCommitterWork( + PathOutputCommitterWork pathOutputCommitterWork) { + this.pathOutputCommitterWork = pathOutputCommitterWork; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PathOutputCommitterWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PathOutputCommitterWork.java new file mode 100644 index 0000000000..4caed55848 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PathOutputCommitterWork.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+
+@Explain(displayName = "Path Output Committer Setup Work", explainLevels = {Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED})
+public class PathOutputCommitterWork implements Serializable {
+
+  private static final long serialVersionUID = -6333040835478371176L;
+
+  private String outputPath;
+  private transient JobContext jobContext;
+  private transient TaskAttemptContext taskAttemptContext;
+
+  public PathOutputCommitterWork(String outputPath, JobContext jobContext,
+      TaskAttemptContext taskAttemptContext) {
+    this.outputPath = outputPath;
+    this.jobContext = jobContext;
+    this.taskAttemptContext = taskAttemptContext;
+  }
+
+  @Explain(displayName = "Path Output Committer Factory")
+  public Class getPathOutputCommitterClass() {
+    return PathOutputCommitterFactory.getCommitterFactory(new Path(this.outputPath), this.jobContext
+        .getConfiguration()).getClass();
+  }
+
+  @Explain(displayName = "Path Output Path")
+  public String getOutputPath() {
+    return this.outputPath;
+  }
+
+  public JobContext getJobContext() {
+    return this.jobContext;
+  }
+
+  public PathOutputCommitter createPathOutputCommitter() throws IOException {
+    return PathOutputCommitterFactory.createCommitter(new Path(this.outputPath),
+        this.taskAttemptContext);
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index e108684660..dea127afb2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -142,7 +142,7 @@
     db.createTable(src, cols, null, TextInputFormat.class,
         HiveIgnoreKeyTextOutputFormat.class);
     db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING,
-        true, false, false, false, null, 0, false);
+        true, false, false, false, null, 0, false, true);
     i++;
   }
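Editor's note: taken together, the patch spreads the standard OutputCommitter protocol across three Hive components. PathOutputCommitterSetupTask calls setupJob() before the query's map/reduce or Spark work runs, FileSinkOperator calls setupTask() in initializeOp() and needsTaskCommit()/commitTask() in commitOneOutPath(), and MoveTask calls commitJob() instead of renaming files into the final table directory (the new commitData flag tells loadTable/loadPartition to skip their copy/replace step in that case). The standalone sketch below condenses that flow using only the Hadoop APIs the patch itself invokes; the output location, the file name, and the reuse of a single JOB_SETUP task attempt id for the writer context are illustrative assumptions rather than Hive's exact wiring.

import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class CommitterLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path output = new Path(args[0]);  // hypothetical final table or partition location
    JobID jobId = new JobID();        // like the resolver, a fresh JobID with no real meaning
    JobContext jobContext = new JobContextImpl(conf, jobId);
    TaskAttemptContextImpl attemptContext = new TaskAttemptContextImpl(conf,
        new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), TaskType.JOB_SETUP, 0, 0));

    PathOutputCommitter committer =
        PathOutputCommitterFactory.createCommitter(output, attemptContext);

    // 1. PathOutputCommitterSetupTask: one-time job setup before any FileSinkOperator runs.
    committer.setupJob(jobContext);

    // 2. FileSinkOperator, per task attempt: set up the task, write below getWorkPath(),
    //    then commit the attempt's output (commitOneOutPath).
    committer.setupTask(attemptContext);
    Path workFile = new Path(committer.getWorkPath(), "task_0-queryid");  // hypothetical file name
    try (OutputStream out = workFile.getFileSystem(conf).create(workFile)) {
      out.write("example row\n".getBytes(StandardCharsets.UTF_8));
    }
    if (committer.needsTaskCommit(attemptContext)) {
      committer.commitTask(attemptContext);
    }

    // 3. MoveTask: publish the whole job's output at once; no per-file rename into the table dir.
    committer.commitJob(jobContext);
  }
}

Running such a sketch against an s3a:// path only changes behavior if a committer factory is bound for that scheme; otherwise Hadoop falls back to the classic FileOutputCommitter-style rename, which is exactly the slow path on blobstores that this commit is trying to avoid.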