From f96e7f74e94c62a8ff66e3b8009139ecad81e9a6 Mon Sep 17 00:00:00 2001
From: Mohammad Arshad
Date: Tue, 5 Dec 2017 03:43:53 +0530
Subject: [PATCH] HBASE-19423: Replication entries are not filtered correctly
 when replication scope is set through WAL Co-processor

---
 .../replication/regionserver/Replication.java     |   5 +-
 .../TestWALObserverAsReplicationFilter.java       | 159 +++++++++++++++++++++
 2 files changed, 163 insertions(+), 1 deletion(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestWALObserverAsReplicationFilter.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index be6f7a7..9e2649a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -303,7 +303,10 @@ public class Replication extends WALActionsListener.Base implements
    */
   public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit,
       Configuration conf, ReplicationSourceManager replicationManager) throws IOException {
-    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    NavigableMap<byte[], Integer> scopes = logKey.getScopes();
+    if (scopes == null) {
+      scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    }
     byte[] family;
     boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
     for (Cell cell : logEdit.getCells()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestWALObserverAsReplicationFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestWALObserverAsReplicationFilter.java
new file mode 100644
index 0000000..8829027
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestWALObserverAsReplicationFilter.java
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.coprocessor.BaseWALObserver;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestWALObserverAsReplicationFilter {
+  private static final Log LOG = LogFactory.getLog(TestWALObserverAsReplicationFilter.class);
+  private static HBaseTestingUtility TEST_UTIL;
+  private static HBaseTestingUtility TEST_UTIL_PEER;
+  private static Configuration conf;
+  private static Configuration conf_peer;
+  private static ReplicationAdmin repAdmin;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = HBaseConfiguration.create();
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.set(HConstants.REPLICATION_CLUSTER_ID, "WALObserverAsReplicationFilterCluster");
+    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+      ReplicationFilterWALCoprocessor.class.getName());
+    TEST_UTIL = new HBaseTestingUtility(conf);
+
+    conf_peer = HBaseConfiguration.create(conf);
+    conf_peer.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+    TEST_UTIL_PEER = new HBaseTestingUtility(conf_peer);
+
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL_PEER.startMiniCluster();
+    repAdmin = new ReplicationAdmin(conf);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
+    TEST_UTIL.shutdownMiniHBaseCluster();
+  }
+
+  @Test(timeout = 300000)
+  public void testWALObserverAsReplicationCellFilter() throws Exception {
+    TableName tableName = TableName.valueOf("RepFilterTable");
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor("f1"));
+    htd.addFamily(new HColumnDescriptor("f2"));
+    TEST_UTIL.getHBaseAdmin().createTable(htd);
+    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
+    peerConfig.setClusterKey(TEST_UTIL_PEER.getClusterKey());
+    repAdmin.addPeer("peer1", peerConfig, null);
+    repAdmin.enableTableRep(tableName);
+
+    Table tbl = TEST_UTIL.getConnection().getTable(tableName);
+    Put put1 =
+        new Put("row1".getBytes()).addColumn("f1".getBytes(), "c1".getBytes(), "v1".getBytes());
+    tbl.put(put1);
+    Put put2 =
+        new Put("row2".getBytes()).addColumn("f2".getBytes(), "c2".getBytes(), "v2".getBytes());
+    tbl.put(put2);
+    Table tbl_peer = TEST_UTIL_PEER.getConnection().getTable(tableName);
+    /**
+     * Two rows are written but only one is expected to be replicated, as the other row is
+     * filtered by ReplicationFilterWALCoprocessor
+     */
+    int expectedCount = 1;
+    verifyCount(TEST_UTIL_PEER, tbl_peer, expectedCount);
+  }
+
+  private void verifyCount(HBaseTestingUtility utility, Table target, int expectedCount)
+      throws IOException, InterruptedException {
+    int count = 0;
+    int tries = 30;
+    for (int i = 0; i < tries; i++) {
+      if (i == tries - 1 || count > expectedCount) {
+        fail("Replicated data count is wrong. Current count=" + count + ", expected count="
+            + expectedCount);
+      }
+      count = utility.countRows(target);
+      if (count != expectedCount) {
+        LOG.info("Waiting more time for data replication.");
+        Thread.sleep(1000);
+      } else {
+        break;
+      }
+    }
+  }
+
+  // Sets replication scope LOCAL for family f2 so its cells are filtered from replication
+  public static class ReplicationFilterWALCoprocessor extends BaseWALObserver {
+    @Override
+    public boolean preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
+        HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+      ArrayList<Cell> cells = logEdit.getCells();
+      for (Cell cell : cells) {
+        byte[] fam = CellUtil.cloneFamily(cell);
+        if ("f2".equals(Bytes.toString(fam))) {
+          NavigableMap<byte[], Integer> scopes = logKey.getScopes();
+          if (scopes == null) {
+            logKey.setScopes(new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR));
+          }
+          logKey.getScopes().put(fam, HConstants.REPLICATION_SCOPE_LOCAL);
+        }
+      }
+      return false;
+    }
+  }
+}
-- 
2.7.1.windows.2