Index: src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java	(revision 1293084)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java	(working copy)
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -36,6 +37,7 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 
 /**
@@ -43,9 +45,44 @@
  */
 public class Import {
   final static String NAME = "import";
-  public static final String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
+  final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
+  final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
 
   /**
+   * A mapper that just writes out KeyValues.
+   */
+  static class KeyValueImporter
+  extends TableMapper<ImmutableBytesWritable, KeyValue> {
+    private Map<byte[], byte[]> cfRenameMap;
+
+    /**
+     * @param row  The current table row key.
+     * @param value  The columns.
+     * @param context  The current context.
+     * @throws IOException When something is broken with the data.
+     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
+     *   org.apache.hadoop.mapreduce.Mapper.Context)
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result value,
+      Context context)
+    throws IOException {
+      try {
+        for (KeyValue kv : value.raw()) {
+          context.write(row, convertKv(kv, cfRenameMap));
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void setup(Context context) {
+      cfRenameMap = createCfRenameMap(context.getConfiguration());
+    }
+  }
+
+  /**
    * Write table content out to files in hdfs.
    */
   static class Importer
@@ -76,26 +113,7 @@
       Put put = null;
       Delete delete = null;
       for (KeyValue kv : result.raw()) {
-        if(cfRenameMap != null) {
-          // If there's a rename mapping for this CF, create a new KeyValue
-          byte[] newCfName = cfRenameMap.get(kv.getFamily());
-          if(newCfName != null) {
-            kv = new KeyValue(kv.getBuffer(), // row buffer
-                    kv.getRowOffset(),        // row offset
-                    kv.getRowLength(),        // row length
-                    newCfName,                // CF buffer
-                    0,                        // CF offset
-                    newCfName.length,         // CF length
-                    kv.getBuffer(),           // qualifier buffer
-                    kv.getQualifierOffset(),  // qualifier offset
-                    kv.getQualifierLength(),  // qualifier length
-                    kv.getTimestamp(),        // timestamp
-                    KeyValue.Type.codeToType(kv.getType()), // KV Type
-                    kv.getBuffer(),           // value buffer
-                    kv.getValueOffset(),      // value offset
-                    kv.getValueLength());     // value length
-          }
-        }
+        kv = convertKv(kv, cfRenameMap);
         // Deletes and Puts are gathered and written when finished
         if (kv.isDelete()) {
           if (delete == null) {
@@ -119,26 +137,56 @@
 
     @Override
     public void setup(Context context) {
-      // Make a map from sourceCfName to destCfName by parsing a config key
-      cfRenameMap = null;
-      String allMappingsPropVal = context.getConfiguration().get(CF_RENAME_PROP);
-      if(allMappingsPropVal != null) {
-        // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
-        String[] allMappings = allMappingsPropVal.split(",");
-        for (String mapping: allMappings) {
-          if(cfRenameMap == null) {
-            cfRenameMap = new TreeMap<byte[],byte[]>(Bytes.BYTES_COMPARATOR);
-          }
-          String [] srcAndDest = mapping.split(":");
-          if(srcAndDest.length != 2) {
-            continue;
-          }
-          cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
+      cfRenameMap = createCfRenameMap(context.getConfiguration());
+    }
+  }
+
+  // helper: create a new KeyValue based on CF rename map
+  private static KeyValue convertKv(KeyValue kv, Map<byte[], byte[]> cfRenameMap) {
+    if(cfRenameMap != null) {
+      // If there's a rename mapping for this CF, create a new KeyValue
+      byte[] newCfName = cfRenameMap.get(kv.getFamily());
+      if(newCfName != null) {
+        kv = new KeyValue(kv.getBuffer(), // row buffer
+                kv.getRowOffset(),        // row offset
+                kv.getRowLength(),        // row length
+                newCfName,                // CF buffer
+                0,                        // CF offset
+                newCfName.length,         // CF length
+                kv.getBuffer(),           // qualifier buffer
+                kv.getQualifierOffset(),  // qualifier offset
+                kv.getQualifierLength(),  // qualifier length
+                kv.getTimestamp(),        // timestamp
+                KeyValue.Type.codeToType(kv.getType()), // KV Type
+                kv.getBuffer(),           // value buffer
+                kv.getValueOffset(),      // value offset
+                kv.getValueLength());     // value length
+      }
+    }
+    return kv;
+  }
+
+  // helper: make a map from sourceCfName to destCfName by parsing a config key
+  private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
+    Map<byte[], byte[]> cfRenameMap = null;
+    String allMappingsPropVal = conf.get(CF_RENAME_PROP);
+    if(allMappingsPropVal != null) {
+      // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
+      String[] allMappings = allMappingsPropVal.split(",");
+      for (String mapping: allMappings) {
+        if(cfRenameMap == null) {
+          cfRenameMap = new TreeMap<byte[],byte[]>(Bytes.BYTES_COMPARATOR);
         }
+        String [] srcAndDest = mapping.split(":");
+        if(srcAndDest.length != 2) {
+          continue;
+        }
+        cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
       }
     }
+    return cfRenameMap;
   }
-  
+
   /**
    * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
    * the mapper how to rename column families.
@@ -190,11 +238,25 @@
     job.setJarByClass(Importer.class);
     FileInputFormat.setInputPaths(job, inputDir);
     job.setInputFormatClass(SequenceFileInputFormat.class);
-    job.setMapperClass(Importer.class);
-    // No reducers. Just write straight to table. Call initTableReducerJob
-    // because it sets up the TableOutputFormat.
-    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
-    job.setNumReduceTasks(0);
+    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    if (hfileOutPath != null) {
+      job.setMapperClass(KeyValueImporter.class);
+      HTable table = new HTable(conf, tableName);
+      job.setReducerClass(KeyValueSortReducer.class);
+      Path outputDir = new Path(hfileOutPath);
+      FileOutputFormat.setOutputPath(job, outputDir);
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(KeyValue.class);
+      HFileOutputFormat.configureIncrementalLoad(job, table);
+      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+          com.google.common.base.Preconditions.class);
+    } else {
+      // No reducers. Just write straight to table. Call initTableReducerJob
+      // because it sets up the TableOutputFormat.
+      job.setMapperClass(Importer.class);
+      TableMapReduceUtil.initTableReducerJob(tableName, null, job);
+      job.setNumReduceTasks(0);
+    }
     return job;
   }
 
@@ -205,7 +267,10 @@
     if (errorMsg != null && errorMsg.length() > 0) {
       System.err.println("ERROR: " + errorMsg);
     }
-    System.err.println("Usage: Import <tablename> <inputdir>");
+    System.err.println("Usage: Import [options] <tablename> <inputdir>");
+    System.err.println("By default Import will load data directly into HBase. To instead generate");
+    System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
+    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
   }
 
   /**
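
For reference, a minimal driver sketch for the patched Import (not part of the patch): it sets the new import.bulk.output key so the job writes HFiles for a later bulk load, and optionally the existing HBASE_IMPORTER_RENAME_CFS key to rename column families on the way in. The class name ImportBulkDriver, the table name, and the paths are placeholders, and the sketch assumes createSubmittableJob keeps its existing public signature in Import.java.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.mapreduce.Job;

public class ImportBulkDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // BULK_OUTPUT_CONF_KEY: write HFiles under this directory instead of
    // issuing Puts/Deletes against the live table.
    conf.set("import.bulk.output", "/tmp/import-hfiles");
    // CF_RENAME_PROP (optional): comma-separated sourceCf:destCf pairs.
    conf.set("HBASE_IMPORTER_RENAME_CFS", "oldFamily:newFamily");
    // Placeholder table name and exported SequenceFile input directory.
    Job job = Import.createSubmittableJob(conf, new String[] { "myTable", "/user/me/export" });
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The same run can be driven from the command line by passing -Dimport.bulk.output=/tmp/import-hfiles, as the patched usage message describes; the generated HFiles are then moved into the table with LoadIncrementalHFiles (completebulkload).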