Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java	(revision 1443813)
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java	(working copy)
@@ -64,6 +64,7 @@
     assertNull(parser.getFamily(0));
     assertNull(parser.getQualifier(0));
     assertEquals(0, parser.getRowKeyColumnIndex());
+    assertFalse(parser.hasTimestamp());
 
     parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
     assertNull(parser.getFamily(0));
@@ -71,6 +72,7 @@
     assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
     assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
     assertEquals(0, parser.getRowKeyColumnIndex());
+    assertFalse(parser.hasTimestamp());
 
     parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
     assertNull(parser.getFamily(0));
@@ -80,6 +82,19 @@
     assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
     assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
     assertEquals(0, parser.getRowKeyColumnIndex());
+    assertFalse(parser.hasTimestamp());
+
+    parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2",
+        "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+    assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertTrue(parser.hasTimestamp());
+    assertEquals(2, parser.getTimestampKeyColumnIndex());
   }
 
   @Test
@@ -93,11 +108,33 @@
     assertNull(parser.getQualifier(2));
     assertEquals(2, parser.getRowKeyColumnIndex());
+    assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX,
+        parser.getTimestampKeyColumnIndex());
+
     byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
     ParsedLine parsed = parser.parse(line, line.length);
     checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
   }
+
+  @Test
+  public void testTsvParserWithTimestamp() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertNull(parser.getFamily(1));
+    assertNull(parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2));
+    assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertEquals(1, parser.getTimestampKeyColumnIndex());
+    byte[] line = Bytes.toBytes("rowkey\t1234\tval_a");
+    ParsedLine parsed = parser.parse(line, line.length);
+    assertEquals(1234L, parsed.getTimestamp(-1));
+    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+  }
+
   private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
     ArrayList<String> parsedCols = new ArrayList<String>();
     for (int i = 0; i < parsed.getColumnCount(); i++) {
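[The hunks above assert how a column spec maps to indices. A minimal standalone sketch of that bookkeeping, for reference; the class and variable names here are hypothetical, and the real logic is the TsvParser constructor changed in ImportTsv.java further down:]

    // Standalone sketch (not the patched class): how a column spec such as
    // "HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2" yields the indices
    // asserted above. -1 means "no timestamp column", as in the patch.
    public class ColumnSpecSketch {
      public static void main(String[] args) {
        String[] cols = "HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2".split(",");
        int rowKeyIndex = -1;
        int tsKeyIndex = -1;
        for (int i = 0; i < cols.length; i++) {
          if ("HBASE_ROW_KEY".equals(cols[i])) {
            rowKeyIndex = i;
          } else if ("HBASE_TS_KEY".equals(cols[i])) {
            tsKeyIndex = i;
          }
        }
        // Prints 0 and 2, matching the assertions in the test above.
        System.out.println("row key index = " + rowKeyIndex
            + ", timestamp index = " + tsKeyIndex);
      }
    }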
"\t"); byte[] line = Bytes.toBytes(""); - ParsedLine parsed = parser.parse(line, line.length); + parser.parse(line, line.length); } @Test(expected=BadTsvLineException.class) public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException { TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); byte[] line = Bytes.toBytes("key_only"); - ParsedLine parsed = parser.parse(line, line.length); + parser.parse(line, line.length); } @Test(expected=BadTsvLineException.class) public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException { TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t"); byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key"); + parser.parse(line, line.length); + } + + @Test(expected = BadTsvLineException.class) + public void testTsvParserInvalidTimestamp() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t"); + assertEquals(1, parser.getTimestampKeyColumnIndex()); + byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a"); ParsedLine parsed = parser.parse(line, line.length); + assertEquals(-1, parsed.getTimestamp(-1)); + checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line))); } + + @Test(expected = BadTsvLineException.class) + public void testTsvParserNoTimestampValue() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t"); + assertEquals(2, parser.getTimestampKeyColumnIndex()); + byte[] line = Bytes.toBytes("rowkey\tval_a"); + parser.parse(line, line.length); + } + @Test public void testMROnTable() @@ -162,9 +218,26 @@ INPUT_FILE }; - doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 1); + doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1); } + + @Test + public void testMROnTableWithTimestamp() throws Exception { + String TABLE_NAME = "TestTable"; + String FAMILY = "FAM"; + String INPUT_FILE = "InputFile1.csv"; + // Prepare the arguments required for the test. 
@@ -162,9 +218,26 @@
       INPUT_FILE
     };
 
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 1);
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1);
   }
+
+  @Test
+  public void testMROnTableWithTimestamp() throws Exception {
+    String TABLE_NAME = "TestTable";
+    String FAMILY = "FAM";
+    String INPUT_FILE = "InputFile1.csv";
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.COLUMNS_CONF_KEY
+            + "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+        TABLE_NAME, INPUT_FILE };
+
+    String data = "KEY,1234,VALUE1,VALUE2\n";
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, data, args, 1);
+  }
 
   @Test
   public void testMROnTableWithCustomMapper()
   throws Exception {
@@ -179,11 +252,11 @@
       INPUT_FILE
     };
 
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
   }
 
   private void doMROnTableTest(String inputFile, String family, String tableName,
-      String[] args, int valueMultiplier) throws Exception {
+      String data, String[] args, int valueMultiplier) throws Exception {
 
     // Cluster
     HBaseTestingUtility htu1 = new HBaseTestingUtility();
@@ -198,8 +271,10 @@
     try {
       FileSystem fs = FileSystem.get(conf);
       FSDataOutputStream op = fs.create(new Path(inputFile), true);
-      String line = "KEY\u001bVALUE1\u001bVALUE2\n";
-      op.write(line.getBytes(HConstants.UTF8_ENCODING));
+      if (data == null) {
+        data = "KEY\u001bVALUE1\u001bVALUE2\n";
+      }
+      op.write(Bytes.toBytes(data));
       op.close();
 
       final byte[] FAM = Bytes.toBytes(family);
@@ -273,11 +348,11 @@
       "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
       "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=output",
       TABLE_NAME,
       INPUT_FILE
     };
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
   }
 
   public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
-    return new String(bytes, HConstants.UTF8_ENCODING);
+    return new String(bytes);
   }
 
   @org.junit.Rule
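[Before moving to the main code: an illustration of what the new end-to-end test expects. The line "KEY,1234,VALUE1,VALUE2" should land as two cells sharing the per-record timestamp from the HBASE_TS_KEY column. This is illustration only, written against the HBase client API; it is not part of the patch, and the mapper actually builds the equivalent Put from KeyValue objects internally:]

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    // The cells testMROnTableWithTimestamp should produce for one input line.
    public class ExpectedPutSketch {
      public static void main(String[] args) {
        long ts = 1234L;  // parsed from the HBASE_TS_KEY column, not the job default
        Put put = new Put(Bytes.toBytes("KEY"));
        put.add(Bytes.toBytes("FAM"), Bytes.toBytes("A"), ts, Bytes.toBytes("VALUE1"));
        put.add(Bytes.toBytes("FAM"), Bytes.toBytes("B"), ts, Bytes.toBytes("VALUE2"));
        System.out.println(put);
      }
    }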
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java	(revision 1443813)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java	(working copy)
@@ -101,7 +101,9 @@
       separator = new String(Base64.decode(separator));
     }
 
-    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+    // Should never get 0 as we are setting this to a valid value in the job
+    // configuration.
+    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
 
     skipBadLines = context.getConfiguration().getBoolean(
         ImportTsv.SKIP_LINES_CONF_KEY, true);
@@ -124,10 +126,15 @@
         new ImmutableBytesWritable(lineBytes,
             parsed.getRowKeyOffset(), parsed.getRowKeyLength());
 
+      // Retrieve the timestamp from the parsed line if one exists.
+      ts = parsed.getTimestamp(ts);
+
       Put put = new Put(rowKey.copyBytes());
       for (int i = 0; i < parsed.getColumnCount(); i++) {
-        if (i == parser.getRowKeyColumnIndex()) continue;
+        if (i == parser.getRowKeyColumnIndex()
+            || i == parser.getTimestampKeyColumnIndex()) {
+          continue;
+        }
         KeyValue kv = new KeyValue(
             lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
             parser.getFamily(i), 0, parser.getFamily(i).length,
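[A sketch of the timestamp precedence the mapper change above and the ImportTsv.main() change below establish, with illustrative names, not code from the patch: main() seeds importtsv.timestamp with the current time when the user supplies none, so the mapper's 0 default should never be observed, and a per-record HBASE_TS_KEY value then overrides the job-level value:]

    // Sketch only: which timestamp a cell ends up with.
    public class TimestampPrecedenceSketch {
      static long effectiveTimestamp(Long jobLevelTs, Long perRecordTs) {
        // Job-level fallback, as done in ImportTsv.main() via conf.setLong(...).
        long ts = (jobLevelTs != null) ? jobLevelTs : System.currentTimeMillis();
        // Per-record override, as done by ParsedLine.getTimestamp(ts).
        return (perRecordTs != null) ? perRecordTs : ts;
      }

      public static void main(String[] args) {
        System.out.println(effectiveTimestamp(null, 1234L));  // 1234: HBASE_TS_KEY wins
        System.out.println(effectiveTimestamp(42L, null));    // 42: job-level value
      }
    }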
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java	(revision 1443813)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java	(working copy)
@@ -79,8 +79,17 @@
 
     private int rowKeyColumnIndex;
 
-    public static String ROWKEY_COLUMN_SPEC="HBASE_ROW_KEY";
+    private int maxColumnCount;
 
+    // Default value must be negative
+    public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1;
+
+    private int timestampKeyColumnIndex = DEFAULT_TIMESTAMP_COLUMN_INDEX;
+
+    public static String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";
+
+    public static String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
+
     /**
      * @param columnsSpecification the list of columns to parser out, comma separated.
      * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
@@ -96,8 +105,9 @@
       ArrayList<String> columnStrings = Lists.newArrayList(
          Splitter.on(',').trimResults().split(columnsSpecification));
 
-      families = new byte[columnStrings.size()][];
-      qualifiers = new byte[columnStrings.size()][];
+      maxColumnCount = columnStrings.size();
+      families = new byte[maxColumnCount][];
+      qualifiers = new byte[maxColumnCount][];
 
       for (int i = 0; i < columnStrings.size(); i++) {
         String str = columnStrings.get(i);
@@ -105,6 +115,12 @@
           rowKeyColumnIndex = i;
           continue;
         }
+
+        if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
+          timestampKeyColumnIndex = i;
+          continue;
+        }
+
         String[] parts = str.split(":", 2);
         if (parts.length == 1) {
           families[i] = str.getBytes();
@@ -116,6 +132,14 @@
       }
     }
 
+    public boolean hasTimestamp() {
+      return timestampKeyColumnIndex != DEFAULT_TIMESTAMP_COLUMN_INDEX;
+    }
+
+    public int getTimestampKeyColumnIndex() {
+      return timestampKeyColumnIndex;
+    }
+
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
@@ -129,7 +153,7 @@
     public ParsedLine parse(byte[] lineBytes, int length)
     throws BadTsvLineException {
       // Enumerate separator offsets
-      ArrayList<Integer> tabOffsets = new ArrayList<Integer>(families.length);
+      ArrayList<Integer> tabOffsets = new ArrayList<Integer>(maxColumnCount);
       for (int i = 0; i < length; i++) {
         if (lineBytes[i] == separatorByte) {
           tabOffsets.add(i);
@@ -141,10 +165,13 @@
       tabOffsets.add(length);
 
-      if (tabOffsets.size() > families.length) {
+      if (tabOffsets.size() > maxColumnCount) {
        throw new BadTsvLineException("Excessive columns");
       } else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
         throw new BadTsvLineException("No row key");
+      } else if (hasTimestamp()
+          && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
+        throw new BadTsvLineException("No timestamp");
       }
       return new ParsedLine(tabOffsets, lineBytes);
     }
@@ -164,6 +191,24 @@
       public int getRowKeyLength() {
         return getColumnLength(rowKeyColumnIndex);
       }
+
+      public long getTimestamp(long ts) throws BadTsvLineException {
+        // Return ts if HBASE_TS_KEY is not configured in the column spec.
+        if (!hasTimestamp()) {
+          return ts;
+        }
+
+        String timeStampStr = Bytes.toString(lineBytes,
+            getColumnOffset(timestampKeyColumnIndex),
+            getColumnLength(timestampKeyColumnIndex));
+        try {
+          return Long.parseLong(timeStampStr);
+        } catch (NumberFormatException nfe) {
+          // Treat this record as a bad record.
+          throw new BadTsvLineException("Invalid timestamp " + timeStampStr);
+        }
+      }
+
       public int getColumnOffset(int idx) {
         if (idx > 0)
           return tabOffsets.get(idx - 1) + 1;
@@ -285,7 +330,11 @@
       "column name HBASE_ROW_KEY is used to designate that this column should be used\n" +
       "as the row key for each imported record. You must specify exactly one column\n" +
       "to be the row key, and you must specify a column name for every column that exists in the\n" +
-      "input data.\n" +
+      "input data. Another special column HBASE_TS_KEY designates that this column should be\n" +
+      "used as the timestamp for each record. Unlike HBASE_ROW_KEY, HBASE_TS_KEY is optional.\n" +
+      "You must specify at most one column as the timestamp key for each imported record.\n" +
+      "Records with invalid timestamps (blank or non-numeric) will be treated as bad records.\n" +
+      "Note: if you use this option, then the 'importtsv.timestamp' option will be ignored.\n" +
      "\n" +
       "By default importtsv will load data directly into HBase. To instead generate\n" +
       "HFiles of data to prepare for a bulk data load, pass the option:\n" +
@@ -344,11 +393,33 @@
       System.exit(-1);
     }
 
-    // Make sure one or more columns are specified
-    if (columns.length < 2) {
-      usage("One or more columns in addition to the row key are required");
+    // Make sure we have at most one column as the timestamp key
+    int tskeysFound = 0;
+    for (String col : columns) {
+      if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC))
+        tskeysFound++;
+    }
+    if (tskeysFound > 1) {
+      usage("Must specify at most one column as "
+          + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
       System.exit(-1);
     }
+
+    // Make sure one or more columns are specified, excluding the row key and
+    // the timestamp key
+    if (columns.length - (rowkeysFound + tskeysFound) < 1) {
+      usage("One or more columns in addition to the row key and timestamp (optional) are required");
+      System.exit(-1);
+    }
+
+    // If the timestamp option is not specified, use the current system time.
+    long timestamp = conf
+        .getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+
+    // Set it back so that an invalid (non-numeric) timestamp is replaced with
+    // the current system time.
+    conf.setLong(TIMESTAMP_CONF_KEY, timestamp);
+
     hbaseAdmin = new HBaseAdmin(conf);
     Job job = createSubmittableJob(conf, otherArgs);
     System.exit(job.waitForCompletion(true) ? 0 : 1);
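[Taken together, the patch enables invocations like the following. A hedged end-to-end sketch: the table name and input path are placeholders, and it is shown as a direct call to ImportTsv.main(), which is what the hadoop jar command line ultimately reaches; the -D options are parsed by GenericOptionsParser inside main():]

    // Illustration only: importing "row,timestamp,valueA,valueB" CSV lines with
    // per-record timestamps taken from the second column. Equivalent to:
    //   hadoop jar hbase.jar importtsv
    //       -Dimporttsv.columns=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B
    //       -Dimporttsv.separator=, TestTable InputFile1.csv
    // With HBASE_TS_KEY present, any importtsv.timestamp setting is ignored.
    public class ImportTsvWithTimestampExample {
      public static void main(String[] unused) throws Exception {
        org.apache.hadoop.hbase.mapreduce.ImportTsv.main(new String[] {
            "-Dimporttsv.columns=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
            "-Dimporttsv.separator=,",
            "TestTable",       // placeholder table name
            "InputFile1.csv"   // placeholder input path on HDFS
        });
      }
    }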