From 257876786bd33f1a5fd394c6756872dddf22b46e Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Mon, 18 Mar 2019 16:14:08 -0400 Subject: [PATCH] HBASE-22057 Cap the number of nodes we delete in one ZK.multi call If we try to delete too many znodes at once, we'll smack into the jute.maxbuffer size. Try to prevent that from happening. --- .../hbase/zookeeper/RecoverableZooKeeper.java | 19 +++--- .../apache/hadoop/hbase/zookeeper/ZKUtil.java | 35 ++++++++--- .../hadoop/hbase/zookeeper/TestZKMulti.java | 60 +++++++++++++++---- 3 files changed, 87 insertions(+), 27 deletions(-) diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index c23e3d2254..9699872f2a 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -82,18 +82,13 @@ public class RecoverableZooKeeper { private Watcher watcher; private int sessionTimeout; private String quorumServers; - - public RecoverableZooKeeper(String quorumServers, int sessionTimeout, - Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime) - throws IOException { - this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, maxSleepTime, - null); - } + private int maxMultiBatchSize; @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE", justification="None. Its always been this way.") public RecoverableZooKeeper(String quorumServers, int sessionTimeout, - Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier) + Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, + int maxMultiBatchSize) throws IOException { // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should. 
this.retryCounterFactory = @@ -111,6 +106,7 @@ public class RecoverableZooKeeper { this.watcher = watcher; this.sessionTimeout = sessionTimeout; this.quorumServers = quorumServers; + this.maxMultiBatchSize = maxMultiBatchSize; try { checkZk(); @@ -119,6 +115,13 @@ public class RecoverableZooKeeper { } } + /** + * Returns the maximum number of operations that should be included in any single multi() call. + */ + public int getMaxMultiBatchSize() { + return maxMultiBatchSize; + } + /** * Try to create a ZooKeeper connection. Turns any exception encountered into a * KeeperException.OperationTimeoutException so it can retried. diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 57c847c0fe..6f38ea9b03 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -71,7 +71,7 @@ import org.apache.zookeeper.proto.SetDataRequest; import org.apache.zookeeper.server.ZooKeeperSaslServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -135,8 +135,9 @@ public final class ZKUtil { int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000); zkDumpConnectionTimeOut = conf.getInt("zookeeper.dump.connection.timeout", 1000); + int multiMaxBatchSize = conf.getInt("zookeeper.multi.max.batch.size", 500); return new RecoverableZooKeeper(ensemble, timeout, watcher, - retry, retryIntervalMillis, maxSleepTime, identifier); + retry, retryIntervalMillis, maxSleepTime, identifier, multiMaxBatchSize); } /** @@ -1333,10 +1334,7 @@ public final class ZKUtil { ops.add(ZKUtilOp.deleteNodeFailSilent(children.get(i))); } } - 
// atleast one element should exist - if (ops.size() > 0) { - multiOrSequential(zkw, ops, runSequentialOnMultiFailure); - } + submitBatchedMultiOrSequential(zkw, runSequentialOnMultiFailure, ops); } /** @@ -1392,8 +1390,29 @@ public final class ZKUtil { zkw.interruptedException(e); } } - // atleast one element should exist - if (ops.size() > 0) { + submitBatchedMultiOrSequential(zkw, runSequentialOnMultiFailure, ops); + } + + /** + * Chunks the provided {@code ops} when the number given exceeds the configured limit. Take + * caution that this can ONLY be used for operations where atomicity is not important, + * e.g. deletions. It must not be used when atomicity of the operations is critical. + */ + static void submitBatchedMultiOrSequential(ZKWatcher zkw, boolean runSequentialOnMultiFailure, + List ops) throws KeeperException { + // at least one element should exist + if (ops.isEmpty()) { + return; + } + final int maxMultiBatchSize = zkw.getRecoverableZooKeeper().getMaxMultiBatchSize(); + if (ops.size() > maxMultiBatchSize) { + // Batch up the items to avoid smashing through jute.maxbuffer with too many Ops. 
+ final List<List<ZKUtilOp>> batchedOps = Lists.partition(ops, maxMultiBatchSize); + // Would use forEach() but have to handle KeeperException + for (List<ZKUtilOp> batch : batchedOps) { + multiOrSequential(zkw, batch, runSequentialOnMultiFailure); + } + } else { multiOrSequential(zkw, ops, runSequentialOnMultiFailure); } } diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMulti.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMulti.java index 0f2472ba9e..77afa478fa 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMulti.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMulti.java @@ -57,21 +57,23 @@ public class TestZKMulti { private final static HBaseZKTestingUtility TEST_UTIL = new HBaseZKTestingUtility(); private static ZKWatcher zkw = null; + private static class ZKMultiAbortable implements Abortable { + @Override + public void abort(String why, Throwable e) { + LOG.info(why, e); + } + + @Override + public boolean isAborted() { + return false; + } + } + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniZKCluster(); Configuration conf = TEST_UTIL.getConfiguration(); - Abortable abortable = new Abortable() { - @Override - public void abort(String why, Throwable e) { - LOG.info(why, e); - } - - @Override - public boolean isAborted() { - return false; - } - }; + Abortable abortable = new ZKMultiAbortable(); zkw = new ZKWatcher(conf, "TestZKMulti", abortable, true); } @@ -368,6 +370,42 @@ public class TestZKMulti { assertTrue("Failed to delete child znodes of parent znode 1!", 0 == children.size()); } + @Test + public void testBatchedDeletesOfWideZNodes() throws Exception { + final int batchSize = 10; + Configuration localConf = new Configuration(TEST_UTIL.getConfiguration()); + localConf.setInt("zookeeper.multi.max.batch.size", batchSize); + try (ZKWatcher customZkw = new ZKWatcher(localConf, + "TestZKMulti_Custom", new 
ZKMultiAbortable(), true)) { + + final String parent1 = "/batchedDeletes1"; + final String parent2 = "/batchedDeletes2"; + final byte[] EMPTY_BYTES = new byte[0]; + + // Write one node + List ops = new ArrayList<>(); + ops.add(Op.create(parent1, EMPTY_BYTES, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + for (int i = 0; i < batchSize * 2; i++) { + ops.add(Op.create( + parent1 + "/" + i, EMPTY_BYTES, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + } + customZkw.getRecoverableZooKeeper().multi(ops); + + // Write into a second node + ops.clear(); + ops.add(Op.create(parent2, EMPTY_BYTES, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + for (int i = 0; i < batchSize * 4; i++) { + ops.add(Op.create( + parent2 + "/" + i, EMPTY_BYTES, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + } + customZkw.getRecoverableZooKeeper().multi(ops); + + // These should return successfully + ZKUtil.deleteChildrenRecursively(customZkw, parent1); + ZKUtil.deleteChildrenRecursively(customZkw, parent2); + } + } + private void createZNodeTree(String rootZNode) throws KeeperException, InterruptedException { List opList = new ArrayList<>(); -- 2.18.0