HIVE-790

race condition related to ScriptOperator + UnionOperator

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.4.0
    • Component/s: Query Processor
    • Labels:
      None
    • Hadoop Flags:
      Reviewed

      Description

      ScriptOperator uses a second thread to output rows to its child operators. In a corner case involving a union, two threads might output data into the same operator hierarchy, causing race conditions.

      CREATE TABLE tablea (cola STRING);
      SELECT *
      FROM (
          SELECT TRANSFORM(cola)
          USING 'cat'
          AS cola
          FROM tablea
        UNION ALL
          SELECT cola as cola
          FROM tablea
      ) a;
      
      1. Hive_790_0.4.0.patch
        18 kB
        Ning Zhang
      2. Hive-790_4.patch
        19 kB
        Ning Zhang
      3. Hive-790_3.patch
        12 kB
        Ning Zhang
      4. Hive-790_2.patch
        12 kB
        Ning Zhang
      5. Hive-790.patch
        11 kB
        Ning Zhang

        Activity

        He Yongqiang added a comment -

        This query cannot be compiled with the trunk code.

        Zheng Shao added a comment -

        updated the query.

        Ning Zhang added a comment -

        The above query still has a semantic analysis error. Here's a working example demonstrating the bug:

        select * from (
        select transform(a) using 'cat' as cola from nzhang_tt
        union all
        select transform(a) using 'cat' as cola from nzhang_tt) s;

        The result is 3 rows:
        1
        2
        3

        But the query:

        select cola from (
        select a as cola from nzhang_tt
        union all
        select a as cola from nzhang_tt) s;

        returns 6 rows:

        1
        1
        2
        2
        3
        3

        Ashish Thusoo added a comment -

        Which one is the correct result?

        Ning Zhang added a comment -

        The 6-row result is correct, since this is a "UNION ALL" operator.

        Ning Zhang added a comment -

        Another related bug:

        -- bug1
        select * from (
        select transform(b) using 'cat' as cola from nzhang_tt
        union all
        select transform(c) using 'cat' as cola from nzhang_tt) s;

        results:
        NULL
        NULL
        NULL

        -- bug2
        select * from (
        select b as cola from nzhang_tt
        union all
        select c as cola from nzhang_tt) s;
        results:
        NULL
        b
        NULL
        e
        NULL
        i

        Data in the table:

        select b, c from nzhang_tt;

        results:
        b c
        e f
        i j

        This bug may not be due only to a race condition between threads. Investigating the more generic bug in the UNION operator.

        Ning Zhang added a comment -

        Found the reason for bug2: nzhang_tt is stored as RCFile, and it turns out to be a bug in RCFile rather than in UNION ALL. Will file a bug for RCFile. Bug1 and the original bug are still caused by multi-threading.

        He Yongqiang added a comment -

        Hi Ning, I uploaded a patch for HIVE-796. That is a bug in HiveInputFormat, which incorrectly erases column ids that were previously set. Can you try again? Thanks.

        Ning Zhang added a comment -

        This patch fixes the bug where a "UNION ALL" of two SCRIPT operators shows results from only one operator. The reason is that the UNION operator has two parents, and while one of the parents is still OPEN, it shouldn't close its child operator.

        Will look into the race condition issue and try to reproduce it.

        Zheng Shao added a comment -

        @Hive-790.patch:
        1547: If this operator can be closed only if all parents are in the state FINISHED.
        An extra "If" at the beginning?

        Do we need a new state? I think "CLOSE" is enough: it means close() has been called on an operator. A child just needs to check whether all of its parents' states are CLOSE.
        We just need one more function, "areAllParentsClosed".

        The result of the test case is non-deterministic. Can you add a "SORT BY" at the end?

        Ning Zhang added a comment -

        @zheng, I'll fix the comment and the test query.

        As for the new state, maybe "FINISH" is not a good name for it, but I think we need two states, since there are two different situations when an operator has two or more parents:
        1) close() has been called on this operator, but that doesn't guarantee close() has also been called on all its child operators (the FINISH state);
        2) close() has been called on this operator and on all its children (the CLOSE state).

        The current code sets the state to CLOSE at the end of the function, which means all its children (and eventually all descendants) are closed. So it implements the second semantics. What you proposed is the first semantics; to implement it, we would need to move the statement that sets the state to CLOSE to the beginning of the close() function (just after the check that returns if the state is already CLOSE).

        We need both states. If we have just one state (CLOSE) and assign it at the beginning, then with two parents the following happens: when the first parent calls close(), this operator sets its state to CLOSE and returns without calling close() on its children (since the other parent has not been closed). When the second parent calls close(), it returns immediately since its state is already CLOSE. So the children end up never being closed. We should not remove the CLOSE state check at the beginning, since that may cause an operator to be closed multiple times.

        We also cannot use just the CLOSE state as in the current implementation, since there the CLOSE state is set at the end of the close() function. When a parent calls this operator's close(), the parent's own state is not yet CLOSE. So we end up just returning without closing the child operators. If we have the FINISH state and set it at the beginning of close(), then whenever a parent calls close() on this operator, the parent is in the FINISH state, and this operator can check for that and treat FINISH the same as CLOSE, except that the close() call has not returned yet.
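
The failure mode with a single CLOSE state assigned at the beginning can be reproduced in a few lines. This is only a sketch: NaiveOp and NaiveCloseDemo are hypothetical stand-ins, not classes from the Hive code base or from the patch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Hive operator; not the real Operator class. */
class NaiveOp {
    enum State { OPEN, CLOSE }

    final String name;
    final List<NaiveOp> parents = new ArrayList<>();
    final List<NaiveOp> children = new ArrayList<>();
    State state = State.OPEN;
    int cleanupCalls = 0;

    NaiveOp(String name) { this.name = name; }

    void addChild(NaiveOp child) {
        children.add(child);
        child.parents.add(this);
    }

    void close() {
        if (state == State.CLOSE) {
            return;                      // guard against closing twice
        }
        state = State.CLOSE;             // the single state, assigned up front
        for (NaiveOp p : parents) {
            if (p.state != State.CLOSE) {
                return;                  // a parent is still open: bail out
            }
        }
        cleanupCalls++;                  // this operator's own cleanup
        for (NaiveOp c : children) {
            c.close();
        }
    }
}

class NaiveCloseDemo {
    /** Builds scriptA, scriptB -> union -> sink and closes both scripts. */
    static NaiveOp[] run() {
        NaiveOp scriptA = new NaiveOp("scriptA");
        NaiveOp scriptB = new NaiveOp("scriptB");
        NaiveOp union = new NaiveOp("union");
        NaiveOp sink = new NaiveOp("sink");
        scriptA.addChild(union);
        scriptB.addChild(union);
        union.addChild(sink);

        scriptA.close();  // union marks itself CLOSE, then returns early
        scriptB.close();  // union is already CLOSE and returns immediately
        return new NaiveOp[] { union, sink };
    }

    public static void main(String[] args) {
        NaiveOp[] ops = run();
        System.out.println("union cleanup calls: " + ops[0].cleanupCalls);
        System.out.println("sink state: " + ops[1].state);
    }
}
```

In this sketch the union never runs its own cleanup and the sink stays OPEN, which matches the "children are never closed" outcome described above.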

        Ning Zhang added a comment -

        This patch incorporates Zheng's comments.

        Also changed UnionOperator.processOp to be synchronized. This seems to have fixed the race condition issue: the 2nd mapping phase passed without an IOException. The reducers are still running, but the "UNION" and "SCRIPT" operators should already have passed.

        Will look into any possible performance hit from introducing synchronized. We should open another JIRA if there is any.
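
The locking pattern can be illustrated with a small standalone example. It is a sketch under stated assumptions: SketchUnion and SyncUnionDemo are hypothetical names, the row type is a plain String, and the real UnionOperator.processOp has a different signature and forwards rows to child operators rather than collecting them in a list.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for UnionOperator; only the locking pattern matters. */
class SketchUnion {
    // ArrayList is not thread-safe, so unsynchronized concurrent adds could lose rows.
    private final List<String> forwarded = new ArrayList<>();

    /** Called by both producer threads; the lock serializes the shared update. */
    synchronized void processOp(String row) {
        forwarded.add(row);
    }

    synchronized int rowCount() {
        return forwarded.size();
    }
}

class SyncUnionDemo {
    /** Two threads push rowsPerThread rows each into the same union. */
    static int run(int rowsPerThread) {
        SketchUnion union = new SketchUnion();
        Runnable producer = () -> {
            for (int i = 0; i < rowsPerThread; i++) {
                union.processOp("row-" + i);
            }
        };
        Thread first = new Thread(producer);
        Thread second = new Thread(producer);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return union.rowCount();
    }

    public static void main(String[] args) {
        System.out.println(run(100_000));  // both threads' rows are all counted
    }
}
```

With the synchronized keyword in place the count is always twice rowsPerThread; removing it from processOp makes the result depend on thread timing.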

        Zheng Shao added a comment -

        Overall it looks good.

        Can you measure the performance impact by running a union of 2 simple "select *"? If it's less than 5%, let's just leave it as it is. Otherwise let's open another JIRA to improve it.

        UnionOperator's close() also needs to be synchronized.

        Ning wrote: "We need both states since if we just have one state (CLOSE) and assign it at the beginning, if there are two parents to the operator, when the first parent calls close(), this operator will set its state to CLOSE and just return without calling close() on all its children (since the other parent has not been closed). When the second parent calls close(), it just returns since its state is already CLOSE. So all children end up not being closed. We should not remove the CLOSE state check at the beginning since that may cause an operator to be closed multiple times."

        Can we do this:

        public void close(boolean abort) {
          // only close when all parents are closed.
          if (!allParentsAreClosed()) {
            return;
          }
        
          this.state = CLOSE;
        
          for (int i=0; i<children.size(); i++) {
            children.get(i).close(abort);
          }
        }
        
        
        Zheng Shao added a comment -

        The code above does not allow simple overriding of the close function.
        The following code does. It mimics the structure of initialize(...).

        public void close(boolean abort) {
          // only close when all parents are closed.
          if (!allParentsAreClosed()) {
            return;
          }
        
          this.state = CLOSE;
          // Close this operator
          this.closeOp(abort);
        
          // Close all children
          for (int i=0; i<children.size(); i++) {
            children.get(i).close(abort);
          }
        }
        
        protected void closeOp(boolean abort) {
          // Different operator will have different states.
        }
        

        I am also OK if you do want to add a new state, but please rename the states to something like "CLOSING, CLOSED", instead of "FINISHED", "CLOSE" which are harder to understand.

        Ning Zhang added a comment -

        These functions lack the CLOSE state check at the beginning:

        if ( state == State.CLOSE )
          return;

        So if the close() function is called twice, there is no guard to guarantee that the body of close() (meaning closeOp() and the close() of all its children) is executed only once. If we can guarantee that close() is called only once, then this implementation should be fine and we don't need two states.

        Zheng Shao added a comment -

        Added the guard.

        public void close(boolean abort) {
        
          // Do not close twice
          if (this.state == CLOSE) {
            return;
          }
        
          // only close when all parents are closed.
          if (!allParentsAreClosed()) {
            return;
          }
        
          this.state = CLOSE;
          // Close this operator
          this.closeOp(abort);
        
          // Close all children
          for (int i=0; i<children.size(); i++) {
            children.get(i).close(abort);
          }
        }
        
        protected void closeOp(boolean abort) {
          // Different operator will have different states.
        }
        
        Ning Zhang added a comment -

        Discussed with Zheng offline. Zheng's proposal works as long as we change the semantics of the CLOSE state from "all its parents and children are closed" to "all its parents are closed, but not necessarily its children". That should be fine since we don't need the first semantics now. If it is needed later, we will probably need to add another state.

        As for making close() synchronized, it does not seem necessary for the script operator, since close() is always called by the main thread, and the union operator's close() is called by one script operator at a time.
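
A runnable sketch of the agreed semantics may help: CLOSE is set once all parents are closed and before the children are closed, and operator-specific cleanup lives in closeOp(). SketchOperator and CloseOrderDemo are hypothetical names for illustration; the real Operator class in Hive has many more responsibilities.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified operator; the real Hive Operator class differs. */
class SketchOperator {
    enum State { OPEN, CLOSE }

    private final List<SketchOperator> parents = new ArrayList<>();
    private final List<SketchOperator> children = new ArrayList<>();
    private State state = State.OPEN;
    int closeOpCalls = 0;

    void addChild(SketchOperator child) {
        children.add(child);
        child.parents.add(this);
    }

    private boolean allParentsAreClosed() {
        for (SketchOperator p : parents) {
            if (p.state != State.CLOSE) {
                return false;
            }
        }
        return true;
    }

    /** Generic close logic; subclasses override closeOp() instead. */
    final void close(boolean abort) {
        if (state == State.CLOSE) {
            return;                  // do not close twice
        }
        if (!allParentsAreClosed()) {
            return;                  // another parent is still producing rows
        }
        state = State.CLOSE;         // set before closing children, so they see it
        closeOp(abort);
        for (SketchOperator c : children) {
            c.close(abort);
        }
    }

    /** Operator-specific cleanup hook; here it only counts invocations. */
    protected void closeOp(boolean abort) {
        closeOpCalls++;
    }
}

class CloseOrderDemo {
    /** Returns {union calls after first parent, union calls at end, sink calls at end}. */
    static int[] run() {
        SketchOperator scriptA = new SketchOperator();
        SketchOperator scriptB = new SketchOperator();
        SketchOperator union = new SketchOperator();
        SketchOperator sink = new SketchOperator();
        scriptA.addChild(union);
        scriptB.addChild(union);
        union.addChild(sink);

        scriptA.close(false);
        int unionAfterFirst = union.closeOpCalls;  // union waits for scriptB
        scriptB.close(false);
        scriptB.close(false);                      // repeated call is ignored
        return new int[] { unionAfterFirst, union.closeOpCalls, sink.closeOpCalls };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));  // prints [0, 1, 1]
    }
}
```

The union stays open after the first parent closes, and both the union and its child run closeOp() exactly once after the second parent closes.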

        Ning Zhang added a comment -

        Incorporated Zheng's comments.

        Ning Zhang added a comment -

        Measured the performance of UnionOperator.processOp() sync vs. no-sync. Surprisingly the sync one performs a little bit better. Here's the query:

        insert overwrite table tmp_nzhang_ad_union select * from (select * from nzhang_ad_imps_2_lazysimple union all select * from nzhang_ad_imps_2_lazysimple) s;

        The table nzhang_ad_imps_2_lazysimple has 180k rows and is about 100MB. I ran the query twice for each test and looked at the mappers' logs for the wall-clock time (end_time - begin_time).

        Sync:
        mappers of 1st MapRed job: avg over all mappers of two runs: 3.75025 sec
        mappers of 2nd MapRed job: avg over all mappers of two runs: 5.152 sec

        No-sync:
        mappers of 1st MapRed job: avg over all mappers of two runs: 4.1065 sec
        mappers of 2nd MapRed job: avg over all mappers of two runs: 5.252 sec

        Ning Zhang added a comment -

        BTW, the performance testing is on the test cluster, so no other jobs are running.

        Zheng Shao added a comment -

        @Hive-790_3.patch:
        We need to rename the close() method in every XXXOperator to closeOp() (and possibly remove the check of "this.STATE == CLOSE").

        Also, please run all tests and make sure they all pass.

        Ning Zhang added a comment -

        Attached a new patch, Hive-790_4.patch, that solves some other issues found when running the unit tests. One issue is that some operators never have close() called on them. This may cause problems for the UNION ALL operator, which assumes all parents must be closed before it can close.
        Also changed Operator.close() to be a generic close function for all operators. Operators that inherit from it shouldn't reimplement the logic over and over again. If they have special cleanup to do, they should put it in closeOp(), which can be overridden from Operator.

        Namit Jain added a comment -

        +1

        looks good - will commit if the tests pass

        Namit Jain added a comment -

        Committed. Thanks Ning

        Zheng Shao added a comment -

        Do we want to commit to both 0.4 and trunk, since this is a bug fix?

        Ning Zhang added a comment -

        Tested performance on a larger data set: the base table is 47 million rows of ad_imps_2 with compressed storage of 89MB. The query is

        insert overwrite table Tmp select * from (select * from Base union all select * from Base) s;

        With synchronized UnionOperator.processOp(): mapper 1 average time = 476.05 sec, mapper 2 average time = 648.5 sec.
        With non-synchronized UnionOperator.processOp(): mapper 1 average time = 476.34 sec, mapper 2 average time = 655.117 sec.

        The union operator is executed in mapper 1, and introducing the synchronized keyword makes almost no difference. So no new JIRA is needed for a performance regression.
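
For reference, the comparison against the 5% threshold suggested earlier in this thread is simple arithmetic. The helper below is a hypothetical sketch (SyncOverhead is not part of the patch) that uses the averages reported in this comment.

```java
/** Hypothetical helper for comparing the two timings; not part of the patch. */
class SyncOverhead {
    /** Relative change of the synchronized timing against the unsynchronized one. */
    static double relativeChange(double syncSecs, double noSyncSecs) {
        return (syncSecs - noSyncSecs) / noSyncSecs;
    }

    /** True if the synchronized run is slower by less than the given fraction. */
    static boolean withinThreshold(double syncSecs, double noSyncSecs, double threshold) {
        return relativeChange(syncSecs, noSyncSecs) < threshold;
    }

    public static void main(String[] args) {
        // {sync, no-sync} averages in seconds for mapper 1 and mapper 2.
        double[][] runs = { {476.05, 476.34}, {648.5, 655.117} };
        for (double[] r : runs) {
            System.out.printf("change: %+.2f%%, within 5%%: %b%n",
                    100 * relativeChange(r[0], r[1]),
                    withinThreshold(r[0], r[1], 0.05));
        }
    }
}
```

Both relative changes are slightly negative, meaning the synchronized runs were marginally faster in these measurements, so they fall well inside the 5% bound.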

        Namit Jain added a comment -

        I will try to apply the existing patch to 0.4, and see if it works

        Namit Jain added a comment -

        @Ning, there were some problems with the patch when I applied it to 0.4. Can you regenerate the patch for 0.4?

        Namit Jain added a comment -

        need to fix for 0.4 also

        Ning Zhang added a comment -

        patch Hive_790_0.4.0.patch is the patch for branch 0.4.0.

        Namit Jain added a comment -

        committed in 0.4 also - thanks Ning


          People

          • Assignee:
            Ning Zhang
            Reporter:
            Zheng Shao