Index: core/pom.xml
===================================================================
--- core/pom.xml	(revision 18f6bf64f16f8b24bf3112df0c6339abbffa7c4d)
+++ core/pom.xml	Mon Apr 26 11:28:59 CEST 2010
@@ -165,6 +165,10 @@
       <artifactId>commons-cli</artifactId>
     </dependency>
     <dependency>
+      <groupId>net.sf.jopt-simple</groupId>
+      <artifactId>jopt-simple</artifactId>
+    </dependency>
+    <dependency>
       <groupId>commons-lang</groupId>
       <artifactId>commons-lang</artifactId>
       <version>${commons-lang.version}</version>
Index: pom.xml
===================================================================
--- pom.xml	(revision 18f6bf64f16f8b24bf3112df0c6339abbffa7c4d)
+++ pom.xml	Sun Apr 25 21:27:25 CEST 2010
@@ -295,6 +295,7 @@
     <junit.version>4.8.1</junit.version>
     <log4j.version>1.2.15</log4j.version>
     <zookeeper.version>3.3.0</zookeeper.version>
+    <jopt.version>3.2</jopt.version>
@@ -305,6 +306,11 @@
       <version>${commons-cli.version}</version>
     </dependency>
     <dependency>
+      <groupId>net.sf.jopt-simple</groupId>
+      <artifactId>jopt-simple</artifactId>
+      <version>${jopt.version}</version>
+    </dependency>
+    <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
       <version>${commons-logging.version}</version>
Index: core/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
===================================================================
--- core/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java	(revision 18f6bf64f16f8b24bf3112df0c6339abbffa7c4d)
+++ core/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java	Tue Apr 27 07:40:01 CEST 2010
@@ -20,7 +20,15 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.util.Date;
+import static java.util.Arrays.*;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import static joptsimple.util.DateConverter.*;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -39,8 +47,17 @@
  * back in again.
  */
 public class Export {
+  final static String NAME = "export";
+  private static String tableName = null;
+  private static String outputDir = null;
+  private static Integer maxVersions = null;
+  private static Long startTime = null;
+  private static Long endTime = null;
+  private static Integer caching = null;
+  private static Boolean compress = null;
+
   /**
    * Mapper.
    */
@@ -70,48 +87,101 @@
    * Sets up the actual job.
    *
    * @param conf  The current configuration.
-   * @param args  The command line parameters.
    * @return The newly created job.
    * @throws IOException When setting up the job fails.
    */
-  public static Job createSubmittableJob(Configuration conf, String[] args)
+  public static Job createSubmittableJob(Configuration conf)
   throws IOException {
-    String tableName = args[0];
-    Path outputDir = new Path(args[1]);
+    Path outputPath = new Path(outputDir);
     Job job = new Job(conf, NAME + "_" + tableName);
     job.setJobName(NAME + "_" + tableName);
     job.setJarByClass(Exporter.class);
     // TODO: Allow passing filter and subset of rows/columns.
-    Scan s = new Scan();
+    Scan scan = new Scan();
     // Optional arguments.
-    int versions = args.length > 2? Integer.parseInt(args[2]): 1;
-    s.setMaxVersions(versions);
-    long startTime = args.length > 3? Long.parseLong(args[3]): 0L;
-    long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE;
-    s.setTimeRange(startTime, endTime);
-    Log.info("verisons=" + versions + ", starttime=" + startTime +
-      ", endtime=" + endTime);
-    TableMapReduceUtil.initTableMapperJob(tableName, s, Exporter.class, null,
-      null, job);
+    int versions = maxVersions != null ? maxVersions.intValue() : 1;
+    scan.setMaxVersions(versions);
+    long startRange = startTime != null ? startTime.longValue() : 0L;
+    long endRange = endTime != null ? endTime.longValue() : Long.MAX_VALUE;
+    scan.setTimeRange(startRange, endRange);
+    if (caching != null) {
+      scan.setCaching(caching.intValue());
+    }
+    Log.info("versions=" + versions + ", starttime=" + startRange +
+      ", endtime=" + endRange + ", caching=" + caching + ", compress=" +
+      compress);
+    TableMapReduceUtil.initTableMapperJob(tableName, scan, Exporter.class,
+      null, null, job);
     // No reducers.  Just write straight to output files.
     job.setNumReduceTasks(0);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(Result.class);
-    FileOutputFormat.setOutputPath(job, outputDir);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    if (compress != null && compress.booleanValue()) {
+      FileOutputFormat.setCompressOutput(job, true);
+      FileOutputFormat.setOutputCompressorClass(job,
+        org.apache.hadoop.io.compress.GzipCodec.class);
+    }
     return job;
   }
 
-  /*
-   * @param errorMsg Error message.  Can be null.
+  /**
+   * Parses the command line arguments.
+   *
+   * @param args  The command line arguments.
+   * @throws IOException When printing the help fails.
    */
-  private static void usage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      System.err.println("ERROR: " + errorMsg);
+  private static void parseArgs(String[] args) throws IOException {
+    OptionParser parser = new OptionParser();
+    OptionSpec<String> osTableName = parser.acceptsAll(
+      asList("t", "tablename"), "Table name").withRequiredArg();
+    OptionSpec<String> osOutputDir = parser.acceptsAll(
+      asList("o", "outputdir"), "Output directory").withRequiredArg();
+    OptionSpec<Integer> osMaxVersions = parser.acceptsAll(
+      asList("n", "versions"), "Maximum versions").
+      withRequiredArg().ofType(Integer.class);
+    OptionSpec<Long> osStartTime = parser.acceptsAll(
+      asList("s", "starttime"), "Start time as long value").
+      withRequiredArg().ofType(Long.class);
+    OptionSpec<Long> osEndTime = parser.acceptsAll(
+      asList("e", "endtime"), "End time as long value").
+      withRequiredArg().ofType(Long.class);
+    OptionSpec<Date> osStartDate = parser.accepts("startdate",
+      "Start date (alternative to --starttime)").
+      withRequiredArg().withValuesConvertedBy(datePattern("yyyyMMddHHmm"));
+    OptionSpec<Date> osEndDate = parser.accepts("enddate",
+      "End date (alternative to --endtime)").
+      withRequiredArg().withValuesConvertedBy(datePattern("yyyyMMddHHmm"));
+    OptionSpec<Integer> osCaching = parser.acceptsAll(
+      asList("c", "caching"), "Number of rows for caching").
+      withRequiredArg().ofType(Integer.class);
+    OptionSpec osCompress = parser.acceptsAll(asList("z", "compress"),
+      "Enable compression of output files");
+    OptionSpec osHelp = parser.acceptsAll(asList("h", "?", "help"),
+      "Show this help");
+    OptionSet options = parser.parse(args);
+    // Check if help was invoked or mandatory parameters are missing.
+    if (!options.has(osTableName) || !options.has(osOutputDir) ||
+        options.has(osHelp)) {
+      parser.printHelpOn(System.out);
+      System.exit(options.has(osHelp) ? 0 : -1);
     }
-    System.err.println("Usage: Export <tablename> <outputdir> [<versions> " +
-      "[<starttime> [<endtime>]]]");
+    // Get everything needed later.
+    tableName = osTableName.value(options);
+    outputDir = osOutputDir.value(options);
+    maxVersions = osMaxVersions.value(options);
+    startTime = osStartTime.value(options);
+    if (options.has(osStartDate)) {
+      startTime = osStartDate.value(options).getTime();
-  }
+    }
+    endTime = osEndTime.value(options);
+    if (options.has(osEndDate)) {
+      endTime = osEndDate.value(options).getTime();
+    }
+    caching = osCaching.value(options);
+    compress = options.has(osCompress);
+  }
 
   /**
    * Main entry point.
@@ -122,11 +192,8 @@
   public static void main(String[] args) throws Exception {
     Configuration conf = HBaseConfiguration.create();
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    if (otherArgs.length < 2) {
-      usage("Wrong number of arguments: " + otherArgs.length);
-      System.exit(-1);
-    }
-    Job job = createSubmittableJob(conf, otherArgs);
+    parseArgs(otherArgs);
+    Job job = createSubmittableJob(conf);
     System.exit(job.waitForCompletion(true)? 0 : 1);
   }
 }
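
For reference, here is a minimal, self-contained sketch of the jopt-simple pattern the patch introduces. It is not part of the patch: the class name ParseDemo and the reduced option set are illustrative only, and jopt-simple 3.2 is assumed to be on the classpath.

    import static java.util.Arrays.asList;
    import static joptsimple.util.DateConverter.datePattern;

    import java.util.Date;

    import joptsimple.OptionParser;
    import joptsimple.OptionSet;
    import joptsimple.OptionSpec;

    public class ParseDemo {
      public static void main(String[] args) throws Exception {
        OptionParser parser = new OptionParser();
        // Typed specs allow compile-time safe retrieval of parsed values.
        OptionSpec<String> table = parser.acceptsAll(
            asList("t", "tablename"), "Table name").withRequiredArg();
        OptionSpec<Date> startDate = parser.accepts("startdate",
            "Start date").withRequiredArg().
            withValuesConvertedBy(datePattern("yyyyMMddHHmm"));
        OptionSet options = parser.parse(args);
        if (!options.has(table)) {
          // printHelpOn() is declared to throw IOException.
          parser.printHelpOn(System.err);
          System.exit(1);
        }
        System.out.println("table=" + options.valueOf(table));
        if (options.has(startDate)) {
          // The converter already turned the argument into a java.util.Date.
          System.out.println("starttime=" + options.valueOf(startDate).getTime());
        }
      }
    }

With the patch applied, an export run might look roughly like the following (assuming the usual hadoop jar driver wiring for the "export" job name; the table name and output path are made up):

    hadoop jar hbase-core.jar export -t mytable -o /exports/mytable -n 3 --startdate 201004010000 -z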