Index: core/pom.xml
===================================================================
--- core/pom.xml (revision 18f6bf64f16f8b24bf3112df0c6339abbffa7c4d)
+++ core/pom.xml Mon Apr 26 11:28:59 CEST 2010
@@ -165,6 +165,10 @@
       <artifactId>commons-cli</artifactId>
     </dependency>
     <dependency>
+      <groupId>net.sf.jopt-simple</groupId>
+      <artifactId>jopt-simple</artifactId>
+    </dependency>
+    <dependency>
       <groupId>commons-lang</groupId>
       <artifactId>commons-lang</artifactId>
       <version>${commons-lang.version}</version>
Index: pom.xml
===================================================================
--- pom.xml (revision 18f6bf64f16f8b24bf3112df0c6339abbffa7c4d)
+++ pom.xml Sun Apr 25 21:27:25 CEST 2010
@@ -295,6 +295,7 @@
     <junit.version>4.8.1</junit.version>
     <log4j.version>1.2.15</log4j.version>
     <zookeeper.version>3.3.0</zookeeper.version>
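+    <!-- jopt-simple supplies the typed command-line parsing used by Export -->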
+    <jopt.version>3.2</jopt.version>
@@ -305,6 +306,11 @@
       <version>${commons-cli.version}</version>
     </dependency>
     <dependency>
+      <groupId>net.sf.jopt-simple</groupId>
+      <artifactId>jopt-simple</artifactId>
+      <version>${jopt.version}</version>
+    </dependency>
+    <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
       <version>${commons-logging.version}</version>
Index: core/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
===================================================================
--- core/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java (revision 18f6bf64f16f8b24bf3112df0c6339abbffa7c4d)
+++ core/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java Tue Apr 27 07:40:01 CEST 2010
@@ -20,7 +20,15 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.util.Date;
+import static java.util.Arrays.*;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import static joptsimple.util.DateConverter.*;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -39,8 +47,17 @@
* back in again.
*/
public class Export {
+
final static String NAME = "export";
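+
+  // Values parsed from the command line; null means the corresponding
+  // switch was absent and a built-in default applies.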
+ private static String tableName = null;
+ private static String outputDir = null;
+ private static Integer maxVersions = null;
+ private static Long startTime = null;
+ private static Long endTime = null;
+ private static Integer caching = null;
+ private static Boolean compress = null;
+
/**
* Mapper.
*/
@@ -70,48 +87,101 @@
* Sets up the actual job.
*
* @param conf The current configuration.
- * @param args The command line parameters.
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
- public static Job createSubmittableJob(Configuration conf, String[] args)
+ public static Job createSubmittableJob(Configuration conf)
throws IOException {
- String tableName = args[0];
- Path outputDir = new Path(args[1]);
+ Path outputPath = new Path(outputDir);
Job job = new Job(conf, NAME + "_" + tableName);
job.setJobName(NAME + "_" + tableName);
job.setJarByClass(Exporter.class);
// TODO: Allow passing filter and subset of rows/columns.
- Scan s = new Scan();
+ Scan scan = new Scan();
// Optional arguments.
- int versions = args.length > 2? Integer.parseInt(args[2]): 1;
- s.setMaxVersions(versions);
- long startTime = args.length > 3? Long.parseLong(args[3]): 0L;
- long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE;
- s.setTimeRange(startTime, endTime);
- Log.info("verisons=" + versions + ", starttime=" + startTime +
- ", endtime=" + endTime);
- TableMapReduceUtil.initTableMapperJob(tableName, s, Exporter.class, null,
- null, job);
+ int versions = maxVersions != null ? maxVersions.intValue() : 1;
+ scan.setMaxVersions(versions);
+ long startRange = startTime != null ? startTime.longValue() : 0L;
+ long endRange = endTime != null ? endTime.longValue() : Long.MAX_VALUE;
+ scan.setTimeRange(startRange, endRange);
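+    // Scanner caching fetches that many rows per RPC, trading client
+    // memory for fewer round trips.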
+ if (caching != null) {
+ scan.setCaching(caching.intValue());
+ }
+ Log.info("versions=" + versions + ", starttime=" + startRange +
+ ", endtime=" + endRange + ", caching=" + caching + ", compress=" +
+ compress);
+ TableMapReduceUtil.initTableMapperJob(tableName, scan, Exporter.class,
+ null, null, job);
// No reducers. Just write straight to output files.
job.setNumReduceTasks(0);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Result.class);
- FileOutputFormat.setOutputPath(job, outputDir);
+ FileOutputFormat.setOutputPath(job, outputPath);
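+    // Gzip-compress the exported sequence files when -z/--compress is set.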
+ if (compress != null && compress.booleanValue()) {
+ FileOutputFormat.setCompressOutput(job, true);
+ FileOutputFormat.setOutputCompressorClass(job,
+ org.apache.hadoop.io.compress.GzipCodec.class);
+ }
return job;
}
- /*
- * @param errorMsg Error message. Can be null.
+ /**
+ * Parses the command line arguments.
+ *
+ * @param args The command line arguments.
+   * @throws IOException When printing the help to the console fails.
*/
- private static void usage(final String errorMsg) {
- if (errorMsg != null && errorMsg.length() > 0) {
- System.err.println("ERROR: " + errorMsg);
+ private static void parseArgs(String[] args) throws IOException {
+ OptionParser parser = new OptionParser();
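+    // acceptsAll() registers a short and a long form of the same switch;
+    // withRequiredArg() makes the switch take a mandatory value, and
+    // ofType()/withValuesConvertedBy() convert that value from raw text.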
+    OptionSpec<String> osTableName = parser.acceptsAll(
+      asList("t", "tablename"), "Table name").withRequiredArg();
+    OptionSpec<String> osOutputDir = parser.acceptsAll(
+      asList("o", "outputdir"), "Output directory").withRequiredArg();
+    OptionSpec<Integer> osMaxVersions = parser.acceptsAll(
+      asList("n", "versions"), "Maximum versions").
+      withRequiredArg().ofType(Integer.class);
+    OptionSpec<Long> osStartTime = parser.acceptsAll(
+      asList("s", "starttime"), "Start time as long value").
+      withRequiredArg().ofType(Long.class);
+    OptionSpec<Long> osEndTime = parser.acceptsAll(
+      asList("e", "endtime"), "End time as long value").
+      withRequiredArg().ofType(Long.class);
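+    // datePattern(), statically imported from joptsimple.util.DateConverter,
+    // parses the argument using a SimpleDateFormat-style pattern.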
+    OptionSpec<Date> osStartDate = parser.accepts("startdate",
+      "Start date (alternative to --starttime)").
+      withRequiredArg().withValuesConvertedBy(datePattern("yyyyMMddHHmm"));
+    OptionSpec<Date> osEndDate = parser.accepts("enddate",
+      "End date (alternative to --endtime)").
+      withRequiredArg().withValuesConvertedBy(datePattern("yyyyMMddHHmm"));
+    OptionSpec<Integer> osCaching = parser.acceptsAll(
+      asList("c", "caching"), "Number of rows for caching").
+      withRequiredArg().ofType(Integer.class);
+    OptionSpec<Void> osCompress = parser.acceptsAll(asList("z", "compress"),
+      "Enable compression of output files");
+    OptionSpec<Void> osHelp = parser.acceptsAll(asList("h", "?", "help"),
+      "Show this help");
+ OptionSet options = parser.parse(args);
+    // Check whether help was requested or a required parameter is missing.
+ if (!options.has(osTableName) || !options.has(osOutputDir) ||
+ options.has(osHelp)) {
+ parser.printHelpOn(System.out);
+ System.exit(options.has(osHelp) ? 0 : -1);
}
- System.err.println("Usage: Export [ " +
- "[ []]]");
+    // Copy the parsed values into the static fields used by the job setup.
+ tableName = osTableName.value(options);
+ outputDir = osOutputDir.value(options);
+ maxVersions = osMaxVersions.value(options);
+ startTime = osStartTime.value(options);
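+    // An explicit --startdate overrides a raw --starttime value.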
+ if (options.has(osStartDate)) {
+ startTime = osStartDate.value(options).getTime();
+    }
+ endTime = osEndTime.value(options);
+ if (options.has(osEndDate)) {
+ endTime = osEndDate.value(options).getTime();
+ }
+ caching = osCaching.value(options);
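+    // -z/--compress is a bare flag, so its presence alone is the value.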
+ compress = options.has(osCompress);
+ }
/**
* Main entry point.
@@ -122,11 +192,8 @@
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- if (otherArgs.length < 2) {
- usage("Wrong number of arguments: " + otherArgs.length);
- System.exit(-1);
- }
- Job job = createSubmittableJob(conf, otherArgs);
+ parseArgs(otherArgs);
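+    // parseArgs() exits the VM on --help or when the required table name or
+    // output directory is missing, so both are guaranteed to be set here.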
+ Job job = createSubmittableJob(conf);
System.exit(job.waitForCompletion(true)? 0 : 1);
}
}