Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 959479)
+++ conf/hive-default.xml (working copy)
@@ -576,6 +576,36 @@
+<property>
+  <name>hive.support.concurrency</name>
+  <value>true</value>
+  <description>Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks.</description>
+</property>
+
+<property>
+  <name>hive.lock.manager</name>
+  <value>org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager</value>
+  <description>The lock manager for hive.</description>
+</property>
+
+<property>
+  <name>hive.lock.numretries</name>
+  <value>100</value>
+  <description>The number of times to retry acquiring all the locks.</description>
+</property>
+
+<property>
+  <name>hive.lock.sleep.between.retries</name>
+  <value>60</value>
+  <description>The sleep time (in seconds) between retries.</description>
+</property>
+
+<property>
+  <name>hive.zookeeper.quorum</name>
+  <value></value>
+  <description>The list of zookeeper servers to talk to. This is only needed for read/write locks.</description>
+</property>
+
 <property>
   <name>fs.har.impl</name>
   <value>org.apache.hadoop.hive.shims.HiveHarFileSystem</value>
   <description>The implementation for accessing Hadoop Archives. Note that this won't be applicable to Hadoop vers less than 0.20</description>
 </property>
 
Index: data/conf/hive-site.xml
===================================================================
--- data/conf/hive-site.xml (revision 959479)
+++ data/conf/hive-site.xml (working copy)
@@ -145,4 +145,10 @@
   <description>Track progress of a task</description>
 </property>
 
+<property>
+  <name>hive.support.concurrency</name>
+  <value>true</value>
+  <description>Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks.</description>
+</property>
+
 </configuration>
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 959479)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -92,6 +92,7 @@
     DYNAMICPARTITIONMAXPARTS("hive.exec.max.dynamic.partitions", 1000),
     DYNAMICPARTITIONMAXPARTSPERNODE("hive.exec.max.dynamic.partitions.pernode", 100),
     DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__"),
+    DEFAULT_ZOOKEEPER_PARTITION_NAME("hive.lockmgr.zookeeper.default.partition.name", "__HIVE_DEFAULT_ZOOKEEPER_PARTITION__"),
 
     // hadoop stuff
     HADOOPBIN("hadoop.bin.path", System.getenv("HADOOP_HOME") + "/bin/hadoop"),
@@ -252,14 +253,25 @@
     HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
     HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
 
+    HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false),
+    HIVE_LOCK_MANAGER("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager"),
+    HIVE_LOCK_NUMRETRIES("hive.lock.numretries", 100),
+    HIVE_LOCK_SLEEP_BETWEEN_RETRIES("hive.lock.sleep.between.retries", 60),
+
+    HIVE_ZOOKEEPER_QUORUM("hive.zookeeper.quorum", ""),
+    HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", ""),
+    HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 60*1000),
+
     // For HBase storage handler
     HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true),
 
     // For har files
     HIVEARCHIVEENABLED("hive.archive.enabled", false),
     HIVEHARPARENTDIRSETTABLE("hive.archive.har.parentdir.settable", false),
+
     ;
 
     public final String varname;
     public final String defaultVal;
     public final int defaultIntVal;
Index: ql/src/test/results/clientnegative/lockneg3.q.out
===================================================================
--- ql/src/test/results/clientnegative/lockneg3.q.out (revision 0)
+++ ql/src/test/results/clientnegative/lockneg3.q.out (revision 0)
@@ -0,0 +1,4 @@
+PREHOOK: query: UNLOCK TABLE srcpart
PARTITION(ds='2008-04-08', hr='11') +PREHOOK: type: UNLOCKTABLE +FAILED: Error in metadata: Table srcpart is not locked +FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask Index: ql/src/test/results/clientnegative/lockneg2.q.out =================================================================== --- ql/src/test/results/clientnegative/lockneg2.q.out (revision 0) +++ ql/src/test/results/clientnegative/lockneg2.q.out (revision 0) @@ -0,0 +1,4 @@ +PREHOOK: query: UNLOCK TABLE src +PREHOOK: type: UNLOCKTABLE +FAILED: Error in metadata: Table src is not locked +FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask Index: ql/src/test/results/clientnegative/lockneg1.q.out =================================================================== --- ql/src/test/results/clientnegative/lockneg1.q.out (revision 0) +++ ql/src/test/results/clientnegative/lockneg1.q.out (revision 0) @@ -0,0 +1,12 @@ +PREHOOK: query: LOCK TABLE src SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE src SHARED +POSTHOOK: type: LOCKTABLE +PREHOOK: query: LOCK TABLE src SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE src SHARED +POSTHOOK: type: LOCKTABLE +PREHOOK: query: LOCK TABLE src EXCLUSIVE +PREHOOK: type: LOCKTABLE +conflicting lock present src cannot be locked in mode EXCLUSIVE +FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask Index: ql/src/test/results/clientpositive/lock1.q.out =================================================================== --- ql/src/test/results/clientpositive/lock1.q.out (revision 0) +++ ql/src/test/results/clientpositive/lock1.q.out (revision 0) @@ -0,0 +1,44 @@ +PREHOOK: query: LOCK TABLE src SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE src SHARED +POSTHOOK: type: LOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +src SHARED +PREHOOK: query: UNLOCK TABLE src +PREHOOK: type: UNLOCKTABLE +POSTHOOK: query: UNLOCK TABLE src +POSTHOOK: type: UNLOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +PREHOOK: query: LOCK TABLE src SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE src SHARED +POSTHOOK: type: LOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +src SHARED +PREHOOK: query: LOCK TABLE src SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE src SHARED +POSTHOOK: type: LOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +src SHARED +src SHARED +PREHOOK: query: UNLOCK TABLE src +PREHOOK: type: UNLOCKTABLE +POSTHOOK: query: UNLOCK TABLE src +POSTHOOK: type: UNLOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS Index: ql/src/test/results/clientpositive/lock2.q.out =================================================================== --- ql/src/test/results/clientpositive/lock2.q.out (revision 0) +++ ql/src/test/results/clientpositive/lock2.q.out (revision 0) @@ -0,0 +1,46 @@ +PREHOOK: query: LOCK TABLE src SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE src SHARED +POSTHOOK: type: LOCKTABLE +PREHOOK: query: LOCK TABLE srcpart SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE srcpart SHARED +POSTHOOK: type: LOCKTABLE +PREHOOK: query: LOCK TABLE srcpart PARTITION(ds='2008-04-08', 
hr='11') EXCLUSIVE +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE srcpart PARTITION(ds='2008-04-08', hr='11') EXCLUSIVE +POSTHOOK: type: LOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +src SHARED +srcpart@ds=2008-04-08/hr=11 EXCLUSIVE +srcpart SHARED +PREHOOK: query: UNLOCK TABLE src +PREHOOK: type: UNLOCKTABLE +POSTHOOK: query: UNLOCK TABLE src +POSTHOOK: type: UNLOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +srcpart@ds=2008-04-08/hr=11 EXCLUSIVE +srcpart SHARED +PREHOOK: query: UNLOCK TABLE srcpart +PREHOOK: type: UNLOCKTABLE +POSTHOOK: query: UNLOCK TABLE srcpart +POSTHOOK: type: UNLOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +srcpart@ds=2008-04-08/hr=11 EXCLUSIVE +PREHOOK: query: UNLOCK TABLE srcpart PARTITION(ds='2008-04-08', hr='11') +PREHOOK: type: UNLOCKTABLE +POSTHOOK: query: UNLOCK TABLE srcpart PARTITION(ds='2008-04-08', hr='11') +POSTHOOK: type: UNLOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS Index: ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java (revision 959479) +++ ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java (working copy) @@ -46,7 +46,7 @@ qfiles[i] = new File(inpDir, testNames[i]); } - boolean success = QTestUtil.queryListRunner(qfiles, resDirs, logDirs, true); + boolean success = QTestUtil.queryListRunner(qfiles, resDirs, logDirs, true, this); if (!success) { fail("One or more queries failed"); } Index: ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java (revision 959479) +++ ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java (working copy) @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.tools.LineageInfo; import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.hive.ql.QTestSetup; /** * TestHiveHistory. @@ -53,6 +54,7 @@ private static Path tmppath = new Path(tmpdir); private static Hive db; private static FileSystem fs; + private QTestSetup setup; /* * intialize the tables @@ -75,6 +77,9 @@ } } + setup = new QTestSetup(this); + setup.preTest(conf); + // copy the test files into hadoop if required. int i = 0; Path[] hadoopDataFile = new Path[2]; @@ -109,6 +114,19 @@ } } + @Override + protected void tearDown() { + try { + setup.tearDown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + /** * Check history file output for this query. 
 */
@@ -133,7 +151,7 @@
       SessionState.start(ss);
 
       String cmd = "select a.key from src a";
-      Driver d = new Driver();
+      Driver d = new Driver(conf);
       int ret = d.run(cmd).getResponseCode();
       if (ret != 0) {
         fail("Failed");
Index: ql/src/test/org/apache/hadoop/hive/ql/QTestSetup.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/QTestSetup.java (revision 0)
+++ ql/src/test/org/apache/hadoop/hive/ql/QTestSetup.java (revision 0)
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.io.File;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+
+import org.apache.hadoop.hbase.MiniZooKeeperCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * QTestSetup defines test fixtures which are reused across testcases,
+ * most notably a MiniZooKeeperCluster for the lock manager tests.
+ */
+public class QTestSetup extends TestSetup
+{
+  private MiniZooKeeperCluster zooKeeperCluster = null;
+  private int zkPort;
+
+  public QTestSetup(Test test) {
+    super(test);
+  }
+
+  public void preTest(HiveConf conf) throws Exception {
+    if (zooKeeperCluster == null) {
+      // We set up fixtures on demand for the first testcase, and leave
+      // them allocated for reuse across all others.  Then tearDown
+      // will get called once at the very end after all testcases have
+      // run, giving us a guaranteed opportunity to shut them down.
+      String tmpdir = System.getProperty("user.dir")+"/../build/ql/tmp";
+      zooKeeperCluster = new MiniZooKeeperCluster();
+      zkPort = zooKeeperCluster.startup(new File(tmpdir, "zookeeper"));
+      String zkServer = "localhost";
+      conf.set("hive.zookeeper.quorum", zkServer);
+      conf.set("hive.zookeeper.client.port", "" + zkPort);
+    }
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    if (zooKeeperCluster != null) {
+      zooKeeperCluster.shutdown();
+      zooKeeperCluster = null;
+    }
+  }
+}
Index: ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (revision 959479)
+++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (working copy)
@@ -37,6 +37,7 @@
 import java.util.TreeMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import junit.framework.Test;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -98,6 +99,7 @@
   private HadoopShims.MiniDFSShim dfs = null;
   private boolean miniMr = false;
   private String hadoopVer = null;
+  private QTestSetup setup = null;
 
   public boolean deleteDirectory(File path) {
     if (path.exists()) {
@@ -169,7 +171,7 @@
   }
 
   public QTestUtil(String outDir, String logDir) throws Exception {
-    this(outDir, logDir, false, "0.20");
+    this(outDir, logDir, false, "0.20", null);
   }
 
   private String getHadoopMainVersion(String input) {
@@ -185,8 +187,14 @@
   }
 
   public QTestUtil(String outDir, String logDir, boolean miniMr, String hadoopVer)
    throws Exception {
+    this(outDir, logDir, miniMr, hadoopVer, null);
+  }
+
+  public QTestUtil(String outDir, String logDir, boolean miniMr, String hadoopVer, QTestSetup setup)
+    throws Exception {
     this.outDir = outDir;
     this.logDir = logDir;
+    this.setup = setup;
     conf = new HiveConf(Driver.class);
     this.miniMr = miniMr;
     this.hadoopVer = getHadoopMainVersion(hadoopVer);
@@ -220,10 +228,16 @@
 
     srcTables = new LinkedList<String>();
     init();
+    if (setup != null) {
+      setup.preTest(conf);
+    }
   }
 
   public void shutdown() throws Exception {
     cleanUp();
+    if (setup != null) {
+      setup.tearDown();
+    }
 
     if (dfs != null) {
       dfs.shutdown();
@@ -959,16 +971,17 @@
    * (in terms of destination tables)
    */
   public static boolean queryListRunner(File[] qfiles, String[] resDirs,
-      String[] logDirs, boolean mt) {
+      String[] logDirs, boolean mt, Test test) {
 
     assert (qfiles.length == resDirs.length);
     assert (qfiles.length == logDirs.length);
     boolean failed = false;
-
     try {
       QTestUtil[] qt = new QTestUtil[qfiles.length];
+      QTestSetup[] qsetup = new QTestSetup[qfiles.length];
       for (int i = 0; i < qfiles.length; i++) {
-        qt[i] = new QTestUtil(resDirs[i], logDirs[i]);
+        qsetup[i] = new QTestSetup(test);
+        qt[i] = new QTestUtil(resDirs[i], logDirs[i], false, "0.20", qsetup[i]);
         qt[i].addFile(qfiles[i]);
       }
 
@@ -1015,6 +1028,10 @@
           }
         }
       }
+
+      for (int i = 0; i < qfiles.length; i++) {
+        qsetup[i].tearDown();
+      }
     } catch (Exception e) {
       e.printStackTrace();
       return false;
Index: ql/src/test/queries/clientnegative/lockneg2.q
===================================================================
--- ql/src/test/queries/clientnegative/lockneg2.q (revision 0)
+++ ql/src/test/queries/clientnegative/lockneg2.q (revision 0)
@@ -0,0 +1 @@
+UNLOCK TABLE src;
Index: ql/src/test/queries/clientnegative/lockneg3.q
===================================================================
--- ql/src/test/queries/clientnegative/lockneg3.q (revision 0)
+++ ql/src/test/queries/clientnegative/lockneg3.q (revision 0)
@@ -0,0 +1 @@
+UNLOCK TABLE srcpart PARTITION(ds='2008-04-08', hr='11');
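The negative tests above exercise the new DDL through the CLI; programmatically, the same flow goes through the HiveLockManager interface added by this patch. The sketch below is a minimal, hypothetical driver of that API: it assumes a reachable ZooKeeper quorum is already set in hive.zookeeper.quorum and hive.zookeeper.client.port, instantiates the manager reflectively from hive.lock.manager (the Driver-side wiring is not part of the hunks shown here), and retries lock() the way hive.lock.numretries and hive.lock.sleep.between.retries are intended to be consumed. The class name LockManagerSmokeTest is illustrative, not part of the patch.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.MetaStoreUtils;
    import org.apache.hadoop.hive.ql.lockmgr.*;
    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.Table;
    import org.apache.hadoop.util.ReflectionUtils;

    public class LockManagerSmokeTest {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);

        // Instantiate the configured lock manager, as a caller of this API would.
        String clsName = conf.getVar(HiveConf.ConfVars.HIVE_LOCK_MANAGER);
        HiveLockManager lockMgr = (HiveLockManager) ReflectionUtils.newInstance(
            Class.forName(clsName), conf);
        lockMgr.setContext(new HiveLockManagerCtx(conf));

        // lock() returns null on conflict, so callers are expected to retry up
        // to hive.lock.numretries times, sleeping between attempts.
        int retries = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES);
        int sleepSecs = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES);

        Table tbl = Hive.get(conf).getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, "src");
        HiveLock lock = null;
        for (int i = 0; i < retries && lock == null; i++) {
          lock = lockMgr.lock(new HiveLockObject(tbl), HiveLockMode.SHARED);
          if (lock == null) {
            Thread.sleep(sleepSecs * 1000L);
          }
        }

        if (lock == null) {
          System.err.println("could not acquire SHARED lock on src");
        } else {
          lockMgr.unlock(lock);
        }
      }
    }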
Index: ql/src/test/queries/clientnegative/lockneg1.q =================================================================== --- ql/src/test/queries/clientnegative/lockneg1.q (revision 0) +++ ql/src/test/queries/clientnegative/lockneg1.q (revision 0) @@ -0,0 +1,3 @@ +LOCK TABLE src SHARED; +LOCK TABLE src SHARED; +LOCK TABLE src EXCLUSIVE; Index: ql/src/test/queries/clientpositive/lock1.q =================================================================== --- ql/src/test/queries/clientpositive/lock1.q (revision 0) +++ ql/src/test/queries/clientpositive/lock1.q (revision 0) @@ -0,0 +1,10 @@ +LOCK TABLE src SHARED; +SHOW LOCKS; +UNLOCK TABLE src; +SHOW LOCKS; +LOCK TABLE src SHARED; +SHOW LOCKS; +LOCK TABLE src SHARED; +SHOW LOCKS; +UNLOCK TABLE src; +SHOW LOCKS; Index: ql/src/test/queries/clientpositive/lock2.q =================================================================== --- ql/src/test/queries/clientpositive/lock2.q (revision 0) +++ ql/src/test/queries/clientpositive/lock2.q (revision 0) @@ -0,0 +1,10 @@ +LOCK TABLE src SHARED; +LOCK TABLE srcpart SHARED; +LOCK TABLE srcpart PARTITION(ds='2008-04-08', hr='11') EXCLUSIVE; +SHOW LOCKS; +UNLOCK TABLE src; +SHOW LOCKS; +UNLOCK TABLE srcpart; +SHOW LOCKS; +UNLOCK TABLE srcpart PARTITION(ds='2008-04-08', hr='11'); +SHOW LOCKS; Index: ql/src/test/templates/TestCliDriver.vm =================================================================== --- ql/src/test/templates/TestCliDriver.vm (revision 959479) +++ ql/src/test/templates/TestCliDriver.vm (working copy) @@ -8,6 +8,7 @@ import java.util.*; import org.apache.hadoop.hive.ql.QTestUtil; +import org.apache.hadoop.hive.ql.QTestSetup; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.history.HiveHistoryViewer; import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo; @@ -23,10 +24,12 @@ public class $className extends TestCase { private QTestUtil qt; + private QTestSetup setup; - public $className(String name) { + public $className(String name, QTestSetup setup) { super(name); qt = null; + this.setup = setup; } @Override @@ -39,8 +42,7 @@ miniMR = true; hadoopVer = "$hadoopVersion"; - qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", miniMR, hadoopVer); - + qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", miniMR, hadoopVer, setup); } catch (Exception e) { System.out.println("Exception: " + e.getMessage()); @@ -65,13 +67,14 @@ public static Test suite() { TestSuite suite = new TestSuite(); + QTestSetup setup = new QTestSetup(suite); #foreach ($qf in $qfiles) #set ($fname = $qf.getName()) #set ($eidx = $fname.length() - 2) #set ($tname = $fname.substring(0, $eidx)) - suite.addTest(new $className("testCliDriver_$tname")); + suite.addTest(new $className("testCliDriver_$tname", setup)); #end - return suite; + return setup; } static String debugHint = "\nSee build/ql/tmp/hive.log, " @@ -90,7 +93,7 @@ if (qt.shouldBeSkipped("$fname")) { return; } - + qt.cliInit("$fname"); int ecode = qt.executeClient("$fname"); if (ecode != 0) { Index: ql/src/test/templates/TestNegativeCliDriver.vm =================================================================== --- ql/src/test/templates/TestNegativeCliDriver.vm (revision 959479) +++ ql/src/test/templates/TestNegativeCliDriver.vm (working copy) @@ -8,6 +8,7 @@ import java.util.*; import org.apache.hadoop.hive.ql.QTestUtil; +import org.apache.hadoop.hive.ql.QTestSetup; import org.apache.hadoop.hive.ql.exec.Task; import org.antlr.runtime.*; @@ -16,16 +17,18 @@ public class 
$className extends TestCase { private QTestUtil qt; + private QTestSetup setup; - public $className(String name) { + public $className(String name, QTestSetup setup) { super(name); qt = null; + this.setup = setup; } @Override protected void setUp() { try { - qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()"); + qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", false, "0.20", setup); } catch (Throwable e) { @@ -35,15 +38,29 @@ } } + @Override + protected void tearDown() { + try { + qt.shutdown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + public static Test suite() { TestSuite suite = new TestSuite(); + QTestSetup setup = new QTestSetup(suite); #foreach ($qf in $qfiles) #set ($fname = $qf.getName()) #set ($eidx = $fname.length() - 2) #set ($tname = $fname.substring(0, $eidx)) - suite.addTest(new $className("testNegativeCliDriver_$tname")); + suite.addTest(new $className("testNegativeCliDriver_$tname", setup)); #end - return suite; + return setup; } static String debugHint = "\nSee build/ql/tmp/hive.log, " @@ -63,7 +80,7 @@ System.out.println("Test $fname skipped"); return; } - + qt.cliInit("$fname"); int ecode = qt.executeClient("$fname"); if (ecode == 0) { Index: ql/src/test/templates/TestParse.vm =================================================================== --- ql/src/test/templates/TestParse.vm (revision 959479) +++ ql/src/test/templates/TestParse.vm (working copy) @@ -8,21 +8,24 @@ import java.util.*; import org.apache.hadoop.hive.ql.QTestUtil; +import org.apache.hadoop.hive.ql.QTestSetup; import org.apache.hadoop.hive.ql.exec.Task; public class $className extends TestCase { private QTestUtil qt; + private QTestSetup setup; - public $className(String name) { + public $className(String name, QTestSetup setup) { super(name); qt = null; + this.setup = setup; } @Override protected void setUp() { try { - qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()"); + qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", false, "0.20", setup); } catch (Exception e) { System.out.println("Exception: " + e.getMessage()); @@ -32,13 +35,28 @@ } } + @Override + protected void tearDown() { + try { + qt.shutdown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + public static Test suite() { TestSuite suite = new TestSuite(); + QTestSetup setup = new QTestSetup(suite); + #foreach ($qf in $qfiles) #set ($fname = $qf.getName()) #set ($eidx = $fname.length() - 2) #set ($tname = $fname.substring(0, $eidx)) - suite.addTest(new $className("testParse_$tname")); + suite.addTest(new $className("testParse_$tname", setup)); #end return suite; } Index: ql/src/test/templates/TestParseNegative.vm =================================================================== --- ql/src/test/templates/TestParseNegative.vm (revision 959479) +++ ql/src/test/templates/TestParseNegative.vm (working copy) @@ -8,21 +8,24 @@ import java.util.*; import org.apache.hadoop.hive.ql.QTestUtil; +import org.apache.hadoop.hive.ql.QTestSetup; import org.apache.hadoop.hive.ql.exec.Task; public class $className extends TestCase { private QTestUtil qt; + private QTestSetup setup; - public $className(String name) { + public $className(String name, QTestSetup 
setup) { super(name); qt = null; + this.setup = setup; } @Override protected void setUp() { try { - qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()"); + qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", false, "0.20", setup); } catch (Exception e) { System.out.println("Exception: " + e.getMessage()); @@ -32,13 +35,28 @@ } } + @Override + protected void tearDown() { + try { + qt.shutdown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + public static Test suite() { TestSuite suite = new TestSuite(); + QTestSetup setup = new QTestSetup(suite); + #foreach ($qf in $qfiles) #set ($fname = $qf.getName()) #set ($eidx = $fname.length() - 2) #set ($tname = $fname.substring(0, $eidx)) - suite.addTest(new $className("testParseNegative_$tname")); + suite.addTest(new $className("testParseNegative_$tname", setup)); #end return suite; } Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/DummyPartition.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/metadata/DummyPartition.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/DummyPartition.java (revision 0) @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.ql.metadata;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+
+/**
+ * A Hive table partition is a fundamental storage unit within a table.
+ * Currently, Hive does not support hierarchical partitions: if the partition
+ * ds=1/hr=1 exists, there is no way to address ds=1 by itself.
+ *
+ * Hierarchical partitions are needed in some cases, for example locking.
+ * For now, create a dummy partition to satisfy this need.
+ */
+public class DummyPartition extends Partition {
+
+  @SuppressWarnings("nls")
+  static final private Log LOG = LogFactory
+      .getLog("hive.ql.metadata.DummyPartition");
+
+  private String name;
+
+  public DummyPartition() {
+  }
+
+  public DummyPartition(String name) {
+    this.name = name;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (revision 959479)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (working copy)
@@ -57,6 +57,7 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -80,11 +81,18 @@
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.MsckDesc;
 import org.apache.hadoop.hive.ql.plan.ShowFunctionsDesc;
+import org.apache.hadoop.hive.ql.plan.ShowLocksDesc;
+import org.apache.hadoop.hive.ql.plan.LockTableDesc;
+import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
 import org.apache.hadoop.hive.ql.plan.ShowPartitionsDesc;
 import org.apache.hadoop.hive.ql.plan.ShowTableStatusDesc;
 import org.apache.hadoop.hive.ql.plan.ShowTablesDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
@@ -210,6 +218,21 @@
       return showFunctions(showFuncs);
     }
 
+    ShowLocksDesc showLocks = work.getShowLocksDesc();
+    if (showLocks != null) {
+      return showLocks(showLocks);
+    }
+
+    LockTableDesc lockTbl = work.getLockTblDesc();
+    if (lockTbl != null) {
+      return lockTable(lockTbl);
+    }
+
+    UnlockTableDesc unlockTbl = work.getUnlockTblDesc();
+    if (unlockTbl != null) {
+      return unlockTable(unlockTbl);
+    }
+
     ShowPartitionsDesc showParts = work.getShowPartsDesc();
     if (showParts != null) {
       return showPartitions(db, showParts);
@@ -1094,6 +1117,145 @@
   }
 
   /**
+   * Write a list of the current locks to a file.
+   *
+   * @param showLocks
+   *          the locks we're interested in.
+   * @return Returns 0 when execution succeeds and above 0 if it fails.
+   * @throws HiveException
+   *           Throws this exception if an unexpected error occurs.
+   */
+  private int showLocks(ShowLocksDesc showLocks) throws HiveException {
+    Context ctx = driverContext.getCtx();
+    HiveLockManager lockMgr = ctx.getHiveLockMgr();
+    if (lockMgr == null) {
+      throw new HiveException("show Locks LockManager not specified");
+    }
+
+    // write the results in the file
+    try {
+      Path resFile = new Path(showLocks.getResFile());
+      FileSystem fs = resFile.getFileSystem(conf);
+      DataOutput outStream = fs.create(resFile);
+      Set<HiveLock> locks = lockMgr.getLocks();
+      Iterator<HiveLock> locksIter = locks.iterator();
+
+      while (locksIter.hasNext()) {
+        HiveLock lock = locksIter.next();
+        outStream.writeBytes(lock.getHiveLockObject().getName(false));
+        outStream.write(separator);
+        outStream.writeBytes(lock.getHiveLockMode().toString());
+        outStream.write(terminator);
+      }
+      ((FSDataOutputStream) outStream).close();
+    } catch (FileNotFoundException e) {
+      LOG.warn("show locks: " + stringifyException(e));
+      return 1;
+    } catch (IOException e) {
+      LOG.warn("show locks: " + stringifyException(e));
+      return 1;
+    } catch (Exception e) {
+      throw new HiveException(e.toString());
+    }
+    return 0;
+  }
+
+  /**
+   * Lock the table/partition specified
+   *
+   * @param lockTbl
+   *          the table/partition to be locked along with the mode
+   * @return Returns 0 when execution succeeds and above 0 if it fails.
+   * @throws HiveException
+   *           Throws this exception if an unexpected error occurs.
+   */
+  private int lockTable(LockTableDesc lockTbl) throws HiveException {
+    Context ctx = driverContext.getCtx();
+    HiveLockManager lockMgr = ctx.getHiveLockMgr();
+    if (lockMgr == null) {
+      throw new HiveException("lock Table LockManager not specified");
+    }
+
+    HiveLockMode mode = HiveLockMode.valueOf(lockTbl.getMode());
+    String tabName = lockTbl.getTableName();
+    Table tbl = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tabName);
+    if (tbl == null) {
+      throw new HiveException("Table " + tabName + " does not exist ");
+    }
+
+    Map<String, String> partSpec = lockTbl.getPartSpec();
+    if (partSpec == null) {
+      HiveLock lck = lockMgr.lock(new HiveLockObject(tbl), mode);
+      if (lck == null) {
+        console.printError("conflicting lock present " + tbl + " cannot be locked in mode " + mode);
+        return 1;
+      }
+      return 0;
+    }
+
+    Partition par = db.getPartition(tbl, partSpec, false);
+    if (par == null) {
+      throw new HiveException("Partition " + partSpec + " for table " + tabName + " does not exist");
+    }
+    HiveLock lck = lockMgr.lock(new HiveLockObject(par), mode);
+    if (lck == null) {
+      console.printError("conflicting lock present " + par + " cannot be locked in mode " + mode);
+      return 1;
+    }
+    return 0;
+  }
+
+  /**
+   * Unlock the table/partition specified
+   *
+   * @param unlockTbl
+   *          the table/partition to be unlocked
+   * @return Returns 0 when execution succeeds and above 0 if it fails.
+   * @throws HiveException
+   *           Throws this exception if an unexpected error occurs.
+   */
+  private int unlockTable(UnlockTableDesc unlockTbl) throws HiveException {
+    Context ctx = driverContext.getCtx();
+    HiveLockManager lockMgr = ctx.getHiveLockMgr();
+    if (lockMgr == null) {
+      throw new HiveException("unlock Table LockManager not specified");
+    }
+
+    String tabName = unlockTbl.getTableName();
+    Table tbl = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tabName);
+    if (tbl == null) {
+      throw new HiveException("Table " + tabName + " does not exist ");
+    }
+
+    Map<String, String> partSpec = unlockTbl.getPartSpec();
+    HiveLockObject obj = null;
+
+    if (partSpec == null) {
+      obj = new HiveLockObject(tbl);
+    } else {
+      Partition par = db.getPartition(tbl, partSpec, false);
+      if (par == null) {
+        throw new HiveException("Partition " + partSpec + " for table " + tabName + " does not exist");
+      }
+      obj = new HiveLockObject(par);
+    }
+
+    Set<HiveLock> locks = lockMgr.getLocks(obj);
+    if ((locks == null) || (locks.isEmpty())) {
+      throw new HiveException("Table " + tabName + " is not locked ");
+    }
+    Iterator<HiveLock> locksIter = locks.iterator();
+    while (locksIter.hasNext()) {
+      HiveLock lock = locksIter.next();
+      lockMgr.unlock(lock);
+    }
+
+    return 0;
+  }
+
+  /**
   * Shows a description of a function.
   *
   * @param descFunc
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/UnlockTableDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/UnlockTableDesc.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/UnlockTableDesc.java (revision 0)
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * UnlockTableDesc.
+ *
+ */
+@Explain(displayName = "Unlock Table")
+public class UnlockTableDesc extends DDLDesc implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private String tableName;
+  private Map<String, String> partSpec;
+
+  public UnlockTableDesc() {
+  }
+
+  public UnlockTableDesc(String tableName, Map<String, String> partSpec) {
+    this.tableName = tableName;
+    this.partSpec = partSpec;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public Map<String, String> getPartSpec() {
+    return partSpec;
+  }
+
+  public void setPartSpec(Map<String, String> partSpec) {
+    this.partSpec = partSpec;
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java (revision 959479)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java (working copy)
@@ -37,7 +37,10 @@
   private DropTableDesc dropTblDesc;
   private AlterTableDesc alterTblDesc;
   private ShowTablesDesc showTblsDesc;
+  private LockTableDesc lockTblDesc;
+  private UnlockTableDesc unlockTblDesc;
   private ShowFunctionsDesc showFuncsDesc;
+  private ShowLocksDesc showLocksDesc;
   private DescFunctionDesc descFunctionDesc;
   private ShowPartitionsDesc showPartsDesc;
   private DescTableDesc descTblDesc;
@@ -138,6 +141,26 @@
   }
 
   /**
+   * @param lockTblDesc
+   */
+  public DDLWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
+      LockTableDesc lockTblDesc) {
+    this(inputs, outputs);
+
+    this.lockTblDesc = lockTblDesc;
+  }
+
+  /**
+   * @param unlockTblDesc
+   */
+  public DDLWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
+      UnlockTableDesc unlockTblDesc) {
+    this(inputs, outputs);
+
+    this.unlockTblDesc = unlockTblDesc;
+  }
+
+  /**
    * @param showFuncsDesc
    */
   public DDLWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
@@ -148,6 +171,16 @@
   }
 
   /**
+   * @param showLocksDesc
+   */
+  public DDLWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
+      ShowLocksDesc showLocksDesc) {
+    this(inputs, outputs);
+
+    this.showLocksDesc = showLocksDesc;
+  }
+
+  /**
    * @param descFuncDesc
    */
   public DDLWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
@@ -312,6 +345,30 @@
   }
 
   /**
+   * @return the showLocksDesc
+   */
+  @Explain(displayName = "Show Lock Operator")
+  public ShowLocksDesc getShowLocksDesc() {
+    return showLocksDesc;
+  }
+
+  /**
+   * @return the lockTblDesc
+   */
+  @Explain(displayName = "Lock Table Operator")
+  public LockTableDesc getLockTblDesc() {
+    return lockTblDesc;
+  }
+
+  /**
+   * @return the unlockTblDesc
+   */
+  @Explain(displayName = "Unlock Table Operator")
+  public UnlockTableDesc getUnlockTblDesc() {
+    return unlockTblDesc;
+  }
+
+  /**
    * @return the descFuncDesc
    */
   @Explain(displayName = "Show Function Operator")
@@ -328,6 +385,30 @@
   }
 
   /**
+   * @param showLocksDesc
+   *          the showLocksDesc to set
+   */
+  public void setShowLocksDesc(ShowLocksDesc showLocksDesc) {
+    this.showLocksDesc = showLocksDesc;
+  }
+
+  /**
+   * @param lockTblDesc
+   *          the lockTblDesc to set
+   */
+  public
void setLockTblDesc(LockTableDesc lockTblDesc) { + this.lockTblDesc = lockTblDesc; + } + + /** + * @param unlockTblDesc + * the unlockTblDesc to set + */ + public void setUnlockTblDesc(UnlockTableDesc unlockTblDesc) { + this.unlockTblDesc = unlockTblDesc; + } + + /** * @param descFuncDesc * the showFuncsDesc to set */ Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java (revision 0) @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; + +import org.apache.hadoop.fs.Path; + +/** + * ShowLocksDesc. + * + */ +@Explain(displayName = "Show Locks") +public class ShowLocksDesc extends DDLDesc implements Serializable { + private static final long serialVersionUID = 1L; + String resFile; + + /** + * table name for the result of show locks. + */ + private static final String table = "showlocks"; + /** + * thrift ddl for the result of show locks. + */ + private static final String schema = "tab_name,mode#string:string"; + + public String getTable() { + return table; + } + + public String getSchema() { + return schema; + } + + public ShowLocksDesc() { + } + + /** + * @param resFile + */ + public ShowLocksDesc(Path resFile) { + this.resFile = resFile.toString(); + } + + /** + * @return the resFile + */ + @Explain(displayName = "result file", normalExplain = false) + public String getResFile() { + return resFile; + } + + /** + * @param resFile + * the resFile to set + */ + public void setResFile(String resFile) { + this.resFile = resFile; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/LockTableDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/LockTableDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/LockTableDesc.java (revision 0) @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * LockTableDesc.
+ *
+ */
+@Explain(displayName = "Lock Table")
+public class LockTableDesc extends DDLDesc implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private String tableName;
+  private String mode;
+  private Map<String, String> partSpec;
+
+  public LockTableDesc() {
+  }
+
+  public LockTableDesc(String tableName, String mode, Map<String, String> partSpec) {
+    this.tableName = tableName;
+    this.mode = mode;
+    this.partSpec = partSpec;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public void setMode(String mode) {
+    this.mode = mode;
+  }
+
+  public String getMode() {
+    return mode;
+  }
+
+  public Map<String, String> getPartSpec() {
+    return partSpec;
+  }
+
+  public void setPartSpec(Map<String, String> partSpec) {
+    this.partSpec = partSpec;
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (revision 0)
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lockmgr;
+
+import java.util.Set;
+
+public interface HiveLockManager {
+
+  public void setContext(HiveLockManagerCtx ctx) throws LockException;
+  public HiveLock lock(HiveLockObject key, HiveLockMode mode) throws LockException;
+  public void unlock(HiveLock hiveLock) throws LockException;
+
+  public Set<HiveLock> getLocks() throws LockException;
+  public Set<HiveLock> getLocks(HiveLockObject key) throws LockException;
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (revision 0)
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr.zookeeper; + +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs.Ids; +import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import java.util.List; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.regex.Pattern; +import java.util.regex.Matcher; +import org.apache.commons.lang.StringEscapeUtils; + +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.DummyPartition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; + +public class ZooKeeperHiveLockManager implements HiveLockManager { + HiveLockManagerCtx ctx; + public static final Log LOG = LogFactory.getLog("ZooKeeperHiveLockManager"); + static final private LogHelper console = new LogHelper(LOG); + + private ZooKeeper zooKeeper; + + public ZooKeeperHiveLockManager() { + } + + private String getQuorumServers(HiveConf conf) { + String host = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM); + String port = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT); + return host + ":" + port; + } + + public void setContext(HiveLockManagerCtx ctx) throws LockException { + this.ctx = ctx; + HiveConf conf = ctx.getConf(); + int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + String quorumServers = getQuorumServers(conf); + + try { + zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, null); + } catch (IOException e) { + LOG.error("Failed to create ZooKeeper object: " + e); + throw new LockException(e); + } + } + + private String getObjectName(HiveLockObject key, HiveLockMode mode) { + return "/" + key.getName().replaceAll("/", ctx.getConf().getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME)) + "-" + mode + "-"; + } + + public ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode) throws LockException { + String name = getObjectName(key, mode); + String res; + + try { + res = zooKeeper.create(name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + + int seqNo = getSequenceNumber(res, name); + if (seqNo == -1) { + zooKeeper.delete(res, 
-1);
+        return null;
+      }
+
+      List<String> children = zooKeeper.getChildren("/", false);
+
+      String exLock = getObjectName(key, HiveLockMode.EXCLUSIVE);
+      String shLock = getObjectName(key, HiveLockMode.SHARED);
+
+      for (String child : children) {
+        child = "/" + child;
+
+        // Is there a conflicting lock on the same object with a lower sequence number?
+        int childSeq = seqNo;
+        if (child.startsWith(exLock)) {
+          childSeq = getSequenceNumber(child, exLock);
+        }
+        if ((mode == HiveLockMode.EXCLUSIVE) && child.startsWith(shLock)) {
+          childSeq = getSequenceNumber(child, shLock);
+        }
+
+        if ((childSeq >= 0) && (childSeq < seqNo)) {
+          zooKeeper.delete(res, -1);
+          return null;
+        }
+      }
+
+    } catch (Exception e) {
+      LOG.error("Failed to get ZooKeeper lock: " + e);
+      throw new LockException(e);
+    }
+
+    return new ZooKeeperHiveLock(res, key, mode);
+  }
+
+  public void unlock(HiveLock hiveLock) throws LockException {
+    ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock;
+    try {
+      zooKeeper.delete(zLock.getPath(), -1);
+    } catch (Exception e) {
+      LOG.error("Failed to release ZooKeeper lock: " + e);
+      throw new LockException(e);
+    }
+  }
+
+  public Set<HiveLock> getLocks() throws LockException {
+    return getLocks(null);
+  }
+
+  public Set<HiveLock> getLocks(HiveLockObject key) throws LockException {
+    Set<HiveLock> locks = new LinkedHashSet<HiveLock>();
+    List<String> children;
+
+    try {
+      children = zooKeeper.getChildren("/", false);
+    } catch (Exception e) {
+      LOG.error("Failed to get ZooKeeper children: " + e);
+      throw new LockException(e);
+    }
+
+    for (String child : children) {
+      child = "/" + child;
+
+      HiveLockMode mode = getLockMode(child);
+      if (mode == null) {
+        continue;
+      }
+
+      HiveLockObject obj = getLockObject(child, mode);
+      if ((key == null) || (obj.getName().equals(key.getName()))) {
+        HiveLock lck = new ZooKeeperHiveLock(child, obj, mode);
+        locks.add(lck);
+      }
+    }
+
+    return locks;
+  }
+
+  private int getSequenceNumber(String resPath, String path) {
+    String tst = resPath.substring(path.length());
+    try {
+      return Integer.parseInt(tst);
+    } catch (Exception e) {
+      return -1; // invalid number
+    }
+  }
+
+  private HiveLockObject getLockObject(String path, HiveLockMode mode) throws LockException {
+    try {
+      Hive db = Hive.get(ctx.getConf());
+      int indx = path.lastIndexOf(mode.toString());
+      String objName = path.substring(0, indx-1);
+
+      String[] names = objName.split("@");
+      Table tab = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME,
+                              names[1], false); // do not throw exception if table does not exist
+      assert (tab != null);
+
+      if (names.length == 2) {
+        return new HiveLockObject(tab);
+      }
+
+      String[] parts = names[2].split(ctx.getConf().getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME));
+
+      Map<String, String> partSpec = new HashMap<String, String>();
+      for (indx = 0; indx < parts.length; indx++) {
+        String[] partVals = parts[indx].split("=");
+        partSpec.put(partVals[0], partVals[1]);
+      }
+
+      Partition partn = db.getPartition(tab, partSpec, false);
+      if (partn == null) {
+        return new HiveLockObject(new DummyPartition(objName));
+      }
+
+      return new HiveLockObject(partn);
+    } catch (Exception e) {
+      LOG.error("Failed to create ZooKeeper object: " + e);
+      throw new LockException(e);
+    }
+  }
+
+  private static Pattern shMode = Pattern.compile("^.*-(SHARED)-([0-9]+)$");
+  private static Pattern exMode = Pattern.compile("^.*-(EXCLUSIVE)-([0-9]+)$");
+
+  private HiveLockMode getLockMode(String path) {
+
+    Matcher shMatcher = shMode.matcher(path);
+    Matcher exMatcher = exMode.matcher(path);
+
+    if (shMatcher.matches()) {
+      return HiveLockMode.SHARED;
+    }
+
+
if (exMatcher.matches()) { + return HiveLockMode.EXCLUSIVE; + } + + return null; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java (revision 0) @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr.zookeeper; + +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; + +public class ZooKeeperHiveLock extends HiveLock { + private String path; + private HiveLockObject obj; + private HiveLockMode mode; + + public ZooKeeperHiveLock(String path, HiveLockObject obj, HiveLockMode mode) { + this.path = path; + this.obj = obj; + this.mode = mode; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + public HiveLockObject getHiveLockObject() { + return obj; + } + + public void setHiveLockObject(HiveLockObject obj) { + this.obj = obj; + } + + public HiveLockMode getHiveLockMode() { + return mode; + } + + public void setHiveLockMode(HiveLockMode mode) { + this.mode = mode; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/package-info.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/package-info.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/package-info.java (revision 0) @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+/**
+ * Hive Lock Manager interfaces and some custom implementations
+ */
+package org.apache.hadoop.hive.ql.lockmgr;
Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLock.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLock.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLock.java (revision 0)
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lockmgr;
+
+public abstract class HiveLock {
+  public abstract HiveLockObject getHiveLockObject();
+  public abstract HiveLockMode getHiveLockMode();
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java (revision 0)
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lockmgr;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * Exception from lock manager.
+ */
+
+public class LockException extends HiveException {
+
+  private static final long serialVersionUID = 1L;
+
+  public LockException() {
+    super();
+  }
+
+  public LockException(String message) {
+    super(message);
+  }
+
+  public LockException(Throwable cause) {
+    super(cause);
+  }
+
+  public LockException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java (revision 0)
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +public enum HiveLockMode { + SHARED, EXCLUSIVE; +} + Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (revision 0) @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; + +public class HiveLockObject { + /** + * The table. + */ + private Table t; + + /** + * The partition. This is null for a non-partitioned table.
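The two modes carry the usual reader-writer semantics: any number of SHARED holders may coexist, while EXCLUSIVE is incompatible with everything, which is exactly what the negative lock tests exercise. As a one-line illustration of the rule (the patch encodes this inside the lock manager rather than on the enum itself):

    // SHARED/SHARED is the only compatible pairing; EXCLUSIVE conflicts with both modes.
    static boolean compatible(HiveLockMode held, HiveLockMode requested) {
      return held == HiveLockMode.SHARED && requested == HiveLockMode.SHARED;
    }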
+ */ + private Partition p; + + public HiveLockObject() { + this.t = null; + this.p = null; + } + + public HiveLockObject(Table t) { + this.t = t; + this.p = null; + } + + public HiveLockObject(Partition p) { + this.t = null; + this.p = p; + } + + public Table getTable() { + return t; + } + + public void setTable(Table t) { + this.t = t; + } + + public Partition getPartition() { + return p; + } + + public void setPartition(Partition p) { + this.p = p; + } + + public String getName() { + return getName(true); + } + + public String getName(boolean includeDB) { + if (t != null) { + if (includeDB) { + return t.getDbName() + "@" + t.getTableName(); + } + else { + return t.getTableName(); + } + } + + if (p != null) { + if (includeDB) { + return p.getTable().getDbName() + "@" + p.getTable().getTableName() + "@" + p.getName(); + } + else { + return p.getTable().getTableName() + "@" + p.getName(); + } + } + + return ""; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManagerCtx.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManagerCtx.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManagerCtx.java (revision 0) @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +import org.apache.hadoop.hive.conf.HiveConf; + +public class HiveLockManagerCtx { + private HiveConf conf; + + public HiveLockManagerCtx() { + } + + public HiveLockManagerCtx(HiveConf conf) { + this.conf = conf; + } + + public HiveConf getConf() { + return conf; + } + + public void setConf(HiveConf conf) { + this.conf = conf; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/Context.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Context.java (revision 959479) +++ ql/src/java/org/apache/hadoop/hive/ql/Context.java (working copy) @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Random; +import java.util.List; import org.antlr.runtime.TokenRewriteStream; import org.apache.commons.logging.Log; @@ -37,6 +38,8 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; /** * Context for Semantic Analyzers.
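getName() produces the string that lock managers key on, so its exact format matters. For a hypothetical table src in database default, and a partition ds=2008-04-08/hr=11 of srcpart, the method yields:

    // new HiveLockObject(srcTable).getName()      -> "default@src"
    // new HiveLockObject(srcTable).getName(false) -> "src"
    // new HiveLockObject(part).getName()          -> "default@srcpart@ds=2008-04-08/hr=11"
    // new HiveLockObject(part).getName(false)     -> "srcpart@ds=2008-04-08/hr=11"

Because a partition's name embeds its table and database, sorting lock requests by name (as Driver does below) places a partition next to its parents and gives every client the same global acquisition order, which is what prevents deadlock between concurrent queries.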
Usage: not reusable - construct a new one for @@ -81,6 +84,10 @@ String executionId; + // List of locks for this query + protected List<HiveLock> hiveLocks; + protected HiveLockManager hiveLockMgr; + public Context(HiveConf conf) throws IOException { this(conf, generateExecutionId()); } @@ -486,4 +493,20 @@ public boolean isLocalOnlyExecutionMode() { return conf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local"); } + + public List<HiveLock> getHiveLocks() { + return hiveLocks; + } + + public void setHiveLocks(List<HiveLock> hiveLocks) { + this.hiveLocks = hiveLocks; + } + + public HiveLockManager getHiveLockMgr() { + return hiveLockMgr; + } + + public void setHiveLockMgr(HiveLockManager hiveLockMgr) { + this.hiveLockMgr = hiveLockMgr; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (revision 959479) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (working copy) @@ -107,6 +107,9 @@ TOK_SHOWFUNCTIONS; TOK_SHOWPARTITIONS; TOK_SHOW_TABLESTATUS; +TOK_SHOWLOCKS; +TOK_LOCKTABLE; +TOK_UNLOCKTABLE; TOK_DROPTABLE; TOK_TABCOLLIST; TOK_TABCOL; @@ -219,6 +222,8 @@ | dropViewStatement | createFunctionStatement | dropFunctionStatement + | lockStatement + | unlockStatement ; ifNotExists @@ -446,8 +451,27 @@ | KW_SHOW KW_PARTITIONS Identifier partitionSpec? -> ^(TOK_SHOWPARTITIONS Identifier partitionSpec?) | KW_SHOW KW_TABLE KW_EXTENDED ((KW_FROM|KW_IN) db_name=Identifier)? KW_LIKE showStmtIdentifier partitionSpec? -> ^(TOK_SHOW_TABLESTATUS showStmtIdentifier $db_name? partitionSpec?) + | KW_SHOW KW_LOCKS -> ^(TOK_SHOWLOCKS) ; +lockStatement +@init { msgs.push("lock statement"); } +@after { msgs.pop(); } + : KW_LOCK KW_TABLE Identifier partitionSpec? lockMode -> ^(TOK_LOCKTABLE Identifier lockMode partitionSpec?) + ; + +lockMode +@init { msgs.push("lock mode"); } +@after { msgs.pop(); } + : KW_SHARED | KW_EXCLUSIVE + ; + +unlockStatement +@init { msgs.push("unlock statement"); } +@after { msgs.pop(); } + : KW_UNLOCK KW_TABLE Identifier partitionSpec? -> ^(TOK_UNLOCKTABLE Identifier partitionSpec?) + ; + metastoreCheck @init { msgs.push("metastore check statement"); } @after { msgs.pop(); } @@ -881,7 +905,7 @@ @init { msgs.push("select clause"); } @after { msgs.pop(); } : - KW_SELECT hintClause? (((KW_ALL | dist=KW_DISTINCT)? selectList) + KW_SELECT hintClause? (((KW_ALL | dist=KW_DISTINCT)? selectList) | (transform=KW_TRANSFORM selectTrfmClause)) -> {$transform == null && $dist == null}? ^(TOK_SELECT hintClause? selectList) -> {$transform == null && $dist != null}? ^(TOK_SELECTDI hintClause?
selectList) @@ -896,7 +920,7 @@ : selectItem ( COMMA selectItem )* -> selectItem+ ; - + selectTrfmClause @init { msgs.push("transform clause"); } @after { msgs.pop(); } @@ -1627,7 +1651,10 @@ KW_SSL: 'SSL'; KW_UNDO: 'UNDO'; KW_LOCK: 'LOCK'; +KW_LOCKS: 'LOCKS'; KW_UNLOCK: 'UNLOCK'; +KW_SHARED: 'SHARED'; +KW_EXCLUSIVE: 'EXCLUSIVE'; KW_PROCEDURE: 'PROCEDURE'; KW_UNSIGNED: 'UNSIGNED'; KW_WHILE: 'WHILE'; Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (revision 959479) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (working copy) @@ -150,6 +150,9 @@ + "STRING instead."), CREATE_NON_NATIVE_AS("CREATE TABLE AS SELECT cannot be used for a non-native table"), LOAD_INTO_NON_NATIVE("A non-native table cannot be used as target for LOAD"), + LOCKMGR_NOT_SPECIFIED("lock manager not specified correctly, set hive.lock.manager"), + LOCK_CANNOT_BE_ACQUIRED("locks on the underlying objects cannot be acquired. Retry after some time"), + ZOOKEEPER_QUORUM_NOT_SPECIFIED("ZooKeeper quorum not specified"), OVERWRITE_ARCHIVED_PART("Cannot overwrite an archived partition. " + "Unarchive before running this command."), ARCHIVE_METHODS_DISABLED("Archiving methods are currently disabled. " + @@ -160,7 +163,7 @@ RESERVED_PART_VAL("Partition value contains a reserved substring"), HOLD_DDLTIME_ON_NONEXIST_PARTITIONS("HOLD_DDLTIME hint cannot be applied to dynamic " + "partitions or non-existent partitions"), - ; + ; private String mesg; private String sqlState; Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java (revision 959479) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java (working copy) @@ -55,12 +55,15 @@ commandType.put(HiveParser.TOK_SHOW_TABLESTATUS, "SHOW_TABLESTATUS"); commandType.put(HiveParser.TOK_SHOWFUNCTIONS, "SHOWFUNCTIONS"); commandType.put(HiveParser.TOK_SHOWPARTITIONS, "SHOWPARTITIONS"); + commandType.put(HiveParser.TOK_SHOWLOCKS, "SHOWLOCKS"); commandType.put(HiveParser.TOK_CREATEFUNCTION, "CREATEFUNCTION"); commandType.put(HiveParser.TOK_DROPFUNCTION, "DROPFUNCTION"); commandType.put(HiveParser.TOK_CREATEVIEW, "CREATEVIEW"); commandType.put(HiveParser.TOK_DROPVIEW, "DROPVIEW"); commandType.put(HiveParser.TOK_ALTERVIEW_PROPERTIES, "ALTERVIEW_PROPERTIES"); commandType.put(HiveParser.TOK_QUERY, "QUERY"); + commandType.put(HiveParser.TOK_LOCKTABLE, "LOCKTABLE"); + commandType.put(HiveParser.TOK_UNLOCKTABLE, "UNLOCKTABLE"); } public static BaseSemanticAnalyzer get(HiveConf conf, ASTNode tree) @@ -97,11 +100,14 @@ case HiveParser.TOK_SHOW_TABLESTATUS: case HiveParser.TOK_SHOWFUNCTIONS: case HiveParser.TOK_SHOWPARTITIONS: + case HiveParser.TOK_SHOWLOCKS: case HiveParser.TOK_ALTERTABLE_FILEFORMAT: case HiveParser.TOK_ALTERTABLE_CLUSTER_SORT: case HiveParser.TOK_ALTERTABLE_TOUCH: case HiveParser.TOK_ALTERTABLE_ARCHIVE: case HiveParser.TOK_ALTERTABLE_UNARCHIVE: + case HiveParser.TOK_LOCKTABLE: + case HiveParser.TOK_UNLOCKTABLE: return new DDLSemanticAnalyzer(conf); case HiveParser.TOK_CREATEFUNCTION: case HiveParser.TOK_DROPFUNCTION: Index: ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
(revision 959479) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (working copy) @@ -55,6 +55,9 @@ import org.apache.hadoop.hive.ql.plan.ShowPartitionsDesc; import org.apache.hadoop.hive.ql.plan.ShowTableStatusDesc; import org.apache.hadoop.hive.ql.plan.ShowTablesDesc; +import org.apache.hadoop.hive.ql.plan.ShowLocksDesc; +import org.apache.hadoop.hive.ql.plan.LockTableDesc; +import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes; import org.apache.hadoop.hive.serde.Constants; @@ -97,6 +100,8 @@ super(conf); // Partition can't have this name reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME)); + reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME)); + // Partition value can't end in this suffix reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_ORIGINAL)); reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_ARCHIVED)); @@ -119,6 +124,9 @@ } else if (ast.getToken().getType() == HiveParser.TOK_SHOWFUNCTIONS) { ctx.setResFile(new Path(ctx.getLocalTmpFileURI())); analyzeShowFunctions(ast); + } else if (ast.getToken().getType() == HiveParser.TOK_SHOWLOCKS) { + ctx.setResFile(new Path(ctx.getLocalTmpFileURI())); + analyzeShowLocks(ast); } else if (ast.getToken().getType() == HiveParser.TOK_DESCFUNCTION) { ctx.setResFile(new Path(ctx.getLocalTmpFileURI())); analyzeDescFunction(ast); @@ -160,6 +168,10 @@ } else if (ast.getToken().getType() == HiveParser.TOK_SHOWPARTITIONS) { ctx.setResFile(new Path(ctx.getLocalTmpFileURI())); analyzeShowPartitions(ast); + } else if (ast.getToken().getType() == HiveParser.TOK_LOCKTABLE) { + analyzeLockTable(ast); + } else if (ast.getToken().getType() == HiveParser.TOK_UNLOCKTABLE) { + analyzeUnlockTable(ast); } else { throw new SemanticException("Unsupported command."); } @@ -456,6 +468,75 @@ /** * Add the task according to the parsed command tree. This is used for the CLI + * command "SHOW LOCKS;". + * + * @param ast + * The parsed command tree. + * @throws SemanticException + * Parsing failed + */ + private void analyzeShowLocks(ASTNode ast) throws SemanticException { + ShowLocksDesc showLocksDesc = new ShowLocksDesc(ctx.getResFile()); + rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), + showLocksDesc), conf)); + setFetchTask(createFetchTask(showLocksDesc.getSchema())); + } + + /** + * Add the task according to the parsed command tree. This is used for the CLI + * command "LOCK TABLE ..;". + * + * @param ast + * The parsed command tree. + * @throws SemanticException + * Parsing failed + */ + private void analyzeLockTable(ASTNode ast) + throws SemanticException { + String tableName = unescapeIdentifier(ast.getChild(0).getText()); + String mode = unescapeIdentifier(ast.getChild(1).getText()); + List<Map<String, String>> partSpecs = getPartitionSpecs(ast); + + // We can only have a single partition spec + assert(partSpecs.size() <= 1); + Map<String, String> partSpec = null; + if (partSpecs.size() > 0) { + partSpec = partSpecs.get(0); + } + + LockTableDesc lockTblDesc = new LockTableDesc(tableName, mode, partSpec); + rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), + lockTblDesc), conf)); + } + + /** + * Add the task according to the parsed command tree. This is used for the CLI + * command "UNLOCK TABLE ..;". + * + * @param ast + * The parsed command tree.
+ * @throws SemanticException + * Parsing failed + */ + private void analyzeUnlockTable(ASTNode ast) + throws SemanticException { + String tableName = unescapeIdentifier(ast.getChild(0).getText()); + List<Map<String, String>> partSpecs = getPartitionSpecs(ast); + + // We can only have a single partition spec + assert(partSpecs.size() <= 1); + Map<String, String> partSpec = null; + if (partSpecs.size() > 0) { + partSpec = partSpecs.get(0); + } + + UnlockTableDesc unlockTblDesc = new UnlockTableDesc(tableName, partSpec); + rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), + unlockTblDesc), conf)); + } + + /** + * Add the task according to the parsed command tree. This is used for the CLI * command "DESCRIBE FUNCTION;". * * @param ast Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 959479) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -24,6 +24,8 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -53,6 +55,14 @@ import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.hooks.PostExecute; import org.apache.hadoop.hive.ql.hooks.PreExecute; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; +import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ErrorMsg; @@ -71,6 +81,10 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UnixUserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.DummyPartition; +import org.apache.hadoop.hive.ql.metadata.Table; public class Driver implements CommandProcessor { @@ -372,6 +386,188 @@ return plan; } + public static class LockObject { + HiveLockObject obj; + HiveLockMode mode; + + public LockObject(HiveLockObject obj, HiveLockMode mode) { + this.obj = obj; + this.mode = mode; + } + + public HiveLockObject getObj() { + return obj; + } + + public HiveLockMode getMode() { + return mode; + } + + public String getName() { + return obj.getName(); + } + } + + private List<LockObject> getLockObjects(Table t, Partition p, HiveLockMode mode) { + List<LockObject> locks = new LinkedList<LockObject>(); + + if (t != null) { + locks.add(new LockObject(new HiveLockObject(t), mode)); + return locks; + } + + if (p != null) { + locks.add(new LockObject(new HiveLockObject(p), mode)); + String partName = p.getName(); + + // every parent (proper prefix) of the partition is locked as a dummy + // partition - a shared (read) lock on the parents is sufficient + String partialName = ""; + String[] partns = partName.split("/"); + for (int idx = 0; idx < partns.length - 1; idx++) { + partialName += partns[idx]; + locks.add(new LockObject(new HiveLockObject(new DummyPartition(p.getTable().getDbName() + "@" + p.getTable().getTableName() + "@" + partialName)), HiveLockMode.SHARED)); + partialName += "/"; + } + 
+ // the enclosing table is a parent too, so a shared lock suffices + locks.add(new LockObject(new HiveLockObject(p.getTable()), HiveLockMode.SHARED)); + } + return locks; + } + + public int acquireReadWriteLocks() { + try { + int tryNum = 1; + int sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000; + int numRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES); + + boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); + if (!supportConcurrency) { + return 0; + } + + String lockMgr = conf.getVar(HiveConf.ConfVars.HIVE_LOCK_MANAGER); + if ((lockMgr == null) || (lockMgr.isEmpty())) { + throw new SemanticException(ErrorMsg.LOCKMGR_NOT_SPECIFIED.getMsg()); + } + + try { + HiveLockManager hiveLockMgr = (HiveLockManager) ReflectionUtils.newInstance( + conf.getClassByName(lockMgr), conf); + ctx.setHiveLockMgr(hiveLockMgr); + hiveLockMgr.setContext(new HiveLockManagerCtx(conf)); + } catch (Exception e) { + throw new SemanticException(ErrorMsg.LOCKMGR_NOT_SPECIFIED.getMsg()); + } + + List<LockObject> lockObjects = new ArrayList<LockObject>(); + + // Sort all the inputs, outputs. + // If a lock needs to be acquired on any partition, a read lock needs to be acquired on all its parents also + for (ReadEntity input : plan.getInputs()) { + lockObjects.addAll(getLockObjects(input.getTable(), input.getPartition(), HiveLockMode.SHARED)); + } + + for (WriteEntity output : plan.getOutputs()) { + lockObjects.addAll(getLockObjects(output.getTable(), output.getPartition(), HiveLockMode.EXCLUSIVE)); + } + + Collections.sort(lockObjects, new Comparator<LockObject>() { + + @Override + public int compare(LockObject o1, LockObject o2) { + int cmp = o1.getName().compareTo(o2.getName()); + if (cmp == 0) { + if (o1.getMode() == o2.getMode()) { + return cmp; + } + // EXCLUSIVE locks occur before SHARED locks + if (o1.getMode() == HiveLockMode.EXCLUSIVE) { + return -1; + } + return +1; + } + return cmp; + } + + }); + + // walk the list and acquire the locks - if any lock can't be acquired, release all locks, sleep and retry + List<HiveLock> hiveLocks = getLocks(lockObjects); + while (hiveLocks == null) { + if (tryNum >= numRetries) { + throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); + } + tryNum++; + + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + } + hiveLocks = getLocks(lockObjects); + } + ctx.setHiveLocks(hiveLocks); + return (0); + } catch (SemanticException e) { + errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); + SQLState = ErrorMsg.findSQLState(e.getMessage()); + console.printError(errorMessage, "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + return (10); + } + } + + private List<HiveLock> getLocks(List<LockObject> lockObjects) throws SemanticException { + // acquire the locks in sorted order - if any lock can't be acquired, release the locks taken so far and give up + LockObject prevLockObj = null; + List<HiveLock> hiveLocks = new ArrayList<HiveLock>(); + + for (LockObject lockObject: lockObjects) { + // No need to acquire a lock twice on the same object + if ((prevLockObj != null) && (prevLockObj.getName().equals(lockObject.getName()))) { + prevLockObj = lockObject; + continue; + } + + HiveLock lock = null; + try { + lock = ctx.getHiveLockMgr().lock(lockObject.getObj(), lockObject.getMode()); + } catch (LockException e) { + lock = null; + } + + if (lock == null) { + releaseLocks(hiveLocks); + return null; + } + + hiveLocks.add(lock); + prevLockObj = lockObject; + } + + return hiveLocks; + } + + public void releaseLocks() throws SemanticException { + releaseLocks(ctx.getHiveLocks()); + } + + public void releaseLocks(List<HiveLock> hiveLocks) throws
SemanticException { + try { + if (hiveLocks != null) { + for (HiveLock hiveLock: hiveLocks) { + ctx.getHiveLockMgr().unlock(hiveLock); + } + } + } catch (LockException e) { + throw new SemanticException(e); + } + } + public CommandProcessorResponse run(String command) { errorMessage = null; SQLState = null; @@ -381,6 +577,11 @@ return new CommandProcessorResponse(ret, errorMessage, SQLState); } + ret = acquireReadWriteLocks(); + if (ret != 0) { + return new CommandProcessorResponse(ret, errorMessage, SQLState); + } + ret = execute(); if (ret != 0) { return new CommandProcessorResponse(ret, errorMessage, SQLState); @@ -733,6 +934,7 @@ public int close() { try { + releaseLocks(); if (ctx != null) { ctx.clear(); }
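End to end, a client with concurrency enabled exercises the whole path: compile, acquireReadWriteLocks between compile and execute, and release on close. A smoke-test sketch; the quorum host is a placeholder, and the session setup follows the usual CLI pattern:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class LockSmokeTest {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf(SessionState.class);
        conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
        conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "zk1.example.com"); // placeholder
        SessionState.start(new SessionState(conf));

        Driver driver = new Driver(conf);
        driver.run("LOCK TABLE src SHARED");  // explicit DDL path via DDLSemanticAnalyzer
        driver.run("SHOW LOCKS");
        driver.run("UNLOCK TABLE src");
        driver.close();                       // releases any locks still held by the Context
      }
    }

Note that ordinary queries take the same locks implicitly: acquireReadWriteLocks derives SHARED locks from the plan's inputs and EXCLUSIVE locks from its outputs, so an explicit LOCK TABLE is only needed when a client wants to pin an object across statements.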