diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java index 50da9bc..8896eb0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java @@ -49,6 +49,7 @@ import org.apache.hadoop.util.StringUtils; public class TableInputFormat extends TableInputFormatBase implements Configurable { + @SuppressWarnings("hiding") private static final Log LOG = LogFactory.getLog(TableInputFormat.class); /** Job parameter that specifies the input table. */ @@ -112,13 +113,6 @@ implements Configurable { @Override public void setConf(Configuration configuration) { this.conf = configuration; - TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE)); - try { - // NOTE: This connection doesn't currently get closed explicit1ly. - initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - } Scan scan = null; @@ -180,6 +174,16 @@ implements Configurable { setScan(scan); } + @Override + protected void initialize() { + TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE)); + try { + initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } + } + /** * Parses a combined family and qualifier and adds either both or just the * family in case there is no qualifier. This assumes the older colon diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index 3bf001b..4f06d31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -66,12 +66,10 @@ import org.apache.hadoop.util.StringUtils; *
* class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
*
+ * private JobConf job;
+ *
* public void configure(JobConf job) {
- * Connection connection =
- * ConnectionFactory.createConnection(HBaseConfiguration.create(job));
- * TableName tableName = TableName.valueOf("exampleTable");
- * // mandatory
- * initializeTable(connection, tableName);
+ * this.job = job;
* Text[] inputColumns = new byte [][] { Bytes.toBytes("cf1:columnA"),
* Bytes.toBytes("cf2") };
* // mandatory
@@ -80,6 +78,14 @@ import org.apache.hadoop.util.StringUtils;
* // optional
* setRowFilter(exampleFilter);
* }
+ *
+ * protected void initialize() {
+ * Connection connection =
+ * ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ * TableName tableName = TableName.valueOf("exampleTable");
+ * // mandatory
+ * initializeTable(connection, tableName);
+ * }
*
* public void validateInput(JobConf job) throws IOException {
* }
@@ -105,13 +111,14 @@ extends InputFormat {
private RegionLocator regionLocator;
/** The reader scanning the table, can be a custom one. */
private TableRecordReader tableRecordReader = null;
+ /** The underlying {@link Connection} of the table. */
+ private Connection connection;
+
/** The reverse DNS lookup cache mapping: IPAddress => HostName */
private HashMap reverseDNSCacheMap =
new HashMap();
- private Connection connection;
-
/**
* Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses
* the default.
@@ -129,6 +136,10 @@ extends InputFormat {
InputSplit split, TaskAttemptContext context)
throws IOException {
if (table == null) {
+ initialize();
+ }
+ if (getTable() == null) {
+ // initialize() must not have been implemented in the subclass.
throw new IOException("Cannot create a record reader because of a" +
" previous error. Please look at the previous logs lines from" +
" the task's full log for more details.");
@@ -141,19 +152,13 @@ extends InputFormat {
sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc);
- trr.setTable(table);
+ trr.setTable(getTable());
return new RecordReader() {
@Override
public void close() throws IOException {
trr.close();
- close(admin, table, regionLocator, connection);
- }
-
- private void close(Closeable... closables) throws IOException {
- for (Closeable c : closables) {
- if(c != null) { c.close(); }
- }
+ closeTable();
}
@Override
@@ -185,7 +190,7 @@ extends InputFormat {
}
protected Pair getStartEndKeys() throws IOException {
- return regionLocator.getStartEndKeys();
+ return getRegionLocator().getStartEndKeys();
}
/**
@@ -200,10 +205,18 @@ extends InputFormat {
*/
@Override
public List getSplits(JobContext context) throws IOException {
+ boolean closeOnFinish = false;
+
if (table == null) {
+ initialize();
+ closeOnFinish = true;
+ }
+ if (table == null) {
+ // initialize() wasn't implemented, so the table is null.
throw new IOException("No table was provided.");
}
+ try {
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, admin);
Pair keys = getStartEndKeys();
@@ -267,6 +280,11 @@ extends InputFormat {
}
}
return splits;
+ } finally {
+ if (closeOnFinish) {
+ closeTable();
+ }
+ }
}
public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException {
@@ -321,13 +339,16 @@ extends InputFormat {
*/
@Deprecated
protected HTable getHTable() {
- return (HTable) this.table;
+ return (HTable) this.getTable();
}
/**
* Allows subclasses to get the {@link RegionLocator}.
*/
protected RegionLocator getRegionLocator() {
+ if (regionLocator == null) {
+ initialize();
+ }
return regionLocator;
}
@@ -335,6 +356,9 @@ extends InputFormat {
* Allows subclasses to get the {@link Table}.
*/
protected Table getTable() {
+ if (table == null) {
+ initialize();
+ }
return table;
}
@@ -342,6 +366,9 @@ extends InputFormat {
* Allows subclasses to get the {@link Admin}.
*/
protected Admin getAdmin() {
+ if (admin == null) {
+ initialize();
+ }
return admin;
}
@@ -356,7 +383,8 @@ extends InputFormat {
protected void setHTable(HTable table) throws IOException {
this.table = table;
this.regionLocator = table.getRegionLocator();
- this.admin = table.getConnection().getAdmin();
+ this.connection = table.getConnection();
+ this.admin = this.connection.getAdmin();
}
/**
@@ -401,4 +429,34 @@ extends InputFormat {
protected void setTableRecordReader(TableRecordReader tableRecordReader) {
this.tableRecordReader = tableRecordReader;
}
+
+ /**
+ * This method will be called when any of the following are referenced, but not yet initialized:
+ * admin, regionLocator, table. Subclasses will have the opportunity to call
+ * {@link #initializeTable(Connection, TableName)}
+ */
+ protected void initialize() {
+
+ }
+
+ /**
+ * Close the Table and related objects that were initialized via
+ * {@link #initializeTable(Connection, TableName)}.
+ *
+ * @throws IOException
+ */
+ protected void closeTable() throws IOException {
+ close(admin, table, regionLocator, connection);
+ admin = null;
+ table = null;
+ regionLocator = null;
+ connection = null;
+ }
+
+ private void close(Closeable... closables) throws IOException {
+ for (Closeable c : closables) {
+ if(c != null) { c.close(); }
+ }
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index cd69a5b..c46f41f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -79,15 +79,26 @@ implements Configurable {
/** The configuration. */
private Configuration conf = null;
- private Table table;
- private Connection connection;
-
/**
* Writes the reducer output to an HBase table.
*/
protected class TableRecordWriter
extends RecordWriter {
+ private Connection connection;
+ private Table table;
+
+ /**
+ * @throws IOException
+ *
+ */
+ public TableRecordWriter() throws IOException {
+ String tableName = conf.get(OUTPUT_TABLE);
+ this.connection = ConnectionFactory.createConnection(conf);
+ this.table = connection.getTable(TableName.valueOf(tableName));
+ this.table.setAutoFlushTo(false);
+ LOG.info("Created table instance for " + tableName);
+ }
/**
* Closes the writer, in this case flush table commits.
*
@@ -165,6 +176,7 @@ implements Configurable {
return new TableOutputCommitter();
}
+ @Override
public Configuration getConf() {
return conf;
}
@@ -193,10 +205,6 @@ implements Configurable {
if (zkClientPort != 0) {
this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
}
- this.connection = ConnectionFactory.createConnection(this.conf);
- this.table = connection.getTable(TableName.valueOf(tableName));
- this.table.setAutoFlushTo(false);
- LOG.info("Created table instance for " + tableName);
} catch(IOException e) {
LOG.error(e);
throw new RuntimeException(e);