### Eclipse Workspace Patch 1.0
#P hbase
Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java	(revision 1302273)
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java	(working copy)
@@ -61,6 +61,7 @@
     assertNull(parser.getFamily(0));
     assertNull(parser.getQualifier(0));
     assertEquals(0, parser.getRowKeyColumnIndex());
+    assertEquals(-1, parser.getTimestampKeyColumnIndex());
 
     parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
     assertNull(parser.getFamily(0));
@@ -68,6 +69,7 @@
     assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
     assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
     assertEquals(0, parser.getRowKeyColumnIndex());
+    assertEquals(-1, parser.getTimestampKeyColumnIndex());
 
     parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
     assertNull(parser.getFamily(0));
@@ -77,6 +79,17 @@
     assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
     assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
     assertEquals(0, parser.getRowKeyColumnIndex());
+    assertEquals(-1, parser.getTimestampKeyColumnIndex());
+
+    parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2", "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+    assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertEquals(2, parser.getTimestampKeyColumnIndex());
   }
 
   @Test
@@ -89,7 +102,7 @@
     assertNull(parser.getFamily(2));
     assertNull(parser.getQualifier(2));
     assertEquals(2, parser.getRowKeyColumnIndex());
-    
+
     byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
     ParsedLine parsed = parser.parse(line, line.length);
     checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
@@ -120,28 +133,28 @@
   public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
     byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
-    ParsedLine parsed = parser.parse(line, line.length);
+    parser.parse(line, line.length);
   }
 
   @Test(expected=BadTsvLineException.class)
   public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
     byte[] line = Bytes.toBytes("");
-    ParsedLine parsed = parser.parse(line, line.length);
+    parser.parse(line, line.length);
  }
 
   @Test(expected=BadTsvLineException.class)
   public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
     byte[] line = Bytes.toBytes("key_only");
-    ParsedLine parsed = parser.parse(line, line.length);
+    parser.parse(line, line.length);
   }
 
   @Test(expected=BadTsvLineException.class)
   public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
     TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
     byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
-    ParsedLine parsed = parser.parse(line, line.length);
+    parser.parse(line, line.length);
   }
 
   @Test
@@ -159,10 +172,28 @@
       INPUT_FILE
     };
 
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 1);
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1);
   }
 
   @Test
+  public void testMROnTableWithTimestamp()
+  throws Exception {
+    String TABLE_NAME = "TestTable";
+    String FAMILY = "FAM";
+    String INPUT_FILE = "InputFile1.csv";
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.COLUMNS_CONF_KEY
+            + "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", TABLE_NAME, INPUT_FILE };
+
+    String data = "KEY,1234,VALUE1,VALUE2\n";
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, data, args, 1);
+  }
+
+
+  @Test
   public void testMROnTableWithCustomMapper()
@@ -176,16 +207,17 @@
       INPUT_FILE
     };
 
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
   }
 
-  private void doMROnTableTest(String inputFile, String family, String tableName,
-      String[] args, int valueMultiplier) throws Exception {
+  private void doMROnTableTest(String inputFile, String family,
+      String tableName, String data, String[] args, int valueMultiplier)
+      throws Exception {
 
     // Cluster
     HBaseTestingUtility htu1 = new HBaseTestingUtility();
 
-    MiniHBaseCluster cluster = htu1.startMiniCluster();
+    htu1.startMiniCluster();
     htu1.startMiniMapReduceCluster();
 
     GenericOptionsParser opts = new GenericOptionsParser(htu1.getConfiguration(), args);
@@ -196,14 +228,13 @@
 
     FileSystem fs = FileSystem.get(conf);
     FSDataOutputStream op = fs.create(new Path(inputFile), true);
-    String line = "KEY\u001bVALUE1\u001bVALUE2\n";
-    op.write(line.getBytes(HConstants.UTF8_ENCODING));
+    if (data == null)
+      data = "KEY\u001bVALUE1\u001bVALUE2\n";
+    op.write(data.getBytes(HConstants.UTF8_ENCODING));
     op.close();
 
     final byte[] FAM = Bytes.toBytes(family);
     final byte[] TAB = Bytes.toBytes(tableName);
-    final byte[] QA = Bytes.toBytes("A");
-    final byte[] QB = Bytes.toBytes("B");
 
     HTableDescriptor desc = new HTableDescriptor(TAB);
     desc.addFamily(new HColumnDescriptor(FAM));
@@ -212,7 +243,7 @@
     Job job = ImportTsv.createSubmittableJob(conf, args);
     job.waitForCompletion(false);
     assertTrue(job.isSuccessful());
-    
+
     HTable table = new HTable(new Configuration(conf), TAB);
     boolean verified = false;
     long pause = conf.getLong("hbase.client.pause", 5 * 1000);
@@ -254,7 +285,7 @@
       htu1.shutdownMiniCluster();
     }
   }
-  
+
   public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
     return new String(bytes, HConstants.UTF8_ENCODING);
  }
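
Illustration, not part of the patch: with the column specification used in testMROnTableWithTimestamp above (HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B and a comma separator), the test input line

    KEY,1234,VALUE1,VALUE2

is interpreted as row key KEY, cell timestamp 1234, and values VALUE1 and VALUE2 stored under FAM:A and FAM:B.
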
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java	(revision 1302273)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java	(working copy)
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Counter;
@@ -105,7 +106,8 @@
       separator = new String(Base64.decode(separator));
     }
 
-    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+    // The default of 0 should never be used: main() always sets TIMESTAMP_CONF_KEY.
+    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
 
     skipBadLines = context.getConfiguration().getBoolean(
         ImportTsv.SKIP_LINES_CONF_KEY, true);
@@ -128,10 +130,13 @@
           new ImmutableBytesWritable(lineBytes,
               parsed.getRowKeyOffset(),
               parsed.getRowKeyLength());
+      // If a timestamp column was specified, use the per-row timestamp.
+      if (parser.getTimestampKeyColumnIndex() != -1)
+        ts = parsed.getTimestamp();
 
       Put put = new Put(rowKey.copyBytes());
       for (int i = 0; i < parsed.getColumnCount(); i++) {
-        if (i == parser.getRowKeyColumnIndex()) continue;
+        if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()) continue;
         KeyValue kv = new KeyValue(
             lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
             parser.getFamily(i), 0, parser.getFamily(i).length,
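
Illustration, not part of the patch: the two hunks above make setup() seed ts from importtsv.timestamp (which main() now always sets), and make map() override it per row whenever HBASE_TS_KEY appears in the column spec. A standalone sketch of that resolution order, with illustrative names:

public class TimestampResolutionSketch {
  // configuredTs mirrors the mapper's ts field seeded in setup();
  // tsKeyColumnIndex mirrors TsvParser.getTimestampKeyColumnIndex().
  static long resolve(long configuredTs, int tsKeyColumnIndex, long parsedRowTs) {
    long ts = configuredTs;          // job-wide default from importtsv.timestamp
    if (tsKeyColumnIndex != -1) {    // HBASE_TS_KEY present in importtsv.columns
      ts = parsedRowTs;              // per-row value from ParsedLine.getTimestamp()
    }
    return ts;
  }

  public static void main(String[] args) {
    System.out.println(resolve(42L, -1, 0L));    // 42: no timestamp column
    System.out.println(resolve(42L, 1, 1234L));  // 1234: per-row value wins
  }
}
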
Index: src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java	(revision 1302273)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java	(working copy)
@@ -77,8 +77,15 @@
 
     private int rowKeyColumnIndex;
 
+    private int maxColumnCount;
+
+    // Default value must be negative: -1 means no timestamp column was specified.
+    private int timestampKeyColumnIndex = -1;
+
     public static String ROWKEY_COLUMN_SPEC="HBASE_ROW_KEY";
 
+    public static String TIMESTAMPKEY_COLUMN_SPEC="HBASE_TS_KEY";
+
     /**
      * @param columnsSpecification the list of columns to parser out, comma separated.
      * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
@@ -94,15 +101,20 @@
       ArrayList<String> columnStrings = Lists.newArrayList(
         Splitter.on(',').trimResults().split(columnsSpecification));
 
-      families = new byte[columnStrings.size()][];
-      qualifiers = new byte[columnStrings.size()][];
-
+      maxColumnCount = columnStrings.size();
+      families = new byte[maxColumnCount][];
+      qualifiers = new byte[maxColumnCount][];
       for (int i = 0; i < columnStrings.size(); i++) {
         String str = columnStrings.get(i);
         if (ROWKEY_COLUMN_SPEC.equals(str)) {
           rowKeyColumnIndex = i;
           continue;
         }
+        if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
+          timestampKeyColumnIndex = i;
+          continue;
+        }
+
         String[] parts = str.split(":", 2);
         if (parts.length == 1) {
           families[i] = str.getBytes();
@@ -117,6 +129,11 @@
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
+
+    public int getTimestampKeyColumnIndex() {
+      return timestampKeyColumnIndex;
+    }
+
     public byte[] getFamily(int idx) {
       return families[idx];
     }
@@ -139,10 +156,13 @@
 
       tabOffsets.add(length);
 
-      if (tabOffsets.size() > families.length) {
+      if (tabOffsets.size() > maxColumnCount) {
        throw new BadTsvLineException("Excessive columns");
       } else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
         throw new BadTsvLineException("No row key");
+      } else if (getTimestampKeyColumnIndex() > -1
+          && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
+        throw new BadTsvLineException("No timestamp");
       }
       return new ParsedLine(tabOffsets, lineBytes);
     }
@@ -162,6 +182,14 @@
       public int getRowKeyLength() {
         return getColumnLength(rowKeyColumnIndex);
       }
+
+      public long getTimestamp() {
+        if (timestampKeyColumnIndex == -1) return 0;
+        return Long.parseLong(new String(lineBytes,
+            getColumnOffset(timestampKeyColumnIndex),
+            getColumnLength(timestampKeyColumnIndex)));
+      }
+
       public int getColumnOffset(int idx) {
         if (idx > 0)
           return tabOffsets.get(idx - 1) + 1;
@@ -248,7 +276,7 @@
     if (errorMsg != null && errorMsg.length() > 0) {
       System.err.println("ERROR: " + errorMsg);
     }
-    String usage = 
+    String usage =
       "Usage: " + NAME + " -Dimporttsv.columns=a,b,c <tablename> <inputdir>\n" +
       "\n" +
       "Imports the given input directory of TSV data into the specified table.\n" +
@@ -259,7 +287,11 @@
       "column name HBASE_ROW_KEY is used to designate that this column should be used\n" +
       "as the row key for each imported record. You must specify exactly one column\n" +
       "to be the row key, and you must specify a column name for every column that exists in the\n" +
-      "input data.\n" +
+      "input data. Another special column HBASE_TS_KEY designates that this column should be\n" +
+      "used as the timestamp for each record. Unlike HBASE_ROW_KEY, HBASE_TS_KEY is optional.\n" +
+      "You must specify at most one column as the timestamp key for each imported record.\n" +
+      "Records with invalid timestamps (blank or non-numeric values) will be treated as bad lines.\n" +
+      "Note: if you use this option, then the 'importtsv.timestamp' option will be ignored.\n" +
       "\n" +
       "By default importtsv will load data directly into HBase. To instead generate\n" +
       "HFiles of data to prepare for a bulk data load, pass the option:\n" +
@@ -269,7 +301,7 @@
       "Other options that may be specified with -D include:\n" +
       "  -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
       "  '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
-      "  -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
+      "  -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import. This option is ignored if HBASE_TS_KEY is specified in 'importtsv.columns'\n" +
       "  -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
       DEFAULT_MAPPER.getName() + "\n";
     System.err.println(usage);
@@ -307,12 +339,28 @@
       System.exit(-1);
     }
 
-    // Make sure one or more columns are specified
-    if (columns.length < 2) {
-      usage("One or more columns in addition to the row key are required");
+    // Make sure at most one column is specified as the timestamp key
+    int tskeysFound = 0;
+    for (String col : columns) {
+      if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC)) tskeysFound++;
+    }
+    if (tskeysFound > 1) {
+      usage("Must specify at most one column as " + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
       System.exit(-1);
     }
 
+    // Make sure one or more columns are specified, excluding the row key and the optional timestamp key
+    if (columns.length - (rowkeysFound + tskeysFound) < 2) {
+      usage("One or more columns in addition to the row key and timestamp key (optional) are required");
+      System.exit(-1);
+    }
+
+    // If the timestamp option is not specified, default to the current system time.
+    long timestamp = conf.getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+
+    // Set it back into the conf so the mapper always sees an explicit timestamp.
+    conf.setLong(TIMESTAMP_CONF_KEY, timestamp);
+
     Job job = createSubmittableJob(conf, otherArgs);
     System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
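
Illustration, not part of the patch: an example invocation using the new column spec (jar name, table, and input path are placeholders):

  $ hadoop jar hbase-VERSION.jar importtsv \
      -Dimporttsv.columns=HBASE_ROW_KEY,HBASE_TS_KEY,d:c1,d:c2 \
      -Dimporttsv.separator=, \
      mytable /path/to/input

Here the second field of every line supplies the cell timestamp, and any importtsv.timestamp setting is ignored.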