diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 6c79537..27774bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -89,15 +89,15 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   // Amount of time for shutdown to wait for all tasks to complete
   private long maxTerminationWait;
   // Size limit for replication RPCs, in bytes
-  private int replicationRpcLimit;
+  protected int replicationRpcLimit;
   //Metrics for this source
   private MetricsSource metrics;
   // Handles connecting to peer region servers
-  private ReplicationSinkManager replicationSinkMgr;
+  protected ReplicationSinkManager replicationSinkMgr;
   private boolean peersSelected = false;
   private String replicationClusterId = "";
   private ThreadPoolExecutor exec;
-  private int maxThreads;
+  protected int maxThreads;
   private Path baseNamespaceDir;
   private Path hfileArchiveDir;
   private boolean replicationBulkLoadDataEnabled;
@@ -194,7 +194,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     return sleepMultiplier < maxRetriesMultiplier;
   }
 
-  private List<List<Entry>> createBatches(final List<Entry> entries) {
+  protected List<List<Entry>> createBatches(final List<Entry> entries) {
     int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
     int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
     // Maintains the current batch for a given partition index
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RedirectingInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RedirectingInterClusterReplicationEndpoint.java
new file mode 100644
index 0000000..74a7a28
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RedirectingInterClusterReplicationEndpoint.java
@@ -0,0 +1,129 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
+ * implementation for replicating to different namespaces and/or tables in another HBase cluster.
+ * Redirection is configured via redirection rules within the configuration field of
+ * {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig}.
+ * A redirection rule is a key-value pair of the following format:
+ *   "src-ns:table" : "dst-ns:table"
+ *
+ * This configuration can be provided in the HBase shell, for example, as follows:
+ *   add_peer '9', CLUSTER_KEY => "localhost:2182:/hbase2",
+ *     TABLE_CFS => {"ns1:t1" => ["cf1"], "ns2:t2" => ["cf1"]},
+ *     ENDPOINT_CLASSNAME =>
+ *       'org.apache.hadoop.hbase.replication.regionserver.RedirectingInterClusterReplicationEndpoint',
+ *     CONFIG => {"ns1:t1" => "ns1:t2", "ns2:t2" => "ns3:t2"}
+ */
+@InterfaceAudience.Private
+public class RedirectingInterClusterReplicationEndpoint
+    extends HBaseInterClusterReplicationEndpoint {
+
+  // Maps source table names to redirection targets; an empty map means no redirection.
+  // Declared volatile so rule updates become visible to the replication threads, and
+  // initialized to an empty map to avoid an NPE before the first peerConfigUpdated() call.
+  private volatile Map<TableName, TableName> tableRedirectionsMap = new HashMap<>();
+  private static final Log LOG =
+      LogFactory.getLog(RedirectingInterClusterReplicationEndpoint.class);
+
+  @Override
+  public void peerConfigUpdated(ReplicationPeerConfig rpc) {
+    Map<TableName, TableName> newRules = new HashMap<>();
+    Iterator<String> keys = rpc.getConfiguration().keySet().iterator();
+    while (keys.hasNext()) {
+      String key = keys.next();
+      try {
+        byte[] keyBytes = TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(key));
+        String val = rpc.getConfiguration().get(key);
+        byte[] valBytes = TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(val));
+        newRules.put(TableName.valueOf(keyBytes), TableName.valueOf(valBytes));
+        LOG.debug("Redirecting replication from " + key + " to " + val);
+      } catch (IllegalArgumentException e) {
+        LOG.warn("Found unknown configuration key " + key + " in ReplicationPeerConfig");
+      }
+    }
+    // Publish the fully built map so readers never observe a half-filled rule set
+    tableRedirectionsMap = newRules;
+  }
+
+  /**
+   * @param entries {@link java.util.List} of WAL {@link org.apache.hadoop.hbase.wal.WAL.Entry}
+   *                that are redirected if a corresponding redirection rule
+   *                has been configured
+   * @return Number of redirected entries
+   */
+  public int redirectEntries(final List<WAL.Entry> entries) {
+    int count = 0;
+    for (WAL.Entry e : entries) {
+      TableName redirectedTablename = tableRedirectionsMap.get(e.getKey().getTablename());
+      if (redirectedTablename != null) {
+        e.getKey().setTablename(redirectedTablename);
+        count++;
+      }
+    }
+    return count;
+  }
+
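+  /**
+   * Same batching-by-region logic as in the parent class, except that the entries
+   * are first rewritten by {@link #redirectEntries(List)}, so redirected edits are
+   * grouped and shipped under their target table name.
+   */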
+  @Override
+  protected List<List<WAL.Entry>> createBatches(final List<WAL.Entry> entries) {
+    int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
+    int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
+    // Maintains the current batch for a given partition index
+    Map<Integer, List<WAL.Entry>> entryMap = new HashMap<>(n);
+    List<List<WAL.Entry>> entryLists = new ArrayList<>();
+    int[] sizes = new int[n];
+
+    // Redirect the edits to another table in the target
+    int redirected = this.redirectEntries(entries);
+    LOG.info("Redirected " + redirected + " WAL entries.");
+
+    for (int i = 0; i < n; i++) {
+      entryMap.put(i, new ArrayList<WAL.Entry>(entries.size() / n + 1));
+    }
+
+    for (WAL.Entry e : entries) {
+      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
+      int entrySize = (int) e.getKey().estimatedSerializedSizeOf()
+          + (int) e.getEdit().estimatedSerializedSizeOf();
+      // If this batch is oversized, add it to final list and initialize a new empty batch
+      if (sizes[index] > 0 /* must include at least one entry */ &&
+          sizes[index] + entrySize > replicationRpcLimit) {
+        entryLists.add(entryMap.get(index));
+        entryMap.put(index, new ArrayList<WAL.Entry>());
+        sizes[index] = 0;
+      }
+      entryMap.get(index).add(e);
+      sizes[index] += entrySize;
+    }
+
+    entryLists.addAll(entryMap.values());
+    return entryLists;
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 751ceba..4307377 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -360,6 +360,14 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     return tablename;
   }
 
+  /**
+   * Set the tablename of this WALKey
+   * @param tablename the new table name
+   */
+  public void setTablename(TableName tablename) {
+    this.tablename = tablename;
+  }
+
   /** @return log sequence number
    * @deprecated Use {@link #getSequenceId()}
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
index 433a345..422b4cc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
@@ -19,10 +19,6 @@
 
 package org.apache.hadoop.hbase.replication;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,10 +36,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -66,7 +58,6 @@ public class TestNamespaceReplication extends TestReplicationBase {
 
   private static final byte[] f1Name = Bytes.toBytes("f1");
   private static final byte[] f2Name = Bytes.toBytes("f2");
-  private static final byte[] val = Bytes.toBytes("myval");
 
   private static HTableDescriptor tabA;
   private static HTableDescriptor tabB;
@@ -185,63 +176,4 @@ public class TestNamespaceReplication extends TestReplicationBase {
     admin.removePeer("2");
   }
 
-  private void put(Table source, byte[] row, byte[]... families)
-      throws Exception {
-    for (byte[] fam : families) {
-      Put put = new Put(row);
-      put.addColumn(fam, row, val);
-      source.put(put);
-    }
-  }
-
-  private void delete(Table source, byte[] row, byte[]... families)
-      throws Exception {
-    for (byte[] fam : families) {
-      Delete del = new Delete(row);
-      del.addFamily(fam);
-      source.delete(del);
-    }
-  }
-
-  private void ensureRowExisted(Table target, byte[] row, byte[]... families)
-      throws Exception {
-    for (byte[] fam : families) {
-      Get get = new Get(row);
-      get.addFamily(fam);
-      for (int i = 0; i < NB_RETRIES; i++) {
-        if (i == NB_RETRIES - 1) {
-          fail("Waited too much time for put replication");
-        }
-        Result res = target.get(get);
-        if (res.isEmpty()) {
-          LOG.info("Row not available");
-        } else {
-          assertEquals(res.size(), 1);
-          assertArrayEquals(res.value(), val);
-          break;
-        }
-        Thread.sleep(SLEEP_TIME);
-      }
-    }
-  }
-
-  private void ensureRowNotExisted(Table target, byte[] row, byte[]... families)
-      throws Exception {
-    for (byte[] fam : families) {
-      Get get = new Get(row);
-      get.addFamily(fam);
-      for (int i = 0; i < NB_RETRIES; i++) {
-        if (i == NB_RETRIES - 1) {
-          fail("Waited too much time for delete replication");
-        }
-        Result res = target.get(get);
-        if (res.size() >= 1) {
-          LOG.info("Row not deleted");
-        } else {
-          break;
-        }
-        Thread.sleep(SLEEP_TIME);
-      }
-    }
-  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRedirectedReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRedirectedReplication.java
new file mode 100644
index 0000000..d81b053
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRedirectedReplication.java
@@ -0,0 +1,280 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.replication.regionserver.RedirectingInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MediumTests.class})
+public class TestRedirectedReplication extends TestReplicationBase {
+
+  private static final Log LOG = LogFactory.getLog(TestRedirectedReplication.class);
+
+  private static final byte[] row2 = Bytes.toBytes("row2");
+
+  private static String ns1 = "ns1";
+  private static String ns2 = "ns2";
+
+  private static String ns1T1Name = "ns1:t1";
+  private static String ns2T1Name = "ns2:t1";
+  private static String ns1T2Name = "ns1:t2";
+  private static String ns2T2Name = "ns2:t2";
+
+  private static final TableName ns1T1 = TableName.valueOf(ns1T1Name);
+  private static final TableName ns2T1 = TableName.valueOf(ns2T1Name);
+  private static final TableName ns1T2 = TableName.valueOf(ns1T2Name);
+  private static final TableName ns2T2 = TableName.valueOf(ns2T2Name);
+
+  private static final byte[] f1Name = Bytes.toBytes("f1");
+
+  private static HTableDescriptor tabLeftNs1T1;
+  private static HTableDescriptor tabLeftNs2T1;
+  private static HTableDescriptor tabRightNs1T2;
+  private static HTableDescriptor tabRightNs2T2;
+
+  private static Connection connection1;
+  private static Connection connection2;
+  private static Admin admin1;
+  private static Admin admin2;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestReplicationBase.setUpBeforeClass();
+
+    connection1 = ConnectionFactory.createConnection(conf1);
+    connection2 = ConnectionFactory.createConnection(conf2);
+    admin1 = connection1.getAdmin();
+    admin2 = connection2.getAdmin();
+
+    admin1.createNamespace(NamespaceDescriptor.create(ns1).build());
+    admin1.createNamespace(NamespaceDescriptor.create(ns2).build());
+    admin2.createNamespace(NamespaceDescriptor.create(ns1).build());
+    admin2.createNamespace(NamespaceDescriptor.create(ns2).build());
+
+    HColumnDescriptor fam = new HColumnDescriptor(f1Name);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+
+    tabLeftNs1T1 = new HTableDescriptor(ns1T1);
+    tabLeftNs1T1.addFamily(fam);
+
+    tabLeftNs2T1 = new HTableDescriptor(ns2T1);
+    tabLeftNs2T1.addFamily(fam);
+
+    tabRightNs1T2 = new HTableDescriptor(ns1T2);
+    tabRightNs1T2.addFamily(fam);
+
+    tabRightNs2T2 = new HTableDescriptor(ns2T2);
+    tabRightNs2T2.addFamily(fam);
+
+    ReplicationPeerConfig rpc3 = new ReplicationPeerConfig();
+    rpc3.setClusterKey(utility2.getClusterKey());
+    rpc3.setReplicationEndpointImpl(
+        RedirectingInterClusterReplicationEndpoint.class.getName());
+    // hbaseAdmin is already initialized by TestReplicationBase.setUpBeforeClass()
+    hbaseAdmin.addReplicationPeer("3", rpc3);
+
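+    // Table layout: ns1:t1 and ns2:t1 live on the source cluster; the sink cluster
+    // gets ns1:t1 (plain replication target) plus ns1:t2 and ns2:t2 (redirection targets).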
+    admin1.createTable(tabLeftNs1T1);
+    admin1.createTable(tabLeftNs2T1);
+    admin2.createTable(tabLeftNs1T1);
+    admin2.createTable(tabRightNs1T2);
+    admin2.createTable(tabRightNs2T2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    admin.removePeer("2");
+    admin.removePeer("3");
+
+    admin1.disableTable(ns1T1);
+    admin1.deleteTable(ns1T1);
+    admin1.disableTable(ns2T1);
+    admin1.deleteTable(ns2T1);
+
+    admin2.disableTable(ns1T1);
+    admin2.deleteTable(ns1T1);
+    admin2.disableTable(ns1T2);
+    admin2.deleteTable(ns1T2);
+    admin2.disableTable(ns2T2);
+    admin2.deleteTable(ns2T2);
+
+    admin1.deleteNamespace(ns1);
+    admin1.deleteNamespace(ns2);
+    admin2.deleteNamespace(ns1);
+    admin2.deleteNamespace(ns2);
+
+    connection1.close();
+    connection2.close();
+    TestReplicationBase.tearDownAfterClass();
+  }
+
+  @Test
+  public void testRedirectedReplication() throws Exception {
+    LOG.info("Starting RedirectedReplication test");
+    Table htabLeftNs1T1 = connection1.getTable(ns1T1);
+    Table htabLeftNs2T1 = connection1.getTable(ns2T1);
+    Table htabRightNs1T1 = connection2.getTable(ns1T1);
+    Table htabRightNs1T2 = connection2.getTable(ns1T2);
+    Table htabRightNs2T2 = connection2.getTable(ns2T2);
+
+    /*
+      Step 1: "No redirection config, no redirection"
+      a) ns1:t1 is set up left and right, no redirection rules are configured
+      b) put row to htabLeftNs1T1
+      c) row should exist in htabLeftNs1T1
+      d) row should be replicated unchanged to htabRightNs1T1 (via peer "2")
+      e) row should not exist in htabRightNs1T2
+      f) delete row from htabLeftNs1T1
+    */
+    LOG.info("Step1 start");
+    put(htabLeftNs1T1, row, f1Name);
+    ensureRowExisted(htabLeftNs1T1, row, f1Name);
+    ensureRowExisted(htabRightNs1T1, row, f1Name);
+    ensureRowNotExisted(htabRightNs1T2, row, f1Name);
+    delete(htabLeftNs1T1, row, f1Name);
+    LOG.info("Step1 end");
+
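+    // Each updatePeerConfig call below pushes the new redirection rules to the running
+    // endpoint, which picks them up in its peerConfigUpdated() callback.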
+    /*
+      Step 2: "Redirection to a different table name works"
+      a) add redirection rule htabLeftNs1T1 -> htabRightNs1T2
+      b) put row to htabLeftNs1T1
+      c) row should exist in htabLeftNs1T1
+      d) row should _also_ exist in htabRightNs1T2
+      e) delete row from htabLeftNs1T1
+      f) row should not exist in htabLeftNs1T1
+      g) row should not exist in htabRightNs1T2
+      h) delete redirection rule htabLeftNs1T1 -> htabRightNs1T2
+    */
+    LOG.info("Step2 start");
+    ReplicationPeerConfig rpc3 = admin.getPeerConfig("3");
+    rpc3.getConfiguration().putAll(
+        new HashMap<String, String>() {
+          { put(ns1T1Name, ns1T2Name); }
+        }
+    );
+    admin.updatePeerConfig("3", rpc3);
+    LOG.info("updated peer config");
+    put(htabLeftNs1T1, row, f1Name);
+    ensureRowExisted(htabLeftNs1T1, row, f1Name);
+    ensureRowExisted(htabRightNs1T2, row, f1Name);
+    delete(htabLeftNs1T1, row, f1Name);
+    ensureRowNotExisted(htabLeftNs1T1, row, f1Name);
+    ensureRowNotExisted(htabRightNs1T2, row, f1Name);
+    rpc3.getConfiguration().remove(ns1T1Name);
+    admin.updatePeerConfig("3", rpc3);
+    LOG.info("Step2 end");
+
+    /*
+      Step 3: "Redirection to a different namespace works"
+      a) add redirection rule htabLeftNs1T1 -> htabRightNs2T2
+      b) put row to htabLeftNs1T1
+      c) row should exist in htabLeftNs1T1
+      d) row should _also_ exist in htabRightNs2T2
+      e) delete row from htabLeftNs1T1
+      f) row should not exist in htabLeftNs1T1
+      g) row should not exist in htabRightNs2T2
+      h) delete redirection rule htabLeftNs1T1 -> htabRightNs2T2
+    */
+    LOG.info("Step3 start");
+    rpc3 = admin.getPeerConfig("3");
+    rpc3.getConfiguration().putAll(
+        new HashMap<String, String>() {
+          { put(ns1T1Name, ns2T2Name); }
+        }
+    );
+    admin.updatePeerConfig("3", rpc3);
+    put(htabLeftNs1T1, row, f1Name);
+    ensureRowExisted(htabLeftNs1T1, row, f1Name);
+    ensureRowExisted(htabRightNs2T2, row, f1Name);
+    delete(htabLeftNs1T1, row, f1Name);
+    ensureRowNotExisted(htabLeftNs1T1, row, f1Name);
+    ensureRowNotExisted(htabRightNs2T2, row, f1Name);
+    rpc3.getConfiguration().remove(ns1T1Name);
+    admin.updatePeerConfig("3", rpc3);
+    LOG.info("Step3 end");
+
+    /*
+      Step 4: "Multiple redirection rules work at the same time"
+      a) add redirection rule htabLeftNs1T1 -> htabRightNs1T2
+      b) add redirection rule htabLeftNs2T1 -> htabRightNs2T2
+      c) put row to htabLeftNs1T1
+      d) row should exist in htabLeftNs1T1
+      e) row should _also_ exist in htabRightNs1T2
+      f) put row2 to htabLeftNs2T1
+      g) row2 should exist in htabLeftNs2T1
+      h) row2 should _also_ exist in htabRightNs2T2
+      i) delete row from htabLeftNs1T1 and row2 from htabLeftNs2T1
+      j) neither row should remain in htabLeftNs1T1, htabRightNs1T2,
+         htabLeftNs2T1 or htabRightNs2T2
+    */
+    LOG.info("Step4 start");
+    rpc3.getConfiguration().putAll(
+        new HashMap<String, String>() {
+          { put(ns1T1Name, ns1T2Name);
+            put(ns2T1Name, ns2T2Name); }
+        }
+    );
+    hbaseAdmin.updateReplicationPeerConfig("3", rpc3);
+    put(htabLeftNs1T1, row, f1Name);
+    put(htabLeftNs2T1, row2, f1Name);
+    ensureRowExisted(htabLeftNs1T1, row, f1Name);
+    ensureRowExisted(htabRightNs1T2, row, f1Name);
+    ensureRowExisted(htabLeftNs2T1, row2, f1Name);
+    ensureRowExisted(htabRightNs2T2, row2, f1Name);
+    delete(htabLeftNs1T1, row, f1Name);
+    delete(htabLeftNs2T1, row2, f1Name);
+    ensureRowNotExisted(htabLeftNs1T1, row, f1Name);
+    ensureRowNotExisted(htabRightNs1T2, row, f1Name);
+    ensureRowNotExisted(htabLeftNs2T1, row2, f1Name);
+    ensureRowNotExisted(htabRightNs2T2, row2, f1Name);
+    LOG.info("Step4 end");
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 87918ee..513d809 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -34,6 +34,10 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
@@ -41,6 +45,10 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 /**
  * This class is only a base for other integration-level replication tests.
  * Do not add tests here.
@@ -63,7 +71,7 @@ public class TestReplicationBase {
   protected static ZooKeeperWatcher zkw2;
 
   protected static ReplicationAdmin admin;
-  private static Admin hbaseAdmin;
+  protected static Admin hbaseAdmin;
 
   protected static Table htable1;
   protected static Table htable2;
@@ -81,6 +89,7 @@ public class TestReplicationBase {
   protected static final byte[] famName = Bytes.toBytes("f");
   protected static final byte[] row = Bytes.toBytes("row");
   protected static final byte[] noRepfamName = Bytes.toBytes("norep");
+  protected static final byte[] val = Bytes.toBytes("myval");
 
   /**
    * @throws java.lang.Exception
@@ -174,4 +183,66 @@ public class TestReplicationBase {
     utility2.shutdownMiniCluster();
     utility1.shutdownMiniCluster();
   }
+
+  protected void put(Table source, byte[] row, byte[]... families)
+      throws Exception {
+    for (byte[] fam : families) {
+      Put put = new Put(row);
+      put.addColumn(fam, row, val);
+      source.put(put);
+    }
+  }
+
+  protected void delete(Table source, byte[] row, byte[]... families)
+      throws Exception {
+    for (byte[] fam : families) {
+      Delete del = new Delete(row);
+      del.addFamily(fam);
+      source.delete(del);
+    }
+  }
+
+  protected void ensureRowExisted(Table target, byte[] row, byte[]... families)
+      throws Exception {
+    for (byte[] fam : families) {
+      Get get = new Get(row);
+      get.addFamily(fam);
+      for (int i = 0; i < NB_RETRIES; i++) {
+        if (i == NB_RETRIES - 1) {
+          fail("Waited too much time for put replication");
+        }
+        Result res = target.get(get);
+        if (res.isEmpty()) {
+          LOG.info("Row not available");
+        } else {
+          assertEquals(1, res.size());
+          assertArrayEquals(val, res.value());
+          break;
+        }
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+  }
+
+  protected void ensureRowNotExisted(Table target, byte[] row, byte[]... families)
+      throws Exception {
+    for (byte[] fam : families) {
+      Get get = new Get(row);
+      get.addFamily(fam);
+      for (int i = 0; i < NB_RETRIES; i++) {
+        if (i == NB_RETRIES - 1) {
+          fail("Waited too much time for delete replication");
+        }
+        Result res = target.get(get);
+        if (res.size() >= 1) {
+          LOG.info("Row not deleted");
+        } else {
+          break;
+        }
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+  }
 }