From bcd34d826d55c5e77dad669200ed938a492bcb35 Mon Sep 17 00:00:00 2001 From: Geoffrey Date: Thu, 26 Jan 2017 17:06:28 -0800 Subject: [PATCH] HBASE-17543 - Create additional ReplicationEndpoint WALEntryFilters by configuration --- .../hbase/replication/BaseReplicationEndpoint.java | 15 +++++++++++++ .../hbase/replication/TestReplicationEndpoint.java | 25 +++++++++++++++++++--- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index 48f3ac5..63b1f41 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -39,6 +39,7 @@ public abstract class BaseReplicationEndpoint extends AbstractService implements ReplicationEndpoint { private static final Log LOG = LogFactory.getLog(BaseReplicationEndpoint.class); + public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY = "replication.source.custom.walentryfilters"; protected Context ctx; @Override @@ -76,6 +77,20 @@ public abstract class BaseReplicationEndpoint extends AbstractService if (tableCfFilter != null) { filters.add(tableCfFilter); } + if (ctx != null && ctx.getPeerConfig() != null) { + String filterNameCSV = ctx.getPeerConfig().getConfiguration().get(REPLICATION_WALENTRYFILTER_CONFIG_KEY); + if (filterNameCSV != null && !filterNameCSV.isEmpty()) { + String[] filterNames = filterNameCSV.split(","); + for (String filterName : filterNames) { + try { + Class clazz = Class.forName(filterName); + filters.add((WALEntryFilter) clazz.newInstance()); + } catch (Exception e) { + LOG.error("Unable to create WALEntryFilter " + filterName, e); + } + } + } + } return filters.isEmpty() ? null : new ChainWALEntryFilter(filters); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index f9c467e..537b5c9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -239,9 +239,13 @@ public class TestReplicationEndpoint extends TestReplicationBase { @Test (timeout=120000) public void testWALEntryFilterFromReplicationEndpoint() throws Exception { - admin.addPeer("testWALEntryFilterFromReplicationEndpoint", - new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) - .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null); + ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) + .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()); + //test that we can create a WALFilter reflectively, and that a malformed class name won't stop the entire + //endpoint from being created + rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY, + EverythingPassesWALEntryFilter.class.getName() + "," + "org.apache.hbase.iAmNotaRealWalEntryFilter"); + admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc); // now replicate some data. try (Connection connection = ConnectionFactory.createConnection(conf1)) { doPut(connection, Bytes.toBytes("row1")); @@ -257,6 +261,8 @@ public class TestReplicationEndpoint extends TestReplicationBase { }); Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get()); + //make sure our reflectively created filter is in the filter chain + Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry()); admin.removePeer("testWALEntryFilterFromReplicationEndpoint"); } @@ -488,4 +494,17 @@ public class TestReplicationEndpoint extends TestReplicationBase { }); } } + + public static class EverythingPassesWALEntryFilter implements WALEntryFilter { + private static boolean passedEntry = false; + @Override + public Entry filter(Entry entry) { + passedEntry = true; + return entry; + } + + public static boolean hasPassedAnEntry(){ + return passedEntry; + } + } } -- 2.9.0