diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java index 3cdb8a6..07b79c1 100644 --- a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java +++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationRegion.java @@ -89,7 +89,8 @@ public class ReplicationRegion extends HRegion { // complete log flush. if(!(Bytes.equals(key.getTablename(),ROOT_TABLE_NAME) || Bytes.equals(key.getTablename(),META_TABLE_NAME)) && - !Bytes.equals(val.getFamily(), HLog.METAFAMILY)) { + !Bytes.equals(val.getFamily(), HLog.METAFAMILY) && + key.getScope() == SCOPE_GLOBAL) { this.replicationSource.enqueueLog(entry); } diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java index 5e3efe9..1f1bf7a 100644 --- a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java +++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/replication/ReplicationSource.java @@ -59,6 +59,7 @@ public class ReplicationSource extends Chore implements HConstants { private final float ratio; private final Random random; private final AtomicBoolean isReplicating; + private final byte clusterId; private List currentPeers; @@ -80,6 +81,7 @@ public class ReplicationSource extends Chore implements HConstants { this.ratio = this.conf.getFloat("replication.ratio", 0.1f); currentPeers = new ArrayList(); this.random = new Random(); + this.clusterId = zkHelper.getRepId(); this.isReplicating = isReplicating; } @@ -120,6 +122,7 @@ public class ReplicationSource extends Chore implements HConstants { */ public void enqueueLog(HLog.Entry logEntry) { if(this.isReplicating.get()) { + logEntry.getKey().setClusterId(this.clusterId); this.queue.add(logEntry); } } diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java index 32bcb63..539e59a 100644 --- a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java +++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/regionserver/wal/replication/ReplicationHLog.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.regionserver.replication.ReplicationSource; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; @@ -68,8 +69,10 @@ public class ReplicationHLog extends HLog { protected void doWrite(HRegionInfo info, HLogKey logKey, KeyValue logEdit, long now) throws IOException { + logKey.setScope(info.getTableDesc().getFamily(logEdit.getFamily()).getScope()); super.doWrite(info, logKey, logEdit, now); - if(this.isReplicator && ! (info.isMetaRegion() || info.isRootRegion())) { + if(this.isReplicator && ! (info.isMetaRegion() || info.isRootRegion()) && + logKey.getScope() == HConstants.SCOPE_GLOBAL) { this.replicationSource.enqueueLog(new Entry(logKey, logEdit)); } diff --git a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java index ab508d7..c7888db 100644 --- a/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java +++ b/src/contrib/mdc_replication/src/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperHelper.java @@ -48,7 +48,6 @@ public class ReplicationZookeeperHelper implements HConstants, Watcher { private final String replicationZNode; private final String peersZNode; - private final String replicationStateNodeName; private final boolean isMaster; @@ -57,6 +56,8 @@ public class ReplicationZookeeperHelper implements HConstants, Watcher { private final AtomicBoolean isReplicating; + private final byte clusterId; + /** * Constructor used by region servers * @param zookeeperWrapper zkw to wrap @@ -77,13 +78,15 @@ public class ReplicationZookeeperHelper implements HConstants, Watcher { conf.get("zookeeper.znode.master", "master"); this.replicationStateNodeName = conf.get("zookeeper.znode.state", "state"); - + String clusterIdName = + conf.get("zookeeper.znode.clusterId", "clusterId"); this.peerClusters = new ArrayList(); this.replicationZNode = zookeeperWrapper.getZNode( zookeeperWrapper.getParentZNode(),replicationZNodeName); this.peersZNode = zookeeperWrapper.getZNode(replicationZNode,peersZNodeName); + List znodes = this.zookeeperWrapper.listZnodes(this.peersZNode, this); @@ -95,6 +98,11 @@ public class ReplicationZookeeperHelper implements HConstants, Watcher { String address = this.zookeeperWrapper.getData(this.replicationZNode, repMasterZNodeName); + String idResult = this.zookeeperWrapper.getData(this.replicationZNode, + clusterIdName); + this.clusterId = + idResult == null ? DEFAULT_CLUSTER_ID : Byte.valueOf(idResult); + String thisCluster = this.conf.get(ZOOKEEPER_QUORUM)+":"+ this.conf.get("hbase.zookeeper.property.clientPort") +":" + this.conf.get(ZOOKEEPER_ZNODE_PARENT); @@ -139,12 +147,20 @@ public class ReplicationZookeeperHelper implements HConstants, Watcher { /** * Tells if this cluster replicates or not - * @return + * @return if this is a master */ public boolean isMaster() { return isMaster; } + /** + * Get the identification of the cluster + * @return the id for the cluster + */ + public byte getRepId() { + return this.clusterId; + } + @Override public void process(WatchedEvent watchedEvent) { Event.EventType type = watchedEvent.getType(); diff --git a/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java b/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java index affbf7a..e804bbd 100644 --- a/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java +++ b/src/contrib/mdc_replication/src/test/org/apache/hadoop/hbase/replication/TestReplication.java @@ -53,7 +53,7 @@ public class TestReplication implements HConstants{ private final int NB_ROWS_IN_BATCH = 100; private final long SLEEP_TIME = 500; - private final int NB_RETRIES = 10; + private final int NB_RETRIES = 5; /** @@ -133,10 +133,14 @@ public class TestReplication implements HConstants{ byte[] tableName = Bytes.toBytes("test"); byte[] famName = Bytes.toBytes("f"); + byte[] noRepfamName = Bytes.toBytes("norep"); byte[] row = Bytes.toBytes("row"); HTableDescriptor table = new HTableDescriptor(tableName); HColumnDescriptor fam = new HColumnDescriptor(famName); + fam.setScope(SCOPE_GLOBAL); + table.addFamily(fam); + fam = new HColumnDescriptor(noRepfamName); table.addFamily(fam); HBaseAdmin admin1 = new HBaseAdmin(conf1); @@ -260,6 +264,25 @@ public class TestReplication implements HConstants{ } } + put = new Put(Bytes.toBytes("do not rep")); + put.add(noRepfamName, row, row); + table1.put(put); + + get = new Get(Bytes.toBytes("do not rep")); + for(int i = 0; i < NB_RETRIES; i++) { + if(i==NB_RETRIES-1) { + break; + } + Result res = table2.get(get); + if(res.size() >= 1) { + fail("Not supposed to be replicated"); + + } else { + LOG.info("Row not replicated, let's wait a bit more..."); + Thread.sleep(SLEEP_TIME); + } + } + } private void setIsReplication(String bool) throws Exception{ diff --git a/src/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/src/java/org/apache/hadoop/hbase/HColumnDescriptor.java index aed7f73..a538978 100644 --- a/src/java/org/apache/hadoop/hbase/HColumnDescriptor.java +++ b/src/java/org/apache/hadoop/hbase/HColumnDescriptor.java @@ -77,6 +77,7 @@ public class HColumnDescriptor implements WritableComparable public static final String FOREVER = "FOREVER"; public static final String MAPFILE_INDEX_INTERVAL = "MAPFILE_INDEX_INTERVAL"; + public static final String SCOPE = "SCOPE"; /** * Default compression type. @@ -121,6 +122,11 @@ public class HColumnDescriptor implements WritableComparable */ public static final int DEFAULT_TTL = HConstants.FOREVER; + /** + * Default scope. + */ + public static final int DEFAULT_SCOPE = HConstants.SCOPE_LOCAL; + // Column family name private byte [] name; @@ -203,7 +209,7 @@ public class HColumnDescriptor implements WritableComparable final boolean blockCacheEnabled, final int timeToLive, final boolean bloomFilter) { this(familyName, maxVersions, compression, inMemory, blockCacheEnabled, - DEFAULT_BLOCKSIZE, timeToLive, bloomFilter); + DEFAULT_BLOCKSIZE, timeToLive, bloomFilter, DEFAULT_SCOPE); } /** @@ -219,6 +225,7 @@ public class HColumnDescriptor implements WritableComparable * @param timeToLive Time-to-live of cell contents, in seconds * (use HConstants.FOREVER for unlimited TTL) * @param bloomFilter Enable the specified bloom filter for this column + * @param scope The scope tag for this column * * @throws IllegalArgumentException if passed a family name that is made of * other than 'word' characters: i.e. [a-zA-Z_0-9] or contains @@ -228,7 +235,7 @@ public class HColumnDescriptor implements WritableComparable public HColumnDescriptor(final byte [] familyName, final int maxVersions, final String compression, final boolean inMemory, final boolean blockCacheEnabled, final int blocksize, - final int timeToLive, final boolean bloomFilter) { + final int timeToLive, final boolean bloomFilter, final int scope) { isLegalFamilyName(familyName); this.name = familyName; @@ -245,6 +252,7 @@ public class HColumnDescriptor implements WritableComparable valueOf(compression.toUpperCase())); setBloomfilter(bloomFilter); setBlocksize(blocksize); + setScope(scope); } /** @@ -482,6 +490,24 @@ public class HColumnDescriptor implements WritableComparable setValue(MAPFILE_INDEX_INTERVAL, Integer.toString(interval)); } + /** + * @return the scope tag + */ + public int getScope() { + String value = getValue(SCOPE); + if (value != null) { + return Integer.valueOf(value).intValue(); + } + return DEFAULT_SCOPE; + } + + /** + * @param scope the scope tag + */ + public void setScope(int scope) { + setValue(SCOPE, Integer.toString(scope)); + } + /** * @see java.lang.Object#toString() */ diff --git a/src/java/org/apache/hadoop/hbase/HConstants.java b/src/java/org/apache/hadoop/hbase/HConstants.java index 7d957ae..7ddcc47 100644 --- a/src/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/java/org/apache/hadoop/hbase/HConstants.java @@ -289,6 +289,24 @@ public interface HConstants { TABLE_SET_HTD, TABLE_SPLIT } + + /** + * Scope tag for locally scoped data. + * This data will not be replicated. + */ + public static final int SCOPE_LOCAL = 0; + + /** + * Scope tag for globally scoped data. + * This data will be replicated to all peers. + */ + public static final int SCOPE_GLOBAL = 1; + + /** + * Default cluster ID, cannot be used to identify a cluster so a key with + * this value means it wasn't meant for replication. + */ + public static final byte DEFAULT_CLUSTER_ID = 0; /** * Parameter name for maximum number of bytes returned when calling a diff --git a/src/java/org/apache/hadoop/hbase/HTableDescriptor.java b/src/java/org/apache/hadoop/hbase/HTableDescriptor.java index 8be80a9..0fcd7f6 100644 --- a/src/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/src/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -663,7 +663,7 @@ public class HTableDescriptor implements WritableComparable { new HColumnDescriptor[] { new HColumnDescriptor(HConstants.CATALOG_FAMILY, 10, // Ten is arbitrary number. Keep versions to help debuggging. Compression.Algorithm.NONE.getName(), false, true, 8 * 1024, - HConstants.FOREVER, false) }); + HConstants.FOREVER, false, HConstants.SCOPE_LOCAL) }); /** Table descriptor for .META. catalog table */ public static final HTableDescriptor META_TABLEDESC = new HTableDescriptor( @@ -671,9 +671,9 @@ public class HTableDescriptor implements WritableComparable { new HColumnDescriptor(HConstants.CATALOG_FAMILY, 10, // Ten is arbitrary number. Keep versions to help debuggging. Compression.Algorithm.NONE.getName(), false, true, 8 * 1024, - HConstants.FOREVER, false), + HConstants.FOREVER, false, HConstants.SCOPE_LOCAL), new HColumnDescriptor(HConstants.CATALOG_HISTORIAN_FAMILY, HConstants.ALL_VERSIONS, Compression.Algorithm.NONE.getName(), false, false, 8 * 1024, - HConstants.WEEK_IN_SECONDS, false)}); + HConstants.WEEK_IN_SECONDS, false, HConstants.SCOPE_LOCAL)}); } diff --git a/src/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/src/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java index be396cb..a0d7d98 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java @@ -43,8 +43,11 @@ public class HLogKey implements WritableComparable, HeapSize { private long logSeqNum; // Time at which this edit was written. private long writeTime; + + private byte clusterId; + private int scope; private int HEAP_TAX = ClassSize.OBJECT + (2 * ClassSize.ARRAY) + - (2 * Bytes.SIZEOF_LONG); + (2 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT; /** Writable Consructor -- Do not use. */ public HLogKey() { @@ -67,6 +70,8 @@ public class HLogKey implements WritableComparable, HeapSize { this.tablename = tablename; this.logSeqNum = logSeqNum; this.writeTime = now; + this.clusterId = HConstants.DEFAULT_CLUSTER_ID; + this.scope = HConstants.SCOPE_LOCAL; } ////////////////////////////////////////////////////////////////////////////// @@ -99,6 +104,38 @@ public class HLogKey implements WritableComparable, HeapSize { return this.writeTime; } + /** + * Get the id of the original cluster + * @return + */ + public byte getClusterId() { + return clusterId; + } + + /** + * Set the cluster id of this key + * @param clusterId + */ + public void setClusterId(byte clusterId) { + this.clusterId = clusterId; + } + + /** + * Get the replication scope of this key + * @return replication scope + */ + public int getScope() { + return this.scope; + } + + /** + * Set the replication scope of this key + * @param scope The new scope + */ + public void setScope(int scope) { + this.scope = scope; + } + @Override public String toString() { return Bytes.toString(tablename) + "/" + Bytes.toString(regionName) + "/" + @@ -121,6 +158,8 @@ public class HLogKey implements WritableComparable, HeapSize { int result = Bytes.hashCode(this.regionName); result ^= this.logSeqNum; result ^= this.writeTime; + result ^= this.clusterId; + result ^= this.scope; return result; } @@ -146,8 +185,10 @@ public class HLogKey implements WritableComparable, HeapSize { public void write(DataOutput out) throws IOException { Bytes.writeByteArray(out, this.regionName); Bytes.writeByteArray(out, this.tablename); - out.writeLong(logSeqNum); + out.writeLong(this.logSeqNum); out.writeLong(this.writeTime); + out.writeByte(this.clusterId); + out.writeInt(this.scope); } public void readFields(DataInput in) throws IOException { @@ -155,6 +196,12 @@ public class HLogKey implements WritableComparable, HeapSize { this.tablename = Bytes.readByteArray(in); this.logSeqNum = in.readLong(); this.writeTime = in.readLong(); + try { + this.clusterId = in.readByte(); + this.scope = in.readInt(); + } catch(EOFException e) { + // Means it's an old key, just continue + } } public long heapSize() { diff --git a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java index ad509cb..3cb1c54 100644 --- a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java @@ -194,13 +194,14 @@ public abstract class HBaseTestCase extends TestCase { HTableDescriptor htd = new HTableDescriptor(name); htd.addFamily(new HColumnDescriptor(fam1, versions, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, - Integer.MAX_VALUE, HConstants.FOREVER, false)); + Integer.MAX_VALUE, HConstants.FOREVER, false, HConstants.SCOPE_LOCAL)); htd.addFamily(new HColumnDescriptor(fam2, versions, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, - Integer.MAX_VALUE, HConstants.FOREVER, false)); + Integer.MAX_VALUE, HConstants.FOREVER, false, HConstants.SCOPE_LOCAL)); htd.addFamily(new HColumnDescriptor(fam3, versions, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, - Integer.MAX_VALUE, HConstants.FOREVER, false)); + Integer.MAX_VALUE, HConstants.FOREVER, + false, HConstants.SCOPE_LOCAL)); return htd; } diff --git a/src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java index 6f4e449..eac5957 100644 --- a/src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -286,7 +286,8 @@ public class HBaseTestingUtility { HColumnDescriptor.DEFAULT_COMPRESSION, HColumnDescriptor.DEFAULT_IN_MEMORY, HColumnDescriptor.DEFAULT_BLOCKCACHE, - Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL, false); + Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL, + false, HColumnDescriptor.DEFAULT_SCOPE); desc.addFamily(hcd); } (new HBaseAdmin(getConfiguration())).createTable(desc); @@ -311,7 +312,8 @@ public class HBaseTestingUtility { HColumnDescriptor.DEFAULT_COMPRESSION, HColumnDescriptor.DEFAULT_IN_MEMORY, HColumnDescriptor.DEFAULT_BLOCKCACHE, - Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL, false); + Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL, + false, HColumnDescriptor.DEFAULT_SCOPE); desc.addFamily(hcd); i++; } diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java b/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java index 55f8fff..92be3a7 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java @@ -67,7 +67,7 @@ public class TestScanner extends HBaseTestCase { TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY, 10, // Ten is arbitrary number. Keep versions to help debuggging. Compression.Algorithm.NONE.getName(), false, true, 8 * 1024, - HConstants.FOREVER, false)); + HConstants.FOREVER, false, HConstants.SCOPE_LOCAL)); } /** HRegionInfo for root region */ public static final HRegionInfo REGION_INFO = diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java b/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java index d2fdded..6ddfbf0 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java @@ -32,7 +32,7 @@ public class TestWideScanner extends HBaseTestCase { TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY, 10, // Ten is arbitrary number. Keep versions to help debuggging. Compression.Algorithm.NONE.getName(), false, true, 8 * 1024, - HConstants.FOREVER, false)); + HConstants.FOREVER, false, HColumnDescriptor.DEFAULT_SCOPE)); } /** HRegionInfo for root region */ public static final HRegionInfo REGION_INFO =