diff --git a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java index 6ca35e2..156a8a4 100644 --- a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java @@ -51,4 +51,8 @@ public static boolean isBlobStorageAsScratchDir(final Configuration conf) { DISABLE_BLOBSTORAGE_AS_SCRATCHDIR ); } + + public static boolean shouldWriteFinalOutputToBlobstore(final Configuration conf) { + return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_BLOBSTORE_WRITE_FINAL_OUTPUT_TO_BLOBSTORE); + } } diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index d287b45..776b292 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3168,8 +3168,15 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Comma-separated list of supported blobstore schemes."), HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR("hive.blobstore.use.blobstore.as.scratchdir", false, - "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties)."); - + "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties)."), + + HIVE_BLOBSTORE_WRITE_FINAL_OUTPUT_TO_BLOBSTORE("hive.blobstore.write.final.output.to.blobstore", true, + "If hive.blobstore.use.blobstore.as.scratchdir is false, force the last Hive job to write to the blobstore. " + + "This is a performance optimization that forces the final FileSinkOperator to write to the blobstore. " + + "The advantage is that any copying of data that needs to be done from the scratch directory to the final " + + "table directory can be server-side, within the blobstore. 
The MoveTask simply renames data from the " + + "scratch directory to the final table location, which should translate to a server-side COPY request. " + + "This way HiveServer2 doesn't have to actually copy any data, it just tells the blobstore to do all the work"); public final String varname; private final String altName; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 838d73e..fccf214 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -347,16 +347,20 @@ public Path getMRScratchDir() { /** * Create a temporary directory depending of the path specified. - * - If path is an Object store filesystem, then use the default MR scratch directory (HDFS) + * - If path is an Object store filesystem, then use the default MR scratch directory (HDFS), unless isFinalJob and + * {@link BlobStorageUtils#shouldWriteFinalOutputToBlobstore(Configuration)} are both true, in which case return a path + * on the blobstore. * - If path is on HDFS, then create a staging directory inside the path * * @param path Path used to verify the Filesystem to use for temporary directory + * @param isFinalJob true if the required {@link Path} will be used for the final job (e.g. the final FSOP) + * * @return A path to the new temporary directory - */ - public Path getTempDirForPath(Path path) { + */ + public Path getTempDirForPath(Path path, boolean isFinalJob) { boolean isLocal = isPathLocal(path); - if ((BlobStorageUtils.isBlobStoragePath(conf, path) && !BlobStorageUtils.isBlobStorageAsScratchDir(conf)) - || isLocal) { + if (((BlobStorageUtils.isBlobStoragePath(conf, path) && !BlobStorageUtils.isBlobStorageAsScratchDir(conf)) + || isLocal) && !(isFinalJob && BlobStorageUtils.shouldWriteFinalOutputToBlobstore(conf))) { // For better write performance, we use HDFS for temporary data when object store is used. 
// Note that the scratch directory configuration variable must use HDFS or any other non-blobstorage system // to take advantage of this performance. @@ -366,6 +370,19 @@ public Path getTempDirForPath(Path path) { } } + + /** + * Create a temporary directory depending on the path specified. + * - If path is an Object store filesystem, then use the default MR scratch directory (HDFS) + * - If path is on HDFS, then create a staging directory inside the path + * + * @param path Path used to verify the Filesystem to use for temporary directory + * @return A path to the new temporary directory + */ + public Path getTempDirForPath(Path path) { + return getTempDirForPath(path, false); + } + /* * Checks if the path is for the local filesystem or not */ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index cea99e1..88d5afa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1806,7 +1806,7 @@ public static Path createMoveTask(Task currTask, boolean // Create the required temporary file in the HDFS location if the destination // path of the FileSinkOperator table is a blobstore path. 
- Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath()); + Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath(), true); // Change all the linked file sink descriptors if (fileSinkDesc.isLinkedFileSink()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 17dfd03..02e7239 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6621,7 +6621,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) if (isNonNativeTable) { queryTmpdir = dest_path; } else { - queryTmpdir = ctx.getTempDirForPath(dest_path); + queryTmpdir = ctx.getTempDirForPath(dest_path, true); } if (dpCtx != null) { // set the root of the temporary path where dynamic partition columns will populate @@ -6738,7 +6738,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri() .getAuthority(), partPath.toUri().getPath()); - queryTmpdir = ctx.getTempDirForPath(dest_path); + queryTmpdir = ctx.getTempDirForPath(dest_path, true); table_desc = Utilities.getTableDesc(dest_tab); // Add sorting/bucketing if needed @@ -6786,7 +6786,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) try { Path qPath = FileUtils.makeQualified(dest_path, conf); - queryTmpdir = ctx.getTempDirForPath(qPath); + queryTmpdir = ctx.getTempDirForPath(qPath, true); } catch (Exception e) { throw new SemanticException("Error creating temporary folder on: " + dest_path, e);