From bad9c432064e1452dde12a55858a2c41ca906c3c Mon Sep 17 00:00:00 2001 From: Ted Yu Date: Tue, 7 Oct 2014 19:34:31 +0000 Subject: [PATCH] HBASE-11997 CopyTable with bulkload (Yi Deng) --- .../apache/hadoop/hbase/mapreduce/CopyTable.java | 141 ++++++++++++++++----- .../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 21 +++ .../hadoop/hbase/mapreduce/TableInputFormat.java | 35 ++++- .../hbase/mapreduce/TableInputFormatBase.java | 12 +- .../hadoop/hbase/mapreduce/TestCopyTable.java | 65 +++++----- 5 files changed, 203 insertions(+), 71 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java index 77580f6..887aa78 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java @@ -18,23 +18,30 @@ */ package org.apache.hadoop.hbase.mapreduce; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - /** * Tool used to copy a table to another one which can be on a different setup. * It is also configurable with a start and time as well as a specification @@ -43,36 +50,42 @@ import java.util.Map; @InterfaceAudience.Public @InterfaceStability.Stable public class CopyTable extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(CopyTable.class); final static String NAME = "copytable"; - static long startTime = 0; - static long endTime = 0; - static int versions = -1; - static String tableName = null; - static String startRow = null; - static String stopRow = null; - static String newTableName = null; - static String peerAddress = null; - static String families = null; - static boolean allCells = false; + long startTime = 0; + long endTime = 0; + int versions = -1; + String tableName = null; + String startRow = null; + String stopRow = null; + String dstTableName = null; + String peerAddress = null; + String families = null; + boolean allCells = false; + boolean bulkload = false; + Path bulkloadDir = null; + + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + public CopyTable(Configuration conf) { super(conf); } /** * Sets up the actual job. * - * @param conf The current configuration. * @param args The command line parameters. * @return The newly created job. 
* @throws IOException When setting up the job fails. */ - public static Job createSubmittableJob(Configuration conf, String[] args) + public Job createSubmittableJob(String[] args) throws IOException { if (!doCommandLine(args)) { return null; } - Job job = new Job(conf, NAME + "_" + tableName); + + Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); job.setJarByClass(CopyTable.class); Scan scan = new Scan(); scan.setCacheBlocks(false); @@ -114,12 +127,42 @@ public class CopyTable extends Configured implements Tool { } Import.configureCfRenaming(job.getConfiguration(), cfRenameMap); } - TableMapReduceUtil.initTableMapperJob(tableName, scan, - Import.Importer.class, null, null, job); - TableMapReduceUtil.initTableReducerJob( - newTableName == null ? tableName : newTableName, null, job, - null, peerAddress, null, null); job.setNumReduceTasks(0); + + if (bulkload) { + TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null, + null, job); + + // We need to split the inputs by destination tables so that output of Map can be bulk-loaded. + TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName)); + + FileSystem fs = FileSystem.get(getConf()); + Random rand = new Random(); + Path root = new Path(fs.getWorkingDirectory(), "copytable"); + fs.mkdirs(root); + while (true) { + bulkloadDir = new Path(root, "" + rand.nextLong()); + if (!fs.exists(bulkloadDir)) { + break; + } + } + + System.out.println("HFiles will be stored at " + this.bulkloadDir); + HFileOutputFormat2.setOutputPath(job, bulkloadDir); + HTable htable = new HTable(getConf(), TableName.valueOf(dstTableName)); + try { + HFileOutputFormat2.configureIncrementalLoadMap(job, htable); + } finally { + htable.close(); + } + } else { + TableMapReduceUtil.initTableMapperJob(tableName, scan, + Import.Importer.class, null, null, job); + + TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null, + null); + } + return job; } @@ -150,6 +193,8 @@ public class CopyTable extends Configured implements Tool { System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. "); System.err.println(" To keep the same name, just give \"cfName\""); System.err.println(" all.cells also copy delete markers and deleted cells"); + System.err.println(" bulkload Write input into HFiles and bulk load to the destination " + + "table"); System.err.println(); System.err.println("Args:"); System.err.println(" tablename Name of the table to copy"); @@ -164,7 +209,7 @@ public class CopyTable extends Configured implements Tool { + "-Dmapred.map.tasks.speculative.execution=false"); } - private static boolean doCommandLine(final String[] args) { + private boolean doCommandLine(final String[] args) { // Process command-line args. TODO: Better cmd-line processing // (but hopefully something not as painful as cli options). 
if (args.length < 1) { @@ -211,7 +256,7 @@ public class CopyTable extends Configured implements Tool { final String newNameArgKey = "--new.name="; if (cmd.startsWith(newNameArgKey)) { - newTableName = cmd.substring(newNameArgKey.length()); + dstTableName = cmd.substring(newNameArgKey.length()); continue; } @@ -231,6 +276,11 @@ public class CopyTable extends Configured implements Tool { allCells = true; continue; } + + if (cmd.startsWith("--bulkload")) { + bulkload = true; + continue; + } if (i == args.length-1) { tableName = cmd; @@ -239,7 +289,7 @@ public class CopyTable extends Configured implements Tool { return false; } } - if (newTableName == null && peerAddress == null) { + if (dstTableName == null && peerAddress == null) { printUsage("At least a new table name or a " + "peer address must be specified"); return false; @@ -248,6 +298,16 @@ public class CopyTable extends Configured implements Tool { printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime); return false; } + + if (bulkload && peerAddress != null) { + printUsage("Remote bulkload is not supported!"); + return false; + } + + // set dstTableName if necessary + if (dstTableName == null) { + dstTableName = tableName; + } } catch (Exception e) { e.printStackTrace(); printUsage("Can't start because " + e.getMessage()); @@ -270,8 +330,29 @@ public class CopyTable extends Configured implements Tool { @Override public int run(String[] args) throws Exception { String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); - Job job = createSubmittableJob(getConf(), otherArgs); + Job job = createSubmittableJob(otherArgs); if (job == null) return 1; - return job.waitForCompletion(true) ? 0 : 1; + if (!job.waitForCompletion(true)) { + LOG.info("Map-reduce job failed!"); + if (bulkload) { + LOG.info("Files are not bulkloaded!"); + } + return 1; + } + int code = 0; + if (bulkload) { + code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(), + this.dstTableName}); + if (code == 0) { + // bulkloadDir is deleted only LoadIncrementalHFiles was successful so that one can rerun + // LoadIncrementalHFiles. 
+ FileSystem fs = FileSystem.get(this.getConf()); + if (!fs.delete(this.bulkloadDir, true)) { + LOG.error("Deleting folder " + bulkloadDir + " failed!"); + code = 1; + } + } + } + return code; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 73ba37a..2e104b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -105,6 +105,7 @@ public class HFileOutputFormat2 public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.datablock.encoding"; + @Override public RecordWriter getRecordWriter( final TaskAttemptContext context) throws IOException, InterruptedException { return createRecordWriter(context); @@ -153,6 +154,7 @@ public class HFileOutputFormat2 private final byte [] now = Bytes.toBytes(System.currentTimeMillis()); private boolean rollRequested = false; + @Override public void write(ImmutableBytesWritable row, V cell) throws IOException { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); @@ -263,6 +265,7 @@ public class HFileOutputFormat2 } } + @Override public void close(TaskAttemptContext c) throws IOException, InterruptedException { for (WriterLength wl: this.writers.values()) { @@ -400,6 +403,24 @@ public class HFileOutputFormat2 LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured."); } + + public static void configureIncrementalLoadMap(Job job, HTable table) throws IOException { + Configuration conf = job.getConfiguration(); + + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(KeyValue.class); + job.setOutputFormatClass(HFileOutputFormat2.class); + + // Set compression algorithms based on column families + configureCompression(table, conf); + configureBloomType(table, conf); + configureBlockSize(table, conf); + configureDataBlockEncoding(table, conf); + + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.initCredentials(job); + LOG.info("Incremental table " + table.getName() + " output configured."); + } /** * Runs inside the task to deserialize column family to compression algorithm diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java index 83b2a15..96291c4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java @@ -22,14 +22,17 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.StringUtils; 
/** @@ -44,6 +47,11 @@ implements Configurable { /** Job parameter that specifies the input table. */ public static final String INPUT_TABLE = "hbase.mapreduce.inputtable"; + /** + * If specified, use start keys of this table to split. + * This is useful when you are preparing data for bulkload. + */ + private static final String SPLIT_TABLE = "hbase.mapreduce.splittable"; /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified. * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details. */ @@ -102,7 +110,7 @@ implements Configurable { } catch (Exception e) { LOG.error(StringUtils.stringifyException(e)); } - + Scan scan = null; if (conf.get(SCAN) != null) { @@ -213,4 +221,25 @@ implements Configurable { } } + @Override + protected Pair getStartEndKeys() throws IOException { + if (conf.get(SPLIT_TABLE) != null) { + TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE)); + HTable rl = new HTable(getConf(), splitTableName); + try { + return rl.getStartEndKeys(); + } finally { + rl.close(); + } + } + + return super.getStartEndKeys(); + } + + /** + * Sets split table in map-reduce job. + */ + public static void configureSplitTable(Job job, TableName tableName) { + job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index 1fadced..268d4de 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -19,10 +19,8 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; -import java.io.InterruptedIOException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.text.MessageFormat; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; @@ -94,8 +92,7 @@ extends InputFormat { private HTable table = null; /** The reader scanning the table, can be a custom one. */ private TableRecordReader tableRecordReader = null; - - + /** The reverse DNS lookup cache mapping: IPAddress => HostName */ private HashMap reverseDNSCacheMap = new HashMap(); @@ -138,6 +135,10 @@ extends InputFormat { trr.setHTable(table); return trr; } + + protected Pair getStartEndKeys() throws IOException { + return table.getStartEndKeys(); + } /** * Calculates the splits that will serve as input for the map tasks. 
The @@ -160,7 +161,8 @@ extends InputFormat { RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(table); - Pair keys = table.getStartEndKeys(); + + Pair keys = getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { HRegionLocation regLoc = table.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java index 6163bb9..065e2b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java @@ -31,7 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.LargeTests; -import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; @@ -51,7 +51,6 @@ import org.junit.experimental.categories.Category; @Category(LargeTests.class) public class TestCopyTable { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static MiniHBaseCluster cluster; private static final byte[] ROW1 = Bytes.toBytes("row1"); private static final byte[] ROW2 = Bytes.toBytes("row2"); private static final String FAMILY_A_STRING = "a"; @@ -63,7 +62,7 @@ public class TestCopyTable { @BeforeClass public static void beforeClass() throws Exception { - cluster = TEST_UTIL.startMiniCluster(3); + TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniMapReduceCluster(); } @@ -73,14 +72,9 @@ public class TestCopyTable { TEST_UTIL.shutdownMiniCluster(); } - /** - * Simple end-to-end test - * @throws Exception - */ - @Test - public void testCopyTable() throws Exception { - final byte[] TABLENAME1 = Bytes.toBytes("testCopyTable1"); - final byte[] TABLENAME2 = Bytes.toBytes("testCopyTable2"); + private void doCopyTableTest(boolean bulkload) throws Exception { + final TableName TABLENAME1 = TableName.valueOf("testCopyTable1"); + final TableName TABLENAME2 = TableName.valueOf("testCopyTable2"); final byte[] FAMILY = Bytes.toBytes("family"); final byte[] COLUMN1 = Bytes.toBytes("c1"); @@ -96,10 +90,15 @@ public class TestCopyTable { CopyTable copy = new CopyTable(TEST_UTIL.getConfiguration()); - assertEquals( - 0, - copy.run(new String[] { "--new.name=" + Bytes.toString(TABLENAME2), - Bytes.toString(TABLENAME1) })); + int code; + if (bulkload) { + code = copy.run(new String[] { "--new.name=" + TABLENAME2.getNameAsString(), + "--bulkload", TABLENAME1.getNameAsString() }); + } else { + code = copy.run(new String[] { "--new.name=" + TABLENAME2.getNameAsString(), + TABLENAME1.getNameAsString() }); + } + assertEquals("copy job failed", 0, code); // verify the data was copied into table 2 for (int i = 0; i < 10; i++) { @@ -115,6 +114,23 @@ public class TestCopyTable { TEST_UTIL.deleteTable(TABLENAME2); } + /** + * Simple end-to-end test + * @throws Exception + */ + @Test + public void testCopyTable() throws Exception { + doCopyTableTest(false); + } + + /** + * Simple end-to-end test with bulkload. 
+ */ + @Test + public void testCopyTableWithBulkload() throws Exception { + doCopyTableTest(true); + } + @Test public void testStartStopRow() throws Exception { final byte[] TABLENAME1 = Bytes.toBytes("testStartStopRow1"); @@ -194,7 +210,6 @@ public class TestCopyTable { "--starttime=" + (currentTime - 100000), "--endtime=" + (currentTime + 100000), "--versions=1", sourceTable }; assertNull(t2.get(new Get(ROW1)).getRow()); - clean(); assertTrue(runCopy(args)); @@ -243,24 +258,8 @@ public class TestCopyTable { new Configuration(TEST_UTIL.getConfiguration()), args); Configuration configuration = opts.getConfiguration(); args = opts.getRemainingArgs(); - clean(); - Job job = CopyTable.createSubmittableJob(configuration, args); + Job job = new CopyTable(configuration).createSubmittableJob(args); job.waitForCompletion(false); return job.isSuccessful(); } - - - private void clean() { - - CopyTable.startTime = 0; - CopyTable.endTime = 0; - CopyTable.versions = -1; - CopyTable.tableName = null; - CopyTable.startRow = null; - CopyTable.stopRow = null; - CopyTable.newTableName = null; - CopyTable.peerAddress = null; - CopyTable.families = null; - CopyTable.allCells = false; - } } -- 2.0.1
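With this patch, CopyTable's configuration moves from static fields to instance fields and createSubmittableJob(Configuration, String[]) becomes the instance method createSubmittableJob(String[]), which is why TestCopyTable can drop its clean() helper. The new --bulkload option turns the copy into a map-only job that writes HFiles and then bulk-loads them into the destination table with LoadIncrementalHFiles; it only applies to a local copy, since combining --bulkload with --peer.adr is rejected. Below is a minimal sketch, not part of the patch, of driving the patched tool programmatically; the table names and the ToolRunner wrapper class are placeholders assumed for illustration.

  // Sketch only: invoke the patched CopyTable with --bulkload via ToolRunner.
  // "sourceTable" and "destTable" are hypothetical table names.
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapreduce.CopyTable;
  import org.apache.hadoop.util.ToolRunner;

  public class CopyTableBulkloadDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Roughly equivalent to the command line:
      //   hbase org.apache.hadoop.hbase.mapreduce.CopyTable \
      //       --new.name=destTable --bulkload sourceTable
      int exit = ToolRunner.run(conf, new CopyTable(conf),
          new String[] { "--new.name=destTable", "--bulkload", "sourceTable" });
      System.exit(exit);
    }
  }

A non-zero exit code means either the map-reduce job or the subsequent LoadIncrementalHFiles step failed.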
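The plumbing behind --bulkload is also usable by other map-only bulk-load jobs: TableInputFormat.configureSplitTable() records a split table in the job configuration so input splits follow the destination table's region start keys (through the new getStartEndKeys() hook in TableInputFormatBase), and HFileOutputFormat2.configureIncrementalLoadMap() sets the output key/value classes and the per-family compression, bloom type, block size and data block encoding without installing the sort reducer and total-order partitioner that configureIncrementalLoad() sets up. The sketch below shows how these pieces fit together outside of CopyTable; it mirrors what createSubmittableJob does when --bulkload is set, but the table names and staging path are placeholders and the class is an illustration, not the patch itself.

  // Sketch only: a map-only job that writes HFiles split by the destination
  // table's regions and then bulk-loads them. Table names and paths are
  // hypothetical.
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
  import org.apache.hadoop.hbase.mapreduce.Import;
  import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
  import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  import org.apache.hadoop.mapreduce.Job;

  public class BulkCopySketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      Job job = Job.getInstance(conf, "bulk-copy-sketch");
      job.setJarByClass(BulkCopySketch.class);

      Scan scan = new Scan();
      scan.setCacheBlocks(false);

      // Map-only: KeyValueImporter emits (ImmutableBytesWritable, KeyValue).
      TableMapReduceUtil.initTableMapperJob("sourceTable", scan,
          Import.KeyValueImporter.class, null, null, job);
      job.setNumReduceTasks(0);

      // Split the input by the destination table's regions so the generated
      // HFiles line up with the regions they will be loaded into.
      TableInputFormat.configureSplitTable(job, TableName.valueOf("destTable"));

      // Write HFiles to a staging directory, configured for a map-only load.
      Path staging = new Path("/tmp/bulk-copy-staging");
      HFileOutputFormat2.setOutputPath(job, staging);
      HTable dest = new HTable(conf, TableName.valueOf("destTable"));
      try {
        HFileOutputFormat2.configureIncrementalLoadMap(job, dest);
      } finally {
        dest.close();
      }

      if (job.waitForCompletion(true)) {
        // Hand the generated HFiles to the destination table's regions.
        new LoadIncrementalHFiles(conf).run(
            new String[] { staging.toString(), "destTable" });
      }
    }
  }

When CopyTable drives this flow itself, it stages the HFiles under a randomly named subdirectory of <working dir>/copytable, prints the location, and deletes that directory only after LoadIncrementalHFiles succeeds, so a failed load can be retried by pointing LoadIncrementalHFiles at the reported directory.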