commit a73ef2c223fbff18c1802319e48819d8aab0c630
Author: Sahil Takiar
Date:   Sat Apr 14 15:25:45 2018 -0700

    HIVE-16295: Add support for using Hadoop's S3A OutputCommitter

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f40c60606c..b78509cc25 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1829,7 +1829,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "For testing only. Will make ACID subsystem write RecordIdentifier.bucketId in specified\n" +
         "format", false),
-    HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
+    HIVEMERGEMAPFILES("hive.merge.mapfiles", false,
         "Merge small files at the end of a map-only job"),
     HIVEMERGEMAPREDFILES("hive.merge.mapredfiles", false,
         "Merge small files at the end of a map-reduce job"),
@@ -4331,7 +4331,15 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "This parameter enables a number of optimizations when running on blobstores:\n" +
         "(1) If hive.blobstore.use.blobstore.as.scratchdir is false, force the last Hive job to write to the blobstore.\n" +
         "This is a performance optimization that forces the final FileSinkOperator to write to the blobstore.\n" +
-        "See HIVE-15121 for details.");
+        "See HIVE-15121 for details."),
+
+    HIVE_BLOBSTORE_S3A_COMMIT("hive.blobstore.s3a.commit", true, "Whether to use the S3A " +
+        "Output Committer (HADOOP-13786) when writing data to a table. The S3A Output " +
+        "Committer has the advantage that it requires writing data to S3 once, rather than " +
+        "writing it multiple times (because S3 doesn't support renames). Hive does not " +
+        "support all of the configuration parameters of the S3A Output Committer. This " +
+        "overrides the behavior described in hive.blobstore.optimizations.enabled. See " +
+        "HIVE-16295 for details.");

   public final String varname;
   public final String altName;
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
index 9b50fd4f30..76e32f4965 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
@@ -107,7 +107,7 @@ protected void setUp() {
         db.createTable(src, cols, null, TextInputFormat.class,
             IgnoreKeyTextOutputFormat.class);
         db.loadTable(hadoopDataFile[i], src,
-            LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false);
+            LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false, true);
         i++;
       }
diff --git a/ql/pom.xml b/ql/pom.xml
index 165610f8f6..37ee0d368f 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -221,6 +221,11 @@
       <version>${hadoop.version}</version>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-registry</artifactId>
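The new flag is an ordinary HiveConf boolean, so it can be read like any other ConfVar. A minimal sketch (not part of the patch) of checking it, assuming the enum constant added above and a hive-site.xml on the classpath:

```java
import org.apache.hadoop.hive.conf.HiveConf;

public class S3aCommitFlagCheck {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Defaults to true in this patch; can be disabled per session with
    // SET hive.blobstore.s3a.commit=false;
    boolean useS3aCommitter =
        conf.getBoolVar(HiveConf.ConfVars.HIVE_BLOBSTORE_S3A_COMMIT);
    System.out.println("S3A output committer enabled: " + useS3aCommitter);
  }
}
```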
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index c084fa054c..2622713188 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -22,7 +22,8 @@
 import java.io.IOException;
 import java.io.Serializable;
-import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -31,11 +32,14 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
+
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -83,25 +87,21 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
-
 /**
  * File Sink operator implementation.
 **/
@@ -144,6 +144,8 @@
   private transient Path destTablePath;
   private transient boolean isInsertOverwrite;
   private transient String counterGroup;
+  private transient PathOutputCommitter pathOutputCommitter;
+  private TaskAttemptContext taskAttemptContext;
   /**
    * Counters.
    */
@@ -263,7 +265,7 @@ private void commitOneOutPath(int idx, FileSystem fs, List<Path> commitPaths)
         }
         FileUtils.mkdir(fs, finalPaths[idx].getParent(), hconf);
       }
-      if(outPaths[idx] != null && fs.exists(outPaths[idx])) {
+      if(pathOutputCommitter == null && outPaths[idx] != null && fs.exists(outPaths[idx])) {
        if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
          Utilities.FILE_OP_LOGGER.trace("committing " + outPaths[idx] + " to " + finalPaths[idx]
              + " (" + isMmTable + ")");
@@ -287,6 +289,13 @@ private void commitOneOutPath(int idx, FileSystem fs, List<Path> commitPaths)
         }
       }

+      if (pathOutputCommitter != null && outPaths[idx] != null &&
+          outPaths[idx].getFileSystem(hconf).exists(outPaths[idx])) {
+        if (pathOutputCommitter.needsTaskCommit(taskAttemptContext)) {
+          pathOutputCommitter.commitTask(taskAttemptContext);
+        }
+      }
+
       updateProgress();
     }

@@ -307,17 +316,21 @@ public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws Hi
       }
     }

-    public void configureDynPartPath(String dirName, String childSpecPathDynLinkedPartitions) {
+    public void configureDynPartPath(String dirName, String childSpecPathDynLinkedPartitions) throws IOException {
       dirName = (childSpecPathDynLinkedPartitions == null) ? dirName :
          dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions;
-      tmpPath = new Path(tmpPath, dirName);
-      if (taskOutputTempPath != null) {
-        taskOutputTempPath = new Path(taskOutputTempPath, dirName);
+      if (pathOutputCommitter == null) {
+        tmpPath = new Path(tmpPath, dirName);
+        if (taskOutputTempPath != null) {
+          taskOutputTempPath = new Path(taskOutputTempPath, dirName);
+        }
+      } else {
+        taskOutputTempPath = new Path(pathOutputCommitter.getWorkPath(), dirName);
       }
     }

     public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeTable,
-        boolean isSkewedStoredAsSubDirectories) {
+        boolean isSkewedStoredAsSubDirectories) throws IOException {
       if (isNativeTable) {
         String extension = Utilities.getFileExtension(jc, isCompressed, hiveOutputFormat);
         if (!isMmTable) {
@@ -326,7 +339,15 @@ public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeT
           } else {
             finalPaths[filesIdx] = getFinalPath(taskId, tmpPath, extension);
           }
-          outPaths[filesIdx] = getTaskOutPath(taskId);
+          if (pathOutputCommitter != null) {
+            if (!bDynParts) {
+              outPaths[filesIdx] = getPathOutputCommitterPath(taskId);
+            } else {
+              outPaths[filesIdx] = new Path(this.taskOutputTempPath, taskId);
+            }
+          } else {
+            outPaths[filesIdx] = getTaskOutPath(taskId);
+          }
         } else {
           String subdirPath = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), writeId, writeId, stmtId);
           if (unionPath != null) {
@@ -511,6 +532,13 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     destTablePath = conf.getDestPath();
     isInsertOverwrite = conf.getInsertOverwrite();
     counterGroup = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+
+    if (conf.getPathOutputCommitterClass() != null) {
+      taskAttemptContext = createTaskAttemptContext();
+      pathOutputCommitter = createPathOutputCommitter();
+      pathOutputCommitter.setupTask(taskAttemptContext);
+    }
+
     if (LOG.isInfoEnabled()) {
       LOG.info("Using serializer : " + serializer + " and formatter : " + hiveOutputFormat +
           (isCompressed ? " with compression" : ""));
@@ -1080,7 +1108,11 @@ protected FSPaths lookupListBucketingPaths(String lbDirName) throws HiveExceptio
    */
  private FSPaths createNewPaths(String dirName) throws HiveException {
    FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable());
-    fsp2.configureDynPartPath(dirName, !conf.isMmTable() && isUnionDp ? unionPath : null);
+    try {
+      fsp2.configureDynPartPath(dirName, !conf.isMmTable() && isUnionDp ? unionPath : null);
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
    if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
      Utilities.FILE_OP_LOGGER.trace("creating new paths " + System.identityHashCode(fsp2)
        + " for " + dirName + ", childSpec " + unionPath + ": tmpPath " + fsp2.getTmpPath()
@@ -1273,6 +1305,9 @@ public void closeOp(boolean abort) throws HiveException {
        }
      }
      List<Path> commitPaths = new ArrayList<>();
+      for (FSPaths fsp : valToPaths.values()) {
+        fsp.closeWriters(abort);
+      }
      for (FSPaths fsp : valToPaths.values()) {
        fsp.closeWriters(abort);
        // before closing the operator check if statistics gathering is requested
@@ -1311,6 +1346,7 @@ public void closeOp(boolean abort) throws HiveException {
          fsp.commit(fs, commitPaths);
        }
      }
+      // Could do the commit here instead of inside fsp.commit
      if (conf.isMmTable()) {
        Utilities.writeMmCommitManifest(commitPaths, specPath, fs, taskId,
            conf.getTableWriteId(), conf.getStatementId(), unionPath, conf.getInsertOverwrite());
@@ -1578,4 +1614,43 @@ private Configuration unsetNestedColumnPaths(Configuration conf) {
  private boolean isNativeTable() {
    return !conf.getTableInfo().isNonNative();
  }
+
+  private PathOutputCommitter createPathOutputCommitter() {
+    Constructor<? extends PathOutputCommitter> constructor;
+    try {
+      constructor = conf.getPathOutputCommitterClass().getConstructor(Path.class,
+          TaskAttemptContext.class);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(e);
+    }
+
+    try {
+      return constructor.newInstance(new Path(conf.getTargetDirName()), taskAttemptContext);
+    } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private TaskAttemptContextImpl createTaskAttemptContext() {
+    TaskAttemptID origId = MapredContext.get().getTaskAttemptID();
+
+    TaskAttemptID taskAttemptID = new TaskAttemptID(conf.getJtIdentifier(), conf.getJobId(),
+        origId.getTaskType(), origId.getTaskID().getId(), origId.getId());
+
+    return new TaskAttemptContextImpl(hconf, taskAttemptID);
+  }
+
+  // Copied from Hive.java, should probably be re-factored; not sure if handling of overwrites
+  // is necessary or not; not sure if we need to handle 000000_0.gz -> 000000_0.gz_copy_1
+  private Path getPathOutputCommitterPath(String taskId) throws IOException {
+    Path workDirPath = pathOutputCommitter.getWorkPath();
+    Path workFilePath = new Path(workDirPath, taskId);
+    // Nuking this for now because with S3 eventual consistency it can cause Hive to
+    // inadvertently overwrite data
+//    FileSystem workFs = workFilePath.getFileSystem(hconf);
+//    for (int counter = 1; workFs.exists(workFilePath); counter++) {
+//      workFilePath = new Path(workDirPath, taskId + (Utilities.COPY_KEYWORD + counter));
+//    }
+    return workFilePath;
+  }
 }
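For reference, the task-side protocol FileSinkOperator now follows (setupTask, write under getWorkPath(), needsTaskCommit, commitTask) is the generic PathOutputCommitter contract. A standalone sketch of that flow against the local filesystem, using FileOutputCommitter instead of the S3A staging committers so it runs without S3 credentials; the paths and IDs below are placeholders:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class CommitterTaskLifecycle {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path out = new Path("file:///tmp/committer-demo");  // placeholder destination

    TaskAttemptID attempt = new TaskAttemptID("demo-jt", 1, TaskType.MAP, 0, 0);
    TaskAttemptContextImpl taskCtx = new TaskAttemptContextImpl(conf, attempt);

    // FileOutputCommitter is used here only to keep the sketch runnable locally;
    // the patch plugs in the S3A staging committers through the same interface.
    FileOutputCommitter committer = new FileOutputCommitter(out, taskCtx);
    committer.setupJob(new JobContextImpl(conf, attempt.getJobID()));
    committer.setupTask(taskCtx);

    // Writers must target getWorkPath(), which is why FileSinkOperator redirects
    // outPaths[] when a pathOutputCommitter is configured.
    Path taskFile = new Path(committer.getWorkPath(), "000000_0");
    try (FSDataOutputStream os = taskFile.getFileSystem(conf).create(taskFile)) {
      os.writeBytes("row\n");
    }

    if (committer.needsTaskCommit(taskCtx)) {
      committer.commitTask(taskCtx);
    }
    committer.commitJob(new JobContextImpl(conf, attempt.getJobID()));
  }
}
```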
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
index 09cbf32f9c..683db2a250 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -73,6 +74,7 @@ public static void close() {
   private final List<Closeable> udfs;
   private Reporter reporter;
+  private TaskAttemptID taskAttemptID;

   protected MapredContext(boolean isMap, JobConf jobConf) {
     this.isMap = isMap;
@@ -105,6 +107,14 @@ public void setReporter(Reporter reporter) {
     this.reporter = reporter;
   }

+  public TaskAttemptID getTaskAttemptID() {
+    return this.taskAttemptID;
+  }
+
+  public void setTaskAttemptID(TaskAttemptID taskAttemptID) {
+    this.taskAttemptID = taskAttemptID;
+  }
+
   private void registerCloseable(Closeable closeable) {
     udfs.add(closeable);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index dbda5fdef4..f3daa796cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -132,18 +132,20 @@ private void moveFileInDfs (Path sourcePath, Path targetPath, HiveConf conf)
       if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) {
         deletePath = createTargetPath(targetPath, tgtFs);
       }
-      Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false);
-      if (!Hive.moveFile(conf, sourcePath, targetPath, true, false)) {
-        try {
-          if (deletePath != null) {
-            tgtFs.delete(deletePath, true);
+      if (work.getPathOutputCommitterWork() == null) {
+        Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false);
+        if (!Hive.moveFile(conf, sourcePath, targetPath, true, false)) {
+          try {
+            if (deletePath != null) {
+              tgtFs.delete(deletePath, true);
+            }
+          } catch (IOException e) {
+            LOG.info("Unable to delete the path created for facilitating rename: {}",
+                deletePath);
           }
-        } catch (IOException e) {
-          LOG.info("Unable to delete the path created for facilitating rename: {}",
-              deletePath);
+          throw new HiveException("Unable to rename: " + sourcePath
+              + " to: " + targetPath);
         }
-        throw new HiveException("Unable to rename: " + sourcePath
-            + " to: " + targetPath);
       }
     } else if (!tgtFs.mkdirs(targetPath)) {
       throw new HiveException("Unable to make directory: " + targetPath);
@@ -282,6 +284,11 @@ public int execute(DriverContext driverContext) {
       }

       Hive db = getHive();
+      if (work.getPathOutputCommitterWork() != null) {
+        work.getPathOutputCommitterWork().createPathOutputCommitter().commitJob(
+            work.getPathOutputCommitterWork().getJobContext());
+      }
+
       // Do any hive related operations like moving tables and files
       // to appropriate locations
       LoadFileDesc lfd = work.getLoadFileWork();
@@ -372,7 +379,7 @@ public int execute(DriverContext driverContext) {
         }
         db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(),
             work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(),
-            tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite());
+            tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite(), work.getPathOutputCommitterWork() == null);
         if (work.getOutputs() != null) {
           DDLTask.addIfAbsentByName(new WriteEntity(table,
             getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
@@ -474,7 +481,7 @@ private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd,
           work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
          !tbd.isMmTable(),
          hasFollowingStatsTask(),
-          tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite());
+          tbd.getWriteId(), tbd.getStmtId(), tbd.isInsertOverwrite(), work.getPathOutputCommitterWork() == null);
       Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);

       // See the comment inside updatePartitionBucketSortColumns.
@@ -509,7 +516,7 @@ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd,
       // want to isolate any potential issue it may introduce.
       Map<Map<String, String>, Partition> dp =
         db.loadDynamicPartitions(
-          tbd.getSourcePath(),
+          work.getPathOutputCommitterWork() == null ? tbd.getSourcePath() : new Path(work.getPathOutputCommitterWork().getOutputPath()),
          tbd.getTable().getTableName(),
          tbd.getPartitionSpec(),
          tbd.getLoadFileType(),
@@ -521,7 +528,8 @@ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd,
          tbd.getStmtId(),
          hasFollowingStatsTask(),
          work.getLoadTableWork().getWriteType(),
-          tbd.isInsertOverwrite());
+          tbd.isInsertOverwrite(),
+          work.getPathOutputCommitterWork() == null);

       // publish DP columns to its subscribers
       if (dps != null && dps.size() > 0) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterSetupTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterSetupTask.java
new file mode 100644
index 0000000000..175491b993
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PathOutputCommitterSetupTask.java
@@ -0,0 +1,41 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.plan.PathOutputCommitterWork;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PathOutputCommitterSetupTask extends Task<PathOutputCommitterWork> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PathOutputCommitterSetupTask.class);
+
+  private static final long serialVersionUID = -8867710739987754989L;
+
+  @Override
+  protected int execute(DriverContext driverContext) {
+    try {
+      LOG.info("Running setupJob for Path Output Committer " +
+          work.getPathOutputCommitterClass().getName());
+      work.createPathOutputCommitter().setupJob(getWork().getJobContext());
+    } catch (Exception e) {
+      LOG.error("Failed to run setupJob for Path Output Committer " +
+          work.getPathOutputCommitterClass().getName(), e);
+      setException(e);
+      return 1;
+    }
+    return 0;
+  }
+
+  @Override
+  public StageType getType() {
+    return null;
+  }
+
+  @Override
+  public String getName() {
+    return null;
+  }
+}
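Taken together with the MoveTask change above, the job-level protocol is: setupJob once before any task writes (done by PathOutputCommitterSetupTask) and commitJob once all task attempts have committed (done by MoveTask before it loads the table). A rough sketch of that flow using the classes added in this patch; it substitutes FileOutputCommitter and a local placeholder path so it can run without an S3 bucket:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.plan.PathOutputCommitterWork;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class JobCommitFlow {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    JobID jobId = new JobID("s3a-output-committer", 0);
    JobContext jobContext = new JobContextImpl(conf, jobId);
    TaskAttemptContextImpl setupAttempt = new TaskAttemptContextImpl(conf,
        new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), TaskType.JOB_SETUP, 0, 0));

    // file:///tmp is a placeholder; the optimizer passes the table/partition
    // location on s3a and one of the staging committer classes instead.
    PathOutputCommitterWork work = new PathOutputCommitterWork(
        FileOutputCommitter.class, "file:///tmp/committer-demo", jobContext, setupAttempt);

    // What PathOutputCommitterSetupTask.execute() does:
    work.createPathOutputCommitter().setupJob(work.getJobContext());

    // ... FileSinkOperator instances write and commit their task attempts ...

    // What MoveTask.execute() does before loading the table:
    work.createPathOutputCommitter().commitJob(work.getJobContext());
  }
}
```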
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 2da6b0f1fe..41678ff7ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
+import org.apache.hadoop.hive.ql.plan.PathOutputCommitterWork;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 import org.apache.hadoop.hive.ql.plan.CopyWork;
@@ -112,6 +113,7 @@ public TaskTuple(Class<T> workClass, Class<? extends Task<T>> taskClass) {
     taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class));
     taskvec.add(new TaskTuple<>(ExportWork.class, ExportTask.class));
     taskvec.add(new TaskTuple<>(ReplTxnWork.class, ReplTxnTask.class));
+    taskvec.add(new TaskTuple<>(PathOutputCommitterWork.class, PathOutputCommitterSetupTask.class));
   }

   private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index 99b33a3aad..1a60f3e46f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;

+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -73,6 +74,8 @@
   @Override
   public void configure(JobConf job) {
+    TaskAttemptID taskAttemptID = TaskAttemptID.forName(job.get("mapred.task.id"));
+
     execContext = new ExecMapperContext(job);
     // Allocate the bean at the beginning -
     try {
@@ -110,6 +113,7 @@ public void configure(JobConf job) {
       execContext.setLocalWork(localWork);

       MapredContext.init(true, new JobConf(jc));
+      MapredContext.get().setTaskAttemptID(taskAttemptID);

       mo.passExecContext(execContext);
       mo.initializeLocalWork(jc);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
index 829006d375..c0b9e34a87 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
@@ -27,6 +27,7 @@
 import java.util.Iterator;
 import java.util.List;

+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -90,6 +91,8 @@
   @Override
   public void configure(JobConf job) {
+    TaskAttemptID taskAttemptID = TaskAttemptID.forName(job.get("mapred.task.id"));
+
     rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
     ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
     ObjectInspector keyObjectInspector;
@@ -141,6 +144,7 @@ public void configure(JobConf job) {
     }

     MapredContext.init(false, new JobConf(jc));
+    MapredContext.get().setTaskAttemptID(taskAttemptID);

     // initialize reduce operator tree
     try {
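Both of these hooks rely on MapReduce publishing the attempt id under mapred.task.id before configure() runs. A small standalone illustration of what TaskAttemptID.forName() recovers from that string (the attempt string below is made up):

```java
import org.apache.hadoop.mapred.TaskAttemptID;

public class TaskAttemptIdParsing {
  public static void main(String[] args) {
    // Hypothetical value of the kind MR sets in mapred.task.id for a map attempt.
    TaskAttemptID id = TaskAttemptID.forName("attempt_201804140000_0001_m_000002_0");
    System.out.println(id.getJobID());           // job_201804140000_0001
    System.out.println(id.getTaskType());        // MAP
    System.out.println(id.getTaskID().getId());  // 2
    System.out.println(id.getId());              // 0  (attempt number)
  }
}
```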
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 4661881301..29b0f33907 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1647,7 +1647,7 @@ public Database getDatabaseCurrent() throws HiveException {
   public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
       LoadFileType loadFileType, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
       boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long writeId,
-      int stmtId, boolean isInsertOverwrite) throws HiveException {
+      int stmtId, boolean isInsertOverwrite, boolean commitData) throws HiveException {
     Path tblDataLocationPath = tbl.getDataLocation();
     boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
@@ -1736,14 +1736,18 @@ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> par
       if (!isTxnTable && ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcidIUDoperation))) {
         //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new
         // base_x.  (there is Insert Overwrite and Load Data Overwrite)
-        boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
-        replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal,
-            isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary());
+        if (commitData) {
+          boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+          replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal,
+              isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary());
+        }
       } else {
-        FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
-        copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
-            (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles,
-            tbl.getNumBuckets() > 0, isFullAcidTable);
+        if (commitData) {
+          FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
+          copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
+              (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles,
+              tbl.getNumBuckets() > 0, isFullAcidTable);
+        }
       }
     }
     perfLogger.PerfLogEnd("MoveTask", "FileMoves");
@@ -2119,11 +2123,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @return partition map details (PartitionSpec and Partition)
    * @throws HiveException
    */
-  public Map<Map<String, String>, Partition> loadDynamicPartitions(final Path loadPath,
+  public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
       final String tableName, final Map<String, String> partSpec, final LoadFileType loadFileType,
       final int numDP, final int numLB, final boolean isAcid, final long writeId, final int stmtId,
       final boolean hasFollowingStatsTask, final AcidUtils.Operation operation,
-      boolean isInsertOverwrite) throws HiveException {
+      boolean isInsertOverwrite, boolean commitData) throws HiveException {

     final Map<Map<String, String>, Partition> partitionsMap =
         Collections.synchronizedMap(new LinkedHashMap<Map<String, String>, Partition>());
@@ -2137,6 +2141,14 @@ private void constructOneLBLocationMap(FileStatus fSta,
     // Get all valid partition paths and existing partitions for them (if any)
     final Table tbl = getTable(tableName);
+//    if (!commitData) {
+//      loadPath = tbl.getDataLocation();
+//    }
+
+    // the issue is that this will read ALL partitions in the table, which is bad because if
+    // there are thousands of existing partitions then that's terrible; plus it doesn't seem safe to
+    // call this for existing partitions
+    // affects both insert into and insert overwrite; as overwrite may only replace a few partitions
     final Set<Path> validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, writeId, stmtId,
         AcidUtils.isInsertOnlyTable(tbl.getParameters()), isInsertOverwrite);
@@ -2172,7 +2184,7 @@ public Void call() throws Exception {
           // load the partition
           Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, loadFileType,
               true, numLB > 0, false, isAcid, hasFollowingStatsTask, writeId, stmtId,
-              isInsertOverwrite);
+              isInsertOverwrite, commitData);
           partitionsMap.put(fullPartSpec, newPartition);

           if (inPlaceEligible) {
@@ -2266,7 +2278,7 @@ public Void call() throws Exception {
    */
   public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
       boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask,
-      Long writeId, int stmtId, boolean isInsertOverwrite) throws HiveException {
+      Long writeId, int stmtId, boolean isInsertOverwrite, boolean commitData) throws HiveException {
     List<Path> newFiles = null;
     Table tbl = getTable(tableName);
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
@@ -2311,10 +2323,12 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType
           newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary());
     } else {
       try {
-        FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
-        copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
-            loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles,
-            tbl.getNumBuckets() > 0 ? true : false, isFullAcidTable);
+        if (commitData) {
+          FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
+          copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
+              loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles,
+              tbl.getNumBuckets() > 0 ? true : false, isFullAcidTable);
+        }
       } catch (IOException e) {
         throw new HiveException("addFiles: filesystem error in check phase", e);
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
index d508d02ed1..c349864a09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
@@ -98,6 +98,10 @@ private void initialize(HiveConf hiveConf) {
     if (pctx.getContext().getExplainAnalyze() != null) {
       resolvers.add(new AnnotateRunTimeStatsOptimizer());
     }
+
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_BLOBSTORE_S3A_COMMIT)) {
+      resolvers.add(new S3ACommitOptimizer());
+    }
   }

   /**
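The resolver registered above is deliberately narrow: it skips plans that use the small-file merge job, and it only rewrites a FileSink whose final destination resolves to an s3a:// URI (see S3ACommitOptimizer, added below). A trivial standalone check of that scheme gate; the host and bucket names are placeholders:

```java
import org.apache.hadoop.fs.Path;

public class S3aSchemeGate {
  public static void main(String[] args) {
    Path hdfsDest = new Path("hdfs://nn:8020/warehouse/t");
    Path s3aDest = new Path("s3a://example-bucket/warehouse/t");
    // Same check S3ACommitOptimizer.processFsOp() applies to the MoveTask target.
    System.out.println("s3a".equals(hdfsDest.toUri().getScheme()));  // false
    System.out.println("s3a".equals(s3aDest.toUri().getScheme()));   // true
  }
}
```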
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/S3ACommitOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/S3ACommitOptimizer.java
new file mode 100644
index 0000000000..37a3c70093
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/S3ACommitOptimizer.java
@@ -0,0 +1,248 @@
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
+
+import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.PathOutputCommitterSetupTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.PathOutputCommitterWork;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+
+public class S3ACommitOptimizer implements PhysicalPlanResolver {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3ACommitOptimizer.class);
+
+  private static final AtomicInteger JOB_ID_COUNTER = new AtomicInteger();
+  private static final AtomicInteger TASK_ID_COUNTER = new AtomicInteger();
+  private static final AtomicInteger ID_COUNTER = new AtomicInteger();
+
+  private final Map<Task, Collection<FileSinkOperator>> taskToFsOps = new HashMap<>();
+  private final List<Task<MoveWork>> mvTasks = new ArrayList<>();
+  private HiveConf hconf;
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    this.hconf = pctx.getConf();
+
+    // For now, skip the optimization if the merge files job is enabled
+    if (hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES)) {
+      return pctx;
+    }
+
+    // Collect all MoveTasks and FSOPs
+    TaskGraphWalker graphWalker = new TaskGraphWalker(new S3ACommitDispatcher());
+    List<Node> rootTasks = new ArrayList<>(pctx.getRootTasks());
+    graphWalker.startWalking(rootTasks, null);
+
+    // Iterate through each FSOP
+    for (Map.Entry<Task, Collection<FileSinkOperator>> entry : taskToFsOps.entrySet()) {
+      for (FileSinkOperator fsOp : entry.getValue()) {
+        try {
+          processFsOp(entry.getKey(), fsOp);
+        } catch (MetaException e) {
+          throw new SemanticException(e);
+        }
+      }
+    }
+
+    return pctx;
+  }
+
+  private class S3ACommitDispatcher implements Dispatcher {
+
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack,
+                           Object... nodeOutputs) throws SemanticException {
+
+      Task task = (Task) nd;
+      Collection<FileSinkOperator> fsOps = getAllFsOps(task);
+      if (!fsOps.isEmpty()) {
+        taskToFsOps.put((Task) nd, fsOps);
+      }
+      if (nd instanceof MoveTask) {
+        mvTasks.add((MoveTask) nd);
+      }
+      return null;
+    }
+  }
+
+  private Collection<FileSinkOperator> getAllFsOps(Task task) {
+    Collection<Operator<?>> fsOps = new ArrayList<>();
+    if (task instanceof MapRedTask) {
+      fsOps.addAll(((MapRedTask) task).getWork().getAllOperators());
+    } else if (task instanceof SparkTask) {
+      for (BaseWork work : ((SparkTask) task).getWork().getAllWork()) {
+        fsOps.addAll(work.getAllOperators());
+      }
+    }
+    return fsOps.stream()
+        .filter(FileSinkOperator.class::isInstance)
+        .map(FileSinkOperator.class::cast)
+        .collect(Collectors.toList());
+  }
+
+  private void processFsOp(Task task, FileSinkOperator fsOp) throws MetaException {
+    FileSinkDesc fileSinkDesc = fsOp.getConf();
+
+    Task<MoveWork> mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks,
+        fileSinkDesc.getFinalDirName(), fileSinkDesc.isMmTable());
+
+    if (mvTask != null) {
+      Path outputPath = null;
+      if (mvTask.getWork().getLoadFileWork() != null) {
+//        // Short circuit DP for now
+//        if (mvTask.getWork().getLoadMultiFilesWork() != null) {
+//          return;
+//        }
+        outputPath = mvTask.getWork().getLoadFileWork().getTargetDir();
+      }
+      if (mvTask.getWork().getLoadTableWork() != null) {
+//        // Short circuit DP for now
+//        if (mvTask.getWork().getLoadTableWork().getDPCtx() != null) {
+//          return;
+//        }
+        if (outputPath != null) {
+          throw new IllegalStateException("Load file and load table work cannot both be " +
+              "non-null");
+        }
+        if (mvTask.getWork().getLoadTableWork().getPartitionSpec() != null && !mvTask.getWork()
+            .getLoadTableWork().getPartitionSpec().isEmpty()) {
+          try {
+            Hive db = Hive.get();
+            Map<String, String> nonNullPartitionSpec = mvTask.getWork().getLoadTableWork()
+                .getPartitionSpec().entrySet().stream().filter(entry -> entry.getValue() !=
+                    null).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+            if (nonNullPartitionSpec.size() != mvTask.getWork().getLoadTableWork()
+                .getPartitionSpec().size()) {
+              if (nonNullPartitionSpec.isEmpty()) {
+                outputPath = new Path(mvTask.getWork().getLoadTableWork().getTable().getProperties()
+                    .getProperty("location"));
+              } else {
+                outputPath = getDefaultPartitionPath(db.getTable(mvTask.getWork().getLoadTableWork()
+                    .getTable().getTableName()).getDataLocation(), nonNullPartitionSpec);
+              }
+            } else {
+              Partition partition = db.getPartition(db.getTable(mvTask.getWork().getLoadTableWork()
+                      .getTable().getTableName()),
+                  nonNullPartitionSpec, false);
+              if (partition != null) {
+                outputPath = partition.getDataLocation();
+              } else {
+                outputPath = getDefaultPartitionPath(db.getTable(mvTask.getWork().getLoadTableWork()
+                    .getTable().getTableName()).getDataLocation(), nonNullPartitionSpec);
+              }
+            }
+          } catch (HiveException e) {
+            throw new RuntimeException(e);
+          }
+
+        } else {
+          // should probably replace this with Hive.getTable().getDataLocation()
+          outputPath = new Path(mvTask.getWork().getLoadTableWork().getTable().getProperties()
+              .getProperty("location")); // INSERT INTO ... VALUES (...)
+        }
+      }
+      if (outputPath == null) {
+        throw new IllegalStateException("Either load file work or load table work must be set");
+      }
+
+      if ("s3a".equals(outputPath.toUri().getScheme())) {
+        LOG.info("Using S3A Output Committer for MoveTask " + mvTask.getWork() + " and " +
+            "FileSinkOperator " + fileSinkDesc);
+
+        Class<? extends PathOutputCommitter> committerClass = null;
+
+        // Not sure if it's necessary to make this DP specific
+        if (mvTask.getWork().getLoadTableWork().getDPCtx() != null && mvTask.getWork()
+            .getLoadTableWork().isInsertOverwrite()) {
+          hconf.set("fs.s3a.committer.staging.conflict-mode", "replace");
+          committerClass = PartitionedStagingCommitter.class;
+        } else {
+          hconf.set("fs.s3a.committer.staging.conflict-mode", "append");
+          committerClass = DirectoryStagingCommitter.class;
+        }
+//        hconf.set("fs.s3a.committer.staging.unique-filenames", "false");
+
+        JobID jobID = createJobID();
+        TaskAttemptContext taskAttemptContext = createTaskAttemptContext(jobID);
+        JobContext jobContext = new JobContextImpl(hconf, jobID);
+
+        PathOutputCommitterWork setupWork = new PathOutputCommitterWork(
+            committerClass, outputPath.toString(), jobContext,
+            taskAttemptContext);
+        PathOutputCommitterSetupTask testTask = new PathOutputCommitterSetupTask();
+        testTask.setWork(setupWork);
+        testTask.executeTask(null);
+        //task.addDependentTask(TaskFactory.get(setupWork));
+        mvTask.getWork().setPathOutputCommitterWork(setupWork);
+
+        fileSinkDesc.setTargetDirName(outputPath.toString());
+        fileSinkDesc.setPathOutputCommitterClass(committerClass);
+        fileSinkDesc.setJobId(jobID.getId());
+        fileSinkDesc.setJtIdentifier(jobID.getJtIdentifier());
+      }
+    }
+  }
+
+  // Not sure if this is the best way to do this, somewhat copied from TaskCompiler, which uses
+  // similar logic to get the default location for the target table in a CTAS query
+  // don't think this will work if a custom location is specified
+  // works because you can't specify a custom location when auto-creating a partition
+  private Path getDefaultPartitionPath(Path tablePath, Map<String, String> partitionSpec)
+      throws MetaException {
+    Warehouse wh = new Warehouse(hconf);
+    return wh.getPartitionPath(tablePath, partitionSpec);
+  }
+
+  private JobID createJobID() {
+    return new JobID("s3a-output-committer", JOB_ID_COUNTER.getAndIncrement());
+  }
+
+  private TaskAttemptContext createTaskAttemptContext(JobID jobID) {
+    return new TaskAttemptContextImpl(hconf,
+        new TaskAttemptID(jobID.getJtIdentifier(), jobID.getId(), TaskType.JOB_SETUP,
+            TASK_ID_COUNTER.getAndIncrement(), ID_COUNTER.getAndIncrement()));
+  }
+}
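The committer choice above boils down to one branch: a dynamic-partition INSERT OVERWRITE gets the partitioned staging committer with conflict-mode "replace", everything else gets the directory staging committer with "append". A condensed, standalone restatement of that decision; it only sets plain Configuration keys, so it runs anywhere hadoop-aws is on the classpath:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;

public class CommitterChoice {
  // Mirrors the branch in S3ACommitOptimizer.processFsOp().
  static Class<? extends PathOutputCommitter> choose(Configuration conf,
      boolean dynamicPartitions, boolean insertOverwrite) {
    if (dynamicPartitions && insertOverwrite) {
      conf.set("fs.s3a.committer.staging.conflict-mode", "replace");
      return PartitionedStagingCommitter.class;
    }
    conf.set("fs.s3a.committer.staging.conflict-mode", "append");
    return DirectoryStagingCommitter.class;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    System.out.println(choose(conf, true, true).getSimpleName());
    System.out.println(choose(conf, false, false).getSimpleName());
    System.out.println(conf.get("fs.s3a.committer.staging.conflict-mode"));
  }
}
```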
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 0a76ffa28b..e7cb439166 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -70,6 +70,7 @@
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
+import org.apache.hadoop.hive.ql.optimizer.physical.S3ACommitOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.SparkCrossProductCheck;
 import org.apache.hadoop.hive.ql.optimizer.physical.SparkDynamicPartitionPruningResolver;
 import org.apache.hadoop.hive.ql.optimizer.physical.SparkMapJoinResolver;
@@ -613,6 +614,12 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
       new AnnotateRunTimeStatsOptimizer().resolve(physicalCtx);
     }

+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_BLOBSTORE_S3A_COMMIT)) {
+      new S3ACommitOptimizer().resolve(physicalCtx);
+    } else {
+      LOG.debug("Skipping S3A commit optimizer");
+    }
+
     PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_TASK_TREE);
     return;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index fcb6de7d08..7443a20956 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.plan;

+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
@@ -28,6 +29,9 @@
 import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;

 /**
  * FileSinkDesc.
@@ -63,6 +67,10 @@
   private DynamicPartitionCtx dpCtx;
   private String staticSpec; // static partition spec ends with a '/'
   private boolean gatherStats;
+  private Class<? extends PathOutputCommitter> pathOutputCommitterClass;
+  private int jobId;
+  private String jtIdentifier;
+  private String targetDirName;

   // Consider a query like:
   // insert overwrite table T3 select ... from T1 join T2 on T1.key = T2.key;
@@ -583,6 +591,39 @@ public boolean getInsertOverwrite() {
     return isInsertOverwrite;
   }

+  public Class<? extends PathOutputCommitter> getPathOutputCommitterClass() {
+    return this.pathOutputCommitterClass;
+  }
+
+  public void setPathOutputCommitterClass(
+      Class<? extends PathOutputCommitter> pathOutputCommitterClass) {
+    this.pathOutputCommitterClass = pathOutputCommitterClass;
+  }
+
+  public void setJobId(int jobId) {
+    this.jobId = jobId;
+  }
+
+  public void setJtIdentifier(String jtIdentifier) {
+    this.jtIdentifier = jtIdentifier;
+  }
+
+  public String getJtIdentifier() {
+    return jtIdentifier;
+  }
+
+  public int getJobId() {
+    return jobId;
+  }
+
+  public String getTargetDirName() {
+    return this.targetDirName;
+  }
+
+  public void setTargetDirName(String targetDirName) {
+    this.targetDirName = targetDirName;
+  }
+
   @Override
   public boolean isSame(OperatorDesc other) {
     if (getClass().getName().equals(other.getClass().getName())) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 9a1e3a1af5..aabb1bef0b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -38,6 +38,7 @@
   private LoadTableDesc loadTableWork;
   private LoadFileDesc loadFileWork;
   private LoadMultiFilesDesc loadMultiFilesWork;
+  private PathOutputCommitterWork pathOutputCommitterWork;

   private boolean checkFileFormat;
   private boolean srcLocal;
@@ -153,5 +154,13 @@ public boolean isSrcLocal() {
   public void setSrcLocal(boolean srcLocal) {
     this.srcLocal = srcLocal;
   }
-
+
+  public PathOutputCommitterWork getPathOutputCommitterWork() {
+    return this.pathOutputCommitterWork;
+  }
+
+  public void setPathOutputCommitterWork(
+      PathOutputCommitterWork pathOutputCommitterWork) {
+    this.pathOutputCommitterWork = pathOutputCommitterWork;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PathOutputCommitterWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PathOutputCommitterWork.java
new file mode 100644
index 0000000000..78efc19718
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PathOutputCommitterWork.java
@@ -0,0 +1,60 @@
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+
+//@Explain(displayName = "Path Output Committer Setup Work", explainLevels = {Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED})
+public class PathOutputCommitterWork implements Serializable {
+
+  private static final long serialVersionUID = -6333040835478371176L;
+
+  private Class<? extends PathOutputCommitter> pathOutputCommitterClass;
+  private String outputPath;
+  private transient JobContext jobContext;
+  private transient TaskAttemptContext taskAttemptContext;
+
+  public PathOutputCommitterWork(Class<? extends PathOutputCommitter> pathOutputCommitterClass,
+                                 String outputPath, JobContext jobContext,
+                                 TaskAttemptContext taskAttemptContext) {
+    this.pathOutputCommitterClass = pathOutputCommitterClass;
+    this.outputPath = outputPath;
+    this.jobContext = jobContext;
+    this.taskAttemptContext = taskAttemptContext;
+  }
+
+//  @Explain(displayName = "Path Output Committer Class")
+  public Class<? extends PathOutputCommitter> getPathOutputCommitterClass() {
+    return this.pathOutputCommitterClass;
+  }
+
+//  @Explain(displayName = "Path Output Path")
+  public String getOutputPath() {
+    return this.outputPath;
+  }
+
+  public JobContext getJobContext() {
+    return this.jobContext;
+  }
+
+  public PathOutputCommitter createPathOutputCommitter() {
+    Constructor<? extends PathOutputCommitter> constructor;
+    try {
+      constructor = pathOutputCommitterClass.getConstructor(Path.class, TaskAttemptContext.class);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(e);
+    }
+
+    try {
+      return constructor.newInstance(new Path(outputPath), taskAttemptContext);
+    } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index e108684660..dea127afb2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -142,7 +142,7 @@
       db.createTable(src, cols, null, TextInputFormat.class,
           HiveIgnoreKeyTextOutputFormat.class);
       db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING,
-          true, false, false, false, null, 0, false);
+          true, false, false, false, null, 0, false, true);
       i++;
     }
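Both FileSinkOperator.createPathOutputCommitter() and PathOutputCommitterWork.createPathOutputCommitter() assume the committer class exposes a (Path, TaskAttemptContext) constructor, which the S3A staging committers and FileOutputCommitter all provide. A standalone sketch of that reflective contract, again substituting FileOutputCommitter and a placeholder local path so it runs without S3:

```java
import java.lang.reflect.Constructor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class ReflectiveCommitterConstruction {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    TaskAttemptContext ctx = new TaskAttemptContextImpl(conf,
        new TaskAttemptID("demo-jt", 1, TaskType.MAP, 0, 0));

    // Same (Path, TaskAttemptContext) constructor contract the patch relies on.
    Class<? extends PathOutputCommitter> clazz = FileOutputCommitter.class;
    Constructor<? extends PathOutputCommitter> ctor =
        clazz.getConstructor(Path.class, TaskAttemptContext.class);
    PathOutputCommitter committer =
        ctor.newInstance(new Path("file:///tmp/committer-demo"), ctx);
    System.out.println(committer.getClass().getName());
  }
}
```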