Index: src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java (revision 1518378) +++ src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java (working copy) @@ -23,8 +23,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -55,15 +57,11 @@ private static final Log LOG = LogFactory.getLog(TestReplicationBase.class); - private Configuration conf1; - private Configuration conf2; - private Configuration conf3; + private Configuration baseConfiguration; - private HBaseTestingUtility utility1; - private HBaseTestingUtility utility2; - private HBaseTestingUtility utility3; - - private MiniZooKeeperCluster miniZK; + private HBaseTestingUtility[] utilities; + private Configuration[] configurations; + private MiniZooKeeperCluster miniZK; private static final long SLEEP_TIME = 500; private static final int NB_RETRIES = 10; @@ -85,44 +83,21 @@ @Before public void setUp() throws Exception { - conf1 = HBaseConfiguration.create(); - conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + baseConfiguration = HBaseConfiguration.create(); // smaller block size and capacity to trigger more operations // and test them - conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20); - conf1.setInt("replication.source.size.capacity", 1024); - conf1.setLong("replication.source.sleepforretries", 100); - conf1.setInt("hbase.regionserver.maxlogs", 10); - conf1.setLong("hbase.master.logcleaner.ttl", 10); - conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); - conf1.setBoolean("dfs.support.append", true); - conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); - conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20); + baseConfiguration.setInt("replication.source.size.capacity", 1024); + baseConfiguration.setLong("replication.source.sleepforretries", 100); + baseConfiguration.setInt("hbase.regionserver.maxlogs", 10); + baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10); + baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); + baseConfiguration.setBoolean("dfs.support.append", true); + baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + baseConfiguration.setStrings( + CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, CoprocessorCounter.class.getName()); - utility1 = new HBaseTestingUtility(conf1); - utility1.startMiniZKCluster(); - miniZK = utility1.getZkCluster(); - // By setting the mini ZK cluster through this method, even though this is - // already utility1's mini ZK cluster, we are telling utility1 not to shut - // the mini ZK cluster when we shut down the HBase cluster. 
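Reviewer note (illustrative, not part of the patch hunks): the per-cluster setup removed here is replaced by the startMiniClusters() helper added further down. A condensed sketch of that shared mini-ZK pattern, using only calls that already appear in this patch; cluster 0 starts and owns the ZooKeeper ensemble while the other clusters reuse it, so shutting down any HBase mini cluster never tears down ZooKeeper:

    utilities = new HBaseTestingUtility[numClusters];
    configurations = new Configuration[numClusters];
    for (int i = 0; i < numClusters; i++) {
      Configuration conf = new Configuration(baseConfiguration);
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i); // the helper also appends a random suffix
      HBaseTestingUtility utility = new HBaseTestingUtility(conf);
      if (i == 0) {
        utility.startMiniZKCluster();   // first cluster owns the mini ZK ensemble
        miniZK = utility.getZkCluster();
      } else {
        utility.setZkCluster(miniZK);   // the rest share it and will not shut it down
      }
      utility.startMiniCluster();
      utilities[i] = utility;
      configurations[i] = conf;
    }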
- utility1.setZkCluster(miniZK); - new ZooKeeperWatcher(conf1, "cluster1", null, true); - - conf2 = new Configuration(conf1); - conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); - - utility2 = new HBaseTestingUtility(conf2); - utility2.setZkCluster(miniZK); - new ZooKeeperWatcher(conf2, "cluster2", null, true); - - conf3 = new Configuration(conf1); - conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3"); - - utility3 = new HBaseTestingUtility(conf3); - utility3.setZkCluster(miniZK); - new ZooKeeperWatcher(conf3, "cluster3", null, true); - table = new HTableDescriptor(tableName); HColumnDescriptor fam = new HColumnDescriptor(famName); fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); @@ -131,181 +106,300 @@ table.addFamily(fam); } - @After - public void tearDown() throws IOException { - miniZK.shutdown(); + /** + * It tests the replication scenario involving 0 -> 1 -> 0. It does it by + * adding and deleting a row to a table in each cluster, checking if it's + * replicated. It also tests that the puts and deletes are not replicated back + * to the originating cluster. + */ + @Test(timeout = 300000) + public void testCyclicReplication1() throws Exception { + LOG.info("testSimplePutDelete"); + int numClusters = 2; + HTable[] htables = null; + try { + startMiniClusters(numClusters); + createTableOnClusters(table); + + htables = getHTablesOnClusters(tableName); + + // Test the replication scenarios of 0 -> 1 -> 0 + addPeer("1", 0, 1); + addPeer("1", 1, 0); + + int[] expectedCounts = new int[] { 2, 2 }; + + // add rows to both clusters, + // make sure they are both replication + putAndWait(row, famName, htables[0], htables[1]); + putAndWait(row1, famName, htables[1], htables[0]); + validateCounts(htables, put, expectedCounts); + + deleteAndWait(row, htables[0], htables[1]); + deleteAndWait(row1, htables[1], htables[0]); + validateCounts(htables, delete, expectedCounts); + } finally { + close(htables); + shutDownMiniClusters(); + } } - @Test(timeout=300000) - public void testCyclicReplication() throws Exception { - LOG.info("testCyclicReplication"); - utility1.startMiniCluster(); - utility2.startMiniCluster(); - utility3.startMiniCluster(); - ReplicationAdmin admin1 = new ReplicationAdmin(conf1); - ReplicationAdmin admin2 = new ReplicationAdmin(conf2); - ReplicationAdmin admin3 = new ReplicationAdmin(conf3); + /** + * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and + * deleting rows to a table in each clusters and ensuring that the each of + * these clusters get the appropriate mutations. It also tests the grouping + * scenario where a cluster needs to replicate the edits originating from + * itself and also the edits that it received using replication from a + * different cluster. 
The scenario is explained in HBASE-9158 + */ + @Test(timeout = 300000) + public void testCyclicReplication2() throws Exception { + LOG.info("testCyclicReplication1"); + int numClusters = 3; + HTable[] htables = null; + try { + startMiniClusters(numClusters); + createTableOnClusters(table); - new HBaseAdmin(conf1).createTable(table); - new HBaseAdmin(conf2).createTable(table); - new HBaseAdmin(conf3).createTable(table); - HTable htable1 = new HTable(conf1, tableName); - htable1.setWriteBufferSize(1024); - HTable htable2 = new HTable(conf2, tableName); - htable2.setWriteBufferSize(1024); - HTable htable3 = new HTable(conf3, tableName); - htable3.setWriteBufferSize(1024); - - admin1.addPeer("1", utility2.getClusterKey()); - admin2.addPeer("1", utility3.getClusterKey()); - admin3.addPeer("1", utility1.getClusterKey()); + // Test the replication scenario of 0 -> 1 -> 2 -> 0 + addPeer("1", 0, 1); + addPeer("1", 1, 2); + addPeer("1", 2, 0); - // put "row" and wait 'til it got around - putAndWait(row, famName, htable1, htable3); - // it should have passed through table2 - check(row,famName,htable2); + htables = getHTablesOnClusters(tableName); - putAndWait(row1, famName, htable2, htable1); - check(row,famName,htable3); - putAndWait(row2, famName, htable3, htable2); - check(row,famName,htable1); - - deleteAndWait(row,htable1,htable3); - deleteAndWait(row1,htable2,htable1); - deleteAndWait(row2,htable3,htable2); + // put "row" and wait 'til it got around + putAndWait(row, famName, htables[0], htables[2]); + putAndWait(row1, famName, htables[1], htables[0]); + putAndWait(row2, famName, htables[2], htables[1]); - assertEquals("Puts were replicated back ", 3, getCount(htable1, put)); - assertEquals("Puts were replicated back ", 3, getCount(htable2, put)); - assertEquals("Puts were replicated back ", 3, getCount(htable3, put)); - assertEquals("Deletes were replicated back ", 3, getCount(htable1, delete)); - assertEquals("Deletes were replicated back ", 3, getCount(htable2, delete)); - assertEquals("Deletes were replicated back ", 3, getCount(htable3, delete)); + deleteAndWait(row, htables[0], htables[2]); + deleteAndWait(row1, htables[1], htables[0]); + deleteAndWait(row2, htables[2], htables[1]); - // Test HBASE-9158 - admin2.disablePeer("1"); - // we now have an edit that was replicated into cluster originating from cluster 1 - putAndWait(row3, famName, htable1, htable2); - // now add a local edit to cluster 2 - Put put = new Put(row4); - put.add(famName, row4, row4); - htable2.put(put); - // reenable replication from cluster 2 to cluster 3 - admin2.enablePeer("1"); - // without HBASE-9158 the edit for row4 would have been marked with cluster 1's id - // and hence not replicated to cluster 1 - wait(row4, htable1); - - utility3.shutdownMiniCluster(); - utility2.shutdownMiniCluster(); - utility1.shutdownMiniCluster(); + int[] expectedCounts = new int[] { 3, 3, 3 }; + validateCounts(htables, put, expectedCounts); + validateCounts(htables, delete, expectedCounts); + + // Test HBASE-9158 + disablePeer("1", 2); + // we now have an edit that was replicated into cluster originating from + // cluster 0 + putAndWait(row3, famName, htables[0], htables[1]); + // now add a local edit to cluster 1 + htables[1].put(new Put(row4).add(famName, row4, row4)); + // re-enable replication from cluster 2 to cluster 0 + enablePeer("1", 2); + // without HBASE-9158 the edit for row4 would have been marked with + // cluster 0's id + // and hence not replicated to cluster 0 + wait(row4, htables[0], true); + } finally { + 
close(htables); + shutDownMiniClusters(); + } } /** - * Add a row to a table in each cluster, check it's replicated, - * delete it, check's gone - * Also check the puts and deletes are not replicated back to - * the originating cluster. + * Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1. */ - @Test(timeout=300000) - public void testSimplePutDelete() throws Exception { - LOG.info("testSimplePutDelete"); - utility1.startMiniCluster(); - utility2.startMiniCluster(); + @Test(timeout = 300000) + public void testCyclicReplication3() throws Exception { + LOG.info("testCyclicReplication2"); + int numClusters = 3; + HTable[] htables = null; + try { + startMiniClusters(numClusters); + createTableOnClusters(table); - ReplicationAdmin admin1 = new ReplicationAdmin(conf1); - ReplicationAdmin admin2 = new ReplicationAdmin(conf2); + // Test the replication scenario of 0 -> 1 -> 2 -> 1 + addPeer("1", 0, 1); + addPeer("1", 1, 2); + addPeer("1", 2, 1); - new HBaseAdmin(conf1).createTable(table); - new HBaseAdmin(conf2).createTable(table); - HTable htable1 = new HTable(conf1, tableName); - htable1.setWriteBufferSize(1024); - HTable htable2 = new HTable(conf2, tableName); - htable2.setWriteBufferSize(1024); + htables = getHTablesOnClusters(tableName); - // set M-M - admin1.addPeer("1", utility2.getClusterKey()); - admin2.addPeer("1", utility1.getClusterKey()); + // put "row" and wait 'til it got around + putAndWait(row, famName, htables[0], htables[2]); + putAndWait(row1, famName, htables[1], htables[2]); + putAndWait(row2, famName, htables[2], htables[1]); - // add rows to both clusters, - // make sure they are both replication - putAndWait(row, famName, htable1, htable2); - putAndWait(row1, famName, htable2, htable1); + deleteAndWait(row, htables[0], htables[2]); + deleteAndWait(row1, htables[1], htables[2]); + deleteAndWait(row2, htables[2], htables[1]); - // make sure "row" did not get replicated back. 
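Reviewer note (illustrative, not part of the patch hunks): the expectedCounts arrays above encode the replication topology. For the 0 -> 1 -> 2 -> 1 ring in testCyclicReplication3, nothing ever flows back into cluster 0, so it only holds its own mutation, while clusters 1 and 2 each end up with all three. The wiring below mirrors the addPeer(...) helper defined later in this patch:

    addPeer("1", 0, 1); // cluster 0 ships its edits to cluster 1
    addPeer("1", 1, 2); // cluster 1 ships its own edits plus cluster 0's to cluster 2
    addPeer("1", 2, 1); // cluster 2 ships back to cluster 1 only; nothing returns to cluster 0
    // Hence cluster 0 sees one put/delete, clusters 1 and 2 see all three:
    int[] expectedCounts = new int[] { 1, 3, 3 };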
- assertEquals("Puts were replicated back ", 2, getCount(htable1, put)); + int[] expectedCounts = new int[] { 1, 3, 3 }; + validateCounts(htables, put, expectedCounts); + validateCounts(htables, delete, expectedCounts); + } finally { + close(htables); + shutDownMiniClusters(); + } + } - // delete "row" and wait - deleteAndWait(row, htable1, htable2); + @After + public void tearDown() throws IOException { + configurations = null; + utilities = null; + } - // make the 2nd cluster replicated back - assertEquals("Puts were replicated back ", 2, getCount(htable2, put)); + @SuppressWarnings("resource") + private void startMiniClusters(int numClusters) throws Exception { + Random random = new Random(); + utilities = new HBaseTestingUtility[numClusters]; + configurations = new Configuration[numClusters]; + for (int i = 0; i < numClusters; i++) { + Configuration conf = new Configuration(baseConfiguration); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt()); + HBaseTestingUtility utility = new HBaseTestingUtility(conf); + if (i == 0) { + utility.startMiniZKCluster(); + miniZK = utility.getZkCluster(); + } else { + utility.setZkCluster(miniZK); + } + utility.startMiniCluster(); + utilities[i] = utility; + configurations[i] = conf; + new ZooKeeperWatcher(conf, "cluster" + i, null, true); + } + } - deleteAndWait(row1, htable2, htable1); + private void shutDownMiniClusters() throws Exception { + int numClusters = utilities.length; + for (int i = numClusters - 1; i >= 0; i--) { + if (utilities[i] != null) { + utilities[i].shutdownMiniCluster(); + } + } + miniZK.shutdown(); + } - assertEquals("Deletes were replicated back ", 2, getCount(htable1, delete)); - utility2.shutdownMiniCluster(); - utility1.shutdownMiniCluster(); + private void createTableOnClusters(HTableDescriptor table) throws Exception { + int numClusters = configurations.length; + for (int i = 0; i < numClusters; i++) { + HBaseAdmin hbaseAdmin = null; + try { + hbaseAdmin = new HBaseAdmin(configurations[i]); + hbaseAdmin.createTable(table); + } finally { + close(hbaseAdmin); + } + } } - private int getCount(HTable t, byte[] type) throws IOException { + private void addPeer(String id, int masterClusterNumber, + int slaveClusterNumber) throws Exception { + ReplicationAdmin replicationAdmin = null; + try { + replicationAdmin = new ReplicationAdmin( + configurations[masterClusterNumber]); + replicationAdmin.addPeer(id, + utilities[slaveClusterNumber].getClusterKey()); + } finally { + close(replicationAdmin); + } + } + + private void disablePeer(String id, int masterClusterNumber) throws Exception { + ReplicationAdmin replicationAdmin = null; + try { + replicationAdmin = new ReplicationAdmin( + configurations[masterClusterNumber]); + replicationAdmin.disablePeer(id); + } finally { + close(replicationAdmin); + } + } + + private void enablePeer(String id, int masterClusterNumber) throws Exception { + ReplicationAdmin replicationAdmin = null; + try { + replicationAdmin = new ReplicationAdmin( + configurations[masterClusterNumber]); + replicationAdmin.enablePeer(id); + } finally { + close(replicationAdmin); + } + } + + private void close(Closeable... 
closeables) { + try { + if (closeables != null) { + for (Closeable closeable : closeables) { + closeable.close(); + } + } + } catch (Exception e) { + LOG.warn("Exception occured while closing the object:", e); + } + } + + @SuppressWarnings("resource") + private HTable[] getHTablesOnClusters(byte[] tableName) throws Exception { + int numClusters = utilities.length; + HTable[] htables = new HTable[numClusters]; + for (int i = 0; i < numClusters; i++) { + HTable htable = new HTable(configurations[i], tableName); + htable.setWriteBufferSize(1024); + htables[i] = htable; + } + return htables; + } + + private void validateCounts(HTable[] htables, byte[] type, + int[] expectedCounts) throws IOException { + for (int i = 0; i < htables.length; i++) { + assertEquals(Bytes.toString(type) + " were replicated back ", + expectedCounts[i], getCount(htables[i], type)); + } + } + + private int getCount(HTable t, byte[] type) throws IOException { Get test = new Get(row); - test.setAttribute("count", new byte[]{}); + test.setAttribute("count", new byte[] {}); Result res = t.get(test); return Bytes.toInt(res.getValue(count, type)); } private void deleteAndWait(byte[] row, HTable source, HTable target) - throws Exception { + throws Exception { Delete del = new Delete(row); source.delete(del); - - Get get = new Get(row); - for (int i = 0; i < NB_RETRIES; i++) { - if (i==NB_RETRIES-1) { - fail("Waited too much time for del replication"); - } - Result res = target.get(get); - if (res.size() >= 1) { - LOG.info("Row not deleted"); - Thread.sleep(SLEEP_TIME); - } else { - break; - } - } + wait(row, target, true); } - private void check(byte[] row, byte[] fam, HTable t) throws IOException { - Get get = new Get(row); - Result res = t.get(get); - if (res.size() == 0) { - fail("Row is missing"); - } - } - private void putAndWait(byte[] row, byte[] fam, HTable source, HTable target) - throws Exception { + throws Exception { Put put = new Put(row); put.add(fam, row, row); source.put(put); - - wait(row, target); + wait(row, target, false); } - private void wait(byte[] row, HTable target) throws Exception { + private void wait(byte[] row, HTable target, boolean isDeleted) + throws Exception { Get get = new Get(row); for (int i = 0; i < NB_RETRIES; i++) { - if (i==NB_RETRIES-1) { - fail("Waited too much time for put replication"); + if (i == NB_RETRIES - 1) { + fail("Waited too much time for replication. Row:" + Bytes.toString(row) + + ". IsDeleteReplication:" + isDeleted); } Result res = target.get(get); - if (res.size() == 0) { - LOG.info("Row not available"); + boolean sleep = isDeleted ? res.size() > 0 : res.size() == 0; + if (sleep) { + LOG.info("Waiting for more time for replication. Row:" + + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted); Thread.sleep(SLEEP_TIME); } else { - assertArrayEquals(res.value(), row); + if (!isDeleted) { + assertArrayEquals(res.value(), row); + } + LOG.info("Obtained row:" + + Bytes.toString(row) + ". 
IsDeleteReplication:" + isDeleted); break; } - } + } } /** Index: src/main/java/org/apache/hadoop/hbase/client/Mutation.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Mutation.java (revision 1518378) +++ src/main/java/org/apache/hadoop/hbase/client/Mutation.java (working copy) @@ -33,12 +33,20 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.util.Bytes; +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; + public abstract class Mutation extends OperationWithAttributes implements Row { private static final Log LOG = LogFactory.getLog(Mutation.class); // Attribute used in Mutations to indicate the originating cluster. private static final String CLUSTER_ID_ATTR = "_c.id_"; private static final String DURABILITY_ID_ATTR = "_dur_"; + /** + * The attribute for storing the list of clusters that have consumed the change. + */ + private static final String CONSUMED_CLUSTER_IDS = "_cs.id"; protected byte [] row = null; protected long ts = HConstants.LATEST_TIMESTAMP; protected long lockId = -1L; @@ -243,6 +251,36 @@ } /** + * Marks that the clusters with the given clusterIds have consumed the mutation + * @param clusterIds of the clusters that have consumed the mutation + */ + public void setClusterIds(List clusterIds) { + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + out.writeInt(clusterIds.size()); + for (UUID clusterId : clusterIds) { + out.writeLong(clusterId.getMostSignificantBits()); + out.writeLong(clusterId.getLeastSignificantBits()); + } + setAttribute(CONSUMED_CLUSTER_IDS, out.toByteArray()); + } + + /** + * @return the set of cluster Ids that have consumed the mutation + */ + public List getClusterIds() { + List clusterIds = new ArrayList(); + byte[] bytes = getAttribute(CONSUMED_CLUSTER_IDS); + if(bytes != null) { + ByteArrayDataInput in = ByteStreams.newDataInput(bytes); + int numClusters = in.readInt(); + for(int i=0; i kvs = new ArrayList(); - private NavigableMap scopes; + /** + * This variable contains the information of the column family replication settings and contains + * the clusters that have already consumed the change represented by the object. This overloading + * of scopes with the consumed clusterids was introduced while porting the fix for HBASE-7709 back + * to 0.94 release. However, this overloading has been removed in the newer releases(0.95.2+). To + * check/change the column family settings, please use the getFromScope and putIntoScope methods + * and for marking/checking if a cluster has consumed the change, please use addCluster, + * addClusters and getClusters methods. + */ + private final NavigableMap scopes = new TreeMap( + Bytes.BYTES_COMPARATOR); + // default to decoding uncompressed data - needed for replication, which enforces that // uncompressed edits are sent across the wire. In the regular case (reading/writing WAL), the // codec will be setup by the reader/writer class, not here. 
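Reviewer note (illustrative, not part of the patch hunks): the diff rendering above has dropped the generic type parameters (the new Mutation methods take and return List<UUID>) and truncated the comparison in the read loop. A self-contained sketch of the round-trip that setClusterIds()/getClusterIds() perform with the CONSUMED_CLUSTER_IDS attribute, an int count followed by the most/least significant bits of each UUID, using the same Guava ByteStreams helpers the patch imports:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;

    import com.google.common.io.ByteArrayDataInput;
    import com.google.common.io.ByteArrayDataOutput;
    import com.google.common.io.ByteStreams;

    public class ClusterIdsCodecSketch {
      // Encode as: count, then (mostSigBits, leastSigBits) per UUID.
      static byte[] encode(List<UUID> clusterIds) {
        ByteArrayDataOutput out = ByteStreams.newDataOutput();
        out.writeInt(clusterIds.size());
        for (UUID clusterId : clusterIds) {
          out.writeLong(clusterId.getMostSignificantBits());
          out.writeLong(clusterId.getLeastSignificantBits());
        }
        return out.toByteArray();
      }

      // Decode back into the list of consumed cluster ids.
      static List<UUID> decode(byte[] bytes) {
        List<UUID> clusterIds = new ArrayList<UUID>();
        if (bytes != null) {
          ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
          int numClusters = in.readInt();
          for (int i = 0; i < numClusters; i++) {
            clusterIds.add(new UUID(in.readLong(), in.readLong()));
          }
        }
        return clusterIds;
      }

      public static void main(String[] args) {
        List<UUID> ids = new ArrayList<UUID>();
        ids.add(UUID.randomUUID());
        ids.add(UUID.randomUUID());
        System.out.println(decode(encode(ids)).equals(ids)); // prints true
      }
    }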
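Reviewer note (illustrative, not part of the patch hunks): the WALEdit comment above describes overloading the scopes map with consumed-cluster markers. A small sketch of that layout follows; PREFIX_CLUSTER_KEY's literal value is not visible in these hunks, so the value below is only a placeholder:

    import java.util.NavigableMap;
    import java.util.TreeMap;
    import java.util.UUID;

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScopesOverloadingSketch {
      private static final String PREFIX_CLUSTER_KEY = "_cluster_"; // placeholder, not the patch's value

      public static void main(String[] args) {
        NavigableMap<byte[], Integer> scopes =
            new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
        // A genuine column-family replication scope entry...
        scopes.put(Bytes.toBytes("f"), HConstants.REPLICATION_SCOPE_GLOBAL);
        // ...and a consumed-cluster marker in the same map, distinguished by its key prefix.
        UUID clusterId = UUID.randomUUID();
        scopes.put(Bytes.toBytes(PREFIX_CLUSTER_KEY + clusterId.toString()), 1);
        // getClusterIds() recovers the UUIDs by filtering keys on the prefix:
        for (byte[] keyBytes : scopes.keySet()) {
          String key = Bytes.toString(keyBytes);
          if (key.startsWith(PREFIX_CLUSTER_KEY)) {
            System.out.println(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY.length())));
          }
        }
      }
    }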
@@ -116,22 +136,58 @@ return kvs; } - public NavigableMap getScopes() { - return scopes; + public Integer getFromScope(byte[] key) { + return scopes.get(key); } + public void putIntoScope(byte[] key, Integer value) { + scopes.put(key, value); + } - public void setScopes (NavigableMap scopes) { - // We currently process the map outside of WALEdit, - // TODO revisit when replication is part of core - this.scopes = scopes; + public boolean hasKeyInScope(byte[] key) { + return scopes.containsKey(key); } + /** + * @return true if the cluster with the given clusterId has consumed the change. + */ + public boolean hasClusterId(UUID clusterId) { + return hasKeyInScope(Bytes.toBytes(PREFIX_CLUSTER_KEY + clusterId.toString())); + } + + /** + * Marks that the cluster with the given clusterId has consumed the change. + */ + public void addClusterId(UUID clusterId) { + scopes.put(Bytes.toBytes(PREFIX_CLUSTER_KEY + clusterId.toString()), 1); + } + + /** + * Marks that the clusters with the given clusterIds have consumed the change. + */ + public void addClusterIds(List clusterIds) { + for (UUID clusterId : clusterIds) { + addClusterId(clusterId); + } + } + + /** + * @return the set of cluster Ids that have consumed the change. + */ + public List getClusterIds() { + List clusterIds = new ArrayList(); + for (byte[] keyBytes : scopes.keySet()) { + String key = Bytes.toString(keyBytes); + if (key.startsWith(PREFIX_CLUSTER_KEY)) { + clusterIds.add(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY.length()))); + } + } + return clusterIds; + } + public void readFields(DataInput in) throws IOException { kvs.clear(); - if (scopes != null) { - scopes.clear(); - } + scopes.clear(); Decoder decoder = this.codec.getDecoder((DataInputStream) in); int versionOrLength = in.readInt(); int length = versionOrLength; @@ -148,15 +204,12 @@ //its a new style WAL, so we need replication scopes too if (versionOrLength == VERSION_2) { - int numFamilies = in.readInt(); - if (numFamilies > 0) { - if (scopes == null) { - scopes = new TreeMap(Bytes.BYTES_COMPARATOR); - } - for (int i = 0; i < numFamilies; i++) { - byte[] fam = Bytes.readByteArray(in); + int numEntries = in.readInt(); + if (numEntries > 0) { + for (int i = 0; i < numEntries; i++) { + byte[] key = Bytes.readByteArray(in); int scope = in.readInt(); - scopes.put(fam, scope); + scopes.put(key, scope); } } } @@ -173,14 +226,10 @@ } kvEncoder.flush(); - if (scopes == null) { - out.writeInt(0); - } else { - out.writeInt(scopes.size()); - for (byte[] key : scopes.keySet()) { - Bytes.writeByteArray(out, key); - out.writeInt(scopes.get(key)); - } + out.writeInt(scopes.size()); + for (byte[] key : scopes.keySet()) { + Bytes.writeByteArray(out, key); + out.writeInt(scopes.get(key)); } } @@ -189,11 +238,9 @@ for (KeyValue kv : kvs) { ret += kv.heapSize(); } - if (scopes != null) { - ret += ClassSize.TREEMAP; - ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY); - // TODO this isn't quite right, need help here - } + ret += ClassSize.TREEMAP; + ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY); + // TODO this isn't quite right, need help here return ret; } @@ -205,9 +252,7 @@ sb.append(kv.toString()); sb.append("; "); } - if (scopes != null) { - sb.append(" scopes: " + scopes.toString()); - } + sb.append(" scopes: " + scopes.toString()); sb.append(">]"); return sb.toString(); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- 
src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1518378) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -1995,6 +1995,7 @@ // bunch up all edits across all column families into a // single WALEdit. addFamilyMapToWALEdit(familyMap, walEdit); + walEdit.addClusterIds(delete.getClusterIds()); this.log.append(regionInfo, this.htableDescriptor.getName(), walEdit, clusterId, now, this.htableDescriptor); } @@ -2448,6 +2449,7 @@ // STEP 5. Append the edit to WAL. Do not sync wal. // ------------------------- Mutation first = batchOp.operations[firstIndex].getFirst(); + walEdit.addClusterIds(first.getClusterIds()); txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(), walEdit, first.getClusterId(), now, this.htableDescriptor); @@ -2904,6 +2906,7 @@ // will contain uncommitted transactions. if (writeToWAL) { addFamilyMapToWALEdit(familyMap, walEdit); + walEdit.addClusterIds(put.getClusterIds()); this.log.append(regionInfo, this.htableDescriptor.getName(), walEdit, clusterId, now, this.htableDescriptor); } else { Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (revision 1518378) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (working copy) @@ -172,20 +172,15 @@ @Override public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) { - NavigableMap scopes = - new TreeMap(Bytes.BYTES_COMPARATOR); byte[] family; for (KeyValue kv : logEdit.getKeyValues()) { family = kv.getFamily(); int scope = htd.getFamily(family).getScope(); if (scope != REPLICATION_SCOPE_LOCAL && - !scopes.containsKey(family)) { - scopes.put(family, scope); + !logEdit.hasKeyInScope(family)) { + logEdit.putIntoScope(family, scope); } } - if (!scopes.isEmpty()) { - logEdit.setScopes(scopes); - } } @Override Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1518378) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -30,7 +30,6 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; -import java.util.NavigableMap; import java.util.Random; import java.util.Set; import java.util.UUID; @@ -59,7 +58,6 @@ import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.zookeeper.ClusterId; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; @@ -477,8 +475,14 @@ seenEntries++; // Remove all KVs that should not be replicated HLogKey logKey = entry.getKey(); - // don't replicate if the log entries originated in the peer - if (!logKey.getClusterId().equals(peerClusterId)) { + List consumedClusterIds = edit.getClusterIds(); + // This cluster id has been added to resolve the scenario of A -> B -> A where A has old + // point release and B has the new point release which has the fix HBASE-7709. A change on + // cluster A would infinitely replicate to + // cluster B if we don't add the original cluster id to the set. 
+ consumedClusterIds.add(logKey.getClusterId()); + // don't replicate if the log entries if it has not already been replicated + if (!consumedClusterIds.contains(peerClusterId)) { removeNonReplicableEdits(edit); // Don't replicate catalog entries, if the WALEdit wasn't // containing anything to replicate and if we're currently not set to replicate @@ -491,6 +495,8 @@ // This is *only* place where a cluster id other than the default is set. if (HConstants.DEFAULT_CLUSTER_ID == logKey.getClusterId()) { logKey.setClusterId(this.clusterId); + } else if (logKey.getClusterId() != this.clusterId) { + edit.addClusterId(clusterId); } currentNbOperations += countDistinctRowKeys(edit); currentNbEntries++; @@ -664,13 +670,12 @@ * @param edit The KV to check for replication */ protected void removeNonReplicableEdits(WALEdit edit) { - NavigableMap scopes = edit.getScopes(); List kvs = edit.getKeyValues(); for (int i = edit.size()-1; i >= 0; i--) { KeyValue kv = kvs.get(i); // The scope will be null or empty if // there's nothing to replicate in that WALEdit - if (scopes == null || !scopes.containsKey(kv.getFamily())) { + if (!edit.hasKeyInScope(kv.getFamily())) { kvs.remove(i); } } @@ -928,4 +933,4 @@ return Long.parseLong(parts[parts.length-1]); } } -} +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (revision 1518378) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (working copy) @@ -110,9 +110,11 @@ // to the same table. try { long totalReplicated = 0; - // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per - // invocation of this method per table and cluster id. - Map>> rowMap = new TreeMap>>(Bytes.BYTES_COMPARATOR); + // Map of table => list of Rows, grouped by clusters that consumed the change, we only want to + // flushCommits once per + // invocation of this method per table and clusters that have consumed the change. + Map, List>> rowMap = + new TreeMap, List>>(Bytes.BYTES_COMPARATOR); for (HLog.Entry entry : entries) { WALEdit edit = entry.getEdit(); byte[] table = entry.getKey().getTablename(); @@ -123,14 +125,19 @@ for (KeyValue kv : kvs) { if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) { UUID clusterId = entry.getKey().getClusterId(); + List clusterIds = edit.getClusterIds(); if (kv.isDelete()) { del = new Delete(kv.getRow()); del.setClusterId(clusterId); - addToHashMultiMap(rowMap, table, clusterId, del); + del.setClusterIds(clusterIds); + clusterIds.add(clusterId); + addToHashMultiMap(rowMap, table, clusterIds, del); } else { put = new Put(kv.getRow()); put.setClusterId(clusterId); - addToHashMultiMap(rowMap, table, clusterId, put); + put.setClusterIds(clusterIds); + clusterIds.add(clusterId); + addToHashMultiMap(rowMap, table, clusterIds, put); } } if (kv.isDelete()) { @@ -142,7 +149,7 @@ } totalReplicated++; } - for(Map.Entry>> entry : rowMap.entrySet()) { + for(Map.Entry, List>> entry : rowMap.entrySet()) { batch(entry.getKey(), entry.getValue().values()); } this.metrics.setAgeOfLastAppliedOp( @@ -162,7 +169,7 @@ * @param key1 * @param key2 * @param value - * @return + * @return the list of values for the combination of key1 and key2 */ private List addToHashMultiMap(Map>> map, K1 key1, K2 key2, V value) { Map> innerMap = map.get(key1);
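Reviewer note (illustrative, not part of the patch hunks): the new shipping condition in ReplicationSource above can be read as a small predicate. It relies only on methods this patch adds to WALEdit, with the originating cluster counted as a consumer of its own edit so that a peer running an older release (without this fix) cannot bounce the edit back:

    import java.util.List;
    import java.util.UUID;

    import org.apache.hadoop.hbase.regionserver.wal.HLog;
    import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

    public class LoopCheckSketch {
      static boolean shouldShipToPeer(HLog.Entry entry, UUID peerClusterId) {
        HLogKey logKey = entry.getKey();
        WALEdit edit = entry.getEdit();
        List<UUID> consumedClusterIds = edit.getClusterIds();
        // Treat the originating cluster as having already consumed its own edit.
        consumedClusterIds.add(logKey.getClusterId());
        // Ship only if the peer has not yet consumed this change.
        return !consumedClusterIds.contains(peerClusterId);
      }
    }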
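Reviewer note (illustrative, not part of the patch hunks): the ReplicationSink hunk above has also lost its generic type parameters in this rendering; the grouping key changes from the single originating cluster id to the full list of clusters that have consumed the change. A sketch with the presumed types restored (table name, then consumed-cluster list, then rows); the helper body below is the standard multimap-insert idiom and stands in for the original, which is truncated at the end of this hunk:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.UUID;

    import org.apache.hadoop.hbase.client.Row;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SinkGroupingSketch {
      // table -> (clusters that have consumed the change) -> rows to batch together
      static final Map<byte[], Map<List<UUID>, List<Row>>> rowMap =
          new TreeMap<byte[], Map<List<UUID>, List<Row>>>(Bytes.BYTES_COMPARATOR);

      static <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map,
          K1 key1, K2 key2, V value) {
        Map<K2, List<V>> innerMap = map.get(key1);
        if (innerMap == null) {
          innerMap = new HashMap<K2, List<V>>();
          map.put(key1, innerMap);
        }
        List<V> values = innerMap.get(key2);
        if (values == null) {
          values = new ArrayList<V>();
          innerMap.put(key2, values);
        }
        values.add(value);
        return values;
      }
    }

Grouping by a List<UUID> key works here because ArrayList provides value-based equals() and hashCode(), so two edits consumed by the same set of clusters land in the same batch and are flushed together.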