Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.19.0
    • Component/s: None
    • Labels: None
    • Release Note:
      Added LIMIT to Hive query language.

Description

    Add a LIMIT feature to the Hive query language, so that you can write:

    SELECT * FROM T LIMIT 10;

    and the query would return just 10 rows.

    No guarantees are made about which 10 rows are returned by the query.

Activity

          Ashish Thusoo added a comment -

          A few thoughts on this...

          The idea here is to implement a limited version of the LIMIT clause as it appears in MySQL. I am not planning to implement the OFFSET part of it. Basically, I want to support

          SELECT .... LIMIT N

          where N is the number of rows to be returned from the query block (note this is only for the selects in the query block).

          While generating the plan for the query, once the plan for the query block has been generated I can add the plan fragment

          LimitMap -> ReduceSink -> LimitReduce

          to it.

          So, for example, if the plan of the query block is something like...

          opX1 -> opX2 .... -> ReduceSink -> reduce op -> opY1 -> opY2 ...

          This would look like

          opX1 -> opX2 ... -> ReduceSink -> reduce op -> opY1 -> opY2 ... -> LimitMapOp -> ReduceSink -> LimitReduceOp

          This should also work seamlessly with plans that do not have a ReduceSink, i.e. plans that look like

          opX1 -> opX2 ... -> opXn

          will look like

          opX1 -> opX2 ... -> opXn -> LimitMap -> ReduceSink -> LimitReduce

          Suppose we are calculating LIMIT N: the LimitMap will pass through N rows from each mapper, and the LimitReduce will return N rows out of the ones it receives from the mappers. We have to run this map/reduce job with 1 reducer.
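
          A minimal sketch of the pass-through behaviour described above (the class name, the process/out interface, and the early-stop signal here are illustrative assumptions, not Hive's actual operator API):

          // Hypothetical map-side limit operator: forwards at most N rows,
          // then drops everything else.
          public class LimitMapOp {
              private final int limit;    // N from "LIMIT N"
              private int forwarded = 0;  // rows emitted so far by this mapper

              public LimitMapOp(int limit) {
                  this.limit = limit;
              }

              // Returns false once the limit is reached so the caller can stop early.
              public boolean process(Object row, java.util.List<Object> out) {
                  if (forwarded >= limit) {
                      return false;
                  }
                  out.add(row);
                  forwarded++;
                  return true;
              }
          }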

          Thoughts?

          Joydeep Sen Sarma added a comment - edited

          some questions:

          • The extra ReduceSink (in the LimitMap -> ReduceSink -> LimitReduce) - what will it reduce on?
          • in many cases the limit does not seem to need a reduce. for example, in the dumbest case - select * limit N - we just need to run the mappers and then keep concatenating mapper outputs until we have N rows.
          • in the other case, where the prior output is sorted/grouped, we need a top-N operator as the limit - one that merges the prior output and gets the top N.

          based on the last 2 observations, i find it much easier to understand the limit operator implementation as:

          • a simple select *-like operator on a dataset (a table, whether it's an intermediate dataset or not)
          • there are two cases:
            • if the table/data is sorted/grouped, the limit operator needs to do a merge of all the table's files and produce the top N
            • if the table/data is not sorted/grouped, the limit task needs to get any N rows, possibly by scanning one file at a time

          the limit operator is sequential by definition.

          the limit operator can run in a single-mapper, map-only hadoop job in case it's writing to a file - or, if it's writing to console (select * limit N), it can just run from the client side. this is orthogonal to what it does.
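
          A rough sketch of the unsorted case above - scan one file at a time and stop as soon as N rows have been collected. The line-per-row file format and the list of file names are assumptions for illustration:

          import java.io.BufferedReader;
          import java.io.FileReader;
          import java.io.IOException;
          import java.util.ArrayList;
          import java.util.List;

          public class SequentialLimit {
              // Returns any N rows by scanning the given files sequentially.
              public static List<String> anyNRows(List<String> files, int n) throws IOException {
                  List<String> rows = new ArrayList<>();
                  for (String file : files) {  // one file at a time - sequential by definition
                      try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
                          String line;
                          while (rows.size() < n && (line = reader.readLine()) != null) {
                              rows.add(line);
                          }
                      }
                      if (rows.size() >= n) {
                          break;  // got N rows; remaining files are never opened
                      }
                  }
                  return rows;
              }
          }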

          Ashish Thusoo added a comment -

          Answers:
          1. The reduce sink is on a random key - anyway this is all going to 1 reducer. The intent is to send these to one node to return N rows out of all the results returned from the mappers.
          2. hmm.. maybe we should then do the same thing for all the limits and eliminate the reducer altogether.

          basically the plan would comprise 2 tasks:

          TaskN-1: op chain.... -> LimitMapOp
          TaskN: concatenation of the results of TaskN-1

          I think that will work fine in all the cases.

          3. I think we should keep the topN separate from limit. The map reduce plan for topN would look something like

          op chain -> TopNmap -> reduce sink -> TopNreduce

          the reduce/merge that you point to can be done in the TopNreduce, which would produce a single file. On this file we could apply the limitMapOp (which we can eliminate if we know that the previous stage produces a single file), followed by the concatenation task.

          That way we can have a generic implementation of limit - the merge being a part of the topN instead of limit.
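
          One way the TopNreduce could keep the top N rows is a bounded min-heap, so only N rows are ever held in memory. A sketch under that assumption (the row type and comparator are placeholders, not Hive's operator interfaces):

          import java.util.ArrayList;
          import java.util.Comparator;
          import java.util.List;
          import java.util.PriorityQueue;

          public class TopNReduce {
              // Returns the N largest rows according to cmp, largest first.
              public static <T> List<T> topN(Iterable<T> rows, int n, Comparator<T> cmp) {
                  // Min-heap: the smallest of the kept rows sits on top, ready to be evicted.
                  PriorityQueue<T> heap = new PriorityQueue<>(n, cmp);
                  for (T row : rows) {
                      if (heap.size() < n) {
                          heap.add(row);
                      } else if (cmp.compare(row, heap.peek()) > 0) {
                          heap.poll();  // evict the current smallest
                          heap.add(row);
                      }
                  }
                  List<T> result = new ArrayList<>(heap);
                  result.sort(cmp.reversed());
                  return result;
              }
          }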

          Ashish Thusoo added a comment -

          In case the limit is inside a subquery, e.g.

          select c1 from (select c2 from t join t2 on (..) limit 10)

          it may still be better to put the limit operation in the reducer...

          Joydeep Sen Sarma added a comment -

          one thing i found fairly ridiculous is that the current select * from <blah> actually runs a map-reduce job. we have to fix this.

          if the LimitMapOp can be run in a separate client side task that dumps to console instead of to a file (in case we are not emitting to a table) - that would kill two birds with one stone.

          the limit in the inner clause is interesting. how we wish there was a no-sort option for map-reduce! the sorting is high overhead, so a separate concatenator task (which may still be run on the cluster, where the concatenation runs inside a single-mapper, no-reducer map-reduce job) may be better. (that is assuming we are doing a redundant sort, which may not be true in all cases.)
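
          A sketch of the client-side idea, assuming rows arrive line-by-line on stdin as a stand-in for whatever the real fetch source would be:

          import java.io.BufferedReader;
          import java.io.InputStreamReader;

          public class ClientSideLimit {
              public static void main(String[] args) throws Exception {
                  int n = Integer.parseInt(args[0]);  // N from "LIMIT N"
                  BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
                  String row;
                  int emitted = 0;
                  // Dump straight to console and stop after N rows - no map/reduce job at all.
                  while (emitted < n && (row = in.readLine()) != null) {
                      System.out.println(row);
                      emitted++;
                  }
              }
          }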

          Ashish Thusoo added a comment -

          Agreed... We do need a client-side iterator operator, and the limit operator can be pushed into it in the simple case.

          Ashish Thusoo added a comment -

          Fixed as part of JIRA-4230


People

    • Assignee: Namit Jain
    • Reporter: Ashish Thusoo
    • Votes: 0
    • Watchers: 1
