diff --git a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java index 6ca35e2..1c28644 100644 --- a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java @@ -51,4 +51,14 @@ public static boolean isBlobStorageAsScratchDir(final Configuration conf) { DISABLE_BLOBSTORAGE_AS_SCRATCHDIR ); } + + /** + * Returns true if the output of the final MR / Spark / Tez Job should be written to a blobstore. All intermediate + * data will be written to the default fs, and only the final job will write to the blobstore. This has the advantage + * that all intermediate data can be stored on HDFS or the local fs, and only the final output gets written to the + * blobstore. + */ + public static boolean shouldWriteFinalOutputToBlobstore(final Configuration conf) { + return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_BLOBSTORE_WRITE_FINAL_OUTPUT_TO_BLOBSTORE); + } } diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index d287b45..776b292 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3168,8 +3168,15 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Comma-separated list of supported blobstore schemes."), HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR("hive.blobstore.use.blobstore.as.scratchdir", false, - "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties)."); - + "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties)."), + + HIVE_BLOBSTORE_WRITE_FINAL_OUTPUT_TO_BLOBSTORE("hive.blobstore.write.final.output.to.blobstore", true, + "If hive.blobstore.use.blobstore.as.scratchdir is false, force the last Hive job to 
write to the blobstore. " + + "This is a performance optimization that forces the final FileSinkOperator to write to the blobstore. " + + "The advantage is that any copying of data that needs to be done from the scratch directory to the final " + + "table directory can be server-side, within the blobstore. The MoveTask simply renames data from the " + + "scratch directory to the final table location, which should translate to a server-side COPY request. " + + "This way HiveServer2 doesn't have to actually copy any data, it just tells the blobstore to do all the work"); public final String varname; private final String altName; diff --git a/itests/hive-blobstore/src/test/queries/clientpositive/insert_into.q b/itests/hive-blobstore/src/test/queries/clientpositive/insert_into.q index e36ef1d..bdcd0e0 100644 --- a/itests/hive-blobstore/src/test/queries/clientpositive/insert_into.q +++ b/itests/hive-blobstore/src/test/queries/clientpositive/insert_into.q @@ -1,4 +1,6 @@ DROP TABLE qtest; CREATE TABLE qtest (value int) LOCATION '${hiveconf:test.blobstore.path.unique}/qtest/'; +EXPLAIN EXTENDED INSERT INTO qtest VALUES (1), (10), (100), (1000); INSERT INTO qtest VALUES (1), (10), (100), (1000); +EXPLAIN EXTENDED SELECT * FROM qtest; SELECT * FROM qtest; diff --git a/itests/hive-blobstore/src/test/results/clientpositive/insert_into.q.out b/itests/hive-blobstore/src/test/results/clientpositive/insert_into.q.out index 919f3e7..1b47255 100644 --- a/itests/hive-blobstore/src/test/results/clientpositive/insert_into.q.out +++ b/itests/hive-blobstore/src/test/results/clientpositive/insert_into.q.out @@ -12,15 +12,314 @@ POSTHOOK: type: CREATETABLE POSTHOOK: Input: #### A masked pattern was here #### POSTHOOK: Output: database:default POSTHOOK: Output: default@qtest +PREHOOK: query: EXPLAIN EXTENDED INSERT INTO qtest VALUES (1), (10), (100), (1000) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN EXTENDED INSERT INTO qtest VALUES (1), (10), (100), (1000) +POSTHOOK: type: QUERY 
+STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-7 depends on stages: Stage-1 , consists of Stage-4, Stage-3, Stage-5 + Stage-4 + Stage-0 depends on stages: Stage-4, Stage-3, Stage-6 + Stage-2 depends on stages: Stage-0 + Stage-3 + Stage-5 + Stage-6 depends on stages: Stage-5 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: values__tmp__table__1 + Statistics: Num rows: 1 Data size: 14 Basic stats: COMPLETE Column stats: NONE + GatherStats: false + Select Operator + expressions: UDFToInteger(tmp_values_col1) (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 14 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 1 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + Statistics: Num rows: 1 Data size: 14 Basic stats: COMPLETE Column stats: NONE +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns value + columns.comments + columns.types int +#### A masked pattern was here #### + location #### A masked pattern was here #### + name default.qtest + serialization.ddl struct qtest { i32 value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.qtest + TotalFiles: 1 + GatherStats: true + MultiFileSpray: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: Values__Tmp__Table__1 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns tmp_values_col1 + columns.comments + columns.types string +#### A masked pattern was 
here #### + name default.values__tmp__table__1 + serialization.ddl struct values__tmp__table__1 { string tmp_values_col1} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns tmp_values_col1 + columns.comments + columns.types string +#### A masked pattern was here #### + name default.values__tmp__table__1 + serialization.ddl struct values__tmp__table__1 { string tmp_values_col1} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.values__tmp__table__1 + name: default.values__tmp__table__1 + Truncated Path -> Alias: +#### A masked pattern was here #### + + Stage: Stage-7 + Conditional Operator + + Stage: Stage-4 + Move Operator + files: + hdfs directory: true +#### A masked pattern was here #### + + Stage: Stage-0 + Move Operator + tables: + replace: false +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns value + columns.comments + columns.types int +#### A masked pattern was here #### + location #### A masked pattern was here #### + name default.qtest + serialization.ddl struct qtest { i32 value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.qtest + + Stage: Stage-2 + Stats-Aggr Operator +#### A masked pattern was here #### + + Stage: Stage-3 + Map Reduce + Map Operator Tree: + TableScan + GatherStats: false + File Output Operator + compressed: 
false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns value + columns.comments + columns.types int +#### A masked pattern was here #### + location #### A masked pattern was here #### + name default.qtest + serialization.ddl struct qtest { i32 value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.qtest + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: -ext-10002 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns value + columns.comments + columns.types int +#### A masked pattern was here #### + location #### A masked pattern was here #### + name default.qtest + serialization.ddl struct qtest { i32 value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns value + columns.comments + columns.types int +#### A masked pattern was here #### + location #### A masked pattern was here #### + name default.qtest + serialization.ddl struct qtest { i32 value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.qtest + name: default.qtest + Truncated Path -> Alias: +#### A masked pattern was here #### + + Stage: Stage-5 + Map Reduce + Map Operator Tree: + TableScan + GatherStats: false + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns value + columns.comments + columns.types int +#### A masked pattern was here #### + location #### A masked pattern was here #### + name default.qtest + serialization.ddl struct qtest { i32 value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.qtest + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: -ext-10002 + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns value + columns.comments + columns.types int +#### A masked pattern was here #### + location #### A masked pattern was here #### + name default.qtest + serialization.ddl struct qtest { i32 value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns value + columns.comments + columns.types int +#### A masked pattern was here 
#### + location #### A masked pattern was here #### + name default.qtest + serialization.ddl struct qtest { i32 value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.qtest + name: default.qtest + Truncated Path -> Alias: +#### A masked pattern was here #### + + Stage: Stage-6 + Move Operator + files: + hdfs directory: true +#### A masked pattern was here #### + PREHOOK: query: INSERT INTO qtest VALUES (1), (10), (100), (1000) PREHOOK: type: QUERY -PREHOOK: Input: default@values__tmp__table__1 +PREHOOK: Input: default@values__tmp__table__2 PREHOOK: Output: default@qtest POSTHOOK: query: INSERT INTO qtest VALUES (1), (10), (100), (1000) POSTHOOK: type: QUERY -POSTHOOK: Input: default@values__tmp__table__1 +POSTHOOK: Input: default@values__tmp__table__2 POSTHOOK: Output: default@qtest -POSTHOOK: Lineage: qtest.value EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: qtest.value EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: EXPLAIN EXTENDED SELECT * FROM qtest +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN EXTENDED SELECT * FROM qtest +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: qtest + Statistics: Num rows: 3 Data size: 14 Basic stats: COMPLETE Column stats: NONE + GatherStats: false + Select Operator + expressions: value (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 3 Data size: 14 Basic stats: COMPLETE Column stats: NONE + ListSink + PREHOOK: query: SELECT * FROM qtest PREHOOK: type: QUERY PREHOOK: Input: default@qtest diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java 
b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 838d73e..fccf214 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -347,16 +347,20 @@ public Path getMRScratchDir() { /** * Create a temporary directory depending of the path specified. - * - If path is an Object store filesystem, then use the default MR scratch directory (HDFS) + * - If path is an Object store filesystem, then use the default MR scratch directory (HDFS), unless isFinalJob and + * {@link BlobStorageUtils#shouldWriteFinalOutputToBlobstore(Configuration)} are both true, then return a path on + * the blobstore. * - If path is on HDFS, then create a staging directory inside the path * * @param path Path used to verify the Filesystem to use for temporary directory + * @param isFinalJob true if the required {@link Path} will be used for the final job (e.g. the final FSOP) + * + * @return A path to the new temporary directory - */ - public Path getTempDirForPath(Path path) { + */ + public Path getTempDirForPath(Path path, boolean isFinalJob) { boolean isLocal = isPathLocal(path); - if ((BlobStorageUtils.isBlobStoragePath(conf, path) && !BlobStorageUtils.isBlobStorageAsScratchDir(conf)) - || isLocal) { + if (((BlobStorageUtils.isBlobStoragePath(conf, path) && !BlobStorageUtils.isBlobStorageAsScratchDir(conf)) + || isLocal) && !(isFinalJob && BlobStorageUtils.shouldWriteFinalOutputToBlobstore(conf))) { // For better write performance, we use HDFS for temporary data when object store is used. // Note that the scratch directory configuration variable must use HDFS or any other non-blobstorage system // to take advantage of this performance. @@ -366,6 +370,19 @@ public Path getTempDirForPath(Path path) { } } + + /** + * Create a temporary directory depending on the path specified.
+ * - If path is an Object store filesystem, then use the default MR scratch directory (HDFS) + * - If path is on HDFS, then create a staging directory inside the path + * + * @param path Path used to verify the Filesystem to use for temporary directory + * @return A path to the new temporary directory + */ + public Path getTempDirForPath(Path path) { + return getTempDirForPath(path, false); + } + /* * Checks if the path is for the local filesystem or not */ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index cea99e1..88d5afa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1806,7 +1806,7 @@ public static Path createMoveTask(Task currTask, boolean // Create the required temporary file in the HDFS location if the destination // path of the FileSinkOperator table is a blobstore path. 
- Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath()); + Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath(), true); // Change all the linked file sink descriptors if (fileSinkDesc.isLinkedFileSink()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 17dfd03..02e7239 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6621,7 +6621,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) if (isNonNativeTable) { queryTmpdir = dest_path; } else { - queryTmpdir = ctx.getTempDirForPath(dest_path); + queryTmpdir = ctx.getTempDirForPath(dest_path, true); } if (dpCtx != null) { // set the root of the temporary path where dynamic partition columns will populate @@ -6738,7 +6738,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri() .getAuthority(), partPath.toUri().getPath()); - queryTmpdir = ctx.getTempDirForPath(dest_path); + queryTmpdir = ctx.getTempDirForPath(dest_path, true); table_desc = Utilities.getTableDesc(dest_tab); // Add sorting/bucketing if needed @@ -6786,7 +6786,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) try { Path qPath = FileUtils.makeQualified(dest_path, conf); - queryTmpdir = ctx.getTempDirForPath(qPath); + queryTmpdir = ctx.getTempDirForPath(qPath, true); } catch (Exception e) { throw new SemanticException("Error creating temporary folder on: " + dest_path, e);