diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index 38380fb..ce888b7 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -524,6 +524,8 @@ spark.query.files=add_part_multiple.q, \ bucketmapjoin7.q, \ bucketmapjoin8.q, \ bucketmapjoin9.q, \ + bucketmapjoin10.q, \ + bucketmapjoin11.q, \ bucketmapjoin12.q, \ bucketmapjoin13.q, \ bucketmapjoin_negative.q, \ diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java index 65bb1b7..2d2448d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java @@ -115,7 +115,6 @@ public MapJoinRowContainer copy() { public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) throws IOException, SerDeException { - clearRows(); long numRows = in.readLong(); for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) { container.readFields(in); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java index eddbf18..2f9e55a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java @@ -135,7 +135,10 @@ public MapJoinPersistableTableContainer load( for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) { MapJoinKeyObject key = new MapJoinKeyObject(); key.read(keyContext, in, keyContainer); - MapJoinEagerRowContainer values = new MapJoinEagerRowContainer(); + if (tableContainer.get(key) == null) { + tableContainer.put(key, new MapJoinEagerRowContainer()); + } + MapJoinEagerRowContainer values = (MapJoinEagerRowContainer) tableContainer.get(key); values.read(valueContext, in, valueContainer); tableContainer.put(key, values); } diff --git ql/src/test/results/clientpositive/spark/bucketmapjoin10.q.out ql/src/test/results/clientpositive/spark/bucketmapjoin10.q.out index 4188ad8..6291ce2 100644 --- ql/src/test/results/clientpositive/spark/bucketmapjoin10.q.out +++ ql/src/test/results/clientpositive/spark/bucketmapjoin10.q.out @@ -192,15 +192,13 @@ TOK_QUERY STAGE DEPENDENCIES: - Stage-1 is a root stage + Stage-3 is a root stage + Stage-1 depends on stages: Stage-3 Stage-0 depends on stages: Stage-1 STAGE PLANS: - Stage: Stage-1 + Stage: Stage-3 Spark - Edges: - Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 1), Map 4 (PARTITION-LEVEL SORT, 1) - Reducer 3 <- Reducer 2 (GROUP, 1) #### A masked pattern was here #### Vertices: Map 1 @@ -213,13 +211,16 @@ STAGE PLANS: isSamplingPred: false predicate: key is not null (type: boolean) Statistics: Num rows: 869 Data size: 3477 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: key (type: int) - sort order: + - Map-reduce partition columns: key (type: int) - Statistics: Num rows: 869 Data size: 3477 Basic stats: COMPLETE Column stats: NONE - tag: 1 - auto parallelism: false + Spark HashTable Sink Operator + condition expressions: + 0 + 1 + keys: + 0 key (type: int) + 1 key (type: int) + Position of Big Table: 0 + Local Work: + Map Reduce Local Work Path -> Alias: #### A masked pattern was here #### Path -> Partition: @@ -320,7 +321,14 @@ STAGE PLANS: Truncated Path -> Alias: /srcbucket_mapjoin_part_2/part=1 [b] /srcbucket_mapjoin_part_2/part=2 [b] - Map 4 + + Stage: Stage-1 + Spark + Edges: + Reducer 3 <- Map 2 (GROUP, 1) +#### A masked pattern was here #### + Vertices: + Map 2 Map Operator Tree: TableScan alias: a @@ -330,13 +338,34 @@ STAGE PLANS: isSamplingPred: false predicate: key is not null (type: boolean) Statistics: Num rows: 869 Data size: 3477 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: key (type: int) - sort order: + - Map-reduce partition columns: key (type: int) - Statistics: Num rows: 869 Data size: 3477 Basic stats: COMPLETE Column stats: NONE - tag: 0 - auto parallelism: false + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + keys: + 0 key (type: int) + 1 key (type: int) + input vertices: + 1 Map 1 + Position of Big Table: 0 + Statistics: Num rows: 955 Data size: 3824 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 955 Data size: 3824 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + tag: -1 + value expressions: _col0 (type: bigint) + auto parallelism: false + Local Work: + Map Reduce Local Work Path -> Alias: #### A masked pattern was here #### Path -> Partition: @@ -437,29 +466,6 @@ STAGE PLANS: Truncated Path -> Alias: /srcbucket_mapjoin_part_1/part=1 [a] /srcbucket_mapjoin_part_1/part=2 [a] - Reducer 2 - Needs Tagging: true - Reduce Operator Tree: - Join Operator - condition map: - Inner Join 0 to 1 - condition expressions: - 0 - 1 - Statistics: Num rows: 955 Data size: 3824 Basic stats: COMPLETE Column stats: NONE - Select Operator - Statistics: Num rows: 955 Data size: 3824 Basic stats: COMPLETE Column stats: NONE - Group By Operator - aggregations: count() - mode: hash - outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - sort order: - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE - tag: -1 - value expressions: _col0 (type: bigint) - auto parallelism: false Reducer 3 Needs Tagging: false Reduce Operator Tree: diff --git ql/src/test/results/clientpositive/spark/bucketmapjoin11.q.out ql/src/test/results/clientpositive/spark/bucketmapjoin11.q.out index e4a98ba..19043e5 100644 --- ql/src/test/results/clientpositive/spark/bucketmapjoin11.q.out +++ ql/src/test/results/clientpositive/spark/bucketmapjoin11.q.out @@ -202,15 +202,13 @@ TOK_QUERY STAGE DEPENDENCIES: - Stage-1 is a root stage + Stage-3 is a root stage + Stage-1 depends on stages: Stage-3 Stage-0 depends on stages: Stage-1 STAGE PLANS: - Stage: Stage-1 + Stage: Stage-3 Spark - Edges: - Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 1), Map 4 (PARTITION-LEVEL SORT, 1) - Reducer 3 <- Reducer 2 (GROUP, 1) #### A masked pattern was here #### Vertices: Map 1 @@ -223,13 +221,16 @@ STAGE PLANS: isSamplingPred: false predicate: key is not null (type: boolean) Statistics: Num rows: 1070 Data size: 4281 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: key (type: int) - sort order: + - Map-reduce partition columns: key (type: int) - Statistics: Num rows: 1070 Data size: 4281 Basic stats: COMPLETE Column stats: NONE - tag: 1 - auto parallelism: false + Spark HashTable Sink Operator + condition expressions: + 0 + 1 + keys: + 0 key (type: int) + 1 key (type: int) + Position of Big Table: 0 + Local Work: + Map Reduce Local Work Path -> Alias: #### A masked pattern was here #### Path -> Partition: @@ -330,7 +331,14 @@ STAGE PLANS: Truncated Path -> Alias: /srcbucket_mapjoin_part_2/part=1 [b] /srcbucket_mapjoin_part_2/part=2 [b] - Map 4 + + Stage: Stage-1 + Spark + Edges: + Reducer 3 <- Map 2 (GROUP, 1) +#### A masked pattern was here #### + Vertices: + Map 2 Map Operator Tree: TableScan alias: a @@ -340,13 +348,34 @@ STAGE PLANS: isSamplingPred: false predicate: key is not null (type: boolean) Statistics: Num rows: 1070 Data size: 4281 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: key (type: int) - sort order: + - Map-reduce partition columns: key (type: int) - Statistics: Num rows: 1070 Data size: 4281 Basic stats: COMPLETE Column stats: NONE - tag: 0 - auto parallelism: false + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + keys: + 0 key (type: int) + 1 key (type: int) + input vertices: + 1 Map 1 + Position of Big Table: 0 + Statistics: Num rows: 1177 Data size: 4709 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 1177 Data size: 4709 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + tag: -1 + value expressions: _col0 (type: bigint) + auto parallelism: false + Local Work: + Map Reduce Local Work Path -> Alias: #### A masked pattern was here #### Path -> Partition: @@ -447,29 +476,6 @@ STAGE PLANS: Truncated Path -> Alias: /srcbucket_mapjoin_part_1/part=1 [a] /srcbucket_mapjoin_part_1/part=2 [a] - Reducer 2 - Needs Tagging: true - Reduce Operator Tree: - Join Operator - condition map: - Inner Join 0 to 1 - condition expressions: - 0 - 1 - Statistics: Num rows: 1177 Data size: 4709 Basic stats: COMPLETE Column stats: NONE - Select Operator - Statistics: Num rows: 1177 Data size: 4709 Basic stats: COMPLETE Column stats: NONE - Group By Operator - aggregations: count() - mode: hash - outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - sort order: - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE - tag: -1 - value expressions: _col0 (type: bigint) - auto parallelism: false Reducer 3 Needs Tagging: false Reduce Operator Tree: @@ -605,15 +611,13 @@ TOK_QUERY STAGE DEPENDENCIES: - Stage-1 is a root stage + Stage-3 is a root stage + Stage-1 depends on stages: Stage-3 Stage-0 depends on stages: Stage-1 STAGE PLANS: - Stage: Stage-1 + Stage: Stage-3 Spark - Edges: - Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 1), Map 4 (PARTITION-LEVEL SORT, 1) - Reducer 3 <- Reducer 2 (GROUP, 1) #### A masked pattern was here #### Vertices: Map 1 @@ -626,13 +630,16 @@ STAGE PLANS: isSamplingPred: false predicate: key is not null (type: boolean) Statistics: Num rows: 1070 Data size: 4281 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: key (type: int), part (type: string) - sort order: ++ - Map-reduce partition columns: key (type: int), part (type: string) - Statistics: Num rows: 1070 Data size: 4281 Basic stats: COMPLETE Column stats: NONE - tag: 1 - auto parallelism: false + Spark HashTable Sink Operator + condition expressions: + 0 + 1 + keys: + 0 key (type: int), part (type: string) + 1 key (type: int), part (type: string) + Position of Big Table: 0 + Local Work: + Map Reduce Local Work Path -> Alias: #### A masked pattern was here #### Path -> Partition: @@ -733,7 +740,14 @@ STAGE PLANS: Truncated Path -> Alias: /srcbucket_mapjoin_part_2/part=1 [b] /srcbucket_mapjoin_part_2/part=2 [b] - Map 4 + + Stage: Stage-1 + Spark + Edges: + Reducer 3 <- Map 2 (GROUP, 1) +#### A masked pattern was here #### + Vertices: + Map 2 Map Operator Tree: TableScan alias: a @@ -743,13 +757,34 @@ STAGE PLANS: isSamplingPred: false predicate: key is not null (type: boolean) Statistics: Num rows: 1070 Data size: 4281 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: key (type: int), part (type: string) - sort order: ++ - Map-reduce partition columns: key (type: int), part (type: string) - Statistics: Num rows: 1070 Data size: 4281 Basic stats: COMPLETE Column stats: NONE - tag: 0 - auto parallelism: false + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + keys: + 0 key (type: int), part (type: string) + 1 key (type: int), part (type: string) + input vertices: + 1 Map 1 + Position of Big Table: 0 + Statistics: Num rows: 1177 Data size: 4709 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 1177 Data size: 4709 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + tag: -1 + value expressions: _col0 (type: bigint) + auto parallelism: false + Local Work: + Map Reduce Local Work Path -> Alias: #### A masked pattern was here #### Path -> Partition: @@ -850,29 +885,6 @@ STAGE PLANS: Truncated Path -> Alias: /srcbucket_mapjoin_part_1/part=1 [a] /srcbucket_mapjoin_part_1/part=2 [a] - Reducer 2 - Needs Tagging: true - Reduce Operator Tree: - Join Operator - condition map: - Inner Join 0 to 1 - condition expressions: - 0 - 1 - Statistics: Num rows: 1177 Data size: 4709 Basic stats: COMPLETE Column stats: NONE - Select Operator - Statistics: Num rows: 1177 Data size: 4709 Basic stats: COMPLETE Column stats: NONE - Group By Operator - aggregations: count() - mode: hash - outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - sort order: - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE - tag: -1 - value expressions: _col0 (type: bigint) - auto parallelism: false Reducer 3 Needs Tagging: false Reduce Operator Tree: