diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index a1650c1..af4e4da 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -1557,6 +1557,14 @@ public class HTable implements HTableInterface, RegionLocator {
}
/**
+ * {@inheritDoc}
+ */
+ @Override
+ public void setClearBufferOnFail(boolean clearBufferOnFail) {
+ this.clearBufferOnFail = clearBufferOnFail;
+ }
+
+ /**
* Returns the maximum size in bytes of the write buffer for this HTable.
*
* The default value comes from the configuration parameter
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index 888fd93..f1f647e 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes;
public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
// Public because used outside of this package over in ipc.
static final Log LOG = LogFactory.getLog(RegionServerCallable.class);
- protected final HConnection connection;
+ protected final Connection connection;
protected final TableName tableName;
protected final byte[] row;
protected HRegionLocation location;
@@ -61,7 +61,7 @@ public abstract class RegionServerCallable implements RetryingCallable {
* @param tableName Table name to which row belongs.
* @param row The row we want in tableName.
*/
- public RegionServerCallable(HConnection connection, TableName tableName, byte [] row) {
+ public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {
this.connection = connection;
this.tableName = tableName;
this.row = row;
@@ -75,7 +75,7 @@ public abstract class RegionServerCallable implements RetryingCallable {
*/
@Override
public void prepare(final boolean reload) throws IOException {
- this.location = connection.getRegionLocation(tableName, row, reload);
+ this.location = connection.getRegionLocator(tableName).getRegionLocation(row, reload);
if (this.location == null) {
throw new IOException("Failed to find location, tableName=" + tableName +
", row=" + Bytes.toString(row) + ", reload=" + reload);
@@ -87,7 +87,7 @@ public abstract class RegionServerCallable implements RetryingCallable {
* @return {@link HConnection} instance used by this Callable.
*/
HConnection getConnection() {
- return this.connection;
+ return (HConnection) this.connection;
}
protected ClientService.BlockingInterface getStub() {
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index 07e4c08..10ee831 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -522,6 +522,14 @@ public interface Table extends Closeable {
void setAutoFlushTo(boolean autoFlush);
/**
+ * Set clearBufferOnFail to true to clear the write buffer after
+ * {@link #flushCommits()} has been called, regardless of whether the flush
+ * succeeded.
+ *
+ * @param clearBufferOnFail whether to clear the write buffer even when the flush fails
+ void setClearBufferOnFail(boolean clearBufferOnFail);
+
+ /**
* Returns the maximum size in bytes of the write buffer for this HTable.
*
* The default value comes from the configuration parameter
diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
index 9dac95b..b59a1a7 100644
--- hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
+++ hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
@@ -45,10 +45,13 @@ import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -246,47 +249,52 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
EnvironmentEdgeManager.currentTime();
Configuration conf = new Configuration(util.getConfiguration());
Path p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
- HTable table = new HTable(conf, getTablename());
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ Table table = conn.getTable(getTablename());
+ RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {
+ conf.setBoolean("mapreduce.map.speculative", false);
+ conf.setBoolean("mapreduce.reduce.speculative", false);
+ conf.setInt(ROUND_NUM_KEY, iteration);
- conf.setBoolean("mapreduce.map.speculative", false);
- conf.setBoolean("mapreduce.reduce.speculative", false);
- conf.setInt(ROUND_NUM_KEY, iteration);
+ Job job = new Job(conf);
- Job job = new Job(conf);
+ job.setJobName(jobName);
- job.setJobName(jobName);
+ // set the input format so that we can create map tasks with no data input.
+ job.setInputFormatClass(ITBulkLoadInputFormat.class);
- // set the input format so that we can create map tasks with no data input.
- job.setInputFormatClass(ITBulkLoadInputFormat.class);
+ // Set the mapper classes.
+ job.setMapperClass(LinkedListCreationMapper.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(KeyValue.class);
- // Set the mapper classes.
- job.setMapperClass(LinkedListCreationMapper.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(KeyValue.class);
+ // Use the identity reducer
+ // So nothing to do here.
- // Use the identity reducer
- // So nothing to do here.
+ // Set this jar.
+ job.setJarByClass(getClass());
- // Set this jar.
- job.setJarByClass(getClass());
+ // Set where to place the hfiles.
+ FileOutputFormat.setOutputPath(job, p);
- // Set where to place the hfiles.
- FileOutputFormat.setOutputPath(job, p);
+ // Configure the partitioner and other things needed for HFileOutputFormat.
+ HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator,
+ HFileOutputFormat.class);
- // Configure the partitioner and other things needed for HFileOutputFormat.
- HFileOutputFormat.configureIncrementalLoad(job, table);
+ // Run the job making sure it works.
+ assertEquals(true, job.waitForCompletion(true));
- // Run the job making sure it works.
- assertEquals(true, job.waitForCompletion(true));
+ // Create a new loader.
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
- // Create a new loader.
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+ // Load the HFiles in.
+ loader.doBulkLoad(p, conn, table);
+
+ // Delete the files.
+ util.getTestFileSystem().delete(p, true);
+ }
- // Load the HFiles in.
- loader.doBulkLoad(p, table);
- // Delete the files.
- util.getTestFileSystem().delete(p, true);
}
public static class EmptySplit extends InputSplit implements Writable {
diff --git hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
index 65bf509..e482c4a 100644
--- hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
+++ hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
@@ -821,6 +821,11 @@ public class RemoteHTable implements HTableInterface {
}
@Override
+ public void setClearBufferOnFail(boolean clearBufferOnFail) {
+ throw new UnsupportedOperationException("setClearBufferOnFail not implemented");
+ }
+
+ @Override
public long getWriteBufferSize() {
throw new UnsupportedOperationException("getWriteBufferSize not implemented");
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
index afc8a09..f727136 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
@@ -308,6 +308,11 @@ public class HTableWrapper implements HTableInterface {
}
@Override
+ public void setClearBufferOnFail(boolean clearBufferOnFail) {
+ table.setClearBufferOnFail(clearBufferOnFail);
+ }
+
+ @Override
public long getWriteBufferSize() {
return table.getWriteBufferSize();
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
index 150bb25..60d1ad5 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
@@ -32,7 +32,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -66,44 +67,27 @@ public class DefaultVisibilityExpressionResolver implements VisibilityExpression
@Override
public void init() {
// Reading all the labels and ordinal.
- // This scan should be done by user with global_admin previliges.. Ensure that it works
- Table labelsTable = null;
- try {
- labelsTable = new HTable(conf, LABELS_TABLE_NAME);
- } catch (TableNotFoundException e) {
- // Just return with out doing any thing. When the VC is not used we wont be having 'labels'
- // table in the cluster.
- return;
- } catch (IOException e) {
- LOG.error("Error opening 'labels' table", e);
- return;
- }
+ // This scan should be done by a user with global_admin privileges. Ensure
+ // that it works
Scan scan = new Scan();
scan.setAuthorizations(new Authorizations(VisibilityUtils.SYSTEM_LABEL));
scan.addColumn(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
- ResultScanner scanner = null;
- try {
- scanner = labelsTable.getScanner(scan);
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ Table labelsTable = conn.getTable(LABELS_TABLE_NAME);
+ ResultScanner scanner = labelsTable.getScanner(scan)) {
Result next = null;
while ((next = scanner.next()) != null) {
byte[] row = next.getRow();
byte[] value = next.getValue(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
labels.put(Bytes.toString(value), Bytes.toInt(row));
}
+ } catch (TableNotFoundException e) {
+ // Just return without doing anything. When the VC is not used we won't have the 'labels'
+ // table in the cluster.
+ return;
} catch (IOException e) {
- LOG.error("Error reading 'labels' table", e);
- } finally {
- try {
- if (scanner != null) {
- scanner.close();
- }
- } finally {
- try {
- labelsTable.close();
- } catch (IOException e) {
- LOG.warn("Error on closing 'labels' table", e);
- }
- }
+ LOG.error("Error opening 'labels' table", e);
+ return;
}
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
index c10359e..ef1c8bb 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -83,10 +84,34 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue>
* The user should be sure to set the map output value class to either KeyValue or Put before
* running this function.
+ *
+ * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
*/
+ @Deprecated
public static void configureIncrementalLoad(Job job, HTable table)
throws IOException {
- HFileOutputFormat2.configureIncrementalLoad(job, table, HFileOutputFormat.class);
+ HFileOutputFormat2.configureIncrementalLoad(job, table, table, HFileOutputFormat.class);
+ }
+
+
+ /**
+ * Configure a MapReduce Job to perform an incremental load into the given
+ * table. This
+ *
+ * - Inspects the table to configure a total order partitioner
+ * - Uploads the partitions file to the cluster and adds it to the DistributedCache
+ * - Sets the number of reduce tasks to match the current number of regions
+ * - Sets the output key/value class to match HFileOutputFormat's requirements
+ * - Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
+ * PutSortReducer)
+ *
+ * The user should be sure to set the map output value class to either KeyValue or Put before
+ * running this function.
+ */
+ public static void configureIncrementalLoad(Job job, Table table,
+ RegionLocator regionLocator) throws IOException {
+ HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator,
+ HFileOutputFormat.class);
}
/**
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 2c0efc8..42ddf98 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -357,10 +357,10 @@ public class HFileOutputFormat2
*/
public static void configureIncrementalLoad(Job job, HTable table)
throws IOException {
- configureIncrementalLoad(job, table, HFileOutputFormat2.class);
+ configureIncrementalLoad(job, table, table, HFileOutputFormat2.class);
}
- static void configureIncrementalLoad(Job job, HTable table,
+ static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator,
Class<? extends OutputFormat<?, ?>> cls) throws IOException {
Configuration conf = job.getConfiguration();
@@ -386,8 +386,8 @@ public class HFileOutputFormat2
KeyValueSerialization.class.getName());
// Use table's region boundaries for TOP split points.
- LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
- List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
+ LOG.info("Looking up current regions for table " + table.getName());
+ List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
"to match current region count");
job.setNumReduceTasks(startKeys.size());
@@ -401,8 +401,7 @@ public class HFileOutputFormat2
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
- LOG.info("Incremental table " + Bytes.toString(table.getTableName())
- + " output configured.");
+ LOG.info("Incremental table " + table.getName() + " output configured.");
}
public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
index f88d959..3334c3b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
@@ -28,7 +28,8 @@ import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
@@ -125,7 +126,8 @@ implements Configurable {
try {
TableName tableName = TableName.valueOf(configuration
.get(TableOutputFormat.OUTPUT_TABLE));
- this.table = new HTable(this.conf, tableName);
+ Connection conn = ConnectionFactory.createConnection(this.conf);
+ this.table = conn.getRegionLocator(tableName);
} catch (IOException e) {
LOG.error(e);
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index 1033dac..02746c5 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -42,13 +42,16 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
@@ -438,15 +441,19 @@ public class Import extends Configured implements Tool {
if (hfileOutPath != null) {
job.setMapperClass(KeyValueImporter.class);
- HTable table = new HTable(conf, tableName);
- job.setReducerClass(KeyValueSortReducer.class);
- Path outputDir = new Path(hfileOutPath);
- FileOutputFormat.setOutputPath(job, outputDir);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(KeyValue.class);
- HFileOutputFormat.configureIncrementalLoad(job, table);
- TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
- com.google.common.base.Preconditions.class);
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ Table table = conn.getTable(tableName);
+ RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+ job.setReducerClass(KeyValueSortReducer.class);
+ Path outputDir = new Path(hfileOutPath);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(KeyValue.class);
+ HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator,
+ HFileOutputFormat.class);
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+ com.google.common.base.Preconditions.class);
+ }
} else {
// No reducers. Just write straight to table. Call initTableReducerJob
// because it sets up the TableOutputFormat.
@@ -497,19 +504,14 @@ public class Import extends Configured implements Tool {
*/
public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
InterruptedException {
- String tableName = conf.get(TABLE_NAME);
- HBaseAdmin hAdmin = null;
+ TableName tableName = TableName.valueOf(conf.get(TABLE_NAME));
String durability = conf.get(WAL_DURABILITY);
// Need to flush if the data is written to hbase and skip wal is enabled.
if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
&& Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
- try {
- hAdmin = new HBaseAdmin(conf);
- hAdmin.flush(tableName);
- } finally {
- if (hAdmin != null) {
- hAdmin.close();
- }
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ Admin admin = conn.getAdmin()) {
+ admin.flush(tableName);
}
}
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index 5b375e9..6caa07c 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -41,9 +41,11 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
@@ -402,81 +404,86 @@ public class ImportTsv extends Configured implements Tool {
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException, ClassNotFoundException {
- HBaseAdmin admin = new HBaseAdmin(conf);
- // Support non-XML supported characters
- // by re-encoding the passed separator as a Base64 string.
- String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
- if (actualSeparator != null) {
- conf.set(SEPARATOR_CONF_KEY,
- Base64.encodeBytes(actualSeparator.getBytes()));
- }
-
- // See if a non-default Mapper was set
- String mapperClassName = conf.get(MAPPER_CONF_KEY);
- Class mapperClass = mapperClassName != null ?
- Class.forName(mapperClassName) : DEFAULT_MAPPER;
-
- TableName tableName = TableName.valueOf(args[0]);
- Path inputDir = new Path(args[1]);
- String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName.getNameAsString());
- Job job = Job.getInstance(conf, jobName);
- job.setJarByClass(mapperClass);
- FileInputFormat.setInputPaths(job, inputDir);
- job.setInputFormatClass(TextInputFormat.class);
- job.setMapperClass(mapperClass);
- String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
- String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
- if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
- String fileLoc = conf.get(CREDENTIALS_LOCATION);
- Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
- job.getCredentials().addAll(cred);
- }
-
- if (hfileOutPath != null) {
- if (!admin.tableExists(tableName)) {
- String errorMsg = format("Table '%s' does not exist.", tableName);
- if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
- LOG.warn(errorMsg);
- // TODO: this is backwards. Instead of depending on the existence of a table,
- // create a sane splits file for HFileOutputFormat based on data sampling.
- createTable(admin, tableName, columns);
- } else {
- LOG.error(errorMsg);
- throw new TableNotFoundException(errorMsg);
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ Admin admin = conn.getAdmin()) {
+ // Support non-XML supported characters
+ // by re-encoding the passed separator as a Base64 string.
+ String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
+ if (actualSeparator != null) {
+ conf.set(SEPARATOR_CONF_KEY,
+ Base64.encodeBytes(actualSeparator.getBytes()));
+ }
+
+ // See if a non-default Mapper was set
+ String mapperClassName = conf.get(MAPPER_CONF_KEY);
+ Class mapperClass = mapperClassName != null ?
+ Class.forName(mapperClassName) : DEFAULT_MAPPER;
+
+ TableName tableName = TableName.valueOf(args[0]);
+ Path inputDir = new Path(args[1]);
+ String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName.getNameAsString());
+ Job job = Job.getInstance(conf, jobName);
+ job.setJarByClass(mapperClass);
+ FileInputFormat.setInputPaths(job, inputDir);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setMapperClass(mapperClass);
+ String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+ String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
+ if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
+ String fileLoc = conf.get(CREDENTIALS_LOCATION);
+ Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
+ job.getCredentials().addAll(cred);
+ }
+
+ if (hfileOutPath != null) {
+ if (!admin.tableExists(tableName)) {
+ String errorMsg = format("Table '%s' does not exist.", tableName);
+ if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
+ LOG.warn(errorMsg);
+ // TODO: this is backwards. Instead of depending on the existence of a table,
+ // create a sane splits file for HFileOutputFormat based on data sampling.
+ createTable(admin, tableName, columns);
+ } else {
+ LOG.error(errorMsg);
+ throw new TableNotFoundException(errorMsg);
+ }
+ }
+ try (Table table = conn.getTable(tableName);
+ RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+ job.setReducerClass(PutSortReducer.class);
+ Path outputDir = new Path(hfileOutPath);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ if (mapperClass.equals(TsvImporterTextMapper.class)) {
+ job.setMapOutputValueClass(Text.class);
+ job.setReducerClass(TextSortReducer.class);
+ } else {
+ job.setMapOutputValueClass(Put.class);
+ job.setCombinerClass(PutCombiner.class);
+ }
+ HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator,
+ HFileOutputFormat.class);
}
- }
- HTable table = new HTable(conf, tableName);
- job.setReducerClass(PutSortReducer.class);
- Path outputDir = new Path(hfileOutPath);
- FileOutputFormat.setOutputPath(job, outputDir);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- if (mapperClass.equals(TsvImporterTextMapper.class)) {
- job.setMapOutputValueClass(Text.class);
- job.setReducerClass(TextSortReducer.class);
} else {
- job.setMapOutputValueClass(Put.class);
- job.setCombinerClass(PutCombiner.class);
- }
- HFileOutputFormat.configureIncrementalLoad(job, table);
- } else {
- if (mapperClass.equals(TsvImporterTextMapper.class)) {
- usage(TsvImporterTextMapper.class.toString()
- + " should not be used for non bulkloading case. use "
- + TsvImporterMapper.class.toString()
- + " or custom mapper whose value type is Put.");
- System.exit(-1);
- }
- // No reducers. Just write straight to table. Call initTableReducerJob
- // to set up the TableOutputFormat.
- TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null,
- job);
- job.setNumReduceTasks(0);
- }
+ if (mapperClass.equals(TsvImporterTextMapper.class)) {
+ usage(TsvImporterTextMapper.class.toString()
+ + " should not be used for non bulkloading case. use "
+ + TsvImporterMapper.class.toString()
+ + " or custom mapper whose value type is Put.");
+ System.exit(-1);
+ }
+ // No reducers. Just write straight to table. Call initTableReducerJob
+ // to set up the TableOutputFormat.
+ TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null,
+ job);
+ job.setNumReduceTasks(0);
+ }
- TableMapReduceUtil.addDependencyJars(job);
- TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
- com.google.common.base.Function.class /* Guava used by TsvParser */);
- return job;
+ TableMapReduceUtil.addDependencyJars(job);
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+ com.google.common.base.Function.class /* Guava used by TsvParser */);
+ return job;
+ }
}
private static void createTable(Admin admin, TableName tableName, String[] columns)
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 855417d..b276188 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -65,9 +65,10 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Table;
@@ -110,7 +111,6 @@ import java.util.UUID;
@InterfaceStability.Stable
public class LoadIncrementalHFiles extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
- private Admin hbAdmin;
public static final String NAME = "completebulkload";
public static final String MAX_FILES_PER_REGION_PER_FAMILY
@@ -127,6 +127,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private FsDelegationToken fsDelegationToken;
private String bulkToken;
private UserProvider userProvider;
+ private boolean initialized = false;
private LoadIncrementalHFiles() {}
@@ -136,17 +137,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
private void initialize() throws Exception {
- if (hbAdmin == null) {
+ if (!initialized) {
// make a copy, just to be sure we're not overriding someone else's config
setConf(HBaseConfiguration.create(getConf()));
Configuration conf = getConf();
// disable blockcache for tool invocation, see HBASE-10500
conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
- this.hbAdmin = new HBaseAdmin(conf);
this.userProvider = UserProvider.instantiate(conf);
this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
+ initialized = true;
}
}
@@ -232,17 +233,28 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* of a job using HFileOutputFormat
* @param table the table to load into
* @throws TableNotFoundException if table does not yet exist
+ * @deprecated Use {@link #doBulkLoad(Path, Connection, Table)} instead.
*/
- @SuppressWarnings("deprecation")
public void doBulkLoad(Path hfofDir, final HTable table)
+ throws TableNotFoundException, IOException {
+ doBulkLoad(hfofDir, table.getConnection(), table);
+ }
+
+ /**
+ * Perform a bulk load of the given directory into the given
+ * pre-existing table. This method is not threadsafe.
+ *
+ * @param hfofDir the directory that was provided as the output path
+ * of a job using HFileOutputFormat
+ * @param conn the {@link Connection} to use
+ * @param table the table to load into
+ * @throws TableNotFoundException if table does not yet exist
+ */
+ public void doBulkLoad(Path hfofDir, final Connection conn, final Table table)
throws TableNotFoundException, IOException
{
- final HConnection conn = table.getConnection();
-
- if (!conn.isTableAvailable(table.getName())) {
- throw new TableNotFoundException("Table " +
- Bytes.toStringBinary(table.getTableName()) +
- "is not currently available.");
+ if (!conn.getAdmin().isTableAvailable(table.getName())) {
+ throw new TableNotFoundException("Table " + table.getName()
+ + "is not currently available.");
}
// initialize thread pools
@@ -278,7 +290,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
String msg =
"Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
+ unmatchedFamilies + "; valid family names of table "
- + Bytes.toString(table.getTableName()) + " are: " + familyNames;
+ + table.getName() + " are: " + familyNames;
LOG.error(msg);
throw new IOException(msg);
}
@@ -300,9 +312,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
// Assumes that region splits can happen while this occurs.
+ RegionLocator regionLocator = conn.getRegionLocator(table.getName());
while (!queue.isEmpty()) {
// need to reload split keys each iteration.
- final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
+ final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
if (count != 0) {
LOG.info("Split occured while grouping HFiles, retry attempt " +
+ count + " with " + queue.size() + " files remaining to group or split");
@@ -361,7 +374,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* them. Any failures are re-queued for another pass with the
* groupOrSplitPhase.
*/
- protected void bulkLoadPhase(final Table table, final HConnection conn,
+ protected void bulkLoadPhase(final Table table, final Connection conn,
ExecutorService pool, Deque<LoadQueueItem> queue,
final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
// atomically bulk load the groups.
@@ -433,7 +446,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @return A Multimap that groups LQI by likely
* bulk load region targets.
*/
- private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final HTable table,
+ private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final Table table,
ExecutorService pool, Deque<LoadQueueItem> queue,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
// need synchronized only within this scope of this
@@ -526,7 +539,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @throws IOException
*/
protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
- final LoadQueueItem item, final HTable table,
+ final LoadQueueItem item, final Table table,
final Pair<byte[][], byte[][]> startEndKeys)
throws IOException {
final Path hfilePath = item.hfilePath;
@@ -571,18 +584,18 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*/
if (indexForCallable < 0) {
throw new IOException("The first region info for table "
- + Bytes.toString(table.getTableName())
+ + table.getName()
+ " cann't be found in hbase:meta.Please use hbck tool to fix it first.");
} else if ((indexForCallable == startEndKeys.getFirst().length - 1)
&& !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
throw new IOException("The last region info for table "
- + Bytes.toString(table.getTableName())
+ + table.getName()
+ " cann't be found in hbase:meta.Please use hbck tool to fix it first.");
} else if (indexForCallable + 1 < startEndKeys.getFirst().length
&& !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
throw new IOException("The endkey of one region for table "
- + Bytes.toString(table.getTableName())
+ + table.getName()
+ " is not equal to the startkey of the next region in hbase:meta."
+ "Please use hbck tool to fix it first.");
}
@@ -603,10 +616,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
/**
- * @deprecated Use {@link #tryAtomicRegionLoad(HConnection, TableName, byte[], Collection)}
+ * @deprecated Use {@link #tryAtomicRegionLoad(Connection, TableName, byte[], Collection)}
*/
@Deprecated
- protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
+ protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
final byte [] tableName, final byte[] first, Collection<LoadQueueItem> lqis)
throws IOException {
return tryAtomicRegionLoad(conn, TableName.valueOf(tableName), first, lqis);
@@ -625,7 +638,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @return empty list if success, list of items to retry on recoverable
* failure
*/
- protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
+ protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
throws IOException {
final List<Pair<byte[], String>> famPaths =
@@ -648,10 +661,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
if (!isSecureBulkLoadEndpointAvailable()) {
success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
} else {
- Table table = new HTable(conn.getConfiguration(), getTableName());
- secureClient = new SecureBulkLoadClient(table);
- success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
- bulkToken, getLocation().getRegionInfo().getStartKey());
+ try (Table table = conn.getTable(getTableName())) {
+ secureClient = new SecureBulkLoadClient(table);
+ success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
+ bulkToken, getLocation().getRegionInfo().getStartKey());
+ }
}
return success;
} finally {
@@ -785,7 +799,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
private boolean doesTableExist(TableName tableName) throws Exception {
- return hbAdmin.tableExists(tableName);
+ try (Connection conn = ConnectionFactory.createConnection(getConf());
+ Admin admin = conn.getAdmin()) {
+ return admin.tableExists(tableName);
+ }
}
/*
@@ -892,7 +909,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
keys = LoadIncrementalHFiles.inferBoundaries(map);
- this.hbAdmin.createTable(htd,keys);
+ try (Connection conn = ConnectionFactory.createConnection(getConf());
+ Admin admin = conn.getAdmin()) {
+ admin.createTable(htd,keys);
+ }
LOG.info("Table "+ tableName +" is available!!");
}
@@ -921,9 +941,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
Path hfofDir = new Path(dirPath);
- HTable table = new HTable(getConf(), tableName);
- doBulkLoad(hfofDir, table);
+ try (Connection conn = ConnectionFactory.createConnection(getConf());
+ Table table = conn.getTable(tableName)) {
+ doBulkLoad(hfofDir, conn, table);
+ }
+
return 0;
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
index 6a66d37..6d0afae 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
@@ -29,10 +29,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
index c1d8373..ee86f95 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
@@ -29,13 +29,16 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -73,7 +76,7 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class);
- Map<ImmutableBytesWritable, HTable> tables;
+ Map<ImmutableBytesWritable, Pair<Connection, Table>> tables;
Configuration conf;
boolean useWriteAheadLogging;
@@ -88,7 +91,7 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
- this.tables = new HashMap<ImmutableBytesWritable, HTable>();
+ this.tables = new HashMap<>();
this.conf = conf;
this.useWriteAheadLogging = useWriteAheadLogging;
}
@@ -100,21 +103,26 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
+ tables.put(tableName, new Pair<Connection, Table>(conn, table));
}
- return tables.get(tableName);
+ return tables.get(tableName).getSecond();
}
@Override
public void close(TaskAttemptContext context) throws IOException {
- for (HTable table : tables.values()) {
+ for (Pair<Connection, Table> pair : tables.values()) {
+ Table table = pair.getSecond();
table.flushCommits();
+ table.close();
+ pair.getFirst().close();
}
+ tables.clear();
}
/**
@@ -129,7 +137,7 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
* @param <KEY> The type of the key.
*/
- protected static class TableRecordWriter<KEY>
+ protected class TableRecordWriter
extends RecordWriter<KEY, Mutation> {
/** The table to write to. */
- private Table table;
+// private Table table;
/**
* Instantiate a TableRecordWriter with the HBase HClient for writing.
*
* @param table The table to write to.
*/
- public TableRecordWriter(Table table) {
- this.table = table;
+ public TableRecordWriter() {
}
/**
@@ -112,6 +113,7 @@ implements Configurable {
public void close(TaskAttemptContext context)
throws IOException {
table.close();
+ connection.close();
}
/**
@@ -125,8 +127,8 @@ implements Configurable {
@Override
public void write(KEY key, Mutation value)
throws IOException {
- if (value instanceof Put) this.table.put(new Put((Put)value));
- else if (value instanceof Delete) this.table.delete(new Delete((Delete)value));
+ if (value instanceof Put) table.put(new Put((Put)value));
+ else if (value instanceof Delete) table.delete(new Delete((Delete)value));
else throw new IOException("Pass a Delete or a Put");
}
}
@@ -144,7 +146,7 @@ implements Configurable {
public RecordWriter<KEY, Mutation> getRecordWriter(
TaskAttemptContext context)
throws IOException, InterruptedException {
- return new TableRecordWriter(this.table);
+ return new TableRecordWriter();
}
/**
@@ -205,8 +207,9 @@ implements Configurable {
if (zkClientPort != 0) {
this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
}
- this.table = new HTable(this.conf, TableName.valueOf(tableName));
- this.table.setAutoFlush(false, true);
+ this.connection = ConnectionFactory.createConnection(this.conf);
+ this.table = connection.getTable(TableName.valueOf(tableName));
+ this.table.setClearBufferOnFail(true);
LOG.info("Created table instance for " + tableName);
} catch(IOException e) {
LOG.error(e);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index cf9dc56..5ee8cdb 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -34,10 +34,14 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -247,13 +251,18 @@ public class WALPlayer extends Configured implements Tool {
if (tables.length != 1) {
throw new IOException("Exactly one table must be specified for the bulk export option");
}
- HTable table = new HTable(conf, TableName.valueOf(tables[0]));
- job.setMapperClass(HLogKeyValueMapper.class);
- job.setReducerClass(KeyValueSortReducer.class);
- Path outputDir = new Path(hfileOutPath);
- FileOutputFormat.setOutputPath(job, outputDir);
- job.setMapOutputValueClass(KeyValue.class);
- HFileOutputFormat.configureIncrementalLoad(job, table);
+ TableName tableName = TableName.valueOf(tables[0]);
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ Table table = conn.getTable(tableName);
+ RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+ job.setMapperClass(HLogKeyValueMapper.class);
+ job.setReducerClass(KeyValueSortReducer.class);
+ Path outputDir = new Path(hfileOutPath);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputValueClass(KeyValue.class);
+ HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator,
+ HFileOutputFormat.class);
+ }
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
com.google.common.base.Preconditions.class);
} else {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index c091312..72f7885 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -25,10 +25,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.HConnectable;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -87,6 +85,7 @@ public class VerifyReplication extends Configured implements Tool {
public static enum Counters {
GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
+ private Connection replicatedScannerConnection;
private ResultScanner replicatedScanner;
private Result currentCompareRowInPeerTable;
@@ -121,22 +120,19 @@ public class VerifyReplication extends Configured implements Tool {
}
final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
- HConnectionManager.execute(new HConnectable<Void>(conf) {
- @Override
- public Void connect(HConnection conn) throws IOException {
- String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
- Configuration peerConf = HBaseConfiguration.create(conf);
- ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
-
- TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
- // TODO: THis HTable doesn't get closed. Fix!
- Table replicatedTable = new HTable(peerConf, tableName);
- scan.setStartRow(value.getRow());
- scan.setStopRow(tableSplit.getEndRow());
- replicatedScanner = replicatedTable.getScanner(scan);
- return null;
- }
- });
+ try (Connection conn = ConnectionFactory.createConnection(conf)) {
+ String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
+ Configuration peerConf = HBaseConfiguration.create(conf);
+ ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
+
+ TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
+ // The peer connection is closed in cleanup().
+ replicatedScannerConnection = ConnectionFactory.createConnection(peerConf);
+ Table replicatedTable = replicatedScannerConnection.getTable(tableName);
+ scan.setStartRow(value.getRow());
+ scan.setStopRow(tableSplit.getEndRow());
+ replicatedScanner = replicatedTable.getScanner(scan);
+ }
currentCompareRowInPeerTable = replicatedScanner.next();
}
while (true) {
@@ -191,6 +187,14 @@ public class VerifyReplication extends Configured implements Tool {
replicatedScanner = null;
}
}
+ if (replicatedScannerConnection != null) {
+ try {
+ replicatedScannerConnection.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close the peer table connection in cleanup", e);
+ } finally {
+ replicatedScannerConnection = null;
+ }
+ }
}
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index bbd9dbe..4e885a7 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -280,6 +280,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
* be wholesome. Rather than use the return direct, its usually best to
* make a copy and use that. Do
* Configuration c = new Configuration(INSTANCE.getConfiguration());
+ *
+ * Alternatively, create a connection using {@link #createConnection()}
* @return Instance of Configuration.
*/
@Override
@@ -287,6 +289,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return super.getConfiguration();
}
+ public Connection createConnection() throws IOException {
+ return ConnectionFactory.createConnection(getConfiguration());
+ }
+
public void setHBaseCluster(HBaseCluster hbaseCluster) {
this.hbaseCluster = hbaseCluster;
}
@@ -1183,7 +1189,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
}
/**
- * Create a table.
+ * Create a table and wait until it's available.
+ *
* @param htd
* @param families
* @param c Configuration to use
@@ -1192,6 +1199,17 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
*/
public HTable createTable(HTableDescriptor htd, byte[][] families, Configuration c)
throws IOException {
+ initializeTable(htd, families);
+ // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
+ waitUntilAllRegionsAssigned(htd.getTableName());
+ return new HTable(c, htd.getTableName());
+ }
+
+ /**
+ * Initiate the creation of the table described by the descriptor, and don't wait for its completion.
+ */
+ public void initializeTable(HTableDescriptor htd, byte[][] families)
+ throws IOException {
for(byte[] family : families) {
HColumnDescriptor hcd = new HColumnDescriptor(family);
// Disable blooms (they are on by default as of 0.95) but we disable them here because
@@ -1201,9 +1219,23 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
htd.addFamily(hcd);
}
getHBaseAdmin().createTable(htd);
+ }
+
+
+ /**
+ * Create a table and wait until it's available.
+ *
+ * @param htd
+ * @param families
+ * @param conn Connection used to obtain the table reference
+ * @return a reference to the table.
+ * @throws IOException
+ */
+ public Table createTableAndWait(HTableDescriptor htd, byte[][] families, Connection conn) throws IOException {
+ initializeTable(htd, families);
// HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned
waitUntilAllRegionsAssigned(htd.getTableName());
- return new HTable(c, htd.getTableName());
+ return conn.getTable(htd.getTableName());
}
/**
@@ -1232,15 +1264,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
final Configuration c)
throws IOException {
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
- for(byte[] family : families) {
- HColumnDescriptor hcd = new HColumnDescriptor(family);
- // Disable blooms (they are on by default as of 0.95) but we disable them here because
- // tests have hard coded counts of what to expect in block cache, etc., and blooms being
- // on is interfering.
- hcd.setBloomFilterType(BloomType.NONE);
- desc.addFamily(hcd);
- }
- getHBaseAdmin().createTable(desc);
+ initializeTable(desc, families);
return new HTable(c, desc.getTableName());
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index 4890edd..344ac05 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@@ -577,7 +579,10 @@ public class TestRegionObserverInterface {
createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A);
//Bulk load
- new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName));
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ Table newTable = conn.getTable(tableName)) {
+ new LoadIncrementalHFiles(conf).doBulkLoad(dir, conn, newTable);
+ }
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index 9bdebe6..0327c3a 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@@ -371,21 +373,21 @@ public class TestHFileOutputFormat {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
byte[][] startKeys = generateRandomStartKeys(5);
- HBaseAdmin admin = null;
- try {
+ try (Connection conn = util.createConnection();
+ Admin admin = conn.getAdmin();
+ Table table = util.createTableAndWait(new HTableDescriptor(TABLE_NAME), FAMILIES, conn);
+ RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME)){
util.startMiniCluster();
Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
- admin = new HBaseAdmin(conf);
- HTable table = util.createTable(TABLE_NAME, FAMILIES);
assertEquals("Should start with empty table",
0, util.countRows(table));
int numRegions = util.createMultiRegions(
- util.getConfiguration(), table, FAMILIES[0], startKeys);
+ util.getConfiguration(), (HTable) table, FAMILIES[0], startKeys);
assertEquals("Should make 5 regions", numRegions, 5);
// Generate the bulk load files
util.startMiniMapReduceCluster();
- runIncrementalPELoad(conf, table, testDir);
+ runIncrementalPELoad(conf, table, regionLocator, testDir);
// This doesn't write into the table, just makes files
assertEquals("HFOF should not touch actual table",
0, util.countRows(table));
@@ -405,7 +407,7 @@ public class TestHFileOutputFormat {
// handle the split case
if (shouldChangeRegions) {
LOG.info("Changing regions in table");
- admin.disableTable(table.getTableName());
+ admin.disableTable(table.getName());
while(util.getMiniHBaseCluster().getMaster().getAssignmentManager().
getRegionStates().isRegionsInTransition()) {
Threads.sleep(200);
@@ -413,17 +415,17 @@ public class TestHFileOutputFormat {
}
byte[][] newStartKeys = generateRandomStartKeys(15);
util.createMultiRegions(
- util.getConfiguration(), table, FAMILIES[0], newStartKeys);
- admin.enableTable(table.getTableName());
- while (table.getRegionLocations().size() != 15 ||
- !admin.isTableAvailable(table.getTableName())) {
+ util.getConfiguration(), (HTable) table, FAMILIES[0], newStartKeys);
+ admin.enableTable(table.getName());
+ while (regionLocator.getAllRegionLocations().size() != 15 ||
+ !admin.isTableAvailable(table.getName())) {
Thread.sleep(200);
LOG.info("Waiting for new region assignment to happen");
}
}
// Perform the actual load
- new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
+ new LoadIncrementalHFiles(conf).doBulkLoad(testDir, conn, table);
// Ensure data shows up
int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
@@ -453,27 +455,26 @@ public class TestHFileOutputFormat {
assertEquals("Data should remain after reopening of regions",
tableDigestBefore, util.checksumRows(table));
} finally {
- if (admin != null) admin.close();
util.shutdownMiniMapReduceCluster();
util.shutdownMiniCluster();
}
}
- private void runIncrementalPELoad(
- Configuration conf, HTable table, Path outDir)
- throws Exception {
+ private void runIncrementalPELoad(Configuration conf, Table table,
+ RegionLocator regionLocator, Path outDir) throws Exception {
Job job = new Job(conf, "testLocalMRIncrementalLoad");
job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
setupRandomGeneratorMapper(job);
- HFileOutputFormat.configureIncrementalLoad(job, table);
+ HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator,
+ HFileOutputFormat.class);
FileOutputFormat.setOutputPath(job, outDir);
Assert.assertFalse( util.getTestFileSystem().exists(outDir)) ;
- assertEquals(table.getRegionLocations().size(), job.getNumReduceTasks());
+ assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
assertTrue(job.waitForCompletion(true));
}
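For reference, the reworked job setup now threads both the Table and a RegionLocator through configureIncrementalLoad. A rough sketch, assuming the four-argument HFileOutputFormat2 overload used above is available (TABLE_NAME and outDir are placeholders):

    Job job = new Job(conf, "incremental-load");
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TABLE_NAME);
         RegionLocator locator = conn.getRegionLocator(TABLE_NAME)) {
      HFileOutputFormat2.configureIncrementalLoad(job, table, locator, HFileOutputFormat.class);
      FileOutputFormat.setOutputPath(job, outDir);
      // Reducer count is derived from the RegionLocator, one reducer per region,
      // which is what the assertion in runIncrementalPELoad checks.
      int regions = locator.getAllRegionLocations().size();
      assert regions == job.getNumReduceTasks();
    }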
@@ -792,7 +793,8 @@ public class TestHFileOutputFormat {
Path dir = util.getDataTestDir("testColumnFamilySettings");
// Setup table descriptor
- HTable table = Mockito.mock(HTable.class);
+ Table table = Mockito.mock(Table.class);
+ RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
Mockito.doReturn(htd).when(table).getTableDescriptor();
for (HColumnDescriptor hcd: this.util.generateColumnDescriptors()) {
@@ -800,7 +802,7 @@ public class TestHFileOutputFormat {
}
// set up the table to return some mock keys
- setupMockStartKeys(table);
+ setupMockStartKeys(regionLocator);
try {
// partial map red setup to get an operational writer for testing
@@ -810,7 +812,7 @@ public class TestHFileOutputFormat {
Job job = new Job(conf, "testLocalMRIncrementalLoad");
job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
setupRandomGeneratorMapper(job);
- HFileOutputFormat.configureIncrementalLoad(job, table);
+ HFileOutputFormat.configureIncrementalLoad(job, table, regionLocator);
FileOutputFormat.setOutputPath(job, dir);
context = createTestTaskAttemptContext(job);
HFileOutputFormat hof = new HFileOutputFormat();
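Since the mock target changes from HTable to the Table/RegionLocator pair, setupMockStartKeys presumably stubs the locator rather than the table. One possible shape of that wiring, sketched with made-up split keys:

    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    byte[][] mockKeys = new byte[][] {
        HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("b"), Bytes.toBytes("i") };
    // Stub getStartKeys(), which supplies the split points when the job is configured.
    Mockito.doReturn(mockKeys).when(regionLocator).getStartKeys();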
@@ -891,11 +893,12 @@ public class TestHFileOutputFormat {
conf.setInt("hbase.hstore.compaction.min", 2);
generateRandomStartKeys(5);
- try {
+ try (Connection conn = util.createConnection();
+ Admin admin = conn.getAdmin()) {
util.startMiniCluster();
final FileSystem fs = util.getDFSCluster().getFileSystem();
- HBaseAdmin admin = new HBaseAdmin(conf);
- HTable table = util.createTable(TABLE_NAME, FAMILIES);
+ Table table = util.createTableAndWait(new HTableDescriptor(TABLE_NAME), FAMILIES, conn);
+ RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME);
assertEquals("Should start with empty table", 0, util.countRows(table));
// deep inspection: get the StoreFile dir
@@ -912,9 +915,9 @@ public class TestHFileOutputFormat {
for (int i = 0; i < 2; i++) {
Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
- runIncrementalPELoad(conf, table, testDir);
+ runIncrementalPELoad(conf, table, regionLocator, testDir);
// Perform the actual load
- new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
+ new LoadIncrementalHFiles(conf).doBulkLoad(testDir, conn, table);
}
// Ensure data shows up
@@ -926,7 +929,7 @@ public class TestHFileOutputFormat {
assertEquals(2, fs.listStatus(storePath).length);
// minor compactions shouldn't get rid of the file
- admin.compact(TABLE_NAME.getName());
+ admin.compact(TABLE_NAME);
try {
quickPoll(new Callable&lt;Boolean&gt;() {
public Boolean call() throws Exception {
@@ -939,7 +942,7 @@ public class TestHFileOutputFormat {
}
// a major compaction should work though
- admin.majorCompact(TABLE_NAME.getName());
+ admin.majorCompact(TABLE_NAME);
quickPoll(new Callable&lt;Boolean&gt;() {
public Boolean call() throws Exception {
return fs.listStatus(storePath).length == 1;
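The Admin calls in the hunks above and below follow one pattern: flush and compaction requests now take the TableName itself rather than its byte[] form. A compact usage sketch (conf is assumed to point at a running cluster):

    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
      admin.flush(TABLE_NAME);         // was admin.flush(TABLE_NAME.getName())
      admin.compact(TABLE_NAME);       // minor compaction request
      admin.majorCompact(TABLE_NAME);  // major compaction request
    }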
@@ -958,12 +961,14 @@ public class TestHFileOutputFormat {
conf.setInt("hbase.hstore.compaction.min", 2);
generateRandomStartKeys(5);
- try {
+ try (Connection conn = util.createConnection();
+ Admin admin = conn.getAdmin();
+ Table table = util.createTableAndWait(new HTableDescriptor(TABLE_NAME), FAMILIES, conn);
+ RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME)) {
util.startMiniCluster();
Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
final FileSystem fs = util.getDFSCluster().getFileSystem();
- HBaseAdmin admin = new HBaseAdmin(conf);
- HTable table = util.createTable(TABLE_NAME, FAMILIES);
+
assertEquals("Should start with empty table", 0, util.countRows(table));
// deep inspection: get the StoreFile dir
@@ -977,7 +982,7 @@ public class TestHFileOutputFormat {
Put p = new Put(Bytes.toBytes("test"));
p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
table.put(p);
- admin.flush(TABLE_NAME.getName());
+ admin.flush(TABLE_NAME);
assertEquals(1, util.countRows(table));
quickPoll(new Callable&lt;Boolean&gt;() {
public Boolean call() throws Exception {
@@ -989,10 +994,10 @@ public class TestHFileOutputFormat {
conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
true);
util.startMiniMapReduceCluster();
- runIncrementalPELoad(conf, table, testDir);
+ runIncrementalPELoad(conf, table, regionLocator, testDir);
// Perform the actual load
- new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
+ new LoadIncrementalHFiles(conf).doBulkLoad(testDir, conn, table);
// Ensure data shows up
int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
@@ -1003,7 +1008,7 @@ public class TestHFileOutputFormat {
assertEquals(2, fs.listStatus(storePath).length);
// minor compactions shouldn't get rid of the file
- admin.compact(TABLE_NAME.getName());
+ admin.compact(TABLE_NAME);
try {
quickPoll(new Callable&lt;Boolean&gt;() {
public Boolean call() throws Exception {
@@ -1016,7 +1021,7 @@ public class TestHFileOutputFormat {
}
// a major compaction should work though
- admin.majorCompact(TABLE_NAME.getName());
+ admin.majorCompact(TABLE_NAME);
quickPoll(new Callable&lt;Boolean&gt;() {
public Boolean call() throws Exception {
return fs.listStatus(storePath).length == 1;
@@ -1048,22 +1053,26 @@ public class TestHFileOutputFormat {
public void manualTest(String args[]) throws Exception {
Configuration conf = HBaseConfiguration.create();
util = new HBaseTestingUtility(conf);
- if ("newtable".equals(args[0])) {
- TableName tname = TableName.valueOf(args[1]);
- HTable table = util.createTable(tname, FAMILIES);
- HBaseAdmin admin = new HBaseAdmin(conf);
- admin.disableTable(tname);
- byte[][] startKeys = generateRandomStartKeys(5);
- util.createMultiRegions(conf, table, FAMILIES[0], startKeys);
- admin.enableTable(tname);
- } else if ("incremental".equals(args[0])) {
- TableName tname = TableName.valueOf(args[1]);
- HTable table = new HTable(conf, tname);
- Path outDir = new Path("incremental-out");
- runIncrementalPELoad(conf, table, outDir);
- } else {
- throw new RuntimeException(
- "usage: TestHFileOutputFormat newtable | incremental");
+ try (Connection conn = util.createConnection()) {
+ if ("newtable".equals(args[0])) {
+ TableName tname = TableName.valueOf(args[1]);
+ Table table = util.createTableAndWait(new HTableDescriptor(tname), FAMILIES, conn);
+ Admin admin = conn.getAdmin();
+ admin.disableTable(tname);
+ byte[][] startKeys = generateRandomStartKeys(5);
+ util.createMultiRegions(conf, (HTable) table, FAMILIES[0], startKeys);
+ admin.enableTable(tname);
+ } else if ("incremental".equals(args[0])) {
+ TableName tname = TableName.valueOf(args[1]);
+ try (Table table = conn.getTable(tname);
+ RegionLocator regionLocator = conn.getRegionLocator(tname)) {
+ Path outDir = new Path("incremental-out");
+ runIncrementalPELoad(conf, table, regionLocator, outDir);
+ }
+ } else {
+ throw new RuntimeException(
+ "usage: TestHFileOutputFormat newtable | incremental");
+ }
}
}
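The manualTest rewrite shows the lookup pattern the whole test now relies on: region metadata comes from a RegionLocator obtained from the Connection instead of HTable.getRegionLocations(). A small self-contained sketch (table name and row key are placeholders):

    try (Connection conn = ConnectionFactory.createConnection(conf);
         RegionLocator locator = conn.getRegionLocator(TableName.valueOf("mytable"))) {
      int regionCount = locator.getAllRegionLocations().size();
      HRegionLocation first = locator.getRegionLocation(Bytes.toBytes("row-0"));
      System.out.println(regionCount + " regions; row-0 is served by " + first.getHostname());
    }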
diff --git hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/HTablePool.java hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/HTablePool.java
index 45578c8..6dfd04d 100644
--- hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/HTablePool.java
+++ hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/HTablePool.java
@@ -625,6 +625,11 @@ public class HTablePool implements Closeable {
}
@Override
+ public void setClearBufferOnFail(boolean clearBufferOnFail) {
+ table.setClearBufferOnFail(clearBufferOnFail);
+ }
+
+ @Override
public long getWriteBufferSize() {
checkState();
return table.getWriteBufferSize();
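For the Table.setClearBufferOnFail contract that the pooled table now forwards, a hedged usage sketch (family, qualifier, and table name are placeholders): with autoflush off, a failing flushCommits() would otherwise keep the buffered mutations around.

    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("mytable"))) {
      table.setAutoFlushTo(false);       // buffer puts client-side
      table.setClearBufferOnFail(true);  // erase the buffer after flushCommits(), even on failure
      Put p = new Put(Bytes.toBytes("r1"));
      p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v1"));
      table.put(p);
      table.flushCommits();              // buffer is cleared regardless of outcome
    }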