diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index de5f57a..9430b13 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -84,6 +84,7 @@ public class ImportTsv extends Configured implements Tool {
   // TODO: the rest of these configs are used exclusively by TsvImporterMapper.
   // Move them out of the tool and let the mapper handle its own validation.
   public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
+  public final static String STRICT_COL_FAMILY = "importtsv.strict.colfamily";
   public final static String COLUMNS_CONF_KEY = "importtsv.columns";
   public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
   public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator";
@@ -438,6 +439,30 @@ public class ImportTsv extends Configured implements Tool {
         createTable(admin, tableName, columns);
       }
       HTable table = new HTable(conf, tableName);
+      // Strict by default: fail fast when a requested family is missing from the table.
+      if (conf.getBoolean(STRICT_COL_FAMILY, true)) {
+        HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies();
+        ArrayList<String> familyNames = new ArrayList<String>();
+        for (HColumnDescriptor family : families) {
+          familyNames.add(family.getNameAsString());
+        }
+        ArrayList<String> unmatchedFamilies = new ArrayList<String>();
+        Set<String> cfSet = getColumnFamilies(columns);
+        for (String cf : cfSet) {
+          if (!familyNames.contains(cf)) {
+            unmatchedFamilies.add(cf);
+          }
+        }
+        if (!unmatchedFamilies.isEmpty()) {
+          String msg =
+              "Unmatched family names found in HFiles to be bulkloaded: "
+                  + unmatchedFamilies + "; valid family names of table "
+                  + Bytes.toString(table.getTableName()) + " are: " + familyNames + ".\n"
+                  + "To disable column family check, use -D" + STRICT_COL_FAMILY + "=false.\n";
+          LOG.error(msg);
+          System.exit(-1);
+        }
+      }
       job.setReducerClass(PutSortReducer.class);
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
@@ -473,6 +498,17 @@ public class ImportTsv extends Configured implements Tool {
   private static void createTable(Admin admin, String tableName, String[] columns)
       throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
+    Set<String> cfSet = getColumnFamilies(columns);
+    for (String cf : cfSet) {
+      HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
+      htd.addFamily(hcd);
+    }
+    LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.",
+      tableName, cfSet));
+    admin.createTable(htd);
+  }
+
+  private static Set<String> getColumnFamilies(String[] columns) {
     Set<String> cfSet = new HashSet<String>();
     for (String aColumn : columns) {
       if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
@@ -483,14 +519,8 @@ public class ImportTsv extends Configured implements Tool {
       // we are only concerned with the first one (in case this is a cf:cq)
       cfSet.add(aColumn.split(":", 2)[0]);
     }
-    for (String cf : cfSet) {
-      HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
-      htd.addFamily(hcd);
-    }
-    LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.",
-      tableName, cfSet));
-    admin.createTable(htd);
-  }
+    return cfSet;
+  }
 
   /*
    * @param errorMsg Error message. Can be null.
@@ -533,6 +563,7 @@ public class ImportTsv extends Configured implements Tool {
       " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
       DEFAULT_MAPPER.getName() + "\n" +
       "  -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" +
+      "  -D" + STRICT_COL_FAMILY + "=false - ignore unmatched column families. Default is true\n" +
       "For performance consider the following options:\n" +
       "  -Dmapreduce.map.speculative=false\n" +
       "  -Dmapreduce.reduce.speculative=false";
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
index 8706d9c..f69456f 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -185,6 +185,43 @@ public class TestImportTsv implements Configurable {
     doMROnTableTest(util, FAMILY, null, args, 3);
     util.deleteTable(table);
   }
+
+  @Test
+  public void testBulkOutputWithAnExistingTableWrongFamily() throws Exception {
+    String table = "test-" + UUID.randomUUID();
+
+    // Prepare the arguments required for the test.
+    Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+    String[] args = new String[] {
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
+        table
+    };
+
+    util.createTable(table, FAMILY);
+    doMROnTableTest(util, FAMILY, null, args, 3);
+    util.deleteTable(table);
+  }
+
+  @Test
+  public void testBulkOutputWithAnExistingTableWrongFamilyParam() throws Exception {
+    String table = "test-" + UUID.randomUUID();
+
+    // Prepare the arguments required for the test.
+    Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+    String[] args = new String[] {
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
+        "-D" + ImportTsv.STRICT_COL_FAMILY + "=false",
+        table
+    };
+
+    util.createTable(table, FAMILY);
+    doMROnTableTest(util, FAMILY, null, args, 3);
+    util.deleteTable(table);
+  }
 
   @Test
   public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
@@ -285,8 +322,9 @@ public class TestImportTsv implements Configurable {
       }
     }
 
-    if (createdHFiles)
+    if (createdHFiles) {
       validateHFiles(fs, outputPath, family);
+    }
     else
       validateTable(conf, table, family, valueMultiplier);