diff --git data/files/array_int.txt data/files/array_int.txt
new file mode 100644
index 0000000..df70f1c
--- /dev/null
+++ data/files/array_int.txt
@@ -0,0 +1,2 @@
+5023374853555556
+60259106131
\ No newline at end of file
diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 7b7559a..9dfea89 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -333,6 +333,7 @@ minitez.query.files=bucket_map_join_tez1.q,\
   tez_join_hash.q,\
   tez_join_tests.q,\
   tez_joins_explain.q,\
+  tez_optimized_hash_complex_types.q,\
   tez_schema_evolution.q,\
   tez_self_join.q,\
   tez_union.q,\
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index cf4c71d..db9e2b5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -52,6 +52,8 @@
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct;
 import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 
@@ -733,6 +735,8 @@ public int directSpillPartitionId() {
     private final ByteArrayRef uselessIndirection; // LBStruct needs ByteArrayRef
     private final LazyBinaryStruct valueStruct;
+    private boolean needsComplexObjectFixup;
+    private ArrayList<Object> complexObjectArrayBuffer;
 
     private int partitionId; // Current hashMap in use
 
@@ -740,8 +744,13 @@
     public ReusableRowContainer() {
       if (internalValueOi != null) {
         valueStruct = (LazyBinaryStruct) LazyBinaryFactory.createLazyBinaryObject(internalValueOi);
+        needsComplexObjectFixup = MapJoinBytesTableContainer.hasComplexObjects(internalValueOi);
+        if (needsComplexObjectFixup) {
+          complexObjectArrayBuffer = new ArrayList<Object>(internalValueOi.getAllStructFieldRefs().size());
+        }
       } else {
         valueStruct = null; // No rows?
+        needsComplexObjectFixup = false;
       }
       uselessIndirection = new ByteArrayRef();
       hashMapResult = new BytesBytesMultiHashMap.Result();
@@ -859,7 +868,14 @@ public MapJoinRowContainer copy() throws HiveException {
       }
       uselessIndirection.setData(ref.getBytes());
       valueStruct.init(uselessIndirection, (int)ref.getOffset(), ref.getLength());
-      return valueStruct.getFieldsAsList(); // TODO: should we unset bytes after that?
+      if (!needsComplexObjectFixup) {
+        // Good performance for common case where small table has no complex objects.
+        return valueStruct.getFieldsAsList(); // TODO: should we unset bytes after that?
+      }
+      // Convert the complex LazyBinary objects to standard (Java) objects so downstream
+      // operators like FileSinkOperator can serialize complex objects in the form they expect
+      // (i.e. Java objects).
+      return MapJoinBytesTableContainer.fixupComplexObjects(valueStruct, complexObjectArrayBuffer, internalValueOi);
     }
 
     @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
index 34b3aa9..974916d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
@@ -49,8 +49,11 @@
 import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -397,6 +400,36 @@ public void put(Writable currentKey, Writable currentValue) throws SerDeExceptio
     hashMap.put(directWriteHelper, -1);
   }
 
+  /** Returns true if any field of the given value struct is non-primitive (complex). */
+  public static boolean hasComplexObjects(LazyBinaryStructObjectInspector lazyBinaryStructObjectInspector) {
+    List<? extends StructField> fields = lazyBinaryStructObjectInspector.getAllStructFieldRefs();
+
+    for (StructField field : fields) {
+      if (field.getFieldObjectInspector().getCategory() != Category.PRIMITIVE) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Copies each complex (LazyBinary) field of the row into a standard Java object,
+   * passing primitive fields through unchanged. The caller-supplied buffer is
+   * cleared and refilled so it can be reused for every row without reallocation.
+   */
+  public static List<Object> fixupComplexObjects(LazyBinaryStruct lazyBinaryStruct,
+      ArrayList<Object> objectArrayBuffer, LazyBinaryStructObjectInspector lazyBinaryStructObjectInspector) {
+
+    List<? extends StructField> fields = lazyBinaryStructObjectInspector.getAllStructFieldRefs();
+    // Refill the reusable buffer with add() rather than set(i, ...): set() on a freshly
+    // constructed ArrayList throws IndexOutOfBoundsException, because the initial
+    // capacity hint does not create any elements.
+    objectArrayBuffer.clear();
+    for (int i = 0; i < fields.size(); i++) {
+      StructField field = fields.get(i);
+      ObjectInspector objectInspector = field.getFieldObjectInspector();
+      Category category = objectInspector.getCategory();
+      Object object = lazyBinaryStruct.getField(i);
+      if (category == Category.PRIMITIVE) {
+        objectArrayBuffer.add(object);
+      } else {
+        objectArrayBuffer.add(ObjectInspectorUtils.copyToStandardObject(
+            object, objectInspector, ObjectInspectorCopyOption.WRITABLE));
+      }
+    }
+    return objectArrayBuffer;
+  }
+
   /** Implementation of ReusableGetAdaptor that has Output for key serialization; row
    * container is also created once and reused for every row. */
   private class GetAdaptor implements ReusableGetAdaptor, ReusableGetAdaptorDirectAccess {
@@ -510,13 +543,20 @@ public int directSpillPartitionId() {
     private final ByteArrayRef uselessIndirection; // LBStruct needs ByteArrayRef
     private final LazyBinaryStruct valueStruct;
+    private boolean needsComplexObjectFixup;
+    private ArrayList<Object> complexObjectArrayBuffer;
 
     public ReusableRowContainer() {
       if (internalValueOi != null) {
         valueStruct = (LazyBinaryStruct) LazyBinaryFactory.createLazyBinaryObject(internalValueOi);
+        needsComplexObjectFixup = hasComplexObjects(internalValueOi);
+        if (needsComplexObjectFixup) {
+          complexObjectArrayBuffer = new ArrayList<Object>(internalValueOi.getAllStructFieldRefs().size());
+        }
       } else {
         valueStruct = null; // No rows?
+        needsComplexObjectFixup = false;
       }
       uselessIndirection = new ByteArrayRef();
       hashMapResult = new BytesBytesMultiHashMap.Result();
@@ -619,7 +659,14 @@ public MapJoinRowContainer copy() throws HiveException {
       }
       uselessIndirection.setData(ref.getBytes());
       valueStruct.init(uselessIndirection, (int)ref.getOffset(), ref.getLength());
-      return valueStruct.getFieldsAsList(); // TODO: should we unset bytes after that?
+      if (!needsComplexObjectFixup) {
+        // Good performance for common case where small table has no complex objects.
+        return valueStruct.getFieldsAsList(); // TODO: should we unset bytes after that?
+      }
+      // Convert the complex LazyBinary objects to standard (Java) objects so downstream
+      // operators like FileSinkOperator can serialize complex objects in the form they expect
+      // (i.e. Java objects).
+      return fixupComplexObjects(valueStruct, complexObjectArrayBuffer, internalValueOi);
     }
 
     @Override
diff --git ql/src/test/queries/clientpositive/tez_optimized_hash_complex_types.q ql/src/test/queries/clientpositive/tez_optimized_hash_complex_types.q
new file mode 100644
index 0000000..cc9c426
--- /dev/null
+++ ql/src/test/queries/clientpositive/tez_optimized_hash_complex_types.q
@@ -0,0 +1,26 @@
+set hive.auto.convert.join=true;
+set hive.mapjoin.optimized.hashtable=true;
+
+create table int_table_a(a bigint) stored as textfile;
+create table array_int_b(b bigint, arr_int array<int>) stored as textfile;
+
+insert into table int_table_a values (1), (23), (90), (4), (5), (20), (21), (22), (23), (24), (25), (26), (27);
+load data local inpath '../../data/files/array_int.txt' into table array_int_b;
+
+-- SORT_QUERY_RESULTS
+
+select * from int_table_a;
+select * from array_int_b;
+
+explain
+select /*+ MAPJOIN(array_int_b) */ * from int_table_a join array_int_b on array_int_b.b = int_table_a.a;
+
+select /*+ MAPJOIN(array_int_b) */ * from int_table_a join array_int_b on array_int_b.b = int_table_a.a;
+
+CREATE TABLE test (a INT, b MAP<INT, STRING>) STORED AS ORC;
+INSERT OVERWRITE TABLE test SELECT 1, MAP(1, "val_1", 2, "val_2") FROM src LIMIT 1;
+
+explain
+select * from src join test where src.key=test.a;
+
+select * from src join test where src.key=test.a;
\ No newline at end of file
diff --git ql/src/test/results/clientpositive/tez/tez_optimized_hash_complex_types.q.out ql/src/test/results/clientpositive/tez/tez_optimized_hash_complex_types.q.out
new file mode 100644
index 0000000..c249aa1
--- /dev/null
+++ ql/src/test/results/clientpositive/tez/tez_optimized_hash_complex_types.q.out
@@ -0,0 +1,245 @@
+PREHOOK: query: create table int_table_a(a bigint) stored as textfile
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@int_table_a
+POSTHOOK: query: create table int_table_a(a bigint) stored as textfile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@int_table_a
+PREHOOK: query: create table array_int_b(b bigint, arr_int array<int>) stored as textfile
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@array_int_b
+POSTHOOK: query: create table array_int_b(b bigint, arr_int array<int>) stored as textfile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@array_int_b
+PREHOOK: query: insert into table int_table_a values (1), (23), (90), (4), (5), (20), (21), (22), (23), (24), (25), (26), (27)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@int_table_a
+POSTHOOK: query: insert into table int_table_a values (1), (23), (90), (4), (5), (20), (21), (22), (23), (24), (25), (26), (27)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@int_table_a
+POSTHOOK: Lineage: int_table_a.a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+PREHOOK: query: load data local inpath '../../data/files/array_int.txt' into table array_int_b
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@array_int_b
+POSTHOOK: query: load data local inpath '../../data/files/array_int.txt' into table array_int_b
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@array_int_b
+PREHOOK: query: -- SORT_QUERY_RESULTS
+
+select * from int_table_a
+PREHOOK: type: QUERY
+PREHOOK: Input: default@int_table_a
+#### A masked pattern was here ####
+POSTHOOK: query: -- SORT_QUERY_RESULTS
+
+select * from int_table_a
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@int_table_a
+#### A masked pattern was here ####
+1
+20
+21
+22
+23
+23
+24
+25
+26
+27
+4
+5
+90
+PREHOOK: query: select * from array_int_b
+PREHOOK: type: QUERY
+PREHOOK: Input: default@array_int_b
+#### A masked pattern was here ####
+POSTHOOK: query: select * from array_int_b
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@array_int_b
+#### A masked pattern was here ####
+5	[0,23,37,48,53,55,55,56]
+6	[0,2,59,106,131]
+PREHOOK: query: explain
+select /*+ MAPJOIN(array_int_b) */ * from int_table_a join array_int_b on array_int_b.b = int_table_a.a
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select /*+ MAPJOIN(array_int_b) */ * from int_table_a join array_int_b on array_int_b.b = int_table_a.a
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+      Edges:
+        Map 2 <- Map 1 (BROADCAST_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: int_table_a
+                  Statistics: Num rows: 13 Data size: 23 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: a is not null (type: boolean)
+                    Statistics: Num rows: 7 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: a (type: bigint)
+                      sort order: +
+                      Map-reduce partition columns: a (type: bigint)
+                      Statistics: Num rows: 7 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+        Map 2
+            Map Operator Tree:
+                TableScan
+                  alias: array_int_b
+                  Statistics: Num rows: 1 Data size: 41 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: b is not null (type: boolean)
+                    Statistics: Num rows: 1 Data size: 41 Basic stats: COMPLETE Column stats: NONE
+                    Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      keys:
+                        0 a (type: bigint)
+                        1 b (type: bigint)
+                      outputColumnNames: _col0, _col4, _col5
+                      input vertices:
+                        0 Map 1
+                      Statistics: Num rows: 7 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                      HybridGraceHashJoin: true
+                      Select Operator
+                        expressions: _col0 (type: bigint), _col4 (type: bigint), _col5 (type: array<int>)
+                        outputColumnNames: _col0, _col1, _col2
+                        Statistics: Num rows: 7 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 7 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                          table:
+                              input format: org.apache.hadoop.mapred.TextInputFormat
+                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select /*+ MAPJOIN(array_int_b) */ * from int_table_a join array_int_b on array_int_b.b = int_table_a.a
+PREHOOK: type: QUERY
+PREHOOK: Input: default@array_int_b
+PREHOOK: Input: default@int_table_a
+#### A masked pattern was here ####
+POSTHOOK: query: select /*+ MAPJOIN(array_int_b) */ * from int_table_a join array_int_b on array_int_b.b = int_table_a.a
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@array_int_b
+POSTHOOK: Input: default@int_table_a
+#### A masked pattern was here ####
+5	5	[0,23,37,48,53,55,55,56]
+PREHOOK: query: CREATE TABLE test (a INT, b MAP<INT, STRING>) STORED AS ORC
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test
+POSTHOOK: query: CREATE TABLE test (a INT, b MAP<INT, STRING>) STORED AS ORC
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test
+PREHOOK: query: INSERT OVERWRITE TABLE test SELECT 1, MAP(1, "val_1", 2, "val_2") FROM src LIMIT 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@test
+POSTHOOK: query: INSERT OVERWRITE TABLE test SELECT 1, MAP(1, "val_1", 2, "val_2") FROM src LIMIT 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@test
+POSTHOOK: Lineage: test.a SIMPLE []
+POSTHOOK: Lineage: test.b EXPRESSION []
+PREHOOK: query: explain
+select * from src join test where src.key=test.a
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select * from src join test where src.key=test.a
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+      Edges:
+        Map 1 <- Map 2 (BROADCAST_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: UDFToDouble(key) is not null (type: boolean)
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                    Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      keys:
+                        0 UDFToDouble(key) (type: double)
+                        1 UDFToDouble(a) (type: double)
+                      outputColumnNames: _col0, _col1, _col5, _col6
+                      input vertices:
+                        1 Map 2
+                      Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                      HybridGraceHashJoin: true
+                      Filter Operator
+                        predicate: (_col0 = _col5) (type: boolean)
+                        Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE
+                        Select Operator
+                          expressions: _col0 (type: string), _col1 (type: string), _col5 (type: int), _col6 (type: map<int,string>)
+                          outputColumnNames: _col0, _col1, _col2, _col3
+                          Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE
+                          File Output Operator
+                            compressed: false
+                            Statistics: Num rows: 137 Data size: 1455 Basic stats: COMPLETE Column stats: NONE
+                            table:
+                                input format: org.apache.hadoop.mapred.TextInputFormat
+                                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Map 2
+            Map Operator Tree:
+                TableScan
+                  alias: test
+                  Statistics: Num rows: 1 Data size: 190 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: UDFToDouble(a) is not null (type: boolean)
+                    Statistics: Num rows: 1 Data size: 190 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: UDFToDouble(a) (type: double)
+                      sort order: +
+                      Map-reduce partition columns: UDFToDouble(a) (type: double)
+                      Statistics: Num rows: 1 Data size: 190 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: a (type: int), b (type: map<int,string>)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select * from src join test where src.key=test.a
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@test
+#### A masked pattern was here ####
+POSTHOOK: query: select * from src join test where src.key=test.a
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Input: default@test
+#### A masked pattern was here ####
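
The conversion at the core of this patch is ObjectInspectorUtils.copyToStandardObject(...) with the WRITABLE copy option, applied once per non-primitive field. The standalone sketch below (a hypothetical demo class, not part of the patch; the hand-built sample list merely stands in for a LazyBinary field) shows that call turning a list<int> value like arr_int above into a standard java.util.List of writables, the form downstream serializers such as LazySimpleSerDe expect.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;

// Hypothetical demo class, not part of the patch.
public class CopyToStandardObjectDemo {
  public static void main(String[] args) {
    // Inspector for a list<int> of writables, the shape of arr_int in the test table.
    // In the patch the inspector comes from the LazyBinary value struct instead.
    ListObjectInspector listOi = ObjectInspectorFactory.getStandardListObjectInspector(
        PrimitiveObjectInspectorFactory.writableIntObjectInspector);

    // Stand-in for a deserialized complex field.
    List<IntWritable> field = Arrays.asList(new IntWritable(0), new IntWritable(23));

    // WRITABLE requests standard Java containers with Writable leaves, which is
    // the form FileSinkOperator's serde can consume for complex types.
    Object standard = ObjectInspectorUtils.copyToStandardObject(
        field, listOi, ObjectInspectorCopyOption.WRITABLE);

    System.out.println(standard); // prints [0, 23]
  }
}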