From 375ebbf6ce4d713e548a06d0f3187c0fde2fcaa8 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Mon, 18 Mar 2019 16:14:08 -0400 Subject: [PATCH] HBASE-22057 Cap the size of the nodes we delete in one ZK.multi call If we try to delete too many znodes at once, we'll smack into the jute.maxbuffer size. Try to prevent that from happening. The dominating factor of the ZK client request should be the znode side on a delete. --- .../hbase/zookeeper/RecoverableZooKeeper.java | 23 +++-- .../apache/hadoop/hbase/zookeeper/ZKUtil.java | 76 ++++++++++++--- .../hadoop/hbase/zookeeper/TestZKMulti.java | 95 ++++++++++++++++--- 3 files changed, 162 insertions(+), 32 deletions(-) diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index c23e3d2254..757889b751 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -82,18 +82,13 @@ public class RecoverableZooKeeper { private Watcher watcher; private int sessionTimeout; private String quorumServers; - - public RecoverableZooKeeper(String quorumServers, int sessionTimeout, - Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime) - throws IOException { - this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, maxSleepTime, - null); - } + private int maxMultiSize; @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE", justification="None. Its always been this way.") public RecoverableZooKeeper(String quorumServers, int sessionTimeout, - Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier) + Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, + int maxMultiSize) throws IOException { // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should. this.retryCounterFactory = @@ -111,6 +106,7 @@ public class RecoverableZooKeeper { this.watcher = watcher; this.sessionTimeout = sessionTimeout; this.quorumServers = quorumServers; + this.maxMultiSize = maxMultiSize; try { checkZk(); @@ -119,6 +115,17 @@ public class RecoverableZooKeeper { } } + /** + * Returns the maximum size (in bytes) that should be included in any single multi() call. + * + * NB: This is an approximation, so there may be variance in the msg actually sent over the + * wire. Please be sure to set this approximately, with respect to your ZK server configuration + * for jute.maxbuffer. + */ + public int getMaxMultiSizeLimit() { + return maxMultiSize; + } + /** * Try to create a ZooKeeper connection. Turns any exception encountered into a * KeeperException.OperationTimeoutException so it can retried. diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 57c847c0fe..e4cc30afad 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -31,11 +31,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Deque; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; + import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; @@ -43,6 +46,8 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.security.Superusers; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.CreateAndFailSilent; @@ -51,6 +56,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.SetData; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; @@ -72,11 +78,6 @@ import org.apache.zookeeper.server.ZooKeeperSaslServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; - -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; - /** * Internal HBase utility class for ZooKeeper. * @@ -135,8 +136,9 @@ public final class ZKUtil { int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000); zkDumpConnectionTimeOut = conf.getInt("zookeeper.dump.connection.timeout", 1000); + int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024*1024); return new RecoverableZooKeeper(ensemble, timeout, watcher, - retry, retryIntervalMillis, maxSleepTime, identifier); + retry, retryIntervalMillis, maxSleepTime, identifier, multiMaxSize); } /** @@ -1333,10 +1335,7 @@ public final class ZKUtil { ops.add(ZKUtilOp.deleteNodeFailSilent(children.get(i))); } } - // atleast one element should exist - if (ops.size() > 0) { - multiOrSequential(zkw, ops, runSequentialOnMultiFailure); - } + submitBatchedMultiOrSequential(zkw, runSequentialOnMultiFailure, ops); } /** @@ -1392,12 +1391,63 @@ public final class ZKUtil { zkw.interruptedException(e); } } - // atleast one element should exist - if (ops.size() > 0) { - multiOrSequential(zkw, ops, runSequentialOnMultiFailure); + submitBatchedMultiOrSequential(zkw, runSequentialOnMultiFailure, ops); + } + + /** + * Chunks the provided {@code ops} when their approximate size exceeds the the configured limit. + * Take caution that this can ONLY be used for operations where atomicity is not important, + * e.g. deletions. It must not be used when atomicity of the operations is critical. + */ + static void submitBatchedMultiOrSequential(ZKWatcher zkw, boolean runSequentialOnMultiFailure, + List ops) throws KeeperException { + // at least one element should exist + if (ops.isEmpty()) { + return; + } + final int maxMultiSize = zkw.getRecoverableZooKeeper().getMaxMultiSizeLimit(); + // Batch up the items to over smashing through jute.maxbuffer with too many Ops. + final List> batchedOps = partitionOps(ops, maxMultiSize); + // Would use forEach() but have to handle KeeperException + for (List batch : batchedOps) { + multiOrSequential(zkw, batch, runSequentialOnMultiFailure); } } + /** + * Partition the list of {@code ops} by size (using {@link #estimateSize(ZKUtilOp)}). + */ + static List> partitionOps(List ops, int maxPartitionSize) { + List> partitionedOps = new ArrayList<>(); + List currentPartition = new ArrayList<>(); + int currentPartitionSize = 0; + partitionedOps.add(currentPartition); + Iterator iter = ops.iterator(); + while (iter.hasNext()) { + ZKUtilOp currentOp = iter.next(); + int currentOpSize = estimateSize(currentOp); + + // Roll a new partition if necessary + // If the current partition is empty, put the element in there anyways. + // We can roll a new partition if we get another element + if (!currentPartition.isEmpty() && currentOpSize + currentPartitionSize > maxPartitionSize) { + currentPartition = new ArrayList<>(); + partitionedOps.add(currentPartition); + currentPartitionSize = 0; + } + + // Add the current op to the partition + currentPartition.add(currentOp); + // And record its size + currentPartitionSize += currentOpSize; + } + return partitionedOps; + } + + static int estimateSize(ZKUtilOp op) { + return Bytes.toBytes(op.getPath()).length; + } + /** * BFS Traversal of all the children under path, with the entries in the list, * in the same order as that of the traversal. Lists all the children without diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMulti.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMulti.java index 0f2472ba9e..5508ac7dac 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMulti.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMulti.java @@ -17,12 +17,16 @@ */ package org.apache.hadoop.hbase.zookeeper; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -57,21 +61,23 @@ public class TestZKMulti { private final static HBaseZKTestingUtility TEST_UTIL = new HBaseZKTestingUtility(); private static ZKWatcher zkw = null; + private static class ZKMultiAbortable implements Abortable { + @Override + public void abort(String why, Throwable e) { + LOG.info(why, e); + } + + @Override + public boolean isAborted() { + return false; + } + } + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniZKCluster(); Configuration conf = TEST_UTIL.getConfiguration(); - Abortable abortable = new Abortable() { - @Override - public void abort(String why, Throwable e) { - LOG.info(why, e); - } - - @Override - public boolean isAborted() { - return false; - } - }; + Abortable abortable = new ZKMultiAbortable(); zkw = new ZKWatcher(conf, "TestZKMulti", abortable, true); } @@ -368,6 +374,73 @@ public class TestZKMulti { assertTrue("Failed to delete child znodes of parent znode 1!", 0 == children.size()); } + @Test + public void testBatchedDeletesOfWideZNodes() throws Exception { + // Batch every 50bytes + final int batchSize = 50; + Configuration localConf = new Configuration(TEST_UTIL.getConfiguration()); + localConf.setInt("zookeeper.multi.max.size", batchSize); + try (ZKWatcher customZkw = new ZKWatcher(localConf, + "TestZKMulti_Custom", new ZKMultiAbortable(), true)) { + + // With a parent znode like this, we'll get batches of 2-3 elements + final String parent1 = "/batchedDeletes1"; + final String parent2 = "/batchedDeletes2"; + final byte[] EMPTY_BYTES = new byte[0]; + + // Write one node + List ops = new ArrayList<>(); + ops.add(Op.create(parent1, EMPTY_BYTES, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + for (int i = 0; i < batchSize * 2; i++) { + ops.add(Op.create( + parent1 + "/" + i, EMPTY_BYTES, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + } + customZkw.getRecoverableZooKeeper().multi(ops); + + // Write into a second node + ops.clear(); + ops.add(Op.create(parent2, EMPTY_BYTES, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + for (int i = 0; i < batchSize * 4; i++) { + ops.add(Op.create( + parent2 + "/" + i, EMPTY_BYTES, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + } + customZkw.getRecoverableZooKeeper().multi(ops); + + // These should return successfully + ZKUtil.deleteChildrenRecursively(customZkw, parent1); + ZKUtil.deleteChildrenRecursively(customZkw, parent2); + } + } + + @Test + public void testListPartitioning() { + // 10 Bytes + ZKUtilOp tenByteOp = ZKUtilOp.deleteNodeFailSilent("/123456789"); + + // Simple, single element case + assertEquals(Collections.singletonList(Collections.singletonList(tenByteOp)), + ZKUtil.partitionOps(Collections.singletonList(tenByteOp), 15)); + + // Simple case where we exceed the limit, but must make the list + assertEquals(Collections.singletonList(Collections.singletonList(tenByteOp)), + ZKUtil.partitionOps(Collections.singletonList(tenByteOp), 5)); + + // Each gets its own bucket + assertEquals( + Arrays.asList(Arrays.asList(tenByteOp), Arrays.asList(tenByteOp), Arrays.asList(tenByteOp)), + ZKUtil.partitionOps(Arrays.asList(tenByteOp, tenByteOp, tenByteOp), 15)); + + // Test internal boundary + assertEquals( + Arrays.asList(Arrays.asList(tenByteOp,tenByteOp), Arrays.asList(tenByteOp)), + ZKUtil.partitionOps(Arrays.asList(tenByteOp, tenByteOp, tenByteOp), 20)); + + // Plenty of space for one partition + assertEquals( + Arrays.asList(Arrays.asList(tenByteOp, tenByteOp, tenByteOp)), + ZKUtil.partitionOps(Arrays.asList(tenByteOp, tenByteOp, tenByteOp), 50)); + } + private void createZNodeTree(String rootZNode) throws KeeperException, InterruptedException { List opList = new ArrayList<>(); -- 2.18.0