From 756856c2a8053755592c03886b4cae345bd77ca7 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Sat, 22 Dec 2018 17:51:05 +0530 Subject: [PATCH] HBASE-9888 HBase replicates edits written before the replication peer is created --- .../org/apache/hadoop/hbase/HConstants.java | 5 + .../hbase/replication/ReplicationPeer.java | 8 +- .../replication/ReplicationPeerImpl.java | 30 ++++ .../hbase/replication/ReplicationPeers.java | 22 ++- .../replication/ZKReplicationPeerStorage.java | 2 +- .../WALKeyWriteTimeBasedFilter.java | 44 ++++++ .../regionserver/ReplicationSource.java | 21 +++ .../TestWALKeyWriteTimeBasedFilter.java | 129 ++++++++++++++++++ 8 files changed, 257 insertions(+), 4 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALKeyWriteTimeBasedFilter.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestWALKeyWriteTimeBasedFilter.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 7aa149445d..34b84f5811 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1294,6 +1294,11 @@ public final class HConstants { /** Maximum number of threads used by the replication source for shipping edits to the sinks */ public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10; + /** Config parameter to enable/disable wal key write time filter */ + public static final String HBASE_REPLICATION_WAL_KEY_WRITE_TIME_FILTER_ENABLED = + "hbase.replication.walKeyWriteTime.filter.enabled"; + public static final boolean DEFAULT_HBASE_REPLICATION_WAL_KEY_WRITE_TIME_FILTER_ENABLED = false; + /** Configuration key for SplitLog manager timeout */ public static final String HBASE_SPLITLOG_MANAGER_TIMEOUT = "hbase.splitlog.manager.timeout"; diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index 2da3cce940..31d7c911bf 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -104,4 +104,10 @@ public interface ReplicationPeer { default void trackPeerConfigChanges(ReplicationPeerConfigListener listener) { registerPeerConfigListener(listener); } -} \ No newline at end of file + + /** + * Returns the peer creation time. + * @return creation time of replication peer + */ + long getPeerCreationTime(); +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java index d656466572..61e270fb64 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java @@ -23,7 +23,11 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; @InterfaceAudience.Private public class ReplicationPeerImpl implements ReplicationPeer { @@ -38,6 +42,9 @@ public class ReplicationPeerImpl implements ReplicationPeer { private final List peerConfigListeners; + // initialize the node creation time to be used for comparison with WALKey writeTime + private long peerCreationTime = Long.MIN_VALUE; + /** * Constructor that takes all the objects required to communicate with the specified peer, except * for the region server addresses. @@ -122,4 +129,27 @@ public class ReplicationPeerImpl implements ReplicationPeer { public void registerPeerConfigListener(ReplicationPeerConfigListener listener) { this.peerConfigListeners.add(listener); } + + @Override + public long getPeerCreationTime() { + return this.peerCreationTime; + } + + /** + * Initialize peer creation time. + * @param zookeeper ZooKeeperWatcher + * @param peerNode Peer znode + * @throws ReplicationException on failure + */ + void initPeerCreationTime(final ZKWatcher zookeeper, String peerNode) + throws ReplicationException { + final Stat stat = new Stat(); + try { + ZKUtil.getDataNoWatch(zookeeper, peerNode, stat); + this.peerCreationTime = stat.getCtime(); + } catch (KeeperException e) { + throw new ReplicationException( + "Exception occured while reading the create time for peer=" + peerNode, e); + } + } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index e754e387d4..53f73b9842 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -36,6 +37,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti @InterfaceAudience.Private public class ReplicationPeers { + private ZKWatcher zookeeper; private final Configuration conf; // Map of peer clusters keyed by their id @@ -43,6 +45,7 @@ public class ReplicationPeers { private final ReplicationPeerStorage peerStorage; ReplicationPeers(ZKWatcher zookeeper, Configuration conf) { + this.zookeeper = zookeeper; this.conf = conf; this.peerCache = new ConcurrentHashMap<>(); this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf); @@ -134,7 +137,22 @@ public class ReplicationPeers { private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException { ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); boolean enabled = peerStorage.isPeerEnabled(peerId); - return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf), - peerId, enabled, peerConfig); + ReplicationPeerImpl peerImpl = + new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf), + peerId, enabled, peerConfig); + peerImpl.initPeerCreationTime(zookeeper, getPeerNodePath(peerId)); + + return peerImpl; + } + + /** + * Retrieve the peer znode path + * @param peerId Peer ID + * @return Peer znode path + */ + private String getPeerNodePath(String peerId) { + ZKReplicationPeerStorage zkReplicationPeerStorage = + new ZKReplicationPeerStorage(zookeeper, conf); + return ZNodePaths.joinZNode(zkReplicationPeerStorage.peersZNode, peerId); } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java index bbe6549873..0da952507c 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java @@ -60,7 +60,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase /** * The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */ - private final String peersZNode; + protected final String peersZNode; public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) { super(zookeeper, conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALKeyWriteTimeBasedFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALKeyWriteTimeBasedFilter.java new file mode 100644 index 0000000000..c66bd19b73 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALKeyWriteTimeBasedFilter.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * This WALKEy writeTime based filter can be used to make sure replication source replicates only + * those WAL entries for which WalKey writeTime is after the creation time of replication source + */ +@InterfaceAudience.Private +public class WALKeyWriteTimeBasedFilter implements WALEntryFilter { + private long baseWriteTime = Long.MIN_VALUE; + + public WALKeyWriteTimeBasedFilter(long currentTime) { + this.baseWriteTime = currentTime; + } + + @Override + public Entry filter(Entry entry) { + final long writeTime = entry.getKey().getWriteTime(); + // when time is equal then allow replication? + if (writeTime < baseWriteTime) { + return null; + } + return entry; + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 04a54e0cf2..584b0acf7b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; +import org.apache.hadoop.hbase.replication.WALKeyWriteTimeBasedFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -288,6 +289,15 @@ public class ReplicationSource implements ReplicationSourceInterface { // get the WALEntryFilter from ReplicationEndpoint and add it to default filters ArrayList filters = Lists. newArrayList(new SystemTableWALEntryFilter()); + + // add filter based on WALKey timestamp so that we do not replicate anything that was + // added before this ReplicationSource is created + final WALEntryFilter walKeyWriteTimeBasedFilter = initWALKeyWriteTimeFilter(); + if (walKeyWriteTimeBasedFilter != null) { + filters.add(walKeyWriteTimeBasedFilter); + LOG.info("WALKey writeTime based filter for replication is enabled."); + } + WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter(); if (filterFromEndpoint != null) { filters.add(filterFromEndpoint); @@ -296,6 +306,17 @@ public class ReplicationSource implements ReplicationSourceInterface { this.walEntryFilter = new ChainWALEntryFilter(filters); } + /** + * Add filter based on WALKey timestamp + */ + private WALEntryFilter initWALKeyWriteTimeFilter() { + if (this.conf.getBoolean(HConstants.HBASE_REPLICATION_WAL_KEY_WRITE_TIME_FILTER_ENABLED, + HConstants.DEFAULT_HBASE_REPLICATION_WAL_KEY_WRITE_TIME_FILTER_ENABLED)) { + return new WALKeyWriteTimeBasedFilter(this.replicationPeer.getPeerCreationTime()); + } + return null; + } + private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestWALKeyWriteTimeBasedFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestWALKeyWriteTimeBasedFilter.java new file mode 100644 index 0000000000..4b74800ba8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestWALKeyWriteTimeBasedFilter.java @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ReplicationTests.class }) +public class TestWALKeyWriteTimeBasedFilter extends TestReplicationBase { + private static final Log LOG = LogFactory.getLog(TestWALKeyWriteTimeBasedFilter.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1.setBoolean(HConstants.HBASE_REPLICATION_WAL_KEY_WRITE_TIME_FILTER_ENABLED, true); + TestReplicationBase.setUpBeforeClass(); + } + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALKeyWriteTimeBasedFilter.class); + + @Test(timeout = 300000) + public void testOldTableDataReplicationUponPeerReAddition() + throws InterruptedException, IOException, ReplicationException { + // Put records in source table + Put put = new Put(row); + put.addColumn(famName, "c1".getBytes(), "v1".getBytes()); + htable1.put(put); + put = new Put("row2".getBytes()); + put.addColumn(famName, "c1".getBytes(), "v2".getBytes()); + htable1.put(put); + + // wait for above put to replicate in peer cluster + for (int i = 0; i < NB_RETRIES; i++) { + if (i == NB_RETRIES - 1) { + fail("Waited too much time for put replication."); + } + int count = utility2.countRows(htable2); + if (count < 2) { + LOG.debug("Rows are not yet replicated to peer cluster table"); + Thread.sleep(500); + } else { + assertTrue("Replicated rows count don't match with source cluster table row count", + count == 2); + break; + } + } + + // Remove peer + hbaseAdmin.removeReplicationPeer(PEER_ID2); + + // Truncate the table from source cluster + try (Connection conn1 = ConnectionFactory.createConnection(conf1); + Admin admin1 = conn1.getAdmin()) { + admin1.disableTable(tableName); + admin1.truncateTable(tableName, false); + } + assertTrue("Table should be truncated from source cluster", utility1.countRows(htable1) == 0); + + // Truncate the table from peer cluster + try (Connection conn2 = ConnectionFactory.createConnection(conf2); + Admin admin2 = conn2.getAdmin()) { + admin2.disableTable(tableName); + admin2.truncateTable(tableName, false); + } + assertTrue("Table should be truncated from peer cluster", utility2.countRows(htable2) == 0); + + // add peer again + ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() + .setClusterKey(utility2.getClusterKey()).setSerial(isSerialPeer()).build(); + hbaseAdmin.addReplicationPeer(PEER_ID2, peerConfig); + + // add a new row to source cluster table + put = new Put("row3".getBytes()); + put.addColumn(famName, "c1".getBytes(), "v33".getBytes()); + htable1.put(put); + assertTrue("table data should be deleted from source", utility1.countRows(htable1) == 1); + + // Wait for replication so that actual row count should be 1 + for (int i = 0; i < NB_RETRIES; i++) { + if (i == NB_RETRIES - 1) { + fail("Waited too much time for put replication"); + } + int count = utility2.countRows(htable2); + if (count < 1) { + LOG.debug("Rows are not yet replicated to peer cluster table"); + Thread.sleep(5000); + } else { + assertTrue("Replicated rows count don't match with source cluster table row count", + count == 1); + break; + } + } + } +} \ No newline at end of file -- 2.17.1