Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java	(revision 1490514)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java	(working copy)
@@ -78,11 +78,17 @@
   // Move them out of the tool and let the mapper handle its own validation.
   public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
   public final static String COLUMNS_CONF_KEY = "importtsv.columns";
+  public final static String COLUMNS_TYPE_CONF_KEY = "importtsv.columns.types";
   public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
   final static String DEFAULT_SEPARATOR = "\t";
   final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
 
+  // TODO can add other types.
+  public enum Type {
+    BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING
+  }
+
   public static class TsvParser {
     /**
      * Column families and qualifiers mapped to the TSV columns
@@ -373,6 +379,10 @@
       "  -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
       DEFAULT_MAPPER.getName() + "\n" +
       "  -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" +
+      "  -D" + COLUMNS_TYPE_CONF_KEY + "=STRING,INT,SHORT - data types for each of the columns.\n" +
+      "  This is optional. If not passed, all the columns are treated as STRING.\n" +
+      "  If it is passed, a type must be specified for every column except HBASE_TS_KEY\n" +
+      "  (including HBASE_ROW_KEY).\n" +
       "For performance consider the following options:\n" +
       "  -Dmapred.map.tasks.speculative.execution=false\n" +
       "  -Dmapred.reduce.tasks.speculative.execution=false";
@@ -430,6 +440,37 @@
       usage("One or more columns in addition to the row key and timestamp(optional) are required");
       return -1;
     }
+
+    String[] columnTypesFromArgs = getConf().getStrings(COLUMNS_TYPE_CONF_KEY);
+    // Check the number of types passed. When types are passed, one must be supplied for every
+    // column except the timestamp column; the row key also needs a type.
+    int expectedNoOfTypes = (tskeysFound == 0) ? columns.length : columns.length - tskeysFound;
+    // Specifying the types is optional. When no column type information is passed, all the
+    // columns are treated as STRING.
+    if (columnTypesFromArgs == null) {
+      columnTypesFromArgs = new String[columns.length];
+    } else if (expectedNoOfTypes != columnTypesFromArgs.length) {
+      usage("The number of column types must match the number of columns, including HBASE_ROW_KEY"
+          + " (except for the HBASE_TS_KEY column)");
+      return -1;
+    } else {
+      // Final column type details. This includes a type for the TS column as well (LONG),
+      // so that the indexes in the columns array and the types array line up.
+      String[] columnTypes = new String[columns.length];
+      for (int i = 0, j = 0; i < columns.length; i++) {
+        if (columns[i].equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC)) {
+          columnTypes[i] = Type.LONG.name();
+        } else {
+          columnTypes[i] =
+              (columnTypesFromArgs[j] == null) ? Type.STRING.name() : columnTypesFromArgs[j];
+          j++;
+        }
+      }
+      // Set the final column types into the conf.
+      getConf().setStrings(COLUMNS_TYPE_CONF_KEY, columnTypes);
+    }
     }
 
     // If timestamp option is not specified, use current system time.
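For reference, a minimal sketch (not part of the patch) of how the new importtsv.columns.types property is meant to pair with importtsv.columns when the job is launched programmatically. The property names come from the patch above; the column layout, table name, and input path are placeholders, and the snippet assumes the Tool-based ImportTsv driver used in this branch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.ImportTsv;
import org.apache.hadoop.util.ToolRunner;

public class TypedImportTsvLauncher {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // TSV layout: row key, timestamp, then two data columns (placeholder names).
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,d:count,d:price");
    // One type per column except HBASE_TS_KEY; the row key needs a type as well.
    conf.set(ImportTsv.COLUMNS_TYPE_CONF_KEY, "STRING,INT,DOUBLE");
    conf.set(ImportTsv.SEPARATOR_CONF_KEY, ",");
    // Placeholder table name and input path.
    int exitCode = ToolRunner.run(conf, new ImportTsv(),
        new String[] { "myTable", "/tmp/typed-input" });
    System.exit(exitCode);
  }
}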
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java	(revision 1490514)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java	(working copy)
@@ -20,8 +20,10 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Counter;
@@ -30,6 +32,7 @@
 import org.apache.hadoop.conf.Configuration;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 /**
  * Write table content out to files in hdfs.
@@ -49,6 +52,9 @@
   /** Should skip bad lines */
   private boolean skipBadLines;
   private Counter badLineCount;
+  private String[] columnTypes;
+  // To avoid excess string comparisons.
+  private boolean[] stringType;
 
   private ImportTsv.TsvParser parser;
 
@@ -112,6 +118,14 @@
     skipBadLines = context.getConfiguration().getBoolean(
         ImportTsv.SKIP_LINES_CONF_KEY, true);
     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+    columnTypes = context.getConfiguration().getStrings(ImportTsv.COLUMNS_TYPE_CONF_KEY);
+    // Precompute which columns are STRING so the map function can avoid string comparisons.
+    if (columnTypes != null) {
+      stringType = new boolean[columnTypes.length];
+      for (int i = 0; i < columnTypes.length; i++) {
+        stringType[i] = columnTypes[i].equals(ImportTsv.Type.STRING.name());
+      }
+    }
   }
 
   /**
@@ -133,19 +147,35 @@
       // Retrieve timestamp if exists
       ts = parsed.getTimestamp(ts);
 
-      Put put = new Put(rowKey.copyBytes());
+      byte[] row = rowKey.copyBytes();
+      int rowkeyColumnIndex = parser.getRowKeyColumnIndex();
+      if (columnTypes != null && !stringType[rowkeyColumnIndex]) {
+        // Convert the row key to bytes of the declared type.
+        row = convertValueToProperType(Bytes.toString(row), columnTypes[rowkeyColumnIndex]);
+        rowKey.set(row);
+      }
+      Put put = new Put(row);
       for (int i = 0; i < parsed.getColumnCount(); i++) {
-        if (i == parser.getRowKeyColumnIndex()
-            || i == parser.getTimestampKeyColumnIndex()) {
+        if (i == rowkeyColumnIndex || i == parser.getTimestampKeyColumnIndex()) {
           continue;
         }
-        KeyValue kv = new KeyValue(
-            lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
-            parser.getFamily(i), 0, parser.getFamily(i).length,
-            parser.getQualifier(i), 0, parser.getQualifier(i).length,
-            ts,
-            KeyValue.Type.Put,
-            lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
+        KeyValue kv = null;
+        if (columnTypes != null && !stringType[i]) {
+          // TODO there is a copying of bytes here. Can we avoid it?
+          byte[] kvValue = Arrays.copyOfRange(lineBytes, parsed.getColumnOffset(i),
+              parsed.getColumnOffset(i) + parsed.getColumnLength(i));
+          kvValue = convertValueToProperType(Bytes.toString(kvValue), columnTypes[i]);
+          kv =
+              new KeyValue(row, 0, row.length, parser.getFamily(i), 0, parser.getFamily(i).length,
+                  parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, KeyValue.Type.Put,
+                  kvValue, 0, kvValue.length);
+        } else {
+          kv =
+              new KeyValue(row, 0, row.length, parser.getFamily(i), 0, parser.getFamily(i).length,
+                  parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, KeyValue.Type.Put,
                  lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
+        }
         put.add(kv);
       }
       context.write(rowKey, put);
@@ -173,4 +203,28 @@
       e.printStackTrace();
     }
   }
+
+  private byte[] convertValueToProperType(String rawString, String type)
+      throws BadTsvLineException {
+    try {
+      switch (ImportTsv.Type.valueOf(type)) {
+      case BYTE:
+        return new byte[] { Byte.parseByte(rawString) };
+      case DOUBLE:
+        return Bytes.toBytes(Double.parseDouble(rawString));
+      case FLOAT:
+        return Bytes.toBytes(Float.parseFloat(rawString));
+      case INT:
+        return Bytes.toBytes(Integer.parseInt(rawString));
+      case LONG:
+        return Bytes.toBytes(Long.parseLong(rawString));
+      case SHORT:
+        return Bytes.toBytes(Short.parseShort(rawString));
+      case STRING:
+      default:
+        return Bytes.toBytes(rawString);
+      }
+    } catch (NumberFormatException e) {
+      throw new BadTsvLineException(rawString + " cannot be converted to type: " + type);
+    }
+  }
 }
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java	(revision 1490514)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java	(working copy)
@@ -185,6 +185,23 @@
     return doMROnTableTest(util, family, data, args, 1);
   }
 
+  @Test
+  public void testTypeSupportedDataWithAnExistingTable() throws Exception {
+    String table = "test-" + UUID.randomUUID();
+
+    // Prepare the arguments required for the test.
+    String[] args =
+        new String[] {
+            "-D" + ImportTsv.COLUMNS_CONF_KEY
+                + "=FAM:A,HBASE_ROW_KEY,FAM:B,HBASE_TS_KEY,FAM:C,FAM:D,FAM:E",
+            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+            "-D" + ImportTsv.COLUMNS_TYPE_CONF_KEY + "=INT,LONG,STRING,FLOAT,DOUBLE,BYTE", table };
+    // The BYTE column must hold a value parseable by Byte.parseByte.
+    String data = "1234,1234567890123456789,str,139233843,12.3,12233444455.23,1\n";
+    util.createTable(table, FAMILY);
+    // typeSupport=true so that validateTable checks the typed values.
+    doMROnTableTest(util, FAMILY, data, args, 1, true);
+    util.deleteTable(table);
+  }
+
   /**
    * Run an ImportTsv job and perform basic validation on the results.
    * Returns the ImportTsv Tool instance so that other tests can
    * inspect it for further validation as necessary.
@@ -193,9 +210,22 @@
    * @param args Any arguments to pass BEFORE inputFile path is appended.
    * @return The Tool instance used to run the test.
    */
-  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
-      String data, String[] args, int valueMultiplier)
-      throws Exception {
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+      String[] args, int valueMultiplier) throws Exception {
+    return doMROnTableTest(util, family, data, args, valueMultiplier, false);
+  }
+
+  /**
+   * Run an ImportTsv job and perform basic validation on the results.
+   * Returns the ImportTsv Tool instance so that other tests can
+   * inspect it for further validation as necessary.
+   * This method is static to ensure non-reliance on the instance's util/conf facilities.
+   * @param args Any arguments to pass BEFORE inputFile path is appended.
+   * @param typeSupport whether column data types are being used.
+   * @return The Tool instance used to run the test.
+   */
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+      String[] args, int valueMultiplier, boolean typeSupport) throws Exception {
     String table = args[args.length - 1];
     Configuration conf = new Configuration(util.getConfiguration());
@@ -239,7 +269,7 @@
     if (createdHFiles)
       validateHFiles(fs, outputPath, family);
     else
-      validateTable(conf, table, family, valueMultiplier);
+      validateTable(conf, table, family, valueMultiplier, typeSupport);
 
     if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
       LOG.debug("Deleting test subdirectory");
@@ -251,8 +281,8 @@
   /**
    * Confirm ImportTsv via data in online table.
    */
-  private static void validateTable(Configuration conf, String tableName,
-      String family, int valueMultiplier) throws IOException {
+  private static void validateTable(Configuration conf, String tableName, String family,
+      int valueMultiplier, boolean typeSupport) throws IOException {
 
     LOG.debug("Validating table.");
     HTable table = new HTable(conf, tableName);
@@ -266,14 +296,30 @@
     scan.addFamily(Bytes.toBytes(family));
     ResultScanner resScanner = table.getScanner(scan);
     for (Result res : resScanner) {
-      assertTrue(res.size() == 2);
-      List kvs = res.list();
-      assertArrayEquals(kvs.get(0).getRow(), Bytes.toBytes("KEY"));
-      assertArrayEquals(kvs.get(1).getRow(), Bytes.toBytes("KEY"));
-      assertArrayEquals(kvs.get(0).getValue(),
-          Bytes.toBytes("VALUE" + valueMultiplier));
-      assertArrayEquals(kvs.get(1).getValue(),
-          Bytes.toBytes("VALUE" + 2 * valueMultiplier));
+      if (typeSupport) {
+        // Five data columns (FAM:A through FAM:E); the row key was declared LONG.
+        assertTrue(res.size() == 5);
+        List kvs = res.list();
+        for (int j = 0; j < kvs.size(); j++) {
+          assertArrayEquals(kvs.get(j).getRow(),
+              Bytes.toBytes(Long.parseLong("1234567890123456789")));
+        }
+        // KeyValues come back sorted by family:qualifier, i.e. FAM:A through FAM:E.
+        assertArrayEquals(kvs.get(0).getValue(), Bytes.toBytes(Integer.parseInt("1234")));
+        assertArrayEquals(kvs.get(1).getValue(), Bytes.toBytes("str"));
+        assertArrayEquals(kvs.get(2).getValue(), Bytes.toBytes(Float.parseFloat("12.3")));
+        assertArrayEquals(kvs.get(3).getValue(),
+            Bytes.toBytes(Double.parseDouble("12233444455.23")));
+        assertArrayEquals(kvs.get(4).getValue(), new byte[] { Byte.parseByte("1") });
+      } else {
+        assertTrue(res.size() == 2);
+        List kvs = res.list();
+        assertArrayEquals(kvs.get(0).getRow(), Bytes.toBytes("KEY"));
+        assertArrayEquals(kvs.get(1).getRow(), Bytes.toBytes("KEY"));
+        assertArrayEquals(kvs.get(0).getValue(),
+            Bytes.toBytes("VALUE" + valueMultiplier));
+        assertArrayEquals(kvs.get(1).getValue(),
+            Bytes.toBytes("VALUE" + 2 * valueMultiplier));
+      }
       // Only one result set is expected, so let it loop.
     }
     verified = true;
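As a sanity check on the encoding, a hedged sketch of how the typed cells written by the patched mapper could be read back with the standard Bytes decoders. It mirrors the column layout of the test above (family FAM, qualifiers A and D, row key declared LONG); the table name is a placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadTypedCells {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "typedTable"); // placeholder table name
    try {
      // The row key was declared LONG, so it is stored as an 8-byte encoded long.
      Result r = table.get(new Get(Bytes.toBytes(1234567890123456789L)));
      // FAM:A was declared INT and FAM:D DOUBLE in the test above.
      int a = Bytes.toInt(r.getValue(Bytes.toBytes("FAM"), Bytes.toBytes("A")));
      double d = Bytes.toDouble(r.getValue(Bytes.toBytes("FAM"), Bytes.toBytes("D")));
      System.out.println("FAM:A=" + a + ", FAM:D=" + d);
    } finally {
      table.close();
    }
  }
}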