Index: src/docbkx/book.xml =================================================================== --- src/docbkx/book.xml (revision 1183716) +++ src/docbkx/book.xml (working copy) @@ -64,312 +64,529 @@ + + Data Model + In short, applications store data into an HBase table. + Tables are made of rows and columns. + All columns in HBase belong to a particular column family. + Table cells -- the intersection of row and column + coordinates -- are versioned. + A cell’s content is an uninterpreted array of bytes. + + Table row keys are also byte arrays so almost anything can + serve as a row key from strings to binary representations of longs or + even serialized data structures. Rows in HBase tables + are sorted by row key. The sort is byte-ordered. All table accesses are + via the table row key -- its primary key. + +
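+          To make the model concrete, the following Java fragment (the table, family, and
+          qualifier names are illustrative assumptions, and error handling is omitted) stores one
+          cell and reads it back, showing that every value is addressed by row key, column
+          family, column qualifier and, implicitly, a version.
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class DataModelSketch {
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    HTable table = new HTable(conf, "webtable");   // hypothetical table with a 'contents' family
+
+    byte[] rowKey = Bytes.toBytes("com.cnn.www");  // row keys are just byte arrays
+
+    // Write one cell: row key + column family + qualifier -> value.
+    Put put = new Put(rowKey);
+    put.add(Bytes.toBytes("contents"), Bytes.toBytes("html"), Bytes.toBytes("<html>..."));
+    table.put(put);
+
+    // Read it back by the same coordinates.
+    Get get = new Get(rowKey);
+    Result result = table.get(get);
+    byte[] value = result.getValue(Bytes.toBytes("contents"), Bytes.toBytes("html"));
+    System.out.println(Bytes.toString(value));
+
+    table.close();
+  }
+}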
Conceptual View
+
+ The following example is a slightly modified form of the one on page
+ 2 of the BigTable paper.
+ There is a table called webtable that contains two column families named
+ contents and anchor.
+ In this example, anchor contains two
+ columns (anchor:cnnsi.com, anchor:my.look.ca)
+ and contents contains one column (contents:html).
+
+ Column Names
+
+ By convention, a column name is made of its column family prefix and a
+ qualifier. For example, the
+ column
+ contents:html is of the column family contents.
+ The colon character (:) delimits the column family from the
+ column family qualifier.
+
+
+ Table <varname>webtable</varname>
+
+
+
+
+
+
+ Row KeyTime StampColumnFamily contentsColumnFamily anchor
+
+
+ "com.cnn.www"t9anchor:cnnsi.com = "CNN"
+ "com.cnn.www"t8anchor:my.look.ca = "CNN.com"
+ "com.cnn.www"t6contents:html = "<html>..."
+ "com.cnn.www"t5contents:html = "<html>..."
+ "com.cnn.www"t3contents:html = "<html>..."
+
+
+
+
+
+
Physical View
+
+ Although at a conceptual level tables may be viewed as a sparse set of rows,
+ physically they are stored on a per-column family basis. New columns
+ (i.e., columnfamily:column) can be added to any
+ column family without pre-announcing them.
+ ColumnFamily <varname>anchor</varname>
+
+
+
+
+
+ Row KeyTime StampColumn Family anchor
+
+
+ "com.cnn.www"t9anchor:cnnsi.com = "CNN"
+ "com.cnn.www"t8anchor:my.look.ca = "CNN.com"
+
+
+
+ ColumnFamily <varname>contents</varname> + + + + + + Row KeyTime StampColumnFamily "contents:" + + + "com.cnn.www"t6contents:html = "<html>..." + "com.cnn.www"t5contents:html = "<html>..." + "com.cnn.www"t3contents:html = "<html>..." + + +
+ It is important to note in the diagram above that the empty cells shown in the + conceptual view are not stored since they need not be in a column-oriented + storage format. Thus a request for the value of the contents:html + column at time stamp t8 would return no value. Similarly, a + request for an anchor:my.look.ca value at time stamp + t9 would return no value. However, if no timestamp is + supplied, the most recent value for a particular column would be returned + and would also be the first one found since timestamps are stored in + descending order. Thus a request for the values of all columns in the row + com.cnn.www if no timestamp is specified would be: + the value of contents:html from time stamp + t6, the value of anchor:cnnsi.com + from time stamp t9, the value of + anchor:my.look.ca from time stamp t8. +
+
- - HBase and MapReduce - See HBase and MapReduce up in javadocs. - Start there. Below is some additional help. -
- Map-Task Spitting -
- The Default HBase MapReduce Splitter - When TableInputFormat - is used to source an HBase table in a MapReduce job, - its splitter will make a map task for each region of the table. - Thus, if there are 100 regions in the table, there will be - 100 map-tasks for the job - regardless of how many column families are selected in the Scan. +
+ Table + + Tables are declared up front at schema definition time. +
-
- Custom Splitters - For those interested in implementing custom splitters, see the method getSplits in - TableInputFormatBase. - That is where the logic for map-task assignment resides. - + +
+ Row
+ Row keys are uninterpreted bytes. Rows are
+ lexicographically sorted with the lowest order appearing first
+ in a table. The empty byte array is used to denote both the
+ start and end of a table's namespace.
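+ Because the sort is on raw bytes rather than on numbers, keys that embed counters or
+ other numerics need care. A small illustration of the pitfall and the usual fixed-width
+ workarounds (the key values here are made up):
+
+// Lexicographic byte order: "row-10" sorts BEFORE "row-2", because '1' (0x31) < '2' (0x32).
+System.out.println(Bytes.compareTo(Bytes.toBytes("row-10"), Bytes.toBytes("row-2")));      // negative
+
+// Workaround 1: zero-pad the numeric part so byte order matches numeric order.
+System.out.println(Bytes.compareTo(Bytes.toBytes("row-0002"), Bytes.toBytes("row-0010"))); // negative, as intended
+
+// Workaround 2: store the number as a fixed-width, big-endian long.
+System.out.println(Bytes.compareTo(Bytes.toBytes(2L), Bytes.toBytes(10L)));                // negative, as intended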
-
-
- HBase MapReduce Examples -
- HBase MapReduce Read Example - The following is an example of using HBase as a MapReduce source in read-only manner. Specifically, - there is a Mapper instance but no Reducer, and nothing is being emitted from the Mapper. There job would be defined - as follows... - -Configuration config = HBaseConfiguration.create(); -Job job = new Job(config, "ExampleRead"); -job.setJarByClass(MyReadJob.class); // class that contains mapper - -Scan scan = new Scan(); -scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs -scan.setCacheBlocks(false); // don't set to true for MR jobs -// set other scan attrs -... - -TableMapReduceUtil.initTableMapperJob( - tableName, // input HBase table name - scan, // Scan instance to control CF and attribute selection - MyMapper.class, // mapper - null, // mapper output key - null, // mapper output value - job); -job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from mapper - -boolean b = job.waitForCompletion(true); -if (!b) { - throw new IOException("error with job!"); -} - - ...and the mapper instance would extend TableMapper... - -public static class MyMapper extends TableMapper<Text, Text> { - public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException { - // process data for the row from the Result instance. - } -} - - -
-
- HBase MapReduce Read/Write Example - The following is an example of using HBase both as a source and as a sink with MapReduce. - This example will simply copy data from one table to another. - -Configuration config = HBaseConfiguration.create(); -Job job = new Job(config,"ExampleReadWrite"); -job.setJarByClass(MyReadWriteJob.class); // class that contains mapper - -Scan scan = new Scan(); -scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs -scan.setCacheBlocks(false); // don't set to true for MR jobs -// set other scan attrs - -TableMapReduceUtil.initTableMapperJob( - sourceTable, // input table - scan, // Scan instance to control CF and attribute selection - MyMapper.class, // mapper class - null, // mapper output key - null, // mapper output value - job); -TableMapReduceUtil.initTableReducerJob( - targetTable, // output table - null, // reducer class - job); -job.setNumReduceTasks(0); - -boolean b = job.waitForCompletion(true); -if (!b) { - throw new IOException("error with job!"); -} - - An explanation is required of what TableMapReduceUtil is doing, especially with the reducer. - TableOutputFormat is being used - as the outputFormat class, and several parameters are being set on the config (e.g., TableOutputFormat.OUTPUT_TABLE), as - well as setting the reducer output key to ImmutableBytesWritable and reducer value to Writable. - These could be set by the programmer on the job and conf, but TableMapReduceUtil tries to make things easier. - The following is the example mapper, which will create a Put and matching the input Result - and emit it. Note: this is what the CopyTable utility does. - - -public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> { +
+ Column Family<indexterm><primary>Column Family</primary></indexterm>
+
+ Columns in HBase are grouped into column families.
+ All column members of a column family have the same prefix. For example, the
+ columns courses:history and
+ courses:math are both members of the
+ courses column family.
+ The colon character (:) delimits the column family from the
+ column family qualifier<indexterm><primary>Column Family Qualifier</primary></indexterm>.
+ The column family prefix must be composed of
+ printable characters. The qualifying tail, the
+ column family qualifier, can be made of any
+ arbitrary bytes. Column families must be declared up front
+ at schema definition time whereas columns do not need to be
+ defined at schema time but can be conjured on the fly while
+ the table is up and running.
+ Physically, all column family members are stored together on the
+ filesystem. Because tunings and
+ storage specifications are done at the column family level, it is
+ advised that all column family members have the same general access
+ pattern and size characteristics.
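+ To make the declared-up-front versus on-the-fly distinction above concrete, here is a
+ sketch (the table and family names are assumptions): the column family has to exist
+ before anything can be written into it, while qualifiers can be invented at write time.
+
+Configuration conf = HBaseConfiguration.create();
+HBaseAdmin admin = new HBaseAdmin(conf);
+
+// The 'courses' family is declared when the table is created...
+HTableDescriptor desc = new HTableDescriptor("myTable");
+desc.addFamily(new HColumnDescriptor("courses"));
+admin.createTable(desc);
+
+// ...but the qualifiers 'history' and 'math' are never declared anywhere.
+HTable table = new HTable(conf, "myTable");
+Put put = new Put(Bytes.toBytes("student1"));
+put.add(Bytes.toBytes("courses"), Bytes.toBytes("history"), Bytes.toBytes("101"));
+put.add(Bytes.toBytes("courses"), Bytes.toBytes("math"), Bytes.toBytes("202"));
+table.put(put);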
-
- HBase MapReduce Summary Example - The following example uses HBase as a MapReduce source and sink with a summarization step. This example will - count the number of distinct instances of a value in a table and write those summarized counts in another table. - -Configuration config = HBaseConfiguration.create(); -Job job = new Job(config,"ExampleSummary"); -job.setJarByClass(MySummaryJob.class); // class that contains mapper and reducer - +
+ Cells<indexterm><primary>Cells</primary></indexterm>
+ A {row, column, version} tuple exactly
+ specifies a cell in HBase.
+ Cell content is uninterpreted bytes.
+
+ Data Model Operations + The four primary data model operations are Get, Put, Scan, and Delete. Operations are applied via + HTable instances. + +
+ Get + Get returns + attributes for a specified row. Gets are executed via + + HTable.get. + +
+
+ Put + Put either + adds new rows to a table (if the key is new) or can update existing rows (if the key already exists). Puts are executed via + + HTable.put (writeBuffer) or + HTable.batch (non-writeBuffer). + +
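+ The writeBuffer and batch paths mentioned above can be sketched as follows (the table
+ name, family, and buffer size are illustrative). With auto-flush disabled, puts are
+ buffered on the client until the buffer fills or flushCommits is called; batch submits a
+ list of operations in a single round trip.
+
+Configuration config = HBaseConfiguration.create();
+HTable htable = new HTable(config, "myTable");  // hypothetical table
+
+// writeBuffer path: buffer puts client-side, then flush them together.
+htable.setAutoFlush(false);
+htable.setWriteBufferSize(2 * 1024 * 1024);     // 2 MB, just an example
+for (int i = 0; i < 100; i++) {
+  Put put = new Put(Bytes.toBytes("row" + i));
+  put.add(Bytes.toBytes("cf"), Bytes.toBytes("attr"), Bytes.toBytes("value" + i));
+  htable.put(put);                              // buffered, not yet sent
+}
+htable.flushCommits();                          // send whatever is still buffered
+
+// batch path: one call, a list of operations, per-operation results.
+List<Row> ops = new ArrayList<Row>();
+Put put = new Put(Bytes.toBytes("anotherRow"));
+put.add(Bytes.toBytes("cf"), Bytes.toBytes("attr"), Bytes.toBytes("value"));
+ops.add(put);
+Object[] results = new Object[ops.size()];
+htable.batch(ops, results);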
+
+ Scans
+ Scan allows
+ iteration over multiple rows for specified attributes.
+
+ The following is an example of a Scan
+ on an HTable table instance. Assume that a table is populated with rows with keys "row1", "row2", "row3",
+ and then another set of rows with the keys "abc1", "abc2", and "abc3". The following example shows how startRow and stopRow
+ can be applied to a Scan instance to return the rows beginning with "row".
+
+HTable htable = ... // instantiate HTable
+
Scan scan = new Scan();
-scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
-scan.setCacheBlocks(false); // don't set to true for MR jobs
-// set other scan attrs
-
-TableMapReduceUtil.initTableMapperJob(
- sourceTable, // input table
- scan, // Scan instance to control CF and attribute selection
- MyMapper.class, // mapper class
- Text.class, // mapper output key
- IntWritable.class, // mapper output value
- job);
-TableMapReduceUtil.initTableReducerJob(
- targetTable, // output table
- MyReducer.class, // reducer class
- job);
-job.setNumReduceTasks(1); // at least one, adjust as required
-
-boolean b = job.waitForCompletion(true);
-if (!b) {
- throw new IOException("error with job!");
-}
-
- In this example mapper a column with a String-value is chosen as the value to summarize upon.
- This value is used as the key to emit from the mapper, and an IntWritable represents an instance counter.
-
-public static class MyMapper extends TableMapper<Text, IntWritable> {
-
- private final IntWritable ONE = new IntWritable(1);
- private Text text = new Text();
-
- public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
- String val = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr1")));
- text.set(val); // we can only emit Writables...
-
- context.write(text, ONE);
- }
-}
-
- In the reducer, the "ones" are counted (just like any other MR example that does this), and then emits a Put.
-
-public static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
-
- public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
- int i = 0;
- for (IntWritable val : values) {
- i += val.get();
- }
- Put put = new Put(Bytes.toBytes(key.toString()));
- put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(i));
-
- context.write(null, put);
- }
-}
-
-
-
+scan.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("attr"));
+scan.setStartRow(Bytes.toBytes("row")); // start key is inclusive
+scan.setStopRow(Bytes.toBytes("rox"));  // stop key is exclusive; "rox" is the first key after the "row" prefix
+for(Result result : htable.getScanner(scan)) {
+ // process Result instance
+}
+
+
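+ One detail the example glosses over: getScanner returns a ResultScanner that holds
+ resources on the region servers, so it is good practice to close it when the scan is
+ finished. A hedged variant of the same loop:
+
+ResultScanner scanner = htable.getScanner(scan);
+try {
+  for (Result result : scanner) {
+    // process Result instance
+  }
+} finally {
+  scanner.close();  // release the scanner's server-side resources
+}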
+
+ Delete + Delete removes + a row from a table. Deletes are executed via + + HTable.delete. + +
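+ Deletes can also be scoped more narrowly than a whole row, as discussed in the
+ Versions section below; a sketch with illustrative row, family, and qualifier names:
+
+HTable htable = ... // instantiate HTable
+
+Delete wholeRow = new Delete(Bytes.toBytes("row1"));     // delete an entire row
+htable.delete(wholeRow);
+
+Delete oneFamily = new Delete(Bytes.toBytes("row2"));
+oneFamily.deleteFamily(Bytes.toBytes("cf"));             // delete one column family in the row
+htable.delete(oneFamily);
+
+Delete oneColumn = new Delete(Bytes.toBytes("row3"));
+oneColumn.deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes("attr")); // delete all versions of one column
+htable.delete(oneColumn);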
+
-
-
- Accessing Other HBase Tables in a MapReduce Job - Although the framework currently allows one HBase table as input to a - MapReduce job, other HBase tables can - be accessed as lookup tables, etc., in a - MapReduce job via creating an HTable instance in the setup method of the Mapper. - public class MyMapper extends TableMapper<Text, LongWritable> { - private HTable myOtherTable; - public void setup(Context context) { - myOtherTable = new HTable("myOtherTable"); - } - - public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException { - // process Result... - // use 'myOtherTable' for lookups - } - - - -
-
- Speculative Execution - It is generally advisable to turn off speculative execution for - MapReduce jobs that use HBase as a source. This can either be done on a - per-Job basis through properties, on on the entire cluster. Especially - for longer running jobs, speculative execution will create duplicate - map-tasks which will double-write your data to HBase; this is probably - not what you want. - -
- - - HBase and Schema Design - A good general introduction on the strength and weaknesses modelling on - the various non-rdbms datastores is Ian Varleys' Master thesis, - No Relation: The Mixed Blessings of Non-Relational Databases. - Recommended. Also, read for how HBase stores data internally. - -
- - Schema Creation - - HBase schemas can be created or updated with - or by using HBaseAdmin in the Java API. - - Tables must be disabled when making ColumnFamily modifications, for example.. - -Configuration config = HBaseConfiguration.create(); -HBaseAdmin admin = new HBaseAdmin(conf); -String table = "myTable"; +
+ Versions<indexterm><primary>Versions</primary></indexterm> -admin.disableTable(table); + A {row, column, version} tuple exactly + specifies a cell in HBase. Its possible to have an + unbounded number of cells where the row and column are the same but the + cell address differs only in its version dimension. -HColumnDescriptor cf1 = ...; -admin.addColumn(table, cf1 ); // adding new ColumnFamily -HColumnDescriptor cf2 = ...; -admin.modifyColumn(table, cf2 ); // modifying existing ColumnFamily + While rows and column keys are expressed as bytes, the version is + specified using a long integer. Typically this long contains time + instances such as those returned by + java.util.Date.getTime() or + System.currentTimeMillis(), that is: the difference, + measured in milliseconds, between the current time and midnight, January + 1, 1970 UTC. -admin.enableTable(table); - - See for more information about configuring client connections. - - -
-
- - On the number of column families - - - HBase currently does not do well with anything above two or three column families so keep the number - of column families in your schema low. Currently, flushing and compactions are done on a per Region basis so - if one column family is carrying the bulk of the data bringing on flushes, the adjacent families - will also be flushed though the amount of data they carry is small. Compaction is currently triggered - by the total number of files under a column family. Its not size based. When many column families the - flushing and compaction interaction can make for a bunch of needless i/o loading (To be addressed by - changing flushing and compaction to work on a per column family basis). - - Try to make do with one column family if you can in your schemas. Only introduce a - second and third column family in the case where data access is usually column scoped; - i.e. you query one column family or the other but usually not both at the one time. - -
-
Rowkey Design -
- - Monotonically Increasing Row Keys/Timeseries Data - - - In the HBase chapter of Tom White's book Hadoop: The Definitive Guide (O'Reilly) there is a an optimization note on watching out for a phenomenon where an import process walks in lock-step with all clients in concert pounding one of the table's regions (and thus, a single node), then moving onto the next region, etc. With monotonically increasing row-keys (i.e., using a timestamp), this will happen. See this comic by IKai Lan on why monotonically increasing row keys are problematic in BigTable-like datastores: - monotonically increasing values are bad. The pile-up on a single region brought on - by monotonically increasing keys can be mitigated by randomizing the input records to not be in sorted order, but in general its best to avoid using a timestamp or a sequence (e.g. 1, 2, 3) as the row-key. - + The HBase version dimension is stored in decreasing order, so that + when reading from a store file, the most recent values are found + first. + There is a lot of confusion over the semantics of + cell versions, in HBase. In particular, a couple + questions that often come up are: + + If multiple writes to a cell have the same version, are all + versions maintained or just the last? + Currently, only the last written is fetchable. + + - If you do need to upload time series data into HBase, you should - study OpenTSDB as a - successful example. It has a page describing the schema it uses in - HBase. The key format in OpenTSDB is effectively [metric_type][event_timestamp], which would appear at first glance to contradict the previous advice about not using a timestamp as the key. However, the difference is that the timestamp is not in the lead position of the key, and the design assumption is that there are dozens or hundreds (or more) of different metric types. Thus, even with a continual stream of input data with a mix of metric types, the Puts are distributed across various points of regions in the table. - -
-
- Try to minimize row and column sizes - Or why are my StoreFile indices large? - In HBase, values are always freighted with their coordinates; as a + + Is it OK to write cells in a non-increasing version + order? + Yes + + + + + Below we describe how the version dimension in HBase currently + works + See HBASE-2406 + for discussion of HBase versions. Bending time + in HBase makes for a good read on the version, or time, + dimension in HBase. It has more detail on versioning than is + provided here. As of this writing, the limiitation + Overwriting values at existing timestamps + mentioned in the article no longer holds in HBase. This section is + basically a synopsis of this article by Bruno Dumon. + . + +
+ Versions and HBase Operations + + In this section we look at the behavior of the version dimension + for each of the core HBase operations. + +
+ Get/Scan + + Gets are implemented on top of Scans. The below discussion of + Get applies equally to Scans. + + By default, i.e. if you specify no explicit version, when + doing a get, the cell whose version has the + largest value is returned (which may or may not be the latest one + written, see later). The default behavior can be modified in the + following ways: + + + + to return more than one version, see Get.setMaxVersions() + + + + to return versions other than the latest, see Get.setTimeRange() + + To retrieve the latest version that is less than or equal + to a given value, thus giving the 'latest' state of the record + at a certain point in time, just use a range from 0 to the + desired version and set the max versions to 1. + + + +
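+ The point-in-time trick described in the last item above might look like the following
+ fragment (the column names are illustrative). The upper bound of the time range is
+ exclusive, so to ask for the newest cell at or before time T the range runs from 0 to
+ T + 1.
+
+long T = 1317000000000L;     // some point in time, just an example
+
+Get get = new Get(Bytes.toBytes("row1"));
+get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("attr"));
+get.setTimeRange(0, T + 1);  // consider only versions in [0, T]
+get.setMaxVersions(1);       // and return just the newest one in that range
+Result r = htable.get(get);
+byte[] valueAsOfT = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr"));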
+
+ Default Get Example + The following Get will only retrieve the current version of the row + +Get get = new Get(Bytes.toBytes("row1")); +Result r = htable.get(get); +byte[] b = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr")); // returns current version of value + + +
+
+ Versioned Get Example + The following Get will return the last 3 versions of the row. + +Get get = new Get(Bytes.toBytes("row1")); +get.setMaxVersions(3); // will return last 3 versions of row +Result r = htable.get(get); +byte[] b = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr")); // returns current version of value +List<KeyValue> kv = r.getColumn(Bytes.toBytes("cf"), Bytes.toBytes("attr")); // returns all versions of this column + + +
+ +
+ Put + + Doing a put always creates a new version of a + cell, at a certain timestamp. By default the + system uses the server's currentTimeMillis, but + you can specify the version (= the long integer) yourself, on a + per-column level. This means you could assign a time in the past or + the future, or use the long value for non-time purposes. + + To overwrite an existing value, do a put at exactly the same + row, column, and version as that of the cell you would + overshadow. +
+ Implicit Version Example + The following Put will be implicitly versioned by HBase with the current time. + +Put put = new Put(Bytes.toBytes(row)); +put.add(Bytes.toBytes("cf"), Bytes.toBytes("attr1"), Bytes.toBytes( data)); +htable.put(put); + + +
+
+ Explicit Version Example
+ The following Put has the version timestamp explicitly set.
+
+Put put = new Put( Bytes.toBytes(row));
+long explicitTimeInMs = 555; // just an example
+put.add(Bytes.toBytes("cf"), Bytes.toBytes("attr1"), explicitTimeInMs, Bytes.toBytes(data));
+htable.put(put);
+
+ Caution: the version timestamp is used internally by HBase for things like time-to-live calculations.
+ It's usually best to avoid setting the timestamp yourself.
+
+ +
+ +
+ Delete + + When performing a delete operation in HBase, there are two + ways to specify the versions to be deleted + + + + Delete all versions older than a certain timestamp + + + + Delete the version at a specific timestamp + + + + A delete can apply to a complete row, a complete column + family, or to just one column. It is only in the last case that you + can delete explicit versions. For the deletion of a row or all the + columns within a family, it always works by deleting all cells older + than a certain version. + + Deletes work by creating tombstone + markers. For example, let's suppose we want to delete a row. For + this you can specify a version, or else by default the + currentTimeMillis is used. What this means is + delete all cells where the version is less than or equal to + this version. HBase never modifies data in place, so for + example a delete will not immediately delete (or mark as deleted) + the entries in the storage file that correspond to the delete + condition. Rather, a so-called tombstone is + written, which will mask the deleted values + When HBase does a major compaction, the tombstones are + processed to actually remove the dead values, together with the + tombstones themselves. + . If the version you specified when deleting a row is + larger than the version of any value in the row, then you can + consider the complete row to be deleted. +
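+ In the Java client, the two ways of specifying versions listed above correspond,
+ roughly, to the two single-column delete methods; a sketch with illustrative names
+ (normally you would use one or the other, not both, on the same Delete):
+
+long ts = 555; // just an example version
+
+Delete d = new Delete(Bytes.toBytes("row1"));
+d.deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes("attr"), ts); // all versions of cf:attr <= ts
+d.deleteColumn(Bytes.toBytes("cf"), Bytes.toBytes("attr"), ts);  // only the version of cf:attr at exactly ts
+htable.delete(d);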
+
+ +
+ Current Limitations + + There are still some bugs (or at least 'undecided behavior') + with the version dimension that will be addressed by later HBase + releases. + +
+ Deletes mask Puts
+
+ Deletes mask puts, even puts that happened after the delete
+ was entered
+ HBASE-2256
+ . Remember that a delete writes a tombstone, which only
+ disappears after the next major compaction has run. Suppose you do
+ a delete of everything <= T. After this you do a new put with a
+ timestamp <= T. This put, even if it happened after the delete,
+ will be masked by the delete tombstone. Performing the put will not
+ fail, but when you do a get you will notice the put had no
+ effect. It will start working again after the major compaction has
+ run. These issues should not be a problem if you use
+ always-increasing versions for new puts to a row. But they can occur
+ even if you do not care about time: just do delete and put
+ immediately after each other, and there is some chance they happen
+ within the same millisecond.
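+ Expressed as client calls, the scenario looks roughly like this (the row, family, and
+ timestamp values are illustrative); the put succeeds but remains invisible until the
+ tombstone is collected by a major compaction:
+
+long T = 555; // just an example
+
+Delete d = new Delete(Bytes.toBytes("row1"));
+d.deleteFamily(Bytes.toBytes("cf"), T);        // tombstone everything in cf at or before T
+htable.delete(d);
+
+Put p = new Put(Bytes.toBytes("row1"));
+p.add(Bytes.toBytes("cf"), Bytes.toBytes("attr"), T, Bytes.toBytes("value"));
+htable.put(p);                                 // succeeds, but the value is masked by the tombstone
+
+Result r = htable.get(new Get(Bytes.toBytes("row1")));  // finds nothing for cf:attr yet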
+ +
+ Major compactions change query results + + ...create three cell versions at t1, t2 and t3, with a + maximum-versions setting of 2. So when getting all versions, only + the values at t2 and t3 will be returned. But if you delete the + version at t2 or t3, the one at t1 will appear again. Obviously, + once a major compaction has run, such behavior will not be the case + anymore... + See Garbage Collection in Bending + time in HBase + +
+
+
+
+
+
+ HBase and Schema Design
+ A good general introduction to the strengths and weaknesses of modelling on
+ the various non-rdbms datastores is Ian Varley's Master's thesis,
+ No Relation: The Mixed Blessings of Non-Relational Databases.
+ Recommended. Also, read for how HBase stores data internally.
+
+
+ Schema Creation
+
+ HBase schemas can be created or updated with
+ or by using HBaseAdmin in the Java API.
+
+ Tables must be disabled when making ColumnFamily modifications, for example:
+
+Configuration config = HBaseConfiguration.create();
+HBaseAdmin admin = new HBaseAdmin(config);
+String table = "myTable";
+
+admin.disableTable(table);
+
+HColumnDescriptor cf1 = ...;
+admin.addColumn(table, cf1); // adding new ColumnFamily
+HColumnDescriptor cf2 = ...;
+admin.modifyColumn(table, cf2); // modifying existing ColumnFamily
+
+admin.enableTable(table);
+
+ See for more information about configuring client connections.
+
+
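+ For a brand-new table the flow is similar but needs no disable/enable cycle; a sketch
+ (the table name, family name, and settings are illustrative):
+
+Configuration config = HBaseConfiguration.create();
+HBaseAdmin admin = new HBaseAdmin(config);
+
+HTableDescriptor desc = new HTableDescriptor("myTable");
+HColumnDescriptor cf = new HColumnDescriptor("cf");
+cf.setMaxVersions(3);        // just an example of a per-family setting
+desc.addFamily(cf);
+
+admin.createTable(desc);     // the new table starts out enabled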
+
+
+ On the number of column families
+
+
+ HBase currently does not do well with anything above two or three column families so keep the number
+ of column families in your schema low. Currently, flushing and compactions are done on a per Region basis so
+ if one column family is carrying the bulk of the data bringing on flushes, the adjacent families
+ will also be flushed though the amount of data they carry is small. Compaction is currently triggered
+ by the total number of files under a column family. It is not size-based. When there are many column families, the
+ flushing and compaction interaction can make for a bunch of needless i/o loading (to be addressed by
+ changing flushing and compaction to work on a per column family basis).
+
+ Try to make do with one column family if you can in your schemas. Only introduce a
+ second and third column family in the case where data access is usually column scoped;
+ i.e. you query one column family or the other but usually not both at the same time.
+
+
Rowkey Design +
+
+ Monotonically Increasing Row Keys/Timeseries Data
+
+
+ In the HBase chapter of Tom White's book Hadoop: The Definitive Guide (O'Reilly) there is an optimization note on watching out for a phenomenon where an import process walks in lock-step with all clients in concert pounding one of the table's regions (and thus, a single node), then moving onto the next region, etc. With monotonically increasing row-keys (i.e., using a timestamp), this will happen. See this comic by IKai Lan on why monotonically increasing row keys are problematic in BigTable-like datastores:
+ monotonically increasing values are bad. The pile-up on a single region brought on
+ by monotonically increasing keys can be mitigated by randomizing the input records to not be in sorted order (see the bucketing sketch below), but in general it's best to avoid using a timestamp or a sequence (e.g. 1, 2, 3) as the row-key.
+
+
+
+ If you do need to upload time series data into HBase, you should
+ study OpenTSDB as a
+ successful example. It has a page describing the schema it uses in
+ HBase. The key format in OpenTSDB is effectively [metric_type][event_timestamp], which would appear at first glance to contradict the previous advice about not using a timestamp as the key. However, the difference is that the timestamp is not in the lead position of the key, and the design assumption is that there are dozens or hundreds (or more) of different metric types. Thus, even with a continual stream of input data with a mix of metric types, the Puts are distributed across various points of regions in the table.
+
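+ One common mitigation, beyond randomizing the input order, is to spread writes by
+ prefixing the timestamp with a small bucket derived from the record itself. The sketch
+ below is an assumption-laden illustration (the bucket count, the hypothetical
+ "sensor-42" source id, and the key layout are not recommendations for every workload):
+
+int BUCKETS = 16;                       // assumed number of salt buckets
+String source = "sensor-42";            // hypothetical natural component of the key
+long eventTime = System.currentTimeMillis();
+
+// One leading byte of "salt", then the timestamp.
+byte bucket = (byte) ((source.hashCode() & 0x7fffffff) % BUCKETS);
+byte[] rowKey = Bytes.add(new byte[] { bucket }, Bytes.toBytes(eventTime));
+
+Put put = new Put(rowKey);
+put.add(Bytes.toBytes("cf"), Bytes.toBytes("attr"), Bytes.toBytes("value"));
+htable.put(put);
+
+// The cost: reads must fan out over all BUCKETS prefixes to reassemble a time range.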
+
+ Try to minimize row and column sizes + Or why are my StoreFile indices large? + In HBase, values are always freighted with their coordinates; as a cell value passes through the system, it'll be accompanied by its row, column name, and timestamp - always. If your rows and column names are large, especially compared to the size of the cell value, then @@ -605,461 +822,240 @@
-
- - - - Data Model - In short, applications store data into an HBase table. - Tables are made of rows and columns. - All columns in HBase belong to a particular column family. - Table cells -- the intersection of row and column - coordinates -- are versioned. - A cell’s content is an uninterpreted array of bytes. - - Table row keys are also byte arrays so almost anything can - serve as a row key from strings to binary representations of longs or - even serialized data structures. Rows in HBase tables - are sorted by row key. The sort is byte-ordered. All table accesses are - via the table row key -- its primary key. - + -
Conceptual View - - The following example is a slightly modified form of the one on page - 2 of the BigTable paper. - There is a table called webtable that contains two column families named - contents and anchor. - In this example, anchor contains two - columns (anchor:cssnsi.com, anchor:my.look.ca) - and contents contains one column (contents:html). - - Column Names - - By convention, a column name is made of its column family prefix and a - qualifier. For example, the - column - contents:html is of the column family contents - The colon character (:) delimits the column family from the - column family qualifier. - - - Table <varname>webtable</varname> - - - - - - - Row KeyTime StampColumnFamily contentsColumnFamily anchor - - - "com.cnn.www"t9anchor:cnnsi.com = "CNN" - "com.cnn.www"t8anchor:my.look.ca = "CNN.com" - "com.cnn.www"t6contents:html = "<html>..." - "com.cnn.www"t5contents:html = "<html>..." - "com.cnn.www"t3contents:html = "<html>..." - - -
-
-
-
Physical View - - Although at a conceptual level tables may be viewed as a sparse set of rows. - Physically they are stored on a per-column family basis. New columns - (i.e., columnfamily:column) can be added to any - column family without pre-announcing them. - ColumnFamily <varname>anchor</varname> - - - - - - Row KeyTime StampColumn Family anchor - - - "com.cnn.www"t9anchor:cnnsi.com = "CNN" - "com.cnn.www"t8anchor:my.look.ca = "CNN.com" - - -
- ColumnFamily <varname>contents</varname> - - - - - - Row KeyTime StampColumnFamily "contents:" - - - "com.cnn.www"t6contents:html = "<html>..." - "com.cnn.www"t5contents:html = "<html>..." - "com.cnn.www"t3contents:html = "<html>..." - - -
- It is important to note in the diagram above that the empty cells shown in the - conceptual view are not stored since they need not be in a column-oriented - storage format. Thus a request for the value of the contents:html - column at time stamp t8 would return no value. Similarly, a - request for an anchor:my.look.ca value at time stamp - t9 would return no value. However, if no timestamp is - supplied, the most recent value for a particular column would be returned - and would also be the first one found since timestamps are stored in - descending order. Thus a request for the values of all columns in the row - com.cnn.www if no timestamp is specified would be: - the value of contents:html from time stamp - t6, the value of anchor:cnnsi.com - from time stamp t9, the value of - anchor:my.look.ca from time stamp t8. -
-
- -
- Table - - Tables are declared up front at schema definition time. - -
- -
- Row - Row keys are uninterrpreted bytes. Rows are - lexicographically sorted with the lowest order appearing first - in a table. The empty byte array is used to denote both the - start and end of a tables' namespace. -
- -
- Column Family<indexterm><primary>Column Family</primary></indexterm> - - Columns in HBase are grouped into column families. - All column members of a column family have the same prefix. For example, the - columns courses:history and - courses:math are both members of the - courses column family. - The colon character (:) delimits the column family from the - column family qualifierColumn Family Qualifier. - The column family prefix must be composed of - printable characters. The qualifying tail, the - column family qualifier, can be made of any - arbitrary bytes. Column families must be declared up front - at schema definition time whereas columns do not need to be - defined at schema time but can be conjured on the fly while - the table is up an running. - Physically, all column family members are stored together on the - filesystem. Because tunings and - storage specifications are done at the column family level, it is - advised that all column family members have the same general access - pattern and size characteristics. - - + + HBase and MapReduce + See HBase and MapReduce up in javadocs. + Start there. Below is some additional help. +
+ Map-Task Splitting
+ The Default HBase MapReduce Splitter + When TableInputFormat + is used to source an HBase table in a MapReduce job, + its splitter will make a map task for each region of the table. + Thus, if there are 100 regions in the table, there will be + 100 map-tasks for the job - regardless of how many column families are selected in the Scan.
-
- Cells<indexterm><primary>Cells</primary></indexterm> - A {row, column, version} tuple exactly - specifies a cell in HBase. - Cell content is uninterrpreted bytes +
+ Custom Splitters + For those interested in implementing custom splitters, see the method getSplits in + TableInputFormatBase. + That is where the logic for map-task assignment resides. +
-
- Data Model Operations - The four primary data model operations are Get, Put, Scan, and Delete. Operations are applied via - HTable instances. - -
- Get - Get returns - attributes for a specified row. Gets are executed via - - HTable.get. - -
-
- Put - Put either - adds new rows to a table (if the key is new) or can update existing rows (if the key already exists). Puts are executed via - - HTable.put (writeBuffer) or - HTable.batch (non-writeBuffer). - -
-
- Scans - Scan allow - iteration over multiple rows for specified attributes. - - The following is an example of a - on an HTable table instance. Assume that a table is populated with rows with keys "row1", "row2", "row3", - and then another set of rows with the keys "abc1", "abc2", and "abc3". The following example shows how startRow and stopRow - can be applied to a Scan instance to return the rows beginning with "row". - -HTable htable = ... // instantiate HTable - +
+
+ HBase MapReduce Examples +
+ HBase MapReduce Read Example + The following is an example of using HBase as a MapReduce source in read-only manner. Specifically, + there is a Mapper instance but no Reducer, and nothing is being emitted from the Mapper. There job would be defined + as follows... + +Configuration config = HBaseConfiguration.create(); +Job job = new Job(config, "ExampleRead"); +job.setJarByClass(MyReadJob.class); // class that contains mapper + Scan scan = new Scan(); -scan.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("attr")); -scan.setStartRow( Bytes.toBytes("row")); -scan.setStopRow( Bytes.toBytes("row" + new byte[] {0})); // note: stop key != start key -for(Result result : htable.getScanner(scan)) { - // process Result instance +scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs +scan.setCacheBlocks(false); // don't set to true for MR jobs +// set other scan attrs +... + +TableMapReduceUtil.initTableMapperJob( + tableName, // input HBase table name + scan, // Scan instance to control CF and attribute selection + MyMapper.class, // mapper + null, // mapper output key + null, // mapper output value + job); +job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from mapper + +boolean b = job.waitForCompletion(true); +if (!b) { + throw new IOException("error with job!"); } - - -
-
- Delete - Delete removes - a row from a table. Deletes are executed via - - HTable.delete. - -
- -
- - -
- Versions<indexterm><primary>Versions</primary></indexterm> - - A {row, column, version} tuple exactly - specifies a cell in HBase. Its possible to have an - unbounded number of cells where the row and column are the same but the - cell address differs only in its version dimension. - - While rows and column keys are expressed as bytes, the version is - specified using a long integer. Typically this long contains time - instances such as those returned by - java.util.Date.getTime() or - System.currentTimeMillis(), that is: the difference, - measured in milliseconds, between the current time and midnight, January - 1, 1970 UTC. - - The HBase version dimension is stored in decreasing order, so that - when reading from a store file, the most recent values are found - first. - - There is a lot of confusion over the semantics of - cell versions, in HBase. In particular, a couple - questions that often come up are: - - If multiple writes to a cell have the same version, are all - versions maintained or just the last? - Currently, only the last written is fetchable. - - - - - Is it OK to write cells in a non-increasing version - order? - Yes - - - - - Below we describe how the version dimension in HBase currently - works - See HBASE-2406 - for discussion of HBase versions. Bending time - in HBase makes for a good read on the version, or time, - dimension in HBase. It has more detail on versioning than is - provided here. As of this writing, the limiitation - Overwriting values at existing timestamps - mentioned in the article no longer holds in HBase. This section is - basically a synopsis of this article by Bruno Dumon. - . - -
- Versions and HBase Operations - - In this section we look at the behavior of the version dimension - for each of the core HBase operations. - -
- Get/Scan - - Gets are implemented on top of Scans. The below discussion of - Get applies equally to Scans. - - By default, i.e. if you specify no explicit version, when - doing a get, the cell whose version has the - largest value is returned (which may or may not be the latest one - written, see later). The default behavior can be modified in the - following ways: - - - - to return more than one version, see Get.setMaxVersions() - - - - to return versions other than the latest, see Get.setTimeRange() - - To retrieve the latest version that is less than or equal - to a given value, thus giving the 'latest' state of the record - at a certain point in time, just use a range from 0 to the - desired version and set the max versions to 1. - - - -
-
- Default Get Example - The following Get will only retrieve the current version of the row - -Get get = new Get(Bytes.toBytes("row1")); -Result r = htable.get(get); -byte[] b = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr")); // returns current version of value - - -
-
- Versioned Get Example - The following Get will return the last 3 versions of the row. - -Get get = new Get(Bytes.toBytes("row1")); -get.setMaxVersions(3); // will return last 3 versions of row -Result r = htable.get(get); -byte[] b = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr")); // returns current version of value -List<KeyValue> kv = r.getColumn(Bytes.toBytes("cf"), Bytes.toBytes("attr")); // returns all versions of this column - - -
- -
- Put - - Doing a put always creates a new version of a - cell, at a certain timestamp. By default the - system uses the server's currentTimeMillis, but - you can specify the version (= the long integer) yourself, on a - per-column level. This means you could assign a time in the past or - the future, or use the long value for non-time purposes. - - To overwrite an existing value, do a put at exactly the same - row, column, and version as that of the cell you would - overshadow. -
- Implicit Version Example - The following Put will be implicitly versioned by HBase with the current time. - -Put put = new Put(Bytes.toBytes(row)); -put.add(Bytes.toBytes("cf"), Bytes.toBytes("attr1"), Bytes.toBytes( data)); -htable.put(put); - - -
-
- Explicit Version Example - The following Put has the version timestamp explicitly set. - -Put put = new Put( Bytes.toBytes(row)); -long explicitTimeInMs = 555; // just an example -put.add(Bytes.toBytes("cf"), Bytes.toBytes("attr1"), explicitTimeInMs, Bytes.toBytes(data)); -htable.put(put); - - Caution: the version timestamp is internally by HBase for things like time-to-live calculations. - It's usually best to avoid setting the timestamp yourself. - -
- -
- -
- Delete - - When performing a delete operation in HBase, there are two - ways to specify the versions to be deleted - - - - Delete all versions older than a certain timestamp - - - - Delete the version at a specific timestamp - - - - A delete can apply to a complete row, a complete column - family, or to just one column. It is only in the last case that you - can delete explicit versions. For the deletion of a row or all the - columns within a family, it always works by deleting all cells older - than a certain version. - - Deletes work by creating tombstone - markers. For example, let's suppose we want to delete a row. For - this you can specify a version, or else by default the - currentTimeMillis is used. What this means is - delete all cells where the version is less than or equal to - this version. HBase never modifies data in place, so for - example a delete will not immediately delete (or mark as deleted) - the entries in the storage file that correspond to the delete - condition. Rather, a so-called tombstone is - written, which will mask the deleted values - When HBase does a major compaction, the tombstones are - processed to actually remove the dead values, together with the - tombstones themselves. - . If the version you specified when deleting a row is - larger than the version of any value in the row, then you can - consider the complete row to be deleted. -
-
- -
- Current Limitations + + ...and the mapper instance would extend TableMapper... + +public static class MyMapper extends TableMapper<Text, Text> { - There are still some bugs (or at least 'undecided behavior') - with the version dimension that will be addressed by later HBase - releases. + public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException { + // process data for the row from the Result instance. + } +} + + +
+
+ HBase MapReduce Read/Write Example + The following is an example of using HBase both as a source and as a sink with MapReduce. + This example will simply copy data from one table to another. + +Configuration config = HBaseConfiguration.create(); +Job job = new Job(config,"ExampleReadWrite"); +job.setJarByClass(MyReadWriteJob.class); // class that contains mapper + +Scan scan = new Scan(); +scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs +scan.setCacheBlocks(false); // don't set to true for MR jobs +// set other scan attrs + +TableMapReduceUtil.initTableMapperJob( + sourceTable, // input table + scan, // Scan instance to control CF and attribute selection + MyMapper.class, // mapper class + null, // mapper output key + null, // mapper output value + job); +TableMapReduceUtil.initTableReducerJob( + targetTable, // output table + null, // reducer class + job); +job.setNumReduceTasks(0); + +boolean b = job.waitForCompletion(true); +if (!b) { + throw new IOException("error with job!"); +} + + An explanation is required of what TableMapReduceUtil is doing, especially with the reducer. + TableOutputFormat is being used + as the outputFormat class, and several parameters are being set on the config (e.g., TableOutputFormat.OUTPUT_TABLE), as + well as setting the reducer output key to ImmutableBytesWritable and reducer value to Writable. + These could be set by the programmer on the job and conf, but TableMapReduceUtil tries to make things easier. + The following is the example mapper, which will create a Put and matching the input Result + and emit it. Note: this is what the CopyTable utility does. + + +public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> { -
- Deletes mask Puts + public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException { + // this example is just copying the data from the source table... + context.write(row, resultToPut(row,value)); + } + + private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException { + Put put = new Put(key.get()); + for (KeyValue kv : result.raw()) { + put.add(kv); + } + return put; + } +} + + There isn't actually a reducer step, so TableOutputFormat takes care of sending the Put + to the target table. + + This is just an example, developers could choose not to use TableOutputFormat and connect to the + target table themselves. + + +
+
+ HBase MapReduce Summary Example + The following example uses HBase as a MapReduce source and sink with a summarization step. This example will + count the number of distinct instances of a value in a table and write those summarized counts in another table. + +Configuration config = HBaseConfiguration.create(); +Job job = new Job(config,"ExampleSummary"); +job.setJarByClass(MySummaryJob.class); // class that contains mapper and reducer + +Scan scan = new Scan(); +scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs +scan.setCacheBlocks(false); // don't set to true for MR jobs +// set other scan attrs + +TableMapReduceUtil.initTableMapperJob( + sourceTable, // input table + scan, // Scan instance to control CF and attribute selection + MyMapper.class, // mapper class + Text.class, // mapper output key + IntWritable.class, // mapper output value + job); +TableMapReduceUtil.initTableReducerJob( + targetTable, // output table + MyReducer.class, // reducer class + job); +job.setNumReduceTasks(1); // at least one, adjust as required + +boolean b = job.waitForCompletion(true); +if (!b) { + throw new IOException("error with job!"); +} + + In this example mapper a column with a String-value is chosen as the value to summarize upon. + This value is used as the key to emit from the mapper, and an IntWritable represents an instance counter. + +public static class MyMapper extends TableMapper<Text, IntWritable> { - Deletes mask puts, even puts that happened after the delete - was entered - HBASE-2256 - . Remember that a delete writes a tombstone, which only - disappears after then next major compaction has run. Suppose you do - a delete of everything <= T. After this you do a new put with a - timestamp <= T. This put, even if it happened after the delete, - will be masked by the delete tombstone. Performing the put will not - fail, but when you do a get you will notice the put did have no - effect. It will start working again after the major compaction has - run. These issues should not be a problem if you use - always-increasing versions for new puts to a row. But they can occur - even if you do not care about time: just do delete and put - immediately after each other, and there is some chance they happen - within the same millisecond. -
+ private final IntWritable ONE = new IntWritable(1); + private Text text = new Text(); + + public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException { + String val = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr1"))); + text.set(val); // we can only emit Writables... -
- Major compactions change query results + context.write(text, ONE); + } +} + + In the reducer, the "ones" are counted (just like any other MR example that does this), and then emits a Put. + +public static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> { + + public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { + int i = 0; + for (IntWritable val : values) { + i += val.get(); + } + Put put = new Put(Bytes.toBytes(key.toString())); + put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(i)); - ...create three cell versions at t1, t2 and t3, with a - maximum-versions setting of 2. So when getting all versions, only - the values at t2 and t3 will be returned. But if you delete the - version at t2 or t3, the one at t1 will appear again. Obviously, - once a major compaction has run, such behavior will not be the case - anymore... - See Garbage Collection in Bending - time in HBase - -
-
+ context.write(null, put); + } +} + + +
- - +
+
+ Accessing Other HBase Tables in a MapReduce Job + Although the framework currently allows one HBase table as input to a + MapReduce job, other HBase tables can + be accessed as lookup tables, etc., in a + MapReduce job via creating an HTable instance in the setup method of the Mapper. + public class MyMapper extends TableMapper<Text, LongWritable> { + private HTable myOtherTable; + public void setup(Context context) { + myOtherTable = new HTable("myOtherTable"); + } + + public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException { + // process Result... + // use 'myOtherTable' for lookups + } + + + +
+
+ Speculative Execution
+ It is generally advisable to turn off speculative execution for
+ MapReduce jobs that use HBase as a source. This can either be done on a
+ per-Job basis through properties, or on the entire cluster. Especially
+ for longer running jobs, speculative execution will create duplicate
+ map-tasks which will double-write your data to HBase; this is probably
+ not what you want.
+
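+ On a per-Job basis, the relevant (pre-YARN) Hadoop properties can be set on the job's
+ configuration before submission; a sketch:
+
+Configuration config = HBaseConfiguration.create();
+config.setBoolean("mapred.map.tasks.speculative.execution", false);
+config.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+Job job = new Job(config, "ExampleRead");
+// ... configure the job as in the examples above ...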
+ + Architecture @@ -1715,13 +1711,13 @@
-
+ + - - +