diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java index 15036ee..45359e3 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java @@ -28,7 +28,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; @@ -85,8 +89,8 @@ public abstract class MultiTableInputFormatBase extends + " previous error. Please look at the previous logs lines from" + " the task's full log for more details."); } - Table table = - new HTable(context.getConfiguration(), tSplit.getTableName()); + Connection connection = ConnectionFactory.createConnection(context.getConfiguration()); + Table table = connection.getTable(tSplit.getTable()); TableRecordReader trr = this.tableRecordReader; @@ -99,10 +103,11 @@ public abstract class MultiTableInputFormatBase extends sc.setStartRow(tSplit.getStartRow()); sc.setStopRow(tSplit.getEndRow()); trr.setScan(sc); - trr.setHTable(table); + trr.setTable(table); } catch (IOException ioe) { // If there is an exception make sure that all // resources are closed and released. + connection.close(); table.close(); trr.close(); throw ioe; @@ -127,31 +132,38 @@ public abstract class MultiTableInputFormatBase extends List splits = new ArrayList(); for (Scan scan : scans) { - byte[] tableName = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME); - if (tableName == null) + byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME); + if (tableNameBytes == null) throw new IOException("A scan object did not have a table name"); - HTable table = null; + TableName tableName = TableName.valueOf(tableNameBytes); + Table table = null; + RegionLocator regionLocator = null; + Connection conn = null; try { - table = new HTable(context.getConfiguration(), tableName); - Pair keys = table.getStartEndKeys(); + conn = ConnectionFactory.createConnection(context.getConfiguration()); + table = conn.getTable(tableName); + regionLocator = conn.getRegionLocator(tableName); + regionLocator = (RegionLocator) table; + Pair keys = regionLocator.getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { throw new IOException("Expecting at least one region for table : " - + Bytes.toString(tableName)); + + tableName.getNameAsString()); } int count = 0; byte[] startRow = scan.getStartRow(); byte[] stopRow = scan.getStopRow(); - RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(table); + RegionSizeCalculator sizeCalculator = new RegionSizeCalculator((HTable) table); for (int i = 0; i < keys.getFirst().length; i++) { if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { continue; } - HRegionLocation hregionLocation = table.getRegionLocation(keys.getFirst()[i], false); + HRegionLocation hregionLocation = regionLocator.getRegionLocation( + keys.getFirst()[i], false); String regionHostname = hregionLocation.getHostname(); HRegionInfo regionInfo = hregionLocation.getRegionInfo(); @@ -181,6 +193,8 @@ public abstract class MultiTableInputFormatBase extends } } finally { if (null != table) table.close(); + if (null != regionLocator) regionLocator.close(); + if (null != conn) conn.close(); } } return splits; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index 7416093..1341266 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -33,9 +33,13 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; @@ -48,11 +52,11 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.net.DNS; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; /** - * A base for {@link TableInputFormat}s. Receives a {@link HTable}, an - * {@link Scan} instance that defines the input columns etc. Subclasses may use + * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName}, + * an {@link Scan} instance that defines the input columns etc. Subclasses may use * other TableRecordReader implementations. *

* An example of a subclass: @@ -60,10 +64,11 @@ import org.apache.hadoop.util.StringUtils; * class ExampleTIF extends TableInputFormatBase implements JobConfigurable { * * public void configure(JobConf job) { - * HTable exampleTable = new HTable(HBaseConfiguration.create(job), - * Bytes.toBytes("exampleTable")); + * Connection connection = + * ConnectionFactory.createConnection(HBaseConfiguration.create(job)); + * TableName tableName = TableName.valueOf("exampleTable"); * // mandatory - * setHTable(exampleTable); + * initializeTable(connection, tableName); * Text[] inputColumns = new byte [][] { Bytes.toBytes("cf1:columnA"), * Bytes.toBytes("cf2") }; * // mandatory @@ -85,10 +90,14 @@ extends InputFormat { final Log LOG = LogFactory.getLog(TableInputFormatBase.class); - /** Holds the details for the internal scanner. */ + /** Holds the details for the internal scanner. + * + * @see Scan */ private Scan scan = null; - /** The table to scan. */ - private HTable table = null; + /** The {@link Table} to scan. */ + private Table table; + /** The {@link RegionLocator} of the table. */ + private RegionLocator regionLocator; /** The reader scanning the table, can be a custom one. */ private TableRecordReader tableRecordReader = null; @@ -101,7 +110,7 @@ extends InputFormat { private String nameServer = null; /** - * Builds a TableRecordReader. If no TableRecordReader was provided, uses + * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses * the default. * * @param split The split to work with. @@ -116,13 +125,16 @@ extends InputFormat { public RecordReader createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException { - if (table == null) { - throw new IOException("Cannot create a record reader because of a" + - " previous error. Please look at the previous logs lines from" + - " the task's full log for more details."); + if (this.table == null) { + throw new IOException("No table was provided"); + } + if (this.regionLocator == null) { + throw new IOException("No regionLocator was provided"); } TableSplit tSplit = (TableSplit) split; - LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes."); + LOG.info("Input split length: " + + TraditionalBinaryPrefix.long2String(tSplit.getLength(), "", 1) + + " bytes."); TableRecordReader trr = this.tableRecordReader; // if no table record reader was provided use default if (trr == null) { @@ -132,7 +144,7 @@ extends InputFormat { sc.setStartRow(tSplit.getStartRow()); sc.setStopRow(tSplit.getEndRow()); trr.setScan(sc); - trr.setHTable(table); + trr.setTable(table); return trr; } @@ -155,12 +167,12 @@ extends InputFormat { this.nameServer = context.getConfiguration().get("hbase.nameserver.address", null); - RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(table); + RegionSizeCalculator sizeCalculator = new RegionSizeCalculator((HTable) table); - Pair keys = table.getStartEndKeys(); + Pair keys = regionLocator.getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { - HRegionLocation regLoc = table.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); + HRegionLocation regLoc = regionLocator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); if (null == regLoc) { throw new IOException("Expecting at least one region."); } @@ -177,7 +189,7 @@ extends InputFormat { if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { continue; } - HRegionLocation location = table.getRegionLocation(keys.getFirst()[i], false); + HRegionLocation location = regionLocator.getRegionLocation(keys.getFirst()[i], false); // The below InetSocketAddress creation does a name resolution. InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort()); if (isa.isUnresolved()) { @@ -233,13 +245,13 @@ extends InputFormat { /** * * - * Test if the given region is to be included in the InputSplit while splitting + * Test if the given region is to be included in the {@link InputSplit} while splitting * the regions of a table. *

* This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job, * (and hence, not contributing to the InputSplit), given the start and end keys of the same.
* Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing, - * continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due to the ordering of the keys. + * continuously. In addition to reducing {@link InputSplit}s, reduces the load on the region server as well, due to the ordering of the keys. *
*
* Note: It is possible that endKey.length() == 0 , for the last (recent) region. @@ -258,24 +270,61 @@ extends InputFormat { /** * Allows subclasses to get the {@link HTable}. + * + * @deprecated Use {@link #getTable()} and {@link #getRegionLocator()} instead. */ + @Deprecated protected HTable getHTable() { - return this.table; + return (HTable) this.table; } /** * Allows subclasses to set the {@link HTable}. - * - * @param table The table to get the data from. + * + * @param table The {@link HTable} to get the data from. + * @deprecated Use {@link #initializeTable(Connection, TableName)} instead. */ + @Deprecated protected void setHTable(HTable table) { this.table = table; + this.regionLocator = table; + } + + /** + * Initializes the internal {@link Table} and {@link RegionLocator}. + * + * @param connection The externally managed {@link Connection}. + * @param tableName The {@link TableName} of the {@link Table} to operate on. + * + * @throws IOException when the {@link Table} cannot be created. + */ + protected void initializeTable(Connection connection, TableName tableName) throws IOException { + this.table = connection.getTable(tableName); + this.regionLocator = connection.getRegionLocator(tableName); + } + + /** + * Allows subclasses to get the {@link Table}. + * + * @return The internal {@link Table} instance. + */ + protected Table getTable() { + return table; + } + + /** + * Allows subclasses to get the {@link RegionLocator}. + * + * @return The internal {@link RegionLocator} instance. + */ + protected RegionLocator getRegionLocator() { + return regionLocator; } /** - * Gets the scan defining the actual details like columns etc. + * Gets the {@link Scan} defining the actual details like columns etc. * - * @return The internal scan instance. + * @return The internal {@link Scan} instance. */ public Scan getScan() { if (this.scan == null) this.scan = new Scan(); @@ -283,9 +332,9 @@ extends InputFormat { } /** - * Sets the scan defining the actual details like columns etc. + * Sets the {@link Scan} defining the actual details like columns etc. * - * @param scan The scan to set. + * @param scan The {@link Scan} to set. */ public void setScan(Scan scan) { this.scan = scan; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java index 825729d..a29103b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java @@ -22,7 +22,6 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; @@ -57,9 +56,18 @@ extends RecordReader { * Sets the HBase table. * * @param htable The {@link HTable} to scan. + * @deprecated Use setTable() instead. */ + @Deprecated public void setHTable(Table htable) { - this.recordReaderImpl.setHTable(htable); + this.setTable(htable); + } + + /** + * @param table the {@link Table} to scan. + */ + public void setTable(Table table) { + this.recordReaderImpl.setHTable(table); } /**