diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
index 85f8125371..7b02c5ae09 100644
--- hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
+++ hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
@@ -81,7 +81,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -149,9 +148,11 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
 
   private static final String OPT_LOAD = "load";
   private static final String OPT_CHECK = "check";
+  private static final String OPT_USE_DISTRIBUTED_BULKLOAD = "useDistributedBulkload";
 
   private boolean load = false;
   private boolean check = false;
+  private boolean useDistributedBulkload = false;
 
   public static class SlowMeCoproScanOperations extends BaseRegionObserver {
     static final AtomicLong sleepTime = new AtomicLong(2000);
@@ -210,17 +211,24 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
 
   @Test
   public void testBulkLoad() throws Exception {
-    runLoad();
+    runLoad(false);
     installSlowingCoproc();
     runCheckWithRetry();
   }
 
-  public void runLoad() throws Exception {
+  @Test
+  public void testDistributedBulkload() throws Exception {
+    runLoad(true);
+    installSlowingCoproc();
+    runCheckWithRetry();
+  }
+
+  public void runLoad(boolean useDistributedBulkload) throws Exception {
     setupTable();
     int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
     LOG.info("Running load with numIterations:" + numImportRounds);
     for (int i = 0; i < numImportRounds; i++) {
-      runLinkedListMRJob(i);
+      runLinkedListMRJob(i, useDistributedBulkload);
     }
   }
 
@@ -249,7 +257,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     HBaseTestingUtility.setReplicas(util.getHBaseAdmin(), t, replicaCount);
   }
 
-  private void runLinkedListMRJob(int iteration) throws Exception {
+  private void runLinkedListMRJob(int iteration, boolean useDistributedBulkload) throws Exception {
     String jobName = IntegrationTestBulkLoad.class.getSimpleName() + " - " +
         EnvironmentEdgeManager.currentTime();
     Configuration conf = new Configuration(util.getConfiguration());
@@ -296,10 +304,15 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
       assertEquals(true, job.waitForCompletion(true));
 
       // Create a new loader.
-      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+      if (useDistributedBulkload) {
+        LoadIncrementalHFilesJob loader = new LoadIncrementalHFilesJob();
+        loader.run(p.toString(), table.getName().getNameAsString(), conf);
+      } else {
+        // Load the HFiles in.
+        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+        loader.doBulkLoad(p, admin, table, regionLocator);
+      }
 
-      // Load the HFiles in.
-      loader.doBulkLoad(p, admin, table, regionLocator);
     }
 
     // Delete the files.
@@ -760,6 +773,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     super.addOptions();
     super.addOptNoArg(OPT_CHECK, "Run check only");
     super.addOptNoArg(OPT_LOAD, "Run load only");
+    super.addOptNoArg(OPT_USE_DISTRIBUTED_BULKLOAD, "Use the distributed version of bulkload");
   }
 
   @Override
@@ -767,12 +781,13 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     super.processOptions(cmd);
     check = cmd.hasOption(OPT_CHECK);
     load = cmd.hasOption(OPT_LOAD);
+    useDistributedBulkload = cmd.hasOption(OPT_USE_DISTRIBUTED_BULKLOAD);
   }
 
   @Override
   public int runTestFromCommandLine() throws Exception {
     if (load) {
-      runLoad();
+      runLoad(useDistributedBulkload);
     } else if (check) {
       installSlowingCoproc();
       runCheckWithRetry();
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 7a8b9606f3..92901ec906 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -233,8 +233,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         TFamily family = visitor.bulkFamily(familyName);
 
         FileStatus[] hfileStatuses = fs.listStatus(familyDir);
+
         for (FileStatus hfileStatus : hfileStatuses) {
-          if (!fs.isFile(hfileStatus.getPath())) {
+          if (!hfileStatus.isFile()) {
            LOG.warn("Skipping non-file " + hfileStatus);
            continue;
          }
@@ -265,7 +266,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
            LOG.warn("the file " + hfile + " was removed");
            continue;
          }
-        } 
+        }
         visitor.bulkHFile(family, hfileStatus);
       }
@@ -393,34 +394,41 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }
 
-    ExecutorService pool = createExecutorService();
-
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new LinkedList<>();
-    try {
-      /*
+
+    /*
     * Checking hfile format is a time-consuming operation, we should have an option to skip
     * this step when bulkloading millions of HFiles. See HBASE-13985.
     */
-      boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
-      if(!validateHFile) {
-        LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
-          "are not correct. If you fail to read data from your table after using this " +
-          "option, consider removing the files and bulkload again without this option. " +
-          "See HBASE-13985");
-      }
-      prepareHFileQueue(hfofDir, table, queue, validateHFile);
+    boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
+    if(!validateHFile) {
+      LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
+        "are not correct. If you fail to read data from your table after using this " +
+        "option, consider removing the files and bulkload again without this option. " +
+        "See HBASE-13985");
+    }
+    prepareHFileQueue(hfofDir, table, queue, validateHFile);
 
-      int count = 0;
+    if (queue.isEmpty()) {
+      LOG.warn("Bulk load operation did not find any files to load in " +
+        "directory " + hfofDir.toUri() + ". Does it contain files in " +
+        "subdirectories that correspond to column family names?");
+      return;
+    }
 
-      if (queue.isEmpty()) {
-        LOG.warn("Bulk load operation did not find any files to load in " +
-          "directory " + hfofDir.toUri() + ". Does it contain files in " +
-          "subdirectories that correspond to column family names?");
-        return;
-      }
+    doBulkloadFromQueue(queue, table, regionLocator, admin.getConnection());
+  }
+  public void doBulkloadFromQueue(Deque<LoadQueueItem> queue, Table table, RegionLocator regionLocator,
+      Connection connection) throws IOException {
+
+    ExecutorService pool = createExecutorService();
+
+    int count = 0;
+
+    try {
 
      //If using secure bulk load, get source delegation token, and
      //prepare staging directory and token
      // fs is the source filesystem
@@ -456,7 +464,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
            + " hfiles to one family of one region");
      }
 
-      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
+      bulkLoadPhase(table, connection, pool, queue, regionGroups);
 
      // NOTE: The next iteration's split / group could happen in parallel to
      // atomic bulkloads assuming that there are splits and no merges, and
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFilesJob.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFilesJob.java
index f7cb221c31..8bd2e1d24c 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFilesJob.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFilesJob.java
@@ -1,4 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hbase.mapreduce;
 
-public class LoadIncrementalHFilesJob {
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
+
+public class LoadIncrementalHFilesJob extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(LoadIncrementalHFilesJob.class);
+
+  public static final String ROOT_DIR = "loadincrementalhfilesjob.root.dir";
+  public static final String TABLE_NAME = "loadincrementalhfilesjob.table.name";
+  public static final String MAX_MAP_TASK = "loadincrementalhfilesjob.max.map.tasks";
+  public static final String DEPTH = "loadincrementalhfilesjob.depth";
+
+  @Override public int run(String[] args) throws Exception {
+    Configuration configuration = HBaseConfiguration.create(getConf());
+
+    return run(args[0], args[1], configuration);
+  }
+
+  @VisibleForTesting public int run(String rootPath, String table, Configuration configuration)
+      throws Exception {
+    Path basePath = new Path(rootPath);
+    configuration.set(TABLE_NAME, table);
+    configuration.set(ROOT_DIR, basePath.toString());
+
+    if (!bulkload(basePath, table, configuration)) {
+      return 1;
+    }
+
+    return 0;
+  }
+
+  private boolean bulkload(Path rootDir, String table, Configuration configuration)
+      throws Exception {
+    Configuration jobConfiguration = new Configuration(configuration);
+
+    applyNonOverridableOptions(jobConfiguration);
+
+    Job job = Job.getInstance(jobConfiguration, "Bulkload-" + table + "-" + rootDir.toString());
+
+    TableMapReduceUtil.addDependencyJars(job);
+
+    job.setJarByClass(LoadIncrementalHFilesJob.class);
+
+    job.setInputFormatClass(BulkLoadInputFormat.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+
+    job.setMapperClass(BulkoadMapper.class);
+    job.setMapOutputKeyClass(NullWritable.class);
+    job.setMapOutputValueClass(NullWritable.class);
+
+    job.setNumReduceTasks(0);
+
+    return job.waitForCompletion(true);
+  }
+
+  private static void applyNonOverridableOptions(Configuration configuration) {
+    configuration.setInt("mapred.map.max.attempts", 1);
+    configuration.setBoolean("mapred.map.tasks.speculative.execution", false);
+    configuration.setBoolean("mapreduce.map.speculative", false);
+    configuration.setBoolean("mapreduce.task.classpath.user.precedence", true);
+    configuration.setInt("mapreduce.task.timeout", 0);
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    int ret = ToolRunner.run(conf, new LoadIncrementalHFilesJob(), args);
+    System.exit(ret);
+  }
+
+  public static class BulkoadMapper extends Mapper<Text, Text, NullWritable, NullWritable> {
+    LoadIncrementalHFiles loadIncrementalHFiles;
+    private final Deque<LoadIncrementalHFiles.LoadQueueItem> queue = new LinkedList<>();
+
+    @Override protected void setup(Context context) throws IOException, InterruptedException {
+      try {
+        loadIncrementalHFiles = new LoadIncrementalHFiles(context.getConfiguration());
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to instantiate LoadIncrementalHFiles", e);
+      }
+    }
+
+    @Override protected void map(Text key, Text value, Context context)
+        throws IOException, InterruptedException {
+      byte[] family = Bytes.toBytes(key.toString());
+      Path hfilePath = new Path(value.toString());
+      LoadIncrementalHFiles.LoadQueueItem loadQueueItem =
+          new LoadIncrementalHFiles.LoadQueueItem(family, hfilePath);
+      queue.add(loadQueueItem);
+    }
+
+    @Override protected void cleanup(Context context) throws IOException, InterruptedException {
+      int queueSize = queue.size();
+      String tableStr = context.getConfiguration().get(TABLE_NAME);
+      try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
+          Table table = connection.getTable(TableName.valueOf(tableStr));
+          RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableStr))) {
+        loadIncrementalHFiles.doBulkloadFromQueue(queue, table, regionLocator, connection);
+      }
+      context.getCounter("Bulkload", "Files Loaded").increment(queueSize);
+    }
+  }
+
+  public static class BulkLoadInputFormat extends InputFormat<Text, Text> {
+
+    private static final Log LOG = LogFactory.getLog(BulkLoadInputFormat.class);
+
+    @Override public List<InputSplit> getSplits(JobContext jobContext)
+        throws IOException, InterruptedException {
+      Configuration configuration = jobContext.getConfiguration();
+
+      List<InputSplit> result = new ArrayList<>();
+      List<BulkoadInputWritable> bulkoadInputWritables = new ArrayList<>();
+
+      int depth = configuration.getInt(DEPTH, 2);
+      if (depth != 2 && depth != 3) {
+        throw new IllegalStateException("Invalid depth " + depth);
+      }
+      Set<String> validFamilies = getValidFamiliesSet(configuration);
+
+      try {
+        Path rootPath = new Path(configuration.get(ROOT_DIR));
+        FileSystem fs = rootPath.getFileSystem(configuration);
+
+        for (FileStatus child : fs.listStatus(rootPath)) {
+          if (!child.isDirectory()) {
+            LOG.info("Skipping non dir " + child.getPath());
+            continue;
+          }
+          if (child.getPath().getName().startsWith("_")) {
+            LOG.info("Skipping " + child.getPath());
+            continue;
+          }
+          if (depth == 2) {
+            bulkoadInputWritables
+                .addAll(createSplitsForFamilyDirectory(fs, child.getPath(), validFamilies));
+          } else {
+            for (FileStatus family : fs.listStatus(child.getPath())) {
+              bulkoadInputWritables
+                  .addAll(createSplitsForFamilyDirectory(fs, family.getPath(), validFamilies));
+            }
+          }
+        }
+
+        int maxMapTasks = configuration.getInt(MAX_MAP_TASK, 1);
+
+        int filesPerMap = Math.max(1, bulkoadInputWritables.size() / maxMapTasks);
+        for (List<BulkoadInputWritable> partition : Lists
+            .partition(bulkoadInputWritables, filesPerMap)) {
+          BulkoadInputWritableArray bulkoadInputWritableArray = new BulkoadInputWritableArray();
+          bulkoadInputWritableArray
+              .set(partition.toArray(new BulkoadInputWritable[partition.size()]));
+          result.add(new BulkloadInputSplit(bulkoadInputWritableArray));
+        }
+      } catch (Exception e) {
+        LOG.error("Error computing splits for bulkload M/R job", e);
+        throw new RuntimeException("Failed to compute splits", e);
+      }
+
+      return result;
+    }
+
+    private Set<String> getValidFamiliesSet(Configuration configuration) throws IOException {
+      TableName tableName = TableName.valueOf(configuration.get(TABLE_NAME));
+      try (Connection connection = ConnectionFactory.createConnection(configuration)) {
+        try (Table table = connection.getTable(tableName)) {
+          Set<String> families = new HashSet<>();
+          for (HColumnDescriptor columnDescriptor : table.getTableDescriptor()
+              .getColumnFamilies()) {
+            families.add(columnDescriptor.getNameAsString());
+          }
+          return families;
+        }
+      }
+    }
+
+    private List<BulkoadInputWritable> createSplitsForFamilyDirectory(FileSystem fs,
+        Path familyPath, Set<String> validFamilies) throws IOException {
+      if (!validFamilies.contains(familyPath.getName())) {
+        throw new IOException("Unmatched family names found " + familyPath.getName());
+      }
+      LOG.info("Listing family dir " + familyPath);
+      List<BulkoadInputWritable> bulkloadInputSplits = new ArrayList<>();
+      Text family = new Text(familyPath.getName());
+      for (FileStatus file : fs.listStatus(familyPath)) {
+        if (file.isDirectory()) {
+          LOG.warn("Skipping non-file " + file.getPath());
+          continue;
+        }
+        if (StoreFileInfo.isReference(file.getPath()) || HFileLink.isHFileLink(file.getPath())) {
+          continue;
+        }
+        bulkloadInputSplits
+            .add(new BulkoadInputWritable(new Text(family), new Text(file.getPath().toString())));
+      }
+
+      return bulkloadInputSplits;
+    }
+
+    @Override public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit,
+        TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+      return new RecordReader<Text, Text>() {
+
+        Iterator<Writable> iterator;
+        BulkoadInputWritable current;
+
+        @Override
+        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+            throws IOException, InterruptedException {
+          iterator = Iterators.forArray(((BulkloadInputSplit) inputSplit).bulkloadInputs.get());
+        }
+
+        @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+          if (!iterator.hasNext()) {
+            return false;
+          }
+          current = (BulkoadInputWritable) iterator.next();
+          return true;
+        }
+
+        @Override public Text getCurrentKey() throws IOException, InterruptedException {
+          return current.family;
+        }
+
+        @Override public Text getCurrentValue() throws IOException, InterruptedException {
+          return current.hfile;
+        }
+
+        @Override public float getProgress() throws IOException, InterruptedException {
+          return 0;
+        }
+
+        @Override public void close() throws IOException {
+        }
+      };
+    }
+  }
+
+  public static class BulkoadInputWritable implements Writable {
+    private Text family = new Text();
+    private Text hfile = new Text();
+
+    public BulkoadInputWritable() {
+    }
+
+    public BulkoadInputWritable(Text family, Text hfile) {
+      this.family = family;
+      this.hfile = hfile;
+    }
+
+    @Override public void readFields(DataInput dataInput) throws IOException {
+      family.readFields(dataInput);
+      hfile.readFields(dataInput);
+    }
+
+    @Override public void write(DataOutput dataOutput) throws IOException {
+      family.write(dataOutput);
+      hfile.write(dataOutput);
+    }
+  }
+
+  public static class BulkoadInputWritableArray extends ArrayWritable {
+    public BulkoadInputWritableArray() {
+      super(BulkoadInputWritable.class);
+    }
+
+    public BulkoadInputWritable[] asArray() {
+      // toArray() returns a freshly allocated BulkoadInputWritable[]; a plain cast of get()
+      // would fail after deserialization, where the backing array is a Writable[].
+      return (BulkoadInputWritable[]) toArray();
+    }
+  }
+
+  public static class BulkloadInputSplit extends InputSplit implements Writable {
+
+    private BulkoadInputWritableArray bulkloadInputs = new BulkoadInputWritableArray();
+
+    public BulkloadInputSplit() {
+    }
+
+    public BulkloadInputSplit(BulkoadInputWritableArray bulkloadInputs) {
+      this.bulkloadInputs = bulkloadInputs;
+    }
+
+    @Override public String[] getLocations() throws IOException, InterruptedException {
+      return new String[0];
+    }
+
+    @Override public long getLength() throws IOException, InterruptedException {
+      return 0;
+    }
+
+    @Override public void write(DataOutput dataOutput) throws IOException {
+      bulkloadInputs.write(dataOutput);
+    }
+
+    @Override public void readFields(DataInput dataInput) throws IOException {
+      bulkloadInputs.readFields(dataInput);
+    }
+  }
 }
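
For reference, a minimal usage sketch of the new distributed bulkload tool, assuming the patch above is applied. The /bulk/output path and usertable table name are placeholders; only the MAX_MAP_TASK and DEPTH keys defined by this patch plus stock Hadoop/HBase APIs are used.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFilesJob;
import org.apache.hadoop.util.ToolRunner;

public class DistributedBulkloadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Optional knobs introduced by the patch: map-task fan-out and directory depth (2 or 3).
    conf.setInt(LoadIncrementalHFilesJob.MAX_MAP_TASK, 4);
    conf.setInt(LoadIncrementalHFilesJob.DEPTH, 2);
    // Same positional contract as LoadIncrementalHFilesJob.run(String[]):
    // args[0] = root directory holding column-family subdirectories of HFiles, args[1] = table name.
    int exit = ToolRunner.run(conf, new LoadIncrementalHFilesJob(),
        new String[] { "/bulk/output", "usertable" });
    System.exit(exit);
  }
}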