Index: src/docbkx/book.xml =================================================================== --- src/docbkx/book.xml (revision 1081943) +++ src/docbkx/book.xml (working copy) @@ -1256,6 +1256,25 @@ ]]> +
+ Connecting to HBase via the Java API
+ The parameters for connecting to an HBase cluster are contained in an HBaseConfiguration instance.
+ Through the factory method on HBaseConfiguration...
+
+Configuration config = HBaseConfiguration.create();
+
+ ... a client can connect to the HBase cluster configured in the hbase-site.xml available on the classpath.
+ However, it is also possible to specify the ZooKeeper quorum for the cluster programmatically via...
+
+Configuration config = HBaseConfiguration.create();
+config.set("hbase.zookeeper.quorum", "localhost");  // we are running zookeeper locally
+
+ If multiple ZooKeeper instances are running in a cluster, they may be specified in a comma-separated list (just as in the XML file).
+ This Configuration instance can then be passed to an
+ HTable instance via the overloaded constructor; see the sketch just below.
+
+
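+ As a minimal sketch (the table name "myTable" here is only a placeholder), the same Configuration can then be handed to a table handle:
+
+Configuration config = HBaseConfiguration.create();
+HTable table = new HTable(config, "myTable");  // HTable(Configuration, String) overload
+// ... issue Gets/Puts against the table ...
+table.close();  // flushes any buffered writes and releases client resources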
+ @@ -1342,126 +1361,6 @@ - - HBase and MapReduce - See HBase and MapReduce - up in javadocs. - - - - HBase and Schema Design -
- A good general introduction on the strength and weaknesses modelling on - the various non-rdbms datastores is Ian Varleys' Master thesis, - No Relation: The Mixed Blessings of Non-Relational Databases. - - - On the number of column families - - - HBase currently does not do well with anything about two or three column families so keep the number - of column families in your schema low. Currently, flushing and compactions are done on a per Region basis so - if one column family is carrying the bulk of the data bringing on flushes, the adjacent families - will also be flushed though the amount of data they carry is small. Compaction is currently triggered - by the total number of files under a column family. Its not size based. When many column families the - flushing and compaction interaction can make for a bunch of needless i/o loading (To be addressed by - changing flushing and compaction to work on a per column family basis). - - Try to make do with one column famliy if you can in your schemas. Only introduce a - second and third column family in the case where data access is usually column scoped; - i.e. you query one column family or the other but usually not both at the one time. - -
-
- - Monotonically Increasing Row Keys/Timeseries Data - - - In the HBase chapter of Tom White's book Hadoop: The Definitive Guide (O'Reilly) there is a an optimization note on watching out for a phenomenon where an import process walks in lock-step with all clients in concert pounding one of the table's regions (and thus, a single node), then moving onto the next region, etc. With monotonically increasing row-keys (i.e., using a timestamp), this will happen. See this comic by IKai Lan on why monotically increasing row keys are problematic in BigTable-like datastores: - monotonically increasing values are bad. The pile-up on a single region brought on - by monoticially increasing keys can be mitigated by randomizing the input records to not be in sorted order, but in general its best to avoid using a timestamp as the row-key. - - - - If you do need to upload time series data into HBase, you should - study OpenTSDB as a - successful example. It has a page describing the schema it uses in - HBase. The key format in OpenTSDB is effectively [metric_type][event_timestamp], which would appear at first glance to contradict the previous advice about not using a timestamp as the key. However, the difference is that the timestamp is not in the lead position of the key, and the design assumption is that there are dozens or hundreds (or more) of different metric types. Thus, even with a continual stream of input data with a mix of metric types, the Puts are distributed across various points of regions in the table. - -
-
- Try to minimize row and column sizes - In HBase, values are always freighted with their coordinates; as a - cell value passes through the system, it'll be accompanied by its - row, column name, and timestamp. Always. If your rows and column names - are large, especially compared o the size of the cell value, then - you may run up against some interesting scenarios. One such is - the case described by Marc Limotte at the tail of - HBASE-3551 - (recommended!). - Therein, the indices that are kept on HBase storefiles (HFiles) - to facilitate random access may end up occupyng large chunks of the HBase - allotted RAM because the cell value coordinates are large. - Mark in the above cited comment suggests upping the block size so - entries in the store file index happen at a larger interval or - modify the table schema so it makes for smaller rows and column - names. - ` -
-
- - Table Creation: Pre-Creating Regions - - -Tables in HBase are initially created with one region by default. For bulk imports, this means that all clients will write to the same region until it is large enough to split and become distributed across the cluster. A useful pattern to speed up the bulk import process is to pre-create empty regions. Be somewhat conservative in this, because too-many regions can actually degrade performance. An example of pre-creation using hex-keys is as follows (note: this example may need to be tweaked to the individual applications keys): - - -
-  public static boolean createTable(HBaseAdmin admin, HTableDescriptor table, byte[][] splits)
-    throws IOException {
-      try {
-        admin.createTable( table, splits );
-        return true;
-      } catch (TableExistsException e) {
-        logger.info("table " + table.getNameAsString() + " already exists");
-         // the table already exists...
-        return false;  
-      }
-    }
-    public static byte[][] getHexSplits(String startKey, String endKey, int numRegions) {
-      byte[][] splits = new byte[numRegions-1][];
-      BigInteger lowestKey = new BigInteger(startKey, 16);
-      BigInteger highestKey = new BigInteger(endKey, 16);
-      BigInteger range = highestKey.subtract(lowestKey);
- 
-      BigInteger regionIncrement = range.divide(BigInteger.valueOf(numRegions));
-      lowestKey = lowestKey.add(regionIncrement);
-      for(int i=0; i < numRegions-1;i++) {
-        BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));
-        byte[] b = String.format("%016x", key).getBytes();
-        splits[i] = b;
-      }
-
-      return splits;
-    }
-  
-
-
- -
- - - Metrics - See Metrics. - - - - - Cluster Replication - See Cluster Replication. - - - Data Model In short, applications store data into HBase tables. @@ -1478,6 +1377,9 @@ are sorted by row key. The sort is byte-ordered. All table accesses are via the table row key -- its primary key. + +TODO: port the CNN example from the HBase Architecture Wiki. +
Table @@ -1589,7 +1491,8 @@ Get/Scan Gets are implemented on top of Scans. The below discussion of - Get applies equally to Scans. + Get applies equally to Scans. By default, i.e. if you specify no explicit version, when doing a get, the cell whose version has the @@ -1613,6 +1516,29 @@ desired version and set the max versions to 1. +
+
+ Default Get Example
+ The following Get will only retrieve the current version of the row.
+
+Get get = new Get( Bytes.toBytes("row1") );
+Result r = htable.get(get);
+byte[] b = r.getValue( Bytes.toBytes("cf"), Bytes.toBytes("attr") ); // returns current version of value
+
+
+
+
+ Versioned Get Example
+ The following Get will return the last 3 versions of the row.
+
+Get get = new Get( Bytes.toBytes("row1") );
+get.setMaxVersions(3); // will return last 3 versions of row
+Result r = htable.get(get);
+byte[] b = r.getValue( Bytes.toBytes("cf"), Bytes.toBytes("attr") ); // returns current version of value
+List<KeyValue> kv = r.getColumn( Bytes.toBytes("cf"), Bytes.toBytes("attr") ); // returns the versions of this column (at most the 3 requested)
+
+
+
@@ -1628,6 +1554,27 @@ To overwrite an existing value, do a put at exactly the same row, column, and version as that of the cell you would overshadow. +
+ Implicit Version Example + The following Put will be implicitly versioned by HBase with the current time. + +Put put = new Put( Bytes.toBytes( row ) ); +put.add(Bytes.toBytes("cf"), Bytes.toBytes("attr1"), Bytes.toBytes( data)); +htable.put(put); + + +
+
+ Explicit Version Example + The following Put has the version timestamp explicitly set. + +Put put = new Put( Bytes.toBytes( row ) ); +long explicitTimeInMs = 555; // just an example +put.add(Bytes.toBytes("cf"), Bytes.toBytes("attr1"), explicitTimeInMs, Bytes.toBytes( data)); +htable.put(put); + + +
@@ -1717,6 +1664,240 @@
+ + HBase and Schema Design +
+ + Schema Creation + + HBase schemas can be created or updated through the HBase shell or through + HBaseAdmin in the Java API. + +
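+ A minimal sketch of creating and then updating a table through HBaseAdmin; the table name "myTable" and the column family names are placeholders for this example.
+
+Configuration config = HBaseConfiguration.create();
+HBaseAdmin admin = new HBaseAdmin(config);
+
+HTableDescriptor table = new HTableDescriptor("myTable");
+table.addFamily(new HColumnDescriptor("cf1"));  // a table needs at least one column family
+admin.createTable(table);
+
+admin.disableTable("myTable");                  // schema updates require the table to be disabled
+admin.addColumn("myTable", new HColumnDescriptor("cf2"));
+admin.enableTable("myTable");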
+
+ A good general introduction to the strengths and weaknesses of modelling on
+ the various non-RDBMS datastores is Ian Varley's Master's thesis,
+ No Relation: The Mixed Blessings of Non-Relational Databases.
+
+
+ On the number of column families
+
+
+ HBase currently does not do well with anything above two or three column families, so keep the number
+ of column families in your schema low. Currently, flushing and compactions are done on a per-Region basis, so
+ if one column family is carrying the bulk of the data bringing on flushes, the adjacent families
+ will also be flushed even though the amount of data they carry is small. Compaction is currently triggered
+ by the total number of files under a column family; it is not size-based. With many column families, the
+ flushing and compaction interaction can make for a bunch of needless I/O (to be addressed by
+ changing flushing and compaction to work on a per-column-family basis).
+
+ Try to make do with one column family if you can in your schemas. Only introduce a
+ second and third column family in the case where data access is usually column scoped;
+ i.e. you query one column family or the other but usually not both at the same time.
+
+
+
+ Monotonically Increasing Row Keys/Timeseries Data
+
+
+ In the HBase chapter of Tom White's book Hadoop: The Definitive Guide (O'Reilly) there is an optimization note on watching out for a phenomenon where an import process walks in lock-step with all clients in concert pounding one of the table's regions (and thus, a single node), then moving onto the next region, etc. With monotonically increasing row-keys (e.g., using a timestamp), this will happen. See this comic by IKai Lan on why monotonically increasing row keys are problematic in BigTable-like datastores:
+ monotonically increasing values are bad. The pile-up on a single region brought on
+ by monotonically increasing keys can be mitigated by randomizing the input records so that they are not in sorted order, but in general it is best to avoid using a timestamp or a sequence (e.g., 1,2,3...) as the row-key.
+
+
+
+ If you do need to upload time series data into HBase, you should
+ study OpenTSDB as a
+ successful example. It has a page describing the schema it uses in
+ HBase. The key format in OpenTSDB is effectively [metric_type][event_timestamp], which would appear at first glance to contradict the previous advice about not using a timestamp as the key. However, the difference is that the timestamp is not in the lead position of the key, and the design assumption is that there are dozens or hundreds (or more) of different metric types. Thus, even with a continual stream of input data with a mix of metric types, the Puts are distributed across various regions in the table. A sketch of composing such a key follows below.
+
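+ As an illustration only (the metric name below is made up, and OpenTSDB itself actually encodes metric names as fixed-width ids rather than strings), a key with the metric type in the lead position could be composed like this:
+
+long eventTimestamp = System.currentTimeMillis();
+byte[] rowKey = Bytes.add(
+    Bytes.toBytes("sys.cpu.user"),    // metric type leads the key
+    Bytes.toBytes(eventTimestamp));   // timestamp follows, so writes spread across metric types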
+
+ Try to minimize row and column sizes
+ In HBase, values are always freighted with their coordinates; as a
+ cell value passes through the system, it'll be accompanied by its
+ row, column name, and timestamp. Always. If your rows and column names
+ are large, especially compared to the size of the cell value, then
+ you may run up against some interesting scenarios. One such is
+ the case described by Marc Limotte at the tail of
+ HBASE-3551
+ (recommended!).
+ Therein, the indices that are kept on HBase storefiles (HFiles)
+ to facilitate random access may end up occupying large chunks of the HBase
+ allotted RAM because the cell value coordinates are large.
+ Marc in the above-cited comment suggests upping the block size so
+ entries in the store file index happen at a larger interval, or
+ modifying the table schema so it makes for smaller rows and column
+ names.
+
+
+
+ Table Creation: Pre-Creating Regions
+
+
+Tables in HBase are initially created with one region by default. For bulk imports, this means that all clients will write to the same region until it is large enough to split and become distributed across the cluster. A useful pattern to speed up the bulk import process is to pre-create empty regions. Be somewhat conservative in this, because too many regions can actually degrade performance. An example of pre-creation using hex-keys is as follows (note: this example may need to be tweaked to the individual application's keys):
+
+
+  public static boolean createTable(HBaseAdmin admin, HTableDescriptor table, byte[][] splits)
+    throws IOException {
+      try {
+        admin.createTable( table, splits );
+        return true;
+      } catch (TableExistsException e) {
+        logger.info("table " + table.getNameAsString() + " already exists");
+        // the table already exists...
+        return false;
+      }
+    }
+    public static byte[][] getHexSplits(String startKey, String endKey, int numRegions) {
+      byte[][] splits = new byte[numRegions-1][];
+      BigInteger lowestKey = new BigInteger(startKey, 16);
+      BigInteger highestKey = new BigInteger(endKey, 16);
+      BigInteger range = highestKey.subtract(lowestKey);
+
+      BigInteger regionIncrement = range.divide(BigInteger.valueOf(numRegions));
+      lowestKey = lowestKey.add(regionIncrement);
+      for(int i=0; i < numRegions-1;i++) {
+        BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));
+        byte[] b = String.format("%016x", key).getBytes();
+        splits[i] = b;
+      }
+
+      return splits;
+    }
+
+
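+ A usage sketch for the helpers above; the key range, region count, table name, and column family are illustrative only.
+
+HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
+HTableDescriptor table = new HTableDescriptor("myTable");
+table.addFamily(new HColumnDescriptor("cf"));
+byte[][] splits = getHexSplits("00000000", "ffffffff", 10);  // split points for 10 regions
+createTable(admin, table, splits);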
+ +
+ + + HBase and MapReduce + See HBase and MapReduce + up in javadocs. +
+ HBase and MapTasks + When an HBase table is used as a MapReduce source, a map-task will be created for each region in the table. + Thus, if there are 100 regions in the table, there will be 100 map-tasks for the job - regardless of how many column families are selected in the Scan. + +
+
+ HBase Input MapReduce Example
+ To use HBase as a MapReduce source, the job would be configured via TableMapReduceUtil in the following manner...
+
+Job job = ...;
+Scan scan = new Scan();
+scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+scan.setCacheBlocks(false);
+// set other scan attrs
+
+TableMapReduceUtil.initTableMapperJob(
+  tableName,           // input HBase table name
+  scan,                // Scan instance to control CF and attribute selection
+  MyMapper.class,      // mapper
+  Text.class,          // mapper output key class
+  LongWritable.class,  // mapper output value class
+  job                  // job instance
+  );
+
+ ...and the mapper instance would extend TableMapper...
+
+public class MyMapper extends TableMapper<Text, LongWritable> {
+
+  public void map(ImmutableBytesWritable row, Result value, Context context)
+    throws InterruptedException, IOException {
+
+    // process data for the row from the Result instance.
+  }
+}
+
+
+ Accessing Other HBase Tables in a MapReduce Job
+ Although the framework currently allows one HBase table as input to a MapReduce job, other HBase tables can
+ be accessed as lookup tables, etc., in a MapReduce job by creating an HTable instance in the setup method of the Mapper.
+
+public class MyMapper extends TableMapper<Text, LongWritable> {
+
+  private HTable myOtherTable;
+
+  @Override
+  public void setup(Context context) throws IOException {
+    // the HTable constructor can throw IOException, hence the throws clause
+    myOtherTable = new HTable("myOtherTable");
+  }
+}
+
+
+ + + Metrics +
+ Metric Setup + See Metrics. + +
+ +
+ Region Server Metrics +
<varname>hbase.regionserver.blockCacheCount</varname> + +
+
<varname>hbase.regionserver.blockCacheFree</varname> + +
+
<varname>hbase.regionserver.blockCacheHitRatio</varname> + +
+
<varname>hbase.regionserver.blockCacheSize</varname> + +
+
<varname>hbase.regionserver.fsReadLatency_avg_time</varname> + +
+
<varname>hbase.regionserver.fsReadLatency_num_ops</varname> + +
+
<varname>hbase.regionserver.fsSyncLatency_avg_time</varname> + +
+
<varname>hbase.regionserver.fsSyncLatency_num_ops</varname> + +
+
<varname>hbase.regionserver.fsWriteLatency_avg_time</varname> + +
+
<varname>hbase.regionserver.fsWriteLatency_num_ops</varname> + +
+
<varname>hbase.regionserver.memstoreSizeMB</varname> + +
+
<varname>hbase.regionserver.regions</varname> + +
+
<varname>hbase.regionserver.requests</varname> + +
+
<varname>hbase.regionserver.storeFileIndexSizeMB</varname> + +
+
<varname>hbase.regionserver.stores</varname> + +
+
+ +
+ + + Cluster Replication + See Cluster Replication. + + + + @@ -1737,7 +1918,7 @@ -
+
Region Size Region size is one of those tricky things, there are a few factors @@ -1927,6 +2108,89 @@
+
+ Configuration + See the section on recommended configurations. + +
+ Number of Regions
+ The number of regions for an HBase table is driven by the filesize (hbase.hregion.max.filesize). Also, see the
+ architecture section on region size.
+
+
+ Managing Compactions + For larger systems, managing compactions and splits may be something you want to consider. +
+
+ Compression
+ Production systems should use compression, such as LZO, with their column family definitions.
+
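+ Compression is declared per column family; a sketch in the Java API (the family name "cf" and table name "myTable" are placeholders, and LZO must be installed separately on the cluster) might look like:
+
+HColumnDescriptor cf = new HColumnDescriptor("cf");
+cf.setCompressionType(Compression.Algorithm.LZO);  // requires the native LZO libraries
+HTableDescriptor table = new HTableDescriptor("myTable");
+table.addFamily(cf);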
+
+
+ Number of Column Families + See the section on Number of Column Families. +
+
+ Data Clumping + If all your data is being written to one region, then re-read the section on processing timeseries data. +
+
+ Batch Loading
+ See the section on Pre-Creating Regions, as well as bulk loading.
+
+
+ HBase Client +
+ AutoFlush
+
+ When performing a lot of Puts, make sure that setAutoFlush is set to false on your HTable instance. Otherwise, the Puts will be sent one at a time to the regionserver.
+ Puts added via...
+
+htable.put(Put);
+
+... and ...
+
+htable.put(List<Put>);
+
+... wind up in the same write buffer. If autoFlush=false, these messages are not sent until the write-buffer is filled. To explicitly flush the messages, call .flushCommits(). Calling .close() on the htable instance will invoke flushCommits().
+
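+ A minimal sketch of the pattern (the table name "myTable" and the list of Puts are placeholders):
+
+HTable htable = new HTable(config, "myTable");
+htable.setAutoFlush(false);
+for (Put put : puts) {
+  htable.put(put);       // buffered client-side while autoFlush is off
+}
+htable.flushCommits();   // explicitly flush whatever is still in the write buffer
+htable.close();          // close() also invokes flushCommits()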
+
+ Scan Caching
+ If HBase is used as an input source for a MapReduce job, for example, make sure that the input Scan instance to the MapReduce job has setCaching set to something greater than the default (which is 1).
+ Using the default value means that the map-task will make a call back to the region-server for every record processed. Setting this value to 500, for example, will
+ transfer 500 rows at a time to the client to be processed. There is a cost/benefit to having the cache value be large because it costs more in memory for both client and regionserver, so bigger isn't always better.
+
+
+ Close ResultScanners
+ This isn't so much about improving performance but rather avoiding performance problems. If you forget to close ResultScanners you can cause problems on the regionservers.
+ Always have ResultScanner processing enclosed in try/finally blocks...
+
+Scan scan = new Scan();
+// set attrs...
+ResultScanner rs = htable.getScanner(scan);
+try {
+  for (Result r = rs.next(); r != null; r = rs.next()) {
+    // process result...
+  }
+} finally {
+  rs.close();  // always close the ResultScanner!
+}
+htable.close();
+
+
+
+ Block Cache
+ Scan instances can be set to use the
+ block cache in the region server via the setCacheBlocks method. For input Scans to MapReduce jobs, this should be false. For frequently accessed rows, it is advisable to use the block cache.
+
+