diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8ffae3b..71f6c8c 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1763,6 +1763,13 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         new TimeValidator(TimeUnit.MILLISECONDS),
         "Initial amount of time (in milliseconds) to wait between retries\n" +
         "when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy."),
+    HIVE_ZOOKEEPER_RELEASE_STALE_LOCKS("hive.zookeeper.release.stale.locks", false,
+        "Whether to release stale zookeeper locks at HiveServer2 startup time which are\n" +
+        "created by previous instances of this server. The server is identified by the IP\n" +
+        "address, so if there is a problem determining the IP address or there are multiple\n" +
+        "instances of HiveServer2 on the same IP; setting this configuration to true will\n" +
+        "release every locks created by these servers too. In this case remove the stale locks\n" +
+        "manually using the UNLOCK TABLE, UNLOCK DATABASE sql commands"),
 
     // Transactions
     HIVE_TXN_MANAGER("hive.txn.manager",
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
index 14d0ef4..1d2e6d1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
@@ -55,7 +55,8 @@
   private int numRetriesForLock;
   private int numRetriesForUnLock;
 
-  private static String clientIp;
+  @VisibleForTesting
+  protected static String clientIp;
 
   static {
     clientIp = "UNKNOWN";
@@ -110,6 +111,27 @@ public void refresh() {
   }
 
   /**
+   * Release all locks whose stored lock data carries this server's clientIp.
+   */
+  public void releaseLocksByThisInstance() throws LockException {
+    List<HiveLock> currentLocks = getLocks(false, true);
+    List<HiveLock> locksToRelease = new ArrayList<HiveLock>();
+    for (HiveLock lock : currentLocks) {
+      HiveLockObject lockObject = lock.getHiveLockObject();
+      HiveLockObjectData data = lockObject == null ? null : lockObject.getData();
+      // Locks without data or without a recorded IP are never treated as ours
+      if (data != null && data.getClientIp() != null && data.getClientIp().equals(clientIp)) {
+        locksToRelease.add(lock);
+      }
+    }
+    if (!locksToRelease.isEmpty()) {
+      LOG.info("Found {} lock(s) using clientIp: [{}], releasing them.", locksToRelease.size(),
+          clientIp);
+      releaseLocks(locksToRelease);
+    }
+  }
+
+  /**
    * @param key object to be locked
    * Get the name of the last string. For eg. if you need to lock db/T/ds=1=/hr=1,
    * the last name would be db/T/ds=1/hr=1
@@ -623,7 +645,9 @@ public static void releaseAllLocks(HiveConf conf) throws Exception {
         if (fetchData) {
           try {
             data = new HiveLockObjectData(new String(curatorFramework.getData().watched().forPath(curChild)));
-            data.setClientIp(clientIp);
+            if (data.getClientIp() == null) {
+              data.setClientIp(clientIp);
+            }
           } catch (Exception e) {
             LOG.error("Error in getting data for " + curChild, e);
             // ignore error
@@ -706,7 +730,6 @@ private static HiveLockObject getLockObject(HiveConf conf, String path,
     String parent, boolean verifyTablePartition)
       throws LockException {
     try {
-      Hive db = Hive.get(conf);
       int indx = path.lastIndexOf("LOCK-" + mode.toString());
       String objName = path.substring(("/" + parent + "/").length(), indx-1);
       String[] names = objName.split("/");
@@ -720,6 +743,7 @@ private static HiveLockObject getLockObject(HiveConf conf, String path,
       }
 
       // do not throw exception if table does not exist
+      Hive db = Hive.get(conf);
       Table tab = db.getTable(names[0], names[1], false);
       if (tab == null) {
         return null;
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java
index 3f9926e..e8eff1b 100644
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java
@@ -18,9 +18,8 @@
 
 package org.apache.hadoop.hive.ql.lockmgr.zookeeper;
 
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Paths;
+import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.hive.common.metrics.MetricsTestUtils;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
@@ -28,6 +27,7 @@
 import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics;
 import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
@@ -38,41 +38,33 @@
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.After;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 public class TestZookeeperLockManager {
 
   private HiveConf conf;
-  private TestingServer server;
+  private static TestingServer server;
   private CuratorFramework client;
   private HiveLockObject hiveLock;
   private ZooKeeperHiveLock zLock;
   private HiveLockObjectData lockObjData;
   private static final String PARENT = "hive";
-  private static final String TABLE = "t1";
-  private static final String PARENT_LOCK_PATH = "/hive/t1";
-  private static final String TABLE_LOCK_PATH = "/hive/t1/00001";
-
-  @Before
-  public void setup() {
-    conf = new HiveConf();
-    lockObjData = new HiveLockObjectData("1", "10", "SHARED", "show tables");
-    hiveLock = new HiveLockObject(TABLE, lockObjData);
-    zLock = new ZooKeeperHiveLock(TABLE_LOCK_PATH, hiveLock, HiveLockMode.SHARED);
+  private static final String TABLE = "default/t1";
+  private static final String PARENT_LOCK_PATH = "/hive/default/t1";
+  private static final String TABLE_LOCK_PATH = "/hive/default/t1/00001";
 
+  @BeforeClass
+  public static void startServer() {
     while (server == null)
     {
       try {
         server = new TestingServer();
-        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
-        client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build();
-        client.start();
+        server.start();
       } catch (Exception e) {
         System.err.println("Getting bind exception - retrying to allocate server");
         server = null;
@@ -80,12 +72,83 @@ public void setup() {
     }
   }
 
+  @Before
+  public void setup() {
+    conf = new HiveConf();
+    lockObjData = new HiveLockObjectData("1", "10", "SHARED", "show tables");
+    hiveLock = new HiveLockObject(TABLE, lockObjData);
+    zLock = new ZooKeeperHiveLock(TABLE_LOCK_PATH, hiveLock, HiveLockMode.SHARED);
+    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+    client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build();
+    client.start();
+  }
+
   @After
   public void teardown() throws Exception
   {
     client.close();
+  }
+
+  @AfterClass
+  public static void stopServer() throws IOException {
     server.close();
-    server = null;
+  }
+
+  @Test
+  public void testReleaseStaleLocks() throws Exception {
+    conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "localhost");
+    conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, String.valueOf(server.getPort()));
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_RELEASE_STALE_LOCKS, true);
+
+    HiveLockManagerCtx ctx = new HiveLockManagerCtx(conf);
+    ZooKeeperHiveLockManager zMgr = new ZooKeeperHiveLockManager();
+    zMgr.setContext(ctx);
+
+    // Create a new lock with the original IP
+    zMgr.lock(hiveLock, HiveLockMode.SHARED, true);
+    List<HiveLock> locks = zMgr.getLocks(false, true);
+    Assert.assertEquals(1, locks.size());
+
+    zMgr.releaseLocksByThisInstance();
+    locks = zMgr.getLocks(false, true);
+    Assert.assertEquals(0, locks.size());
+    zMgr.close();
+  }
+
+  @Test
+  public void testNotReleaseOthersStaleLocks() throws Exception {
+    conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "localhost");
+    conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, String.valueOf(server.getPort()));
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_RELEASE_STALE_LOCKS, true);
+
+    HiveLockManagerCtx ctx = new HiveLockManagerCtx(conf);
+    ZooKeeperHiveLockManager zMgr = new ZooKeeperHiveLockManager();
+    zMgr.setContext(ctx);
+
+    // Create a new lock with the original IP
+    zMgr.lock(hiveLock, HiveLockMode.SHARED, true);
+    List<HiveLock> locks = zMgr.getLocks(false, true);
+    Assert.assertEquals(1, locks.size());
+
+    // Set the IP to another; clientIp is static, so restore it for the other tests
+    String originalIp = ZooKeeperHiveLockManager.clientIp;
+    ZooKeeperHiveLockManager.clientIp = "OTHER_IP";
+    try {
+      zMgr.releaseLocksByThisInstance();
+    } finally {
+      ZooKeeperHiveLockManager.clientIp = originalIp;
+    }
+    locks = zMgr.getLocks(false, true);
+    // Ensure the lock is not removed
+    Assert.assertEquals(1, locks.size());
+
+    // Cleanup
+    zMgr.unlock(locks.get(0));
+    zMgr.close();
   }
 
   @Test
diff --git service/src/java/org/apache/hive/service/server/HiveServer2.java service/src/java/org/apache/hive/service/server/HiveServer2.java
index 590b1f3..b61f3f5 100644
--- service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -51,9 +51,13 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
+import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager;
 import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
@@ -114,6 +118,9 @@ public synchronized void init(HiveConf hiveConf) {
       LOG.warn("Could not initiate the HiveServer2 Metrics system. Metrics may not be reported.", t);
     }
 
+    if (hiveConf.getBoolVar(ConfVars.HIVE_ZOOKEEPER_RELEASE_STALE_LOCKS)) {
+      releaseStaleLocks(hiveConf);
+    }
     cliService = new CLIService(this);
     addService(cliService);
     final HiveServer2 hiveServer2 = this;
@@ -650,6 +657,28 @@ static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exce
     zooKeeperClient.close();
   }
 
+  private void releaseStaleLocks(HiveConf hiveConf) {
+    try {
+      HiveTxnManager txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf);
+      HiveLockManager lockManager = txnMgr.getLockManager();
+      if (lockManager == null) {
+        LOG.warn(ConfVars.HIVE_ZOOKEEPER_RELEASE_STALE_LOCKS.varname + " is set, but failed to get" +
+            " LockManager. Is " + ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + " set to false?" +
+            " Not releasing any lock.");
+        return;
+      }
+      if (!(lockManager instanceof ZooKeeperHiveLockManager)) {
+        LOG.warn(ConfVars.HIVE_ZOOKEEPER_RELEASE_STALE_LOCKS.varname + " is set, but the " +
+            "LockManager is not an instance of ZooKeeperHiveLockManager. Not releasing any " +
+            "lock");
+        return;
+      }
+      ((ZooKeeperHiveLockManager)lockManager).releaseLocksByThisInstance();
+    } catch (LockException le) {
+      LOG.warn("Could not release stale locks", le);
+    }
+  }
+
   private static class DeleteCallBack implements BackgroundCallback {
     @Override
     public void processResult(CuratorFramework zooKeeperClient, CuratorEvent event)