diff --git a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java
index 6ca35e2..4c0fc53 100644
--- a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java
@@ -51,4 +51,8 @@ public static boolean isBlobStorageAsScratchDir(final Configuration conf) {
         DISABLE_BLOBSTORAGE_AS_SCRATCHDIR
     );
   }
+
+  public static boolean shouldFinalJobWriteToBlobStore(final Configuration conf) {
+    return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_BLOBSTORE_FINAL_JOB_WRITE_TO_BLOBSTORE);
+  }
 }
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d287b45..08a3eb4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1236,7 +1236,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVETESTMODEFAILCOMPACTION("hive.test.fail.compaction", false, "For testing only. Will cause CompactorMR to fail.", false),
     HIVETESTMODEFAILHEARTBEATER("hive.test.fail.heartbeater", false, "For testing only. Will cause Heartbeater to fail.", false),
-    HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
+    HIVEMERGEMAPFILES("hive.merge.mapfiles", false,
         "Merge small files at the end of a map-only job"),
     HIVEMERGEMAPREDFILES("hive.merge.mapredfiles", false,
         "Merge small files at the end of a map-reduce job"),
@@ -3168,8 +3168,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Comma-separated list of supported blobstore schemes."),
     HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR("hive.blobstore.use.blobstore.as.scratchdir", false,
-        "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties).");
+        "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties)."),
+    HIVE_BLOBSTORE_FINAL_JOB_WRITE_TO_BLOBSTORE("hive.blobstore.final.job.write.to.blobstore", true,
+        "If hive.blobstore.use.blobstore.as.scratchdir is false, force the last Hive job to write to the blobstore.");

   public final String varname;
   private final String altName;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 838d73e..fbdd8fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -353,10 +353,13 @@ public Path getMRScratchDir() {
    * @param path Path used to verify the Filesystem to use for temporary directory
+   * @param isFinalMrJob whether the temporary directory is requested for the final MR job that writes to the query destination
    * @return A path to the new temporary directory
    */
-  public Path getTempDirForPath(Path path) {
+  public Path getTempDirForPath(Path path, boolean isFinalMrJob) {
     boolean isLocal = isPathLocal(path);
-    if ((BlobStorageUtils.isBlobStoragePath(conf, path) && !BlobStorageUtils.isBlobStorageAsScratchDir(conf))
-        || isLocal) {
+    if (isFinalMrJob && BlobStorageUtils.shouldFinalJobWriteToBlobStore(conf)) {
+      return getExtTmpPathRelTo(path);
+    } else if ((BlobStorageUtils.isBlobStoragePath(conf, path) && !BlobStorageUtils.isBlobStorageAsScratchDir(conf))
+        || isLocal) {
       // For better write performance, we use HDFS for temporary data when object store is used.
       // Note that the scratch directory configuration variable must use HDFS or any other non-blobstorage system
       // to take advantage of this performance.
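The hunk above changes how Context.getTempDirForPath() picks a staging directory. As a minimal, self-contained sketch (not Hive code; the class, method, and boolean names below are hypothetical stand-ins for the BlobStorageUtils checks and the two hive.blobstore.* settings), the branch order it implements is:

    // TempDirChoiceSketch.java - illustrates the decision order of the patched
    // getTempDirForPath(); each boolean stands in for one of the real checks.
    public final class TempDirChoiceSketch {
      enum Choice { DEST_FILESYSTEM, HDFS_SCRATCH }

      static Choice choose(boolean isFinalMrJob, boolean finalJobWritesToBlobstore,
          boolean isBlobstorePath, boolean blobstoreAsScratchDir, boolean isLocalPath) {
        // 1. The final MR job may be forced to stage directly on the blobstore, so the
        //    terminating move becomes a rename within the same store, not an upload.
        if (isFinalMrJob && finalJobWritesToBlobstore) {
          return Choice.DEST_FILESYSTEM;   // corresponds to getExtTmpPathRelTo(path)
        }
        // 2. Intermediate data bound for a blobstore (or a local path) is staged on
        //    HDFS for better write performance.
        if ((isBlobstorePath && !blobstoreAsScratchDir) || isLocalPath) {
          return Choice.HDFS_SCRATCH;      // corresponds to getMRTmpPath()
        }
        // 3. Otherwise stage next to the destination.
        return Choice.DEST_FILESYSTEM;     // corresponds to getExtTmpPathRelTo(path)
      }

      public static void main(String[] args) {
        // Final job writing to a blobstore with the new flag on: stage on the store itself.
        System.out.println(choose(true, true, true, false, false));   // DEST_FILESYSTEM
        // Intermediate job writing to a blobstore: stage on HDFS scratch.
        System.out.println(choose(false, true, true, false, false));  // HDFS_SCRATCH
      }
    }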
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index cea99e1..ea24024 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1302,10 +1302,10 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
     }

     //
-    // 2. Constructing a conditional task consisting of a move task and a map reduce task
+    // 2. Constructing a conditional task consisting of a move task and a map reduce task; this move task moves data from one staging directory to another
     //
     MoveWork dummyMv = new MoveWork(null, null, null,
-        new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
+        new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false, false);
     MapWork cplan;
     Serializable work;

@@ -1356,7 +1356,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
       mrCtx.setLbCtx(fsInputDesc.getLbCtx());

     //
-    // 3. add the moveTask as the children of the conditional task
+    // 3. add the moveTask as a child of the conditional task; mvTasks holds the loadTableWork move tasks, and linkMoveTask picks the correct one
     //
     linkMoveTask(fsOutput, cndTsk, mvTasks, conf, dependencyTask);
   }
@@ -1806,7 +1806,8 @@ public static Path createMoveTask(Task<? extends Serializable> currTask, boolean

     // Create the required temporary file in the HDFS location if the destination
     // path of the FileSinkOperator table is a blobstore path.
-    Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath());
+    MapRedTask mapRedTask = (MapRedTask) currTask;
+    Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath(), mapRedTask.getWork().isFinalMapRed());

     // Change all the linked file sink descriptors
     if (fileSinkDesc.isLinkedFileSink()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 17dfd03..6337cb7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6621,7 +6621,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       if (isNonNativeTable) {
         queryTmpdir = dest_path;
       } else {
-        queryTmpdir = ctx.getTempDirForPath(dest_path);
+        queryTmpdir = ctx.getTempDirForPath(dest_path, true);
       }
       if (dpCtx != null) {
         // set the root of the temporary path where dynamic partition columns will populate
@@ -6738,7 +6738,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
           .getAuthority(), partPath.toUri().getPath());
-      queryTmpdir = ctx.getTempDirForPath(dest_path);
+      queryTmpdir = ctx.getTempDirForPath(dest_path, true);
       table_desc = Utilities.getTableDesc(dest_tab);

       // Add sorting/bucketing if needed
@@ -6786,7 +6786,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       try {
         Path qPath = FileUtils.makeQualified(dest_path, conf);
-        queryTmpdir = ctx.getTempDirForPath(qPath);
+        queryTmpdir = ctx.getTempDirForPath(qPath, true);
       } catch (Exception e) {
         throw new SemanticException("Error creating temporary folder on: "
             + dest_path, e);
@@ -7001,7 +7001,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName().toString());
       if (!destTableIsMaterialization &&
          HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) {
-        String statsTmpLoc = ctx.getTempDirForPath(dest_path).toString();
+        String statsTmpLoc = ctx.getTempDirForPath(dest_path, false).toString();
         fileSinkDesc.setStatsTmpDir(statsTmpLoc);
         LOG.debug("Set stats collection dir : " + statsTmpLoc);
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
index 68b0ad9..c3c2126 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
@@ -132,7 +132,7 @@ public void setLbCtx(ListBucketingCtx lbCtx) {
       long avgConditionSize = conf
           .getLongVar(HiveConf.ConfVars.HIVEMERGEMAPFILESAVGSIZE);
       trgtSize = Math.max(trgtSize, avgConditionSize);
-
+      // mvTask (index 0) is the staging-to-staging move
       Task<? extends Serializable> mvTask = ctx.getListTasks().get(0);
       Task<? extends Serializable> mrTask = ctx.getListTasks().get(1);
       Task<? extends Serializable> mrAndMvTask = ctx.getListTasks().get(2);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java
index 808cb94..42f1add 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java
@@ -52,17 +52,17 @@ public void testGetScratchDirectoriesForPaths() throws IOException {
     // directory on the default scratch diretory location (usually /temp)
     Path mrTmpPath = new Path("hdfs://hostname/tmp/scratch");
     doReturn(mrTmpPath).when(spyContext).getMRTmpPath();
-    assertEquals(mrTmpPath, spyContext.getTempDirForPath(new Path("s3a://bucket/dir")));
+    assertEquals(mrTmpPath, spyContext.getTempDirForPath(new Path("s3a://bucket/dir"), false));

     // When local filesystem paths are used, then getMRTmpPatch() should be called to
     // get a temporary directory
-    assertEquals(mrTmpPath, spyContext.getTempDirForPath(new Path("file:/user")));
-    assertEquals(mrTmpPath, spyContext.getTempDirForPath(new Path("file:///user")));
+    assertEquals(mrTmpPath, spyContext.getTempDirForPath(new Path("file:/user"), false));
+    assertEquals(mrTmpPath, spyContext.getTempDirForPath(new Path("file:///user"), false));

     // When Non-Object store paths are used, then getExtTmpPathRelTo is called to get a temporary
     // directory on the same path passed as a parameter
     Path tmpPathRelTo = new Path("hdfs://hostname/user");
     doReturn(tmpPathRelTo).when(spyContext).getExtTmpPathRelTo(any(Path.class));
-    assertEquals(tmpPathRelTo, spyContext.getTempDirForPath(new Path("/user")));
+    assertEquals(tmpPathRelTo, spyContext.getTempDirForPath(new Path("/user"), false));
   }
 }
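Taken together, the intent of the patch is to keep intermediate scratch data on HDFS while letting the final job write straight to the blobstore. Below is a minimal usage sketch (not part of the patch) showing how the new flag is read, assuming the patched HiveConf and BlobStorageUtils from this diff are on the classpath; FinalJobFlagExample is a hypothetical class name.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.common.BlobStorageUtils;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class FinalJobFlagExample {
      public static void main(String[] args) {
        Configuration conf = new HiveConf();
        // Intermediate scratch directories stay off the blobstore...
        conf.setBoolean(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname, false);
        // ...but the final job writes its output directly to the blobstore,
        // avoiding a full HDFS-to-blobstore copy at the end of the query.
        conf.setBoolean("hive.blobstore.final.job.write.to.blobstore", true);

        // Helper added by this patch; prints "true" with the settings above.
        System.out.println(BlobStorageUtils.shouldFinalJobWriteToBlobStore(conf));
      }
    }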