Index: src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java	(revision 0)
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMasterReplication {
+
+  private static final Log LOG = LogFactory.getLog(TestMasterReplication.class);
+
+  private static Configuration conf1;
+  private static Configuration conf2;
+
+  private static ReplicationAdmin admin1;
+  private static ReplicationAdmin admin2;
+  private static String clusterKey2;
+  private static String clusterKey1;
+
+  private static HTable htable1;
+  private static HTable htable2;
+
+  private static HBaseTestingUtility utility1;
+  private static HBaseTestingUtility utility2;
+  private static final int NB_ROWS_IN_BATCH = 100;
+  private static final int NB_ROWS_IN_BIG_BATCH =
+      NB_ROWS_IN_BATCH * 10;
+  private static final long SLEEP_TIME = 500;
+  private static final int NB_RETRIES = 10;
+
+  private static final byte[] tableName = Bytes.toBytes("test");
+  private static final byte[] famName = Bytes.toBytes("f");
+  private static final byte[] row = Bytes.toBytes("row");
+  private static final byte[] row1 = Bytes.toBytes("row1");
+  private static final byte[] noRepfamName =
+      Bytes.toBytes("norep");
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1 = HBaseConfiguration.create();
+    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    // smaller block size and capacity to trigger more operations
+    // and test them
+    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
+    conf1.setInt("replication.source.size.capacity", 1024);
+    conf1.setLong("replication.source.sleepforretries", 100);
+    conf1.setInt("hbase.regionserver.maxlogs", 10);
+    conf1.setLong("hbase.master.logcleaner.ttl", 10);
+    conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+    conf1.setBoolean("dfs.support.append", true);
+    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+
+    utility1 = new HBaseTestingUtility(conf1);
+    utility1.startMiniZKCluster();
+    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+    new ZooKeeperWatcher(conf1, "cluster1", null, true);
+    admin1 = new ReplicationAdmin(conf1);
+    LOG.info("Setup first Zk");
+
+    conf2 = HBaseConfiguration.create();
+    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+    conf2.setInt("hbase.client.retries.number", 6);
+    conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+    conf2.setBoolean("dfs.support.append", true);
+
+    utility2 = new HBaseTestingUtility(conf2);
+    utility2.setZkCluster(miniZK);
+    new ZooKeeperWatcher(conf2, "cluster2", null, true);
+
+    admin2 = new ReplicationAdmin(conf2);
+
+    clusterKey1 = conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+        conf1.get("hbase.zookeeper.property.clientPort")+":/1";
+
+    clusterKey2 = conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+        conf2.get("hbase.zookeeper.property.clientPort")+":/2";
+
+    // set M-M
+    admin1.addPeer("2", clusterKey2);
+    admin2.addPeer("1", clusterKey1);
+
+    LOG.info("Setup second Zk");
+    utility1.startMiniCluster(2);
+    utility2.startMiniCluster(2);
+
+    HTableDescriptor table = new HTableDescriptor(tableName);
+    HColumnDescriptor fam = new HColumnDescriptor(famName);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
+    fam = new HColumnDescriptor(noRepfamName);
+    table.addFamily(fam);
+    HBaseAdmin admin1 = new HBaseAdmin(conf1);
+    HBaseAdmin admin2 = new HBaseAdmin(conf2);
+    admin1.createTable(table);
+    admin2.createTable(table);
+    htable1 = new HTable(conf1, tableName);
+    htable1.setWriteBufferSize(1024);
+    htable2 = new HTable(conf2, tableName);
+  }
+
+  private static void setIsReplication(boolean rep) throws Exception {
+    LOG.info("Set rep " + rep);
+    admin1.setReplicating(rep);
+    admin2.setReplicating(rep);
+    Thread.sleep(SLEEP_TIME);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+
+    // Starting and stopping replication can make us miss new logs,
+    // rolling like this makes sure the most recent one gets added to the queue
+    for ( JVMClusterUtil.RegionServerThread r :
+        utility1.getHBaseCluster().getRegionServerThreads()) {
+      r.getRegionServer().getWAL().rollWriter();
+    }
+    utility1.truncateTable(tableName);
+    // truncating the table will send one Delete per row to the slave cluster
+    // in an async fashion, which is why we cannot just call truncateTable on
+    // utility2 since late writes could make it to the slave in some way.
+    // Instead, we truncate the first table and wait for all the Deletes to
+    // make it to the slave.
+    Scan scan = new Scan();
+    int lastCount = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for truncate");
+      }
+      ResultScanner scanner = htable2.getScanner(scan);
+      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+      scanner.close();
+      if (res.length != 0) {
+        if (res.length < lastCount) {
+          i--; // Don't increment timeout if we make progress
+        }
+        lastCount = res.length;
+        LOG.info("Still got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    utility2.shutdownMiniCluster();
+    utility1.shutdownMiniCluster();
+  }
+
+  /**
+   * Add a row, check it's replicated, delete it, check it's gone
+   * @throws Exception
+   */
+  @Test(timeout=300000)
+  public void testSimplePutDelete() throws Exception {
+    LOG.info("testSimplePutDelete");
+
+    putAndWait(row, famName, htable1, htable2);
+    putAndWait(row1, famName, htable2, htable1);
+
+    // make sure row did not get replicated back.
+    Thread.sleep(SLEEP_TIME);
+    Get get = new Get(row);
+    get.setMaxVersions();
+    Result res = htable1.get(get);
+    if (res.size() > 1) {
+      fail("Data got replicated back");
+    }
+
+    deleteAndWait(row, htable1, htable2);
+    deleteAndWait(row1, htable2, htable1);
+  }
+
+  private void deleteAndWait(byte[] row, HTable source, HTable target)
+      throws Exception {
+    Delete del = new Delete(row);
+    source.delete(del);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for del replication");
+      }
+      Result res = target.get(get);
+      if (res.size() >= 1) {
+        LOG.info("Row not deleted");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  private void putAndWait(byte[] row, byte[] fam, HTable source, HTable target)
+      throws Exception {
+    Put put = new Put(row);
+    put.add(fam, row, row);
+
+    source.put(put);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = target.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.value(), row);
+        break;
+      }
+    }
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/HConstants.java	(revision 1163862)
+++ src/main/java/org/apache/hadoop/hbase/HConstants.java	(working copy)
@@ -203,6 +203,12 @@
 
   /** Configuration key storing the cluster ID */
   public static final String CLUSTER_ID = "hbase.cluster.id";
 
+  /**
+   * Attribute used in Puts and Deletes to indicate the originating
+   * cluster.
+   */
+  public static final String CLUSTER_ID_ATTR = "_c.id_";
+
   // Always store the location of the root table's HRegion.
   // This HRegion is never split.
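Note: the HLog and HRegion changes below reference HConstants.DEFAULT_CLUSTER_ID, which this patch does not define. A minimal assumed definition, consistent with the single-byte cluster ids used throughout the sample code, would be one more HConstants entry along these lines:

  /** Cluster id given to edits that did not originate from another cluster (assumed value). */
  public static final byte DEFAULT_CLUSTER_ID = (byte) 0;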
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(revision 1163862)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(working copy)
@@ -970,34 +970,44 @@
   public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
     final long now, HTableDescriptor htd)
   throws IOException {
-    if (edits.isEmpty()) return;
-    if (this.closed) {
-      throw new IOException("Cannot append; log is closed");
-    }
-    synchronized (this.updateLock) {
-      long seqNum = obtainSeqNum();
-      // The 'lastSeqWritten' map holds the sequence number of the oldest
-      // write for each region (i.e. the first edit added to the particular
-      // memstore). . When the cache is flushed, the entry for the
-      // region being flushed is removed if the sequence number of the flush
-      // is greater than or equal to the value in lastSeqWritten.
-      // Use encoded name. Its shorter, guaranteed unique and a subset of
-      // actual name.
-      byte [] hriKey = info.getEncodedNameAsBytes();
-      this.lastSeqWritten.putIfAbsent(hriKey, seqNum);
-      HLogKey logKey = makeKey(hriKey, tableName, seqNum, now);
-      doWrite(info, logKey, edits, htd);
-      this.numEntries.incrementAndGet();
-    }
-    // Sync if catalog region, and if not then check if that table supports
-    // deferred log flushing
-    if (info.isMetaRegion() ||
-        !htd.isDeferredLogFlush()) {
-      // sync txn to file system
-      this.sync();
-    }
+    append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd);
   }
 
+  /*
+   * Sample code only...
+   */
+  public void append(HRegionInfo info, byte [] tableName, WALEdit edits, byte clusterId,
+      final long now, HTableDescriptor htd)
+    throws IOException {
+    if (edits.isEmpty()) return;
+    if (this.closed) {
+      throw new IOException("Cannot append; log is closed");
+    }
+    synchronized (this.updateLock) {
+      long seqNum = obtainSeqNum();
+      // The 'lastSeqWritten' map holds the sequence number of the oldest
+      // write for each region (i.e. the first edit added to the particular
+      // memstore). . When the cache is flushed, the entry for the
+      // region being flushed is removed if the sequence number of the flush
+      // is greater than or equal to the value in lastSeqWritten.
+      // Use encoded name. Its shorter, guaranteed unique and a subset of
+      // actual name.
+      byte [] hriKey = info.getEncodedNameAsBytes();
+      this.lastSeqWritten.putIfAbsent(hriKey, seqNum);
+      HLogKey logKey = makeKey(hriKey, tableName, seqNum, now);
+      logKey.setClusterId(clusterId);
+      doWrite(info, logKey, edits, htd);
+      this.numEntries.incrementAndGet();
+    }
+    // Sync if catalog region, and if not then check if that table supports
+    // deferred log flushing
+    if (info.isMetaRegion() ||
+        !htd.isDeferredLogFlush()) {
+      // sync txn to file system
+      this.sync();
+    }
+  }
+
   /**
    * This thread is responsible to call syncFs and buffer up the writers while
    * it happens.
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1163862)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -1404,7 +1404,9 @@
     try {
       // All edits for the given row (across all column families) must happen atomically.
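+      // If this Delete was issued by the ReplicationSink it carries the
+      // originating cluster id as an attribute; pass that id through to the
+      // WAL so the edit is not replicated back to where it came from.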
       prepareDelete(delete);
-      delete(delete.getFamilyMap(), writeToWAL);
+      byte[] clusterIdArr = delete.getAttribute(HConstants.CLUSTER_ID_ATTR);
+      byte clusterId = clusterIdArr == null ? HConstants.DEFAULT_CLUSTER_ID : clusterIdArr[0];
+      delete(delete.getFamilyMap(), clusterId, writeToWAL);
     } finally {
       if(lockid == null) releaseRowLock(lid);
     }
@@ -1413,13 +1415,19 @@
     }
   }
 
+  /**
+   * sample code...
+   */
+  public void delete(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL) throws IOException {
+    delete(familyMap, HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
+  }
+
   /**
    * @param familyMap map of family to edits for the given family.
    * @param writeToWAL
    * @throws IOException
    */
-  public void delete(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL)
+  public void delete(Map<byte[], List<KeyValue>> familyMap, byte clusterId, boolean writeToWAL)
     throws IOException {
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
     if (coprocessorHost != null) {
@@ -1490,7 +1498,7 @@
       WALEdit walEdit = new WALEdit();
       addFamilyMapToWALEdit(familyMap, walEdit);
       this.log.append(regionInfo, this.htableDescriptor.getName(),
-          walEdit, now, this.htableDescriptor);
+          walEdit, clusterId, now, this.htableDescriptor);
     }
 
     // Now make changes to the memstore.
@@ -1760,8 +1768,10 @@
     }
 
     // Append the edit to WAL
+    byte[] clusterIdArr = batchOp.operations[firstIndex].getFirst().getAttribute(HConstants.CLUSTER_ID_ATTR);
+    byte clusterId = clusterIdArr == null ? HConstants.DEFAULT_CLUSTER_ID : clusterIdArr[0];
     this.log.append(regionInfo, this.htableDescriptor.getName(),
-        walEdit, now, this.htableDescriptor);
+        walEdit, clusterId, now, this.htableDescriptor);
 
     // ------------------------------------
     // STEP 4. Write back to memstore
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(revision 1163862)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(working copy)
@@ -366,23 +366,30 @@
       this.reader.seek(this.position);
     }
     HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
+    byte peerCluster = Byte.valueOf(peerClusterId);
     while (entry != null) {
       WALEdit edit = entry.getEdit();
       this.metrics.logEditsReadRate.inc(1);
       seenEntries++;
       // Remove all KVs that should not be replicated
-      removeNonReplicableEdits(edit);
       HLogKey logKey = entry.getKey();
-      // Don't replicate catalog entries, if the WALEdit wasn't
-      // containing anything to replicate and if we're currently not set to replicate
-      if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
-          Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
-          edit.size() != 0 && replicating.get()) {
-        logKey.setClusterId(this.clusterId);
-        currentNbOperations += countDistinctRowKeys(edit);
-        currentNbEntries++;
-      } else {
-        this.metrics.logEditsFilteredRate.inc(1);
+      // don't replicate if the log entries originated in the peer
+      if (logKey.getClusterId() != peerCluster) {
+        removeNonReplicableEdits(edit);
+        // Don't replicate catalog entries, if the WALEdit wasn't
+        // containing anything to replicate and if we're currently not set to replicate
+        if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
+            Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
+            edit.size() != 0 && replicating.get()) {
+          // only set the clusterId if it is a local key
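+          // (an edit written by a local client rather than by the
+          // ReplicationSink). Edits that already carry another cluster's
+          // id keep it, so the check above can skip them when this source
+          // ships to the cluster they originated from.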
+          if (logKey.getClusterId() == HConstants.DEFAULT_CLUSTER_ID) {
+            logKey.setClusterId(this.clusterId);
+          }
+          currentNbOperations += countDistinctRowKeys(edit);
+          currentNbEntries++;
+        } else {
+          this.metrics.logEditsFilteredRate.inc(1);
+        }
       }
       // Stop if too many entries or too big
       if ((this.reader.getPosition() - this.position)
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java	(revision 1163862)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java	(working copy)
@@ -22,6 +22,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Delete;
@@ -106,6 +107,8 @@
         if (kvs.get(0).isDelete()) {
           Delete delete = new Delete(kvs.get(0).getRow(),
               kvs.get(0).getTimestamp(), null);
+          delete.setAttribute(HConstants.CLUSTER_ID_ATTR,
+              new byte[] {entry.getKey().getClusterId()});
           for (KeyValue kv : kvs) {
             if (kv.isDeleteFamily()) {
               delete.deleteFamily(kv.getFamily());
@@ -126,10 +129,14 @@
           byte[] lastKey = kvs.get(0).getRow();
           Put put = new Put(kvs.get(0).getRow(),
               kvs.get(0).getTimestamp());
+          put.setAttribute(HConstants.CLUSTER_ID_ATTR,
+              new byte[] {entry.getKey().getClusterId()});
           for (KeyValue kv : kvs) {
             if (!Bytes.equals(lastKey, kv.getRow())) {
               tableList.add(put);
               put = new Put(kv.getRow(), kv.getTimestamp());
+              put.setAttribute(HConstants.CLUSTER_ID_ATTR,
+                  new byte[] {entry.getKey().getClusterId()});
             }
             put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
             lastKey = kv.getRow();
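For context, the attribute round trip the sink relies on can be summarized in a short sketch (illustration only, not part of the patch; the HTable handle, method name and origin id are hypothetical, imports from java.io, org.apache.hadoop.hbase, org.apache.hadoop.hbase.client and org.apache.hadoop.hbase.util are assumed):

  // The ReplicationSink tags each mutation it replays with the id of the
  // cluster the edit originated from; HRegion copies that id into the WAL
  // entry via HLog.append(), and ReplicationSource skips WAL entries whose
  // id matches the peer it ships to, so an edit never loops back to its origin.
  void tagWithOrigin(HTable table, byte originClusterId) throws IOException {
    Put put = new Put(Bytes.toBytes("row"));
    put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    put.setAttribute(HConstants.CLUSTER_ID_ATTR, new byte[] { originClusterId });
    table.put(put);
  }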