From be123fb35994e8ac90911aa8cc64d9fb29128741 Mon Sep 17 00:00:00 2001 From: Ashish Singhi Date: Wed, 17 Sep 2014 20:59:33 +0530 Subject: [PATCH] HBASE-8139 Allow job names to be overridden Export previously called job.setJobName(NAME + "_" + tableName) right after creating the job, which overwrote any name supplied through mapreduce.job.name; that call is removed so the override takes effect. --- .../main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java | 4 +++- .../src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java | 6 ++++-- .../src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java | 7 +++++-- .../src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java | 6 +++++- .../src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java | 2 +- .../src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java | 4 +++- .../src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java | 6 +++++- .../hadoop/hbase/mapreduce/replication/VerifyReplication.java | 4 +++- 8 files changed, 29 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java index f4916ae..4137e2c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java @@ -81,6 +81,8 @@ public class CellCounter extends Configured implements Tool { */ static final String NAME = "CellCounter"; + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + /** * Mapper that runs the count. */ @@ -188,7 +190,7 @@ public class CellCounter extends Configured implements Tool { Path outputDir = new Path(args[1]); String reportSeparatorString = (args.length > 2) ? 
args[2]: ":"; conf.set("ReportSeparator", reportSeparatorString); - Job job = Job.getInstance(conf, NAME + "_" + tableName); + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); job.setJarByClass(CellCounter.class); Scan scan = getConfiguredScanForJob(conf, args); TableMapReduceUtil.initTableMapperJob(tableName, scan, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java index decb6a7..cf79a31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java @@ -55,7 +55,9 @@ public class CopyTable extends Configured implements Tool { static String peerAddress = null; static String families = null; static boolean allCells = false; - + + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + public CopyTable(Configuration conf) { super(conf); } @@ -72,7 +74,7 @@ public class CopyTable extends Configured implements Tool { if (!doCommandLine(args)) { return null; } - Job job = Job.getInstance(conf, NAME + "_" + tableName); + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); job.setJarByClass(CopyTable.class); Scan scan = new Scan(); scan.setCacheBlocks(false); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java index fdff068..a91e3cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java @@ -58,6 +58,8 @@ public class Export extends Configured implements Tool { final static String RAW_SCAN = "hbase.mapreduce.include.deleted.rows"; final static String EXPORT_BATCHING = "hbase.export.scanner.batch"; + private final static String JOB_NAME_CONF_KEY = 
"mapreduce.job.name"; + /** * Sets up the actual job. * @@ -70,7 +72,6 @@ public class Export extends Configured implements Tool { throws IOException { String tableName = args[0]; Path outputDir = new Path(args[1]); - Job job = Job.getInstance(conf, NAME + "_" + tableName); - job.setJobName(NAME + "_" + tableName); + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); job.setJarByClass(Export.class); // Set optional scan parameters @@ -166,6 +167,8 @@ public class Export extends Configured implements Tool { System.err.println(" -D " + RAW_SCAN + "=true"); System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "="); System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "="); + System.err.println(" -D " + JOB_NAME_CONF_KEY + + "=jobName - use the specified mapreduce job name for the export"); System.err.println("For performance consider the following properties:\n" + " -Dhbase.client.scanner.caching=100\n" + " -Dmapreduce.map.speculative=false\n" diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java index 7751c11..9737839 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -78,6 +78,8 @@ public class Import extends Configured implements Tool { public final static String TABLE_NAME = "import.table.name"; public final static String WAL_DURABILITY = "import.wal.durability"; + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + /** * A mapper that just writes out KeyValues. 
*/ @@ -402,7 +404,7 @@ public class Import extends Configured implements Tool { String tableName = args[0]; conf.set(TABLE_NAME, tableName); Path inputDir = new Path(args[1]); - Job job = Job.getInstance(conf, NAME + "_" + tableName); + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); job.setJarByClass(Importer.class); FileInputFormat.setInputPaths(job, inputDir); job.setInputFormatClass(SequenceFileInputFormat.class); @@ -461,6 +463,8 @@ public class Import extends Configured implements Tool { + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;" + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including" + " the KeyValue."); + System.err.println(" -D " + JOB_NAME_CONF_KEY + + "=jobName - use the specified mapreduce job name for the import"); System.err.println("For performance consider the following options:\n" + " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false\n" diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java index 0748481..15c8fc7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -416,7 +416,7 @@ public class ImportTsv extends Configured implements Tool { String tableName = args[0]; Path inputDir = new Path(args[1]); String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName); - Job job = Job.getInstance(conf, NAME + "_" + tableName); + Job job = Job.getInstance(conf, jobName); job.setJarByClass(mapperClass); FileInputFormat.setInputPaths(job, inputDir); job.setInputFormatClass(TextInputFormat.class); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java index 
c0cf32c..18ddccd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java @@ -51,6 +51,8 @@ public class RowCounter extends Configured implements Tool { /** Name of this 'program'. */ static final String NAME = "rowcounter"; + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + /** * Mapper that runs the count. */ @@ -115,7 +117,7 @@ public class RowCounter extends Configured implements Tool { } } - Job job = Job.getInstance(conf, NAME + "_" + tableName); + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); job.setJarByClass(RowCounter.class); Scan scan = new Scan(); scan.setCacheBlocks(false); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 3f0cde4..23204ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -67,6 +67,8 @@ public class WALPlayer extends Configured implements Tool { final static String TABLES_KEY = "hlog.input.tables"; final static String TABLE_MAP_KEY = "hlog.input.tablesmap"; + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + /** * A mapper that just writes out KeyValues. 
* This one can be used together with {@link KeyValueSortReducer} @@ -231,7 +233,7 @@ public class WALPlayer extends Configured implements Tool { } conf.setStrings(TABLES_KEY, tables); conf.setStrings(TABLE_MAP_KEY, tableMap); - Job job = Job.getInstance(conf, NAME + "_" + inputDir); + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + inputDir)); job.setJarByClass(WALPlayer.class); FileInputFormat.setInputPaths(job, inputDir); job.setInputFormatClass(HLogInputFormat.class); @@ -285,6 +287,8 @@ public class WALPlayer extends Configured implements Tool { System.err.println("Other options: (specify time range to WAL edit to consider)"); System.err.println(" -D" + HLogInputFormat.START_TIME_KEY + "=[date|ms]"); System.err.println(" -D" + HLogInputFormat.END_TIME_KEY + "=[date|ms]"); + System.err.println(" -D " + JOB_NAME_CONF_KEY + + "=jobName - use the specified mapreduce job name for the wal player"); System.err.println("For performance also consider the following options:\n" + " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 7748675..959cd64 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -74,6 +74,8 @@ public class VerifyReplication extends Configured implements Tool { static String families = null; static String peerId = null; + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + /** * Map-only comparator for 2 tables */ @@ -207,7 +209,7 @@ public class VerifyReplication extends Configured implements Tool { conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); LOG.info("Peer Quorum Address: " + peerQuorumAddress); 
- Job job = new Job(conf, NAME + "_" + tableName); + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); job.setJarByClass(VerifyReplication.class); Scan scan = new Scan(); -- 1.9.2.msysgit.0