From 7404a3b6b81f1cb23eb3ffda32e6af873ac16d30 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Wed, 30 Jan 2019 15:47:33 -0800 Subject: [PATCH] HBASE-21773 - rowcounter utility should respond to pleas for help Change-Id: I6a10c413bc3fd414b1dddc8483919b4e06f45458 --- .../hadoop/hbase/mapreduce/RowCounter.java | 219 ++++++++++-------- .../hbase/mapreduce/TestRowCounter.java | 82 ++++--- 2 files changed, 181 insertions(+), 120 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java index 7fa5dec5ef..892f774312 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java @@ -24,12 +24,17 @@ import java.util.ArrayList; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.AbstractHBaseTool; +import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser; +import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; +import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser; +import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; +import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException; +import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FilterBase; @@ -40,15 +45,13 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; /** * A job with a just a map phase to count rows. Map outputs table rows IF the * input row has columns that have content. */ @InterfaceAudience.Public -public class RowCounter extends Configured implements Tool { +public class RowCounter extends AbstractHBaseTool { private static final Logger LOG = LoggerFactory.getLogger(RowCounter.class); @@ -58,6 +61,18 @@ public class RowCounter extends Configured implements Tool { private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; private final static String EXPECTED_COUNT_KEY = RowCounter.class.getName() + ".expected_count"; + private final static String OPT_START_TIME = "starttime"; + private final static String OPT_END_TIME = "endtime"; + private final static String OPT_RANGE = "range"; + private final static String OPT_EXPECTED_COUNT = "expectedCount"; + + private String tableName; + private List rowRangeList; + private long startTime; + private long endTime; + private long expectedCount; + private List columns = new ArrayList<>(); + /** * Mapper that runs the count. */ @@ -89,75 +104,31 @@ public class RowCounter extends Configured implements Tool { * Sets up the actual job. * * @param conf The current configuration. - * @param args The command line parameters. * @return The newly created job. * @throws IOException When setting up the job fails. */ - public static Job createSubmittableJob(Configuration conf, String[] args) - throws IOException { - String tableName = args[0]; - List rowRangeList = null; - long startTime = 0; - long endTime = 0; - - StringBuilder sb = new StringBuilder(); - - final String rangeSwitch = "--range="; - final String startTimeArgKey = "--starttime="; - final String endTimeArgKey = "--endtime="; - final String expectedCountArg = "--expected-count="; - - // First argument is table name, starting from second - for (int i = 1; i < args.length; i++) { - if (args[i].startsWith(rangeSwitch)) { - try { - rowRangeList = parseRowRangeParameter(args[i], rangeSwitch); - } catch (IllegalArgumentException e) { - return null; - } - continue; - } - if (args[i].startsWith(startTimeArgKey)) { - startTime = Long.parseLong(args[i].substring(startTimeArgKey.length())); - continue; - } - if (args[i].startsWith(endTimeArgKey)) { - endTime = Long.parseLong(args[i].substring(endTimeArgKey.length())); - continue; - } - if (args[i].startsWith(expectedCountArg)) { - conf.setLong(EXPECTED_COUNT_KEY, - Long.parseLong(args[i].substring(expectedCountArg.length()))); - continue; - } - // if no switch, assume column names - sb.append(args[i]); - sb.append(" "); - } - if (endTime < startTime) { - printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime); - return null; - } - + public Job createSubmittableJob(Configuration conf) throws IOException { Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); job.setJarByClass(RowCounter.class); Scan scan = new Scan(); scan.setCacheBlocks(false); setScanFilter(scan, rowRangeList); - if (sb.length() > 0) { - for (String columnName : sb.toString().trim().split(" ")) { - String family = StringUtils.substringBefore(columnName, ":"); - String qualifier = StringUtils.substringAfter(columnName, ":"); - if (StringUtils.isBlank(qualifier)) { - scan.addFamily(Bytes.toBytes(family)); - } - else { - scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); - } + for (String columnName : this.columns) { + String family = StringUtils.substringBefore(columnName, ":"); + String qualifier = StringUtils.substringAfter(columnName, ":"); + if (StringUtils.isBlank(qualifier)) { + scan.addFamily(Bytes.toBytes(family)); + } else { + scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); } } - scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); + + if(this.expectedCount >= 0) { + conf.setLong(EXPECTED_COUNT_KEY, this.expectedCount); + } + + scan.setTimeRange(startTime, endTime); job.setOutputFormatClass(NullOutputFormat.class); TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job); @@ -165,15 +136,12 @@ public class RowCounter extends Configured implements Tool { return job; } - private static List parseRowRangeParameter( - String arg, String rangeSwitch) { - final String[] ranges = arg.substring(rangeSwitch.length()).split(";"); + private static List parseRowRangeParameter(String arg) { + final String[] ranges = arg.split(";"); final List rangeList = new ArrayList<>(); for (String range : ranges) { String[] startEnd = range.split(",", 2); if (startEnd.length != 2 || startEnd[1].contains(",")) { - printUsage("Please specify range in such format as \"--range=a,b\" " + - "or, with only one boundary, \"--range=,b\" or \"--range=a,\""); throw new IllegalArgumentException("Wrong range specification: " + range); } String startKey = startEnd[0]; @@ -208,34 +176,87 @@ public class RowCounter extends Configured implements Tool { } } - /* - * @param errorMessage Can attach a message when error occurs. - */ - private static void printUsage(String errorMessage) { - System.err.println("ERROR: " + errorMessage); - printUsage(); + @Override + protected void printUsage() { + StringBuilder footerBuilder = new StringBuilder(); + footerBuilder.append("For performance, consider the following configuration properties:\n"); + footerBuilder.append("-Dhbase.client.scanner.caching=100\n"); + footerBuilder.append("-Dmapreduce.map.speculative=false\n"); + printUsage("hbase rowcounter [options] [ ...]", + "Options:", footerBuilder.toString()); } - /** - * Prints usage without error message. - * Note that we don't document --expected-count, because it's intended for test. - */ - private static void printUsage() { - System.err.println("Usage: hbase rowcounter [options] " - + "[--starttime= --endtime=] " - + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [ ...]"); - System.err.println("For performance consider the following options:\n" - + "-Dhbase.client.scanner.caching=100\n" - + "-Dmapreduce.map.speculative=false"); + @Override + protected void printUsage(final String usageStr, final String usageHeader, + final String usageFooter) { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.setWidth(120); + helpFormatter.setOptionComparator(new AbstractHBaseTool.OptionsOrderComparator()); + helpFormatter.setLongOptSeparator("="); + helpFormatter.printHelp(usageStr, usageHeader, options, usageFooter); } @Override - public int run(String[] args) throws Exception { - if (args.length < 1) { - printUsage("Wrong number of parameters: " + args.length); - return -1; + protected void addOptions() { + Option startTimeOption = Option.builder(null).valueSeparator('=').hasArg(true). + desc("starting time filter to start counting rows from.").longOpt(OPT_START_TIME).build(); + Option endTimeOption = Option.builder(null).valueSeparator('=').hasArg(true). + desc("end time filter limit, to only count rows up to this timestamp."). + longOpt(OPT_END_TIME).build(); + Option rangeOption = Option.builder(null).valueSeparator('=').hasArg(true). + desc("[startKey],[endKey][;[startKey],[endKey]...]]").longOpt(OPT_RANGE).build(); + Option expectedOption = Option.builder(null).valueSeparator('=').hasArg(true). + desc("expected number of rows to be count.").longOpt(OPT_EXPECTED_COUNT).build(); + addOption(startTimeOption); + addOption(endTimeOption); + addOption(rangeOption); + addOption(expectedOption); + } + + @Override + protected void processOptions(CommandLine cmd) throws IllegalArgumentException{ + this.tableName = cmd.getArgList().get(0); + if(cmd.getOptionValue(OPT_RANGE)!=null) { + this.rowRangeList = parseRowRangeParameter(cmd.getOptionValue(OPT_RANGE)); + } + this.endTime = cmd.getOptionValue(OPT_END_TIME) == null ? HConstants.LATEST_TIMESTAMP : + Long.parseLong(cmd.getOptionValue(OPT_END_TIME)); + this.expectedCount = cmd.getOptionValue(OPT_EXPECTED_COUNT) == null ? Long.MIN_VALUE : + Long.parseLong(cmd.getOptionValue(OPT_EXPECTED_COUNT)); + this.startTime = cmd.getOptionValue(OPT_START_TIME) == null ? 0 : + Long.parseLong(cmd.getOptionValue(OPT_START_TIME)); + + for(int i=1; i " - + "[--starttime= --endtime=] " - + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [ ...]")); - assertTrue(usage.contains("For performance consider the following options:")); - assertTrue(usage.contains("-Dhbase.client.scanner.caching=100")); - assertTrue(usage.contains("-Dmapreduce.map.speculative=false")); + assertTrue(usage.contains("usage: hbase rowcounter " + + " [options] [ ...]")); + assertTrue(usage.contains("Options:\n")); + assertTrue(usage.contains("--starttime= " + + "starting time filter to start counting rows from.\n")); + assertTrue(usage.contains("--endtime= " + + "end time filter limit, to only count rows up to this timestamp.\n")); + assertTrue(usage.contains("--range= " + + "[startKey],[endKey][;[startKey],[endKey]...]]\n")); + assertTrue(usage.contains("--expectedCount= expected number of rows to be count.\n")); + assertTrue(usage.contains("For performance, " + + "consider the following configuration properties:\n")); + assertTrue(usage.contains("-Dhbase.client.scanner.caching=100\n")); + assertTrue(usage.contains("-Dmapreduce.map.speculative=false\n")); } } -- 2.17.2 (Apple Git-113)