From 8a0bda404c0a2e5c917ac376821242f311ca1f27 Mon Sep 17 00:00:00 2001 From: Ashish Singhi Date: Wed, 23 Dec 2015 22:46:39 +0530 Subject: [PATCH 1/1] HBASE-14938 Limit to and fro requests size from ZK in bulk loaded hfile replication --- .../replication/ReplicationQueuesClientZKImpl.java | 2 + .../hbase/replication/ReplicationQueuesZKImpl.java | 101 +++++++++++++++++++-- .../hbase/replication/ReplicationStateZKBase.java | 4 + .../replication/TestReplicationStateZKImpl.java | 1 + 4 files changed, 100 insertions(+), 8 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java index cc407e3..f98c2a8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java @@ -109,6 +109,8 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem return result; } + // TODO Control the number of nodes returned from ZK in a single request once ZOOKEEPER-2260 fix + // is available. 
For details refer HBASE-14938 @Override public List getReplicableHFiles(String peerId) throws KeeperException { String znode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 43dd412..08674bf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -71,9 +71,13 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class); + private int maxZnodesPerRequest; + public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable) { super(zk, conf, abortable); + maxZnodesPerRequest = + conf.getInt(REPLICATION_MAX_ZNODES_PER_REQUEST, DEFAULT_REPLICATION_MAX_ZNODES_PER_REQUEST); } @Override @@ -448,11 +452,52 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R if (debugEnabled) { LOG.debug("Adding hfile references " + files + " in queue " + peerZnode); } - List listOfOps = new ArrayList(); - int size = files.size(); - for (int i = 0; i < size; i++) { - listOfOps.add(ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)), + int totalNoOfFiles = files.size(); + if (totalNoOfFiles > maxZnodesPerRequest) { + controlNoOfHFileRefsToAddInZKPerReq(files, peerZnode, debugEnabled, totalNoOfFiles); + } else { + List listOfOps = new ArrayList(totalNoOfFiles); + for (int i = 0; i < totalNoOfFiles; i++) { + listOfOps.add(ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)), + HConstants.EMPTY_BYTE_ARRAY)); + } + addHFileRefs(peerZnode, debugEnabled, listOfOps); + } + } + + /* + * Controls the number of files to be added to ZK in a single 
request. + */ + private void controlNoOfHFileRefsToAddInZKPerReq(List files, String peerZnode, + boolean debugEnabled, int totalNoOfFiles) throws ReplicationException { + int totalNoOfRequests = totalNoOfFiles / maxZnodesPerRequest; + int fromIndex = 0; + int toIndex = maxZnodesPerRequest; + List listOfOps = new ArrayList(maxZnodesPerRequest); + for (int i = 0; i < totalNoOfRequests; i++) { + while (fromIndex < toIndex) { + listOfOps.add(ZKUtilOp.createAndFailSilent( + ZKUtil.joinZNode(peerZnode, files.get(fromIndex)), HConstants.EMPTY_BYTE_ARRAY)); + fromIndex++; + } + addHFileRefs(peerZnode, debugEnabled, listOfOps); + toIndex += maxZnodesPerRequest; + listOfOps.clear(); + } + + // Add remaining files to the list + while (fromIndex < totalNoOfFiles) { + listOfOps.add(ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(peerZnode, files.get(fromIndex)), HConstants.EMPTY_BYTE_ARRAY)); + fromIndex++; + } + addHFileRefs(peerZnode, debugEnabled, listOfOps); + } + + private void addHFileRefs(String peerZnode, boolean debugEnabled, List listOfOps) + throws ReplicationException { + if (listOfOps.isEmpty()) { + return; } if (debugEnabled) { LOG.debug(" The multi list size for adding hfile references in zk for node " + peerZnode @@ -472,10 +517,50 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R if (debugEnabled) { LOG.debug("Removing hfile references " + files + " from queue " + peerZnode); } - List listOfOps = new ArrayList(); - int size = files.size(); - for (int i = 0; i < size; i++) { - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)))); + int totalNoOfFiles = files.size(); + if (totalNoOfFiles > maxZnodesPerRequest) { + controlNoOfHFileRefsToRemoveFromZKPerReq(files, peerZnode, debugEnabled, totalNoOfFiles); + } else { + List listOfOps = new ArrayList(totalNoOfFiles); + for (int i = 0; i < totalNoOfFiles; i++) { + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)))); + 
} + removeHFileRefs(peerZnode, debugEnabled, listOfOps); + } + } + + /* + * Controls the number of files to be removed from ZK in a single request. + */ + private void controlNoOfHFileRefsToRemoveFromZKPerReq(List files, String peerZnode, + boolean debugEnabled, int totalNoOfFiles) { + int totalNoOfRequests = totalNoOfFiles / maxZnodesPerRequest; + int fromIndex = 0; + int toIndex = maxZnodesPerRequest; + List listOfOps = new ArrayList(maxZnodesPerRequest); + for (int i = 0; i < totalNoOfRequests; i++) { + while (fromIndex < toIndex) { + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZKUtil.joinZNode(peerZnode, + files.get(fromIndex)))); + fromIndex++; + } + removeHFileRefs(peerZnode, debugEnabled, listOfOps); + toIndex += maxZnodesPerRequest; + listOfOps.clear(); + } + + // Add remaining files to the list + while (fromIndex < totalNoOfFiles) { + listOfOps + .add(ZKUtilOp.deleteNodeFailSilent(ZKUtil.joinZNode(peerZnode, files.get(fromIndex)))); + fromIndex++; + } + removeHFileRefs(peerZnode, debugEnabled, listOfOps); + } + + private void removeHFileRefs(String peerZnode, boolean debugEnabled, List listOfOps) { + if (listOfOps.isEmpty()) { + return; } if (debugEnabled) { LOG.debug(" The multi list size for removing hfile references in zk for node " + peerZnode diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index 762167f..18d1d3b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -66,6 +66,10 @@ public abstract class ReplicationStateZKBase { public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY = "zookeeper.znode.replication.hfile.refs"; public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; + /** Maximum number of znodes 
to be processed in a single request to Zookeeper */ + protected static final String REPLICATION_MAX_ZNODES_PER_REQUEST = + "hbase.replication.max.znodes.per.request"; + protected static final int DEFAULT_REPLICATION_MAX_ZNODES_PER_REQUEST = 5000; public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf, Abortable abortable) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index 3b7402a..91bdc4a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -65,6 +65,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { utility.startMiniZKCluster(); conf = utility.getConfiguration(); conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); + conf.setInt(ReplicationStateZKBase.REPLICATION_MAX_ZNODES_PER_REQUEST, 2); zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName); -- 1.9.1