diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 9984ce5..fa584fe 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -295,6 +295,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
   vector_left_outer_join.q,\
   vector_left_outer_join2.q,\
   vector_leftsemi_mapjoin.q,\
+  vector_mapjoin_complex_values.q,\
   vector_map_order.q,\
   vector_mr_diff_schema_alias.q,\
   vector_multi_insert.q,\
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index e93d666..7f48dc5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -3623,6 +3623,9 @@ private boolean canSpecializeMapJoin(Operator<? extends OperatorDesc> op, MapJoi
      *
      * Value expressions include keys? YES.
      */
+    boolean supportsValueTypes = true;  // Assume.
+    HashSet<String> notSupportedValueTypes = new HashSet<String>();
+
     int[] bigTableValueColumnMap = new int[allBigTableValueExpressions.length];
     String[] bigTableValueColumnNames = new String[allBigTableValueExpressions.length];
     TypeInfo[] bigTableValueTypeInfos = new TypeInfo[allBigTableValueExpressions.length];
@@ -3637,7 +3640,13 @@ private boolean canSpecializeMapJoin(Operator<? extends OperatorDesc> op, MapJoi
       ExprNodeDesc exprNode = bigTableExprs.get(i);
       bigTableValueColumnNames[i] = exprNode.toString();
-      bigTableValueTypeInfos[i] = exprNode.getTypeInfo();
+      TypeInfo typeInfo = exprNode.getTypeInfo();
+      if (!(typeInfo instanceof PrimitiveTypeInfo)) {
+        supportsValueTypes = false;
+        Category category = typeInfo.getCategory();
+        notSupportedValueTypes.add(category.toString());
+      }
+      bigTableValueTypeInfos[i] = typeInfo;
     }
 
     if (bigTableValueExpressionsList.size() == 0) {
       slimmedBigTableValueExpressions = null;
@@ -3880,6 +3889,10 @@ private boolean canSpecializeMapJoin(Operator<? extends OperatorDesc> op, MapJoi
     if (!supportsKeyTypes) {
       vectorDesc.setNotSupportedKeyTypes(new ArrayList<String>(notSupportedKeyTypes));
     }
+    vectorDesc.setSupportsValueTypes(supportsValueTypes);
+    if (!supportsValueTypes) {
+      vectorDesc.setNotSupportedValueTypes(new ArrayList<String>(notSupportedValueTypes));
+    }
 
     // Check common conditions for both Optimized and Fast Hash Tables.
     boolean result = true;  // Assume.
@@ -3889,7 +3902,8 @@ private boolean canSpecializeMapJoin(Operator<? extends OperatorDesc> op, MapJoi
         !oneMapJoinCondition ||
         hasNullSafes ||
         !smallTableExprVectorizes ||
-        outerJoinHasNoKeys) {
+        outerJoinHasNoKeys ||
+        !supportsValueTypes) {
       result = false;
     }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index 8ba5101..0c127c1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -557,6 +557,16 @@ public MapJoinOperatorExplainVectorization(MapJoinDesc mapJoinDesc,
             vectorMapJoinDesc.getSupportsKeyTypes(),
             "Optimized Table and Supports Key Types"));
       }
+      final boolean supportsValueTypes = vectorMapJoinDesc.getSupportsValueTypes();
+      if (!supportsValueTypes) {
+
+        // Only add this condition when false to avoid mega-Q file update.
+        conditionList.add(
+            new VectorizationCondition(
+                false,
+                "Supports Value Types " +
+                    vectorMapJoinDesc.getNotSupportedValueTypes().toString()));
+      }
 
       VectorizationCondition[] conditions =
           conditionList.toArray(new VectorizationCondition[0]);
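The Vectorizer and MapJoinDesc hunks above reduce to one scan-and-report pattern: walk the big-table value TypeInfos, record the category of every non-primitive one, and surface a failed explain-plan condition only when something unsupported was found. Below is a minimal standalone sketch of that logic, not Hive's actual code: the Category enum and ValueTypeSupport class are simplified, hypothetical stand-ins for Hive's TypeInfo machinery.

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for the category of a Hive type (primitive vs. complex).
enum Category { PRIMITIVE, LIST, MAP, STRUCT, UNION }

final class ValueTypeSupport {
  final boolean supportsValueTypes;
  final List<String> notSupportedValueTypes;

  private ValueTypeSupport(boolean supports, List<String> notSupported) {
    this.supportsValueTypes = supports;
    this.notSupportedValueTypes = notSupported;
  }

  // Mirrors the Vectorizer hunk: assume supported, flip to false on any
  // non-primitive value category, and remember each offending category once.
  static ValueTypeSupport check(List<Category> bigTableValueCategories) {
    boolean supports = true;  // Assume, as in the patch.
    Set<String> notSupported = new LinkedHashSet<>();
    for (Category category : bigTableValueCategories) {
      if (category != Category.PRIMITIVE) {
        supports = false;
        notSupported.add(category.toString());
      }
    }
    return new ValueTypeSupport(supports, new ArrayList<>(notSupported));
  }

  // Mirrors the MapJoinDesc hunk: report the condition only when it fails,
  // yielding the "Supports Value Types [STRUCT] IS false" style of message
  // that shows up under nativeConditionsNotMet in the new q.out below.
  String explainCondition() {
    return supportsValueTypes
        ? null
        : "Supports Value Types " + notSupportedValueTypes + " IS false";
  }
}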
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java
index 89a07b4..a0ee3a9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java
@@ -204,6 +204,8 @@ public VectorMapJoinInfo getVectorMapJoinInfo() {
   private boolean isHybridHashJoin;
   private boolean supportsKeyTypes;
   private List<String> notSupportedKeyTypes;
+  private boolean supportsValueTypes;
+  private List<String> notSupportedValueTypes;
   private boolean smallTableExprVectorizes;
   private boolean outerJoinHasNoKeys;
   boolean isFullOuter;
@@ -250,6 +252,18 @@ public void setNotSupportedKeyTypes(List<String> notSupportedKeyTypes) {
   public List<String> getNotSupportedKeyTypes() {
     return notSupportedKeyTypes;
   }
+  public void setSupportsValueTypes(boolean supportsValueTypes) {
+    this.supportsValueTypes = supportsValueTypes;
+  }
+  public boolean getSupportsValueTypes() {
+    return supportsValueTypes;
+  }
+  public void setNotSupportedValueTypes(List<String> notSupportedValueTypes) {
+    this.notSupportedValueTypes = notSupportedValueTypes;
+  }
+  public List<String> getNotSupportedValueTypes() {
+    return notSupportedValueTypes;
+  }
   public void setSmallTableExprVectorizes(boolean smallTableExprVectorizes) {
     this.smallTableExprVectorizes = smallTableExprVectorizes;
   }
diff --git ql/src/test/queries/clientpositive/vector_mapjoin_complex_values.q ql/src/test/queries/clientpositive/vector_mapjoin_complex_values.q
new file mode 100644
index 0000000..1c88daa
--- /dev/null
+++ ql/src/test/queries/clientpositive/vector_mapjoin_complex_values.q
@@ -0,0 +1,34 @@
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.vectorized.execution.enabled=true;
+set hive.auto.convert.join=true;
+set hive.mapjoin.hybridgrace.hashtable=false;
+set hive.fetch.task.conversion=none;
+set hive.cli.print.header=true;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+
+create table census(
+ssn int,
+name string,
+city string,
+email string)
+row format delimited
+fields terminated by ',';
+
+insert into census values(100,"raj","san jose","email");
+
+create table census_clus(
+ssn int,
+name string,
+city string,
+email string)
+clustered by (ssn) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true');
+
+insert into table census_clus select * from census;
+
+EXPLAIN VECTORIZATION DETAIL
+UPDATE census_clus SET name = 'updated name' where ssn=100 and EXISTS (select distinct ssn from census where ssn=census_clus.ssn);
+
+UPDATE census_clus SET name = 'updated name' where ssn=100 and EXISTS (select distinct ssn from census where ssn=census_clus.ssn);
\ No newline at end of file
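The UPDATE above is what makes this test interesting: Hive rewrites an ACID UPDATE so the big table carries its ROW__ID virtual column, a struct, through the map join as a value, which is precisely the case the new check must reject for the native path. As a rough usage example of the hypothetical ValueTypeSupport sketch earlier, fed the value categories this plan ends up with (the list below is illustrative, read off the bigTableValueExpressions line in the q.out that follows):

import java.util.Arrays;

public class ComplexValueDemo {
  public static void main(String[] args) {
    // Big-table value columns in this plan:
    // col 2:string, col 3:string, col 4:struct (the ROW__ID column).
    ValueTypeSupport support = ValueTypeSupport.check(Arrays.asList(
        Category.PRIMITIVE, Category.PRIMITIVE, Category.STRUCT));
    System.out.println(support.explainCondition());
    // Prints: Supports Value Types [STRUCT] IS false
  }
}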
diff --git ql/src/test/results/clientpositive/llap/vector_mapjoin_complex_values.q.out ql/src/test/results/clientpositive/llap/vector_mapjoin_complex_values.q.out
new file mode 100644
index 0000000..36290ae
--- /dev/null
+++ ql/src/test/results/clientpositive/llap/vector_mapjoin_complex_values.q.out
@@ -0,0 +1,355 @@
+PREHOOK: query: create table census(
+ssn int,
+name string,
+city string,
+email string)
+row format delimited
+fields terminated by ','
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@census
+POSTHOOK: query: create table census(
+ssn int,
+name string,
+city string,
+email string)
+row format delimited
+fields terminated by ','
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@census
+PREHOOK: query: insert into census values(100,"raj","san jose","email")
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@census
+POSTHOOK: query: insert into census values(100,"raj","san jose","email")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@census
+POSTHOOK: Lineage: census.city SCRIPT []
+POSTHOOK: Lineage: census.email SCRIPT []
+POSTHOOK: Lineage: census.name SCRIPT []
+POSTHOOK: Lineage: census.ssn SCRIPT []
+col1 col2 col3 col4
+PREHOOK: query: create table census_clus(
+ssn int,
+name string,
+city string,
+email string)
+clustered by (ssn) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@census_clus
+POSTHOOK: query: create table census_clus(
+ssn int,
+name string,
+city string,
+email string)
+clustered by (ssn) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@census_clus
+PREHOOK: query: insert into table census_clus select * from census
+PREHOOK: type: QUERY
+PREHOOK: Input: default@census
+PREHOOK: Output: default@census_clus
+POSTHOOK: query: insert into table census_clus select * from census
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@census
+POSTHOOK: Output: default@census_clus
+POSTHOOK: Lineage: census_clus.city SIMPLE [(census)census.FieldSchema(name:city, type:string, comment:null), ]
+POSTHOOK: Lineage: census_clus.email SIMPLE [(census)census.FieldSchema(name:email, type:string, comment:null), ]
+POSTHOOK: Lineage: census_clus.name SIMPLE [(census)census.FieldSchema(name:name, type:string, comment:null), ]
+POSTHOOK: Lineage: census_clus.ssn SIMPLE [(census)census.FieldSchema(name:ssn, type:int, comment:null), ]
+census.ssn census.name census.city census.email
+PREHOOK: query: EXPLAIN VECTORIZATION DETAIL
+UPDATE census_clus SET name = 'updated name' where ssn=100 and EXISTS (select distinct ssn from census where ssn=census_clus.ssn)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@census
+PREHOOK: Input: default@census_clus
+PREHOOK: Output: default@census_clus
+POSTHOOK: query: EXPLAIN VECTORIZATION DETAIL
+UPDATE census_clus SET name = 'updated name' where ssn=100 and EXISTS (select distinct ssn from census where ssn=census_clus.ssn)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@census
+POSTHOOK: Input: default@census_clus
+POSTHOOK: Output: default@census_clus
+Explain
+PLAN VECTORIZATION:
+  enabled: true
+  enabledConditionsMet: [hive.vectorized.execution.enabled IS true]
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Reducer 4 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 4 <- Map 3 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: census_clus
+                  filterExpr: (ssn = 100) (type: boolean)
+                  Statistics: Num rows: 1 Data size: 185 Basic stats: COMPLETE Column stats: COMPLETE
+                  TableScan Vectorization:
+                      native: true
+                      vectorizationSchemaColumns: [0:ssn:int, 1:name:string, 2:city:string, 3:email:string, 4:ROW__ID:struct<writeid:bigint,bucketid:int,rowid:bigint>]
+                  Filter Operator
+                    Filter Vectorization:
+                        className: VectorFilterOperator
+                        native: true
+                        predicateExpression: FilterLongColEqualLongScalar(col 0:int, val 100)
+                    predicate: (ssn = 100) (type: boolean)
+                    Statistics: Num rows: 1 Data size: 185 Basic stats: COMPLETE Column stats: COMPLETE
+                    Map Join Operator
+                      condition map:
+                           Left Semi Join 0 to 1
+                      keys:
+                        0 100 (type: int)
+                        1 100 (type: int)
+                      Map Join Vectorization:
+                          bigTableKeyExpressions: ConstantVectorExpression(val 100) -> 5:int
+                          bigTableValueExpressions: col 2:string, col 3:string, col 4:struct<writeid:bigint,bucketid:int,rowid:bigint>
+                          className: VectorMapJoinOperator
+                          native: false
+                          nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Optimized Table and Supports Key Types IS true
+                          nativeConditionsNotMet: Supports Value Types [STRUCT] IS false
+                      outputColumnNames: _col2, _col3, _col6
+                      input vertices:
+                        1 Reducer 4
+                      Statistics: Num rows: 1 Data size: 257 Basic stats: COMPLETE Column stats: COMPLETE
+                      Select Operator
+                        expressions: _col6 (type: struct<writeid:bigint,bucketid:int,rowid:bigint>), _col2 (type: string), _col3 (type: string)
+                        outputColumnNames: _col0, _col3, _col4
+                        Select Vectorization:
+                            className: VectorSelectOperator
+                            native: true
+                            projectedOutputColumnNums: [2, 0, 1]
+                        Statistics: Num rows: 1 Data size: 357 Basic stats: COMPLETE Column stats: COMPLETE
+                        Reduce Output Operator
+                          key expressions: _col0 (type: struct<writeid:bigint,bucketid:int,rowid:bigint>)
+                          sort order: +
+                          Map-reduce partition columns: UDFToInteger(_col0) (type: int)
+                          Reduce Sink Vectorization:
+                              className: VectorReduceSinkObjectHashOperator
+                              keyColumns: 2:struct<writeid:bigint,bucketid:int,rowid:bigint>
+                              native: true
+                              nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                              partitionColumns: 3:int
+                              valueColumns: 0:string, 1:string
+                          Statistics: Num rows: 1 Data size: 357 Basic stats: COMPLETE Column stats: COMPLETE
+                          value expressions: _col3 (type: string), _col4 (type: string)
+            Execution mode: vectorized, llap
+            LLAP IO: may be used (ACID table)
+            Map Vectorization:
+                enabled: true
+                enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true
+                inputFormatFeatureSupport: [DECIMAL_64]
+                featureSupportInUse: [DECIMAL_64]
+                inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 4
+                    includeColumns: [0, 2, 3]
+                    dataColumns: ssn:int, name:string, city:string, email:string
+                    neededVirtualColumns: [ROWID]
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [bigint]
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: census
+                  filterExpr: (ssn = 100) (type: boolean)
+                  Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                  TableScan Vectorization:
+                      native: true
+                      vectorizationSchemaColumns: [0:ssn:int, 1:name:string, 2:city:string, 3:email:string, 4:ROW__ID:struct<writeid:bigint,bucketid:int,rowid:bigint>]
+                  Filter Operator
+                    Filter Vectorization:
+                        className: VectorFilterOperator
+                        native: true
+                        predicateExpression: FilterLongColEqualLongScalar(col 0:int, val 100)
+                    predicate: (ssn = 100) (type: boolean)
+                    Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      Select Vectorization:
+                          className: VectorSelectOperator
+                          native: true
+                          projectedOutputColumnNums: []
+                      Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                      Group By Operator
+                        Group By Vectorization:
+                            className: VectorGroupByOperator
+                            groupByMode: HASH
+                            keyExpressions: ConstantVectorExpression(val 100) -> 5:int
+                            native: false
+                            vectorProcessingMode: HASH
+                            projectedOutputColumnNums: []
+                        keys: 100 (type: int)
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                        Reduce Output Operator
+                          key expressions: 100 (type: int)
+                          sort order: +
+                          Map-reduce partition columns: 100 (type: int)
+                          Reduce Sink Vectorization:
+                              className: VectorReduceSinkLongOperator
+                              keyColumns: 1:int
+                              keyExpressions: ConstantVectorExpression(val 100) -> 1:int
+                              native: true
+                              nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                          Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: vectorized, llap
+            LLAP IO: no inputs
+            Map Vectorization:
+                enabled: true
+                enabledConditionsMet: hive.vectorized.use.vector.serde.deserialize IS true
+                inputFormatFeatureSupport: [DECIMAL_64]
+                featureSupportInUse: [DECIMAL_64]
+                inputFileFormats: org.apache.hadoop.mapred.TextInputFormat
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 4
+                    includeColumns: [0]
+                    dataColumns: ssn:int, name:string, city:string, email:string
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [bigint]
+        Reducer 2 
+            Execution mode: vectorized, llap
+            Reduce Vectorization:
+                enabled: true
+                enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
+                reduceColumnNullOrder: z
+                reduceColumnSortOrder: +
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 3
+                    dataColumns: KEY.reducesinkkey0:struct<writeid:bigint,bucketid:int,rowid:bigint>, VALUE._col1:string, VALUE._col2:string
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [bigint, string]
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: struct<writeid:bigint,bucketid:int,rowid:bigint>), 100 (type: int), 'updated name' (type: string), VALUE._col1 (type: string), VALUE._col2 (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                Select Vectorization:
+                    className: VectorSelectOperator
+                    native: true
+                    projectedOutputColumnNums: [0, 3, 4, 1, 2]
+                    selectExpressions: ConstantVectorExpression(val 100) -> 3:int, ConstantVectorExpression(val updated name) -> 4:string
+                Statistics: Num rows: 1 Data size: 357 Basic stats: COMPLETE Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  File Sink Vectorization:
+                      className: VectorFileSinkOperator
+                      native: false
+                  Statistics: Num rows: 1 Data size: 357 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                      serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                      name: default.census_clus
+                  Write Type: UPDATE
+        Reducer 4 
+            Execution mode: vectorized, llap
+            Reduce Vectorization:
+                enabled: true
+                enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
+                reduceColumnNullOrder: a
+                reduceColumnSortOrder: +
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 1
+                    dataColumns: KEY._col0:int
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [bigint, bigint]
+            Reduce Operator Tree:
+              Group By Operator
+                Group By Vectorization:
+                    className: VectorGroupByOperator
+                    groupByMode: MERGEPARTIAL
+                    keyExpressions: ConstantVectorExpression(val 100) -> 1:int, ConstantVectorExpression(val 100) -> 2:int
+                    native: false
+                    vectorProcessingMode: MERGE_PARTIAL
+                    projectedOutputColumnNums: []
+                keys: 100 (type: int), 100 (type: int)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  Select Vectorization:
+                      className: VectorSelectOperator
+                      native: true
+                      projectedOutputColumnNums: []
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                  Group By Operator
+                    Group By Vectorization:
+                        className: VectorGroupByOperator
+                        groupByMode: HASH
+                        keyExpressions: ConstantVectorExpression(val 100) -> 2:int
+                        native: false
+                        vectorProcessingMode: HASH
+                        projectedOutputColumnNums: []
+                    keys: 100 (type: int)
+                    mode: hash
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                    Reduce Output Operator
+                      key expressions: 100 (type: int)
+                      sort order: +
+                      Map-reduce partition columns: 100 (type: int)
+                      Reduce Sink Vectorization:
+                          className: VectorReduceSinkLongOperator
+                          keyColumns: 1:int
+                          keyExpressions: ConstantVectorExpression(val 100) -> 1:int
+                          native: true
+                          nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                      Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: false
+          table:
+              input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+              output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+              serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+              name: default.census_clus
+          Write Type: UPDATE
+
+  Stage: Stage-3
+    Stats Work
+      Basic Stats Work:
+
+PREHOOK: query: UPDATE census_clus SET name = 'updated name' where ssn=100 and EXISTS (select distinct ssn from census where ssn=census_clus.ssn)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@census
+PREHOOK: Input: default@census_clus
+PREHOOK: Output: default@census_clus
+POSTHOOK: query: UPDATE census_clus SET name = 'updated name' where ssn=100 and EXISTS (select distinct ssn from census where ssn=census_clus.ssn)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@census
+POSTHOOK: Input: default@census_clus
+POSTHOOK: Output: default@census_clus
+row__id ssn _c2 city email