diff --git a/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index 79a6d1f..e04ae97 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -191,6 +191,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row {
    * @param clusterId
    */
   public void setClusterId(UUID clusterId) {
+    if (clusterId == null) return;
     byte[] val = new byte[2*Bytes.SIZEOF_LONG];
     Bytes.putLong(val, 0, clusterId.getMostSignificantBits());
     Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits());
diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index 03d3f8d..6518104 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -22,23 +22,32 @@ package org.apache.hadoop.hbase.mapreduce;
 import java.io.IOException;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.UUID;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Import data written by {@link Export}.
@@ -47,6 +56,7 @@ public class Import {
   final static String NAME = "import";
   final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
   final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
+  private static final Log LOG = LogFactory.getLog(Import.class);
   /**
    * A mapper that just writes out KeyValues.
    */
@@ -88,6 +98,7 @@ public class Import {
   static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
     private Map<byte[], byte[]> cfRenameMap;
+    private UUID clusterId;
 
     /**
      * @param row The current table row key.
@@ -128,16 +139,32 @@ public class Import {
         }
       }
       if (put != null) {
+        put.setClusterId(clusterId);
         context.write(key, put);
       }
       if (delete != null) {
+        delete.setClusterId(clusterId);
         context.write(key, delete);
       }
     }
 
     @Override
     public void setup(Context context) {
-      cfRenameMap = createCfRenameMap(context.getConfiguration());
+      Configuration conf = context.getConfiguration();
+      cfRenameMap = createCfRenameMap(conf);
+      try {
+        HConnection connection = HConnectionManager.getConnection(conf);
+        ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+        ReplicationZookeeper zkHelper = new ReplicationZookeeper(connection, conf, zkw);
+        clusterId = zkHelper.getUUIDForCluster(zkw);
+      } catch (ZooKeeperConnectionException e) {
+        LOG.error("Problem connecting to ZooKeeper during task setup", e);
+      } catch (KeeperException e) {
+        LOG.error("Problem reading ZooKeeper data during task setup", e);
+      } catch (IOException e) {
+        LOG.error("Problem setting up task", e);
+      }
     }
   }
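
For reviewers, a minimal standalone sketch (not part of the patch) of the byte layout
that setClusterId() produces: 16 bytes holding the UUID's most-significant long followed
by its least-significant long. It assumes Bytes.putLong writes big-endian, which matches
java.nio.ByteBuffer's default order; the class name below is illustrative only.

    import java.nio.ByteBuffer;
    import java.util.UUID;

    public class ClusterIdLayoutSketch {
      public static void main(String[] args) {
        UUID clusterId = UUID.randomUUID();

        // Same layout as Bytes.putLong(val, 0, msb) followed by Bytes.putLong(val, 8, lsb)
        byte[] val = ByteBuffer.allocate(16)
            .putLong(clusterId.getMostSignificantBits())
            .putLong(clusterId.getLeastSignificantBits())
            .array();

        // Reading the two longs back reproduces the original UUID.
        ByteBuffer buf = ByteBuffer.wrap(val);
        UUID roundTripped = new UUID(buf.getLong(), buf.getLong());
        System.out.println(clusterId.equals(roundTripped));  // prints true
      }
    }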