From dcca538965b1cef49d52d61ef478d1c56001f5ba Mon Sep 17 00:00:00 2001 From: Sean Busbey Date: Fri, 13 Feb 2015 15:47:11 -0600 Subject: [PATCH] HBASE-13028 Cleanup MapReduce InputFormats --- .../hadoop/hbase/mapred/TableInputFormat.java | 18 +- .../hadoop/hbase/mapred/TableInputFormatBase.java | 186 +++++++++++++++++--- .../hadoop/hbase/mapreduce/TableInputFormat.java | 4 +- .../hbase/mapreduce/TableInputFormatBase.java | 119 ++++++++----- .../hadoop/hbase/mapred/TestTableInputFormat.java | 72 +++++++- .../hbase/mapreduce/TestTableInputFormat.java | 63 +++++-- 6 files changed, 364 insertions(+), 98 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java index 368510fd78ac2dad434e1d56cb10a63fde71144b..814daeaa1c7d8838c6d62923495be0cc93d495b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; @@ -50,6 +49,15 @@ public class TableInputFormat extends TableInputFormatBase implements public static final String COLUMN_LIST = "hbase.mapred.tablecolumns"; public void configure(JobConf job) { + try { + initialize(job); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } + } + + @Override + protected void initialize(JobConf job) throws IOException { Path[] tableNames = FileInputFormat.getInputPaths(job); String colArg = job.get(COLUMN_LIST); String[] colNames = colArg.split(" "); @@ -58,12 +66,8 @@ public class 
TableInputFormat extends TableInputFormatBase implements m_cols[i] = Bytes.toBytes(colNames[i]); } setInputColumns(m_cols); - try { - Connection connection = ConnectionFactory.createConnection(job); - setHTable((HTable) connection.getTable(TableName.valueOf(tableNames[0].getName()))); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - } + Connection connection = ConnectionFactory.createConnection(job); + initializeTable(connection, TableName.valueOf(tableNames[0].getName())); } public void validateInput(JobConf job) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java index d98b5f4b522d11515487f94e01596fac01e941ad..b5b79d2d49a1df94140aa5a546095e9369471176 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.mapred; +import java.io.Closeable; import java.io.IOException; import org.apache.commons.logging.Log; @@ -25,6 +26,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; @@ -40,28 +43,35 @@ import org.apache.hadoop.mapred.Reporter; * A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a * byte[] of input columns and optionally a {@link Filter}. * Subclasses may use other TableRecordReader implementations. 
+ * + * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to + * function properly. Each of the entry points to this class used by the MapReduce framework, + * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)}, + * will call {@link #initialize(JobConf)} as a convenient centralized location to handle + * retrieving the necessary configuration information. If your subclass overrides either of these + * methods, either call the parent version or call initialize yourself. + * *

* An example of a subclass: *

- *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
+ *   class ExampleTIF extends TableInputFormatBase {
  *
  *     {@literal @}Override
- *     public void configure(JobConf job) {
- *       try {
- *         HTable exampleTable = new HTable(HBaseConfiguration.create(job),
- *           Bytes.toBytes("exampleTable"));
- *         // mandatory
- *         setHTable(exampleTable);
- *         byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
- *           Bytes.toBytes("columnB") };
- *         // mandatory
- *         setInputColumns(inputColumns);
- *         // optional, by default we'll get everything for the given columns.
- *         Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
- *         setRowFilter(exampleFilter);
- *       } catch (IOException exception) {
- *         throw new RuntimeException("Failed to configure for job.", exception);
- *       }
+ *     protected void initialize(JobConf job) throws IOException {
+ *       // We are responsible for the lifecycle of this connection until we hand it over in
+ *       // initializeTable.
+ *       Connection connection =
+ *          ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ *       TableName tableName = TableName.valueOf("exampleTable");
+ *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
+ *       initializeTable(connection, tableName);
+ *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ *         Bytes.toBytes("columnB") };
+ *       // mandatory
+ *       setInputColumns(inputColumns);
+ *       // optional, by default we'll get everything for the given columns.
+ *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ *       setRowFilter(exampleFilter);
  *     }
  *   }
  * 
@@ -74,9 +84,17 @@ implements InputFormat { private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class); private byte [][] inputColumns; private HTable table; + private Connection connection; private TableRecordReader tableRecordReader; private Filter rowFilter; + private static final String NOT_INITIALIZED = "The input format instance has not been properly " + + "initialized. Ensure you call initializeTable either in your constructor or initialize " + + "method"; + private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" + + " previous error. Please look at the previous logs lines from" + + " the task's full log for more details."; + /** * Builds a TableRecordReader. If no TableRecordReader was provided, uses * the default. @@ -87,19 +105,63 @@ implements InputFormat { public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter) throws IOException { + // In case a subclass uses the deprecated approach or calls initializeTable directly + if (table == null) { + initialize(job); + } + // null check in case our child overrides getTable to not throw. + try { + if (getTable() == null) { + // initialize() must not have been implemented in the subclass. + throw new IOException(INITIALIZATION_ERROR); + } + } catch (IllegalStateException exception) { + throw new IOException(INITIALIZATION_ERROR, exception); + } + TableSplit tSplit = (TableSplit) split; - TableRecordReader trr = this.tableRecordReader; // if no table record reader was provided use default - if (trr == null) { - trr = new TableRecordReader(); - } + final TableRecordReader trr = this.tableRecordReader == null ? 
new TableRecordReader() : + this.tableRecordReader; trr.setStartRow(tSplit.getStartRow()); trr.setEndRow(tSplit.getEndRow()); trr.setHTable(this.table); trr.setInputColumns(this.inputColumns); trr.setRowFilter(this.rowFilter); trr.init(); - return trr; + return new RecordReader() { + + @Override + public void close() throws IOException { + trr.close(); + closeTable(); + } + + @Override + public ImmutableBytesWritable createKey() { + return trr.createKey(); + } + + @Override + public Result createValue() { + return trr.createValue(); + } + + @Override + public long getPos() throws IOException { + return trr.getPos(); + } + + @Override + public float getProgress() throws IOException { + return trr.getProgress(); + } + + @Override + public boolean next(ImmutableBytesWritable key, Result value) throws IOException { + return trr.next(key, value); + } + }; } /** @@ -123,8 +185,18 @@ implements InputFormat { */ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { if (this.table == null) { - throw new IOException("No table was provided"); + initialize(job); } + // null check in case our child overrides getTable to not throw. + try { + if (getTable() == null) { + // initialize() must not have been implemented in the subclass. + throw new IOException(INITIALIZATION_ERROR); + } + } catch (IllegalStateException exception) { + throw new IOException(INITIALIZATION_ERROR, exception); + } + byte [][] startKeys = this.table.getStartKeys(); if (startKeys == null || startKeys.length == 0) { throw new IOException("Expecting at least one region"); @@ -152,6 +224,22 @@ implements InputFormat { } /** + * Allows subclasses to initialize the table information. + * + * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close. + * @param tableName The {@link TableName} of the table to process. 
+ * @throws IOException + */ + protected void initializeTable(Connection connection, TableName tableName) throws IOException { + if (table != null || connection != null) { + LOG.warn("initializeTable called multiple times. Overwriting connection and table " + + "reference; TableInputFormatBase will not close these old references when done."); + } + this.table = (HTable) connection.getTable(tableName); + this.connection = connection; + } + + /** * @param inputColumns to be passed in {@link Result} to the map task. */ protected void setInputColumns(byte [][] inputColumns) { @@ -160,8 +248,20 @@ implements InputFormat { /** * Allows subclasses to get the {@link HTable}. + * @deprecated use {@link #getTable()} + */ + @Deprecated + protected HTable getHTable() { + return (HTable) getTable(); + } + + /** + * Allows subclasses to get the {@link Table}. */ - protected Table getHTable() { + protected Table getTable() { + if (table == null) { + throw new IllegalStateException(NOT_INITIALIZED); + } return this.table; } @@ -169,7 +269,9 @@ implements InputFormat { * Allows subclasses to set the {@link HTable}. * * @param table to get the data from + * @deprecated use {@link #initializeTable(Connection,TableName)} */ + @Deprecated protected void setHTable(HTable table) { this.table = table; } @@ -192,4 +294,40 @@ implements InputFormat { protected void setRowFilter(Filter rowFilter) { this.rowFilter = rowFilter; } + + /** + * Handle subclass specific set up. + * Each of the entry points used by the MapReduce framework, + * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)}, + * will call {@link #initialize(JobConf)} as a convenient centralized location to handle + * retrieving the necessary configuration information and calling + * {@link #initializeTable(Connection, TableName)}. + * + * Subclasses should implement their initialize call such that it is safe to call multiple times. 
+ * The current TableInputFormatBase implementation relies on a non-null table reference to decide + * if an initialize call is needed, but this behavior may change in the future. In particular, + * it is critical that initializeTable not be called multiple times since this will leak + * Connection instances. + * + */ + protected void initialize(JobConf job) throws IOException { + } + + /** + * Close the Table and related objects that were initialized via + * {@link #initializeTable(Connection, TableName)}. + * + * @throws IOException + */ + protected void closeTable() throws IOException { + close(table, connection); + table = null; + connection = null; + } + + private void close(Closeable... closables) throws IOException { + for (Closeable c : closables) { + if(c != null) { c.close(); } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java index 8896eb0838965b0a1249732de942d75e3730990f..bc2537b7517fa40cab37e5f4b8686074febf0ffc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java @@ -175,7 +175,9 @@ implements Configurable { } @Override - protected void initialize() { + protected void initialize(JobContext context) throws IOException { + // Do we have to worry about mis-matches between the Configuration from setConf and the one + // in this context? 
TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE)); try { initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index adfe493096dd21ca7f27f9c4cc2ef0f95a830d40..6c42d7f446a05b87549bcca173c0d410bd270ae6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -64,16 +64,28 @@ import org.apache.hadoop.util.StringUtils; * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName}, * an {@link Scan} instance that defines the input columns etc. Subclasses may use * other TableRecordReader implementations. + * + * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to + * function properly. Each of the entry points to this class used by the MapReduce framework, + * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)}, + * will call {@link #initialize(JobContext)} as a convenient centralized location to handle + * retrieving the necessary configuration information. If your subclass overrides either of these + * methods, either call the parent version or call initialize yourself. + * *

* An example of a subclass: *

- *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
- *
- *     private JobConf job;
+ *   class ExampleTIF extends TableInputFormatBase {
  *
  *     {@literal @}Override
- *     public void configure(JobConf job) {
- *       this.job = job;
+ *     protected void initialize(JobContext context) throws IOException {
+ *       // We are responsible for the lifecycle of this connection until we hand it over in
+ *       // initializeTable.
+ *       Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
+ *              context.getConfiguration()));
+ *       TableName tableName = TableName.valueOf("exampleTable");
+ *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
+ *       initializeTable(connection, tableName);
  *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
  *         Bytes.toBytes("columnB") };
  *       // optional, by default we'll get everything for the table.
@@ -85,23 +97,6 @@ import org.apache.hadoop.util.StringUtils;
  *       scan.setFilter(exampleFilter);
  *       setScan(scan);
  *     }
- *
- *     {@literal @}Override
- *     protected void initialize() {
- *       if (job == null) {
- *         throw new IllegalStateException("must have already gotten the JobConf before " +
- *             "initialize is called.");
- *       }
- *       try {
- *         Connection connection =
- *            ConnectionFactory.createConnection(HBaseConfiguration.create(job));
- *         TableName tableName = TableName.valueOf("exampleTable");
- *         // mandatory
- *         initializeTable(connection, tableName);
- *       } catch (IOException exception) {
- *         throw new RuntimeException("Failed to initialize.", exception);
- *       }
- *     }
  *   }
  * 
*/ @@ -122,6 +117,13 @@ extends InputFormat { final Log LOG = LogFactory.getLog(TableInputFormatBase.class); + private static final String NOT_INITIALIZED = "The input format instance has not been properly " + + "initialized. Ensure you call initializeTable either in your constructor or initialize " + + "method"; + private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" + + " previous error. Please look at the previous logs lines from" + + " the task's full log for more details."; + /** Holds the details for the internal scanner. * * @see Scan */ @@ -158,14 +160,18 @@ extends InputFormat { public RecordReader createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException { + // Just in case a subclass is relying on JobConfigurable magic. if (table == null) { - initialize(); + initialize(context); } - if (getTable() == null) { - // initialize() must not have been implemented in the subclass. - throw new IOException("Cannot create a record reader because of a" + - " previous error. Please look at the previous logs lines from" + - " the task's full log for more details."); + // null check in case our child overrides getTable to not throw. + try { + if (getTable() == null) { + // initialize() must not have been implemented in the subclass. + throw new IOException(INITIALIZATION_ERROR); + } + } catch (IllegalStateException exception) { + throw new IOException(INITIALIZATION_ERROR, exception); } TableSplit tSplit = (TableSplit) split; LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes."); @@ -230,14 +236,20 @@ extends InputFormat { public List getSplits(JobContext context) throws IOException { boolean closeOnFinish = false; + // Just in case a subclass is relying on JobConfigurable magic. if (table == null) { - initialize(); + initialize(context); closeOnFinish = true; } - if (getTable() == null) { - // initialize() wasn't implemented, so the table is null. 
- throw new IOException("No table was provided."); + // null check in case our child overrides getTable to not throw. + try { + if (getTable() == null) { + // initialize() must not have been implemented in the subclass. + throw new IOException(INITIALIZATION_ERROR); + } + } catch (IllegalStateException exception) { + throw new IOException(INITIALIZATION_ERROR, exception); } try { @@ -334,6 +346,10 @@ extends InputFormat { } } + /** + * @deprecated mistakenly made public in 0.98.7. scope will change to package-private + */ + @Deprecated public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException { String hostName = this.reverseDNSCacheMap.get(ipAddress); if (hostName == null) { @@ -366,7 +382,7 @@ extends InputFormat { * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( * org.apache.hadoop.mapreduce.JobContext) */ - public List calculateRebalancedSplits(List list, JobContext context, + private List calculateRebalancedSplits(List list, JobContext context, long average) throws IOException { List resultList = new ArrayList(); Configuration conf = context.getConfiguration(); @@ -440,6 +456,7 @@ extends InputFormat { * @param isText It determines to use text key mode or binary key mode * @return The split point in the region. */ + @InterfaceAudience.Private public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) { byte upperLimitByte; byte lowerLimitByte; @@ -519,8 +536,6 @@ extends InputFormat { } /** - * - * * Test if the given region is to be included in the InputSplit while splitting * the regions of a table. *

@@ -547,7 +562,7 @@ extends InputFormat { /** * Allows subclasses to get the {@link HTable}. * - * @deprecated + * @deprecated use {@link #getTable()} */ @Deprecated protected HTable getHTable() { @@ -559,7 +574,7 @@ extends InputFormat { */ protected RegionLocator getRegionLocator() { if (regionLocator == null) { - initialize(); + throw new IllegalStateException(NOT_INITIALIZED); } return regionLocator; } @@ -569,7 +584,7 @@ extends InputFormat { */ protected Table getTable() { if (table == null) { - initialize(); + throw new IllegalStateException(NOT_INITIALIZED); } return table; } @@ -579,7 +594,7 @@ extends InputFormat { */ protected Admin getAdmin() { if (admin == null) { - initialize(); + throw new IllegalStateException(NOT_INITIALIZED); } return admin; } @@ -587,6 +602,9 @@ extends InputFormat { /** * Allows subclasses to set the {@link HTable}. * + * Will attempt to reuse the underlying Connection for our own needs, including + * retreiving an Admin interface to the HBase cluster. + * * @param table The table to get the data from. * @throws IOException * @deprecated Use {@link #initializeTable(Connection, TableName)} instead. @@ -623,6 +641,10 @@ extends InputFormat { * @throws IOException */ protected void initializeTable(Connection connection, TableName tableName) throws IOException { + if (table != null || connection != null) { + LOG.warn("initializeTable called multiple times. Overwriting connection and table " + + "reference; TableInputFormatBase will not close these old references when done."); + } this.table = connection.getTable(tableName); this.regionLocator = connection.getRegionLocator(tableName); this.admin = connection.getAdmin(); @@ -659,12 +681,21 @@ extends InputFormat { } /** - * This method will be called when any of the following are referenced, but not yet initialized: - * admin, regionLocator, table. Subclasses will have the opportunity to call - * {@link #initializeTable(Connection, TableName)} + * Handle subclass specific set up. 
+ * Each of the entry points used by the MapReduce framework, + * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)}, + * will call {@link #initialize(JobContext)} as a convenient centralized location to handle + * retrieving the necessary configuration information and calling + * {@link #initializeTable(Connection, TableName)}. + * + * Subclasses should implement their initialize call such that it is safe to call multiple times. + * The current TableInputFormatBase implementation relies on a non-null table reference to decide + * if an initialize call is needed, but this behavior may change in the future. In particular, + * it is critical that initializeTable not be called multiple times since this will leak + * Connection instances. + * */ - protected void initialize() { - + protected void initialize(JobContext context) throws IOException { } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java index 234a2e801702db6939687ef4209f4d65e9e39034..d7dd8ec0612aeb10c7d1d5b96e2788fb7a7a7bd5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java @@ -36,6 +36,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -52,6 +54,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.util.Bytes; import 
org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConfigurable; @@ -322,8 +325,30 @@ public class TestTableInputFormat { LOG.info("testing use of an InputFormat taht extends InputFormatBase"); final Table table = createTable(Bytes.toBytes("exampleTable"), new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); + testInputFormat(ExampleTIF.class); + } + + @Test + public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException { + LOG.info("testing use of an InputFormat taht extends InputFormatBase, " + + "as it was given in 0.98."); + final Table table = createTable(Bytes.toBytes("exampleDeprecatedTable"), + new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); + testInputFormat(ExampleDeprecatedTIF.class); + } + + @Test + public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException { + LOG.info("testing use of an InputFormat taht extends InputFormatBase, " + + "using JobConfigurable."); + final Table table = createTable(Bytes.toBytes("exampleJobConfigurableTable"), + new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); + testInputFormat(ExampleJobConfigurableTIF.class); + } + + void testInputFormat(Class clazz) throws IOException { final JobConf job = MapreduceTestingShim.getJobConf(mrCluster); - job.setInputFormat(ExampleTIF.class); + job.setInputFormat(clazz); job.setOutputFormat(NullOutputFormat.class); job.setMapperClass(ExampleVerifier.class); job.setNumReduceTasks(0); @@ -373,13 +398,13 @@ public class TestTableInputFormat { } - public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable { + public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable { @Override public void configure(JobConf job) { try { HTable exampleTable = new 
HTable(HBaseConfiguration.create(job), - Bytes.toBytes("exampleTable")); + Bytes.toBytes("exampleDeprecatedTable")); // mandatory setHTable(exampleTable); byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), @@ -396,5 +421,46 @@ public class TestTableInputFormat { } + public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable { + + @Override + public void configure(JobConf job) { + try { + initialize(job); + } catch (IOException exception) { + throw new RuntimeException("Failed to initialize.", exception); + } + } + + @Override + protected void initialize(JobConf job) throws IOException { + initialize(job, "exampleJobConfigurableTable"); + } + } + + + public static class ExampleTIF extends TableInputFormatBase { + + @Override + protected void initialize(JobConf job) throws IOException { + initialize(job, "exampleTable"); + } + + protected void initialize(JobConf job, String table) throws IOException { + Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job)); + TableName tableName = TableName.valueOf(table); + // mandatory + initializeTable(connection, tableName); + byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), + Bytes.toBytes("columnB") }; + // mandatory + setInputColumns(inputColumns); + Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); + // optional + setRowFilter(exampleFilter); + } + + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java index 26029619e46fba3675d0aa8f90daf3eabbfd3ddb..566a6429c91ae37cf020f4535c3a03f1e662407d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java @@ -56,6 +56,7 @@ import org.apache.hadoop.mapred.JobConfigurable; 
import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.junit.AfterClass; @@ -343,6 +344,16 @@ public class TestTableInputFormat { } @Test + public void testJobConfigurableExtensionOfTableInputFormatBase() + throws IOException, InterruptedException, ClassNotFoundException { + LOG.info("testing use of an InputFormat taht extends InputFormatBase, " + + "using JobConfigurable."); + final Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"), + new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); + testInputFormat(ExampleJobConfigurableTIF.class); + } + + @Test public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException, InterruptedException, ClassNotFoundException { LOG.info("testing use of an InputFormat taht extends InputFormatBase, " + @@ -422,13 +433,43 @@ public class TestTableInputFormat { } - public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable { - private JobConf job; + public static class ExampleJobConfigurableTIF extends TableInputFormatBase + implements JobConfigurable { @Override public void configure(JobConf job) { - this.job = job; + try { + Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job)); + TableName tableName = TableName.valueOf("exampleJobConfigurableTable"); + // mandatory + initializeTable(connection, tableName); + byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), + Bytes.toBytes("columnB") }; + //optional + Scan scan = new Scan(); + for (byte[] family : inputColumns) { + scan.addFamily(family); + } + Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); + scan.setFilter(exampleFilter); + setScan(scan); + } catch 
(IOException exception) { + throw new RuntimeException("Failed to initialize.", exception); + } + } + } + + + public static class ExampleTIF extends TableInputFormatBase { + + @Override + protected void initialize(JobContext job) throws IOException { + Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create( + job.getConfiguration())); + TableName tableName = TableName.valueOf("exampleTable"); + // mandatory + initializeTable(connection, tableName); byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }; //optional @@ -441,22 +482,6 @@ public class TestTableInputFormat { setScan(scan); } - @Override - protected void initialize() { - if (job == null) { - throw new IllegalStateException("must have already gotten the JobConf before initialize " + - "is called."); - } - try { - Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job)); - TableName tableName = TableName.valueOf("exampleTable"); - // mandatory - initializeTable(connection, tableName); - } catch (IOException exception) { - throw new RuntimeException("Failed to initialize.", exception); - } - } - } } -- 1.7.10.2 (Apple Git-33)