diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index 3ed2061..0704b13
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -18,6 +18,10 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -33,6 +37,7 @@ import java.util.UUID;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -52,10 +57,16 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.zookeeper.KeeperException;
 
@@ -74,7 +85,171 @@ public class Import {
   public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
   public final static String TABLE_NAME = "import.table.name";
   public final static String WAL_DURABILITY = "import.wal.durability";
 
+  /**
+   * Routes each map-output key to the partition of the region whose key range holds it.
+   * <p>
+   * {@link #START_KEYS} is a static that {@link KeyValueSortImporter#setup} fills in, so the
+   * partitioner depends on that mapper having been set up in the same JVM.
+   */
+  public static class KeyValueWritableComparablePartitioner
+      extends Partitioner<KeyValueWritableComparable, KeyValue> {
+    /** Region boundaries: every start key but the first, wrapped for comparison. */
+    public static KeyValueWritableComparable[] START_KEYS = null;
+
+    @Override
+    public int getPartition(KeyValueWritableComparable key, KeyValue value,
+        int numPartitions) {
+      // Linear scan: the first boundary that is not below the key picks the partition.
+      for (int i = 0; i < START_KEYS.length; ++i) {
+        if (key.compareTo(START_KEYS[i]) <= 0) {
+          return i;
+        }
+      }
+      // Beyond every boundary: the key goes to the last partition.
+      return START_KEYS.length;
+    }
+  }
+
+  /**
+   * Wraps a {@link KeyValue} so it can serve as a map-output key ordered by
+   * {@link KeyValue#COMPARATOR}.
+   */
+  public static class KeyValueWritableComparable
+      implements WritableComparable<KeyValueWritableComparable> {
+
+    private KeyValue kv = null;
+
+    static {
+      // register this comparator
+      WritableComparator.define(KeyValueWritableComparable.class,
+          new KeyValueWritableComparator());
+    }
+
+    public KeyValueWritableComparable() {
+    }
+
+    public KeyValueWritableComparable(KeyValue kv) {
+      this.kv = kv;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      KeyValue.write(kv, out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      kv = KeyValue.create(in);
+    }
+
+    @Override
+    public int compareTo(KeyValueWritableComparable o) {
+      return KeyValue.COMPARATOR.compare(this.kv, o.kv);
+    }
+
+    /** Compares two serialized keys by deserializing both and comparing the results. */
+    public static class KeyValueWritableComparator extends WritableComparator {
+
+      @Override
+      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+        try {
+          KeyValueWritableComparable kv1 = new KeyValueWritableComparable();
+          kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
+          KeyValueWritableComparable kv2 = new KeyValueWritableComparable();
+          kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
+          return compare(kv1, kv2);
+        } catch (IOException e) {
+          // The cause is carried by the RuntimeException; no separate stack dump needed.
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /** Passes every KeyValue through unchanged, keyed by the bytes of its row. */
+  public static class KeyValueReducer
+      extends Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
+    @Override
+    protected void reduce(KeyValueWritableComparable row, Iterable<KeyValue> kvs,
+        Context context) throws IOException, InterruptedException {
+      int index = 0;
+      for (KeyValue kv : kvs) {
+        // getRowArray() is the whole backing array, so bound it to the row itself.
+        context.write(new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset(),
+            kv.getRowLength()), kv);
+        if (++index % 100 == 0) {
+          context.setStatus("Wrote " + index);
+        }
+      }
+    }
+  }
+
+  /**
+   * A mapper that writes out KeyValues keyed by a {@link KeyValueWritableComparable}, and
+   * hands the table's region start keys to {@link KeyValueWritableComparablePartitioner}.
+   */
+  public static class KeyValueSortImporter
+      extends TableMapper<KeyValueWritableComparable, KeyValue> {
+    private Map<byte[], byte[]> cfRenameMap;
+    private Filter filter;
+    private static final Log LOG = LogFactory.getLog(KeyValueSortImporter.class);
+
+    /**
+     * @param row  The current table row key.
+     * @param value  The columns.
+     * @param context  The current context.
+     * @throws IOException When something is broken with the data.
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result value,
+        Context context)
+        throws IOException {
+      try {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Considering the row."
+              + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
+        }
+        if (filter == null || !filter.filterRowKey(row.get(), row.getOffset(), row.getLength())) {
+          for (Cell kv : value.rawCells()) {
+            kv = filterKv(filter, kv);
+            // skip if we filtered it out
+            if (kv == null) continue;
+            // TODO get rid of ensureKeyValue
+            KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap));
+            context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret);
+          }
+        }
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to the framework instead of swallowing it.
+        LOG.error("Interrupted while writing KeyValues", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    @Override
+    public void setup(Context context) throws IOException {
+      Configuration conf = context.getConfiguration();
+      cfRenameMap = createCfRenameMap(conf);
+      filter = instantiateFilter(conf);
+      HTable table = new HTable(conf, conf.get(TABLE_NAME));
+      try {
+        byte[][] startKeys = table.getStartKeys();
+        // Index 0 is skipped: only the boundaries between partitions are needed.
+        KeyValueWritableComparable[] startKeyWraps =
+            new KeyValueWritableComparable[startKeys.length - 1];
+        for (int i = 1; i < startKeys.length; ++i) {
+          startKeyWraps[i - 1] =
+              new KeyValueWritableComparable(KeyValue.createFirstOnRow(startKeys[i]));
+        }
+        KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps;
+      } finally {
+        // Close even when reading the start keys fails, so the handle is not leaked.
+        table.close();
+      }
+    }
+  }
+
   /**
    * A mapper that just writes out KeyValues.
    */
@@ -417,6 +592,7 @@ public class Import {
    * @return The newly created job.
    * @throws IOException When setting up the job fails.
    */
+  @SuppressWarnings("deprecation")
   public static Job createSubmittableJob(Configuration conf, String[] args)
   throws IOException {
     String tableName = args[0];
@@ -438,7 +614,28 @@ public class Import {
       throw new IOException(e);
     }
 
-    if (hfileOutPath != null) {
+    if (hfileOutPath != null && conf.getBoolean("import.bulk.isLargeResult", false)) {
+      LOG.info("Use Large Result!!");
+      HTable table = new HTable(conf, tableName);
+      try {
+        HFileOutputFormat.configureIncrementalLoad(job, table);
+        job.setMapperClass(KeyValueSortImporter.class);
+        job.setReducerClass(KeyValueReducer.class);
+        Path outputDir = new Path(hfileOutPath);
+        FileOutputFormat.setOutputPath(job, outputDir);
+        job.setMapOutputKeyClass(KeyValueWritableComparable.class);
+        job.setMapOutputValueClass(KeyValue.class);
+        job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
+            KeyValueWritableComparable.KeyValueWritableComparator.class,
+            RawComparator.class);
+        reConfigureIncrementalLoad(job, table);
+      } finally {
+        // Release the table handle once the job has been configured from it.
+        table.close();
+      }
+      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+          com.google.common.base.Preconditions.class);
+    } else if (hfileOutPath != null) {
       job.setMapperClass(KeyValueImporter.class);
       HTable table = new HTable(conf, tableName);
       job.setReducerClass(KeyValueSortReducer.class);
@@ -459,6 +656,20 @@ public class Import {
     return job;
   }
 
+  /**
+   * Switches the job to {@link KeyValueWritableComparablePartitioner} and sets one reduce
+   * task per region start key of the given table.
+   */
+  private static void reConfigureIncrementalLoad(Job job, HTable table) throws IOException {
+    Configuration conf = job.getConfiguration();
+    // Schedule the partitions file named in the job configuration for deletion on exit.
+    Path partitionsPath = new Path(TotalOrderPartitioner.getPartitionFile(conf));
+    FileSystem fs = FileSystem.get(conf);
+    fs.deleteOnExit(partitionsPath);
+    job.setPartitionerClass(KeyValueWritableComparablePartitioner.class);
+    job.setNumReduceTasks(table.getStartKeys().length);
+  }
+
   /*
    * @param errorMsg Error message. Can be null.
    */
-- 
2.3.2 (Apple Git-55)