diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java index 4576222..8e345a3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java @@ -20,7 +20,6 @@ public enum HiveLockMode { SHARED, - EXCLUSIVE, - SEMI_SHARED; // SEMI_SHARED can share with SHARED but not with itself + SEMI_SHARED, // SEMI_SHARED can share with SHARED but not with itself + EXCLUSIVE; } - diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java index 64f6c27..e6146a4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java @@ -18,48 +18,67 @@ package org.apache.hadoop.hive.ql.lockmgr.zookeeper; -import com.google.common.annotations.VisibleForTesting; +import java.net.InetAddress; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import org.apache.commons.collections.CollectionUtils; +import org.apache.curator.framework.CuratorFramework; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.lockmgr.*; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; -import org.apache.hadoop.hive.ql.metadata.*; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hadoop.hive.ql.metadata.DummyPartition; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Uninterruptibles; public class ZooKeeperHiveLockManager implements HiveLockManager { - HiveLockManagerCtx ctx; - public static final Logger LOG = LoggerFactory.getLogger("ZooKeeperHiveLockManager"); + + public static final Logger LOG = + LoggerFactory.getLogger(ZooKeeperHiveLockManager.class); + static final private LogHelper console = new LogHelper(LOG); private static CuratorFramework curatorFramework; + HiveLockManagerCtx ctx; + // All the locks are created under this parent - private String parent; + private String parent; private long sleepTime; private int numRetriesForLock; @@ -93,20 +112,32 @@ public void setContext(HiveLockManagerCtx ctx) throws LockException { numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES); numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES); + parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE); + final String rootNodePath = "/" + parent; + try { curatorFramework = CuratorFrameworkSingleton.getInstance(conf); - parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE); - try{ - curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/" + parent, new byte[0]); - } catch (Exception e) { - // ignore if the parent already exists - if (!(e instanceof KeeperException) || ((KeeperException)e).code() != KeeperException.Code.NODEEXISTS) { - LOG.warn("Unexpected ZK exception when creating parent node /" + parent, e); - } + + // Create the root node if it does not exist. This could simply create the + // node and fail, but even if the creation fails, the creation actions is + // logged in the ZK logs, so it is better to check before creating it. + final Stat stat = curatorFramework.checkExists().forPath(rootNodePath); + if (stat == null) { + curatorFramework.create().withMode(CreateMode.PERSISTENT) + .forPath(rootNodePath, new byte[0]); + } + } catch (KeeperException ke) { + switch (ke.code()) { + case NODEEXISTS: + LOG.debug("Root node {} already exists.", rootNodePath); + break; + default: + throw new LockException( + ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg(), ke); } } catch (Exception e) { - LOG.error("Failed to create curatorFramework object: ", e); - throw new LockException(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg()); + throw new LockException( + ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg(), e); } } @@ -148,100 +179,94 @@ private static String getLastObjectName(String parent, HiveLockObject key) { } /** - * @param lockObjects List of objects and the modes of the locks requested - * @param keepAlive Whether the lock is to be persisted after the statement - * - * Acuire all the locks. Release all the locks and return null if any lock + * Acquire all the locks. Release all the locks and return null if any lock * could not be acquired. - **/ + * + * @param lockObjects List of objects and the modes of the locks requested + * @param keepAlive Whether the lock is to be persisted after the statement + * @param lDrvState The state of the driver + * + * @throws LockException if the DriverState is set to 'aborted' during the + * process of creating all the locks + */ @Override - public List lock(List lockObjects, - boolean keepAlive, LockedDriverState lDrvState) throws LockException - { - // Sort the objects first. You are guaranteed that if a partition is being locked, - // the table has already been locked - - Collections.sort(lockObjects, new Comparator() { - - @Override - public int compare(HiveLockObj o1, HiveLockObj o2) { - int cmp = o1.getName().compareTo(o2.getName()); - if (cmp == 0) { - if (o1.getMode() == o2.getMode()) { - return cmp; - } - // EXCLUSIVE locks occur before SHARED locks - if (o1.getMode() == HiveLockMode.EXCLUSIVE) { - return -1; + public List lock(final List lockObjects, + final boolean keepAlive, final LockedDriverState lDrvState) + throws LockException { + + final List lockObjectList = new ArrayList<>(lockObjects); + + // Sort the objects first. This guarantees that if a partition is being + // locked, the parent table has already been locked + + Collections.sort(lockObjectList, new Comparator() { + @Override + public int compare(HiveLockObj o1, HiveLockObj o2) { + int cmp = o1.getName().compareTo(o2.getName()); + if (cmp == 0) { + cmp = o2.getMode().compareTo(o1.getMode()); } - return +1; + return cmp; } - return cmp; - } }); - // walk the list and acquire the locks - if any lock cant be acquired, release all locks, sleep - // and retry - HiveLockObj prevLockObj = null; + // Walk the list and acquire the locks. If any lock cannot be acquired, + // release all locks, sleep, and retry List hiveLocks = new ArrayList(); + HashSet lockedNames = new HashSet<>(); - for (HiveLockObj lockObject : lockObjects) { - // No need to acquire a lock twice on the same object - // It is ensured that EXCLUSIVE locks occur before SHARED locks on the same object - if ((prevLockObj != null) && (prevLockObj.getName().equals(lockObject.getName()))) { - prevLockObj = lockObject; + for (final HiveLockObj lockObject : lockObjectList) { + + // There may be times where an EXCLUSIVE and a SHARED lock will exist on + // the same object. In this case, only apply the first lock. Since the + // locks are sorted, with EXCLUSIVE first, only exclusive locks lock. + if (!lockedNames.add(lockObject.getName())) { continue; } - HiveLock lock = null; - boolean isInterrupted = false; if (lDrvState != null) { + final boolean isInterrupted; lDrvState.stateLock.lock(); - if (lDrvState.isAborted()) { - isInterrupted = true; - } - lDrvState.stateLock.unlock(); - } - if (!isInterrupted) { try { - lock = lock(lockObject.getObj(), lockObject.getMode(), keepAlive, true); - } catch (LockException e) { - console.printError("Error in acquireLocks..." ); - LOG.error("Error in acquireLocks...", e); - lock = null; + isInterrupted = lDrvState.isAborted(); + } finally { + lDrvState.stateLock.unlock(); } - } - - if (lock == null) { - releaseLocks(hiveLocks); if (isInterrupted) { + releaseLocks(hiveLocks); throw new LockException(ErrorMsg.LOCK_ACQUIRE_CANCELLED.getMsg()); } - return null; } - hiveLocks.add(lock); - prevLockObj = lockObject; + try { + final HiveLock lock = + lock(lockObject.getObj(), lockObject.getMode(), keepAlive, true); + hiveLocks.add(lock); + } catch (LockException e) { + console.printError("Error in acquireLocks..."); + LOG.error("Error in acquireLocks", e); + releaseLocks(hiveLocks); + return null; + } } - return hiveLocks; - + return Collections.unmodifiableList(hiveLocks); } /** - * @param hiveLocks - * list of hive locks to be released Release all the locks specified. If some of the - * locks have already been released, ignore them + * Release all the locks specified. If some of the locks have already been + * released, ignore them. + * + * @param hiveLocks list of hive locks to be released. **/ @Override - public void releaseLocks(List hiveLocks) { - if (hiveLocks != null) { - int len = hiveLocks.size(); - for (int pos = len-1; pos >= 0; pos--) { - HiveLock hiveLock = hiveLocks.get(pos); + public void releaseLocks(final List hiveLocks) { + if (CollectionUtils.isNotEmpty(hiveLocks)) { + final ListIterator it = hiveLocks.listIterator(); + while (it.hasPrevious()) { + final HiveLock hiveLock = it.previous(); try { - LOG.debug("About to release lock for {}", - hiveLock.getHiveLockObject().getName()); + LOG.debug("About to release lock: {}", hiveLock.getHiveLockObject()); unlock(hiveLock); } catch (LockException e) { // The lock may have been released. Ignore and continue @@ -285,8 +310,9 @@ private String getLockName(String parent, HiveLockMode mode) { return parent + "/" + "LOCK-" + mode + "-"; } - private ZooKeeperHiveLock lock (HiveLockObject key, HiveLockMode mode, + private ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode, boolean keepAlive, boolean parentCreated) throws LockException { + LOG.debug("Acquiring lock for {} with mode {} {}", key.getName(), mode, key.getData().getLockMode()); @@ -307,24 +333,22 @@ private ZooKeeperHiveLock lock (HiveLockObject key, HiveLockMode mode, if (ret != null) { break; } - } catch (Exception e1) { - lastException = e1; - if (e1 instanceof KeeperException) { - KeeperException e = (KeeperException) e1; - switch (e.code()) { - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - case NONODE: - case NODEEXISTS: - LOG.debug("Possibly transient ZooKeeper exception: ", e); - break; - default: - LOG.error("Serious Zookeeper exception: ", e); - break; - } - } else { - LOG.error("Other unexpected exception: ", e1); + } catch (KeeperException ke) { + lastException = ke; + switch (ke.code()) { + case CONNECTIONLOSS: + case OPERATIONTIMEOUT: + case NONODE: + case NODEEXISTS: + LOG.debug("Possibly transient ZooKeeper exception", ke); + break; + default: + LOG.error("Serious Zookeeper exception", ke); + break; } + } catch (Exception e) { + lastException = e; + LOG.error("Other unexpected exception", e); } } while (tryNum < numRetriesForLock); @@ -334,7 +358,7 @@ private ZooKeeperHiveLock lock (HiveLockObject key, HiveLockMode mode, + tryNum + " attempts."); printConflictingLocks(key,mode,conflictingLocks); if (lastException != null) { - LOG.error("Exceeds maximum retries with errors: ", lastException); + LOG.error("Exceeds maximum retries with errors", lastException); throw new LockException(lastException); } } @@ -394,14 +418,19 @@ private ZooKeeperHiveLock lockPrimitive(HiveLockObject key, } // Create the parents first - for (String name : names) { + for (final String name : names) { try { res = createChild(name, new byte[0], CreateMode.PERSISTENT); - } catch (Exception e) { - if (!(e instanceof KeeperException) || ((KeeperException)e).code() != KeeperException.Code.NODEEXISTS) { - //if the exception is not 'NODEEXISTS', re-throw it - throw e; + } catch (KeeperException ke) { + switch (ke.code()) { + case NODEEXISTS: + // If the node already exists, just ignore it + break; + default: + throw ke; } + } catch (Exception e) { + throw e; } } @@ -411,7 +440,7 @@ private ZooKeeperHiveLock lockPrimitive(HiveLockObject key, int seqNo = getSequenceNumber(res, getLockName(lastName, mode)); if (seqNo == -1) { - curatorFramework.delete().forPath(res); + deletePathUninterruptibly(curatorFramework, res); throw new LockException("The created node does not contain a sequence number: " + res); } @@ -459,118 +488,169 @@ private ZooKeeperHiveLock lockPrimitive(HiveLockObject key, case SEMI_SHARED: metrics.incrementCounter(MetricsConstant.ZOOKEEPER_HIVE_SEMISHAREDLOCKS); break; - default: + case SHARED: metrics.incrementCounter(MetricsConstant.ZOOKEEPER_HIVE_SHAREDLOCKS); break; } - } catch (Exception e) { - LOG.warn("Error Reporting hive client zookeeper lock operation to Metrics system", e); + LOG.warn("Error Reporting hive client zookeeper lock operation " + + "to Metrics system", e); } } return new ZooKeeperHiveLock(res, key, mode); } - /* Remove the lock specified */ + /** + * Remove the lock specified. + */ @Override public void unlock(HiveLock hiveLock) throws LockException { unlockWithRetry(hiveLock, parent); } - private void unlockWithRetry(HiveLock hiveLock, String parent) throws LockException { + private void unlockWithRetry(HiveLock hiveLock, String parent) + throws LockException { + int remaining = numRetriesForUnLock; + Deque exceptionList = new ArrayDeque<>(); - int tryNum = 0; - do { + while (remaining-- > 0) { try { - tryNum++; - if (tryNum > 1) { - Thread.sleep(sleepTime); - } unlockPrimitive(hiveLock, parent, curatorFramework); - break; + return; } catch (Exception e) { - if (tryNum >= numRetriesForUnLock) { - String name = ((ZooKeeperHiveLock)hiveLock).getPath(); - throw new LockException("Node " + name + " can not be deleted after " + numRetriesForUnLock + " attempts.", - e); - } + exceptionList.add(e); } - } while (tryNum < numRetriesForUnLock); - - return; + if (remaining > 0) { + Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS); + } + } + String name = ((ZooKeeperHiveLock) hiveLock).getPath(); + throw new LockException("Node " + name + " can not be deleted after " + + numRetriesForUnLock + " attempts.", exceptionList.getLast()); } - /* Remove the lock specified */ + /** + * Remove the lock specified. Ignore any {@link InterruptedException} and + * complete the lock. Any locks not deleted may block future queries. + */ @VisibleForTesting - static void unlockPrimitive(HiveLock hiveLock, String parent, CuratorFramework curatorFramework) throws LockException { + static void unlockPrimitive(HiveLock hiveLock, String parent, + CuratorFramework curatorFramework) throws LockException { ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock; HiveLockMode lMode = hiveLock.getHiveLockMode(); HiveLockObject obj = zLock.getHiveLockObject(); String name = getLastObjectName(parent, obj); try { - //catch InterruptedException to make sure locks can be released when the query is cancelled. - try { - curatorFramework.delete().forPath(zLock.getPath()); - } catch (InterruptedException ie) { - curatorFramework.delete().forPath(zLock.getPath()); - } + deletePathUninterruptibly(curatorFramework, zLock.getPath()); // Delete the parent node if all the children have been deleted - List children = null; - try { - children = curatorFramework.getChildren().forPath(name); - } catch (InterruptedException ie) { - children = curatorFramework.getChildren().forPath(name); - } - if (children == null || children.isEmpty()) { - try { - curatorFramework.delete().forPath(name); - } catch (InterruptedException ie) { - curatorFramework.delete().forPath(name); - } + List children = + getChildrenUninterruptibly(curatorFramework, name); + + if (CollectionUtils.isEmpty(children)) { + deletePathUninterruptibly(curatorFramework, name); } + Metrics metrics = MetricsFactory.getInstance(); if (metrics != null) { try { - switch(lMode) { + switch (lMode) { case EXCLUSIVE: - metrics.decrementCounter(MetricsConstant.ZOOKEEPER_HIVE_EXCLUSIVELOCKS); + metrics.decrementCounter( + MetricsConstant.ZOOKEEPER_HIVE_EXCLUSIVELOCKS); break; case SEMI_SHARED: - metrics.decrementCounter(MetricsConstant.ZOOKEEPER_HIVE_SEMISHAREDLOCKS); + metrics.decrementCounter( + MetricsConstant.ZOOKEEPER_HIVE_SEMISHAREDLOCKS); break; - default: - metrics.decrementCounter(MetricsConstant.ZOOKEEPER_HIVE_SHAREDLOCKS); + case SHARED: + metrics + .decrementCounter(MetricsConstant.ZOOKEEPER_HIVE_SHAREDLOCKS); break; } } catch (Exception e) { - LOG.warn("Error Reporting hive client zookeeper unlock operation to Metrics system", e); + LOG.warn("Error Reporting hive client zookeeper unlock operation " + + "to Metrics system", e); } } } catch (KeeperException.NoNodeException nne) { //can happen in retrying deleting the zLock after exceptions like InterruptedException //or in a race condition where parent has already been deleted by other process when it //is to be deleted. Both cases should not raise error - LOG.debug("Node " + zLock.getPath() + " or its parent has already been deleted."); + LOG.debug("Node {} or its parent has already been deleted", + zLock.getPath(), nne); } catch (KeeperException.NotEmptyException nee) { //can happen in a race condition where another process adds a zLock under this parent //just before it is about to be deleted. It should not be a problem since this parent //can eventually be deleted by the process which hold its last child zLock - LOG.debug("Node " + name + " to be deleted is not empty."); + LOG.debug("Node {} to be deleted is not empty", name, nee); } catch (Exception e) { //exceptions including InterruptException and other KeeperException - LOG.error("Failed to release ZooKeeper lock: ", e); + LOG.error("Failed to release ZooKeeper lock", e); throw new LockException(e); } } + private static void deletePathUninterruptibly( + final CuratorFramework curatorFramework, final String path) + throws Exception { + boolean interrupted = false; + try { + // Clear the interrupted flag, though understanding that the thread may be + // immediately interrupted again. Gives a chance at the first invocation + // succeeding. + Thread.interrupted(); + while (true) { + try { + curatorFramework.delete().forPath(path); + return; + } catch (KeeperException.NoNodeException nne) { + // Delete request may have been in flight, interrupted, then + // performed again. Ignore this. + LOG.debug("Node {} or its parent has already been deleted", path, + nne); + return; + } catch (InterruptedException e) { + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + private static List getChildrenUninterruptibly( + final CuratorFramework curatorFramework, final String path) + throws Exception { + boolean interrupted = false; + try { + // Clear the interrupted flag, though understanding that the thread may be + // immediately interrupted again. Gives a chance at the first invocation + // succeeding. + Thread.interrupted(); + while (true) { + try { + return curatorFramework.getChildren().forPath(path); + } catch (InterruptedException e) { + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + /* Release all locks - including PERSISTENT locks */ public static void releaseAllLocks(HiveConf conf) throws Exception { try { String parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE); List locks = getLocks(conf, null, parent, false, false); Exception lastExceptionGot = null; - if (locks != null) { + if (CollectionUtils.isNotEmpty(locks)) { for (HiveLock lock : locks) { try { unlockPrimitive(lock, parent, curatorFramework); @@ -581,35 +661,42 @@ public static void releaseAllLocks(HiveConf conf) throws Exception { } // if we got exception during doing the unlock, rethrow it here - if(lastExceptionGot != null) { + if (lastExceptionGot != null) { throw lastExceptionGot; } } catch (Exception e) { - LOG.error("Failed to release all locks: ", e); + LOG.error("Failed to release all locks", e); throw new Exception(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg()); } } - /* Get all locks */ + /** + * Get all locks. + */ @Override - public List getLocks(boolean verifyTablePartition, boolean fetchData) - throws LockException { - return getLocks(ctx.getConf(), null, parent, verifyTablePartition, fetchData); + public List getLocks(boolean verifyTablePartition, + boolean fetchData) throws LockException { + return getLocks(ctx.getConf(), null, parent, verifyTablePartition, + fetchData); } - /* Get all locks for a particular object */ + /** + * Get all locks for a particular object. + */ @Override - public List getLocks(HiveLockObject key, boolean verifyTablePartitions, - boolean fetchData) throws LockException { - return getLocks(ctx.getConf(), key, parent, verifyTablePartitions, fetchData); + public List getLocks(HiveLockObject key, + boolean verifyTablePartitions, boolean fetchData) throws LockException { + return getLocks(ctx.getConf(), key, parent, verifyTablePartitions, + fetchData); } /** - * @param conf Hive configuration - * @param key The object to be compared against - if key is null, then get all locks + * @param conf Hive configuration + * @param key The object to be compared against - if key is null, then get all + * locks **/ - private static List getLocks(HiveConf conf, - HiveLockObject key, String parent, boolean verifyTablePartition, boolean fetchData) + private static List getLocks(HiveConf conf, HiveLockObject key, + String parent, boolean verifyTablePartition, boolean fetchData) throws LockException { List locks = new ArrayList(); List children; @@ -675,7 +762,7 @@ public static void releaseAllLocks(HiveConf conf) throws Exception { data = new HiveLockObjectData(new String(curatorFramework.getData().watched().forPath(curChild))); data.setClientIp(clientIp); } catch (Exception e) { - LOG.error("Error in getting data for " + curChild, e); + LOG.error("Error in getting data for {}", curChild, e); // ignore error } } @@ -686,7 +773,9 @@ public static void releaseAllLocks(HiveConf conf) throws Exception { } } - /** Remove all redundant nodes **/ + /** + * Remove all redundant nodes. + */ private void removeAllRedundantNodes() { try { checkRedundantNode("/" + parent); @@ -708,32 +797,33 @@ private void checkRedundantNode(String node) { } children = curatorFramework.getChildren().forPath(node); - if ((children == null) || (children.isEmpty())) - { + if (CollectionUtils.isEmpty(children)) { curatorFramework.delete().forPath(node); } } catch (Exception e) { - LOG.warn("Error in checkRedundantNode for node " + node, e); + LOG.warn("Error in checkRedundantNode for node {}", node, e); } } - /* Release all transient locks, by simply closing the client */ + /** + * Release all transient locks, by simply closing the client. + */ @Override public void close() throws LockException { - try { - - if (HiveConf.getBoolVar(ctx.getConf(), HiveConf.ConfVars.HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES)) { + try { + if (HiveConf.getBoolVar(ctx.getConf(), + HiveConf.ConfVars.HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES)) { removeAllRedundantNodes(); } - } catch (Exception e) { - LOG.error("Failed to close zooKeeper client: " + e); + LOG.error("Failed to close zooKeeper client", e); throw new LockException(e); } } /** - * Get the sequence number from the path. The sequence number is always at the end of the path. + * Get the sequence number from the path. The sequence number is always at the + * end of the path. **/ private int getSequenceNumber(String resPath, String path) { String tst = resPath.substring(path.length()); @@ -757,7 +847,7 @@ private static HiveLockObject getLockObject(HiveConf conf, String path, throws LockException { try { Hive db = Hive.get(conf); - int indx = path.lastIndexOf("LOCK-" + mode.toString()); + int indx = path.lastIndexOf("LOCK-" + mode); String objName = path.substring(("/" + parent + "/").length(), indx-1); String[] names = objName.split("/"); @@ -798,7 +888,7 @@ private static HiveLockObject getLockObject(HiveConf conf, String path, return new HiveLockObject(partn, data); } catch (Exception e) { - LOG.error("Failed to create ZooKeeper object: " + e); + LOG.error("Failed to create ZooKeeper object", e); throw new LockException(e); } } @@ -809,14 +899,11 @@ private static HiveLockObject getLockObject(HiveConf conf, String path, /* Get the mode of the lock encoded in the path */ private static HiveLockMode getLockMode(String path) { - Matcher shMatcher = shMode.matcher(path); - Matcher exMatcher = exMode.matcher(path); - - if (shMatcher.matches()) { + if (shMode.matcher(path).matches()) { return HiveLockMode.SHARED; } - if (exMatcher.matches()) { + if (exMode.matcher(path).matches()) { return HiveLockMode.EXCLUSIVE; } @@ -826,5 +913,4 @@ private static HiveLockMode getLockMode(String path) { @Override public void prepareRetry() throws LockException { } - }