diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java index 50da9bc..8896eb0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java @@ -49,6 +49,7 @@ import org.apache.hadoop.util.StringUtils; public class TableInputFormat extends TableInputFormatBase implements Configurable { + @SuppressWarnings("hiding") private static final Log LOG = LogFactory.getLog(TableInputFormat.class); /** Job parameter that specifies the input table. */ @@ -112,13 +113,6 @@ implements Configurable { @Override public void setConf(Configuration configuration) { this.conf = configuration; - TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE)); - try { - // NOTE: This connection doesn't currently get closed explicitly. - initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - } Scan scan = null; @@ -180,6 +174,16 @@ implements Configurable { setScan(scan); } + @Override + protected void initialize() { + TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE)); + try { + initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } + } + /** * Parses a combined family and qualifier and adds either both or just the * family in case there is no qualifier. 
This assumes the older colon diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index d6e814d..6ab7ba8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -67,12 +67,10 @@ import org.apache.hadoop.util.StringUtils; *
* class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
*
+ * private JobConf job;
+ *
* public void configure(JobConf job) {
- * Connection connection =
- * ConnectionFactory.createConnection(HBaseConfiguration.create(job));
- * TableName tableName = TableName.valueOf("exampleTable");
- * // mandatory
- * initializeTable(connection, tableName);
+ * this.job = job;
* Text[] inputColumns = new byte [][] { Bytes.toBytes("cf1:columnA"),
* Bytes.toBytes("cf2") };
* // mandatory
@@ -81,6 +79,14 @@ import org.apache.hadoop.util.StringUtils;
* // optional
* setRowFilter(exampleFilter);
* }
+ *
+ * protected void initialize() {
+ * Connection connection =
+ * ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ * TableName tableName = TableName.valueOf("exampleTable");
+ * // mandatory
+ * initializeTable(connection, tableName);
+ * }
*
* public void validateInput(JobConf job) throws IOException {
* }
@@ -116,13 +122,14 @@ extends InputFormat {
private RegionLocator regionLocator;
/** The reader scanning the table, can be a custom one. */
private TableRecordReader tableRecordReader = null;
+ /** The underlying {@link Connection} of the table. */
+ private Connection connection;
+
/** The reverse DNS lookup cache mapping: IPAddress => HostName */
private HashMap reverseDNSCacheMap =
new HashMap();
- private Connection connection;
-
/**
* Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses
* the default.
@@ -140,6 +147,10 @@ extends InputFormat {
InputSplit split, TaskAttemptContext context)
throws IOException {
if (table == null) {
+ initialize();
+ }
+ if (getTable() == null) {
+ // initialize() must not have been implemented in the subclass.
throw new IOException("Cannot create a record reader because of a" +
" previous error. Please look at the previous logs lines from" +
" the task's full log for more details.");
@@ -152,19 +163,13 @@ extends InputFormat {
sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc);
- trr.setTable(table);
+ trr.setTable(getTable());
return new RecordReader() {
@Override
public void close() throws IOException {
trr.close();
- close(admin, table, regionLocator, connection);
- }
-
- private void close(Closeable... closables) throws IOException {
- for (Closeable c : closables) {
- if(c != null) { c.close(); }
- }
+ closeTable();
}
@Override
@@ -196,7 +201,7 @@ extends InputFormat {
}
protected Pair getStartEndKeys() throws IOException {
- return regionLocator.getStartEndKeys();
+ return getRegionLocator().getStartEndKeys();
}
/**
@@ -211,91 +216,109 @@ extends InputFormat {
*/
@Override
public List getSplits(JobContext context) throws IOException {
+ boolean closeOnFinish = false;
+
if (table == null) {
- throw new IOException("No table was provided.");
+ initialize();
+ closeOnFinish = true;
}
- RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, admin);
-
- Pair keys = getStartEndKeys();
- if (keys == null || keys.getFirst() == null ||
- keys.getFirst().length == 0) {
- HRegionLocation regLoc = regionLocator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
- if (null == regLoc) {
- throw new IOException("Expecting at least one region.");
- }
- List splits = new ArrayList(1);
- long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
- TableSplit split = new TableSplit(table.getName(),
- HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
- .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
- splits.add(split);
- return splits;
+ if (getTable() == null) {
+ // initialize() wasn't implemented, so the table is null.
+ throw new IOException("No table was provided.");
}
- List splits = new ArrayList(keys.getFirst().length);
- for (int i = 0; i < keys.getFirst().length; i++) {
- if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
- continue;
- }
- HRegionLocation location = regionLocator.getRegionLocation(keys.getFirst()[i], false);
- // The below InetSocketAddress creation does a name resolution.
- InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
- if (isa.isUnresolved()) {
- LOG.warn("Failed resolve " + isa);
- }
- InetAddress regionAddress = isa.getAddress();
- String regionLocation;
- try {
- regionLocation = reverseDNS(regionAddress);
- } catch (NamingException e) {
- LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " + e);
- regionLocation = location.getHostname();
- }
- byte[] startRow = scan.getStartRow();
- byte[] stopRow = scan.getStopRow();
- // determine if the given start an stop key fall into the region
- if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
- Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
- (stopRow.length == 0 ||
- Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
- byte[] splitStart = startRow.length == 0 ||
- Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
- keys.getFirst()[i] : startRow;
- byte[] splitStop = (stopRow.length == 0 ||
- Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
- keys.getSecond()[i].length > 0 ?
- keys.getSecond()[i] : stopRow;
-
- byte[] regionName = location.getRegionInfo().getRegionName();
- long regionSize = sizeCalculator.getRegionSize(regionName);
- TableSplit split = new TableSplit(table.getName(),
- splitStart, splitStop, regionLocation, regionSize);
+ try {
+ RegionSizeCalculator sizeCalculator =
+ new RegionSizeCalculator(getRegionLocator(), getAdmin());
+
+ TableName tableName = getTable().getName();
+
+ Pair keys = getStartEndKeys();
+ if (keys == null || keys.getFirst() == null ||
+ keys.getFirst().length == 0) {
+ HRegionLocation regLoc =
+ getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
+ if (null == regLoc) {
+ throw new IOException("Expecting at least one region.");
+ }
+ List splits = new ArrayList(1);
+ long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
+ TableSplit split = new TableSplit(tableName,
+ HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
+ .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
splits.add(split);
- if (LOG.isDebugEnabled()) {
- LOG.debug("getSplits: split -> " + i + " -> " + split);
+ return splits;
+ }
+ List splits = new ArrayList(keys.getFirst().length);
+ for (int i = 0; i < keys.getFirst().length; i++) {
+ if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+ continue;
+ }
+ HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
+ // The below InetSocketAddress creation does a name resolution.
+ InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
+ if (isa.isUnresolved()) {
+ LOG.warn("Failed resolve " + isa);
+ }
+ InetAddress regionAddress = isa.getAddress();
+ String regionLocation;
+ try {
+ regionLocation = reverseDNS(regionAddress);
+ } catch (NamingException e) {
+ LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " + e);
+ regionLocation = location.getHostname();
+ }
+
+ byte[] startRow = scan.getStartRow();
+ byte[] stopRow = scan.getStopRow();
+ // determine if the given start and stop key fall into the region
+ if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+ Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+ (stopRow.length == 0 ||
+ Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+ byte[] splitStart = startRow.length == 0 ||
+ Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
+ keys.getFirst()[i] : startRow;
+ byte[] splitStop = (stopRow.length == 0 ||
+ Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
+ keys.getSecond()[i].length > 0 ?
+ keys.getSecond()[i] : stopRow;
+
+ byte[] regionName = location.getRegionInfo().getRegionName();
+ long regionSize = sizeCalculator.getRegionSize(regionName);
+ TableSplit split = new TableSplit(tableName,
+ splitStart, splitStop, regionLocation, regionSize);
+ splits.add(split);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getSplits: split -> " + i + " -> " + split);
+ }
}
}
- }
- //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
- boolean enableAutoBalance = context.getConfiguration()
- .getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false);
- if (enableAutoBalance) {
- long totalRegionSize=0;
- for (int i = 0; i < splits.size(); i++){
- TableSplit ts = (TableSplit)splits.get(i);
- totalRegionSize += ts.getLength();
+ //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
+ boolean enableAutoBalance = context.getConfiguration()
+ .getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false);
+ if (enableAutoBalance) {
+ long totalRegionSize=0;
+ for (int i = 0; i < splits.size(); i++){
+ TableSplit ts = (TableSplit)splits.get(i);
+ totalRegionSize += ts.getLength();
+ }
+ long averageRegionSize = totalRegionSize / splits.size();
+ // the averageRegionSize must be positive.
+ if (averageRegionSize <= 0) {
+ LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " +
+ "set it to 1.");
+ averageRegionSize = 1;
+ }
+ return calculateRebalancedSplits(splits, context, averageRegionSize);
+ } else {
+ return splits;
}
- long averageRegionSize = totalRegionSize / splits.size();
- // the averageRegionSize must be positive.
- if (averageRegionSize <= 0) {
- LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " +
- "set it to 1.");
- averageRegionSize = 1;
+ } finally {
+ if (closeOnFinish) {
+ closeTable();
}
- return calculateRebalancedSplits(splits, context, averageRegionSize);
- } else {
- return splits;
}
}
@@ -343,6 +366,7 @@ extends InputFormat {
int count = 0;
while (count < list.size()) {
TableSplit ts = (TableSplit)list.get(count);
+ TableName tableName = ts.getTable();
String regionLocation = ts.getRegionLocation();
long regionSize = ts.getLength();
if (regionSize >= dataSkewThreshold) {
@@ -351,9 +375,9 @@ extends InputFormat {
byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey);
//Set the size of child TableSplit as 1/2 of the region size. The exact size of the
// MapReduce input splits is not far off.
- TableSplit t1 = new TableSplit(table.getName(), ts.getStartRow(), splitKey, regionLocation,
+ TableSplit t1 = new TableSplit(tableName, ts.getStartRow(), splitKey, regionLocation,
regionSize / 2);
- TableSplit t2 = new TableSplit(table.getName(), splitKey, ts.getEndRow(), regionLocation,
+ TableSplit t2 = new TableSplit(tableName, splitKey, ts.getEndRow(), regionLocation,
regionSize - regionSize / 2);
resultList.add(t1);
resultList.add(t2);
@@ -380,7 +404,7 @@ extends InputFormat {
break;
}
}
- TableSplit t = new TableSplit(table.getName(), splitStartKey, splitEndKey,
+ TableSplit t = new TableSplit(tableName, splitStartKey, splitEndKey,
regionLocation, totalSize);
resultList.add(t);
}
@@ -515,13 +539,16 @@ extends InputFormat {
*/
@Deprecated
protected HTable getHTable() {
- return (HTable) this.table;
+ return (HTable) this.getTable();
}
/**
* Allows subclasses to get the {@link RegionLocator}.
*/
protected RegionLocator getRegionLocator() {
+ if (regionLocator == null) {
+ initialize();
+ }
return regionLocator;
}
@@ -529,6 +556,9 @@ extends InputFormat {
* Allows subclasses to get the {@link Table}.
*/
protected Table getTable() {
+ if (table == null) {
+ initialize();
+ }
return table;
}
@@ -536,6 +566,9 @@ extends InputFormat {
* Allows subclasses to get the {@link Admin}.
*/
protected Admin getAdmin() {
+ if (admin == null) {
+ initialize();
+ }
return admin;
}
@@ -550,7 +583,8 @@ extends InputFormat {
protected void setHTable(HTable table) throws IOException {
this.table = table;
this.regionLocator = table.getRegionLocator();
- this.admin = table.getConnection().getAdmin();
+ this.connection = table.getConnection();
+ this.admin = this.connection.getAdmin();
}
/**
@@ -595,4 +629,34 @@ extends InputFormat {
protected void setTableRecordReader(TableRecordReader tableRecordReader) {
this.tableRecordReader = tableRecordReader;
}
+
+ /**
+ * This method will be called when any of the following are referenced, but not yet initialized:
+ * admin, regionLocator, table. Subclasses will have the opportunity to call
+ * {@link #initializeTable(Connection, TableName)}; the default implementation does nothing.
+ */
+ protected void initialize() {
+
+ }
+
+ /**
+ * Close the Table and related objects that were initialized via
+ * {@link #initializeTable(Connection, TableName)}.
+ *
+ * @throws IOException
+ */
+ protected void closeTable() throws IOException {
+ close(admin, table, regionLocator, connection);
+ admin = null;
+ table = null;
+ regionLocator = null;
+ connection = null;
+ }
+
+ private void close(Closeable... closables) throws IOException {
+ for (Closeable c : closables) {
+ if(c != null) { c.close(); }
+ }
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index 107e7b6..80ba472 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -78,15 +78,26 @@ implements Configurable {
/** The configuration. */
private Configuration conf = null;
- private Table table;
- private Connection connection;
-
/**
* Writes the reducer output to an HBase table.
*/
protected class TableRecordWriter
extends RecordWriter {
+ private Connection connection;
+ private Table table;
+
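+ /**
+ * Opens a connection and the table named by {@code OUTPUT_TABLE}, with auto-flush disabled.
+ * @throws IOException if setting up the connection or the table fails
+ */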
+ public TableRecordWriter() throws IOException {
+ String tableName = conf.get(OUTPUT_TABLE);
+ this.connection = ConnectionFactory.createConnection(conf);
+ this.table = connection.getTable(TableName.valueOf(tableName));
+ this.table.setAutoFlushTo(false);
+ LOG.info("Created table instance for " + tableName);
+ }
/**
* Closes the writer, in this case flush table commits.
*
@@ -164,6 +175,7 @@ implements Configurable {
return new TableOutputCommitter();
}
+ @Override
public Configuration getConf() {
return conf;
}
@@ -192,10 +204,6 @@ implements Configurable {
if (zkClientPort != 0) {
this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
}
- this.connection = ConnectionFactory.createConnection(this.conf);
- this.table = connection.getTable(TableName.valueOf(tableName));
- this.table.setAutoFlushTo(false);
- LOG.info("Created table instance for " + tableName);
} catch(IOException e) {
LOG.error(e);
throw new RuntimeException(e);