HBase
  HBASE-4435

Add Group By functionality using Coprocessors

    Details

      Description

      Adds Group By-like functionality to HBase, using the Coprocessor framework.

      It provides the ability to group the result set on one or more columns (groupBy families). It computes statistics (max, min, sum, count, sum of squares, number missing) for a second column, called the stats column.
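A sketch of what such a per-group stats accumulator computes (plain Java, not the patch's actual class — field names are assumptions):

```java
public class StatsAccumulatorExample {
    /** Accumulates the statistics listed above for one group. */
    static class Stats {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        double sum = 0, sumOfSquares = 0;
        long count = 0, missing = 0;

        void accumulate(Double value) {
            if (value == null) { missing++; return; }  // stats column absent for this row
            min = Math.min(min, value);
            max = Math.max(max, value);
            sum += value;
            sumOfSquares += value * value;
            count++;
        }
    }

    public static void main(String[] args) {
        Stats s = new Stats();
        for (Double v : new Double[] {3.0, null, 4.0}) s.accumulate(v);
        System.out.printf("min=%.1f max=%.1f sum=%.1f sumsq=%.1f count=%d missing=%d%n",
            s.min, s.max, s.sum, s.sumOfSquares, s.count, s.missing);
    }
}
```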

      I've provided two implementations.

      1. In the first, you specify a single group-by column and a stats field:

      statsMap = gbc.getStats(tableName, scan, groupByFamily, groupByQualifier, statsFamily, statsQualifier, statsFieldColumnInterpreter);

      The result is a map from the Group By column value (as a String) to a GroupByStatsValues object. The GroupByStatsValues object has the max, min, sum, etc. of the stats column for that group.
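For illustration, a minimal sketch of consuming such a result map — `StatsValues` here is a hypothetical stand-in for the patch's GroupByStatsValues, just enough to show the shape of the result:

```java
import java.util.HashMap;
import java.util.Map;

public class GroupByResultExample {
    // Hypothetical stand-in for the patch's GroupByStatsValues.
    static class StatsValues {
        final double min, max, sum;
        final long count;
        StatsValues(double min, double max, double sum, long count) {
            this.min = min; this.max = max; this.sum = sum; this.count = count;
        }
        double mean() { return count == 0 ? Double.NaN : sum / count; }
    }

    public static void main(String[] args) {
        // The client call returns one entry per distinct group-by value.
        Map<String, StatsValues> statsMap = new HashMap<>();
        statsMap.put("us-east", new StatsValues(1.0, 9.0, 15.0, 3));
        statsMap.put("us-west", new StatsValues(2.0, 4.0, 6.0, 2));

        for (Map.Entry<String, StatsValues> e : statsMap.entrySet()) {
            System.out.printf("%s: min=%.1f max=%.1f mean=%.1f%n",
                e.getKey(), e.getValue().min, e.getValue().max, e.getValue().mean());
        }
    }
}
```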

      2. The second implementation allows you to specify a list of group-by columns and a stats field. The list of group-by columns is expected to contain {column family, qualifier} pairs.

      statsMap = gbc.getStats(tableName, scan, listOfGroupByColumns, statsFamily, statsQualifier, statsFieldColumnInterpreter);
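The list-of-columns argument can be built like this — a sketch in plain Java, with made-up column names; each pair is a `byte[][]` of length 2 holding {family, qualifier}:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class GroupByColumnsExample {
    /** Builds one {family, qualifier} pair as a byte[][] of length 2. */
    public static byte[][] pair(String family, String qualifier) {
        return new byte[][] {
            family.getBytes(StandardCharsets.UTF_8),
            qualifier.getBytes(StandardCharsets.UTF_8)
        };
    }

    public static void main(String[] args) {
        // Group by two columns: info:region and info:product (hypothetical names).
        List<byte[][]> groupByColumns = new ArrayList<>();
        groupByColumns.add(pair("info", "region"));
        groupByColumns.add(pair("info", "product"));
        System.out.println("group-by columns: " + groupByColumns.size());
    }
}
```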

      The GroupByStatsValues code is adapted from the Solr Stats component.

      1. HBase-4435.patch
        29 kB
        Nichole Treadway
      2. HBASE-4435-v2.patch
        52 kB
        Aaron Tokhy

        Issue Links

          Activity

          Aaron Tokhy added a comment -

          In addition to this one, I was working on other coprocessors: one performs reverse indexing for range queries, and there are filters that can be used to scan only a small portion of a region given a key list.

          I created a GitHub repository for this work, so that I could split it out into individual JIRA tickets.

          https://github.com/atokhy/secondary-index-coprocessor

          I'll be working with HBase 0.94.1 until I have a complete working implementation, and will eventually rewrite most of it to use Google's protobuf API in a later attempt.

          Anoop Sam John added a comment -

          It may be good to add this to the CP examples section. Is someone working on this?

          Aaron Tokhy added a comment -

          Thanks for the quick review and feedback! I'll soon update JIRA with a new patch based off of SVN trunk, though not at the moment. I'll also have to clean up some of the code.

          I may also change a few other things, such as using HashedBytes instead of Text to be able to perform roll-ups of types other than UTF-8 strings.

          Ted Yu added a comment -

          I didn't find any test in the patch. It would be difficult for a feature to be accepted without new tests.
          Should GroupByStatsValues be named GroupByStats (since stats imply some values)?

          + * Copyright 2012 The Apache Software Foundation
          

          The above line is no longer needed in license header.

          BigDecimalColumnInterpreter is covered in HBASE-6669. To make the workload reasonable for this JIRA, you can exclude it from patch.

          +public class CharacterColumnInterpreter implements ColumnInterpreter<Character, Character> {
          

          Add annotation for audience and stability for public classes.

          In GroupByClient.java, the following import can be removed:

          +import com.sun.istack.logging.Logger;
          
          +    Map<Text, GroupByStatsValues<T, S>> getStats(
          +      final byte[] tableName, final Scan scan, 
          +      final List<byte [][]> groupByTuples, final byte[][] statsTuple, 
          

          The @param for the above method doesn't match actual parameters - probably you changed API in later iteration.

          +    class RowNumCallback implements
          

          The above class can be made private.
          I think we should find a better name for the above class - it does aggregation.

          +        long bt = System.currentTimeMillis();
          

          Please use EnvironmentEdge instead.

          +    table.close();
          

          Please enclose the above in finally clause.
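The resource-handling fix being asked for looks roughly like this — a plain-Java sketch where `Table` is a stand-in for HTable (on Java 7+ a try-with-resources achieves the same):

```java
import java.io.Closeable;
import java.io.IOException;

public class CloseInFinallyExample {
    // Stand-in for HTable, just to show the resource-handling pattern.
    static class Table implements Closeable {
        boolean closed = false;
        void scan() { /* work that may throw */ }
        @Override public void close() { closed = true; }
    }

    public static boolean useTable() throws IOException {
        Table table = new Table();
        try {
            table.scan();
        } finally {
            table.close();  // runs even if scan() throws
        }
        return table.closed;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("table closed: " + useTable());
    }
}
```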

          Ted Yu added a comment -

          Thanks for the patch.
          Can you provide trunk patch following the example of:
          HBASE-6785 'Convert AggregateProtocol to protobuf defined coprocessor service'

          Will provide comments soon.

          For patch of this size, review board (https://reviews.apache.org) would help reviewers.

          Aaron Tokhy added a comment -

          I have a newer version of the patch:

          Improvements:

          1) Added implementations of ColumnInterpreter classes so both AggregationClient and GroupByClient could perform aggregations on Long, Short, Integer, Double, Float, Character (or unsigned short), and BigDecimal types.

          2) The GroupByStatsValues class is a Java generic constrained to types that extend the 'Number' class. This way the constraint is enforced for those types at compile time.

          3) Previously, a HashMap was returned at the end of each RPC call. HashMap is serialized via java.io.Serializable, which is relatively heavyweight. Switched to the Hadoop Writable interface, so all objects passed between clients and regionservers now use Writable.
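The Writable pattern — an object writes its fields to a DataOutput and reads them back from a DataInput — can be sketched without Hadoop on the classpath. This is a stand-in for what the patch's GroupByStatsValues serialization would look like; field names are assumptions:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableStyleExample {
    /** Minimal stats holder serialized in the Hadoop-Writable style. */
    static class StatsValues {
        double min, max, sum;
        long count;

        void write(DataOutput out) throws IOException {
            out.writeDouble(min);
            out.writeDouble(max);
            out.writeDouble(sum);
            out.writeLong(count);
        }

        void readFields(DataInput in) throws IOException {
            min = in.readDouble();
            max = in.readDouble();
            sum = in.readDouble();
            count = in.readLong();
        }
    }

    /** Serializes to a byte buffer and deserializes into a fresh object. */
    public static StatsValues roundTrip(StatsValues s) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        s.write(new DataOutputStream(buf));
        StatsValues copy = new StatsValues();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        return copy;
    }

    public static void main(String[] args) throws IOException {
        StatsValues s = new StatsValues();
        s.min = 1; s.max = 9; s.sum = 15; s.count = 3;
        System.out.println("round-tripped count: " + roundTrip(s).count);
    }
}
```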

          4) Fixed some validateParameter bugs in the previous patch which would allow selections of column qualifiers not found in the Scan object to go through.

          Caveats:

          1) This works well if your result set fits into memory, as group-by values are aggregated into a HashMap on the client. Therefore, if the cardinality of the aggregation table is too high, you may get an OOME.

          2) All aggregations are calculated by the 'GroupByStatsValues' container. Perhaps at object construction, a 'statsvalues' can be constructed to only perform some of the aggregations instead of all of them at the same time. However this operation is Scan (IO) bound, so improvements would be minimal here.

          3) Like all coprocessors that accept a Scan object, if the aggregation is performing a full table scan, this will run on all regionservers. Each region level coprocessor is loaded into an IPC handler (default of 10) on the regionserver. If the regionserver has more regions than IPC handlers, only 10 group by operations will run at a time.
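Caveat 1 follows from how the client combines per-region results: each region returns a partial map, and the client folds them into one HashMap keyed by group value, so memory grows with the number of distinct groups. A sketch, using plain doubles (a running sum) in place of the patch's generic stats objects:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ClientMergeExample {
    /** Folds per-region partial sums into one client-side map.
        Memory use grows with the number of distinct group values. */
    public static Map<String, Double> merge(List<Map<String, Double>> perRegion) {
        Map<String, Double> merged = new HashMap<>();
        for (Map<String, Double> partial : perRegion) {
            for (Map.Entry<String, Double> e : partial.entrySet()) {
                merged.merge(e.getKey(), e.getValue(), Double::sum);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Double> region1 = Map.of("a", 1.0, "b", 2.0);
        Map<String, Double> region2 = Map.of("a", 3.0);
        System.out.println(merge(List.of(region1, region2)));
    }
}
```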

          Depending on your table schema, region size and blockCacheHitRatio, your mileage may vary. If data can be pre-aggregated for a group-by operation, this patch would be handy for aggregating a single column value projection of the original full table. A column-oriented representation of the original table would work well in this case, or possibly a client/coprocessor-managed secondary index.

          The patch applies cleanly onto HBase 0.92.1 and HBase 0.94.1.

          Ted Yu added a comment -

          @Nichole:
          The attached patch is half a year old. Do you have a newer version?

          lifeng added a comment -

          When can this patch be put into HBase?

          lifeng added a comment -

          60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel

          Jeff Hammerbacher added a comment -

          How does this approach compare to HBASE-1512?

          Nichole Treadway added a comment -

          My first patch included some additional unrelated changes to other parts of the code base that I did not want to include in this patch...sorry about that.


            People

            • Assignee:
              Unassigned
            • Reporter:
              Nichole Treadway
            • Votes:
              1
            • Watchers:
              14
