Introduce TopNKey operator for PTF Reduce Sink

Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 4.0.0-alpha-1
    • Component/s: None
    • Labels: None

    Description

      EXPLAIN EXTENDED
      SELECT s_state, ranking
      FROM (
       SELECT s_state AS s_state,
       rank() OVER (PARTITION BY s_state ORDER BY ss_net_profit) AS ranking
       FROM testtable_n1000) tmp1
       WHERE ranking <= 3;
      
      STAGE DEPENDENCIES:
        Stage-1 is a root stage
        Stage-0 depends on stages: Stage-1
      
      STAGE PLANS:
        Stage: Stage-1
          Tez
      #### A masked pattern was here ####
            Edges:
              Reducer 2 <- Map 1 (SIMPLE_EDGE)
      #### A masked pattern was here ####
            Vertices:
              Map 1 
                  Map Operator Tree:
                      TableScan
                        alias: testtable_n1000
                        Statistics: Num rows: 10 Data size: 940 Basic stats: COMPLETE Column stats: COMPLETE
                        GatherStats: false
                        Reduce Output Operator
                          key expressions: s_state (type: string), ss_net_profit (type: double)
                          null sort order: az
                          sort order: ++
                          Map-reduce partition columns: s_state (type: string)
                          Statistics: Num rows: 10 Data size: 940 Basic stats: COMPLETE Column stats: COMPLETE
                          tag: -1
                          TopN: 4
                          TopN Hash Memory Usage: 0.1
                          auto parallelism: true
                  Execution mode: vectorized, llap
                  LLAP IO: no inputs
                  Path -> Alias:
      #### A masked pattern was here ####
                  Path -> Partition:
      #### A masked pattern was here ####
                      Partition
                        base file name: testtable_n1000
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                        properties:
                          COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"s_state":"true","ss_net_profit":"true"}}
                          bucket_count -1
                          bucketing_version 2
                          column.name.delimiter ,
                          columns s_state,ss_net_profit
                          columns.comments 
                          columns.types string:double
      #### A masked pattern was here ####
                          name default.testtable_n1000
                          numFiles 1
                          numRows 10
                          rawDataSize 80
                          serialization.ddl struct testtable_n1000 { string s_state, double ss_net_profit}
                          serialization.format 1
                          serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                          totalSize 90
      #### A masked pattern was here ####
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      
                          input format: org.apache.hadoop.mapred.TextInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                          properties:
                            COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"s_state":"true","ss_net_profit":"true"}}
                            bucket_count -1
                            bucketing_version 2
                            column.name.delimiter ,
                            columns s_state,ss_net_profit
                            columns.comments 
                            columns.types string:double
      #### A masked pattern was here ####
                            name default.testtable_n1000
                            numFiles 1
                            numRows 10
                            rawDataSize 80
                            serialization.ddl struct testtable_n1000 { string s_state, double ss_net_profit}
                            serialization.format 1
                            serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                            totalSize 90
      #### A masked pattern was here ####
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                          name: default.testtable_n1000
                        name: default.testtable_n1000
                  Truncated Path -> Alias:
                    /testtable_n1000 [testtable_n1000]
              Reducer 2 
                  Execution mode: vectorized, llap
                  Needs Tagging: false
                  Reduce Operator Tree:
                    Select Operator
                      expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: double)
                      outputColumnNames: _col0, _col1
                      Statistics: Num rows: 10 Data size: 3620 Basic stats: COMPLETE Column stats: COMPLETE
                      PTF Operator
                        Function definitions:
                            Input definition
                              input alias: ptf_0
                              output shape: _col0: string, _col1: double
                              type: WINDOWING
                            Windowing table definition
                              input alias: ptf_1
                              name: windowingtablefunction
                              order by: _col1 ASC NULLS LAST
                              partition by: _col0
                              raw input shape:
                              window functions:
                                  window function definition
                                    alias: rank_window_0
                                    arguments: _col1
                                    name: rank
                                    window function: GenericUDAFRankEvaluator
                                    window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX)
                                    isPivotResult: true
                        Statistics: Num rows: 10 Data size: 3620 Basic stats: COMPLETE Column stats: COMPLETE
                        Filter Operator
                          isSamplingPred: false
                          predicate: (rank_window_0 <= 3) (type: boolean)
                          Statistics: Num rows: 3 Data size: 1086 Basic stats: COMPLETE Column stats: COMPLETE
                          Select Operator
                            expressions: _col0 (type: string), rank_window_0 (type: int)
                            outputColumnNames: _col0, _col1
                            Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE
                            File Output Operator
                              compressed: false
                              GlobalTableId: 0
      #### A masked pattern was here ####
                              NumFilesPerFileSink: 1
                              Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE
      #### A masked pattern was here ####
                              table:
                                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                                  properties:
                                    columns _col0,_col1
                                    columns.types string:int
                                    escape.delim \
                                    hive.serialization.extend.additional.nesting.levels true
                                    serialization.escape.crlf true
                                    serialization.format 1
                                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                              TotalFiles: 1
                              GatherStats: false
                              MultiFileSpray: false
      
        Stage: Stage-0
          Fetch Operator
            limit: -1
            Processor Tree:
              ListSink
      

      In this case, the TopN value (3 + 1 = 4, shown as "TopN: 4" in the plan above) is pushed down to the ReduceSink operator (Reduce Output Operator) in Map 1:
      https://github.com/apache/hive/blob/520aa19b20381bfd2ed25c835443c013f6e6ebb9/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java#L257

      The ReduceSink operator uses PTFTopNHash to keep the top N rows for each value of the partition key (s_state).
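
The per-partition top-N idea can be illustrated with a minimal sketch. This is not Hive's PTFTopNHash implementation; the function name `top_n_per_partition` and the sample rows are hypothetical, and the sketch assumes an ascending sort order as in the query above.

```python
import heapq
from collections import defaultdict


def top_n_per_partition(rows, n):
    """Keep the n smallest sort values for each partition key.

    rows is an iterable of (partition_key, sort_value) pairs.
    Hypothetical helper for illustration only.
    """
    # One bounded max-heap per partition key. Values are negated because
    # heapq is a min-heap, so -heap[0] is the largest value currently kept.
    heaps = defaultdict(list)
    for part, value in rows:
        heap = heaps[part]
        if len(heap) < n:
            heapq.heappush(heap, -value)
        elif value < -heap[0]:
            # The new value beats the worst kept value: swap it in.
            heapq.heapreplace(heap, -value)
    return {part: sorted(-v for v in heap) for part, heap in heaps.items()}


# Made-up (s_state, ss_net_profit) rows.
rows = [("CA", 5.0), ("CA", 1.0), ("CA", 3.0), ("CA", 9.0),
        ("NY", 2.0), ("NY", 7.0)]
result = top_n_per_partition(rows, 3)
```

Memory is bounded by n entries per distinct partition key, which is the point of applying the limit before the shuffle rather than after it.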

      The goals of this Jira are:

      • implement partitioning support in TopNKeyOperator
      • enable pushdown of the partitioned TopNKeyOperator
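
The first goal can be sketched as a streaming filter that tracks the top N keys separately for each partition key. This is a rough illustration under assumptions, not the operator added by the patch: the class `PartitionedTopNKeyFilter`, its method `can_forward`, and the sample stream are all hypothetical, and an ascending sort order is assumed.

```python
import bisect
from collections import defaultdict


class PartitionedTopNKeyFilter:
    """Forward a row only if its sort key may still be in the top n
    distinct keys of its partition. Hypothetical sketch."""

    def __init__(self, n):
        self.n = n
        # partition key -> sorted list of at most n distinct smallest keys
        self.top_keys = defaultdict(list)

    def can_forward(self, partition, key):
        keys = self.top_keys[partition]
        pos = bisect.bisect_left(keys, key)
        if pos < len(keys) and keys[pos] == key:
            return True  # key is already one of the tracked top keys
        if len(keys) < self.n:
            keys.insert(pos, key)  # still room for another key
            return True
        if pos < self.n:
            # Smaller than the current n-th key: track it, evict the largest.
            keys.insert(pos, key)
            keys.pop()
            return True
        return False  # larger than every tracked key: drop the row early


# Made-up (s_state, ss_net_profit) rows.
f = PartitionedTopNKeyFilter(2)
stream = [("CA", 5.0), ("CA", 1.0), ("CA", 3.0), ("CA", 9.0),
          ("NY", 8.0), ("CA", 1.0)]
forwarded = [row for row in stream if f.can_forward(*row)]
```

A filter like this does not buffer rows, so it can forward some rows that later fall out of the top N (here `("CA", 5.0)`). It only reduces the data flowing downstream; the exact result still comes from the downstream operators.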

      Attachments

        1. HIVE-22666.1.patch
          259 kB
          Krisztian Kasa
        2. HIVE-22666.2.patch
          301 kB
          Krisztian Kasa
        3. HIVE-22666.3.patch
          227 kB
          Krisztian Kasa
        4. HIVE-22666.4.patch
          228 kB
          Krisztian Kasa
        5. HIVE-22666.5.patch
          226 kB
          Krisztian Kasa

            People

              Assignee: Krisztian Kasa (kkasa)
              Reporter: Krisztian Kasa (kkasa)
              Votes: 0
              Watchers: 2
