HIVE-741

NULL is not handled correctly in join

    Details

      Description

      With the following data in table input4_cb:

      Key     Value
      ------  --------
      NULL    325
      18      NULL

      The following query:

      select * from input4_cb a join input4_cb b on a.key = b.value;
      

      returns the following result:

      NULL 325 18 NULL

      The correct result should be empty set.

      When NULL is replaced by '' (the empty string), the query returns the correct result.
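
For readers unfamiliar with the expected behavior: in SQL, `NULL = NULL` does not evaluate to true, so an equi-join must never pair two NULL keys. The following is a minimal sketch of that rule using a plain in-memory hash join. It is not Hive code; the class and method names are hypothetical, rows are modeled as `{key, value}` arrays, and a Java `null` stands for SQL NULL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; none of these names come from Hive.
class NullAwareHashJoin {

    // Inner join on a.key = b.value. Each row is {key, value}.
    static List<Integer[]> join(List<Integer[]> a, List<Integer[]> b) {
        Map<Integer, List<Integer[]>> byValue = new HashMap<>();
        for (Integer[] rb : b) {
            if (rb[1] == null) {
                continue; // NULL = x is never true, so NULL values are not indexed
            }
            byValue.computeIfAbsent(rb[1], k -> new ArrayList<>()).add(rb);
        }
        List<Integer[]> out = new ArrayList<>();
        for (Integer[] ra : a) {
            if (ra[0] == null) {
                continue; // a NULL probe key matches nothing
            }
            List<Integer[]> matches =
                byValue.getOrDefault(ra[0], Collections.<Integer[]>emptyList());
            for (Integer[] rb : matches) {
                out.add(new Integer[] {ra[0], ra[1], rb[0], rb[1]});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The two rows of input4_cb from the description.
        List<Integer[]> t = Arrays.asList(
            new Integer[] {null, 325},
            new Integer[] {18, null});
        System.out.println("rows returned: " + join(t, t).size());
    }
}
```

Without the two `continue` checks, `HashMap` would treat the two nulls as equal keys and pair them, which is the same kind of grouping that produces the incorrect row reported above.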

      Attachments

      1. join_nulls.q.out (27 kB, Ning Zhang)
      2. patch-741.txt (7 kB, Amareshwari Sriramadasu)
      3. patch-741-1.txt (27 kB, Amareshwari Sriramadasu)
      4. patch-741-2.txt (32 kB, Amareshwari Sriramadasu)
      5. patch-741-3.txt (39 kB, Amareshwari Sriramadasu)
      6. patch-741-4.txt (40 kB, Amareshwari Sriramadasu)
      7. patch-741-5.txt (40 kB, Amareshwari Sriramadasu)
      8. patch-741-6.txt (42 kB, Amareshwari Sriramadasu)
      9. smbjoin_nulls.q.txt (0.6 kB, Amareshwari Sriramadasu)

          Activity

          Ning Zhang added a comment -

          Committed. Thanks Amareshwari!

          Ning Zhang added a comment -

          +1. Will commit if tests pass.

          Amareshwari Sriramadasu added a comment -

          Patch with following changes:

          • Renamed the method hasNullElements to hasAllNulls
          • Added keyToHasNullsMap to SMBMapJoinOperator to cache whether key has nulls or not, which is populated when the key elements are computed.
          • Added appropriate "order by" clauses to smb join queries in the testcase
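
The caching idea in the second bullet can be sketched as follows. This is a hypothetical illustration, not the code from the patch: the class, field, and method names are invented, the cache is keyed by an input tag (an assumption), and a key is flagged when any of its elements is null (the thread does not settle whether the patch checks for any null or all nulls).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; names are illustrative and not taken from the patch.
class NullCheckCache {
    // For each input tag, whether the current join key contains a NULL.
    private final Map<Byte, Boolean> keyHasNulls = new HashMap<>();
    int scans = 0; // counts how many times a key is actually scanned

    // Called once, at the point where the key for a new row is computed.
    void setKey(byte tag, List<Object> key) {
        keyHasNulls.put(tag, scanForNulls(key));
    }

    // Called from the per-row comparison path: a map lookup, no rescan.
    boolean hasNulls(byte tag) {
        Boolean cached = keyHasNulls.get(tag);
        return cached != null && cached;
    }

    private boolean scanForNulls(List<Object> key) {
        scans++;
        for (Object element : key) {
            if (element == null) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        NullCheckCache cache = new NullCheckCache();
        cache.setKey((byte) 0, Arrays.<Object>asList(18, null));
        cache.setKey((byte) 1, Arrays.<Object>asList(18, 325));
        // Repeated comparisons reuse the cached answers.
        for (int i = 0; i < 4; i++) {
            cache.hasNulls((byte) 0);
            cache.hasNulls((byte) 1);
        }
        System.out.println("key scans performed: " + cache.scans);
    }
}
```

The point of the design is that the scan cost is paid when a key changes, not on every comparison.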
          Ning Zhang added a comment -

          Uploading the test results (join_null.q.out), which conflict with patch-741-5.txt.

          Ning Zhang added a comment -

          Amareshwari, aside from Yongqiang's comment, join_null.q's result is not deterministic: the SMB joins produce rows in different orders. Can you make it deterministic by adding an 'order by' clause at the end of the queries? I'll attach my test run results.

          He Yongqiang added a comment -

          Also, about Ning's comment:
          >>2) SMBMapJoinOperator.compareKey() is called for each row so it is critical for performance. In your code the hasNullElement() could be called 4 times in the worst case. If you cache the result it can be called only twice.
          Agreed. I am not sure how much overhead there is; I will try to estimate it on production runs. It would be great if you could cache the null check results so that the check happens only once per key.

          He Yongqiang added a comment -

          +1. The patch looks good to me.
          (I have only one minor comment, on the name "hasNullElements": should we rename it, since this function is used to determine whether all keys are null?)

          Amareshwari Sriramadasu added a comment -

          Updated the patch. Thanks Ning for your help.

          Ning Zhang added a comment -

          Looks good except for one minor thing: should SerDeUtils.java:369 return true? Amareshwari, can you upload a new patch? I'll then run the unit tests.

          Yongqiang, can you test this patch on the production SMB join queries?

          Amareshwari Sriramadasu added a comment -

          Updated smb input with two files.

          Ning Zhang added a comment -

          The SMB test case still has a minor issue: the tables were created with 2 buckets, but there is only 1 file in the tables. This conflicts with the table schema. If a table is defined as bucketed into 2, there should be 2 files in the partition or table. The SMB join joins the 1st file in T1 with the 1st file in T2, and the 2nd file in T1 with the 2nd file in T2. So the test case should cover this use case.

          Amareshwari Sriramadasu added a comment -

          Thanks Ning for the comments.

          Patch incorporates the review comments. Looked at smb_mapjoin* query files and updated smb join queries.

          Ning Zhang added a comment -

          Looks good in general. Some minor comments:
          1) Add more test cases for SMB joins. Currently the only test case has only 1 bucket, which does not cover the most common use case. Can you add more test cases with more buckets? You can take a look at the bucketed join queries included in the client positive tests.
          2) SMBMapJoinOperator.compareKey() is called for each row, so it is critical for performance. In your code hasNullElement() could be called 4 times in the worst case. If you cache the result, it can be called only twice.

          Yongqiang, any further comments?

          Amareshwari Sriramadasu added a comment -

          All the tests passed with the latest patch: patch-741-2.txt

          Amareshwari Sriramadasu added a comment -

          Patch fixes SMBMapJoinOperator also. I modified compareKeys(ArrayList<Object> k1, ArrayList<Object> k2) to do the following:

              if (hasNullElements(k1) && hasNullElements(k2)) {
                return -1; // just return k1 is smaller than k2
              } else if (hasNullElements(k1)) {
                return (0 - k2.size());
              } else if (hasNullElements(k2)) {
                return k1.size();
              }
              ... // the existing code.
          

          Does the above make sense?
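
The idea behind the snippet above is that a key comparison used for merging must never report equality when either key contains a NULL. A minimal, self-contained sketch of such a comparison follows. It is not the patched Hive method: the class name is hypothetical, keys are simplified to lists of `Integer`, the return values are plain -1/1 instead of the size-based values in the comment, and ordering NULL-bearing keys first is an assumption.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not the patched Hive method.
class NullAwareKeyCompare {

    static boolean hasNull(List<Integer> key) {
        for (Integer part : key) {
            if (part == null) {
                return true;
            }
        }
        return false;
    }

    // Returns 0 only when both keys are NULL-free and equal element by element.
    static int compareKeys(List<Integer> k1, List<Integer> k2) {
        boolean n1 = hasNull(k1);
        boolean n2 = hasNull(k2);
        if (n1) {
            return -1; // covers "both have NULLs" too: any non-zero value avoids a match
        }
        if (n2) {
            return 1;
        }
        int common = Math.min(k1.size(), k2.size());
        for (int i = 0; i < common; i++) {
            int c = k1.get(i).compareTo(k2.get(i));
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(k1.size(), k2.size());
    }

    public static void main(String[] args) {
        List<Integer> nullKey = Collections.<Integer>singletonList(null);
        List<Integer> five = Arrays.asList(5);
        System.out.println("NULL vs NULL equal? " + (compareKeys(nullKey, nullKey) == 0));
        System.out.println("5 vs 5 equal? " + (compareKeys(five, five) == 0));
    }
}
```

Because the NULL checks run before any element is dereferenced, this shape also avoids calling `compareTo` on a null element.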

          Updated the testcase with smb join queries.

          When I'm running smb join on my local machine (pseudo distributed mode), I'm getting different results. I think that is mostly because of HIVE-1561. Will update the issue with my findings.

          Amareshwari Sriramadasu added a comment -

          Submitting patch for the review of patch-741-1.txt

          Amareshwari Sriramadasu added a comment -

          Attaching a patch that fixes the bugs Ning found in the earlier patch. It also adds more test cases.

          >>Can you also add one or few tests for sort merge join?
          The attached file smbjoin_nulls.q.txt has tests for sort merge join, but it fails with the NPE mentioned earlier. I tried to fix the NPE but could not come up with a fix. Shall I do it in a follow-up JIRA?

          >>For inner, left and right outer joins, a simpler fix would be to add a filter on top.
          I think this can be done as part of HIVE-1544 as an improvement.

          >>@Amareshwari, sorry the syntax was wrong for the 3 table joins.
          Ning, Hive was not complaining about the syntax, so I included this in the test case as well. The results are fine with the latest patch.

          Ning Zhang added a comment -

          @Amareshwari, sorry the syntax was wrong for the 3 table joins. Below is the correct syntax and plan.

          explain select * from src a left outer join src b on (a.value=b.value) right outer join src c on (b.value=c.value);
          OK
          ABSTRACT SYNTAX TREE:
            (TOK_QUERY (TOK_FROM (TOK_RIGHTOUTERJOIN (TOK_LEFTOUTERJOIN (TOK_TABREF src a) (TOK_TABREF src b) (= (. (TOK_TABLE_OR_COL a) value) (. (TOK_TABLE_OR_COL b) value))) (TOK_TABREF src c) (= (. (TOK_TABLE_OR_COL b) value) (. (TOK_TABLE_OR_COL c) value)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))))
          
          STAGE DEPENDENCIES:
            Stage-1 is a root stage
            Stage-0 is a root stage
          
          STAGE PLANS:
            Stage: Stage-1
              Map Reduce
                Alias -> Map Operator Tree:
                  a 
                    TableScan
                      alias: a
                      Reduce Output Operator
                        key expressions:
                              expr: value
                              type: string
                        sort order: +
                        Map-reduce partition columns:
                              expr: value
                              type: string
                        tag: 0
                        value expressions:
                              expr: key
                              type: string
                              expr: value
                              type: string
                  b 
                    TableScan
                      alias: b
                      Reduce Output Operator
                        key expressions:
                              expr: value
                              type: string
                        sort order: +
                        Map-reduce partition columns:
                              expr: value
                              type: string
                        tag: 1
                        value expressions:
                              expr: key
                              type: string
                              expr: value
                              type: string
                  c 
                    TableScan
                      alias: c
                      Reduce Output Operator
                        key expressions:
                              expr: value
                              type: string
                        sort order: +
                        Map-reduce partition columns:
                              expr: value
                              type: string
                        tag: 2
                        value expressions:
                              expr: key
                              type: string
                              expr: value
                              type: string
                Reduce Operator Tree:
                  Join Operator
                    condition map:
                         Left Outer Join0 to 1
                         Right Outer Join1 to 2
                    condition expressions:
                      0 {VALUE._col0} {VALUE._col1}
                      1 {VALUE._col0} {VALUE._col1}
                      2 {VALUE._col0} {VALUE._col1}
                    handleSkewJoin: false
                    outputColumnNames: _col0, _col1, _col4, _col5, _col8, _col9
                    Select Operator
                      expressions:
                            expr: _col0
                            type: string
                            expr: _col1
                            type: string
                            expr: _col4
                            type: string
                            expr: _col5
                            type: string
                            expr: _col8
                            type: string
                            expr: _col9
                            type: string
                      outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
                      File Output Operator
                        compressed: true
                        GlobalTableId: 0
                        table:
                            input format: org.apache.hadoop.mapred.TextInputFormat
                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
          
            Stage: Stage-0
              Fetch Operator
                limit: -1
          
          Amareshwari Sriramadasu added a comment -

          Thanks Ning for your comments.

          >>select * FROM myinput1 a left outer JOIN myinput1 b ON a.value = b.value;

          >>select * FROM myinput1 a right outer JOIN myinput1 b ON a.value = b.value;

          This is happening because I was assuming that nr.get(0) in JoinOperator is the join key. It seems it is not always true that the key is the first element in the ArrayList. When I modified the code to the following, the above queries gave correct results.

                // Look up the reduce-side KEY field of the incoming row.
                StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
                StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY.toString());
                Object keyObject = soi.getStructFieldData(row, sf);
                // Put each null-keyed row in its own group so that nulls are not joined together.
                if (SerDeUtils.isNullObject(keyObject, soi)) {
                  endGroup();
                  startGroup();
                }
          

          Added method SerDeUtils.isNullObject(keyObject, soi) to know if the object passed is representing a NULL object.
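
The effect of calling endGroup()/startGroup() on every null-keyed row can be pictured with a small stand-alone sketch: rows arrive sorted by key, equal keys share a group, but each NULL-keyed row forms a group of its own, so it never meets another NULL-keyed row. This only illustrates the grouping idea; it says nothing about whether the approach holds for all join shapes. The class and method names are hypothetical and rows are modeled as `Integer[]`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the grouping idea only; not Hive code.
class NullKeyGrouping {

    // Splits key-sorted rows into groups of equal keys, but puts every
    // NULL-keyed row in a group of its own.
    static List<List<Integer[]>> group(List<Integer[]> sortedRows, int keyIndex) {
        List<List<Integer[]>> groups = new ArrayList<>();
        List<Integer[]> current = null;
        Integer previousKey = null;
        for (Integer[] row : sortedRows) {
            Integer key = row[keyIndex];
            boolean sameGroup = current != null && key != null && key.equals(previousKey);
            if (!sameGroup) {
                current = new ArrayList<>(); // plays the role of endGroup(); startGroup();
                groups.add(current);
            }
            current.add(row);
            previousKey = key;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Integer[]> rows = Arrays.asList(
            new Integer[] {null, 1},
            new Integer[] {null, 2},
            new Integer[] {10, 3},
            new Integer[] {10, 4});
        System.out.println("groups: " + group(rows, 0).size());
    }
}
```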

          >>select * FROM myinput1 a left outer JOIN myinput1 b right outer join myinput1 c ON a.value = b.value and b.value = c.value;

          Looking at Stage-1 of "explain" for the above query:

          Stage: Stage-1
              Map Reduce
                Alias -> Map Operator Tree:
                  a
                    TableScan
                      alias: a
                      Reduce Output Operator
                        sort order:
                        tag: 0
                        value expressions:
                              expr: key
                              type: int
                              expr: value
                              type: int
                  b
                    TableScan
                      alias: b
                      Reduce Output Operator
                        sort order:
                        tag: 1
                        value expressions:
                              expr: key
                              type: int
                              expr: value
                              type: int
                Reduce Operator Tree:
                  Join Operator
                    condition map:
                         Left Outer Join0 to 1
                    condition expressions:
                      0 {VALUE._col0} {VALUE._col1}
                      1 {VALUE._col0} {VALUE._col1}
                    handleSkewJoin: false
                    outputColumnNames: _col0, _col1, _col4, _col5
                    Filter Operator
                      predicate:
                          expr: (_col1 = _col5)
                          type: boolean
                      File Output Operator
                        compressed: false
                        GlobalTableId: 0
                        table:
                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
          

          Does the join happen without a join key? Here, the join output is the Cartesian product of a and b, on which the FilterOperator is applied. Am I right? As far as I can see, the semantics of an inner/outer join on two tables without a join condition is to produce the Cartesian product. As a side note: "MySQL does not allow outer joins without join condition".
          If a join without a join condition is allowed to produce the Cartesian product of the two tables, then my patch should be changed to check whether a join key is defined for the join. I could reproduce it with the simple query "select * FROM myinput1 a JOIN myinput1 b". I think the same applies to MapJoin as well.

          >>Verified that SMBMapJoinOperator already filters nulls properly.

          >>Can you also add one or few tests for sort merge join?

          It seems my verification was wrong here. I thought that if the table is sorted and hive.optimize.bucketmapjoin and hive.optimize.bucketmapjoin.sortedmerge are set to true, MapJoin uses SMBMapJoinOperator, but it was using MapJoinOperator itself. When I created a table with a "sorted by" column, I saw it using SMBMapJoinOperator. Currently, if there are any nulls in the input table, SMBJoin fails with a NullPointerException:

          Caused by: java.lang.NullPointerException
          	at org.apache.hadoop.io.IntWritable.compareTo(IntWritable.java:60)
          	at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:115)
          	at org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator.compareKeys(SMBMapJoinOperator.java:389)
          	at org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator.processKey(SMBMapJoinOperator.java:438)
          	at org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator.processOp(SMBMapJoinOperator.java:205)
          	at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:458)
          	at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:698)
          	at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:45)
          	at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:458)
          	at org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator.fetchOneRow(SMBMapJoinOperator.java:479)
          	... 17 more
          

          Will look into this.

          >>For inner, left and right outer joins, a simpler fix would be to add a filter on top.

          Now I agree it would be simpler. I will consider this as well and see if I can do some special handling for full outer joins.

          Namit Jain added a comment -

          For inner, left and right outer joins, a simpler fix would be to add a filter on top.

          For example, for

          A join B on A.c1 = B.c2

          add a filter before A (A.c1 is not null) and B (B.c2 is not null)

          For

          A left outer join B on A.c1 = B.c2, the filter before A is not needed and
          similarly, for
          A right outer join B on A.c1 = B.c2, the filter before B is not needed

          Some special handling might still be needed for full outer joins and sort-merge joins
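
The filter placement described above can be sketched as follows: an `IS NOT NULL` filter on the join column is applied only to a side whose rows are not preserved by the join type. This is a hypothetical illustration, not Hive planner code; the enum, class, and method names are invented, rows are `{key, value}` arrays, and the demo joins on `value` with a naive nested loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the proposed filter placement; not Hive code.
class NullFilterPlacement {
    enum JoinType { INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER }

    // Whether "join column IS NOT NULL" may be applied before the left input.
    static boolean canFilterLeft(JoinType t) {
        return t == JoinType.INNER || t == JoinType.RIGHT_OUTER;
    }

    // Whether "join column IS NOT NULL" may be applied before the right input.
    static boolean canFilterRight(JoinType t) {
        return t == JoinType.INNER || t == JoinType.LEFT_OUTER;
    }

    // a LEFT OUTER JOIN b ON a.value = b.value, with the filter only before b.
    static List<Integer[]> leftOuterJoin(List<Integer[]> a, List<Integer[]> b) {
        List<Integer[]> filteredB = new ArrayList<>();
        for (Integer[] rb : b) {
            if (rb[1] != null) {
                filteredB.add(rb); // the filter before B
            }
        }
        List<Integer[]> out = new ArrayList<>();
        for (Integer[] ra : a) {
            boolean matched = false;
            for (Integer[] rb : filteredB) {
                // rb[1] is never null here, so a NULL in ra[1] simply fails to match.
                if (rb[1].equals(ra[1])) {
                    out.add(new Integer[] {ra[0], ra[1], rb[0], rb[1]});
                    matched = true;
                }
            }
            if (!matched) {
                out.add(new Integer[] {ra[0], ra[1], null, null}); // preserved left row
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer[]> t = Arrays.asList(
            new Integer[] {null, 356},
            new Integer[] {484, null},
            new Integer[] {10, 10});
        for (Integer[] row : leftOuterJoin(t, t)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

For a full outer join both sides are preserved, so neither filter applies in this scheme, which is why that case needs separate handling.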

          Ning Zhang added a comment -

          @Amareshwari, aside from adding new test cases for sort merge join, this patch also has some bugs.

          For example in your test data:

          hive> select * from myinput1
          NULL    356
          484     NULL
          10      10

          -- incorrect result below
          hive> select * FROM myinput1 a left outer JOIN myinput1 b ON a.value = b.value;
          484     NULL    484     NULL
          10      10      10      10
          NULL    356     NULL    NULL

          hive> select * FROM myinput1 a right outer JOIN myinput1 b ON a.value = b.value;
          484     NULL    484     NULL
          10      10      10      10
          NULL    NULL    NULL    356

          hive> select * FROM myinput1 a left outer JOIN myinput1 b right outer join myinput1 c ON a.value = b.value and b.value = c.value;
          NULL    NULL    NULL    NULL    484     NULL
          NULL    NULL    NULL    NULL    10      10
          NULL    NULL    NULL    NULL    NULL    356
          

          Can you take a look? I'm not sure whether ending a group and starting a new group for each null-keyed row works in all cases, particularly in joins involving more than 2 tables and a mixture of left and right outer joins.

          He Yongqiang added a comment -

          the change looks good to me. Can you also add one or few tests for sort merge join?

          Amareshwari Sriramadasu added a comment -

          Attaching a simple patch which fixes JoinOperator (for reduce-side joins) and MapJoinOperator (for map-side joins) to not group the null join keys together.
          Any comments on the approach/code are welcome.
          Verified that SMBMapJoinOperator already filters nulls properly.

          Ted Xu added a comment -

          a left outer join preserves rows on the left side regardless of whether the ON clause evaluates true

          That's right, thanks.

          Ning Zhang added a comment -

          @Amareshwari, we already distinguish different join types with different functions (take a look at CommonJoinOperator.joinObjects()). I look forward to seeing your proposal to avoid grouping null-keyed rows.

          @Ted, I agree with Amareshwari and John that we cannot drop rows (or the value part of the key-value pairs) that have null as a key. However, you have a point: if we know the join operator does not involve any outer join (we already have a flag noOuterJoin in JoinDesc), then we could avoid sending rows with null keys from the mappers to the reducers. This would save bandwidth as well as processing time. Could you open another JIRA and submit a patch?
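To make the suggested optimization concrete, here is a minimal sketch of a mapper-side check. It is not code from Hive or from any patch on this issue: the class `NullKeyFilterSketch` and the method `shouldEmit` are hypothetical names, and only the `noOuterJoin` flag is taken from the comment above. It assumes that a row whose join key contains a NULL can never satisfy an equality join condition, so it is safe to drop only when no outer join is involved.

```java
// Hypothetical sketch; not Hive's ReduceSinkOperator.
class NullKeyFilterSketch {

    /**
     * Decides whether a mapper should send a row to the reducers.
     *
     * @param joinKey     the evaluated join key columns of the row
     * @param noOuterJoin true when the join contains no outer join at all
     */
    static boolean shouldEmit(Object[] joinKey, boolean noOuterJoin) {
        if (!noOuterJoin) {
            // Outer joins must preserve null-keyed rows, so nothing is filtered.
            return true;
        }
        for (Object k : joinKey) {
            if (k == null) {
                // NULL = anything is never true, so an inner join cannot match this row.
                return false;
            }
        }
        return true;
    }
}
```

With this check, a null-keyed row is dropped only for pure inner joins and is always kept when any outer join is present.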

          John Sichi added a comment -

          @Ted: as Amareshwari mentioned, a left outer join preserves rows on the left side regardless of whether the ON clause evaluates true. So in that case (and similar cases for right/full outer join), we can't filter out the rows with null join keys.

          Ted Xu added a comment -

          Sorry, I expressed my idea poorly. I mean values with NULL join keys should be filtered out in mappers.

          I think values with NULL join keys should be filtered out because NULL equals nothing, and Hive only supports equi-joins.

          Please correct me if I'm wrong.

          Amareshwari Sriramadasu added a comment -

          but it will be great if we can filter the NULL values in mappers.

          Ted, we should not filter out the NULL values in mappers, because for outer joins these rows should be cartesian producted with nulls, as shown in the expected output in the comment.

          Amareshwari Sriramadasu added a comment -

          Thanks Ning for the details.

          To summarize the implementation of join:

          • In a reduce-side join, rows with the same join keys are grouped together; in a map-side join, rows with the same join keys are added to the same entry in the hash table.
          • CommonJoinOperator.checkAndGenObject: The rows with the same join key are cartesian producted with each other (i.e. with rows of different aliases). If there are no rows in one table alias, the rows of the other table alias are ignored (for inner join) or cartesian producted with nulls (for outer joins).

          The above implementation works fine except for null join keys; since these rows are grouped together/hashed to the same entry, the current issue exists.

          I think the fix would be to check the NULL value of the join keys and do proper output based on the semantics of different types of joins.

          This would need special handling for each type of join (inner, left outer, right outer, full outer, etc.). So I'm thinking the better solution is to not group rows with null join keys together. Then the above join algorithm works correctly for all types of joins.

          Currently they are grouped together because HiveKey.compare compares the bytes of the key (in the case of reduce-side join) and MapJoinObjectKey.equals returns true if both keys are null (in the case of map-side join). I'm trying to see if I can come up with a solution which does not group rows with null join keys together. Please correct me if I am wrong.
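The per-group step summarized above can be illustrated with a small sketch. This is a simplified stand-in and not the real CommonJoinOperator.checkAndGenObject: the class `GroupJoinSketch` and the method `joinGroup` are hypothetical, rows are plain strings, and only two aliases with inner and left outer semantics are modeled. It shows why putting the two null-keyed rows into one group yields the wrong row, while giving each its own group lets the unchanged algorithm produce the expected output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not Hive's CommonJoinOperator.
class GroupJoinSketch {

    /**
     * Joins the rows of one group (rows that were judged to share a join key).
     * Left rows are cartesian producted with right rows; if the right side is
     * empty, left rows are dropped (inner) or padded with NULLs (left outer).
     */
    static List<String> joinGroup(List<String> left, List<String> right, boolean leftOuter) {
        List<String> out = new ArrayList<>();
        for (String l : left) {
            if (right.isEmpty()) {
                if (leftOuter) {
                    out.add(l + " NULL NULL");
                }
            } else {
                for (String r : right) {
                    out.add(l + " " + r);
                }
            }
        }
        return out;
    }
}
```

Calling `joinGroup` with `NULL 35` on the left and `12 NULL` on the right (both rows in one group) returns `NULL 35 12 NULL`, which is the incorrect row reported in this issue. Calling it with `NULL 35` alone in its group returns nothing for an inner join and `NULL 35 NULL NULL` for a left outer join.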

          Ted Xu added a comment -

          Thanks to Ning and Amareshwari; we are looking forward to seeing the bug fixed. I think it's okay to solve it by modifying the *JoinOperators, but it will be great if we can filter the NULL values in mappers, say, in ReduceSinkOperator, provided we can know which part of the reduce sink key is from the join (rather than from group by, distinct, etc.).

          Ning Zhang added a comment -

          The joins are implemented in the JoinOperator and CommonJoinOperator for regular reduce-side joins. The map-side joins are implemented in the MapJoinOperator.

          In the reduce-side joins, the join keys are treated as distribution keys from the mappers to the reducers, so that each group (marked by beginGroup() and endGroup()) consists of rows with the same join keys. The reduce-side joins cache all rows within a group except those of the last table (aka the streaming table), which is scanned and cartesian producted with the cached rows of the other tables. I think the fix would be to check the NULL value of the join keys and do proper output based on the semantics of different types of joins.

          For the map-side join, it's basically a hash join where the small table is read in its entirety into a hash table and probed while scanning the streaming table.
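As a rough illustration of the hash join described above with NULL-aware probing, here is a minimal sketch. It is not Hive's MapJoinOperator: the class `NullAwareHashJoin`, its methods, and the `Integer[]{key, value}` row layout are assumptions made for the example. It evaluates `a.key = b.value` over the `input1` rows from the reproduction comment on this issue, skipping NULL keys on the build side and never probing with a NULL key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not Hive's MapJoinOperator.
class NullAwareHashJoin {

    /** Joins on left.key = right.value; each row is Integer[]{key, value}. */
    static List<String> join(List<Integer[]> left, List<Integer[]> right, boolean leftOuter) {
        // Build side: hash the right rows on "value". NULL keys are skipped
        // because NULL = anything is never true.
        Map<Integer, List<Integer[]>> table = new HashMap<>();
        for (Integer[] r : right) {
            if (r[1] != null) {
                table.computeIfAbsent(r[1], k -> new ArrayList<>()).add(r);
            }
        }
        List<String> out = new ArrayList<>();
        for (Integer[] l : left) {
            // Probe side: a NULL key never matches, so it is not looked up.
            List<Integer[]> matches = (l[0] == null) ? null : table.get(l[0]);
            if (matches != null) {
                for (Integer[] r : matches) {
                    out.add(format(l) + " " + format(r));
                }
            } else if (leftOuter) {
                // Left outer join preserves the unmatched left row.
                out.add(format(l) + " NULL NULL");
            }
        }
        return out;
    }

    static String cell(Integer v) {
        return v == null ? "NULL" : v.toString();
    }

    static String format(Integer[] row) {
        return cell(row[0]) + " " + cell(row[1]);
    }
}
```

For the five `input1` rows joined with themselves, the inner join yields only the two `100 100 ...` rows, and the left outer join additionally yields each unmatched left row padded with NULLs, which agrees with the expected output obtained from MySQL (row order aside).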

          There are other types of joins (bucketed map-side join, sort merge join etc.), but they all rely on the 3 classes mentioned above.

          Let me know if you have further questions as you get started.

          Ning Zhang added a comment -

          I'm not actively working on it. Please feel free to take it.

          Amareshwari Sriramadasu added a comment -

          By adding logs to ExecReducer, I see that the input to reduce is the following:

          {"key":{"joinkey0":null},"value":{"_col0":null,"_col1":35},"alias":0}
          {"key":{"joinkey0":null},"value":{"_col0":12,"_col1":null},"alias":1}
          {"key":{"joinkey0":10},"value":{"_col0":10,"_col1":1000},"alias":0}
          {"key":{"joinkey0":10},"value":{"_col0":10,"_col1":100},"alias":0}
          {"key":{"joinkey0":12},"value":{"_col0":12,"_col1":null},"alias":0}
          {"key":{"joinkey0":35},"value":{"_col0":null,"_col1":35},"alias":1}
          {"key":{"joinkey0":100},"value":{"_col0":100,"_col1":100},"alias":0}
          {"key":{"joinkey0":100},"value":{"_col0":10,"_col1":100},"alias":1}
          {"key":{"joinkey0":100},"value":{"_col0":100,"_col1":100},"alias":1}
          {"key":{"joinkey0":1000},"value":{"_col0":10,"_col1":1000},"alias":1}
          

          Rows whose join key is null are processed under the same group. I think they should be processed in different groups, because comparison between nulls is not defined.
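The grouping difference can be sketched as follows. This is only an illustration and not the ExecReducer or HiveKey code: the class `NullAwareGrouping` and the method `groupSizes` are hypothetical, and the keys are the `joinkey0` values from the log above, already sorted. With plain equality the two null-keyed rows share one group; with the proposed behavior each null-keyed row starts a group of its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch; not Hive's ExecReducer.
class NullAwareGrouping {

    /**
     * Returns the size of each consecutive group in a sorted key stream.
     * When nullsFormOwnGroup is true, a null key never joins the previous group.
     */
    static List<Integer> groupSizes(List<Integer> sortedKeys, boolean nullsFormOwnGroup) {
        List<Integer> sizes = new ArrayList<>();
        Integer prev = null;
        boolean first = true;
        for (Integer key : sortedKeys) {
            boolean sameAsPrev = !first && Objects.equals(prev, key);
            if (nullsFormOwnGroup && key == null) {
                // Comparison between nulls is not defined, so start a new group.
                sameAsPrev = false;
            }
            if (sameAsPrev) {
                int last = sizes.size() - 1;
                sizes.set(last, sizes.get(last) + 1);
            } else {
                sizes.add(1);
            }
            prev = key;
            first = false;
        }
        return sizes;
    }
}
```

For the ten keys in the log (null, null, 10, 10, 12, 35, 100, 100, 100, 1000), plain equality gives six groups with the first of size 2, while the null-aware variant gives seven groups with the two null-keyed rows in separate groups of size 1.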

          Amareshwari Sriramadasu added a comment -

          When 'null' is replaced by '' it works.

          I see the same result even if 'null' is replaced with ''.

          To reproduce the above, I created a table input1 with

          hive> create table input1 (key int, value int);
          

          Loaded the following input using the LOAD DATA command:

          in.txt
          ^A35
          12^A
          10^A1000
          10^A100
          100^A100
          

          I see the following output for join queries executed.

          hive> select * from input1 a join input1 b on a.key=b.value;
          
          Output:
          NULL    35      12      NULL
          100     100     10      100
          100     100     100     100
          
          Expected Output:
          100     100     10      100
          100     100     100     100
          
          hive> select * from input1 a left outer join input1 b on a.key=b.value;
          
          Output:
          NULL    35      12      NULL
          10      1000    NULL    NULL
          10      100     NULL    NULL
          12      NULL    NULL    NULL
          100     100     10      100
          100     100     100     100
          
          Expected Output:
          NULL    35      NULL    NULL
          10      1000    NULL    NULL
          10      100     NULL    NULL
          12      NULL    NULL    NULL
          100     100     10      100
          100     100     100     100
          
          hive> select * from input1 a right outer join input1 b on a.key=b.value;
          
          Output:
          NULL    35      12      NULL
          NULL    NULL    NULL    35
          100     100     10      100
          100     100     100     100
          NULL    NULL    10      1000
          
          Expected Output:
          NULL    NULL    NULL    35
          NULL    NULL    12      NULL
          100     100     10      100
          100     100     100     100
          NULL    NULL    10      1000
          
          

          The expected output was obtained from a MySQL database for a similar query.

          Ning, if you are not working on the fix for this, I would like to contribute. I would also need your help understanding the join code, as I'm new to Hive.


            People

            • Assignee:
              Amareshwari Sriramadasu
              Reporter:
              Ning Zhang
            • Votes:
              0
              Watchers:
              3
