diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index a1650c1..af4e4da 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -1557,6 +1557,14 @@ public class HTable implements HTableInterface, RegionLocator { } /** + * {@inheritDoc} + */ + @Override + public void setClearBufferOnFail(boolean clearBufferOnFail) { + this.clearBufferOnFail = clearBufferOnFail; + } + + /** * Returns the maximum size in bytes of the write buffer for this HTable. *

* The default value comes from the configuration parameter diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index 888fd93..f1f647e 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes; public abstract class RegionServerCallable implements RetryingCallable { // Public because used outside of this package over in ipc. static final Log LOG = LogFactory.getLog(RegionServerCallable.class); - protected final HConnection connection; + protected final Connection connection; protected final TableName tableName; protected final byte[] row; protected HRegionLocation location; @@ -61,7 +61,7 @@ public abstract class RegionServerCallable implements RetryingCallable { * @param tableName Table name to which row belongs. * @param row The row we want in tableName. */ - public RegionServerCallable(HConnection connection, TableName tableName, byte [] row) { + public RegionServerCallable(Connection connection, TableName tableName, byte [] row) { this.connection = connection; this.tableName = tableName; this.row = row; @@ -75,7 +75,7 @@ public abstract class RegionServerCallable implements RetryingCallable { */ @Override public void prepare(final boolean reload) throws IOException { - this.location = connection.getRegionLocation(tableName, row, reload); + this.location = connection.getRegionLocator(tableName).getRegionLocation(row, reload); if (this.location == null) { throw new IOException("Failed to find location, tableName=" + tableName + ", row=" + Bytes.toString(row) + ", reload=" + reload); @@ -87,7 +87,7 @@ public abstract class RegionServerCallable implements RetryingCallable { * @return {@link HConnection} instance used by this Callable. */ HConnection getConnection() { - return this.connection; + return (HConnection) this.connection; } protected ClientService.BlockingInterface getStub() { diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 07e4c08..10ee831 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -522,6 +522,14 @@ public interface Table extends Closeable { void setAutoFlushTo(boolean autoFlush); /** + * Set clearBufferOnFail to true to erase the buffer after + * {@link #flushCommits()} has been called, regardless of success. + * + * @param clearBufferOnFail + */ + void setClearBufferOnFail(boolean clearBufferOnFail); + + /** * Returns the maximum size in bytes of the write buffer for this HTable. *

* The default value comes from the configuration parameter diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java index 9dac95b..b59a1a7 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java @@ -45,10 +45,13 @@ import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Consistency; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -246,47 +249,52 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { EnvironmentEdgeManager.currentTime(); Configuration conf = new Configuration(util.getConfiguration()); Path p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration); - HTable table = new HTable(conf, getTablename()); + try (Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(getTablename()); + RegionLocator regionLocator = conn.getRegionLocator(getTablename())) { + conf.setBoolean("mapreduce.map.speculative", false); + conf.setBoolean("mapreduce.reduce.speculative", false); + conf.setInt(ROUND_NUM_KEY, iteration); - conf.setBoolean("mapreduce.map.speculative", false); - conf.setBoolean("mapreduce.reduce.speculative", false); - conf.setInt(ROUND_NUM_KEY, iteration); + Job job = new Job(conf); - Job job = new Job(conf); + job.setJobName(jobName); - job.setJobName(jobName); + // set the input format so that we can create map tasks with no data input. + job.setInputFormatClass(ITBulkLoadInputFormat.class); - // set the input format so that we can create map tasks with no data input. - job.setInputFormatClass(ITBulkLoadInputFormat.class); + // Set the mapper classes. + job.setMapperClass(LinkedListCreationMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); - // Set the mapper classes. - job.setMapperClass(LinkedListCreationMapper.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); + // Use the identity reducer + // So nothing to do here. - // Use the identity reducer - // So nothing to do here. + // Set this jar. + job.setJarByClass(getClass()); - // Set this jar. - job.setJarByClass(getClass()); + // Set where to place the hfiles. + FileOutputFormat.setOutputPath(job, p); - // Set where to place the hfiles. - FileOutputFormat.setOutputPath(job, p); + // Configure the partitioner and other things needed for HFileOutputFormat. + HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator, + HFileOutputFormat.class); - // Configure the partitioner and other things needed for HFileOutputFormat. - HFileOutputFormat.configureIncrementalLoad(job, table); + // Run the job making sure it works. 
+ assertEquals(true, job.waitForCompletion(true)); - // Run the job making sure it works. - assertEquals(true, job.waitForCompletion(true)); + // Create a new loader. + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); - // Create a new loader. - LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); + // Load the HFiles in. + loader.doBulkLoad(p, conn, table); + + // Delete the files. + util.getTestFileSystem().delete(p, true); + } - // Load the HFiles in. - loader.doBulkLoad(p, table); - // Delete the files. - util.getTestFileSystem().delete(p, true); } public static class EmptySplit extends InputSplit implements Writable { diff --git hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index 65bf509..e482c4a 100644 --- hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -821,6 +821,11 @@ public class RemoteHTable implements HTableInterface { } @Override + public void setClearBufferOnFail(boolean clearBufferOnFail) { + throw new UnsupportedOperationException("setClearBufferOnFail not implemented"); + } + + @Override public long getWriteBufferSize() { throw new UnsupportedOperationException("getWriteBufferSize not implemented"); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java index afc8a09..f727136 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java @@ -308,6 +308,11 @@ public class HTableWrapper implements HTableInterface { } @Override + public void setClearBufferOnFail(boolean clearBufferOnFail) { + table.setClearBufferOnFail(clearBufferOnFail); + } + + @Override public long getWriteBufferSize() { return table.getWriteBufferSize(); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java index 150bb25..60d1ad5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java @@ -32,7 +32,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -66,44 +67,27 @@ public class DefaultVisibilityExpressionResolver implements VisibilityExpression @Override public void init() { // Reading all the labels and ordinal. - // This scan should be done by user with global_admin previliges.. Ensure that it works - Table labelsTable = null; - try { - labelsTable = new HTable(conf, LABELS_TABLE_NAME); - } catch (TableNotFoundException e) { - // Just return with out doing any thing. When the VC is not used we wont be having 'labels' - // table in the cluster. 
- // table in the cluster. - return; - } catch (IOException e) { - LOG.error("Error opening 'labels' table", e); - return; - } + // This scan should be done by user with global_admin privileges. Ensure + // that it works Scan scan = new Scan(); scan.setAuthorizations(new Authorizations(VisibilityUtils.SYSTEM_LABEL)); scan.addColumn(LABELS_TABLE_FAMILY, LABEL_QUALIFIER); - ResultScanner scanner = null; - try { - scanner = labelsTable.getScanner(scan); + try (Connection conn = ConnectionFactory.createConnection(conf); + Table labelsTable = conn.getTable(LABELS_TABLE_NAME); + ResultScanner scanner = labelsTable.getScanner(scan)) { Result next = null; while ((next = scanner.next()) != null) { byte[] row = next.getRow(); byte[] value = next.getValue(LABELS_TABLE_FAMILY, LABEL_QUALIFIER); labels.put(Bytes.toString(value), Bytes.toInt(row)); } + } catch (TableNotFoundException e) { + // Just return without doing anything. When the VC is not used we won't be having 'labels' + // table in the cluster. + return; } catch (IOException e) { - LOG.error("Error reading 'labels' table", e); - } finally { - try { - if (scanner != null) { - scanner.close(); - } - } finally { - try { - labelsTable.close(); - } catch (IOException e) { - LOG.warn("Error on closing 'labels' table", e); - } - } + LOG.error("Error opening 'labels' table", e); + return; } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java index c10359e..ef1c8bb 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; @@ -83,10 +84,34 @@ public class HFileOutputFormat extends FileOutputFormat * The user should be sure to set the map output value class to either KeyValue or Put before * running this function. + * + * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead. */ + @Deprecated public static void configureIncrementalLoad(Job job, HTable table) throws IOException { - HFileOutputFormat2.configureIncrementalLoad(job, table, HFileOutputFormat.class); + HFileOutputFormat2.configureIncrementalLoad(job, table, table, HFileOutputFormat.class); + } + + + /** + * Configure a MapReduce Job to perform an incremental load into the given + * table. This + *

+ * The user should be sure to set the map output value class to either KeyValue or Put before + * running this function. + */ + public static void configureIncrementalLoad(Job job, Table table, + RegionLocator regionLocator) throws IOException { + HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator, + HFileOutputFormat.class); } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 2c0efc8..42ddf98 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -357,10 +357,10 @@ public class HFileOutputFormat2 */ public static void configureIncrementalLoad(Job job, HTable table) throws IOException { - configureIncrementalLoad(job, table, HFileOutputFormat2.class); + configureIncrementalLoad(job, table, table, HFileOutputFormat2.class); } - static void configureIncrementalLoad(Job job, HTable table, + static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator, Class> cls) throws IOException { Configuration conf = job.getConfiguration(); @@ -386,8 +386,8 @@ public class HFileOutputFormat2 KeyValueSerialization.class.getName()); // Use table's region boundaries for TOP split points. - LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName())); - List startKeys = getRegionStartKeys(table); + LOG.info("Looking up current regions for table " + table.getName()); + List startKeys = getRegionStartKeys(regionLocator); LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count"); job.setNumReduceTasks(startKeys.size()); @@ -401,8 +401,7 @@ public class HFileOutputFormat2 TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.initCredentials(job); - LOG.info("Incremental table " + Bytes.toString(table.getTableName()) - + " output configured."); + LOG.info("Incremental table " + table.getName() + " output configured."); } public static void configureIncrementalLoadMap(Job job, Table table) throws IOException { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java index f88d959..3334c3b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java @@ -28,7 +28,8 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; @@ -125,7 +126,8 @@ implements Configurable { try { TableName tableName = TableName.valueOf(configuration .get(TableOutputFormat.OUTPUT_TABLE)); - this.table = new HTable(this.conf, tableName); + Connection conn = ConnectionFactory.createConnection(this.conf); + this.table = conn.getRegionLocator(tableName); } catch (IOException e) { LOG.error(e); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java 
hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java index 1033dac..02746c5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -42,13 +42,16 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; @@ -438,15 +441,19 @@ public class Import extends Configured implements Tool { if (hfileOutPath != null) { job.setMapperClass(KeyValueImporter.class); - HTable table = new HTable(conf, tableName); - job.setReducerClass(KeyValueSortReducer.class); - Path outputDir = new Path(hfileOutPath); - FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); - HFileOutputFormat.configureIncrementalLoad(job, table); - TableMapReduceUtil.addDependencyJars(job.getConfiguration(), - com.google.common.base.Preconditions.class); + try (Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { + job.setReducerClass(KeyValueSortReducer.class); + Path outputDir = new Path(hfileOutPath); + FileOutputFormat.setOutputPath(job, outputDir); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator, + HFileOutputFormat.class); + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), + com.google.common.base.Preconditions.class); + } } else { // No reducers. Just write straight to table. Call initTableReducerJob // because it sets up the TableOutputFormat. @@ -497,19 +504,14 @@ public class Import extends Configured implements Tool { */ public static void flushRegionsIfNecessary(Configuration conf) throws IOException, InterruptedException { - String tableName = conf.get(TABLE_NAME); - HBaseAdmin hAdmin = null; + TableName tableName = TableName.valueOf(conf.get(TABLE_NAME)); String durability = conf.get(WAL_DURABILITY); // Need to flush if the data is written to hbase and skip wal is enabled. 
if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) { - try { - hAdmin = new HBaseAdmin(conf); - hAdmin.flush(tableName); - } finally { - if (hAdmin != null) { - hAdmin.close(); - } + try (Connection conn = ConnectionFactory.createConnection(conf); + Admin admin = conn.getAdmin()) { + admin.flush(tableName); } } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java index 5b375e9..6caa07c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -41,9 +41,11 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Bytes; @@ -402,81 +404,86 @@ public class ImportTsv extends Configured implements Tool { public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException, ClassNotFoundException { - HBaseAdmin admin = new HBaseAdmin(conf); - // Support non-XML supported characters - // by re-encoding the passed separator as a Base64 string. - String actualSeparator = conf.get(SEPARATOR_CONF_KEY); - if (actualSeparator != null) { - conf.set(SEPARATOR_CONF_KEY, - Base64.encodeBytes(actualSeparator.getBytes())); - } - - // See if a non-default Mapper was set - String mapperClassName = conf.get(MAPPER_CONF_KEY); - Class mapperClass = mapperClassName != null ? - Class.forName(mapperClassName) : DEFAULT_MAPPER; - - TableName tableName = TableName.valueOf(args[0]); - Path inputDir = new Path(args[1]); - String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName.getNameAsString()); - Job job = Job.getInstance(conf, jobName); - job.setJarByClass(mapperClass); - FileInputFormat.setInputPaths(job, inputDir); - job.setInputFormatClass(TextInputFormat.class); - job.setMapperClass(mapperClass); - String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); - String columns[] = conf.getStrings(COLUMNS_CONF_KEY); - if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) { - String fileLoc = conf.get(CREDENTIALS_LOCATION); - Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf); - job.getCredentials().addAll(cred); - } - - if (hfileOutPath != null) { - if (!admin.tableExists(tableName)) { - String errorMsg = format("Table '%s' does not exist.", tableName); - if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) { - LOG.warn(errorMsg); - // TODO: this is backwards. Instead of depending on the existence of a table, - // create a sane splits file for HFileOutputFormat based on data sampling. 
- createTable(admin, tableName, columns); - } else { - LOG.error(errorMsg); - throw new TableNotFoundException(errorMsg); + try (Connection conn = ConnectionFactory.createConnection(conf); + Admin admin = conn.getAdmin()) { + // Support non-XML supported characters + // by re-encoding the passed separator as a Base64 string. + String actualSeparator = conf.get(SEPARATOR_CONF_KEY); + if (actualSeparator != null) { + conf.set(SEPARATOR_CONF_KEY, + Base64.encodeBytes(actualSeparator.getBytes())); + } + + // See if a non-default Mapper was set + String mapperClassName = conf.get(MAPPER_CONF_KEY); + Class mapperClass = mapperClassName != null ? + Class.forName(mapperClassName) : DEFAULT_MAPPER; + + TableName tableName = TableName.valueOf(args[0]); + Path inputDir = new Path(args[1]); + String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName.getNameAsString()); + Job job = Job.getInstance(conf, jobName); + job.setJarByClass(mapperClass); + FileInputFormat.setInputPaths(job, inputDir); + job.setInputFormatClass(TextInputFormat.class); + job.setMapperClass(mapperClass); + String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); + String columns[] = conf.getStrings(COLUMNS_CONF_KEY); + if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) { + String fileLoc = conf.get(CREDENTIALS_LOCATION); + Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf); + job.getCredentials().addAll(cred); + } + + if (hfileOutPath != null) { + if (!admin.tableExists(tableName)) { + String errorMsg = format("Table '%s' does not exist.", tableName); + if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) { + LOG.warn(errorMsg); + // TODO: this is backwards. Instead of depending on the existence of a table, + // create a sane splits file for HFileOutputFormat based on data sampling. + createTable(admin, tableName, columns); + } else { + LOG.error(errorMsg); + throw new TableNotFoundException(errorMsg); + } + } + try (Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { + job.setReducerClass(PutSortReducer.class); + Path outputDir = new Path(hfileOutPath); + FileOutputFormat.setOutputPath(job, outputDir); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + if (mapperClass.equals(TsvImporterTextMapper.class)) { + job.setMapOutputValueClass(Text.class); + job.setReducerClass(TextSortReducer.class); + } else { + job.setMapOutputValueClass(Put.class); + job.setCombinerClass(PutCombiner.class); + } + HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator, + HFileOutputFormat.class); } - } - HTable table = new HTable(conf, tableName); - job.setReducerClass(PutSortReducer.class); - Path outputDir = new Path(hfileOutPath); - FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - if (mapperClass.equals(TsvImporterTextMapper.class)) { - job.setMapOutputValueClass(Text.class); - job.setReducerClass(TextSortReducer.class); } else { - job.setMapOutputValueClass(Put.class); - job.setCombinerClass(PutCombiner.class); - } - HFileOutputFormat.configureIncrementalLoad(job, table); - } else { - if (mapperClass.equals(TsvImporterTextMapper.class)) { - usage(TsvImporterTextMapper.class.toString() - + " should not be used for non bulkloading case. use " - + TsvImporterMapper.class.toString() - + " or custom mapper whose value type is Put."); - System.exit(-1); - } - // No reducers. Just write straight to table. 
Call initTableReducerJob - // to set up the TableOutputFormat. - TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, - job); - job.setNumReduceTasks(0); - } + if (mapperClass.equals(TsvImporterTextMapper.class)) { + usage(TsvImporterTextMapper.class.toString() + + " should not be used for non bulkloading case. use " + + TsvImporterMapper.class.toString() + + " or custom mapper whose value type is Put."); + System.exit(-1); + } + // No reducers. Just write straight to table. Call initTableReducerJob + // to set up the TableOutputFormat. + TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, + job); + job.setNumReduceTasks(0); + } - TableMapReduceUtil.addDependencyJars(job); - TableMapReduceUtil.addDependencyJars(job.getConfiguration(), - com.google.common.base.Function.class /* Guava used by TsvParser */); - return job; + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), + com.google.common.base.Function.class /* Guava used by TsvParser */); + return job; + } } private static void createTable(Admin admin, TableName tableName, String[] columns) diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 855417d..b276188 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -65,9 +65,10 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.Table; @@ -110,7 +111,6 @@ import java.util.UUID; @InterfaceStability.Stable public class LoadIncrementalHFiles extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class); - private Admin hbAdmin; public static final String NAME = "completebulkload"; public static final String MAX_FILES_PER_REGION_PER_FAMILY @@ -127,6 +127,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { private FsDelegationToken fsDelegationToken; private String bulkToken; private UserProvider userProvider; + private boolean initialized = false; private LoadIncrementalHFiles() {} @@ -136,17 +137,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } private void initialize() throws Exception { - if (hbAdmin == null) { + if (!initialized) { // make a copy, just to be sure we're not overriding someone else's config setConf(HBaseConfiguration.create(getConf())); Configuration conf = getConf(); // disable blockcache for tool invocation, see HBASE-10500 conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0); - this.hbAdmin = new HBaseAdmin(conf); this.userProvider = UserProvider.instantiate(conf); this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true); 
maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32); + initialized = true; } } @@ -232,17 +233,28 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * of a job using HFileOutputFormat * @param table the table to load into * @throws TableNotFoundException if table does not yet exist + * @deprecated Use {@link #doBulkLoad(Path, Connection, Table)} instead. */ - @SuppressWarnings("deprecation") public void doBulkLoad(Path hfofDir, final HTable table) + throws TableNotFoundException, IOException { + doBulkLoad(hfofDir, table.getConnection(), table); + } + + /** + * Perform a bulk load of the given directory into the given + * pre-existing table. This method is not threadsafe. + * + * @param hfofDir the directory that was provided as the output path + * of a job using HFileOutputFormat + * @param table the table to load into + * @throws TableNotFoundException if table does not yet exist + */ + public void doBulkLoad(Path hfofDir, final Connection conn, final Table table) throws TableNotFoundException, IOException { - final HConnection conn = table.getConnection(); - - if (!conn.isTableAvailable(table.getName())) { - throw new TableNotFoundException("Table " + - Bytes.toStringBinary(table.getTableName()) + - "is not currently available."); + if (!conn.getAdmin().isTableAvailable(table.getName())) { + throw new TableNotFoundException("Table " + table.getName() + + "is not currently available."); } // initialize thread pools @@ -278,7 +290,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { String msg = "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " + unmatchedFamilies + "; valid family names of table " - + Bytes.toString(table.getTableName()) + " are: " + familyNames; + + table.getName() + " are: " + familyNames; LOG.error(msg); throw new IOException(msg); } @@ -300,9 +312,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } // Assumes that region splits can happen while this occurs. + RegionLocator regionLocator = conn.getRegionLocator(table.getName()); while (!queue.isEmpty()) { // need to reload split keys each iteration. - final Pair startEndKeys = table.getStartEndKeys(); + final Pair startEndKeys = regionLocator.getStartEndKeys(); if (count != 0) { LOG.info("Split occured while grouping HFiles, retry attempt " + + count + " with " + queue.size() + " files remaining to group or split"); @@ -361,7 +374,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * them. Any failures are re-queued for another pass with the * groupOrSplitPhase. */ - protected void bulkLoadPhase(final Table table, final HConnection conn, + protected void bulkLoadPhase(final Table table, final Connection conn, ExecutorService pool, Deque queue, final Multimap regionGroups) throws IOException { // atomically bulk load the groups. @@ -433,7 +446,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @return A Multimap that groups LQI by likely * bulk load region targets. 
*/ - private Multimap groupOrSplitPhase(final HTable table, + private Multimap groupOrSplitPhase(final Table table, ExecutorService pool, Deque queue, final Pair startEndKeys) throws IOException { // need synchronized only within this scope of this @@ -526,7 +539,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @throws IOException */ protected List groupOrSplit(Multimap regionGroups, - final LoadQueueItem item, final HTable table, + final LoadQueueItem item, final Table table, final Pair startEndKeys) throws IOException { final Path hfilePath = item.hfilePath; @@ -571,18 +584,18 @@ public class LoadIncrementalHFiles extends Configured implements Tool { */ if (indexForCallable < 0) { throw new IOException("The first region info for table " - + Bytes.toString(table.getTableName()) + + table.getName() + " cann't be found in hbase:meta.Please use hbck tool to fix it first."); } else if ((indexForCallable == startEndKeys.getFirst().length - 1) && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) { throw new IOException("The last region info for table " - + Bytes.toString(table.getTableName()) + + table.getName() + " cann't be found in hbase:meta.Please use hbck tool to fix it first."); } else if (indexForCallable + 1 < startEndKeys.getFirst().length && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable], startEndKeys.getFirst()[indexForCallable + 1]) == 0)) { throw new IOException("The endkey of one region for table " - + Bytes.toString(table.getTableName()) + + table.getName() + " is not equal to the startkey of the next region in hbase:meta." + "Please use hbck tool to fix it first."); } @@ -603,10 +616,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } /** - * @deprecated Use {@link #tryAtomicRegionLoad(HConnection, TableName, byte[], Collection)} + * @deprecated Use {@link #tryAtomicRegionLoad(Connection, TableName, byte[], Collection)} */ @Deprecated - protected List tryAtomicRegionLoad(final HConnection conn, + protected List tryAtomicRegionLoad(final Connection conn, final byte [] tableName, final byte[] first, Collection lqis) throws IOException { return tryAtomicRegionLoad(conn, TableName.valueOf(tableName), first, lqis); @@ -625,7 +638,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @return empty list if success, list of items to retry on recoverable * failure */ - protected List tryAtomicRegionLoad(final HConnection conn, + protected List tryAtomicRegionLoad(final Connection conn, final TableName tableName, final byte[] first, Collection lqis) throws IOException { final List> famPaths = @@ -648,10 +661,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool { if (!isSecureBulkLoadEndpointAvailable()) { success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds); } else { - Table table = new HTable(conn.getConfiguration(), getTableName()); - secureClient = new SecureBulkLoadClient(table); - success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(), - bulkToken, getLocation().getRegionInfo().getStartKey()); + try (Table table = conn.getTable(getTableName())) { + secureClient = new SecureBulkLoadClient(table); + success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(), + bulkToken, getLocation().getRegionInfo().getStartKey()); + } } return success; } finally { @@ -785,7 +799,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } private 
boolean doesTableExist(TableName tableName) throws Exception { - return hbAdmin.tableExists(tableName); + try (Connection conn = ConnectionFactory.createConnection(getConf()); + Admin admin = conn.getAdmin()) { + return admin.tableExists(tableName); + } } /* @@ -892,7 +909,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } keys = LoadIncrementalHFiles.inferBoundaries(map); - this.hbAdmin.createTable(htd,keys); + try (Connection conn = ConnectionFactory.createConnection(getConf()); + Admin admin = conn.getAdmin()) { + admin.createTable(htd,keys); + } LOG.info("Table "+ tableName +" is available!!"); } @@ -921,9 +941,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } Path hfofDir = new Path(dirPath); - HTable table = new HTable(getConf(), tableName); - doBulkLoad(hfofDir, table); + try (Connection conn = ConnectionFactory.createConnection(getConf()); + Table table = conn.getTable(tableName)) { + doBulkLoad(hfofDir, conn, table); + } + return 0; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java index 6a66d37..6d0afae 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java @@ -29,10 +29,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java index c1d8373..ee86f95 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java @@ -29,13 +29,16 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; @@ -73,7 +76,7 @@ public class MultiTableOutputFormat extends OutputFormat { private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class); - Map tables; + Map> tables; Configuration conf; boolean useWriteAheadLogging; @@ -88,7 +91,7 
@@ public class MultiTableOutputFormat extends OutputFormat(); + this.tables = new HashMap<>(); this.conf = conf; this.useWriteAheadLogging = useWriteAheadLogging; } @@ -100,21 +103,26 @@ public class MultiTableOutputFormat extends OutputFormat(conn, table)); } - return tables.get(tableName); + return tables.get(tableName).getSecond(); } @Override public void close(TaskAttemptContext context) throws IOException { - for (HTable table : tables.values()) { + for (Pair pair : tables.values()) { + Table table = pair.getSecond(); table.flushCommits(); + table.close(); + pair.getFirst().close(); } + tables.clear(); } /** @@ -129,7 +137,7 @@ public class MultiTableOutputFormat extends OutputFormat The type of the key. */ - protected static class TableRecordWriter + protected class TableRecordWriter extends RecordWriter { /** The table to write to. */ - private Table table; +// private Table table; /** * Instantiate a TableRecordWriter with the HBase HClient for writing. * * @param table The table to write to. */ - public TableRecordWriter(Table table) { - this.table = table; + public TableRecordWriter() { } /** @@ -112,6 +113,7 @@ implements Configurable { public void close(TaskAttemptContext context) throws IOException { table.close(); + connection.close(); } /** @@ -125,8 +127,8 @@ implements Configurable { @Override public void write(KEY key, Mutation value) throws IOException { - if (value instanceof Put) this.table.put(new Put((Put)value)); - else if (value instanceof Delete) this.table.delete(new Delete((Delete)value)); + if (value instanceof Put) table.put(new Put((Put)value)); + else if (value instanceof Delete) table.delete(new Delete((Delete)value)); else throw new IOException("Pass a Delete or a Put"); } } @@ -144,7 +146,7 @@ implements Configurable { public RecordWriter getRecordWriter( TaskAttemptContext context) throws IOException, InterruptedException { - return new TableRecordWriter(this.table); + return new TableRecordWriter(); } /** @@ -205,8 +207,9 @@ implements Configurable { if (zkClientPort != 0) { this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); } - this.table = new HTable(this.conf, TableName.valueOf(tableName)); - this.table.setAutoFlush(false, true); + this.connection = ConnectionFactory.createConnection(otherConf); + this.table = connection.getTable(TableName.valueOf(tableName)); + this.table.setClearBufferOnFail(true); LOG.info("Created table instance for " + tableName); } catch(IOException e) { LOG.error(e); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index cf9dc56..5ee8cdb 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -34,10 +34,14 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; 
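For illustration, a minimal sketch of the buffered-write pattern that the TableOutputFormat change above moves to; the table name, column coordinates and values are made up, and the snippet assumes the new Table.setClearBufferOnFail(boolean) introduced by this patch:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedWriteSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // "mytable" and the column coordinates below are hypothetical.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf("mytable"))) {
      table.setAutoFlushTo(false);        // buffer mutations client-side
      table.setClearBufferOnFail(true);   // new setter: drop the buffer after flushCommits(), even on failure
      table.put(new Put(Bytes.toBytes("row1"))
          .add(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("v1")));
      table.flushCommits();               // push the buffered Put to the region servers
    }
  }
}

The wrappers touched in this patch (HTableWrapper, HTablePool, RemoteHTable) either delegate this new setter or reject it with UnsupportedOperationException, as their hunks show.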
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -247,13 +251,18 @@ public class WALPlayer extends Configured implements Tool { if (tables.length != 1) { throw new IOException("Exactly one table must be specified for the bulk export option"); } - HTable table = new HTable(conf, TableName.valueOf(tables[0])); - job.setMapperClass(HLogKeyValueMapper.class); - job.setReducerClass(KeyValueSortReducer.class); - Path outputDir = new Path(hfileOutPath); - FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputValueClass(KeyValue.class); - HFileOutputFormat.configureIncrementalLoad(job, table); + TableName tableName = TableName.valueOf(tables[0]); + try (Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { + job.setMapperClass(HLogKeyValueMapper.class); + job.setReducerClass(KeyValueSortReducer.class); + Path outputDir = new Path(hfileOutPath); + FileOutputFormat.setOutputPath(job, outputDir); + job.setMapOutputValueClass(KeyValue.class); + HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator, + HFileOutputFormat.class); + } TableMapReduceUtil.addDependencyJars(job.getConfiguration(), com.google.common.base.Preconditions.class); } else { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index c091312..72f7885 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -25,10 +25,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.HConnectable; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -87,6 +85,7 @@ public class VerifyReplication extends Configured implements Tool { public static enum Counters { GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS} + private Connection replicatedScannerConnection; private ResultScanner replicatedScanner; private Result currentCompareRowInPeerTable; @@ -121,22 +120,19 @@ public class VerifyReplication extends Configured implements Tool { } final TableSplit tableSplit = (TableSplit)(context.getInputSplit()); - HConnectionManager.execute(new HConnectable(conf) { - @Override - public Void connect(HConnection conn) throws IOException { - String zkClusterKey = conf.get(NAME + ".peerQuorumAddress"); - Configuration peerConf = HBaseConfiguration.create(conf); - ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey); - - TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); - // TODO: THis HTable doesn't get closed. Fix! 
- Table replicatedTable = new HTable(peerConf, tableName); - scan.setStartRow(value.getRow()); - scan.setStopRow(tableSplit.getEndRow()); - replicatedScanner = replicatedTable.getScanner(scan); - return null; - } - }); + try (Connection conn = ConnectionFactory.createConnection(conf)) { + String zkClusterKey = conf.get(NAME + ".peerQuorumAddress"); + Configuration peerConf = HBaseConfiguration.create(conf); + ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey); + + TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); + // This connection is closed in cleanup(). + replicatedScannerConnection = ConnectionFactory.createConnection(peerConf); + Table replicatedTable = replicatedScannerConnection.getTable(tableName); + scan.setStartRow(value.getRow()); + scan.setStopRow(tableSplit.getEndRow()); + replicatedScanner = replicatedTable.getScanner(scan); + } currentCompareRowInPeerTable = replicatedScanner.next(); } while (true) { @@ -191,6 +187,14 @@ public class VerifyReplication extends Configured implements Tool { replicatedScanner = null; } } + if (replicatedScannerConnection != null) { + try { + replicatedScannerConnection.close(); + } catch (IOException e) { + LOG.error("Failed to close peer table connection in cleanup", e); + replicatedScannerConnection = null; + } + } } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index bbd9dbe..4e885a7 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -280,6 +280,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * be wholesome. Rather than use the return direct, its usually best to * make a copy and use that. Do * Configuration c = new Configuration(INSTANCE.getConfiguration()); + * + *

Alternatively, create a connection using {@link #createConnection()} * @return Instance of Configuration. */ @Override @@ -287,6 +289,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { return super.getConfiguration(); } + public Connection createConnection() throws IOException { + return ConnectionFactory.createConnection(getConfiguration()); + } + public void setHBaseCluster(HBaseCluster hbaseCluster) { this.hbaseCluster = hbaseCluster; } @@ -1183,7 +1189,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } /** - * Create a table. + * Create a table and wait until it's available. + * * @param htd * @param families * @param c Configuration to use @@ -1192,6 +1199,17 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { */ public HTable createTable(HTableDescriptor htd, byte[][] families, Configuration c) throws IOException { + initializeTable(htd, families); + // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned + waitUntilAllRegionsAssigned(htd.getTableName()); + return new HTable(c, htd.getTableName()); + } + + /** + * Initialiate the creation of the table in the descriptor, and don't wait for its completion. + */ + public void initializeTable(HTableDescriptor htd, byte[][] families) + throws IOException { for(byte[] family : families) { HColumnDescriptor hcd = new HColumnDescriptor(family); // Disable blooms (they are on by default as of 0.95) but we disable them here because @@ -1201,9 +1219,23 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { htd.addFamily(hcd); } getHBaseAdmin().createTable(htd); + } + + + /** + * Create a table and wait until it's available. + * + * @param htd + * @param families + * @param conn + * @return a reference to the table. + * @throws IOException + */ + public Table createTableAndWait(HTableDescriptor htd, byte[][] families, Connection conn) throws IOException { + initializeTable(htd, families); // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned waitUntilAllRegionsAssigned(htd.getTableName()); - return new HTable(c, htd.getTableName()); + return conn.getTable( htd.getTableName()); } /** @@ -1232,15 +1264,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { final Configuration c) throws IOException { HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); - for(byte[] family : families) { - HColumnDescriptor hcd = new HColumnDescriptor(family); - // Disable blooms (they are on by default as of 0.95) but we disable them here because - // tests have hard coded counts of what to expect in block cache, etc., and blooms being - // on is interfering. 
-      hcd.setBloomFilterType(BloomType.NONE);
-      desc.addFamily(hcd);
-    }
-    getHBaseAdmin().createTable(desc);
+    initializeTable(desc, families);
     return new HTable(c, desc.getTableName());
   }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index 4890edd..344ac05 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -577,7 +579,10 @@
     createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A);

     //Bulk load
-    new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName));
+    try (Connection conn = ConnectionFactory.createConnection(conf);
+        Table newTable = conn.getTable(tableName)) {
+      new LoadIncrementalHFiles(conf).doBulkLoad(dir, conn, newTable);
+    }

     verifyMethodResult(SimpleRegionObserver.class,
         new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index 9bdebe6..0327c3a 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.hbase.HadoopShims;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
@@ -371,21 +373,21 @@ public class TestHFileOutputFormat {
     util = new HBaseTestingUtility();
     Configuration conf = util.getConfiguration();
     byte[][] startKeys = generateRandomStartKeys(5);
-    HBaseAdmin admin = null;
-    try {
+    try (Connection conn = util.createConnection();
+        Admin admin = conn.getAdmin();
+        Table table = util.createTableAndWait(new HTableDescriptor(TABLE_NAME), FAMILIES, conn);
+        RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME)){
       util.startMiniCluster();
       Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
-      admin = new HBaseAdmin(conf);
-      HTable table = util.createTable(TABLE_NAME, FAMILIES);
       assertEquals("Should start with empty table", 0, util.countRows(table));
       int numRegions = util.createMultiRegions(
-          util.getConfiguration(), table, FAMILIES[0], startKeys);
+          util.getConfiguration(), (HTable) table, FAMILIES[0], startKeys);
       assertEquals("Should make 5 regions", numRegions, 5);

       // Generate the bulk load files
       util.startMiniMapReduceCluster();
-      runIncrementalPELoad(conf, table, testDir);
+      runIncrementalPELoad(conf, table, regionLocator, testDir);
       // This doesn't write into the table, just makes files
       assertEquals("HFOF should not touch actual table", 0, util.countRows(table));
@@ -405,7 +407,7 @@
       // handle the split case
       if (shouldChangeRegions) {
         LOG.info("Changing regions in table");
-        admin.disableTable(table.getTableName());
+        admin.disableTable(table.getName());
         while(util.getMiniHBaseCluster().getMaster().getAssignmentManager().
             getRegionStates().isRegionsInTransition()) {
           Threads.sleep(200);
@@ -413,17 +415,17 @@
       }
       byte[][] newStartKeys = generateRandomStartKeys(15);
       util.createMultiRegions(
-          util.getConfiguration(), table, FAMILIES[0], newStartKeys);
-      admin.enableTable(table.getTableName());
-      while (table.getRegionLocations().size() != 15 ||
-          !admin.isTableAvailable(table.getTableName())) {
+          util.getConfiguration(), (HTable) table, FAMILIES[0], newStartKeys);
+      admin.enableTable(table.getName());
+      while (regionLocator.getAllRegionLocations().size() != 15 ||
+          !admin.isTableAvailable(table.getName())) {
         Thread.sleep(200);
         LOG.info("Waiting for new region assignment to happen");
       }
     }

       // Perform the actual load
-      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
+      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, conn, table);

       // Ensure data shows up
       int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
@@ -453,27 +455,26 @@
       assertEquals("Data should remain after reopening of regions",
           tableDigestBefore, util.checksumRows(table));
     } finally {
-      if (admin != null) admin.close();
       util.shutdownMiniMapReduceCluster();
       util.shutdownMiniCluster();
     }
   }

-  private void runIncrementalPELoad(
-      Configuration conf, HTable table, Path outDir)
-      throws Exception {
+  private void runIncrementalPELoad(Configuration conf, Table table,
+      RegionLocator regionLocator, Path outDir) throws Exception {
     Job job = new Job(conf, "testLocalMRIncrementalLoad");
     job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
     job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
         KeyValueSerialization.class.getName());
     setupRandomGeneratorMapper(job);
-    HFileOutputFormat.configureIncrementalLoad(job, table);
+    HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator,
+        HFileOutputFormat.class);
     FileOutputFormat.setOutputPath(job, outDir);

     Assert.assertFalse( util.getTestFileSystem().exists(outDir)) ;

-    assertEquals(table.getRegionLocations().size(), job.getNumReduceTasks());
+    assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());

     assertTrue(job.waitForCompletion(true));
   }
@@ -792,7 +793,8 @@
     Path dir = util.getDataTestDir("testColumnFamilySettings");

     // Setup table descriptor
-    HTable table = Mockito.mock(HTable.class);
+    Table table = Mockito.mock(Table.class);
+    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
     HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
     Mockito.doReturn(htd).when(table).getTableDescriptor();
     for (HColumnDescriptor hcd: this.util.generateColumnDescriptors()) {
@@ -800,7 +802,7 @@
     }

     // set up the table to return some mock keys
-    setupMockStartKeys(table);
+    setupMockStartKeys(regionLocator);

     try {
       // partial map red setup to get an operational writer for testing
@@ -810,7 +812,7 @@
       Job job = new Job(conf, "testLocalMRIncrementalLoad");
       job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
       setupRandomGeneratorMapper(job);
-      HFileOutputFormat.configureIncrementalLoad(job, table);
+      HFileOutputFormat.configureIncrementalLoad(job, table, regionLocator);
       FileOutputFormat.setOutputPath(job, dir);
       context = createTestTaskAttemptContext(job);
       HFileOutputFormat hof = new HFileOutputFormat();
@@ -891,11 +893,12 @@
     conf.setInt("hbase.hstore.compaction.min", 2);
     generateRandomStartKeys(5);

-    try {
+    try (Connection conn = util.createConnection();
+        Admin admin = conn.getAdmin()){
       util.startMiniCluster();
       final FileSystem fs = util.getDFSCluster().getFileSystem();
-      HBaseAdmin admin = new HBaseAdmin(conf);
-      HTable table = util.createTable(TABLE_NAME, FAMILIES);
+      Table table = util.createTableAndWait(new HTableDescriptor(TABLE_NAME), FAMILIES, conn);
+      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME);
       assertEquals("Should start with empty table", 0, util.countRows(table));

       // deep inspection: get the StoreFile dir
@@ -912,9 +915,9 @@
       for (int i = 0; i < 2; i++) {
         Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
-        runIncrementalPELoad(conf, table, testDir);
+        runIncrementalPELoad(conf, table, regionLocator, testDir);
         // Perform the actual load
-        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
+        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, conn, table);
       }

       // Ensure data shows up
@@ -926,7 +929,7 @@
       assertEquals(2, fs.listStatus(storePath).length);

       // minor compactions shouldn't get rid of the file
-      admin.compact(TABLE_NAME.getName());
+      admin.compact(TABLE_NAME);
       try {
         quickPoll(new Callable<Boolean>() {
           public Boolean call() throws Exception {
@@ -939,7 +942,7 @@
       }

       // a major compaction should work though
-      admin.majorCompact(TABLE_NAME.getName());
+      admin.majorCompact(TABLE_NAME);
       quickPoll(new Callable<Boolean>() {
         public Boolean call() throws Exception {
           return fs.listStatus(storePath).length == 1;
@@ -958,12 +961,14 @@
     conf.setInt("hbase.hstore.compaction.min", 2);
     generateRandomStartKeys(5);

-    try {
+    try (Connection conn = util.createConnection();
+        Admin admin = conn.getAdmin();
+        Table table = util.createTableAndWait(new HTableDescriptor(TABLE_NAME), FAMILIES, conn);
+        RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME)){
       util.startMiniCluster();
       Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
       final FileSystem fs = util.getDFSCluster().getFileSystem();
-      HBaseAdmin admin = new HBaseAdmin(conf);
-      HTable table = util.createTable(TABLE_NAME, FAMILIES);
+
       assertEquals("Should start with empty table", 0, util.countRows(table));

       // deep inspection: get the StoreFile dir
@@ -977,7 +982,7 @@
       Put p = new Put(Bytes.toBytes("test"));
       p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
       table.put(p);
-      admin.flush(TABLE_NAME.getName());
+      admin.flush(TABLE_NAME);
       assertEquals(1, util.countRows(table));
       quickPoll(new Callable<Boolean>() {
         public Boolean call() throws Exception {
@@ -989,10 +994,10 @@

       conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
           true);
       util.startMiniMapReduceCluster();
-      runIncrementalPELoad(conf, table, testDir);
+      runIncrementalPELoad(conf, table, regionLocator, testDir);
       // Perform the actual load
-      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
+      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, conn, table);

       // Ensure data shows up
       int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
@@ -1003,7 +1008,7 @@
       assertEquals(2, fs.listStatus(storePath).length);

       // minor compactions shouldn't get rid of the file
-      admin.compact(TABLE_NAME.getName());
+      admin.compact(TABLE_NAME);
       try {
         quickPoll(new Callable<Boolean>() {
           public Boolean call() throws Exception {
@@ -1016,7 +1021,7 @@
       }

       // a major compaction should work though
-      admin.majorCompact(TABLE_NAME.getName());
+      admin.majorCompact(TABLE_NAME);
       quickPoll(new Callable<Boolean>() {
         public Boolean call() throws Exception {
           return fs.listStatus(storePath).length == 1;
@@ -1048,22 +1053,26 @@
   public void manualTest(String args[]) throws Exception {
     Configuration conf = HBaseConfiguration.create();
     util = new HBaseTestingUtility(conf);
-    if ("newtable".equals(args[0])) {
-      TableName tname = TableName.valueOf(args[1]);
-      HTable table = util.createTable(tname, FAMILIES);
-      HBaseAdmin admin = new HBaseAdmin(conf);
-      admin.disableTable(tname);
-      byte[][] startKeys = generateRandomStartKeys(5);
-      util.createMultiRegions(conf, table, FAMILIES[0], startKeys);
-      admin.enableTable(tname);
-    } else if ("incremental".equals(args[0])) {
-      TableName tname = TableName.valueOf(args[1]);
-      HTable table = new HTable(conf, tname);
-      Path outDir = new Path("incremental-out");
-      runIncrementalPELoad(conf, table, outDir);
-    } else {
-      throw new RuntimeException(
-          "usage: TestHFileOutputFormat newtable | incremental");
+    try (Connection conn = util.createConnection()) {
+      if ("newtable".equals(args[0])) {
+        TableName tname = TableName.valueOf(args[1]);
+        Table table = util.createTableAndWait(new HTableDescriptor(tname), FAMILIES, conn);
+        Admin admin = conn.getAdmin();
+        admin.disableTable(tname);
+        byte[][] startKeys = generateRandomStartKeys(5);
+        util.createMultiRegions(conf, (HTable) table, FAMILIES[0], startKeys);
+        admin.enableTable(tname);
+      } else if ("incremental".equals(args[0])) {
+        TableName tname = TableName.valueOf(args[1]);
+        try (Table table = conn.getTable(tname);
+            RegionLocator regionLocator = conn.getRegionLocator(tname)) {
+          Path outDir = new Path("incremental-out");
+          runIncrementalPELoad(conf, table, regionLocator, outDir);
+        }
+      } else {
+        throw new RuntimeException(
+            "usage: TestHFileOutputFormat newtable | incremental");
+      }
     }
   }
diff --git hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/HTablePool.java hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/HTablePool.java
index 45578c8..6dfd04d 100644
--- hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/HTablePool.java
+++ hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/HTablePool.java
@@ -625,6 +625,11 @@
     }

     @Override
+    public void setClearBufferOnFail(boolean clearBufferOnFail) {
+      table.setClearBufferOnFail(clearBufferOnFail);
+    }
+
+    @Override
     public long getWriteBufferSize() {
       checkState();
       return table.getWriteBufferSize();
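For reference, the end-to-end usage these test changes converge on can be sketched as below. This is an illustrative sketch only, not part of the patch: the table name, paths, and job wiring are placeholders, the mapper that actually produces KeyValues is omitted, and the doBulkLoad(Path, Connection, Table) overload is assumed to be the one used in the hunks above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ConnectionBulkLoadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("example_table"); // placeholder
    Path hfileDir = new Path("/tmp/example-hfiles");           // placeholder
    // One Connection supplies both the Table and the RegionLocator,
    // replacing the old new HTable(conf, tableName) construction.
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
      Job job = Job.getInstance(conf, "bulkload-sketch");
      // Mapper / input format setup that emits KeyValues is omitted here.
      // The partitioner and reducer count are derived from the RegionLocator,
      // not from an HTable, as in the updated tests.
      HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
      FileOutputFormat.setOutputPath(job, hfileDir);
      if (job.waitForCompletion(true)) {
        // Bulk load takes the Connection and Table, matching the
        // doBulkLoad(Path, Connection, Table) overload used in this patch.
        new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, conn, table);
      }
    }
  }
}

The design point is that region lookups come from the RegionLocator and lifecycle management from the shared Connection, so callers no longer construct HTable or HBaseAdmin directly.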