diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
index d667269..4439a8e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractService;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 
 /**
  * A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this
@@ -68,15 +67,25 @@ public abstract class BaseReplicationEndpoint extends AbstractService
   @Override
   public WALEntryFilter getWALEntryfilter() {
     ArrayList<WALEntryFilter> filters = Lists.newArrayList();
+    ArrayList<WALCellFilter> cellFilters = Lists.newArrayList();
     WALEntryFilter scopeFilter = getScopeWALEntryFilter();
     if (scopeFilter != null) {
       filters.add(scopeFilter);
+      if (scopeFilter instanceof WALCellFilter) cellFilters.add((WALCellFilter) scopeFilter);
     }
     WALEntryFilter tableCfFilter = getTableCfWALEntryFilter();
     if (tableCfFilter != null) {
       filters.add(tableCfFilter);
+      if (tableCfFilter instanceof WALCellFilter) cellFilters.add((WALCellFilter) tableCfFilter);
     }
-    return filters.isEmpty() ? null : new ChainWALEntryFilter(filters);
+
+    if (filters.isEmpty()) return null;
+    if (!cellFilters.isEmpty()) {
+      // The scope and table-CF filters vet each entry first as part of the ChainWALEntryFilter;
+      // entries that pass then have their cells filtered by the WALEntryCellFilter down the chain.
+      filters.add(new WALEntryCellFilter(cellFilters));
+    }
+    return new ChainWALEntryFilter(filters);
   }
 
   /** Returns a WALEntryFilter for checking the scope. Subclasses can
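Note on the new chain shape: when both default filters are present, getWALEntryfilter() now appends a WALEntryCellFilter that reuses the same two filter instances for the per-cell pass. An illustrative sketch of the resulting composition (not part of the patch; assumes a `peer` and an `entry` are in scope):

```java
// Entry-level checks run first; only entries that survive them reach the
// per-cell pass performed by WALEntryCellFilter at the end of the chain.
ScopeWALEntryFilter scope = new ScopeWALEntryFilter();
TableCfWALEntryFilter tableCf = new TableCfWALEntryFilter(peer);
WALEntryFilter chain =
    new ChainWALEntryFilter(scope, tableCf, new WALEntryCellFilter(scope, tableCf));
Entry result = chain.filter(entry); // null means: skip this entry entirely
```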
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 28a83dd..1906efb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -18,102 +18,38 @@
 package org.apache.hadoop.hbase.replication;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
 import java.util.NavigableMap;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
  * Keeps KVs that are scoped other than local
 */
 @InterfaceAudience.Private
-public class ScopeWALEntryFilter implements WALEntryFilter {
-  private static final Log LOG = LogFactory.getLog(ScopeWALEntryFilter.class);
+public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
 
   @Override
-  public Entry filter(Entry entry) {
+  public Cell filterCell(Entry entry, Cell cell) {
     NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
-    if (scopes == null || scopes.isEmpty()) {
+    // The family's scope will be missing or LOCAL if
+    // there's nothing to replicate for this cell
+    byte[] fam = CellUtil.cloneFamily(cell);
+    if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
       return null;
     }
-    ArrayList<Cell> cells = entry.getEdit().getCells();
-    int size = cells.size();
-    byte[] fam;
-    for (int i = size - 1; i >= 0; i--) {
-      Cell cell = cells.get(i);
-      // If a bulk load entry has a scope then that means user has enabled replication for bulk load
-      // hfiles.
-      // TODO There is a similar logic in TableCfWALEntryFilter but data structures are different so
-      // cannot refactor into one now, can revisit and see if any way to unify them.
-      if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
-        Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(scopes, cell);
-        if (filteredBulkLoadEntryCell != null) {
-          cells.set(i, filteredBulkLoadEntryCell);
-        } else {
-          cells.remove(i);
-        }
-      } else {
-        // The scope will be null or empty if
-        // there's nothing to replicate in that WALEdit
-        fam = CellUtil.cloneFamily(cell);
-        if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
-          cells.remove(i);
-        }
-      }
-    }
-    if (cells.size() < size / 2) {
-      cells.trimToSize();
-    }
-    return entry;
+    return cell;
   }
 
-  private Cell filterBulkLoadEntries(NavigableMap<byte[], Integer> scopes, Cell cell) {
-    byte[] fam;
-    BulkLoadDescriptor bld = null;
-    try {
-      bld = WALEdit.getBulkLoadDescriptor(cell);
-    } catch (IOException e) {
-      LOG.warn("Failed to get bulk load events information from the WAL file.", e);
-      return cell;
-    }
-    List<StoreDescriptor> storesList = bld.getStoresList();
-    // Copy the StoreDescriptor list and update it as storesList is an unmodifiableList
-    List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
-    Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
-    boolean anyStoreRemoved = false;
-    while (copiedStoresListIterator.hasNext()) {
-      StoreDescriptor sd = copiedStoresListIterator.next();
-      fam = sd.getFamilyName().toByteArray();
-      if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
-        copiedStoresListIterator.remove();
-        anyStoreRemoved = true;
-      }
-    }
-
-    if (!anyStoreRemoved) {
-      return cell;
-    } else if (copiedStoresList.isEmpty()) {
+  @Override
+  public Entry filter(Entry entry) {
+    NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
+    if (scopes == null || scopes.isEmpty()) {
       return null;
     }
-    BulkLoadDescriptor.Builder newDesc =
-        BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
-            .setEncodedRegionName(bld.getEncodedRegionName())
-            .setBulkloadSeqNum(bld.getBulkloadSeqNum());
-    newDesc.addAllStores(copiedStoresList);
-    BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
-    return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
-        cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+    return entry;
   }
 }
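The refactor splits ScopeWALEntryFilter into two phases: filter() keeps only the cheap entry-level veto (no replication scopes at all), while the per-family scope check moves into filterCell(). A hedged sketch of how the two phases cooperate (illustrative only; assumes an `entry` is in scope):

```java
ScopeWALEntryFilter scopeFilter = new ScopeWALEntryFilter();
if (scopeFilter.filter(entry) != null) { // phase 1: the entry carries some scopes
  for (Cell cell : entry.getEdit().getCells()) {
    // phase 2: null means this cell's family is unscoped or scoped LOCAL
    Cell kept = scopeFilter.filterCell(entry, cell);
  }
}
```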
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
index f10849b..3d399d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
@@ -18,120 +18,64 @@
 package org.apache.hadoop.hbase.replication;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
-public class TableCfWALEntryFilter implements WALEntryFilter {
+public class TableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
   private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class);
-  private final ReplicationPeer peer;
+  private ReplicationPeer peer;
 
   public TableCfWALEntryFilter(ReplicationPeer peer) {
     this.peer = peer;
   }
 
   @Override
-  public Entry filter(Entry entry) {
+  public Cell filterCell(Entry entry, Cell cell) {
+    Map<TableName, List<String>> tableCfs = getTableCfs();
+    if (tableCfs == null) return cell;
     TableName tabName = entry.getKey().getTablename();
-    ArrayList<Cell> cells = entry.getEdit().getCells();
-    Map<TableName, List<String>> tableCFs = null;
-
-    try {
-      tableCFs = this.peer.getTableCFs();
-    } catch (IllegalArgumentException e) {
-      LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
-          ", degenerate as if it's not configured by keeping tableCFs==null");
+    List<String> cfs = tableCfs.get(tabName);
+    // ignore(remove) kv if its cf isn't in the replicable cf list
+    // (empty cfs means all cfs of this table are replicable)
+    if ((cfs != null) && !cfs.contains(
+        Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) {
+      return null;
     }
-    int size = cells.size();
+    return cell;
+  }
+
+  @Override
+  public Entry filter(Entry entry) {
+    TableName tabName = entry.getKey().getTablename();
+    Map<TableName, List<String>> tableCFs = getTableCfs();
 
     // If null means user has explicitly not configured any table CFs so all the tables data are
     // applicable for replication
-    if (tableCFs == null) {
-      return entry;
-    }
-    // return null(prevent replicating) if logKey's table isn't in this peer's
-    // replicable table list
+    if (tableCFs == null) return entry;
+
     if (!tableCFs.containsKey(tabName)) {
       return null;
-    } else {
-      List<String> cfs = tableCFs.get(tabName);
-      for (int i = size - 1; i >= 0; i--) {
-        Cell cell = cells.get(i);
-        // TODO There is a similar logic in ScopeWALEntryFilter but data structures are different so
-        // cannot refactor into one now, can revisit and see if any way to unify them.
-        // Filter bulk load entries separately
-        if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
-          Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(cfs, cell);
-          if (filteredBulkLoadEntryCell != null) {
-            cells.set(i, filteredBulkLoadEntryCell);
-          } else {
-            cells.remove(i);
-          }
-        } else {
-          // ignore(remove) kv if its cf isn't in the replicable cf list
-          // (empty cfs means all cfs of this table are replicable)
-          if ((cfs != null) && !cfs.contains(Bytes.toString(cell.getFamilyArray(),
-              cell.getFamilyOffset(), cell.getFamilyLength()))) {
-            cells.remove(i);
-          }
-        }
-      }
-    }
-    if (cells.size() < size/2) {
-      cells.trimToSize();
     }
+    return entry;
   }
 
-  private Cell filterBulkLoadEntries(List<String> cfs, Cell cell) {
-    byte[] fam;
-    BulkLoadDescriptor bld = null;
+  Map<TableName, List<String>> getTableCfs() {
+    Map<TableName, List<String>> tableCFs = null;
     try {
-      bld = WALEdit.getBulkLoadDescriptor(cell);
-    } catch (IOException e) {
-      LOG.warn("Failed to get bulk load events information from the WAL file.", e);
-      return cell;
-    }
-    List<StoreDescriptor> storesList = bld.getStoresList();
-    // Copy the StoreDescriptor list and update it as storesList is an unmodifiableList
-    List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
-    Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
-    boolean anyStoreRemoved = false;
-    while (copiedStoresListIterator.hasNext()) {
-      StoreDescriptor sd = copiedStoresListIterator.next();
-      fam = sd.getFamilyName().toByteArray();
-      if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
-        copiedStoresListIterator.remove();
-        anyStoreRemoved = true;
-      }
-    }
-
-    if (!anyStoreRemoved) {
-      return cell;
-    } else if (copiedStoresList.isEmpty()) {
-      return null;
+      tableCFs = this.peer.getTableCFs();
+    } catch (IllegalArgumentException e) {
+      LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
+          ", degenerate as if it's not configured by keeping tableCFs==null");
     }
-    BulkLoadDescriptor.Builder newDesc =
-        BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
-            .setEncodedRegionName(bld.getEncodedRegionName())
-            .setBulkloadSeqNum(bld.getBulkloadSeqNum());
-    newDesc.addAllStores(copiedStoresList);
-    BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
-    return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
-        cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+    return tableCFs;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java
new file mode 100644
index 0000000..958a8ad
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+/**
+ * A filter for the cells of WAL entries before they are sent over to replication.
+ * Used in conjunction with {@link WALEntryCellFilter}.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface WALCellFilter {
+
+  /**
+   * Applies the filter, possibly returning a different Cell instance.
+   * If null is returned, the cell will be skipped.
+   * @param entry Entry which contains the cell
+   * @param cell Cell to filter
+   * @return a (possibly modified) Cell to use. Returning null will cause the cell
+   * to be skipped for replication.
+   */
+  Cell filterCell(Entry entry, Cell cell);
+
+}
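Because WALCellFilter is LimitedPrivate(REPLICATION), custom replication endpoints can plug in their own cell filters. A minimal hypothetical implementation (not part of the patch) that drops cells older than a configured cutoff timestamp, illustrating the filterCell() contract:

```java
package org.apache.hadoop.hbase.replication;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.wal.WAL.Entry;

// Hypothetical example: replicate only cells at or above a minimum timestamp.
public class MinTimestampWALCellFilter implements WALCellFilter {
  private final long minTs; // cutoff supplied by the caller

  public MinTimestampWALCellFilter(long minTs) {
    this.minTs = minTs;
  }

  @Override
  public Cell filterCell(Entry entry, Cell cell) {
    // Returning null skips the cell; returning the cell keeps it unchanged.
    return cell.getTimestamp() < minTs ? null : cell;
  }
}
```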
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryCellFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryCellFilter.java
new file mode 100644
index 0000000..fe66141
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryCellFilter.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+/**
+ * A {@link WALEntryFilter} that iterates through the cells of each {@link Entry}
+ * and calls the registered {@link WALCellFilter}s for each cell.
+ */
+public class WALEntryCellFilter implements WALEntryFilter {
+  private static final Log LOG = LogFactory.getLog(WALEntryCellFilter.class);
+
+  private WALCellFilter[] filters;
+  private BulkLoadCellFilter bulkLoadCellFilter;
+
+  public WALEntryCellFilter(WALCellFilter... filters) {
+    this.filters = filters;
+    initBulkLoadCellFilter();
+  }
+
+  public WALEntryCellFilter(List<WALCellFilter> filters) {
+    this.filters = filters.toArray(new WALCellFilter[filters.size()]);
+    initBulkLoadCellFilter();
+  }
+
+  // Bulk load is a special case because it short-circuits the other cell filters.
+  private void initBulkLoadCellFilter() {
+    for (WALCellFilter filter : filters) {
+      if (filter instanceof TableCfWALEntryFilter) {
+        bulkLoadCellFilter = new BulkLoadCellFilter((TableCfWALEntryFilter) filter);
+        break;
+      }
+    }
+    if (bulkLoadCellFilter == null) LOG.warn("Didn't find a TableCfWALEntryFilter for bulk load");
+  }
+
+  @Override
+  public Entry filter(Entry entry) {
+    ArrayList<Cell> cells = entry.getEdit().getCells();
+    int size = cells.size();
+    for (int i = size - 1; i >= 0; i--) {
+      Cell cell = cells.get(i);
+      // special case for bulk load
+      if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+        if (bulkLoadCellFilter != null) {
+          cell = bulkLoadCellFilter.filterCell(entry, cell);
+          replaceOrRemoveCell(cells, i, cell);
+        }
+      } else { // normal filtering
+        for (WALCellFilter filter : filters) {
+          cell = filter.filterCell(entry, cell);
+          if (replaceOrRemoveCell(cells, i, cell)) {
+            break;
+          }
+        }
+      }
+    }
+    if (cells.size() < size / 2) {
+      cells.trimToSize();
+    }
+    return entry;
+  }
+
+  private static class BulkLoadCellFilter implements WALCellFilter {
+
+    private TableCfWALEntryFilter tableCfFilter;
+
+    public BulkLoadCellFilter(TableCfWALEntryFilter tableCfFilter) {
+      this.tableCfFilter = tableCfFilter;
+    }
+
+    @Override
+    public Cell filterCell(Entry entry, Cell cell) {
+      byte[] fam;
+      BulkLoadDescriptor bld = null;
+      try {
+        bld = WALEdit.getBulkLoadDescriptor(cell);
+      } catch (IOException e) {
+        LOG.warn("Failed to get bulk load events information from the WAL file.", e);
+        return cell;
+      }
+      List<StoreDescriptor> storesList = bld.getStoresList();
+      // Copy the StoreDescriptor list and update it, as storesList is an unmodifiable list
+      List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
+      Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
+      boolean anyStoreRemoved = false;
+      while (copiedStoresListIterator.hasNext()) {
+        StoreDescriptor sd = copiedStoresListIterator.next();
+        fam = sd.getFamilyName().toByteArray();
+
+        NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
+        if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
+          copiedStoresListIterator.remove();
+          anyStoreRemoved = true;
+        } else {
+          Map<TableName, List<String>> tableCfs = tableCfFilter.getTableCfs();
+          if (tableCfs != null) {
+            List<String> cfs = tableCfs.get(entry.getKey().getTablename());
+            if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
+              copiedStoresListIterator.remove();
+              anyStoreRemoved = true;
+            }
+          }
+        }
+      }
+
+      if (!anyStoreRemoved) {
+        return cell;
+      } else if (copiedStoresList.isEmpty()) {
+        return null;
+      }
+      BulkLoadDescriptor.Builder newDesc =
+          BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
+              .setEncodedRegionName(bld.getEncodedRegionName())
+              .setBulkloadSeqNum(bld.getBulkloadSeqNum());
+      newDesc.addAllStores(copiedStoresList);
+      BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
+      return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
+          cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+    }
+  }
+
+  // Returns true if the cell was removed.
+  private boolean replaceOrRemoveCell(ArrayList<Cell> cells, int i, Cell cell) {
+    if (cell != null) {
+      cells.set(i, cell);
+      return false;
+    } else {
+      cells.remove(i);
+      return true;
+    }
+  }
+}
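The bulk-load special case exists because a single METAFAMILY/BULK_LOAD cell describes hfiles for several families at once, so it cannot be kept or dropped whole by a per-family cell filter; BulkLoadCellFilter instead prunes the descriptor's store list and rebuilds the cell. A hedged sketch of the observable behavior (assumes `filter` is the chain built by BaseReplicationEndpoint and `entry` holds one bulk-load marker cell covering families f1=GLOBAL and f2=LOCAL):

```java
Entry out = filter.filter(entry);
// The entry survives, but its BulkLoadDescriptor now lists only f1's stores;
// had every store been pruned, the marker cell itself would have been removed
// (filterCell returns null and WALEntryCellFilter drops the cell).
```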
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index c906d6a..28325d9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -78,7 +78,8 @@ public class TestReplicationWALEntryFilters {
 
   @Test
   public void testScopeWALEntryFilter() {
-    ScopeWALEntryFilter filter = new ScopeWALEntryFilter();
+    ScopeWALEntryFilter scopeWALCellFilter = new ScopeWALEntryFilter();
+    WALEntryFilter filter = new ChainWALEntryFilter(scopeWALCellFilter, new WALEntryCellFilter(scopeWALCellFilter));
 
     Entry userEntry = createEntry(null, a, b);
     Entry userEntryA = createEntry(null, a);
@@ -201,14 +202,14 @@
     when(peer.getTableCFs()).thenReturn(null);
 
     Entry userEntry = createEntry(null, a, b, c);
-    TableCfWALEntryFilter filter = new TableCfWALEntryFilter(peer);
+    WALEntryFilter filter = getTableCfWALFilter(peer);
     assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
 
     // empty map
     userEntry = createEntry(null, a, b, c);
     Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
     when(peer.getTableCFs()).thenReturn(tableCfs);
-    filter = new TableCfWALEntryFilter(peer);
+    filter = getTableCfWALFilter(peer);
     assertEquals(null, filter.filter(userEntry));
 
     // table bar
@@ -216,7 +217,7 @@
     tableCfs = new HashMap<TableName, List<String>>();
     tableCfs.put(TableName.valueOf("bar"), null);
     when(peer.getTableCFs()).thenReturn(tableCfs);
-    filter = new TableCfWALEntryFilter(peer);
+    filter = getTableCfWALFilter(peer);
     assertEquals(null, filter.filter(userEntry));
 
     // table foo:a
@@ -224,7 +225,7 @@
     tableCfs = new HashMap<TableName, List<String>>();
     tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
     when(peer.getTableCFs()).thenReturn(tableCfs);
-    filter = new TableCfWALEntryFilter(peer);
+    filter = getTableCfWALFilter(peer);
     assertEquals(createEntry(null, a), filter.filter(userEntry));
 
     // table foo:a,c
@@ -232,10 +233,16 @@
     tableCfs = new HashMap<TableName, List<String>>();
     tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
     when(peer.getTableCFs()).thenReturn(tableCfs);
-    filter = new TableCfWALEntryFilter(peer);
+    filter = getTableCfWALFilter(peer);
     assertEquals(createEntry(null, a,c), filter.filter(userEntry));
   }
 
+  private WALEntryFilter getTableCfWALFilter(ReplicationPeer peer) {
+    TableCfWALEntryFilter tableCfWALCellFilter = new TableCfWALEntryFilter(peer);
+    WALEntryFilter filter = new ChainWALEntryFilter(tableCfWALCellFilter, new WALEntryCellFilter(tableCfWALCellFilter));
+    return filter;
+  }
+
   private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
     WALKey key1 = new WALKey(new byte[] {}, TableName.valueOf("foo"), scopes);
     WALEdit edit1 = new WALEdit();
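A possible follow-up test, sketched here (hypothetical, not in the patch): an identity WALCellFilter wrapped in a WALEntryCellFilter should leave entries untouched, reusing this test class's existing createEntry helper and Entry-level assertEquals:

```java
@Test
public void testIdentityCellFilter() {
  WALCellFilter identity = new WALCellFilter() {
    @Override
    public Cell filterCell(Entry entry, Cell cell) {
      return cell; // keep every cell as-is
    }
  };
  WALEntryFilter filter = new WALEntryCellFilter(identity);
  Entry userEntry = createEntry(null, a, b);
  assertEquals(createEntry(null, a, b), filter.filter(userEntry));
}
```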