diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
index 2f155f6..4480600 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
@@ -146,7 +146,7 @@ public JobConf pushProjectionsAndFilters(JobConf jobConf, Path path)
     if ((part != null) && (part.getTableDesc() != null)) {
       Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), cloneJobConf);
     }
-    pushProjectionsAndFilters(cloneJobConf, path.toString(), path.toUri().toString());
+    pushProjectionsAndFilters(cloneJobConf, path.toString(), path.toUri().getPath());
     return cloneJobConf;
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
index d6be4bd..07f560d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
@@ -100,20 +100,22 @@
       final List<Type> typeListWanted = new ArrayList<Type>();
       final boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
       for (final Integer idx : indexColumnsWanted) {
-        String col = listColumns.get(idx);
-        if (indexAccess) {
-          typeListWanted.add(tableSchema.getType(col));
-        } else {
-          col = col.toLowerCase();
-          if (lowerCaseFileSchemaColumns.containsKey(col)) {
-            typeListWanted.add(tableSchema.getType(lowerCaseFileSchemaColumns.get(col)));
+        if (idx < listColumns.size()) {
+          String col = listColumns.get(idx);
+          if (indexAccess) {
+            typeListWanted.add(tableSchema.getType(col));
           } else {
-            // should never occur?
-            String msg = "Column " + col + " at index " + idx + " does not exist in " +
+            col = col.toLowerCase();
+            if (lowerCaseFileSchemaColumns.containsKey(col)) {
+              typeListWanted.add(tableSchema.getType(lowerCaseFileSchemaColumns.get(col)));
+            } else {
+              // should never occur?
+              String msg = "Column " + col + " at index " + idx + " does not exist in " +
             lowerCaseFileSchemaColumns;
-            throw new IllegalStateException(msg);
+              throw new IllegalStateException(msg);
+            }
           }
-        }
+        }
       }
       requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(),
           typeListWanted), fileSchema, configuration);
diff --git a/ql/src/test/queries/clientpositive/parquet_join.q b/ql/src/test/queries/clientpositive/parquet_join.q
new file mode 100644
index 0000000..4a6055e
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/parquet_join.q
@@ -0,0 +1,32 @@
+drop table if exists staging;
+drop table if exists parquet_jointable1;
+drop table if exists parquet_jointable2;
+drop table if exists parquet_jointable1_bucketed_sorted;
+drop table if exists parquet_jointable2_bucketed_sorted;
+
+create table staging (key int, value string) stored as textfile;
+insert into table staging select distinct key, value from src order by key limit 2;
+
+create table parquet_jointable1 stored as parquet as select * from staging;
+
+create table parquet_jointable2 stored as parquet as select key,key+1,concat(value,"value") as myvalue from staging;
+
+explain select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key;
+select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key;
+
+set hive.auto.convert.join=true;
+
+explain select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key;
+select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key;
+
+set hive.optimize.bucketmapjoin=true;
+set hive.optimize.bucketmapjoin.sortedmerge=true;
+set hive.auto.convert.sortmerge.join=true;
+set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+
+create table parquet_jointable1_bucketed_sorted (key int,value string) clustered by (key) sorted by (key ASC) INTO 1 BUCKETS stored as parquet;
+insert overwrite table parquet_jointable1_bucketed_sorted select key,concat(value,"value1") as value from staging cluster by key;
+create table parquet_jointable2_bucketed_sorted (key int,value1 string, value2 string) clustered by (key) sorted by (key ASC) INTO 1 BUCKETS stored as parquet;
+insert overwrite table parquet_jointable2_bucketed_sorted select key,concat(value,"value2-1") as value1,concat(value,"value2-2") as value2 from staging cluster by key;
+explain select p1.value,p2.value2 from parquet_jointable1_bucketed_sorted p1 join parquet_jointable2_bucketed_sorted p2 on p1.key=p2.key;
+select p1.value,p2.value2 from parquet_jointable1_bucketed_sorted p1 join parquet_jointable2_bucketed_sorted p2 on p1.key=p2.key;
diff --git a/ql/src/test/results/clientpositive/parquet_join.q.out b/ql/src/test/results/clientpositive/parquet_join.q.out
new file mode 100644
index 0000000..ec87334
--- /dev/null
+++ b/ql/src/test/results/clientpositive/parquet_join.q.out
@@ -0,0 +1,300 @@
+PREHOOK: query: drop table if exists staging
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists staging
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists parquet_jointable1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists parquet_jointable1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists parquet_jointable2
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists parquet_jointable2
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists parquet_jointable1_bucketed_sorted
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists parquet_jointable1_bucketed_sorted
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists parquet_jointable2_bucketed_sorted
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists parquet_jointable2_bucketed_sorted
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table staging (key int, value string) stored as textfile
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: create table staging (key int, value string) stored as textfile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@staging
+PREHOOK: query: insert into table staging select distinct key, value from src order by key limit 2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@staging
+POSTHOOK: query: insert into table staging select distinct key, value from src order by key limit 2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@staging
+POSTHOOK: Lineage: staging.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: staging.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: create table parquet_jointable1 stored as parquet as select * from staging
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@staging
+POSTHOOK: query: create table parquet_jointable1 stored as parquet as select * from staging
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@staging
+POSTHOOK: Output: default@parquet_jointable1
+PREHOOK: query: create table parquet_jointable2 stored as parquet as select key,key+1,concat(value,"value") as myvalue from staging
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@staging
+POSTHOOK: query: create table parquet_jointable2 stored as parquet as select key,key+1,concat(value,"value") as myvalue from staging
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@staging
+POSTHOOK: Output: default@parquet_jointable2
+PREHOOK: query: explain select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: p2
+            Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: key is not null (type: boolean)
+              Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+              Reduce Output Operator
+                key expressions: key (type: int)
+                sort order: +
+                Map-reduce partition columns: key (type: int)
+                Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+                value expressions: myvalue (type: string)
+          TableScan
+            alias: p1
+            Statistics: Num rows: 2 Data size: 4 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: key is not null (type: boolean)
+              Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+              Reduce Output Operator
+                key expressions: key (type: int)
+                sort order: +
+                Map-reduce partition columns: key (type: int)
+                Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          condition expressions:
+            0 
+            1 {VALUE._col1}
+          outputColumnNames: _col6
+          Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+          Select Operator
+            expressions: _col6 (type: string)
+            outputColumnNames: _col0
+            Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_jointable1
+PREHOOK: Input: default@parquet_jointable2
+#### A masked pattern was here ####
+POSTHOOK: query: select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_jointable1
+POSTHOOK: Input: default@parquet_jointable2
+#### A masked pattern was here ####
+val_0value
+val_10value
+PREHOOK: query: explain select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-4 is a root stage
+  Stage-3 depends on stages: Stage-4
+  Stage-0 depends on stages: Stage-3
+
+STAGE PLANS:
+  Stage: Stage-4
+    Map Reduce Local Work
+      Alias -> Map Local Tables:
+        p1 
+          Fetch Operator
+            limit: -1
+      Alias -> Map Local Operator Tree:
+        p1 
+          TableScan
+            alias: p1
+            Statistics: Num rows: 2 Data size: 4 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: key is not null (type: boolean)
+              Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+              HashTable Sink Operator
+                condition expressions:
+                  0 
+                  1 {myvalue}
+                keys:
+                  0 key (type: int)
+                  1 key (type: int)
+
+  Stage: Stage-3
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: p2
+            Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: key is not null (type: boolean)
+              Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+              Map Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                condition expressions:
+                  0 
+                  1 {myvalue}
+                keys:
+                  0 key (type: int)
+                  1 key (type: int)
+                outputColumnNames: _col6
+                Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col6 (type: string)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+      Local Work:
+        Map Reduce Local Work
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_jointable1
+PREHOOK: Input: default@parquet_jointable2
+#### A masked pattern was here ####
+POSTHOOK: query: select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_jointable1
+POSTHOOK: Input: default@parquet_jointable2
+#### A masked pattern was here ####
+val_0value
+val_10value
+PREHOOK: query: create table parquet_jointable1_bucketed_sorted (key int,value string) clustered by (key) sorted by (key ASC) INTO 1 BUCKETS stored as parquet
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: create table parquet_jointable1_bucketed_sorted (key int,value string) clustered by (key) sorted by (key ASC) INTO 1 BUCKETS stored as parquet
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_jointable1_bucketed_sorted
+PREHOOK: query: insert overwrite table parquet_jointable1_bucketed_sorted select key,concat(value,"value1") as value from staging cluster by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@staging
+PREHOOK: Output: default@parquet_jointable1_bucketed_sorted
+POSTHOOK: query: insert overwrite table parquet_jointable1_bucketed_sorted select key,concat(value,"value1") as value from staging cluster by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@staging
+POSTHOOK: Output: default@parquet_jointable1_bucketed_sorted
+POSTHOOK: Lineage: parquet_jointable1_bucketed_sorted.key SIMPLE [(staging)staging.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: parquet_jointable1_bucketed_sorted.value EXPRESSION [(staging)staging.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: create table parquet_jointable2_bucketed_sorted (key int,value1 string, value2 string) clustered by (key) sorted by (key ASC) INTO 1 BUCKETS stored as parquet
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: create table parquet_jointable2_bucketed_sorted (key int,value1 string, value2 string) clustered by (key) sorted by (key ASC) INTO 1 BUCKETS stored as parquet
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@parquet_jointable2_bucketed_sorted
+PREHOOK: query: insert overwrite table parquet_jointable2_bucketed_sorted select key,concat(value,"value2-1") as value1,concat(value,"value2-2") as value2 from staging cluster by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@staging
+PREHOOK: Output: default@parquet_jointable2_bucketed_sorted
+POSTHOOK: query: insert overwrite table parquet_jointable2_bucketed_sorted select key,concat(value,"value2-1") as value1,concat(value,"value2-2") as value2 from staging cluster by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@staging
+POSTHOOK: Output: default@parquet_jointable2_bucketed_sorted
+POSTHOOK: Lineage: parquet_jointable2_bucketed_sorted.key SIMPLE [(staging)staging.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: parquet_jointable2_bucketed_sorted.value1 EXPRESSION [(staging)staging.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: parquet_jointable2_bucketed_sorted.value2 EXPRESSION [(staging)staging.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: explain select p1.value,p2.value2 from parquet_jointable1_bucketed_sorted p1 join parquet_jointable2_bucketed_sorted p2 on p1.key=p2.key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select p1.value,p2.value2 from parquet_jointable1_bucketed_sorted p1 join parquet_jointable2_bucketed_sorted p2 on p1.key=p2.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: p2
+            Statistics: Num rows: 2 Data size: 6 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: key is not null (type: boolean)
+              Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+              Sorted Merge Bucket Map Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                condition expressions:
+                  0 {value}
+                  1 {value2}
+                keys:
+                  0 key (type: int)
+                  1 key (type: int)
+                outputColumnNames: _col1, _col6
+                Select Operator
+                  expressions: _col1 (type: string), _col6 (type: string)
+                  outputColumnNames: _col0, _col1
+                  File Output Operator
+                    compressed: false
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select p1.value,p2.value2 from parquet_jointable1_bucketed_sorted p1 join parquet_jointable2_bucketed_sorted p2 on p1.key=p2.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@parquet_jointable1_bucketed_sorted
+PREHOOK: Input: default@parquet_jointable2_bucketed_sorted
+#### A masked pattern was here ####
+POSTHOOK: query: select p1.value,p2.value2 from parquet_jointable1_bucketed_sorted p1 join parquet_jointable2_bucketed_sorted p2 on p1.key=p2.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@parquet_jointable1_bucketed_sorted
+POSTHOOK: Input: default@parquet_jointable2_bucketed_sorted
+#### A masked pattern was here ####
+val_0value1	val_0value2-2
+val_10value1	val_10value2-2
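
Note (reviewer commentary, not part of the patch): the ProjectionPusher change matters because the third argument of the private pushProjectionsAndFilters() is expected to be the scheme-less form of the split path, while Path.toUri().toString() keeps the scheme and authority. URI.getPath() returns only the path component. A minimal standalone sketch of the difference, using a made-up HDFS location:

import java.net.URI;
import org.apache.hadoop.fs.Path;

public class UriVsPathDemo {
  public static void main(String[] args) {
    // Hypothetical split location, as a map task might receive it.
    Path path = new Path("hdfs://namenode:8020/warehouse/parquet_jointable1/part-00000");
    URI uri = path.toUri();

    // Full URI string, scheme and authority included:
    //   hdfs://namenode:8020/warehouse/parquet_jointable1/part-00000
    System.out.println(uri.toString());

    // Path component only, which is what a scheme-less path lookup expects:
    //   /warehouse/parquet_jointable1/part-00000
    System.out.println(uri.getPath());
  }
}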
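Similarly for the DataWritableReadSupport guard: the projection indexes pushed down through the configuration can point past the end of the current table's column list (the joins in the new parquet_join.q exercise this), so out-of-range indexes are now skipped instead of failing the read. A self-contained sketch of the guard's effect, with hypothetical data:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IndexGuardDemo {
  public static void main(String[] args) {
    // Hypothetical table with two columns.
    List<String> listColumns = Arrays.asList("key", "value");
    // Pushed-down projection indexes; 2 is out of range for this table.
    List<Integer> indexColumnsWanted = Arrays.asList(0, 2);

    List<String> wanted = new ArrayList<String>();
    for (Integer idx : indexColumnsWanted) {
      if (idx < listColumns.size()) { // the guard the patch adds
        wanted.add(listColumns.get(idx));
      }
      // Without the guard, listColumns.get(2) would throw
      // IndexOutOfBoundsException here.
    }
    System.out.println(wanted); // prints [key]
  }
}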