diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
index 699c7bf..1a84024 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
@@ -103,7 +103,7 @@ protected void setUp() {
         db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, src, true, true);
         db.createTable(src, cols, null, TextInputFormat.class,
             IgnoreKeyTextOutputFormat.class);
-        db.loadTable(hadoopDataFile[i], src, false, false, false);
+        db.loadTable(hadoopDataFile[i], src, false, false, false, false);
         i++;
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 9960917..e1dc911 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -273,7 +273,8 @@ public int execute(DriverContext driverContext) {
         if (tbd.getPartitionSpec().size() == 0) {
           dc = new DataContainer(table.getTTable());
           db.loadTable(tbd.getSourcePath(), tbd.getTable()
-              .getTableName(), tbd.getReplace(), tbd.getHoldDDLTime(), work.isSrcLocal());
+              .getTableName(), tbd.getReplace(), tbd.getHoldDDLTime(), work.isSrcLocal(),
+              isSkewedStoredAsDirs(tbd));
           if (work.getOutputs() != null) {
             work.getOutputs().add(new WriteEntity(table,
                 (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 86e4fbf..d8ad873 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1468,9 +1468,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param holdDDLTime
    * @param isSrcLocal
    *          If the source directory is LOCAL
+   * @param isSkewedStoreAsSubdir
+   *          if list bucketing is enabled
    */
   public void loadTable(Path loadPath, String tableName, boolean replace,
-      boolean holdDDLTime, boolean isSrcLocal) throws HiveException {
+      boolean holdDDLTime, boolean isSrcLocal, boolean isSkewedStoreAsSubdir) throws HiveException {
     Table tbl = getTable(tableName);
     if (replace) {
       tbl.replaceFiles(loadPath, isSrcLocal);
@@ -1478,6 +1480,20 @@ public void loadTable(Path loadPath, String tableName, boolean replace,
       tbl.copyFiles(loadPath, isSrcLocal);
     }
 
+    try {
+      if (isSkewedStoreAsSubdir) {
+        SkewedInfo skewedInfo = tbl.getSkewedInfo();
+        // Construct list bucketing location mappings from sub-directory names.
+        Map<List<String>, String> skewedColValueLocationMaps = constructListBucketingLocationMap(
+            tbl.getPath(), skewedInfo);
+        // Add list bucketing location mappings.
+ skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps); + } + } catch (IOException e) { + LOG.error(StringUtils.stringifyException(e)); + throw new HiveException(e); + } + if (!holdDDLTime) { try { alterTable(tableName, tbl); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/LBPartitionProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/LBPartitionProcFactory.java index 41d27fa..db16dc4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/LBPartitionProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/LBPartitionProcFactory.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.optimizer.PrunerOperatorFactory; import org.apache.hadoop.hive.ql.optimizer.pcr.PcrOpProcFactory; import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; @@ -51,23 +52,24 @@ protected void generatePredicate(NodeProcessorCtx procCtx, FilterOperator fop, TableScanOperator top) throws SemanticException, UDFArgumentException { LBOpPartitionWalkerCtx owc = (LBOpPartitionWalkerCtx) procCtx; + Table tbl = owc.getParseContext().getTopToTable().get(top); + if (tbl.isPartitioned()) { + // Run partition pruner to get partitions + ParseContext parseCtx = owc.getParseContext(); + PrunedPartitionList prunedPartList; + try { + String alias = (String) parseCtx.getTopOps().keySet().toArray()[0]; + prunedPartList = PartitionPruner.prune(top, parseCtx, alias); + } catch (HiveException e) { + // Has to use full name to make sure it does not conflict with + // org.apache.commons.lang.StringUtils + throw new SemanticException(e.getMessage(), e); + } - //Run partition pruner to get partitions - ParseContext parseCtx = owc.getParseContext(); - PrunedPartitionList prunedPartList; - try { - String alias = (String) parseCtx.getTopOps().keySet().toArray()[0]; - prunedPartList = PartitionPruner.prune(top, parseCtx, alias); - } catch (HiveException e) { - // Has to use full name to make sure it does not conflict with - // org.apache.commons.lang.StringUtils - throw new SemanticException(e.getMessage(), e); + if (prunedPartList != null) { + owc.setPartitions(prunedPartList); + } } - - if (prunedPartList != null) { - owc.setPartitions(prunedPartList); - } - } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java index faa0f18..4faaa25 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java @@ -132,7 +132,7 @@ db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, src, true, true); db.createTable(src, cols, null, TextInputFormat.class, IgnoreKeyTextOutputFormat.class); - db.loadTable(hadoopDataFile[i], src, false, false, true); + db.loadTable(hadoopDataFile[i], src, false, false, true, false); i++; } diff --git ql/src/test/queries/clientpositive/list_bucket_dml_14.q ql/src/test/queries/clientpositive/list_bucket_dml_14.q new file mode 100644 index 0000000..f60c5b0 --- /dev/null +++ ql/src/test/queries/clientpositive/list_bucket_dml_14.q @@ -0,0 +1,38 @@ +set hive.mapred.supports.subdirectories=true; +set hive.exec.dynamic.partition=true; +set hive.exec.dynamic.partition.mode=nonstrict; +set 
hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; +set hive.merge.mapfiles=false; +set hive.merge.mapredfiles=false; +set mapred.input.dir.recursive=true; + +-- list bucketing DML : unpartitioned table and 2 stage query plan. + +-- INCLUDE_HADOOP_MAJOR_VERSIONS(0.23) + +-- create a skewed table +create table list_bucketing (key String, value String) +skewed by (key) on ("484") +stored as DIRECTORIES +; + +-- list bucketing DML +explain extended +insert overwrite table list_bucketing select * from src; +insert overwrite table list_bucketing select * from src; + +-- check DML result +desc formatted list_bucketing; + +select count(1) from src; +select count(1) from list_bucketing; + +select key, value from src where key = "484"; +set hive.optimize.listbucketing=true; +explain extended +select key, value from list_bucketing where key = "484"; +select key, value from list_bucketing where key = "484"; + +-- clean up resources +drop table list_bucketing; + diff --git ql/src/test/results/clientpositive/list_bucket_dml_14.q.out ql/src/test/results/clientpositive/list_bucket_dml_14.q.out new file mode 100644 index 0000000..2c9b4e4 --- /dev/null +++ ql/src/test/results/clientpositive/list_bucket_dml_14.q.out @@ -0,0 +1,404 @@ +PREHOOK: query: -- list bucketing DML : unpartitioned table and 2 stage query plan. + +-- INCLUDE_HADOOP_MAJOR_VERSIONS(0.23) + +-- create a skewed table +create table list_bucketing (key String, value String) +skewed by (key) on ("484") +stored as DIRECTORIES +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +POSTHOOK: query: -- list bucketing DML : unpartitioned table and 2 stage query plan. + +-- INCLUDE_HADOOP_MAJOR_VERSIONS(0.23) + +-- create a skewed table +create table list_bucketing (key String, value String) +skewed by (key) on ("484") +stored as DIRECTORIES +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@list_bucketing +PREHOOK: query: -- list bucketing DML +explain extended +insert overwrite table list_bucketing select * from src +PREHOOK: type: QUERY +POSTHOOK: query: -- list bucketing DML +explain extended +insert overwrite table list_bucketing select * from src +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + +TOK_QUERY + TOK_FROM + TOK_TABREF + TOK_TABNAME + src + TOK_INSERT + TOK_DESTINATION + TOK_TAB + TOK_TABNAME + list_bucketing + TOK_SELECT + TOK_SELEXPR + TOK_ALLCOLREF + + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + Stage-2 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + GatherStats: false + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 1 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key,value + columns.comments + columns.types string:string +#### A masked pattern was here #### + name default.list_bucketing + serialization.ddl struct list_bucketing { string key, string 
value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.list_bucketing + TotalFiles: 1 + GatherStats: true + MultiFileSpray: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: src + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments defaultdefault + columns.types string:string +#### A masked pattern was here #### + name default.src + numFiles 1 + numRows 0 + rawDataSize 0 + serialization.ddl struct src { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 5812 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments defaultdefault + columns.types string:string +#### A masked pattern was here #### + name default.src + numFiles 1 + numRows 0 + rawDataSize 0 + serialization.ddl struct src { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 5812 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.src + name: default.src + Truncated Path -> Alias: + /src [src] + + Stage: Stage-0 + Move Operator + tables: + replace: true +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + bucket_count -1 + columns key,value + columns.comments + columns.types string:string +#### A masked pattern was here #### + name default.list_bucketing + serialization.ddl struct list_bucketing { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.list_bucketing + + Stage: Stage-2 + Stats-Aggr Operator +#### A masked pattern was here #### + +PREHOOK: query: insert overwrite table list_bucketing select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@list_bucketing +POSTHOOK: query: insert overwrite table list_bucketing select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@list_bucketing +POSTHOOK: Lineage: list_bucketing.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: list_bucketing.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: -- check DML result +desc formatted list_bucketing +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@list_bucketing +POSTHOOK: query: -- check DML result +desc formatted list_bucketing +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@list_bucketing +POSTHOOK: Lineage: list_bucketing.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: 
list_bucketing.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +# col_name data_type comment + +key string +value string + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Protect Mode: None +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + COLUMN_STATS_ACCURATE true + numFiles 2 + numRows 500 + rawDataSize 5312 + totalSize 5812 +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Stored As SubDirectories: Yes +Skewed Columns: [key] +Skewed Values: [[484]] +#### A masked pattern was here #### +Skewed Value to Truncated Path: {[484]=/list_bucketing/key=484} +Storage Desc Params: + serialization.format 1 +PREHOOK: query: select count(1) from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select count(1) from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: Lineage: list_bucketing.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: list_bucketing.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +500 +PREHOOK: query: select count(1) from list_bucketing +PREHOOK: type: QUERY +PREHOOK: Input: default@list_bucketing +#### A masked pattern was here #### +POSTHOOK: query: select count(1) from list_bucketing +POSTHOOK: type: QUERY +POSTHOOK: Input: default@list_bucketing +#### A masked pattern was here #### +POSTHOOK: Lineage: list_bucketing.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: list_bucketing.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +500 +PREHOOK: query: select key, value from src where key = "484" +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select key, value from src where key = "484" +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: Lineage: list_bucketing.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: list_bucketing.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +484 val_484 +PREHOOK: query: explain extended +select key, value from list_bucketing where key = "484" +PREHOOK: type: QUERY +POSTHOOK: query: explain extended +select key, value from list_bucketing where key = "484" +POSTHOOK: type: QUERY +POSTHOOK: Lineage: list_bucketing.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: list_bucketing.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +ABSTRACT SYNTAX TREE: + +TOK_QUERY + TOK_FROM + TOK_TABREF + TOK_TABNAME + list_bucketing + TOK_INSERT + TOK_DESTINATION + TOK_DIR + TOK_TMP_FILE + TOK_SELECT + TOK_SELEXPR + TOK_TABLE_OR_COL + key + TOK_SELEXPR + TOK_TABLE_OR_COL + value + TOK_WHERE + = + TOK_TABLE_OR_COL + key + "484" + + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: list_bucketing + Statistics: Num rows: 500 Data size: 5312 Basic 
stats: COMPLETE Column stats: NONE + GatherStats: false + Filter Operator + isSamplingPred: false + predicate: (key = '484') (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + columns _col0,_col1 + columns.types string:string + escape.delim \ + hive.serialization.extend.nesting.levels true + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: list_bucketing + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments + columns.types string:string +#### A masked pattern was here #### + name default.list_bucketing + numFiles 2 + numRows 500 + rawDataSize 5312 + serialization.ddl struct list_bucketing { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 5812 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE true + bucket_count -1 + columns key,value + columns.comments + columns.types string:string +#### A masked pattern was here #### + name default.list_bucketing + numFiles 2 + numRows 500 + rawDataSize 5312 + serialization.ddl struct list_bucketing { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 5812 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.list_bucketing + name: default.list_bucketing + Truncated Path -> Alias: + /list_bucketing [list_bucketing] + + Stage: Stage-0 + Fetch Operator + limit: -1 + +PREHOOK: query: select key, value from list_bucketing where key = "484" +PREHOOK: type: QUERY +PREHOOK: Input: default@list_bucketing +#### A masked pattern was here #### +POSTHOOK: query: select key, value from list_bucketing where key = "484" +POSTHOOK: type: QUERY +POSTHOOK: Input: default@list_bucketing +#### A masked pattern was here #### +POSTHOOK: Lineage: list_bucketing.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: list_bucketing.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +484 val_484 +PREHOOK: query: -- clean up resources +drop table list_bucketing +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@list_bucketing +PREHOOK: Output: 
default@list_bucketing +POSTHOOK: query: -- clean up resources +drop table list_bucketing +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@list_bucketing +POSTHOOK: Output: default@list_bucketing +POSTHOOK: Lineage: list_bucketing.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: list_bucketing.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
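
Note on the new location-map step in Hive.loadTable: with "stored as DIRECTORIES", rows for each skewed key value land in their own sub-directory named colname=value (e.g. .../list_bucketing/key=484, matching the "Skewed Value to Truncated Path" line in the output above), and the patch records those mappings in the metastore via SkewedInfo.setSkewedColValueLocationMaps. The standalone sketch below illustrates the idea only; it is not the patch's constructListBucketingLocationMap, and the class and method names (LbLocationMapSketch, buildLocationMap) are invented for illustration. It assumes a single skewed column, uses local java.io.File rather than Hadoop's FileSystem, keeps absolute paths where Hive stores truncated ones, and skips the default directory Hive uses for non-skewed values.

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only (see caveats above); not Hive's implementation.
public class LbLocationMapSketch {

  // Builds {skewed column value(s) -> location} from colname=value sub-directories.
  static Map<List<String>, String> buildLocationMap(File tableDir) {
    Map<List<String>, String> locationMap = new HashMap<List<String>, String>();
    File[] children = tableDir.listFiles();
    if (children == null) {
      return locationMap; // not a directory, or an I/O error
    }
    for (File child : children) {
      int eq = child.getName().indexOf('=');
      // Only sub-directories named colname=value take part in list bucketing.
      if (child.isDirectory() && eq > 0) {
        String skewedValue = child.getName().substring(eq + 1);
        locationMap.put(Collections.singletonList(skewedValue),
            child.getAbsolutePath());
      }
    }
    return locationMap;
  }

  public static void main(String[] args) {
    // For a layout like /tmp/list_bucketing/key=484/000000_0 this prints
    // {[484]=/tmp/list_bucketing/key=484}.
    File dir = new File(args.length > 0 ? args[0] : "/tmp/list_bucketing");
    System.out.println(buildLocationMap(dir));
  }
}

The mapping built this way is what the list bucketing pruner consults at query time: for the test query above (key = "484"), only the key=484 sub-directory needs to be scanned once hive.optimize.listbucketing is enabled.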