diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java index f586523..525881b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -85,6 +85,7 @@ public class ImportTsv extends Configured implements Tool { // TODO: the rest of these configs are used exclusively by TsvImporterMapper. // Move them out of the tool and let the mapper handle its own validation. public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines"; + public final static String NO_STRICT_COL_FAMILY = "no.strict"; public final static String COLUMNS_CONF_KEY = "importtsv.columns"; public final static String SEPARATOR_CONF_KEY = "importtsv.separator"; public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator"; @@ -133,7 +134,7 @@ public class ImportTsv extends Configured implements Tool { /** * @param columnsSpecification the list of columns to parser out, comma separated. * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC - * @param separatorStr + * @param separatorStr */ public TsvParser(String columnsSpecification, String separatorStr) { // Configure separator @@ -257,7 +258,7 @@ public class ImportTsv extends Configured implements Tool { public int getRowKeyLength() { return getColumnLength(rowKeyColumnIndex); } - + public long getTimestamp(long ts) throws BadTsvLineException { // Return ts if HBASE_TS_KEY is not configured in column spec if (!hasTimestamp()) { @@ -283,7 +284,7 @@ public class ImportTsv extends Configured implements Tool { getColumnLength(attrKeyColumnIndex)); } } - + public String[] getIndividualAttributes() { String attributes = getAttributes(); if (attributes != null) { @@ -292,7 +293,7 @@ public class ImportTsv extends Configured implements Tool { return null; } } - + public int getAttributeKeyOffset() { if (hasAttributes()) { return getColumnOffset(attrKeyColumnIndex); @@ -446,6 +447,30 @@ public class ImportTsv extends Configured implements Tool { } } HTable table = new HTable(conf, tableName); + boolean noStrict = Boolean.valueOf(conf.get(NO_STRICT_COL_FAMILY)); + // if no.strict is false then check column family + if(!noStrict) { + ArrayList unmatchedFamilies = new ArrayList(); + Set cfSet = getColumnFamilies(columns); + for (String cf : cfSet) { + if(table.getTableDescriptor().getFamily(Bytes.toBytes(cf)) == null) { + unmatchedFamilies.add(cf); + } + } + if(unmatchedFamilies.size() > 0) { + ArrayList familyNames = new ArrayList(); + for (HColumnDescriptor family : table.getTableDescriptor().getFamilies()) + familyNames.add(family.getNameAsString()); + String msg = + "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " + + unmatchedFamilies + "; valid family names of table " + + Bytes.toString(table.getTableName()) + " are: " + + familyNames + ".\n" + + "To disable column family check, use -D" + NO_STRICT_COL_FAMILY + "=true.\n" ; + usage(msg); + System.exit(-1); + } + } job.setReducerClass(PutSortReducer.class); Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); @@ -487,6 +512,17 @@ public class ImportTsv extends Configured implements Tool { private static void createTable(Admin admin, TableName tableName, String[] columns) throws IOException { HTableDescriptor htd = new HTableDescriptor(tableName); + Set cfSet = getColumnFamilies(columns); + for (String cf : cfSet) { + HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf)); + htd.addFamily(hcd); + } + LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.", + tableName, cfSet)); + admin.createTable(htd); + } + + private static Set getColumnFamilies(String[] columns) { Set cfSet = new HashSet(); for (String aColumn : columns) { if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn) @@ -497,13 +533,7 @@ public class ImportTsv extends Configured implements Tool { // we are only concerned with the first one (in case this is a cf:cq) cfSet.add(aColumn.split(":", 2)[0]); } - for (String cf : cfSet) { - HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf)); - htd.addFamily(hcd); - } - LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.", - tableName, cfSet)); - admin.createTable(htd); + return cfSet; } /* @@ -513,7 +543,7 @@ public class ImportTsv extends Configured implements Tool { if (errorMsg != null && errorMsg.length() > 0) { System.err.println("ERROR: " + errorMsg); } - String usage = + String usage = "Usage: " + NAME + " -D"+ COLUMNS_CONF_KEY + "=a,b,c \n" + "\n" + "Imports the given input directory of TSV data into the specified table.\n" + @@ -547,6 +577,7 @@ public class ImportTsv extends Configured implements Tool { " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " + DEFAULT_MAPPER.getName() + "\n" + " -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" + + " -D" + NO_STRICT_COL_FAMILY + "=true - ignore column family check in hbase table. Default is false\n" + " -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" + " Note: if you set this to 'no', then the target table must already exist in HBase\n" + "\n" + @@ -600,7 +631,7 @@ public class ImportTsv extends Configured implements Tool { + TsvParser.TIMESTAMPKEY_COLUMN_SPEC); return -1; } - + int attrKeysFound = 0; for (String col : columns) { if (col.equals(TsvParser.ATTRIBUTES_COLUMN_SPEC)) @@ -611,7 +642,7 @@ public class ImportTsv extends Configured implements Tool { + TsvParser.ATTRIBUTES_COLUMN_SPEC); return -1; } - + // Make sure one or more columns are specified excluding rowkey and // timestamp key if (columns.length - (rowkeysFound + tskeysFound + attrKeysFound) < 1) { @@ -626,7 +657,7 @@ public class ImportTsv extends Configured implements Tool { // Set it back to replace invalid timestamp (non-numeric) with current // system time getConf().setLong(TIMESTAMP_CONF_KEY, timstamp); - + Job job = createSubmittableJob(getConf(), otherArgs); return job.waitForCompletion(true) ? 0 : 1; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java index 7607c78..3844a64 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java @@ -187,6 +187,23 @@ public class TestImportTsv implements Configurable { doMROnTableTest(util, FAMILY, null, args, 3); util.deleteTable(table); } + + @Test + public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception { + String table = "test-" + UUID.randomUUID(); + // Prepare the arguments required for the test. + Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles"); + String[] args = new String[] { + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", + "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), + "-D" + ImportTsv.NO_STRICT_COL_FAMILY + "=true", + table + }; + util.createTable(TableName.valueOf(table), FAMILY); + doMROnTableTest(util, FAMILY, null, args, 3); + util.deleteTable(table); + } @Test public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {