From 7e877298c9cde0931816e52f8b49a38f0619031e Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Fri, 25 Jan 2019 22:42:36 +0800
Subject: [PATCH] HBASE-21782 LoadIncrementalHFiles should not be IA.Public

---
 .../apache/hadoop/hbase/mapreduce/Driver.java |  3 +-
 .../hadoop/hbase/tool/BulkLoadHFiles.java     | 92 +++++++++++++++++++
 .../hadoop/hbase/tool/BulkLoadHFilesTool.java | 73 +++++++++++++++
 .../hbase/tool/LoadIncrementalHFiles.java     | 65 ++++++-------
 .../hbase/tool/TestLoadIncrementalHFiles.java |  8 +-
 5 files changed, 200 insertions(+), 41 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
index afa1ba7d53..1a249b0f82 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
@@ -23,6 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
 import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
 import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.util.ProgramDriver;
 
@@ -47,7 +48,7 @@ public class Driver {
     pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
     pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
     pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format.");
-    pgd.addClass(LoadIncrementalHFiles.NAME, LoadIncrementalHFiles.class,
+    pgd.addClass(LoadIncrementalHFiles.NAME, BulkLoadHFilesTool.class,
       "Complete a bulk data load.");
     pgd.addClass(CopyTable.NAME, CopyTable.class,
       "Export a table from local cluster to peer cluster.");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
new file mode 100644
index 0000000000..c4b288f102
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The tool to let you load the output of {@code HFileOutputFormat} into an existing table
+ * programmatically. Not thread safe.
+ */
+@InterfaceAudience.Public
+public interface BulkLoadHFiles {
+
+  /**
+   * Represents an HFile waiting to be loaded. A queue is used in this class in order to support
+   * the case where a region has split during the process of the load. When this happens, the HFile
+   * is split into two physical parts across the new region boundary, and each part is added back
+   * into the queue. The import process finishes when the queue is empty.
+   */
+  @InterfaceAudience.Public
+  public static class LoadQueueItem {
+
+    private final byte[] family;
+
+    private final Path hfilePath;
+
+    public LoadQueueItem(byte[] family, Path hfilePath) {
+      this.family = family;
+      this.hfilePath = hfilePath;
+    }
+
+    @Override
+    public String toString() {
+      return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString();
+    }
+
+    public byte[] getFamily() {
+      return family;
+    }
+
+    public Path getFilePath() {
+      return hfilePath;
+    }
+  }
+
+  /**
+   * Perform a bulk load of the given map of families to HFiles into the given pre-existing table.
+   * @param tableName the table to load into
+   * @param family2Files map of family to List of hfiles
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName,
+      Map<byte[], List<Path>> family2Files) throws TableNotFoundException, IOException;
+
+  /**
+   * Perform a bulk load of the given directory into the given pre-existing table.
+   * @param tableName the table to load into
+   * @param dir the directory that was provided as the output path of a job using
+   *          {@code HFileOutputFormat}
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir)
+      throws TableNotFoundException, IOException;
+
+  static BulkLoadHFiles create(Configuration conf) {
+    return new BulkLoadHFilesTool(conf);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
new file mode 100644
index 0000000000..0dc4c82e0a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The implementation of {@link BulkLoadHFiles}, which can also be executed from the command line
+ * as a tool.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class BulkLoadHFilesTool extends LoadIncrementalHFiles implements BulkLoadHFiles {
+
+  public BulkLoadHFilesTool(Configuration conf) {
+    super(conf);
+  }
+
+  private Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> convert(
+      Map<LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> map) {
+    return map.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+  }
+
+  @Override
+  public Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName,
+      Map<byte[], List<Path>> family2Files) throws TableNotFoundException, IOException {
+    return convert(run(family2Files, tableName));
+  }
+
+  @Override
+  public Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir)
+      throws TableNotFoundException, IOException {
+    return convert(run(dir, tableName));
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
+    System.exit(ret);
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index 3320b1fb7f..dd9111ba07 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.tool;
 
+import static java.lang.String.format;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -46,7 +48,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-import static java.lang.String.format;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -87,13 +88,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
-import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
-import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSHDFSUtils;
 import org.apache.hadoop.hbase.util.FSVisitor;
@@ -104,9 +98,19 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
+ * @deprecated Use {@link BulkLoadHFiles} instead.
  */
+@Deprecated
 @InterfaceAudience.Public
 public class LoadIncrementalHFiles extends Configured implements Tool {
 
@@ -142,28 +146,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * the case where a region has split during the process of the load. When this happens, the HFile
    * is split into two physical parts across the new region boundary, and each part is added back
    * into the queue. The import process finishes when the queue is empty.
+   * @deprecated Use {@link BulkLoadHFiles} instead.
    */
   @InterfaceAudience.Public
-  public static class LoadQueueItem {
-    private final byte[] family;
-    private final Path hfilePath;
+  @Deprecated
+  public static class LoadQueueItem extends BulkLoadHFiles.LoadQueueItem {
 
     public LoadQueueItem(byte[] family, Path hfilePath) {
-      this.family = family;
-      this.hfilePath = hfilePath;
-    }
-
-    @Override
-    public String toString() {
-      return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString();
-    }
-
-    public byte[] getFamily() {
-      return family;
-    }
-
-    public Path getFilePath() {
-      return hfilePath;
+      super(family, hfilePath);
     }
   }
 
@@ -825,8 +815,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * If the table is created for the first time, then "completebulkload" reads the files twice. More
    * modifications necessary if we want to avoid doing it.
    */
-  private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException {
-    final Path hfofDir = new Path(dirPath);
+  private void createTable(TableName tableName, Path hfofDir, Admin admin) throws IOException {
     final FileSystem fs = hfofDir.getFileSystem(getConf());
 
     // Add column families
@@ -1148,13 +1137,7 @@
     return getConf().getBoolean(ALWAYS_COPY_FILES, false);
   }
 
-  /**
-   * Perform bulk load on the given table.
-   * @param hfofDir the directory that was provided as the output path of a job using
-   *          HFileOutputFormat
-   * @param tableName the table to load into
-   */
-  public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
+  protected final Map<LoadQueueItem, ByteBuffer> run(Path hfofDir, TableName tableName)
       throws IOException {
     try (Connection connection = ConnectionFactory.createConnection(getConf());
         Admin admin = connection.getAdmin()) {
@@ -1169,11 +1152,21 @@
       }
       try (Table table = connection.getTable(tableName);
           RegionLocator locator = connection.getRegionLocator(tableName)) {
-        return doBulkLoad(new Path(hfofDir), admin, table, locator, isSilence(),
+        return doBulkLoad(hfofDir, admin, table, locator, isSilence(),
           isAlwaysCopyFiles());
       }
     }
   }
 
+  /**
+   * Perform bulk load on the given table.
+   * @param hfofDir the directory that was provided as the output path of a job using
+   *          HFileOutputFormat
+   * @param tableName the table to load into
+   */
+  public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
+      throws IOException {
+    return run(new Path(hfofDir), tableName);
+  }
+
   /**
    * Perform bulk load on the given table.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
index 85235b6420..0dda3a113e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
@@ -346,7 +346,7 @@ public class TestLoadIncrementalHFiles {
     if (copyFiles) {
       conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
     }
-    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
     List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString());
     if (depth == 3) {
       args.add("-loadTable");
     }
@@ -356,17 +356,17 @@
       if (deleteFile) {
         fs.delete(last, true);
       }
-      Map<LoadQueueItem, ByteBuffer> loaded = loader.run(map, tableName);
+      Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, map);
       if (deleteFile) {
         expectedRows -= 1000;
-        for (LoadQueueItem item : loaded.keySet()) {
+        for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) {
           if (item.getFilePath().getName().equals(last.getName())) {
             fail(last + " should be missing");
           }
         }
       }
     } else {
-      loader.run(args.toArray(new String[]{}));
+      loader.run(args.toArray(new String[] {}));
    }
 
     if (copyFiles) {
-- 
2.17.1
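
Reviewer note (not part of the patch): a minimal usage sketch of the IA.Public entry point this change introduces. The only calls assumed beyond this diff are standard HBase/Hadoop APIs (HBaseConfiguration.create, TableName.valueOf, Bytes.toBytes); the table name, family, and paths are illustrative placeholders.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class BulkLoadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Obtain the loader through the new IA.Public factory instead of
    // constructing the (now deprecated) LoadIncrementalHFiles directly.
    BulkLoadHFiles loader = BulkLoadHFiles.create(conf);

    // Directory-based load: the path is the output of a job that used
    // HFileOutputFormat, laid out as one subdirectory of HFiles per family.
    loader.bulkLoad(TableName.valueOf("t1"), new Path("/out/hfiles"));

    // Map-based load: an explicit family -> HFiles mapping, matching the
    // bulkLoad(TableName, Map) overload exercised by the test change above.
    // With more than one family, a TreeMap over Bytes.BYTES_COMPARATOR would
    // be the natural container for the byte[] keys.
    Map<byte[], List<Path>> family2Files = Collections.singletonMap(
      Bytes.toBytes("cf"), Collections.singletonList(new Path("/out/hfiles/cf/hfile-0")));
    loader.bulkLoad(TableName.valueOf("t1"), family2Files);
  }
}

The point of the factory method is that client code compiles only against the IA.Public BulkLoadHFiles interface, while the IA.LimitedPrivate BulkLoadHFilesTool (and the deprecated LoadIncrementalHFiles it extends) stay implementation details that can evolve without breaking compatibility.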