Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (revision 1490514)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (working copy)
@@ -78,11 +78,17 @@
// Move them out of the tool and let the mapper handle its own validation.
public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
public final static String COLUMNS_CONF_KEY = "importtsv.columns";
+ public final static String COLUMNS_TYPE_CONF_KEY = "importtsv.columns.types";
public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
final static String DEFAULT_SEPARATOR = "\t";
final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
+ // TODO: other data types can be added later.
+ public enum Type {
+ BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING
+ }
+
public static class TsvParser {
/**
* Column families and qualifiers mapped to the TSV columns
@@ -373,6 +379,10 @@
" -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
DEFAULT_MAPPER.getName() + "\n" +
" -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" +
+ " -D" + COLUMNS_TYPE_CONF_KEY + "=STRING,INT,SHORT - data types for each of the column.\n" +
+ " This is optional. If not passed all the column data types will be treated " +
+ " as String.\n If it is passed then user must specify types for all the columns " +
+ "except HBASE_TS_KEY(Including HBASE_ROW_KEY).\n" +
"For performance consider the following options:\n" +
" -Dmapred.map.tasks.speculative.execution=false\n" +
" -Dmapred.reduce.tasks.speculative.execution=false";
@@ -430,6 +440,37 @@
usage("One or more columns in addition to the row key and timestamp(optional) are required");
return -1;
}
+
+ String columnTypesFromArgs[] = getConf().getStrings(COLUMNS_TYPE_CONF_KEY);
+ // Check the number of types passed. When types are passed, one type must be given for
+ // every column except the timestamp column. A type needs to be specified for the row
+ // key column as well.
+ int expectedNoOfTypes = (tskeysFound == 0) ? columns.length : columns.length - tskeysFound;
+ // Specifying the types is optional. When no column type information is passed,
+ // all the columns will be treated as String type.
+ if (columnTypesFromArgs == null) {
+ columnTypesFromArgs = new String[columns.length];
+ } else if (expectedNoOfTypes != columnTypesFromArgs.length) {
+ usage("No# of column types should match with no# columns including HBASE_ROW_KEY"
+ + "(Except for the HBASE_TS_KEY column)");
+ return -1;
+ } else {
+ // Final column type details. This also includes a type (LONG) for the TS column, so
+ // that indices in the columns array and the types array always match.
+ String columnTypes[] = new String[columns.length];
+ for (int i = 0, j = 0; i < columns.length; i++) {
+ if (columns[i].equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC)) {
+ columnTypes[i] = Type.LONG.name();
+ } else {
+ columnTypes[i] =
+ (columnTypesFromArgs[j] == null) ? Type.STRING.name() : columnTypesFromArgs[j];
+ j++;
+ }
+ }
+ // Setting the final column types into conf.
+ getConf().setStrings(COLUMNS_TYPE_CONF_KEY, columnTypes);
+ }
}
// If timestamp option is not specified, use current system time.
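
Reviewer note (not part of the patch): a minimal standalone sketch of the column/type
alignment the hunk above performs. The column list and class name here are hypothetical,
for illustration only; the real logic is in ImportTsv above.

    // Hypothetical example: 4 columns, one of which is HBASE_TS_KEY.
    public class ColumnTypeAlignmentSketch {
      public static void main(String[] args) {
        String[] columns = { "HBASE_ROW_KEY", "d:a", "HBASE_TS_KEY", "d:b" };
        // One type per non-TS column, in column order (the row key gets a type too).
        String[] typesFromArgs = { "LONG", "INT", "STRING" };
        String[] aligned = new String[columns.length];
        for (int i = 0, j = 0; i < columns.length; i++) {
          if ("HBASE_TS_KEY".equals(columns[i])) {
            aligned[i] = "LONG"; // the timestamp column is always treated as LONG
          } else {
            aligned[i] = typesFromArgs[j++];
          }
        }
        // Prints [LONG, INT, LONG, STRING]; indices now line up with the columns array.
        System.out.println(java.util.Arrays.toString(aligned));
      }
    }
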
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java (revision 1490514)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java (working copy)
@@ -20,8 +20,10 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Counter;
@@ -30,6 +32,7 @@
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
+import java.util.Arrays;
/**
* Write table content out to files in hdfs.
@@ -49,6 +52,9 @@
/** Should skip bad lines */
private boolean skipBadLines;
private Counter badLineCount;
+ private String[] columnTypes;
+ // To avoid excess string comparisons.
+ private boolean[] stringType;
private ImportTsv.TsvParser parser;
@@ -112,6 +118,14 @@
skipBadLines = context.getConfiguration().getBoolean(
ImportTsv.SKIP_LINES_CONF_KEY, true);
badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+ columnTypes = context.getConfiguration().getStrings(ImportTsv.COLUMNS_TYPE_CONF_KEY);
+ // Pre-compute which columns are String typed to avoid string comparisons in the map function.
+ if (columnTypes != null) {
+ stringType = new boolean[columnTypes.length];
+ for (int i = 0; i < columnTypes.length; i++) {
+ stringType[i] = columnTypes[i].equals(ImportTsv.Type.STRING.name());
+ }
+ }
}
/**
@@ -133,19 +147,35 @@
// Retrieve timestamp if exists
ts = parsed.getTimestamp(ts);
- Put put = new Put(rowKey.copyBytes());
+ byte[] row = rowKey.copyBytes();
+ int rowkeyColumnIndex = parser.getRowKeyColumnIndex();
+ if (columnTypes != null && !stringType[rowkeyColumnIndex]) {
+ // Convert the row key to the byte encoding of its declared type.
+ row = convertValueToProperType(Bytes.toString(row), columnTypes[rowkeyColumnIndex]);
+ rowKey.set(row);
+ }
+ Put put = new Put(row);
for (int i = 0; i < parsed.getColumnCount(); i++) {
- if (i == parser.getRowKeyColumnIndex()
- || i == parser.getTimestampKeyColumnIndex()) {
+ if (i == rowkeyColumnIndex || i == parser.getTimestampKeyColumnIndex()) {
continue;
}
- KeyValue kv = new KeyValue(
- lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
- parser.getFamily(i), 0, parser.getFamily(i).length,
- parser.getQualifier(i), 0, parser.getQualifier(i).length,
- ts,
- KeyValue.Type.Put,
- lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
+ KeyValue kv = null;
+ if (columnTypes != null && !stringType[i]) {
+ // TODO this copies the value bytes; can the copy be avoided?
+ byte[] kvValue = Arrays.copyOfRange(lineBytes,
+ parsed.getColumnOffset(i),
+ parsed.getColumnOffset(i) + parsed.getColumnLength(i));
+ kvValue = convertValueToProperType(Bytes.toString(kvValue), columnTypes[i]);
+ kv =
+ new KeyValue(row, 0, row.length, parser.getFamily(i), 0, parser.getFamily(i).length,
+ parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, KeyValue.Type.Put,
+ kvValue, 0, kvValue.length);
+ } else {
+ kv =
+ new KeyValue(row, 0, row.length, parser.getFamily(i), 0, parser.getFamily(i).length,
+ parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, KeyValue.Type.Put,
+ lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
+ }
put.add(kv);
}
context.write(rowKey, put);
@@ -173,4 +203,28 @@
e.printStackTrace();
}
}
+
+ private byte[] convertValueToProperType(String rawString, String type) throws BadTsvLineException {
+ try {
+ switch (ImportTsv.Type.valueOf(type)) {
+ case BYTE:
+ return new byte[] { Byte.parseByte(rawString) };
+ case DOUBLE:
+ return Bytes.toBytes(Double.parseDouble(rawString));
+ case FLOAT:
+ return Bytes.toBytes(Float.parseFloat(rawString));
+ case INT:
+ return Bytes.toBytes(Integer.parseInt(rawString));
+ case LONG:
+ return Bytes.toBytes(Long.parseLong(rawString));
+ case SHORT:
+ return Bytes.toBytes(Short.parseShort(rawString));
+ case STRING:
+ default:
+ return Bytes.toBytes(rawString);
+ }
+ } catch (NumberFormatException e) {
+ throw new BadTsvLineException(rawString + " cannot be converted to type: " + type);
+ }
+ }
}
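
Reviewer note (not part of the patch): a small standalone example of what the typed
conversion changes on disk. It only uses org.apache.hadoop.hbase.util.Bytes, which the
mapper already imports; the class name is made up for illustration. A field declared as
INT is stored as a 4-byte big-endian value rather than its UTF-8 text.

    import org.apache.hadoop.hbase.util.Bytes;

    public class TypedEncodingSketch {
      public static void main(String[] args) {
        // STRING handling keeps the literal UTF-8 bytes of the field: {0x31, 0x32, 0x33, 0x34}.
        byte[] asString = Bytes.toBytes("1234");
        // INT handling (as in convertValueToProperType) stores 4 big-endian bytes: {0x00, 0x00, 0x04, 0xD2}.
        byte[] asInt = Bytes.toBytes(Integer.parseInt("1234"));
        System.out.println(Bytes.toStringBinary(asString)); // 1234
        System.out.println(Bytes.toStringBinary(asInt));    // \x00\x00\x04\xD2
      }
    }
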
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (revision 1490514)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (working copy)
@@ -185,6 +185,23 @@
return doMROnTableTest(util, family, data, args, 1);
}
+ @Test
+ public void testTypeSupportedDataWithAnExistingTable() throws Exception {
+ String table = "test-" + UUID.randomUUID();
+
+ // Prepare the arguments required for the test.
+ String[] args =
+ new String[] {
+ "-D" + ImportTsv.COLUMNS_CONF_KEY
+ + "=FAM:A,HBASE_ROW_KEY,FAM:B,HBASE_TS_KEY,FAM:C,FAM:D,FAM:E",
+ "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+ "-D" + ImportTsv.COLUMNS_TYPE_CONF_KEY + "=INT,LONG,STRING,FLOAT,DOUBLE,BYTE", table };
+ String data = "1234,1234567890123456789,str,139233843,12.3,12233444455.23,c\n";
+ util.createTable(table, FAMILY);
+ doMROnTableTest(util, FAMILY, data, args, 1);
+ util.deleteTable(table);
+ }
+
/**
* Run an ImportTsv job and perform basic validation on the results.
* Returns the ImportTsv Tool instance so that other tests can
@@ -193,9 +210,22 @@
* @param args Any arguments to pass BEFORE inputFile path is appended.
* @return The Tool instance used to run the test.
*/
- protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
- String data, String[] args, int valueMultiplier)
- throws Exception {
+ protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+ String[] args, int valueMultiplier) throws Exception {
+ return doMROnTableTest(util, family, data, args, valueMultiplier, false);
+ }
+
+ /**
+ * Run an ImportTsv job and perform basic validation on the results.
+ * Returns the ImportTsv Tool instance so that other tests can
+ * inspect it for further validation as necessary. This method is static to
+ * ensure non-reliance on instance's util/conf facilities.
+ * @param args Any arguments to pass BEFORE inputFile path is appended.
+ * @param typeSupport whether the imported data contains typed (non-String) columns.
+ * @return The Tool instance used to run the test.
+ */
+ protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+ String[] args, int valueMultiplier, boolean typeSupport) throws Exception {
String table = args[args.length - 1];
Configuration conf = new Configuration(util.getConfiguration());
@@ -239,7 +269,7 @@
if (createdHFiles)
validateHFiles(fs, outputPath, family);
else
- validateTable(conf, table, family, valueMultiplier);
+ validateTable(conf, table, family, valueMultiplier, typeSupport);
if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
LOG.debug("Deleting test subdirectory");
@@ -251,8 +281,8 @@
/**
* Confirm ImportTsv via data in online table.
*/
- private static void validateTable(Configuration conf, String tableName,
- String family, int valueMultiplier) throws IOException {
+ private static void validateTable(Configuration conf, String tableName, String family,
+ int valueMultiplier, boolean typeSupport) throws IOException {
LOG.debug("Validating table.");
HTable table = new HTable(conf, tableName);
@@ -266,14 +296,30 @@
scan.addFamily(Bytes.toBytes(family));
ResultScanner resScanner = table.getScanner(scan);
for (Result res : resScanner) {
- assertTrue(res.size() == 2);
- List kvs = res.list();
- assertArrayEquals(kvs.get(0).getRow(), Bytes.toBytes("KEY"));
- assertArrayEquals(kvs.get(1).getRow(), Bytes.toBytes("KEY"));
- assertArrayEquals(kvs.get(0).getValue(),
- Bytes.toBytes("VALUE" + valueMultiplier));
- assertArrayEquals(kvs.get(1).getValue(),
- Bytes.toBytes("VALUE" + 2 * valueMultiplier));
+ if (typeSupport) {
+ // Five KeyValues are expected (FAM:A..FAM:E); the HBASE_TS_KEY column is never stored.
+ assertTrue(res.size() == 5);
+ List kvs = res.list();
+ for (int j = 0; j < kvs.size(); j++) {
+ assertArrayEquals(kvs.get(j).getRow(),
+ Bytes.toBytes(Long.parseLong("1234567890123456789")));
+ }
+ assertArrayEquals(kvs.get(0).getValue(), Bytes.toBytes(Integer.parseInt("1234")));
+ assertArrayEquals(kvs.get(1).getValue(), Bytes.toBytes("str"));
+ assertArrayEquals(kvs.get(2).getValue(), Bytes.toBytes(Float.parseFloat("12.3")));
+ assertArrayEquals(kvs.get(3).getValue(), Bytes.toBytes(Double.parseDouble("12233444455.23")));
+ assertArrayEquals(kvs.get(4).getValue(), new byte[] { Byte.parseByte("7") });
+ // Only one result set is expected, so let it loop.
+ } else {
+ assertTrue(res.size() == 2);
+ List kvs = res.list();
+ assertArrayEquals(kvs.get(0).getRow(), Bytes.toBytes("KEY"));
+ assertArrayEquals(kvs.get(1).getRow(), Bytes.toBytes("KEY"));
+ assertArrayEquals(kvs.get(0).getValue(),
+ Bytes.toBytes("VALUE" + valueMultiplier));
+ assertArrayEquals(kvs.get(1).getValue(),
+ Bytes.toBytes("VALUE" + 2 * valueMultiplier));
+ }
// Only one result set is expected, so let it loop.
}
verified = true;