diff --git a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java index 6ca35e2..e6a17cb 100644 --- a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java @@ -31,11 +31,11 @@ private static final boolean DISABLE_BLOBSTORAGE_AS_SCRATCHDIR = false; public static boolean isBlobStoragePath(final Configuration conf, final Path path) { - return (path == null) ? false : isBlobStorageScheme(conf, path.toUri().getScheme()); + return path != null && isBlobStorageScheme(conf, path.toUri().getScheme()); } public static boolean isBlobStorageFileSystem(final Configuration conf, final FileSystem fs) { - return (fs == null) ? false : isBlobStorageScheme(conf, fs.getScheme()); + return fs != null && isBlobStorageScheme(conf, fs.getScheme()); } public static boolean isBlobStorageScheme(final Configuration conf, final String scheme) { @@ -51,4 +51,14 @@ public static boolean isBlobStorageAsScratchDir(final Configuration conf) { DISABLE_BLOBSTORAGE_AS_SCRATCHDIR ); } + + /** + * Returns true if {@link HiveConf.ConfVars#HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED} is true, false otherwise. + */ + public static boolean areOptimizationsEnabled(final Configuration conf) { + return conf.getBoolean( + HiveConf.ConfVars.HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED.varname, + HiveConf.ConfVars.HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED.defaultBoolVal + ); + } } diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a9474c4..207d02b 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3190,8 +3190,16 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Comma-separated list of supported blobstore schemes."), HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR("hive.blobstore.use.blobstore.as.scratchdir", false, - "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties)."); - + "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties)."), + + HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED("hive.blobstore.optimizations.enabled", true, + "This parameter enables a number of optimizations when running on blobstores:\n" + + "(1) If hive.blobstore.use.blobstore.as.scratchdir is false, force the last Hive job to write to the blobstore.\n" + + "This is a performance optimization that forces the final FileSinkOperator to write to the blobstore.\n" + + "The advantage is that any copying of data that needs to be done from the scratch directory to the final\n" + + "table directory can be done server-side, within the blobstore. 
The MoveTask simply renames data from the\n" + + "scratch directory to the final table location, which should translate to a server-side COPY request.\n" + + "This way HiveServer2 doesn't have to actually copy any data, it just tells the blobstore to do all the work."); public final String varname; private final String altName; diff --git a/itests/hive-blobstore/src/test/queries/clientpositive/write_final_output_blobstore.q b/itests/hive-blobstore/src/test/queries/clientpositive/write_final_output_blobstore.q new file mode 100755 index 0000000..7f0086c --- /dev/null +++ b/itests/hive-blobstore/src/test/queries/clientpositive/write_final_output_blobstore.q @@ -0,0 +1,21 @@ +-- Test that the when multiple MR jobs are created for a query, that only the FSOP from the last job writes to S3 + +-- Drop tables +DROP TABLE IF EXISTS hdfs_table; +DROP TABLE IF EXISTS blobstore_table; + +-- Create a table one table on HDFS and another on S3 +CREATE TABLE hdfs_table(key INT); +CREATE TABLE blobstore_table(key INT) LOCATION '${hiveconf:test.blobstore.path.unique}/write_final_output_blobstore/'; + +SET hive.blobstore.use.blobstore.as.scratchdir=false; + +SET hive.blobstore.optimizations.enabled=false; +EXPLAIN EXTENDED FROM hdfs_table INSERT OVERWRITE TABLE blobstore_table SELECT hdfs_table.key GROUP BY hdfs_table.key ORDER BY hdfs_table.key; + +SET hive.blobstore.optimizations.enabled=true; +EXPLAIN EXTENDED FROM hdfs_table INSERT OVERWRITE TABLE blobstore_table SELECT hdfs_table.key GROUP BY hdfs_table.key ORDER BY hdfs_table.key; + +-- Drop tables +DROP TABLE hdfs_table; +DROP TABLE blobstore_table; diff --git a/itests/hive-blobstore/src/test/results/clientpositive/write_final_output_blobstore.q.out b/itests/hive-blobstore/src/test/results/clientpositive/write_final_output_blobstore.q.out new file mode 100644 index 0000000..1b1ea97 --- /dev/null +++ b/itests/hive-blobstore/src/test/results/clientpositive/write_final_output_blobstore.q.out @@ -0,0 +1,466 @@ +PREHOOK: query: -- Test that the when multiple MR jobs are created for a query, that only the FSOP from the last job writes to S3 + +-- Drop tables +DROP TABLE IF EXISTS hdfs_table +PREHOOK: type: DROPTABLE +POSTHOOK: query: -- Test that the when multiple MR jobs are created for a query, that only the FSOP from the last job writes to S3 + +-- Drop tables +DROP TABLE IF EXISTS hdfs_table +POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE IF EXISTS blobstore_table +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS blobstore_table +POSTHOOK: type: DROPTABLE +PREHOOK: query: -- Create a table one table on HDFS and another on S3 +CREATE TABLE hdfs_table(key INT) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@hdfs_table +POSTHOOK: query: -- Create a table one table on HDFS and another on S3 +CREATE TABLE hdfs_table(key INT) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@hdfs_table +#### A masked pattern was here #### +PREHOOK: type: CREATETABLE +PREHOOK: Input: ### test.blobstore.path ###/write_final_output_blobstore +PREHOOK: Output: database:default +PREHOOK: Output: default@blobstore_table +#### A masked pattern was here #### +POSTHOOK: type: CREATETABLE +POSTHOOK: Input: ### test.blobstore.path ###/write_final_output_blobstore +POSTHOOK: Output: database:default +POSTHOOK: Output: default@blobstore_table +PREHOOK: query: EXPLAIN EXTENDED FROM hdfs_table INSERT OVERWRITE TABLE blobstore_table SELECT hdfs_table.key GROUP BY hdfs_table.key ORDER BY 
hdfs_table.key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN EXTENDED FROM hdfs_table INSERT OVERWRITE TABLE blobstore_table SELECT hdfs_table.key GROUP BY hdfs_table.key ORDER BY hdfs_table.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: hdfs_table + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + GatherStats: false + Select Operator + expressions: key (type: int) + outputColumnNames: key + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Group By Operator + keys: key (type: int) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: a + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + tag: -1 + auto parallelism: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: hdfs_table + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"} + bucket_count -1 + columns key + columns.comments + columns.types int +#### A masked pattern was here #### + name default.hdfs_table + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct hdfs_table { i32 key} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 0 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"} + bucket_count -1 + columns key + columns.comments + columns.types int +#### A masked pattern was here #### + name default.hdfs_table + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct hdfs_table { i32 key} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 0 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.hdfs_table + name: default.hdfs_table + Truncated Path -> Alias: + /hdfs_table [hdfs_table] + Needs Tagging: false + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: int) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0 + columns.types int + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + GatherStats: false + Reduce Output Operator + key 
expressions: _col0 (type: int) + null sort order: a + sort order: + + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + tag: -1 + auto parallelism: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: -mr-10002 + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0 + columns.types int + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0 + columns.types int + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Truncated Path -> Alias: +#### A masked pattern was here #### + Needs Tagging: false + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 1 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key + columns.comments + columns.types int +#### A masked pattern was here #### + location ### test.blobstore.path ###/write_final_output_blobstore + name default.blobstore_table + serialization.ddl struct blobstore_table { i32 key} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.blobstore_table + TotalFiles: 1 + GatherStats: true + MultiFileSpray: false + + Stage: Stage-0 + Move Operator + tables: + replace: true +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key + columns.comments + columns.types int +#### A masked pattern was here #### + location ### test.blobstore.path ###/write_final_output_blobstore + name default.blobstore_table + serialization.ddl struct blobstore_table { i32 key} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.blobstore_table + + Stage: Stage-3 + Stats-Aggr Operator +#### A masked pattern was here #### + +PREHOOK: query: EXPLAIN EXTENDED FROM hdfs_table INSERT OVERWRITE TABLE blobstore_table SELECT hdfs_table.key GROUP BY hdfs_table.key ORDER BY hdfs_table.key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN EXTENDED FROM hdfs_table INSERT OVERWRITE TABLE blobstore_table SELECT hdfs_table.key GROUP BY hdfs_table.key ORDER BY hdfs_table.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on 
stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: hdfs_table + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + GatherStats: false + Select Operator + expressions: key (type: int) + outputColumnNames: key + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Group By Operator + keys: key (type: int) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: a + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + tag: -1 + auto parallelism: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: hdfs_table + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"} + bucket_count -1 + columns key + columns.comments + columns.types int +#### A masked pattern was here #### + name default.hdfs_table + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct hdfs_table { i32 key} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 0 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"} + bucket_count -1 + columns key + columns.comments + columns.types int +#### A masked pattern was here #### + name default.hdfs_table + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct hdfs_table { i32 key} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 0 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.hdfs_table + name: default.hdfs_table + Truncated Path -> Alias: + /hdfs_table [hdfs_table] + Needs Tagging: false + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: int) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0 + columns.types int + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + GatherStats: false + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: a + sort order: + + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + tag: -1 + auto parallelism: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: 
-mr-10002 + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0 + columns.types int + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0 + columns.types int + escape.delim \ + serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Truncated Path -> Alias: +#### A masked pattern was here #### + Needs Tagging: false + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 1 + directory: ### BLOBSTORE_STAGING_PATH ### + NumFilesPerFileSink: 1 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Stats Publishing Key Prefix: ### BLOBSTORE_STAGING_PATH ### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key + columns.comments + columns.types int +#### A masked pattern was here #### + location ### test.blobstore.path ###/write_final_output_blobstore + name default.blobstore_table + serialization.ddl struct blobstore_table { i32 key} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.blobstore_table + TotalFiles: 1 + GatherStats: true + MultiFileSpray: false + + Stage: Stage-0 + Move Operator + tables: + replace: true + source: ### BLOBSTORE_STAGING_PATH ### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key + columns.comments + columns.types int +#### A masked pattern was here #### + location ### test.blobstore.path ###/write_final_output_blobstore + name default.blobstore_table + serialization.ddl struct blobstore_table { i32 key} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.blobstore_table + + Stage: Stage-3 + Stats-Aggr Operator + Stats Aggregation Key Prefix: ### BLOBSTORE_STAGING_PATH ### + +PREHOOK: query: -- Drop tables +DROP TABLE hdfs_table +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@hdfs_table +PREHOOK: Output: default@hdfs_table +POSTHOOK: query: -- Drop tables +DROP TABLE hdfs_table +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@hdfs_table +POSTHOOK: Output: default@hdfs_table +PREHOOK: query: DROP TABLE blobstore_table +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@blobstore_table +PREHOOK: Output: default@blobstore_table +POSTHOOK: query: DROP TABLE blobstore_table +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@blobstore_table +POSTHOOK: Output: default@blobstore_table diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java 
b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 4355d21..bbdce63 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -390,23 +390,40 @@ public Path getMRScratchDir() { /** * Create a temporary directory depending of the path specified. + * - If path is an Object store filesystem, then use the default MR scratch directory (HDFS), unless isFinalJob and + * {@link BlobStorageUtils#areOptimizationsEnabled(Configuration)} are both true, then return a path on + * the blobstore. + * - If path is on HDFS, then create a staging directory inside the path + * + * @param path Path used to verify the Filesystem to use for temporary directory + * @param isFinalJob true if the required {@link Path} will be used for the final job (e.g. the final FSOP) + * + * @return A path to the new temporary directory + */ + public Path getTempDirForPath(Path path, boolean isFinalJob) { + if (((BlobStorageUtils.isBlobStoragePath(conf, path) && !BlobStorageUtils.isBlobStorageAsScratchDir( + conf)) || isPathLocal(path))) { + if (!(isFinalJob && BlobStorageUtils.areOptimizationsEnabled(conf))) { + // For better write performance, we use HDFS for temporary data when object store is used. + // Note that the scratch directory configuration variable must use HDFS or any other non-blobstorage system + // to take advantage of this performance. + return getMRTmpPath(); + } + } + return getExtTmpPathRelTo(path); + } + + + /** + * Create a temporary directory depending of the path specified. * - If path is an Object store filesystem, then use the default MR scratch directory (HDFS) * - If path is on HDFS, then create a staging directory inside the path * * @param path Path used to verify the Filesystem to use for temporary directory * @return A path to the new temporary directory - */ + */ public Path getTempDirForPath(Path path) { - boolean isLocal = isPathLocal(path); - if ((BlobStorageUtils.isBlobStoragePath(conf, path) && !BlobStorageUtils.isBlobStorageAsScratchDir(conf)) - || isLocal) { - // For better write performance, we use HDFS for temporary data when object store is used. - // Note that the scratch directory configuration variable must use HDFS or any other non-blobstorage system - // to take advantage of this performance. - return getMRTmpPath(); - } else { - return getExtTmpPathRelTo(path); - } + return getTempDirForPath(path, false); } /* diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index cea99e1..88d5afa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1806,7 +1806,7 @@ public static Path createMoveTask(Task currTask, boolean // Create the required temporary file in the HDFS location if the destination // path of the FileSinkOperator table is a blobstore path. 
- Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath()); + Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath(), true); // Change all the linked file sink descriptors if (fileSinkDesc.isLinkedFileSink()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 7d8b2bd..819c55e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6635,7 +6635,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) if (isNonNativeTable) { queryTmpdir = dest_path; } else { - queryTmpdir = ctx.getTempDirForPath(dest_path); + queryTmpdir = ctx.getTempDirForPath(dest_path, true); } if (dpCtx != null) { // set the root of the temporary path where dynamic partition columns will populate @@ -6754,7 +6754,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri() .getAuthority(), partPath.toUri().getPath()); - queryTmpdir = ctx.getTempDirForPath(dest_path); + queryTmpdir = ctx.getTempDirForPath(dest_path, true); table_desc = Utilities.getTableDesc(dest_tab); // Add sorting/bucketing if needed @@ -6802,7 +6802,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) try { Path qPath = FileUtils.makeQualified(dest_path, conf); - queryTmpdir = ctx.getTempDirForPath(qPath); + queryTmpdir = ctx.getTempDirForPath(qPath, true); } catch (Exception e) { throw new SemanticException("Error creating temporary folder on: " + dest_path, e);
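
For reviewers who want to trace the new behaviour without reading the full Context.java hunk: the patch routes every temp-dir request through the new two-argument Context.getTempDirForPath(Path, boolean). Below is a minimal, standalone sketch of that decision, not code from the patch; the class and method names (TmpDirDecisionSketch, chooseTmpDir) are hypothetical, and the boolean parameters stand in for the path check and the two hive.blobstore.* settings involved (the local-path branch is omitted for brevity).

    // Sketch only: mirrors the branch structure of Context.getTempDirForPath(Path, boolean)
    // for the blobstore case. Not part of the patch.
    public class TmpDirDecisionSketch {
      /**
       * @param destIsBlobstore     destination path is on a blobstore (e.g. s3a://)
       * @param blobstoreScratchDir hive.blobstore.use.blobstore.as.scratchdir
       * @param optimizationsOn     hive.blobstore.optimizations.enabled
       * @param isFinalJob          true only for the FileSinkOperator of the last job
       */
      static String chooseTmpDir(boolean destIsBlobstore, boolean blobstoreScratchDir,
                                 boolean optimizationsOn, boolean isFinalJob) {
        if (destIsBlobstore && !blobstoreScratchDir) {
          if (!(isFinalJob && optimizationsOn)) {
            return "HDFS MR scratch dir";            // Context.getMRTmpPath()
          }
          // Final job with optimizations enabled: write next to the target table so the
          // MoveTask becomes a server-side rename/copy inside the blobstore.
        }
        return "staging dir under the destination";  // Context.getExtTmpPathRelTo(path)
      }

      public static void main(String[] args) {
        // Mirrors the two EXPLAIN EXTENDED runs in write_final_output_blobstore.q:
        System.out.println(chooseTmpDir(true, false, false, true)); // HDFS MR scratch dir
        System.out.println(chooseTmpDir(true, false, true, true));  // staging dir under the destination
      }
    }

This is also what the two golden-file plans above show: with hive.blobstore.optimizations.enabled=false the final File Output Operator writes to a masked HDFS staging directory, while with it set to true the sink directory and the MoveTask source become ### BLOBSTORE_STAGING_PATH ###.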