diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index a953c3e..a50fe2a 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -84,6 +84,7 @@ public class ImportTsv extends Configured implements Tool {
   // TODO: the rest of these configs are used exclusively by TsvImporterMapper.
   // Move them out of the tool and let the mapper handle its own validation.
   public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
+  public final static String NO_STRICT_COL_FAMILY = "no.strict";
   public final static String COLUMNS_CONF_KEY = "importtsv.columns";
   public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
   public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator";
@@ -438,6 +439,30 @@ public class ImportTsv extends Configured implements Tool {
         createTable(admin, tableName, columns);
       }
       HTable table = new HTable(conf, tableName);
+      boolean noStrict = Boolean.valueOf(conf.get(NO_STRICT_COL_FAMILY));
+      // If no.strict is false (the default), check the requested column families against the table.
+      if (!noStrict) {
+        ArrayList<String> unmatchedFamilies = new ArrayList<String>();
+        Set<String> cfSet = getColumnFamilies(columns);
+        for (String cf : cfSet) {
+          if (table.getTableDescriptor().getFamily(Bytes.toBytes(cf)) == null) {
+            unmatchedFamilies.add(cf);
+          }
+        }
+        if (unmatchedFamilies.size() > 0) {
+          ArrayList<String> familyNames = new ArrayList<String>();
+          for (HColumnDescriptor family : table.getTableDescriptor().getFamilies())
+            familyNames.add(family.getNameAsString());
+          String msg =
+              "Unmatched family names found in HFiles to be bulkloaded: "
+                  + unmatchedFamilies + "; valid family names of table "
+                  + Bytes.toString(table.getTableName()) + " are: "
+                  + familyNames + ".\n"
+                  + "To disable the column family check, use -D" + NO_STRICT_COL_FAMILY + "=true.\n";
+          usage(msg);
+          System.exit(-1);
+        }
+      }
       job.setReducerClass(PutSortReducer.class);
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
@@ -474,6 +499,17 @@ public class ImportTsv extends Configured implements Tool {
   private static void createTable(Admin admin, TableName tableName, String[] columns)
       throws IOException {
     HTableDescriptor htd = new HTableDescriptor(tableName);
+    Set<String> cfSet = getColumnFamilies(columns);
+    for (String cf : cfSet) {
+      HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
+      htd.addFamily(hcd);
+    }
+    LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.",
+      tableName, cfSet));
+    admin.createTable(htd);
+  }
+
+  private static Set<String> getColumnFamilies(String[] columns) {
     Set<String> cfSet = new HashSet<String>();
     for (String aColumn : columns) {
       if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
@@ -484,13 +520,7 @@ public class ImportTsv extends Configured implements Tool {
       // we are only concerned with the first one (in case this is a cf:cq)
       cfSet.add(aColumn.split(":", 2)[0]);
     }
-    for (String cf : cfSet) {
-      HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
-      htd.addFamily(hcd);
-    }
-    LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.",
-      tableName, cfSet));
-    admin.createTable(htd);
+    return cfSet;
   }
 
   /*
@@ -534,6 +564,7 @@ public class ImportTsv extends Configured implements Tool {
       " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
       DEFAULT_MAPPER.getName() + "\n" +
       " -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" +
+      " -D" + NO_STRICT_COL_FAMILY + "=true - ignore the column family check in the HBase table. Default is false\n" +
       "For performance consider the following options:\n" +
       "  -Dmapreduce.map.speculative=false\n" +
       "  -Dmapreduce.reduce.speculative=false";
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
index e3b3495..91888d3 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -186,6 +186,23 @@ public class TestImportTsv implements Configurable {
     doMROnTableTest(util, FAMILY, null, args, 3);
     util.deleteTable(table);
   }
+
+  @Test
+  public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
+    String table = "test-" + UUID.randomUUID();
+    // Prepare the arguments required for the test.
+    Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+    String[] args = new String[] {
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
+        "-D" + ImportTsv.NO_STRICT_COL_FAMILY + "=true",
+        table
+    };
+    util.createTable(TableName.valueOf(table), FAMILY);
+    doMROnTableTest(util, FAMILY, null, args, 3);
+    util.deleteTable(table);
+  }
 
   @Test
   public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
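
Reviewer note, not part of the patch: a minimal sketch of driving the tool with the new flag end to end. The table name, input path, and HFile output directory below are hypothetical; everything else (ToolRunner, HBaseConfiguration, the importtsv.* keys, and the new no.strict key) comes from the patch itself or stock Hadoop/HBase APIs.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.ImportTsv;
    import org.apache.hadoop.util.ToolRunner;

    public class ImportTsvNoStrictExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // With no.strict=true, ImportTsv skips the new check that every family
        // named in importtsv.columns exists on the target table.
        int status = ToolRunner.run(conf, new ImportTsv(), new String[] {
            "-Dimporttsv.columns=HBASE_ROW_KEY,FAM:A,FAM:B",
            "-Dimporttsv.bulk.output=/tmp/hfiles", // hypothetical output dir
            "-Dno.strict=true",                    // disable the family check
            "myTable",                             // hypothetical table name
            "/tmp/input.tsv"                       // hypothetical input path
        });
        System.exit(status);
      }
    }

Left at the default (no.strict=false), the same invocation against a table lacking FAM would instead print the "Unmatched family names found" usage message and exit, rather than producing HFiles that a later bulk load would presumably reject.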