diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 8e6d17d..729f525 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -132,6 +132,8 @@ public class ReplicationSource extends Thread
   private ReplicationSinkManager replicationSinkMgr;
   //WARN threshold for the number of queued logs, defaults to 2
   private int logQueueWarnThreshold;
+  // whether Ops are currently being processed and shipped
+  private boolean isShippingOps;
 
   /**
    * Instantiation method used by region servers
@@ -184,6 +186,7 @@ public class ReplicationSource extends Thread
     this.peerId = this.replicationQueueInfo.getPeerId();
     this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, conf);
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
+    this.isShippingOps = false;
   }
 
@@ -258,8 +261,9 @@ public class ReplicationSource extends Thread
         if (sleepForRetries("No log to process", sleepMultiplier)) {
           sleepMultiplier++;
         }
+        isShippingOps = false;
         continue;
-      }
+      } else isShippingOps = true;
       boolean currentWALisBeingWrittenTo = false;
       //For WAL files we own (rather than recovered), take a snapshot of whether the
       //current WAL file (this.currentPath) is in use (for writing) NOW!
@@ -812,4 +816,9 @@ public class ReplicationSource extends Thread
         ", currently replicating from: " + this.currentPath +
         " at position: " + position;
   }
+
+  @Override
+  public boolean getIsShippingOps() {
+    return isShippingOps;
+  }
 }
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index df599f0..e6b7b31 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -103,4 +103,10 @@ public interface ReplicationSourceInterface {
    */
   String getStats();
 
+  /**
+   * Get whether Ops are currently being shipped
+   * @return true if Ops are currently being shipped, false otherwise
+   */
+  public boolean getIsShippingOps();
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
new file mode 100644
index 0000000..6841ff4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * In a replication-based Disaster/Recovery scenario, when the HBase Master cluster
+ * crashes, this tool is used to sync up the delta from Master to Slave using the
+ * info from ZooKeeper. The tool runs on the Master cluster and assumes ZK, the
+ * filesystem and the network are still available after HBase crashes.
+ *
+ * hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp
+ */
+
+public class ReplicationSyncUp extends Configured implements Tool {
+
+  static final Log LOG = LogFactory.getLog(ReplicationSyncUp.class.getName());
+
+  private static Configuration conf;
+  private static Replication replication;
+  private static FileSystem fs;
+  private static Path oldLogDir;
+  private static Path logDir;
+  private static ZooKeeperWatcher zkw;
+  private static ReplicationSourceManager manager;
+  private static Path rootDir;
+
+  private static final long SLEEP_TIME = 1000;
+
+  // Although the tool is designed to be run on the command line,
+  // this API is provided for executing the tool through another app.
+  public static void setConfigure(Configuration config) {
+    conf = config;
+  }
+
+  /**
+   * Main program
+   * @param args
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    if (conf == null) conf = HBaseConfiguration.create();
+    int ret = ToolRunner.run(conf, new ReplicationSyncUp(), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    Abortable abortable = new Abortable() {
+      @Override
+      public void abort(String why, Throwable e) {
+      }
+
+      @Override
+      public boolean isAborted() {
+        return false;
+      }
+    };
+
+    zkw =
+        new ZooKeeperWatcher(conf, "syncupReplication" + System.currentTimeMillis(), abortable,
+            true);
+
+    rootDir = FSUtils.getRootDir(conf);
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+
+    System.out.println("Starting Replication Server");
+    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+    manager.init();
+
+    try {
+      int wait = 1; // default wait once
+      while (wait > 0) {
+        Thread.sleep(SLEEP_TIME);
+        wait = 0;
+        for (ReplicationSourceInterface source : manager.getSources()) {
+          if (source.getIsShippingOps()) {
+            // need to keep waiting while at least one of the sources has not completed
+            wait++;
+          }
+        }
+      }
+    } catch (InterruptedException e) {
+      System.err.println("interrupted during sync up. Exception:" + e);
+      return (-1);
+    }
+
+    Thread.sleep(10000); // wait another 10 sec to ensure the ops have reached the sink
+    manager.join();
+
+    return (0);
+  }
+
+  static class DummyServer implements Server {
+    String hostname;
+
+    DummyServer() {
+      // a unique name in case the first run fails
+      hostname = System.currentTimeMillis() + ".SyncUpTool.replication.org";
+    }
+
+    DummyServer(String hostname) {
+      this.hostname = hostname;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return conf;
+    }
+
+    @Override
+    public ZooKeeperWatcher getZooKeeper() {
+      return zkw;
+    }
+
+    @Override
+    public CatalogTracker getCatalogTracker() {
+      return null;
+    }
+
+    @Override
+    public ServerName getServerName() {
+      return new ServerName(hostname, 1234, 1L);
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+    }
+
+    @Override
+    public boolean isAborted() {
+      return false;
+    }
+
+    @Override
+    public void stop(String why) {
+    }
+
+    @Override
+    public boolean isStopped() {
+      return false;
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index d0868d6..9335ad6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -86,4 +86,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   public String getStats() {
     return "";
   }
+
+  @Override
+  public boolean getIsShippingOps() {
+    return false;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
new file mode 100644
index 0000000..c41e5da
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestReplicationSyncUpTool extends TestReplicationBase {
+
+  private static final Log LOG = LogFactory.getLog(TestReplicationSyncUpTool.class);
+
+  private static final byte[] t1_su = Bytes.toBytes("t1_syncup");
+  private static final byte[] t2_su = Bytes.toBytes("t2_syncup");
+
+  private static final byte[] famName = Bytes.toBytes("cf1");
+  private static final byte[] qualName = Bytes.toBytes("q1");
+
+  private static final byte[] noRepfamName = Bytes.toBytes("norep");
+
+  private HTableDescriptor t1_syncupSource, t1_syncupTarget;
+  private HTableDescriptor t2_syncupSource, t2_syncupTarget;
+
+  private HTable ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
+  private int rowCount_ht1Source, rowCount_ht2Source, rowCount_ht1TargetAtPeer1,
+      rowCount_ht2TargetAtPeer1;
+
+  @Before
+  public void setUp() throws Exception {
+
+    HColumnDescriptor fam;
+
+    t1_syncupSource = new HTableDescriptor(TableName.valueOf(t1_su));
+    fam = new HColumnDescriptor(famName);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    t1_syncupSource.addFamily(fam);
+    fam = new HColumnDescriptor(noRepfamName);
+    t1_syncupSource.addFamily(fam);
+
+    t1_syncupTarget = new HTableDescriptor(TableName.valueOf(t1_su));
+    fam = new HColumnDescriptor(famName);
+    t1_syncupTarget.addFamily(fam);
+    fam = new HColumnDescriptor(noRepfamName);
+    t1_syncupTarget.addFamily(fam);
+
+    t2_syncupSource = new HTableDescriptor(TableName.valueOf(t2_su));
+    fam = new HColumnDescriptor(famName);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    t2_syncupSource.addFamily(fam);
+    fam = new HColumnDescriptor(noRepfamName);
+    t2_syncupSource.addFamily(fam);
+
+    t2_syncupTarget = new HTableDescriptor(TableName.valueOf(t2_su));
+    fam = new HColumnDescriptor(famName);
+    t2_syncupTarget.addFamily(fam);
+    fam = new HColumnDescriptor(noRepfamName);
+    t2_syncupTarget.addFamily(fam);
+
+  }
+
+  /**
+   * Add a row to a table in each cluster, check it's replicated, delete it,
+   * check it's gone. Also check that the puts and deletes are not replicated
+   * back to the originating cluster.
+   */
+  @Test(timeout = 300000)
+  public void testSyncUpTool() throws Exception {
+
+    /**
+     * Set up Replication: on Master and one Slave
+     * Table: t1_syncup and t2_syncup
+     * columnfamily:
+     *   'cf1'  : replicated
+     *   'norep': not replicated
+     */
+    setupReplication();
+
+    /**
+     * at Master:
+     * t1_syncup: put 100 rows into cf1, and 1 row into norep
+     * t2_syncup: put 200 rows into cf1, and 1 row into norep
+     *
+     * verify correctly replicated to slave
+     */
+    putAndReplicateRows();
+
+    /**
+     * Verify delete works
+     *
+     * step 1: stop hbase on Slave
+     *
+     * step 2: at Master:
+     * t1_syncup: delete 50 rows from cf1
+     * t2_syncup: delete 100 rows from cf1
+     * no change on 'norep'
+     *
+     * step 3: stop hbase on Master, restart hbase on Slave
+     *
+     * step 4: verify Slave still has the rows before delete
+     * t1_syncup: 100 rows in cf1
+     * t2_syncup: 200 rows in cf1
+     *
+     * step 5: run syncup tool on Master
+     *
+     * step 6: verify that the deletes show up on Slave
+     * t1_syncup: 50 rows in cf1
+     * t2_syncup: 100 rows in cf1
+     *
+     * verify correctly replicated to Slave
+     */
+    mimicSyncUpAfterDelete();
+
+    /**
+     * Verify put works
+     *
+     * step 1: stop hbase on Slave
+     *
+     * step 2: at Master:
+     * t1_syncup: put 100 rows into cf1
+     * t2_syncup: put 200 rows into cf1
+     * and put another row into 'norep'
+     * ATTN: put to 'cf1' will overwrite existing rows, so end count will
+     * be 100 and 200 respectively
+     * put to 'norep' will add a new row.
+     *
+     * step 3: stop hbase on Master, restart hbase on Slave
+     *
+     * step 4: verify Slave still has the rows before put
+     * t1_syncup: 50 rows in cf1
+     * t2_syncup: 100 rows in cf1
+     *
+     * step 5: run syncup tool on Master
+     *
+     * step 6: verify that the puts show up on Slave
+     * and 'norep' does not
+     * t1_syncup: 100 rows in cf1
+     * t2_syncup: 200 rows in cf1
+     *
+     * verify correctly replicated to Slave
+     */
+    mimicSyncUpAfterPut();
+
+  }
+
+  private void setupReplication() throws Exception {
+    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
+    ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
+
+    HBaseAdmin ha = new HBaseAdmin(conf1);
+    ha.createTable(t1_syncupSource);
+    ha.createTable(t2_syncupSource);
+    ha.close();
+
+    ha = new HBaseAdmin(conf2);
+    ha.createTable(t1_syncupTarget);
+    ha.createTable(t2_syncupTarget);
+    ha.close();
+
+    // Get HTable from Master
+    ht1Source = new HTable(conf1, t1_su);
+    ht1Source.setWriteBufferSize(1024);
+    ht2Source = new HTable(conf1, t2_su);
+    ht2Source.setWriteBufferSize(1024);
+
+    // Get HTable from Peer1
+    ht1TargetAtPeer1 = new HTable(conf2, t1_su);
+    ht1TargetAtPeer1.setWriteBufferSize(1024);
+    ht2TargetAtPeer1 = new HTable(conf2, t2_su);
+    ht2TargetAtPeer1.setWriteBufferSize(1024);
+
+    /**
+     * set M-S : Master: utility1 Slave1: utility2
+     */
+    admin1.addPeer("1", utility2.getClusterKey());
+
+    admin1.close();
+    admin2.close();
+  }
+
+  private void putAndReplicateRows() throws Exception {
+    // add rows to Master cluster
+    Put p;
+
+    // 100 + 1 rows to t1_syncup
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      p = new Put(Bytes.toBytes("row" + i));
+      p.add(famName, qualName, Bytes.toBytes("val" + i));
+      ht1Source.put(p);
+    }
+    p = new Put(Bytes.toBytes("row" + 9999));
+    p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
+    ht1Source.put(p);
+
+    // 200 + 1 rows to t2_syncup
+    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
+      p = new Put(Bytes.toBytes("row" + i));
+      p.add(famName, qualName, Bytes.toBytes("val" + i));
+      ht2Source.put(p);
+    }
+    p = new Put(Bytes.toBytes("row" + 9999));
+    p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
+    ht2Source.put(p);
+
+    // ensure replication completed
+    Thread.sleep(SLEEP_TIME);
+
+    rowCount_ht1Source = utility1.countRows(ht1Source);
+    rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+    assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source - 1,
+      rowCount_ht1TargetAtPeer1);
+
+    rowCount_ht2Source = utility1.countRows(ht2Source);
+    rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+    assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source - 1,
+      rowCount_ht2TargetAtPeer1);
+
+  }
+
+  private void mimicSyncUpAfterDelete() throws Exception {
+    utility2.shutdownMiniHBaseCluster();
+
+    List<Delete> list = new ArrayList<Delete>();
+    // delete half of the rows
+    for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
+      String rowKey = "row" + i;
+      Delete del = new Delete(rowKey.getBytes());
+      list.add(del);
+    }
+    ht1Source.delete(list);
+
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      String rowKey = "row" + i;
+      Delete del = new Delete(rowKey.getBytes());
+      list.add(del);
+    }
+    ht2Source.delete(list);
+
+    rowCount_ht1Source = utility1.countRows(ht1Source);
+    assertEquals("t1_syncup has 51 rows on source, after removing 50 of the replicated colfam", 51,
+      rowCount_ht1Source);
+
+    rowCount_ht2Source = utility1.countRows(ht2Source);
+    assertEquals("t2_syncup has 101 rows on source, after removing 100 of the replicated colfam",
+      101, rowCount_ht2Source);
+
+    utility1.shutdownMiniHBaseCluster();
+    utility2.restartHBaseCluster(1);
+
+    Thread.sleep(SLEEP_TIME);
+
+    // before sync up
+    rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+    rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
+    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
+
+    // after sync up
+    syncUp(utility1);
+    rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+    rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+    assertEquals("@Peer1 t1_syncup should be synced up and have 50 rows", 50,
+      rowCount_ht1TargetAtPeer1);
+    assertEquals("@Peer1 t2_syncup should be synced up and have 100 rows", 100,
+      rowCount_ht2TargetAtPeer1);
+
+  }
+
+  private void mimicSyncUpAfterPut() throws Exception {
+    utility1.restartHBaseCluster(1);
+    utility2.shutdownMiniHBaseCluster();
+
+    Put p;
+    // another 100 + 1 rows to t1_syncup
+    // we should see 100 + 2 rows now
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      p = new Put(Bytes.toBytes("row" + i));
+      p.add(famName, qualName, Bytes.toBytes("val" + i));
+      ht1Source.put(p);
+    }
+    p = new Put(Bytes.toBytes("row" + 9998));
+    p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
+    ht1Source.put(p);
+
+    // another 200 + 1 rows to t2_syncup
+    // we should see 200 + 2 rows now
+    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
+      p = new Put(Bytes.toBytes("row" + i));
+      p.add(famName, qualName, Bytes.toBytes("val" + i));
+      ht2Source.put(p);
+    }
+    p = new Put(Bytes.toBytes("row" + 9998));
+    p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
+    ht2Source.put(p);
+
+    rowCount_ht1Source = utility1.countRows(ht1Source);
+    assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
+    rowCount_ht2Source = utility1.countRows(ht2Source);
+    assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
+
+    utility1.shutdownMiniHBaseCluster();
+    utility2.restartHBaseCluster(1);
+
+    Thread.sleep(SLEEP_TIME);
+
+    // before sync up
+    rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+    rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+    assertEquals("@Peer1 t1_syncup should NOT be synced up yet and have 50 rows", 50,
+      rowCount_ht1TargetAtPeer1);
+    assertEquals("@Peer1 t2_syncup should NOT be synced up yet and have 100 rows", 100,
+      rowCount_ht2TargetAtPeer1);
+
+    // after sync up
+    syncUp(utility1);
+    rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+    rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+    assertEquals("@Peer1 t1_syncup should be synced up and have 100 rows", 100,
+      rowCount_ht1TargetAtPeer1);
+    assertEquals("@Peer1 t2_syncup should be synced up and have 200 rows", 200,
+      rowCount_ht2TargetAtPeer1);
+
+  }
+
+  private void syncUp(HBaseTestingUtility ut) throws Exception {
+    ReplicationSyncUp.setConfigure(ut.getConfiguration());
+    String[] arguments = new String[] { null };
+    new ReplicationSyncUp().run(arguments);
+  }
+
+}
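
Note on programmatic use: besides the command-line invocation given in the class javadoc (hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp), the tool can be driven from another application through the setConfigure() hook added in this patch, mirroring the syncUp() helper in the test above. A minimal sketch follows; the wrapper class name SyncUpDriver is illustrative only and not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;

public class SyncUpDriver {
  public static void main(String[] args) throws Exception {
    // Configuration pointing at the crashed Master cluster (ZK quorum, root dir, etc.)
    Configuration conf = HBaseConfiguration.create();
    // Hand the configuration to the tool instead of letting main() create its own
    ReplicationSyncUp.setConfigure(conf);
    // run() returns 0 on success, -1 if interrupted while waiting for sources to finish
    int ret = new ReplicationSyncUp().run(new String[] { null });
    System.exit(ret);
  }
}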