From 44065fa1fdd1871cade426b0e80b9ed2a2058ca7 Mon Sep 17 00:00:00 2001
From: Matt Warhaftig
Date: Sat, 16 Apr 2016 20:49:26 -0400
Subject: [PATCH] HBASE-15287 Allow for binary row values in HBase MapReduce jobs.

---
 .../hadoop/hbase/mapred/GroupingTableMap.java      |  2 +-
 .../apache/hadoop/hbase/mapreduce/CellCounter.java |  2 +-
 .../apache/hadoop/hbase/mapreduce/CopyTable.java   |  4 +--
 .../org/apache/hadoop/hbase/mapreduce/Export.java  |  6 ++--
 .../hbase/mapreduce/GroupingTableMapper.java       |  2 +-
 .../apache/hadoop/hbase/mapreduce/RowCounter.java  |  4 +--
 .../mapreduce/SimpleTotalOrderPartitioner.java     |  2 +-
 .../hadoop/hbase/mapreduce/TableInputFormat.java   |  4 +--
 .../hadoop/hbase/mapreduce/TestCellCounter.java    | 42 ++++++++++++++++++++--
 .../hadoop/hbase/mapreduce/TestCopyTable.java      | 20 +++++------
 .../hadoop/hbase/mapreduce/TestImportExport.java   | 16 +++++++--
 11 files changed, 77 insertions(+), 27 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
index 6cd0602..ee6da75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
@@ -154,6 +154,6 @@ implements TableMap {
       }
       sb.append(Bytes.toString(vals[i]));
     }
-    return new ImmutableBytesWritable(Bytes.toBytes(sb.toString()));
+    return new ImmutableBytesWritable(Bytes.toBytesBinary(sb.toString()));
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
index 461ea6d..b67932f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
@@ -236,7 +236,7 @@ public class CellCounter {
       String regexPattern = filterCriteria.substring(1, filterCriteria.length());
       rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regexPattern));
     } else {
-      rowFilter = new PrefixFilter(Bytes.toBytes(filterCriteria));
+      rowFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria));
     }
     return rowFilter;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
index 243e243..867b7df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -116,11 +116,11 @@ public class CopyTable extends Configured implements Tool {
     }
 
     if (startRow != null) {
-      scan.setStartRow(Bytes.toBytes(startRow));
+      scan.setStartRow(Bytes.toBytesBinary(startRow));
     }
 
     if (stopRow != null) {
-      scan.setStopRow(Bytes.toBytes(stopRow));
+      scan.setStopRow(Bytes.toBytesBinary(stopRow));
     }
 
     if(families != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
index 67a9f7a..6a5e265 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
@@ -96,10 +96,10 @@ public class Export {
     s.setCacheBlocks(false);
     // set Start and Stop row
     if (conf.get(TableInputFormat.SCAN_ROW_START) != null) {
-      s.setStartRow(Bytes.toBytes(conf.get(TableInputFormat.SCAN_ROW_START)));
+      s.setStartRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_START)));
     }
     if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) {
-      s.setStopRow(Bytes.toBytes(conf.get(TableInputFormat.SCAN_ROW_STOP)));
+      s.setStopRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_STOP)));
     }
     // Set Scan Column Family
     boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
@@ -138,7 +138,7 @@ public class Export {
       String regexPattern = filterCriteria.substring(1, filterCriteria.length());
       exportFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regexPattern));
     } else {
-      exportFilter = new PrefixFilter(Bytes.toBytes(filterCriteria));
+      exportFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria));
     }
     return exportFilter;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java
index e9c8927..8a9fa49 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java
@@ -145,7 +145,7 @@ extends TableMapper implements Configurable {
       }
       sb.append(Bytes.toString(vals[i]));
     }
-    return new ImmutableBytesWritable(Bytes.toBytes(sb.toString()));
+    return new ImmutableBytesWritable(Bytes.toBytesBinary(sb.toString()));
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
index 3531b16..f278a69 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
@@ -132,10 +132,10 @@ public class RowCounter {
     Scan scan = new Scan();
     scan.setCacheBlocks(false);
     if (startKey != null && !startKey.equals("")) {
-      scan.setStartRow(Bytes.toBytes(startKey));
+      scan.setStartRow(Bytes.toBytesBinary(startKey));
     }
     if (endKey != null && !endKey.equals("")) {
-      scan.setStopRow(Bytes.toBytes(endKey));
+      scan.setStopRow(Bytes.toBytesBinary(endKey));
     }
     if (sb.length() > 0) {
       for (String columnName : sb.toString().trim().split(" ")) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
index 4cc7a8a..5370477 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java
@@ -94,7 +94,7 @@ implements Configurable {
     }
     LOG.warn("Using deprecated configuration " + deprecatedKey +
       " - please use static accessor methods instead.");
-    return Bytes.toBytes(oldStyleVal);
+    return Bytes.toBytesBinary(oldStyleVal);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
index 814d82c..ebeb158 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
@@ -129,11 +129,11 @@ implements Configurable {
       scan = new Scan();
 
       if (conf.get(SCAN_ROW_START) != null) {
-        scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START)));
+        scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
       }
 
       if (conf.get(SCAN_ROW_STOP) != null) {
-        scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP)));
+        scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
       }
 
       if (conf.get(SCAN_COLUMNS) != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java
index 41b76ba..f3f4516 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java
@@ -45,8 +45,8 @@ import static org.junit.Assert.fail;
 @Category(LargeTests.class)
 public class TestCellCounter {
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-  private static final byte[] ROW1 = Bytes.toBytes("row1");
-  private static final byte[] ROW2 = Bytes.toBytes("row2");
+  private static final byte[] ROW1 = Bytes.toBytesBinary("\\x01row1");
+  private static final byte[] ROW2 = Bytes.toBytesBinary("\\x01row2");
   private static final String FAMILY_A_STRING = "a";
   private static final String FAMILY_B_STRING = "b";
   private static final byte[] FAMILY_A = Bytes.toBytes(FAMILY_A_STRING);
@@ -112,6 +112,44 @@ public class TestCellCounter {
   }
 
   /**
+   * Test CellCounter all data should print to output
+   */
+  @Test(timeout = 300000)
+  public void testCellCounterPrefix() throws Exception {
+    String sourceTable = "testCellCounterPrefix";
+    byte[][] families = { FAMILY_A, FAMILY_B };
+    Table t = UTIL.createTable(Bytes.toBytes(sourceTable), families);
+    try {
+      Put p = new Put(ROW1);
+      p.addColumn(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11"));
+      p.addColumn(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12"));
+      p.addColumn(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13"));
+      t.put(p);
+      p = new Put(ROW2);
+      p.addColumn(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21"));
+      p.addColumn(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22"));
+      p.addColumn(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23"));
+      t.put(p);
+      String[] args = { sourceTable, FQ_OUTPUT_DIR.toString(), ";", "\\x01row1" };
+      runCount(args);
+      FileInputStream inputStream =
+          new FileInputStream(OUTPUT_DIR + File.separator + "part-r-00000");
+      String data = IOUtils.toString(inputStream);
+      inputStream.close();
+      assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2"));
+      assertTrue(data.contains("Total Qualifiers across all Rows" + "\t" + "2"));
+      assertTrue(data.contains("Total ROWS" + "\t" + "1"));
+      assertTrue(data.contains("b;q" + "\t" + "1"));
+      assertTrue(data.contains("a;q" + "\t" + "1"));
+      assertTrue(data.contains("row1;a;q_Versions" + "\t" + "1"));
+      assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1"));
+    } finally {
+      t.close();
+      FileUtil.fullyDelete(new File(OUTPUT_DIR));
+    }
+  }
+
+  /**
    * Test CellCounter with time range all data should print to output
    */
   @Test (timeout=300000)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
index b8ad5be..ea9de87 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java
@@ -106,7 +106,7 @@ public class TestCopyTable {
       assertEquals(1, r.size());
       assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN1));
     }
-    
+
     t1.close();
     t2.close();
     TEST_UTIL.deleteTable(TABLENAME1);
@@ -121,7 +121,7 @@ public class TestCopyTable {
   public void testCopyTable() throws Exception {
     doCopyTableTest(false);
   }
-  
+
   /**
    * Simple end-to-end test with bulkload.
    */
@@ -129,16 +129,16 @@ public class TestCopyTable {
   public void testCopyTableWithBulkload() throws Exception {
     doCopyTableTest(true);
   }
-  
+
   @Test
   public void testStartStopRow() throws Exception {
     final TableName TABLENAME1 = TableName.valueOf("testStartStopRow1");
     final TableName TABLENAME2 = TableName.valueOf("testStartStopRow2");
     final byte[] FAMILY = Bytes.toBytes("family");
     final byte[] COLUMN1 = Bytes.toBytes("c1");
-    final byte[] ROW0 = Bytes.toBytes("row0");
-    final byte[] ROW1 = Bytes.toBytes("row1");
-    final byte[] ROW2 = Bytes.toBytes("row2");
+    final byte[] ROW0 = Bytes.toBytesBinary("\\x01row0");
+    final byte[] ROW1 = Bytes.toBytesBinary("\\x01row1");
+    final byte[] ROW2 = Bytes.toBytesBinary("\\x01row2");
 
     Table t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY);
     Table t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY);
@@ -157,8 +157,8 @@ public class TestCopyTable {
     CopyTable copy = new CopyTable(TEST_UTIL.getConfiguration());
     assertEquals(
       0,
-      copy.run(new String[] { "--new.name=" + TABLENAME2, "--startrow=row1",
-          "--stoprow=row2", TABLENAME1.getNameAsString() }));
+      copy.run(new String[] { "--new.name=" + TABLENAME2, "--startrow=\\x01row1",
+          "--stoprow=\\x01row2", TABLENAME1.getNameAsString() }));
 
     // verify the data was copied into table 2
     // row1 exist, row0, row2 do not exist
@@ -170,11 +170,11 @@ public class TestCopyTable {
     g = new Get(ROW0);
     r = t2.get(g);
    assertEquals(0, r.size());
-    
+
     g = new Get(ROW2);
     r = t2.get(g);
     assertEquals(0, r.size());
-    
+
     t1.close();
     t2.close();
     TEST_UTIL.deleteTable(TABLENAME1);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
index 2154f99..e2131e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
@@ -90,8 +90,9 @@ import org.mockito.stubbing.Answer;
 public class TestImportExport {
   private static final Log LOG = LogFactory.getLog(TestImportExport.class);
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-  private static final byte[] ROW1 = Bytes.toBytes("row1");
-  private static final byte[] ROW2 = Bytes.toBytes("row2");
+  private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1");
+  private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2");
+  private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3");
   private static final String FAMILYA_STRING = "a";
   private static final String FAMILYB_STRING = "b";
   private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
@@ -179,9 +180,17 @@ public class TestImportExport {
       p.add(FAMILYA, QUAL, now+1, QUAL);
       p.add(FAMILYA, QUAL, now+2, QUAL);
       t.put(p);
+      p = new Put(ROW3);
+      p.add(FAMILYA, QUAL, now, QUAL);
+      p.add(FAMILYA, QUAL, now + 1, QUAL);
+      p.add(FAMILYA, QUAL, now + 2, QUAL);
+      t.put(p);
     }
 
     String[] args = new String[] {
+        // Only export row1 & row2.
+        "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
+        "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3",
         EXPORT_TABLE,
         FQ_OUTPUT_DIR,
         "1000", // max number of key versions per key to export
@@ -205,6 +214,9 @@ public class TestImportExport {
       g.setMaxVersions();
       r = t.get(g);
       assertEquals(3, r.size());
+      g = new Get(ROW3);
+      r = t.get(g);
+      assertEquals(0, r.size());
     }
   }
 
-- 
2.6.4 (Apple Git-63)