diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index 9abef9c..f00c7c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -25,6 +25,7 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.net.InetAddress; import java.net.URI; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -62,6 +63,7 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Ordering; import com.google.common.collect.TreeMultimap; import com.google.protobuf.ServiceException; +import io.netty.buffer.ByteBuf; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.StringUtils; @@ -307,6 +309,7 @@ public class HBaseFsck extends Configured implements Closeable { private Map> skippedRegions = new HashMap>(); + ZooKeeperWatcher zkw = null; /** * Constructor * @@ -345,6 +348,7 @@ public class HBaseFsck extends Configured implements Closeable { "hbase.hbck.lockfile.attempt.sleep.interval", DEFAULT_LOCK_FILE_ATTEMPT_SLEEP_INTERVAL), getConf().getInt( "hbase.hbck.lockfile.attempt.maxsleeptime", DEFAULT_LOCK_FILE_ATTEMPT_MAX_SLEEP_TIME)); + zkw = createZooKeeperWatcher(); } private class FileLockCallable implements Callable { @@ -685,9 +689,16 @@ public class HBaseFsck extends Configured implements Closeable { oldBalancer = admin.setBalancerRunning(false, true); } boolean[] oldSplitAndMerge = null; + boolean hasSplitOrMergeLease = false; if (shouldDisableSplitAndMerge()) { - oldSplitAndMerge = admin.setSplitOrMergeEnabled(false, false, - Admin.MasterSwitchType.SPLIT, Admin.MasterSwitchType.MERGE); + hasSplitOrMergeLease = acquireSplitOrMergeLease(); + if (hasSplitOrMergeLease) { + oldSplitAndMerge = admin.setSplitOrMergeEnabled(false, false, + Admin.MasterSwitchType.SPLIT, Admin.MasterSwitchType.MERGE); + saveOriginalSplitOrMergeState(oldSplitAndMerge); + } else { + LOG.warn("We can't acquire lease of splitOrMerge!"); + } } try { @@ -702,7 +713,7 @@ public class HBaseFsck extends Configured implements Closeable { } if (shouldDisableSplitAndMerge()) { - if (oldSplitAndMerge != null) { + if (oldSplitAndMerge != null && hasSplitOrMergeLease) { if (oldSplitAndMerge[0] && oldSplitAndMerge[1]) { admin.setSplitOrMergeEnabled(true, false, Admin.MasterSwitchType.SPLIT, Admin.MasterSwitchType.MERGE); @@ -749,6 +760,10 @@ public class HBaseFsck extends Configured implements Closeable { } catch (Exception io) { LOG.warn(io); } finally { + if (zkw != null) { + zkw.close(); + zkw = null; + } IOUtils.closeQuietly(admin); IOUtils.closeQuietly(meta); IOUtils.closeQuietly(connection); @@ -1789,14 +1804,7 @@ public class HBaseFsck extends Configured implements Closeable { private ServerName getMetaRegionServerName(int replicaId) throws IOException, KeeperException { - ZooKeeperWatcher zkw = createZooKeeperWatcher(); - ServerName sn = null; - try { - sn = new MetaTableLocator().getMetaRegionLocation(zkw, replicaId); - } finally { - zkw.close(); - } - return sn; + return new MetaTableLocator().getMetaRegionLocation(zkw, replicaId);; } /** @@ -3281,28 +3289,21 @@ public class HBaseFsck extends Configured implements Closeable { } private void checkAndFixTableLocks() throws IOException { - ZooKeeperWatcher zkw = createZooKeeperWatcher(); TableLockChecker checker = new TableLockChecker(zkw, errors); checker.checkTableLocks(); if (this.fixTableLocks) { checker.fixExpiredTableLocks(); } - zkw.close(); } private void checkAndFixReplication() throws IOException { - ZooKeeperWatcher zkw = createZooKeeperWatcher(); - try { - ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, connection, errors); - checker.checkUnDeletedQueues(); + ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, connection, errors); + checker.checkUnDeletedQueues(); - if (checker.hasUnDeletedQueues() && this.fixReplication) { - checker.fixUnDeletedQueues(); - setShouldRerun(); - } - } finally { - zkw.close(); + if (checker.hasUnDeletedQueues() && this.fixReplication) { + checker.fixUnDeletedQueues(); + setShouldRerun(); } } @@ -3372,12 +3373,7 @@ public class HBaseFsck extends Configured implements Closeable { private void unassignMetaReplica(HbckInfo hi) throws IOException, InterruptedException, KeeperException { undeployRegions(hi); - ZooKeeperWatcher zkw = createZooKeeperWatcher(); - try { - ZKUtil.deleteNode(zkw, zkw.getZNodeForReplica(hi.metaEntry.getReplicaId())); - } finally { - zkw.close(); - } + ZKUtil.deleteNode(zkw, zkw.getZNodeForReplica(hi.metaEntry.getReplicaId())); } private void assignMetaReplica(int replicaId) @@ -4227,6 +4223,35 @@ public class HBaseFsck extends Configured implements Closeable { return fixAny || disableSplitAndMerge; } + private boolean acquireSplitOrMergeLease() { + String baseZNode = getConf().get(HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); + String leaseZnode = ZKUtil.joinZNode(baseZNode, + getConf().get("zookeeper.znode.lease", "lease")); + String splitOrMergeLease = ZKUtil.joinZNode(leaseZnode, + getConf().get("zookeeper.znode.lease.splitOrMerge", "splitOrMerge")); + try { + return ZKUtil.createEphemeralNodeAndWatch(zkw, splitOrMergeLease, null); + } catch (KeeperException e) { + return false; + } + } + + private void saveOriginalSplitOrMergeState(boolean[] status) throws KeeperException { + String baseZNode = getConf().get(HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); + String leaseZnode = ZKUtil.joinZNode(baseZNode, + getConf().get("zookeeper.znode.lease", "lease")); + String splitOrMergeStates = ZKUtil.joinZNode(leaseZnode, + getConf().get("zookeeper.znode.lease.splitOrMergeStates", "splitOrMergeStates")); + byte[] bytes = new byte[status.length]; + int i = 0; + for (boolean st : status) { + bytes[i++] = (byte) (st ? 1 : 0); + } + ZKUtil.createSetData(zkw, splitOrMergeStates, bytes); + } + /** * Set summary mode. * Print only summary of the tables and status (OK or INCONSISTENT) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/SplitOrMergeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/SplitOrMergeTracker.java index 0d729a1..f74a575 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/SplitOrMergeTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/SplitOrMergeTracker.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hbase.zookeeper; import java.io.IOException; +import java.util.IllegalFormatException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.exceptions.DeserializationException; @@ -39,9 +41,12 @@ public class SplitOrMergeTracker { private String splitZnode; private String mergeZnode; + private String splitOrMergeLease; + private ZooKeeperWatcher watcher; private SwitchStateTracker splitStateTracker; private SwitchStateTracker mergeStateTracker; + private SplitOrMergeLeaseTracker splitOrMergeLeaseTracker; public SplitOrMergeTracker(ZooKeeperWatcher watcher, Configuration conf, Abortable abortable) { @@ -56,13 +61,23 @@ public class SplitOrMergeTracker { conf.get("zookeeper.znode.switch.split", "split")); mergeZnode = ZKUtil.joinZNode(watcher.getSwitchZNode(), conf.get("zookeeper.znode.switch.merge", "merge")); + + String leaseZnode = ZKUtil.joinZNode(watcher.getBaseZNode(), + conf.get("zookeeper.znode.lease", "lease")); + splitOrMergeLease = ZKUtil.joinZNode(leaseZnode, + conf.get("zookeeper.znode.lease.splitOrMerge", "splitOrMerge")); + splitStateTracker = new SwitchStateTracker(watcher, splitZnode, abortable); mergeStateTracker = new SwitchStateTracker(watcher, mergeZnode, abortable); + splitOrMergeLeaseTracker = new SplitOrMergeLeaseTracker(watcher, splitOrMergeLease, + abortable, conf, this); + this.watcher = watcher; } public void start() { splitStateTracker.start(); mergeStateTracker.start(); + splitOrMergeLeaseTracker.start(); } public boolean isSplitOrMergeEnabled(Admin.MasterSwitchType switchType) { @@ -79,6 +94,7 @@ public class SplitOrMergeTracker { public void setSplitOrMergeEnabled(boolean enabled, Admin.MasterSwitchType switchType) throws KeeperException { + checkLease(); switch (switchType) { case SPLIT: splitStateTracker.setSwitchEnabled(enabled); @@ -91,6 +107,56 @@ public class SplitOrMergeTracker { } } + private void checkLease() throws KeeperException { + if (ZKUtil.checkExists(watcher, splitOrMergeLease) < 0) { + throw new RuntimeException("splitOrMerge lease has been acquired!"); + } + } + + + private static class SplitOrMergeLeaseTracker extends ZooKeeperNodeTracker { + private Configuration conf; + private SplitOrMergeTracker splitOrMergeTracker; + + public SplitOrMergeLeaseTracker(ZooKeeperWatcher watcher, String node, Abortable abortable, + Configuration conf, SplitOrMergeTracker splitOrMergeTracker) { + super(watcher, node, abortable); + this.conf = conf; + this.splitOrMergeTracker = splitOrMergeTracker; + } + + @Override + public synchronized void nodeDeleted(String path) { + if (this.node.equals(path)) { + try { + if (ZKUtil.checkExists(this.watcher, this.node) < 0) { + rollback(); + } + } catch (KeeperException e) { + LOG.warn("rollback failed!", e); + } catch (InterruptedException e) { + LOG.warn("rollback abort!", e); + } + } + } + + private void rollback() throws KeeperException, InterruptedException { + String leaseZnode = ZKUtil.joinZNode(watcher.getBaseZNode(), + conf.get("zookeeper.znode.lease", "lease")); + String splitOrMergeStates = ZKUtil.joinZNode(leaseZnode, + conf.get("zookeeper.znode.lease.splitOrMergeStates", "splitOrMergeStates")); + byte[] bytes = ZKUtil.getData(watcher, splitOrMergeStates); + if (bytes.length != 2) { + throw new IllegalStateException(); + } + boolean split = bytes[0] == 1; + boolean merge = bytes[1] == 1; + splitOrMergeTracker.setSplitOrMergeEnabled(split, Admin.MasterSwitchType.SPLIT); + splitOrMergeTracker.setSplitOrMergeEnabled(merge, Admin.MasterSwitchType.MERGE); + ZKUtil.deleteNode(watcher, splitOrMergeStates); + } + } + private static class SwitchStateTracker extends ZooKeeperNodeTracker { public SwitchStateTracker(ZooKeeperWatcher watcher, String node, Abortable abortable) {