diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index 17a2d20..de6c859 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -17,16 +17,21 @@ */ package org.apache.hadoop.hive.ql.lockmgr; -import org.apache.hadoop.hive.common.ValidTxnWriteIdList; -import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; -import org.apache.hadoop.hive.metastore.api.TxnToWriteId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.common.ValidTxnList; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.common.ValidReadTxnList; -import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.TxnToWriteId; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -38,8 +43,8 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.util.ReflectionUtils; - -import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An implementation of {@link HiveTxnManager} that does not support @@ -47,7 +52,7 @@ */ class DummyTxnManager extends HiveTxnManagerImpl { static final private Logger LOG = - LoggerFactory.getLogger(DummyTxnManager.class.getName()); + LoggerFactory.getLogger(DummyTxnManager.class); private HiveLockManager lockMgr; @@ -102,12 +107,12 @@ public HiveLockManager getLockManager() throws LockException { if (supportConcurrency) { String lockMgrName = conf.getVar(HiveConf.ConfVars.HIVE_LOCK_MANAGER); - if ((lockMgrName == null) || (lockMgrName.isEmpty())) { + if (StringUtils.isBlank(lockMgrName)) { throw new LockException(ErrorMsg.LOCKMGR_NOT_SPECIFIED.getMsg()); } try { - LOG.info("Creating lock manager of type " + lockMgrName); + LOG.info("Creating lock manager of type {}", lockMgrName); lockMgr = (HiveLockManager)ReflectionUtils.newInstance( conf.getClassByName(lockMgrName), conf); lockManagerCtx = new HiveLockManagerCtx(conf); @@ -163,7 +168,8 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username, LockedDri if (!input.needsLock()) { continue; } - LOG.debug("Adding " + input.getName() + " to list of lock inputs"); + LOG.debug("Adding {}:{} to list of lock inputs", input.getName(), + HiveLockMode.SHARED); if (input.getType() == ReadEntity.Type.DATABASE) { lockObjects.addAll(getLockObjects(plan, input.getDatabase(), null, null, HiveLockMode.SHARED)); @@ -182,7 +188,9 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username, LockedDri if (lockMode == null) { continue; } - LOG.debug("Adding " + output.getName() + " to list of lock outputs"); + + LOG.debug("Adding {}:{} to list of lock outputs", output.getName(), + lockMode); List lockObj = null; if (output.getType() == WriteEntity.Type.DATABASE) { lockObjects.addAll(getLockObjects(plan, output.getDatabase(), 
null, null, lockMode)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java index 4576222..d10c1d0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hive.ql.lockmgr; +/** + * The lock modes arranged in order of least isolated to most. + */ public enum HiveLockMode { SHARED, - EXCLUSIVE, - SEMI_SHARED; // SEMI_SHARED can share with SHARED but not with itself + SEMI_SHARED, // SEMI_SHARED can share with SHARED but not with itself + EXCLUSIVE; } - diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java index 411664e..12354cd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObj.java @@ -46,4 +46,9 @@ public void setMode(HiveLockMode mode) { public String getName() { return obj.getName(); } + + @Override + public String toString() { + return "HiveLockObj [obj=" + obj + ", mode=" + mode + "]"; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java index 286a47f..e61818f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.lockmgr; +import java.util.Arrays; import java.util.Map; import org.apache.commons.lang.StringUtils; @@ -31,6 +32,7 @@ import org.apache.hadoop.hive.ql.metadata.Table; public class HiveLockObject { + String[] pathNames = null; public static class HiveLockObjectData { @@ -314,6 +316,12 @@ public int hashCode() { return builder.toHashCode(); } + @Override + public String toString() { + return "HiveLockObject [pathNames=" + Arrays.toString(pathNames) + ", data=" + + data + "]"; + } + private static String removeDelimiter(String in) { if (in == null) { return null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java index d295c15..3857549 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java @@ -93,4 +93,10 @@ public int hashCode() { } return builder.toHashCode(); } + + @Override + public String toString() { + return "ZooKeeperHiveLock [path=" + path + ", obj=" + obj + ", mode=" + mode + + "]"; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java index 64f6c27..68a297d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java @@ -18,230 +18,250 @@ package org.apache.hadoop.hive.ql.lockmgr.zookeeper; -import com.google.common.annotations.VisibleForTesting; +import java.net.InetAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import 
java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ThreadUtils; +import org.apache.curator.utils.ZKPaths; +import org.apache.curator.utils.ZKPaths.PathAndNode; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.lockmgr.*; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; -import org.apache.hadoop.hive.ql.metadata.*; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hadoop.hive.ql.metadata.DummyPartition; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +/** + * A {@link HiveLockManager} that stores the locks in ZooKeeper. + */ public class ZooKeeperHiveLockManager implements HiveLockManager { - HiveLockManagerCtx ctx; - public static final Logger LOG = LoggerFactory.getLogger("ZooKeeperHiveLockManager"); + + public static final Logger LOG = + LoggerFactory.getLogger(ZooKeeperHiveLockManager.class); + static final private LogHelper console = new LogHelper(LOG); - private static CuratorFramework curatorFramework; + private CuratorFramework curatorFramework; + + HiveLockManagerCtx ctx; - // All the locks are created under this parent - private String parent; + /** All ZK locks are created under this root ZNode. */ + private String zkNamespace; private long sleepTime; private int numRetriesForLock; - private int numRetriesForUnLock; - private static String clientIp; + private final String clientIP; - static { - clientIp = "UNKNOWN"; + /** + * Constructor. 
+   */
+  public ZooKeeperHiveLockManager() {
+    String cip = "UNKNOWN";
     try {
-      InetAddress clientAddr = InetAddress.getLocalHost();
-      clientIp = clientAddr.getHostAddress();
-    } catch (Exception e1) {
+      cip = InetAddress.getLocalHost().getHostAddress();
+    } catch (Exception e) {
+      LOG.warn("Could not determine local host IP address", e);
     }
-  }
-
-  public ZooKeeperHiveLockManager() {
+    clientIP = cip;
   }
 
   /**
-   * @param ctx The lock manager context (containing the Hive configuration file)
-   * Start the ZooKeeper client based on the zookeeper cluster specified in the conf.
+   * Start the ZooKeeper client based on the ZooKeeper cluster specified in the conf.
+   *
+   * @param ctx The lock manager context (containing the Hive configuration file)
    **/
   @Override
   public void setContext(HiveLockManagerCtx ctx) throws LockException {
+    final HiveConf conf = ctx.getConf();
+
     this.ctx = ctx;
-    HiveConf conf = ctx.getConf();
-    sleepTime = conf.getTimeVar(
-        HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
+    sleepTime =
+        conf.getTimeVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES,
+            TimeUnit.MILLISECONDS);
     numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES);
-    numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
+
+    zkNamespace = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
+    final String rootNodePath = ZKPaths.makePath(zkNamespace, "");
 
     try {
       curatorFramework = CuratorFrameworkSingleton.getInstance(conf);
-      parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
-      try{
-        curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/" + parent, new byte[0]);
-      } catch (Exception e) {
-        // ignore if the parent already exists
-        if (!(e instanceof KeeperException) || ((KeeperException)e).code() != KeeperException.Code.NODEEXISTS) {
-          LOG.warn("Unexpected ZK exception when creating parent node /" + parent, e);
-        }
+
+      // Create the root node if it does not exist. This could simply create the
+      // node and fail, but even a failed creation is logged in the ZK logs, so
+      // it is better to check before creating it.
+      if (curatorFramework.checkExists().forPath(rootNodePath) == null) {
+        curatorFramework.create().withMode(CreateMode.PERSISTENT)
+            .forPath(rootNodePath);
+      }
+    } catch (KeeperException ke) {
+      if (ke.code() == Code.NODEEXISTS) {
+        LOG.debug("Root node {} already exists.", rootNodePath);
+      } else {
+        throw new LockException(
+            ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg(), ke);
       }
     } catch (Exception e) {
-      LOG.error("Failed to create curatorFramework object: ", e);
-      throw new LockException(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg());
+      throw new LockException(
+          ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg(), e);
     }
   }
 
+  /**
+   * Refresh to enable new configurations.
+   */
   @Override
   public void refresh() {
     HiveConf conf = ctx.getConf();
-    sleepTime = conf.getTimeVar(
-        HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
-    numRetriesForLock = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES);
-    numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
-  }
 
-  /**
-   * @param key object to be locked
-   * Get the name of the last string. For eg.
if you need to lock db/T/ds=1=/hr=1, - * the last name would be db/T/ds=1/hr=1 - **/ - private static String getLastObjectName(String parent, HiveLockObject key) { - return "/" + parent + "/" + key.getName(); - } + this.sleepTime = + conf.getTimeVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, + TimeUnit.MILLISECONDS); - /** - * @param key object to be locked - * Get the list of names for all the parents. - * For eg: if you need to lock db/T/ds=1/hr=1, the following list will be returned: - * {db, db/T, db/T/ds=1, db/T/ds=1/hr=1} - **/ - private List getObjectNames(HiveLockObject key) { - List parents = new ArrayList(); - String curParent = "/" + parent + "/"; - String[] names = key.getName().split("/"); - - for (String name : names) { - curParent = curParent + name; - parents.add(curParent); - curParent = curParent + "/"; - } - return parents; + this.numRetriesForLock = + conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES); } /** - * @param lockObjects List of objects and the modes of the locks requested - * @param keepAlive Whether the lock is to be persisted after the statement - * - * Acuire all the locks. Release all the locks and return null if any lock + * Acquire all the locks. Release all the locks and return null if any lock * could not be acquired. - **/ + * + * @param lockObjects List of objects and the modes of the locks requested + * @param keepAlive Whether the lock is to be persisted after the statement + * @param lDrvState The state of the driver + * + * @throws LockException if the DriverState is set to 'aborted' during the + * process of creating all the locks + */ @Override - public List lock(List lockObjects, - boolean keepAlive, LockedDriverState lDrvState) throws LockException - { - // Sort the objects first. You are guaranteed that if a partition is being locked, - // the table has already been locked - - Collections.sort(lockObjects, new Comparator() { - - @Override - public int compare(HiveLockObj o1, HiveLockObj o2) { - int cmp = o1.getName().compareTo(o2.getName()); - if (cmp == 0) { - if (o1.getMode() == o2.getMode()) { - return cmp; - } - // EXCLUSIVE locks occur before SHARED locks - if (o1.getMode() == HiveLockMode.EXCLUSIVE) { - return -1; + public List lock(final List lockObjects, + final boolean keepAlive, final LockedDriverState lDrvState) + throws LockException { + + final List lockObjectList = new ArrayList<>(lockObjects); + + Collections.sort(lockObjectList, new Comparator() { + @Override + public int compare(HiveLockObj o1, HiveLockObj o2) { + int cmp = o1.getName().compareTo(o2.getName()); + if (cmp == 0) { + cmp = o2.getMode().compareTo(o1.getMode()); } - return +1; + return cmp; } - return cmp; - } }); - // walk the list and acquire the locks - if any lock cant be acquired, release all locks, sleep - // and retry - HiveLockObj prevLockObj = null; - List hiveLocks = new ArrayList(); + // Walk the list and acquire the locks. If any lock cannot be acquired, + // release all locks. 
+ final List hiveLocks = new ArrayList<>(); + final Set lockedNames = new HashSet<>(); - for (HiveLockObj lockObject : lockObjects) { - // No need to acquire a lock twice on the same object - // It is ensured that EXCLUSIVE locks occur before SHARED locks on the same object - if ((prevLockObj != null) && (prevLockObj.getName().equals(lockObject.getName()))) { - prevLockObj = lockObject; + for (final HiveLockObj lockObject : lockObjectList) { + LOG.trace("If applicable, creating lock: {}", lockObject); + + // There may be times where an EXCLUSIVE and a SHARED lock will exist on + // the same object. In this case, only apply the first lock. Since the + // locks are sorted, with EXCLUSIVE first, only exclusive locks lock. + if (!lockedNames.add(lockObject.getName())) { continue; } - HiveLock lock = null; - boolean isInterrupted = false; if (lDrvState != null) { + final boolean isInterrupted; lDrvState.stateLock.lock(); - if (lDrvState.isAborted()) { - isInterrupted = true; - } - lDrvState.stateLock.unlock(); - } - if (!isInterrupted) { try { - lock = lock(lockObject.getObj(), lockObject.getMode(), keepAlive, true); - } catch (LockException e) { - console.printError("Error in acquireLocks..." ); - LOG.error("Error in acquireLocks...", e); - lock = null; + isInterrupted = lDrvState.isAborted(); + } finally { + lDrvState.stateLock.unlock(); } - } - - if (lock == null) { - releaseLocks(hiveLocks); if (isInterrupted) { + releaseLocks(hiveLocks); throw new LockException(ErrorMsg.LOCK_ACQUIRE_CANCELLED.getMsg()); } + } + + final HiveLock lock; + try { + lock = lockRetry(lockObject.getObj(), lockObject.getMode(), keepAlive); + } catch (LockException e) { + console.printError("Error in acquire locks"); + LOG.error("Error in acquire locks", e); + releaseLocks(hiveLocks); return null; + } catch (InterruptedException ie) { + releaseLocks(hiveLocks); + Thread.currentThread().interrupt(); + throw new LockException(ErrorMsg.LOCK_ACQUIRE_CANCELLED.getMsg(), ie); } - hiveLocks.add(lock); - prevLockObj = lockObject; + if (lock != null) { + hiveLocks.add(lock); + } else { + releaseLocks(hiveLocks); + return null; + } } return hiveLocks; - } /** - * @param hiveLocks - * list of hive locks to be released Release all the locks specified. If some of the - * locks have already been released, ignore them - **/ + * Release all the locks specified. If some of the locks have already been + * released, ignore them. + * + * @param hiveLocks list of hive locks to be released. + */ @Override - public void releaseLocks(List hiveLocks) { - if (hiveLocks != null) { - int len = hiveLocks.size(); - for (int pos = len-1; pos >= 0; pos--) { - HiveLock hiveLock = hiveLocks.get(pos); + public void releaseLocks(final List hiveLocks) { + LOG.debug("Releasing locks: {}", hiveLocks); + if (CollectionUtils.isNotEmpty(hiveLocks)) { + ListIterator it = hiveLocks.listIterator(hiveLocks.size()); + while (it.hasPrevious()) { + final HiveLock hiveLock = it.previous(); try { - LOG.debug("About to release lock for {}", - hiveLock.getHiveLockObject().getName()); + LOG.trace("About to release lock: {}", hiveLock); unlock(hiveLock); } catch (LockException e) { // The lock may have been released. Ignore and continue @@ -252,48 +272,53 @@ public void releaseLocks(List hiveLocks) { } /** - * @param key - * The object to be locked - * @param mode - * The mode of the lock - * @param keepAlive - * Whether the lock is to be persisted after the statement Acquire the - * lock. Return null if a conflicting lock is present. 
- **/ + * @param key The object to be locked + * @param mode The mode of the lock + * @param keepAlive Whether the lock is to be persisted after the statement + * Acquire the lock. Return null if a conflicting lock is present. + */ @Override - public ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode, - boolean keepAlive) throws LockException { - return lock(key, mode, keepAlive, false); + public HiveLock lock(HiveLockObject key, HiveLockMode mode, boolean keepAlive) + throws LockException { + try { + return lockRetry(key, mode, keepAlive); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new LockException(ErrorMsg.LOCK_ACQUIRE_CANCELLED.getMsg(), ie); + } } /** - * @param name - * The name of the zookeeper child - * @param data - * The data for the zookeeper child - * @param mode - * The mode in which the child needs to be created - * @throws KeeperException - * @throws InterruptedException - **/ - private String createChild(String name, byte[] data, CreateMode mode) - throws Exception { - return curatorFramework.create().withMode(mode).forPath(name, data); + * Given an absolute ZNode path and a lock mode, return an absolute ZNode path + * of the new lock node. + * + * @param path The path of the root ZNode to create the lock under + * @param mode The lock mode + * @return an absolute ZNode path of the new lock node + */ + private String getLockName(final String path, final HiveLockMode mode) { + return ZKPaths.makePath(path, "LOCK-" + mode + '-'); } - private String getLockName(String parent, HiveLockMode mode) { - return parent + "/" + "LOCK-" + mode + "-"; - } + /** + * Try to obtain a lock. Retry several times if the lock cannot be obtained + * for any reason. + * + * @return null if the lock cannot be obtained because a pre-existing lock + * blocks it. 
+ */ + private ZooKeeperHiveLock lockRetry(final HiveLockObject key, + final HiveLockMode mode, final boolean keepAlive) + throws LockException, InterruptedException { - private ZooKeeperHiveLock lock (HiveLockObject key, HiveLockMode mode, - boolean keepAlive, boolean parentCreated) throws LockException { LOG.debug("Acquiring lock for {} with mode {} {}", key.getName(), mode, key.getData().getLockMode()); - int tryNum = 0; - ZooKeeperHiveLock ret = null; - Set conflictingLocks = new HashSet(); + final Set conflictingLocks = new HashSet<>(); + + ZooKeeperHiveLock lock = null; Exception lastException = null; + int tryNum = 0; do { lastException = null; @@ -303,393 +328,410 @@ private ZooKeeperHiveLock lock (HiveLockObject key, HiveLockMode mode, Thread.sleep(sleepTime); prepareRetry(); } - ret = lockPrimitive(key, mode, keepAlive, parentCreated, conflictingLocks); - if (ret != null) { + lock = lockPrimitive(key, mode, keepAlive, conflictingLocks); + } catch (KeeperException ke) { + lastException = ke; + switch (ke.code()) { + case CONNECTIONLOSS: + case OPERATIONTIMEOUT: + case NONODE: + case NODEEXISTS: + LOG.debug("Possibly transient ZooKeeper exception", ke); + break; + default: + LOG.error("Serious Zookeeper exception", ke); break; } - } catch (Exception e1) { - lastException = e1; - if (e1 instanceof KeeperException) { - KeeperException e = (KeeperException) e1; - switch (e.code()) { - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - case NONODE: - case NODEEXISTS: - LOG.debug("Possibly transient ZooKeeper exception: ", e); - break; - default: - LOG.error("Serious Zookeeper exception: ", e); - break; - } - } else { - LOG.error("Other unexpected exception: ", e1); - } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw ie; + } catch (Exception e) { + lastException = e; + LOG.error("Other unexpected exception", e); } - } while (tryNum < numRetriesForLock); + } while (tryNum < numRetriesForLock && lock == null); - if (ret == null) { + if (lock == null) { console.printError("Unable to acquire " + key.getData().getLockMode() - + ", " + mode + " lock " + key.getDisplayName() + " after " - + tryNum + " attempts."); - printConflictingLocks(key,mode,conflictingLocks); + + ", " + mode + " lock " + key.getDisplayName() + " after " + tryNum + + " attempts."); + if (lastException != null) { - LOG.error("Exceeds maximum retries with errors: ", lastException); + LOG.error("Exceeds maximum retries with errors", lastException); throw new LockException(lastException); } + + printConflictingLocks(key, mode, conflictingLocks); } - return ret; + + return lock; } private void printConflictingLocks(HiveLockObject key, HiveLockMode mode, Set conflictingLocks) { if (!conflictingLocks.isEmpty()) { - HiveLockObjectData requestedLock = new HiveLockObjectData(key.getData().toString()); - LOG.debug("Requested lock " + key.getDisplayName() - + ":: mode:" + requestedLock.getLockMode() + "," + mode - + "; query:" + requestedLock.getQueryStr()); + HiveLockObjectData requestedLock = + new HiveLockObjectData(key.getData().toString()); + LOG.debug("Requested lock " + key.getDisplayName() + ":: mode:" + + requestedLock.getLockMode() + "," + mode + "; query:" + + requestedLock.getQueryStr()); for (String conflictingLock : conflictingLocks) { - HiveLockObjectData conflictingLockData = new HiveLockObjectData(conflictingLock); - LOG.debug("Conflicting lock to " + key.getDisplayName() - + ":: mode:" + conflictingLockData.getLockMode() - + ";query:" + conflictingLockData.getQueryStr() - + 
";queryId:" + conflictingLockData.getQueryId() - + ";clientIp:" + conflictingLockData.getClientIp()); + HiveLockObjectData conflictingLockData = + new HiveLockObjectData(conflictingLock); + LOG.debug("Conflicting lock to " + key.getDisplayName() + ":: mode:" + + conflictingLockData.getLockMode() + ";query:" + + conflictingLockData.getQueryStr() + ";queryId:" + + conflictingLockData.getQueryId() + ";clientIp:" + + conflictingLockData.getClientIp()); } } } /** * Creates a primitive lock object on ZooKeeper. + * * @param key The lock data - * @param mode The lock mode (HiveLockMode - EXCLUSIVE/SHARED/SEMI_SHARED) - * @param keepAlive If true creating PERSISTENT ZooKeeper locks, otherwise EPHEMERAL ZooKeeper - * locks - * @param parentCreated If we expect, that the parent is already created then true, otherwise - * we will try to create the parents as well - * @param conflictingLocks The set where we should collect the conflicting locks when - * the logging level is set to DEBUG - * @return The created ZooKeeperHiveLock object, null if there was a conflicting lock + * @param mode The lock mode + * @param keepAlive If true creating PERSISTENT ZooKeeper locks, otherwise + * EPHEMERAL ZooKeeper locks + * @param conflictingLocks The set where we should collect the conflicting + * locks when the logging level is set to DEBUG + * @return The created ZooKeeperHiveLock object, null if there was a + * conflicting lock * @throws Exception If there was an unexpected Exception */ - private ZooKeeperHiveLock lockPrimitive(HiveLockObject key, - HiveLockMode mode, boolean keepAlive, boolean parentCreated, - Set conflictingLocks) - throws Exception { - String res; - - // If the parents have already been created, create the last child only - List names = new ArrayList(); - String lastName; + private ZooKeeperHiveLock lockPrimitive(final HiveLockObject key, + final HiveLockMode mode, final boolean keepAlive, + final Collection conflictingLocks) throws Exception { HiveLockObjectData lockData = key.getData(); - lockData.setClientIp(clientIp); - - if (parentCreated) { - lastName = getLastObjectName(parent, key); - names.add(lastName); - } else { - names = getObjectNames(key); - lastName = names.get(names.size() - 1); + lockData.setClientIp(clientIP); + + final String rootLockPath = + ZKPaths.makePath(this.zkNamespace, "", key.getPaths()); + final String lockPatch = getLockName(rootLockPath, mode); + + final String finalLockPath = curatorFramework.create() + .creatingParentContainersIfNeeded().withProtection() + .withMode(keepAlive ? 
CreateMode.PERSISTENT_SEQUENTIAL
+            : CreateMode.EPHEMERAL_SEQUENTIAL)
+        .forPath(lockPatch,
+            key.getData().toString().getBytes(StandardCharsets.UTF_8));
+
+    final int seqNo = extractSequenceNumber(finalLockPath);
+    if (seqNo < 0) {
+      curatorFramework.delete().guaranteed().forPath(finalLockPath);
+      throw new LockException(
+          "The created node does not contain a sequence number: "
+              + finalLockPath);
     }
 
-    // Create the parents first
-    for (String name : names) {
-      try {
-        res = createChild(name, new byte[0], CreateMode.PERSISTENT);
-      } catch (Exception e) {
-        if (!(e instanceof KeeperException) || ((KeeperException)e).code() != KeeperException.Code.NODEEXISTS) {
-          //if the exception is not 'NODEEXISTS', re-throw it
-          throw e;
-        }
-      }
+    final PathAndNode pathAndNode = ZKPaths.getPathAndNode(finalLockPath);
+    List<String> children = Collections.emptyList();
+    try {
+      children = curatorFramework.getChildren().forPath(pathAndNode.getPath());
+    } catch (Exception e) {
+      curatorFramework.delete().guaranteed().forPath(finalLockPath);
+      ThreadUtils.checkInterrupted(e);
+      throw e;
     }
 
-    res = createChild(getLockName(lastName, mode), key.getData().toString()
-        .getBytes(), keepAlive ? CreateMode.PERSISTENT_SEQUENTIAL
-        : CreateMode.EPHEMERAL_SEQUENTIAL);
-
-    int seqNo = getSequenceNumber(res, getLockName(lastName, mode));
-    if (seqNo == -1) {
-      curatorFramework.delete().forPath(res);
-      throw new LockException("The created node does not contain a sequence number: " + res);
+    final Set<String> blockingLocks = new HashSet<>(0);
+    for (final String lockNode : children) {
+      final String currentLockPath =
+          ZKPaths.makePath(pathAndNode.getPath(), lockNode);
+
+      final int existingSeqNum = extractSequenceNumber(currentLockPath);
+
+      // Check if this is an existing lock with a lower sequence number
+      if (existingSeqNum >= 0 && existingSeqNum < seqNo) {
+        switch (mode) {
+        case SHARED:
+          final HiveLockMode currentLockMode = getLockMode(currentLockPath);
+          if (HiveLockMode.SHARED == currentLockMode) {
+            // Ignore the existing lock if it is also SHARED; otherwise fall
+            // through and add it to the set of blocking locks
+            break;
+          }
+        case EXCLUSIVE:
+          // Cannot grab an exclusive lock if any lock already exists
+        default:
+          blockingLocks.add(currentLockPath);
+          break;
+        }
+      }
     }
 
-    List<String> children = curatorFramework.getChildren().forPath(lastName);
-
-    String exLock = getLockName(lastName, HiveLockMode.EXCLUSIVE);
-    String shLock = getLockName(lastName, HiveLockMode.SHARED);
-
-    for (String child : children) {
-      child = lastName + "/" + child;
-
-      // Is there a conflicting lock on the same object with a lower sequence
-      // number
-      int childSeq = seqNo;
-      if (child.startsWith(exLock)) {
-        childSeq = getSequenceNumber(child, exLock);
-      }
-      if ((mode == HiveLockMode.EXCLUSIVE) && child.startsWith(shLock)) {
-        childSeq = getSequenceNumber(child, shLock);
-      }
+    if (!blockingLocks.isEmpty()) {
+      // Delete the newly created lock; the caller will have to try again later
+      curatorFramework.delete().guaranteed().forPath(finalLockPath);
 
-      if ((childSeq >= 0) && (childSeq < seqNo)) {
-        try {
-          curatorFramework.delete().forPath(res);
-        } finally {
-          if (LOG.isDebugEnabled()) {
-            try {
-              String data = new String(curatorFramework.getData().forPath(child));
-              conflictingLocks.add(data);
-            } catch (Exception e) {
-              //ignored
-            }
+      if (LOG.isDebugEnabled()) {
+        for (final String blockingLockPath : blockingLocks) {
+          try {
+            final String dataStr =
+                new String(curatorFramework.getData().forPath(blockingLockPath),
+                    StandardCharsets.UTF_8);
+
conflictingLocks.add(dataStr); + } catch (Exception e) { + LOG.debug("Could not get data for node: {}", blockingLockPath, e); } } - return null; } + + LOG.info("Cannot obtain lock. Blocked by existing lock."); + LOG.debug("Blocked by {}", blockingLocks); + return null; } - Metrics metrics = MetricsFactory.getInstance(); + + final Metrics metrics = MetricsFactory.getInstance(); if (metrics != null) { try { - switch(mode) { + switch (mode) { case EXCLUSIVE: - metrics.incrementCounter(MetricsConstant.ZOOKEEPER_HIVE_EXCLUSIVELOCKS); + metrics + .incrementCounter(MetricsConstant.ZOOKEEPER_HIVE_EXCLUSIVELOCKS); break; case SEMI_SHARED: - metrics.incrementCounter(MetricsConstant.ZOOKEEPER_HIVE_SEMISHAREDLOCKS); + metrics + .incrementCounter(MetricsConstant.ZOOKEEPER_HIVE_SEMISHAREDLOCKS); break; - default: + case SHARED: metrics.incrementCounter(MetricsConstant.ZOOKEEPER_HIVE_SHAREDLOCKS); break; + default: + break; } - } catch (Exception e) { - LOG.warn("Error Reporting hive client zookeeper lock operation to Metrics system", e); + LOG.warn("Error Reporting hive client zookeeper lock operation " + + "to Metrics system", e); } } - return new ZooKeeperHiveLock(res, key, mode); - } - /* Remove the lock specified */ - @Override - public void unlock(HiveLock hiveLock) throws LockException { - unlockWithRetry(hiveLock, parent); + return new ZooKeeperHiveLock(finalLockPath, key, mode); } - private void unlockWithRetry(HiveLock hiveLock, String parent) throws LockException { + // Hardcoded in {@link org.apache.zookeeper.server.PrepRequestProcessor} + static final int SEQUENTIAL_SUFFIX_DIGITS = 10; - int tryNum = 0; - do { + /** + * Extracts the ten-digit suffix from a sequential znode path. + * + * @param path the path of a sequential znodes + * @return the sequence number + */ + private int extractSequenceNumber(final String path) { + final int length = path.length(); + if (length >= SEQUENTIAL_SUFFIX_DIGITS) { + final String seqStr = path.substring(length - SEQUENTIAL_SUFFIX_DIGITS); try { - tryNum++; - if (tryNum > 1) { - Thread.sleep(sleepTime); - } - unlockPrimitive(hiveLock, parent, curatorFramework); - break; + return Integer.parseInt(seqStr); } catch (Exception e) { - if (tryNum >= numRetriesForUnLock) { - String name = ((ZooKeeperHiveLock)hiveLock).getPath(); - throw new LockException("Node " + name + " can not be deleted after " + numRetriesForUnLock + " attempts.", - e); - } } - } while (tryNum < numRetriesForUnLock); + } + // invalid number + return -1; + } - return; + /** + * Remove the lock specified. + */ + @Override + public void unlock(HiveLock hiveLock) throws LockException { + unlockPrimitive(hiveLock, zkNamespace, curatorFramework); } - /* Remove the lock specified */ + /** + * Remove the lock specified. Ignore any {@link InterruptedException} and + * complete the lock. Any locks not deleted may block future queries. + */ @VisibleForTesting - static void unlockPrimitive(HiveLock hiveLock, String parent, CuratorFramework curatorFramework) throws LockException { - ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock; - HiveLockMode lMode = hiveLock.getHiveLockMode(); - HiveLockObject obj = zLock.getHiveLockObject(); - String name = getLastObjectName(parent, obj); - try { - //catch InterruptedException to make sure locks can be released when the query is cancelled. 
- try { - curatorFramework.delete().forPath(zLock.getPath()); - } catch (InterruptedException ie) { - curatorFramework.delete().forPath(zLock.getPath()); - } + static void unlockPrimitive(HiveLock hiveLock, String parent, + CuratorFramework curatorFramework) throws LockException { - // Delete the parent node if all the children have been deleted - List children = null; - try { - children = curatorFramework.getChildren().forPath(name); - } catch (InterruptedException ie) { - children = curatorFramework.getChildren().forPath(name); - } - if (children == null || children.isEmpty()) { - try { - curatorFramework.delete().forPath(name); - } catch (InterruptedException ie) { - curatorFramework.delete().forPath(name); - } - } - Metrics metrics = MetricsFactory.getInstance(); + Objects.requireNonNull(hiveLock, "Lock must not be null"); + + final ZooKeeperHiveLock zLock = (ZooKeeperHiveLock) hiveLock; + + try { + curatorFramework.delete().guaranteed().forPath(zLock.getPath()); + } catch (Exception e) { + ThreadUtils.checkInterrupted(e); + throw new LockException(e); + } finally { + final Metrics metrics = MetricsFactory.getInstance(); if (metrics != null) { + final HiveLockMode lMode = hiveLock.getHiveLockMode(); try { - switch(lMode) { + switch (lMode) { case EXCLUSIVE: - metrics.decrementCounter(MetricsConstant.ZOOKEEPER_HIVE_EXCLUSIVELOCKS); + metrics.decrementCounter( + MetricsConstant.ZOOKEEPER_HIVE_EXCLUSIVELOCKS); break; case SEMI_SHARED: - metrics.decrementCounter(MetricsConstant.ZOOKEEPER_HIVE_SEMISHAREDLOCKS); + metrics.decrementCounter( + MetricsConstant.ZOOKEEPER_HIVE_SEMISHAREDLOCKS); + break; + case SHARED: + metrics + .decrementCounter(MetricsConstant.ZOOKEEPER_HIVE_SHAREDLOCKS); break; default: - metrics.decrementCounter(MetricsConstant.ZOOKEEPER_HIVE_SHAREDLOCKS); break; } } catch (Exception e) { - LOG.warn("Error Reporting hive client zookeeper unlock operation to Metrics system", e); + LOG.warn("Error Reporting hive client zookeeper unlock operation " + + "to Metrics system", e); } } - } catch (KeeperException.NoNodeException nne) { - //can happen in retrying deleting the zLock after exceptions like InterruptedException - //or in a race condition where parent has already been deleted by other process when it - //is to be deleted. Both cases should not raise error - LOG.debug("Node " + zLock.getPath() + " or its parent has already been deleted."); - } catch (KeeperException.NotEmptyException nee) { - //can happen in a race condition where another process adds a zLock under this parent - //just before it is about to be deleted. 
It should not be a problem since this parent - //can eventually be deleted by the process which hold its last child zLock - LOG.debug("Node " + name + " to be deleted is not empty."); - } catch (Exception e) { - //exceptions including InterruptException and other KeeperException - LOG.error("Failed to release ZooKeeper lock: ", e); - throw new LockException(e); } - } - /* Release all locks - including PERSISTENT locks */ - public static void releaseAllLocks(HiveConf conf) throws Exception { try { - String parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE); - List locks = getLocks(conf, null, parent, false, false); - Exception lastExceptionGot = null; - if (locks != null) { - for (HiveLock lock : locks) { - try { - unlockPrimitive(lock, parent, curatorFramework); - } catch (Exception e) { - lastExceptionGot = e; - } - } + final PathAndNode pathAndNode = ZKPaths.getPathAndNode(zLock.getPath()); + final String lockParentPath = pathAndNode.getPath(); + // Delete the parent node if all the children have been deleted + final Stat stat = curatorFramework.checkExists().forPath(lockParentPath); + if (stat != null && stat.getNumChildren() == 0) { + curatorFramework.delete().forPath(lockParentPath); } + } catch (Exception e) { + // This clean up is "nice to have" do not exit on error here + ThreadUtils.checkInterrupted(e); + } + } + + /** + * Release all locks - including PERSISTENT locks. + */ + public static void releaseAllLocks(final HiveConf conf) throws Exception { + final String nameSpace = + conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE); + Preconditions.checkState(nameSpace != null); - // if we got exception during doing the unlock, rethrow it here - if(lastExceptionGot != null) { - throw lastExceptionGot; + final CuratorFramework curatorFramework = + CuratorFrameworkSingleton.getInstance(conf); + + final String namespacePath = ZKPaths.makePath(nameSpace, ""); + + try { + LOG.debug("Deleting entire Hive lock namespace: {}", namespacePath); + curatorFramework.delete().deletingChildrenIfNeeded() + .forPath(namespacePath); + } catch (KeeperException ke) { + if (ke.code() == Code.NONODE) { + LOG.debug("Namespace node not present"); + } else { + throw ke; } - } catch (Exception e) { - LOG.error("Failed to release all locks: ", e); - throw new Exception(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg()); } } - /* Get all locks */ + /** + * Get all locks. + */ @Override - public List getLocks(boolean verifyTablePartition, boolean fetchData) - throws LockException { - return getLocks(ctx.getConf(), null, parent, verifyTablePartition, fetchData); + public List getLocks(boolean verifyTablePartition, + boolean fetchData) throws LockException { + return getLocks(ctx.getConf(), null, zkNamespace, verifyTablePartition, + fetchData); } - /* Get all locks for a particular object */ + /** + * Get all locks for a particular object. 
+ */ @Override - public List getLocks(HiveLockObject key, boolean verifyTablePartitions, - boolean fetchData) throws LockException { - return getLocks(ctx.getConf(), key, parent, verifyTablePartitions, fetchData); + public List getLocks(HiveLockObject key, + boolean verifyTablePartitions, boolean fetchData) throws LockException { + return getLocks(ctx.getConf(), key, zkNamespace, verifyTablePartitions, + fetchData); } /** - * @param conf Hive configuration - * @param key The object to be compared against - if key is null, then get all locks + * @param conf Hive configuration + * @param key The object to be compared against - if key is null, then get all + * locks **/ - private static List getLocks(HiveConf conf, - HiveLockObject key, String parent, boolean verifyTablePartition, boolean fetchData) + private List getLocks(HiveConf conf, HiveLockObject key, + String parent, boolean verifyTablePartition, boolean fetchData) throws LockException { - List locks = new ArrayList(); - List children; + final List locks = new ArrayList<>(); + final String commonParent; + List children = Collections.emptyList(); boolean recurse = true; - String commonParent; try { if (key != null) { - commonParent = "/" + parent + "/" + key.getName(); - children = curatorFramework.getChildren().forPath(commonParent); + commonParent = ZKPaths.makePath(parent, key.getName()); recurse = false; + } else { + commonParent = ZKPaths.makePath(parent, ""); } - else { - commonParent = "/" + parent; - children = curatorFramework.getChildren().forPath(commonParent); - } + children = curatorFramework.getChildren().forPath(commonParent); } catch (Exception e) { // no locks present return locks; } - Queue childn = new LinkedList(); - if (children != null && !children.isEmpty()) { - for (String child : children) { - childn.add(commonParent + "/" + child); - } + final Queue childn = new ArrayDeque<>(); + for (String child : children) { + childn.add(ZKPaths.makePath(commonParent, child)); } - while (true) { + while (!childn.isEmpty()) { String curChild = childn.poll(); - if (curChild == null) { - return locks; - } if (recurse) { try { children = curatorFramework.getChildren().forPath(curChild); for (String child : children) { - childn.add(curChild + "/" + child); + childn.add(ZKPaths.makePath(curChild, child)); } } catch (Exception e) { + LOG.debug("Exception while getting locks for: {}", curChild, e); // nothing to do } } - HiveLockMode mode = getLockMode(curChild); + final HiveLockMode mode = getLockMode(curChild); if (mode == null) { continue; } HiveLockObjectData data = null; + // set the lock object with a dummy data, and then do a set if needed. 
- HiveLockObject obj = getLockObject(conf, curChild, mode, data, parent, verifyTablePartition); + HiveLockObject obj = getLockObject(conf, curChild, mode, data, parent, + verifyTablePartition); if (obj == null) { continue; } - if ((key == null) || - (obj.getName().equals(key.getName()))) { + if ((key == null) || (obj.getName().equals(key.getName()))) { if (fetchData) { try { - data = new HiveLockObjectData(new String(curatorFramework.getData().watched().forPath(curChild))); - data.setClientIp(clientIp); + data = new HiveLockObjectData( + new String(curatorFramework.getData().forPath(curChild), + StandardCharsets.UTF_8)); + data.setClientIp(clientIP); } catch (Exception e) { - LOG.error("Error in getting data for " + curChild, e); + LOG.warn("Error in getting data for {}", curChild, e); // ignore error } } obj.setData(data); - HiveLock lck = (new ZooKeeperHiveLock(curChild, obj, mode)); - locks.add(lck); + locks.add(new ZooKeeperHiveLock(curChild, obj, mode)); } } + return locks; } - /** Remove all redundant nodes **/ + /** + * Remove all redundant nodes. + */ private void removeAllRedundantNodes() { try { - checkRedundantNode("/" + parent); + checkRedundantNode(ZKPaths.makePath(zkNamespace, "")); } catch (Exception e) { LOG.warn("Exception while removing all redundant nodes", e); } @@ -704,62 +746,49 @@ private void checkRedundantNode(String node) { List children = curatorFramework.getChildren().forPath(node); for (String child : children) { - checkRedundantNode(node + "/" + child); + checkRedundantNode(ZKPaths.makePath(node, child)); } - children = curatorFramework.getChildren().forPath(node); - if ((children == null) || (children.isEmpty())) - { + final Stat stat = curatorFramework.checkExists().forPath(node); + if (stat != null && stat.getNumChildren() == 0) { curatorFramework.delete().forPath(node); } } catch (Exception e) { - LOG.warn("Error in checkRedundantNode for node " + node, e); + LOG.warn("Error in checkRedundantNode for node {}", node, e); } } - /* Release all transient locks, by simply closing the client */ + /** + * Release all transient locks, by simply closing the client. + */ @Override public void close() throws LockException { - try { - - if (HiveConf.getBoolVar(ctx.getConf(), HiveConf.ConfVars.HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES)) { + try { + if (HiveConf.getBoolVar(ctx.getConf(), + HiveConf.ConfVars.HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES)) { removeAllRedundantNodes(); } - } catch (Exception e) { - LOG.error("Failed to close zooKeeper client: " + e); + LOG.error("Failed to close zooKeeper client", e); throw new LockException(e); } } /** - * Get the sequence number from the path. The sequence number is always at the end of the path. - **/ - private int getSequenceNumber(String resPath, String path) { - String tst = resPath.substring(path.length()); - try { - return Integer.parseInt(tst); - } catch (Exception e) { - return -1; // invalid number - } - } - - /** - * Get the object from the path of the lock. - * The object may correspond to a table, a partition or a parent to a partition. - * For eg: if Table T is partitioned by ds, hr and ds=1/hr=1 is a valid partition, - * the lock may also correspond to T@ds=1, which is not a valid object - * @param verifyTablePartition + * Get the object from the path of the lock. The object may correspond to a + * table, a partition or a parent to a partition. 
For eg: if Table T is + * partitioned by ds, hr and ds=1/hr=1 is a valid partition, the lock may also + * correspond to T@ds=1, which is not a valid object **/ private static HiveLockObject getLockObject(HiveConf conf, String path, - HiveLockMode mode, HiveLockObjectData data, - String parent, boolean verifyTablePartition) - throws LockException { + HiveLockMode mode, HiveLockObjectData data, String parent, + boolean verifyTablePartition) throws LockException { try { - Hive db = Hive.get(conf); - int indx = path.lastIndexOf("LOCK-" + mode.toString()); - String objName = path.substring(("/" + parent + "/").length(), indx-1); - String[] names = objName.split("/"); + final Hive db = Hive.get(conf); + final PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); + final String objectPath = StringUtils.removeStart(pathAndNode.getPath(), + ZKPaths.makePath(parent, "")); + final String[] names = ZKPaths.split(objectPath).toArray(new String[0]); if (names.length < 2) { return null; @@ -779,8 +808,8 @@ private static HiveLockObject getLockObject(HiveConf conf, String path, return new HiveLockObject(tab, data); } - Map partSpec = new HashMap(); - for (indx = 2; indx < names.length; indx++) { + Map partSpec = new HashMap<>(); + for (int indx = 2; indx < names.length; indx++) { String[] partVals = names[indx].split("="); partSpec.put(partVals[0], partVals[1]); } @@ -793,30 +822,26 @@ private static HiveLockObject getLockObject(HiveConf conf, String path, } if (partn == null) { - return new HiveLockObject(new DummyPartition(tab, path, partSpec), data); + partn = new DummyPartition(tab, path, partSpec); } return new HiveLockObject(partn, data); } catch (Exception e) { - LOG.error("Failed to create ZooKeeper object: " + e); throw new LockException(e); } } - private static Pattern shMode = Pattern.compile("^.*-(SHARED)-([0-9]+)$"); - private static Pattern exMode = Pattern.compile("^.*-(EXCLUSIVE)-([0-9]+)$"); + private static Pattern shMode = Pattern.compile("^.*LOCK-(SHARED)-([0-9]+)$"); + private static Pattern exMode = Pattern.compile("^.*LOCK-(EXCLUSIVE)-([0-9]+)$"); /* Get the mode of the lock encoded in the path */ private static HiveLockMode getLockMode(String path) { - Matcher shMatcher = shMode.matcher(path); - Matcher exMatcher = exMode.matcher(path); - - if (shMatcher.matches()) { + if (shMode.matcher(path).matches()) { return HiveLockMode.SHARED; } - if (exMatcher.matches()) { + if (exMode.matcher(path).matches()) { return HiveLockMode.EXCLUSIVE; } @@ -826,5 +851,4 @@ private static HiveLockMode getLockMode(String path) { @Override public void prepareRetry() throws LockException { } - } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java index 4482f86..3b2864c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java @@ -18,33 +18,27 @@ package org.apache.hadoop.hive.ql.lockmgr.zookeeper; -import java.io.File; -import java.nio.file.Files; -import java.nio.file.Paths; - +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.TestingServer; import org.apache.hadoop.hive.common.metrics.MetricsTestUtils; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import 
org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics;
 import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
 import org.apache.zookeeper.KeeperException;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.TestingServer;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.After;
 import org.junit.Test;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 public class TestZookeeperLockManager {
 
   private HiveConf conf;
@@ -138,7 +132,7 @@ public void testMetrics() throws Exception{
     HiveLockManagerCtx ctx = new HiveLockManagerCtx(conf);
     ZooKeeperHiveLockManager zMgr= new ZooKeeperHiveLockManager();
     zMgr.setContext(ctx);
-    ZooKeeperHiveLock curLock = zMgr.lock(hiveLock, HiveLockMode.SHARED, false);
+    HiveLock curLock = zMgr.lock(hiveLock, HiveLockMode.SHARED, false);
     String json = metrics.dumpJson();
     MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.COUNTER,
        MetricsConstant.ZOOKEEPER_HIVE_SHAREDLOCKS, 1);
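
Reviewer note (illustrative only, not part of the patch): the new helpers derive both the lock mode and the ordering of competing locks straight from the ZNode path. ZooKeeper appends a zero-padded ten-digit counter to sequential znodes, so extractSequenceNumber() parses the last ten characters, and getLockMode() matches the LOCK-(SHARED|EXCLUSIVE)-<digits> suffix. Below is a minimal standalone sketch of that parsing, using a hypothetical samplePath and plain String return values instead of HiveLockMode so it compiles on its own.

    import java.util.regex.Pattern;

    public class LockPathParseSketch {
      // ZooKeeper appends a zero-padded, ten-digit counter to sequential znodes.
      private static final int SEQUENTIAL_SUFFIX_DIGITS = 10;
      private static final Pattern SHARED_RE =
          Pattern.compile("^.*LOCK-(SHARED)-([0-9]+)$");
      private static final Pattern EXCLUSIVE_RE =
          Pattern.compile("^.*LOCK-(EXCLUSIVE)-([0-9]+)$");

      // Mirrors extractSequenceNumber(): parse the trailing ten digits, -1 if absent.
      static int sequenceOf(String path) {
        if (path.length() >= SEQUENTIAL_SUFFIX_DIGITS) {
          try {
            return Integer.parseInt(
                path.substring(path.length() - SEQUENTIAL_SUFFIX_DIGITS));
          } catch (NumberFormatException e) {
            // not a sequential node; fall through and report -1
          }
        }
        return -1;
      }

      // Mirrors getLockMode(), returning a String instead of HiveLockMode.
      static String modeOf(String path) {
        if (SHARED_RE.matcher(path).matches()) {
          return "SHARED";
        }
        if (EXCLUSIVE_RE.matcher(path).matches()) {
          return "EXCLUSIVE";
        }
        return null;
      }

      public static void main(String[] args) {
        // Hypothetical lock node under an illustrative namespace
        String samplePath = "/hive_zookeeper_namespace/default/t1/LOCK-SHARED-0000000007";
        System.out.println(modeOf(samplePath));     // prints SHARED
        System.out.println(sequenceOf(samplePath)); // prints 7
      }
    }

Since Curator's withProtection() only prepends a _c_<UUID>- guard to the node name, the LOCK-<MODE>- marker and the ten-digit suffix are preserved, so both helpers should behave the same for protected lock nodes.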