From 485614e5895c8e58ce84e06a27b6d29aee02ceb0 Mon Sep 17 00:00:00 2001
From: Mohammad Arshad
Date: Wed, 6 Dec 2017 23:55:45 +0530
Subject: [PATCH] HBASE-19423: Replication entries are not filtered correctly
 when replication scope is set through WAL Co-processor

---
 .../TestWALObserverAsReplicationFilter.java        | 160 +++++++++++++++++++++
 1 file changed, 160 insertions(+)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestWALObserverAsReplicationFilter.java

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestWALObserverAsReplicationFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestWALObserverAsReplicationFilter.java
new file mode 100644
index 0000000..d6d0dd4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestWALObserverAsReplicationFilter.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Optional;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.WALObserver;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestWALObserverAsReplicationFilter {
+  private static final Log LOG = LogFactory.getLog(TestWALObserverAsReplicationFilter.class);
+  private static HBaseTestingUtility TEST_UTIL;
+  private static HBaseTestingUtility TEST_UTIL_PEER;
+  private static Configuration conf;
+  private static Configuration conf_peer;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = HBaseConfiguration.create();
+    conf.set(HConstants.REPLICATION_CLUSTER_ID, "sourceCluster");
+    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+      ReplicationFilterWALCoprocessor.class.getName());
+    TEST_UTIL = new HBaseTestingUtility(conf);
+    TEST_UTIL.startMiniZKCluster();
+    MiniZooKeeperCluster miniZK = TEST_UTIL.getZkCluster();
+
+    conf_peer = HBaseConfiguration.create(conf);
+    conf_peer.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+    conf_peer.set(HConstants.REPLICATION_CLUSTER_ID, "peerCluster");
+    TEST_UTIL_PEER = new HBaseTestingUtility(conf_peer);
+    TEST_UTIL_PEER.setZkCluster(miniZK);
+
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL_PEER.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
+    TEST_UTIL.shutdownMiniHBaseCluster();
+  }
+
+  @Test(timeout = 300000)
+  public void testWALObserverAsReplicationCellFilter() throws Exception {
+    TableName tableName = TableName.valueOf("RepFilterTable");
+    TableDescriptorBuilder newBuilder = TableDescriptorBuilder.newBuilder(tableName);
+    newBuilder.addColumnFamily(ColumnFamilyDescriptorBuilder.of("f1"));
+    newBuilder.addColumnFamily(ColumnFamilyDescriptorBuilder.of("f2"));
+    TableDescriptor htd = newBuilder.build();
+    TEST_UTIL.getAdmin().createTable(htd);
+    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
+    peerConfig.setClusterKey(TEST_UTIL_PEER.getClusterKey());
+    TEST_UTIL.getAdmin().addReplicationPeer("peer1", peerConfig);
+    TEST_UTIL.getAdmin().enableTableReplication(tableName);
+    Table tbl = TEST_UTIL.getConnection().getTable(tableName);
+    Put put1 =
+        new Put("row1".getBytes()).addColumn("f1".getBytes(), "c1".getBytes(), "v1".getBytes());
+    tbl.put(put1);
+    Put put2 =
+        new Put("row2".getBytes()).addColumn("f2".getBytes(), "c2".getBytes(), "v2".getBytes());
+    tbl.put(put2);
+    Table tbl_peer = TEST_UTIL_PEER.getConnection().getTable(tableName);
+    /**
+     * Two rows are written, but only one is expected to be replicated because the other
+     * will be filtered out by ReplicationFilterWALCoprocessor.
+     */
+    int expectedCount = 1;
+    verifyCount(TEST_UTIL_PEER, tbl_peer, expectedCount);
+  }
+
+  private void verifyCount(HBaseTestingUtility utility, Table target, int expectedCount)
+      throws IOException, InterruptedException {
+    int count = 0;
+    int tries = 30;
+    for (int i = 0; i < tries; i++) {
+      if (i == tries - 1 || count > expectedCount) {
+        fail("Replicated data count is wrong. Current count=" + count + ", expected count="
+            + expectedCount);
+      }
+      count = utility.countRows(target);
+      if (count != expectedCount) {
+        LOG.info("Waiting more time for the data to be replicated.");
+        Thread.sleep(1000);
+      } else {
+        break;
+      }
+    }
+  }
+
+  // Filter out all cells of family f2 by setting their replication scope to local
+  public static class ReplicationFilterWALCoprocessor implements WALObserver, WALCoprocessor {
+    @Override
+    public void preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
+        RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+      ArrayList<Cell> cells = logEdit.getCells();
+      for (Cell cell : cells) {
+        byte[] fam = CellUtil.cloneFamily(cell);
+        if ("f2".equals(Bytes.toString(fam))) {
+          logKey.getReplicationScopes().put(fam, HConstants.REPLICATION_SCOPE_LOCAL);
+        }
+      }
+    }
+
+    @Override
+    public Optional<WALObserver> getWALObserver() {
+      return Optional.of(this);
+    }
+  }
+}
-- 
2.7.1.windows.2