diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index 0e4a494..f497c21 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -22,7 +22,10 @@ import static java.lang.String.format;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -37,6 +40,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
@@ -82,8 +86,9 @@ public class ImportTsv extends Configured implements Tool {
   public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
   public final static String COLUMNS_CONF_KEY = "importtsv.columns";
   public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
-
+  public final static String TAG_SEPERATOR_CONF_KEY = "importtsv.tag.seperator";
   final static String DEFAULT_SEPARATOR = "\t";
+  final static String DEFAULT_TAG_SEPERATOR = "\r";
   final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
 
   public static class TsvParser {
@@ -94,11 +99,13 @@ public class ImportTsv extends Configured implements Tool {
     private final byte[][] qualifiers;
 
     private final byte separatorByte;
+
+    // TODO: Decide on how to pass this.
+    private byte tagSeperatorByte = -1;
 
     private int rowKeyColumnIndex;
 
     private int maxColumnCount;
-
     // Default value must be negative
     public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1;
@@ -107,14 +114,23 @@ public class ImportTsv extends Configured implements Tool {
     public static final String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";
 
     public static final String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
-
     /**
      * @param columnsSpecification the list of columns to parser out, comma separated.
     * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
+     * @param tagSeperatorStr the separator preceding tag data within a column value;
+     *          null disables tag parsing
      */
     public TsvParser(String columnsSpecification, String separatorStr) {
+      this(columnsSpecification, separatorStr, null);
+    }
+
+    public TsvParser(String columnsSpecification, String separatorStr, String tagSeperatorStr) {
       // Configure separator
       byte[] separator = Bytes.toBytes(separatorStr);
+      if (tagSeperatorStr != null) {
+        byte[] tagSeperator = Bytes.toBytes(tagSeperatorStr);
+        Preconditions.checkArgument(tagSeperator.length == 1,
+          "TsvParser only supports single-byte tag separators");
+        tagSeperatorByte = tagSeperator[0];
+      }
       Preconditions.checkArgument(separator.length == 1,
         "TsvParser only supports single-byte separators");
       separatorByte = separator[0];
@@ -133,12 +149,10 @@ public class ImportTsv extends Configured implements Tool {
           rowKeyColumnIndex = i;
           continue;
         }
-
         if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
           timestampKeyColumnIndex = i;
           continue;
         }
-
         String[] parts = str.split(":", 2);
         if (parts.length == 1) {
           families[i] = str.getBytes();
@@ -161,6 +175,7 @@ public class ImportTsv extends Configured implements Tool {
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
+
     public byte[] getFamily(int idx) {
       return families[idx];
     }
@@ -172,11 +187,33 @@ public class ImportTsv extends Configured implements Tool {
         throws BadTsvLineException {
       // Enumerate separator offsets
       ArrayList<Integer> tabOffsets = new ArrayList<Integer>(maxColumnCount);
+      // Maps a column index to the starting offset of that column's tag data
+      // inside the line, keyed by the index of the corresponding column entry.
+      Map<Integer, Integer> tagOffsets = new HashMap<Integer, Integer>(maxColumnCount);
+      int tagStartOffset = 0;
+      boolean tagFound = false;
+      int currColIndex = 0;
       for (int i = 0; i < length; i++) {
         if (lineBytes[i] == separatorByte) {
           tabOffsets.add(i);
+          if (tagFound) {
+            tagFound = false;
+            tagOffsets.put(currColIndex, tagStartOffset);
+          }
+          currColIndex++;
+        }
+        if (tagSeperatorByte != -1) {
+          if (lineBytes[i] == tagSeperatorByte) {
+            tagFound = true;
+            tagStartOffset = i + 1;
+          }
         }
       }
+      if (tagFound) {
+        // The line ends in tag data, with no trailing separator after it.
+        tagFound = false;
+        tagOffsets.put(currColIndex, tagStartOffset);
+      }
       if (tabOffsets.isEmpty()) {
         throw new BadTsvLineException("No delimiter");
       }
@@ -191,16 +228,18 @@ public class ImportTsv extends Configured implements Tool {
           && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
         throw new BadTsvLineException("No timestamp");
       }
-      return new ParsedLine(tabOffsets, lineBytes);
+      return new ParsedLine(tabOffsets, lineBytes, tagOffsets);
     }
 
     class ParsedLine {
       private final ArrayList<Integer> tabOffsets;
       private byte[] lineBytes;
+      private final Map<Integer, Integer> tagOffsets;
 
-      ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) {
+      ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes,
+          Map<Integer, Integer> tagOffsets) {
         this.tabOffsets = tabOffsets;
         this.lineBytes = lineBytes;
+        this.tagOffsets = tagOffsets;
       }
 
       public int getRowKeyOffset() {
@@ -209,6 +248,7 @@ public class ImportTsv extends Configured implements Tool {
       public int getRowKeyLength() {
         return getColumnLength(rowKeyColumnIndex);
       }
+
       public long getTimestamp(long ts) throws BadTsvLineException {
         // Return ts if HBASE_TS_KEY is not configured in column spec
@@ -227,18 +267,83 @@ public class ImportTsv extends Configured implements Tool {
         }
       }
 
+      public List<Tag> getTag(int idx) throws BadTsvLineException {
+        if (getTagLength(idx) != -1) {
+          String tagStr = Bytes.toString(lineBytes, getTagOffset(idx), getTagLength(idx));
+          // Tag specs are comma separated, even when only a single tag is given.
+          // TODO: revisit this format later.
+          String[] split = tagStr.split(",");
+          int noOfTags = split.length;
+          if (noOfTags > 0) {
+            List<Tag> tagList = new ArrayList<Tag>(noOfTags);
+            for (int i = 0; i < split.length; i++) {
+              String[] tagSplit = split[i].split(":");
+              try {
+                Tag tag = new Tag(Byte.valueOf(tagSplit[0]), Bytes.toBytes(tagSplit[1]));
+                tagList.add(tag);
+              } catch (Exception e) {
+                throw new BadTsvLineException("Invalid tag input " + tagStr);
+              }
+            }
+            return tagList;
+          } else {
+            throw new BadTsvLineException("Invalid tag input " + tagStr);
+          }
+        } else {
+          return null;
+        }
+      }
+
       public int getColumnOffset(int idx) {
         if (idx > 0)
           return tabOffsets.get(idx - 1) + 1;
         else
           return 0;
       }
+
       public int getColumnLength(int idx) {
+        if (tagOffsets.get(idx) != null) {
+          return getActualColLength(idx);
+        }
         return tabOffsets.get(idx) - getColumnOffset(idx);
       }
+
+      private int getActualColLength(int idx) {
+        // Exclude the tag data and the single tag separator byte from the column length.
+        return (tabOffsets.get(idx) - getColumnOffset(idx)) - getTagLength(idx) - 1;
+      }
+
       public int getColumnCount() {
         return tabOffsets.size();
       }
+
+      public int getTagOffset(int idx) {
+        if (tagOffsets.get(idx) != null) {
+          return tagOffsets.get(idx);
+        } else {
+          return -1;
+        }
+      }
+
+      public int getTagLength(int idx) {
+        if (getTagOffset(idx) != -1) {
+          return tabOffsets.get(idx) - getTagOffset(idx);
+        } else {
+          return -1;
+        }
+      }
+
+      public int getTagCount() {
+        return tagOffsets.size();
+      }
+
       public byte[] getLineBytes() {
         return lineBytes;
       }
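The net effect of the ImportTsv.java changes above is that a column value may now be followed by the tag separator byte and a comma-separated list of type:value tag specs. A minimal sketch of driving the patched parser directly, outside MapReduce; the demo class and its data are illustrative only, and it must live in the org.apache.hadoop.hbase.mapreduce package because ParsedLine is package-private:

package org.apache.hadoop.hbase.mapreduce;

import java.util.List;

import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
import org.apache.hadoop.hbase.util.Bytes;

public class TsvParserTagDemo {
  public static void main(String[] args) throws Exception {
    // Row key plus one d:c column; "\t" separates columns, "\r" starts tag data.
    TsvParser parser = new TsvParser("HBASE_ROW_KEY,d:c", "\t", "\r");
    byte[] line = Bytes.toBytes("row1\tvalue1\r1:PRIVATE,2:PUBLIC");
    ParsedLine parsed = parser.parse(line, line.length);

    // The column length excludes the tag separator and the tag data after it.
    System.out.println(Bytes.toString(line, parsed.getColumnOffset(1),
        parsed.getColumnLength(1)));  // prints "value1"

    // Two tags: (1, "PRIVATE") and (2, "PUBLIC").
    List<Tag> tags = parsed.getTag(1);
    for (Tag t : tags) {
      System.out.println(t.getType() + " -> " + Bytes.toString(t.getValue()));
    }
  }
}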
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
index 6360b2e..1ddcff8 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
@@ -17,19 +17,19 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Counter;
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
 
 /**
  * Write table content out to files in hdfs.
@@ -45,6 +45,9 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
 
   /** Column seperator */
   private String separator;
+
+  /** Tag separator */
+  private String tagSeperator;
 
   /** Should skip bad lines */
   private boolean skipBadLines;
@@ -81,9 +84,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
     doSetup(context);
 
     Configuration conf = context.getConfiguration();
-
     parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
-        separator);
+        separator, tagSeperator);
     if (parser.getRowKeyColumnIndex() == -1) {
       throw new RuntimeException("No row key column specified");
     }
@@ -104,7 +106,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
     } else {
       separator = new String(Base64.decode(separator));
     }
-
+    tagSeperator = conf.get(ImportTsv.TAG_SEPERATOR_CONF_KEY);
     // Should never get 0 as we are setting this to a valid value in job
     // configuration.
     ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
@@ -139,13 +141,19 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
           || i == parser.getTimestampKeyColumnIndex()) {
         continue;
       }
-      KeyValue kv = new KeyValue(
-          lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
-          parser.getFamily(i), 0, parser.getFamily(i).length,
-          parser.getQualifier(i), 0, parser.getQualifier(i).length,
-          ts,
-          KeyValue.Type.Put,
-          lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
+      KeyValue kv;
+      if (parsed.getTagLength(i) != -1) {
+        // Pass the tags along so they become part of the KeyValue.
+        kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+            parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
+            parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
+            parsed.getColumnOffset(i), parsed.getColumnLength(i), parsed.getTag(i));
+      } else {
+        kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+            parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
+            parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
+            parsed.getColumnOffset(i), parsed.getColumnLength(i));
+      }
       put.add(kv);
     }
     context.write(rowKey, put);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
index bfebfb5..9abfacb 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
@@ -17,18 +17,18 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
+import org.apache.hadoop.mapreduce.Mapper;
 
 /**
  * Write table content out to map output files.
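The mapper change above relies on the KeyValue constructor overload that accepts a List<Tag>, and the tags can be read back with tagsIterator(), as the tests below do. A self-contained sketch of that round trip; the class name and values are ours, but the HBase calls are the same ones the patch uses:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;

public class TaggedKeyValueDemo {
  public static void main(String[] args) {
    byte[] row = Bytes.toBytes("row1");
    byte[] fam = Bytes.toBytes("d");
    byte[] qual = Bytes.toBytes("c");
    byte[] val = Bytes.toBytes("value1");

    List<Tag> tags = new ArrayList<Tag>();
    tags.add(new Tag((byte) 1, Bytes.toBytes("PRIVATE")));

    // Same overload the mapper uses when parsed.getTagLength(i) != -1.
    KeyValue kv = new KeyValue(row, 0, row.length, fam, 0, fam.length,
        qual, 0, qual.length, System.currentTimeMillis(), KeyValue.Type.Put,
        val, 0, val.length, tags);

    // Read the tags back, as testMROnTableWithTags does below.
    Iterator<Tag> it = kv.tagsIterator();
    while (it.hasNext()) {
      Tag t = it.next();
      System.out.println(t.getType() + " -> " + Bytes.toString(t.getValue()));
    }
  }
}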
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
index e8d2ac9..07e9339 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -26,6 +25,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -43,7 +43,9 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -100,20 +102,20 @@ public class TestImportTsv implements Configurable {
     util.shutdownMiniCluster();
   }
 
-  @Test
+  @Test
   public void testMROnTable() throws Exception {
-    String table = "test-" + UUID.randomUUID();
+    String tableName = "test-" + UUID.randomUUID();
 
     // Prepare the arguments required for the test.
     String[] args = new String[] {
         "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
         "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
-        table
+        tableName
     };
 
-    util.createTable(table, FAMILY);
+    util.createTable(tableName, FAMILY);
     doMROnTableTest(util, FAMILY, null, args, 1);
-    util.deleteTable(table);
+    util.deleteTable(tableName);
   }
 
   @Test
@@ -134,6 +136,51 @@ public class TestImportTsv implements Configurable {
     util.deleteTable(table);
   }
 
+  @Test
+  public void testMROnTableWithTags() throws Exception {
+    String tableName = "testtags-" + UUID.randomUUID();
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+        "-D" + ImportTsv.TAG_SEPERATOR_CONF_KEY + "=$",
+        tableName
+    };
+
+    util.createTable(tableName, FAMILY);
+    String data = "KEY\u001bVALUE1$1:PRIVATE\u001bVALUE2$1:PRIVATE,2:PUBLIC\n";
+    doMROnTableTest(util, FAMILY, data, args, 1);
+    HTable table = new HTable(util.getConfiguration(), tableName);
+    Scan s = new Scan();
+    ResultScanner scanner = table.getScanner(s);
+    for (Result res : scanner) {
+      LOG.debug("Getting results for tags");
+      assertTrue(res.size() == 2);
+      List<Cell> kvs = res.listCells();
+      Cell cell = kvs.get(0);
+      assertTrue(cell.getTagsLength() > 0);
+      Cell cell2 = kvs.get(1);
+      assertTrue(cell2.getTagsLength() > 0);
+      KeyValue kv = KeyValueUtil.ensureKeyValue(cell2);
+      Iterator<Tag> tagsIterator = kv.tagsIterator();
+      boolean tagFound1 = false;
+      boolean tagFound2 = false;
+      while (tagsIterator.hasNext()) {
+        Tag tag = tagsIterator.next();
+        LOG.debug(Bytes.toString(tag.getValue()));
+        if (Bytes.equals(tag.getValue(), Bytes.toBytes("PRIVATE"))) {
+          tagFound1 = true;
+        } else if (Bytes.equals(tag.getValue(), Bytes.toBytes("PUBLIC"))) {
+          tagFound2 = true;
+        }
+      }
+      assertTrue(tagFound1);
+      assertTrue(tagFound2);
+    }
+    table.close();
+    util.deleteTable(tableName);
+  }
 
   @Test
   public void testMROnTableWithCustomMapper()
@@ -229,6 +276,8 @@ public class TestImportTsv implements Configurable {
     doMROnTableTest(util, FAMILY, data, args, 4);
   }
 
+
+
   protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
       String data, String[] args) throws Exception {
     return doMROnTableTest(util, family, data, args, 1);
@@ -284,7 +333,7 @@ public class TestImportTsv implements Configurable {
         break;
       }
     }
-
+    LOG.debug("validating the table " + createdHFiles);
     if (createdHFiles)
       validateHFiles(fs, outputPath, family);
     else
@@ -315,6 +364,7 @@ public class TestImportTsv implements Configurable {
     scan.addFamily(Bytes.toBytes(family));
     ResultScanner resScanner = table.getScanner(scan);
     for (Result res : resScanner) {
+      LOG.debug("Getting results " + res.size());
       assertTrue(res.size() == 2);
       List<Cell> kvs = res.listCells();
       assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY")));
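Outside of tests, the same three settings that the -D arguments above supply can be placed on a Configuration and handed to the ImportTsv driver through ToolRunner. A hedged sketch; the table name and input path are placeholders, and it assumes the standard Tool entry point that ImportTsv already implements:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.ImportTsv;
import org.apache.hadoop.util.ToolRunner;

public class ImportTsvTagJobDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B"); // importtsv.columns
    conf.set(ImportTsv.SEPARATOR_CONF_KEY, "\u001b");                  // importtsv.separator
    conf.set(ImportTsv.TAG_SEPERATOR_CONF_KEY, "$");                   // importtsv.tag.seperator
    // "mytable" and the input path are placeholders for a real table and HDFS directory.
    int exit = ToolRunner.run(conf, new ImportTsv(),
        new String[] { "mytable", "/input/data.tsv" });
    System.exit(exit);
  }
}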
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
index edc927b..6c9ab0b 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java
@@ -25,9 +25,11 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
@@ -54,7 +56,18 @@ public class TestImportTsvParser {
     ArrayList<String> parsedCols = new ArrayList<String>();
     for (int i = 0; i < parsed.getColumnCount(); i++) {
       parsedCols.add(Bytes.toString(parsed.getLineBytes(), parsed.getColumnOffset(i),
-          parsed.getColumnLength(i)));
+          parsed.getColumnLength(i)));
+
+      if (parsed.getTagLength(i) != -1) {
+        try {
+          List<Tag> tags = parsed.getTag(i);
+          for (Tag t : tags) {
+            System.out.println(Bytes.toString(t.getValue()));
+          }
+        } catch (BadTsvLineException e) {
+          e.printStackTrace();
+        }
+      }
     }
     if (!Iterables.elementsEqual(parsedCols, expected)) {
       fail("Expected: " + Joiner.on(",").join(expected) + "\n" + "Got:"
@@ -138,6 +151,83 @@ public class TestImportTsvParser {
     assertEquals(1234l, parsed.getTimestamp(-1));
     checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
   }
 
+  @Test
+  public void testTsvParserWithTags() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t", "\r");
+    assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0));
+    assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1));
+    assertNull(parser.getFamily(2));
+    assertNull(parser.getQualifier(2));
+    assertEquals(2, parser.getRowKeyColumnIndex());
+
+    assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser.getTimestampKeyColumnIndex());
+
+    byte[] line = Bytes.toBytes("val_a\r2:ABC,4:DFS\tval_b\tval_c\r2:ACL\tval_d");
+    ParsedLine parsed = parser.parse(line, line.length);
+    checkParsing(parsed, Splitter.on("\t").split("val_a\tval_b\tval_c\tval_d"));
+  }
+
+  @Test
+  public void testTSVParserWithTagsWithSpecialUniCodeCharAsSeperators()
+      throws BadTsvLineException {
+    TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t", "\u001c");
+    assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0));
+    assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1));
+    assertNull(parser.getFamily(2));
+    assertNull(parser.getQualifier(2));
+    assertEquals(2, parser.getRowKeyColumnIndex());
+
+    assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser.getTimestampKeyColumnIndex());
+
+    byte[] line = Bytes.toBytes("val_a\u001c2:ABC,4:DFS\tval_b\tval_c\u001c2:ACL\tval_d");
+    ParsedLine parsed = parser.parse(line, line.length);
+    checkParsing(parsed, Splitter.on("\t").split("val_a\tval_b\tval_c\tval_d"));
+  }
+
+  @Test
+  public void testTsvParserWithTagsAtEnd() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t", "\r");
+    assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0));
+    assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1));
+    assertNull(parser.getFamily(2));
+    assertNull(parser.getQualifier(2));
+    assertEquals(2, parser.getRowKeyColumnIndex());
+
+    assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser.getTimestampKeyColumnIndex());
+
+    byte[] line =
+        Bytes.toBytes("val_a\r2:ABC,4:DFS\tval_b\tval_c\r2:ACL\tval_d\r4:VISIBILITY,5:PRIVATE");
+    ParsedLine parsed = parser.parse(line, line.length);
+    checkParsing(parsed, Splitter.on("\t").split("val_a\tval_b\tval_c\tval_d"));
+  }
+
+  @Test
+  public void testTsvParserWithTimestampWithTags() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t", "\r");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertNull(parser.getFamily(1));
+    assertNull(parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2));
+    assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertEquals(1, parser.getTimestampKeyColumnIndex());
+
+    byte[] line = Bytes.toBytes("rowkey\t1234\tval_a\r1:ABC");
+    ParsedLine parsed = parser.parse(line, line.length);
+    assertEquals(1234l, parsed.getTimestamp(-1));
+    String[] split = Bytes.toString(line).split("\r");
+    checkParsing(parsed, Splitter.on("\t").split(split[0]));
+  }
 
   /**
    * Test cases that throw BadTsvLineException