Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 984642) +++ conf/hive-default.xml (working copy) @@ -601,6 +601,48 @@ + hive.support.concurrency + false + Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks. + + + + hive.concurrency.manager + org.apache.hadoop.hive.ql.lockmgr.ZooKeeperLockMgr + The concurrency manager for hive. + + + + hive.lock.numretries + 100 + The number of times you want to try to get all the locks + + + + hive.lock.sleep.between.retries + 60 + The sleep time (in seconds) between various retries + + + + hive.zookeeper.quorum + + The list of zookeeper servers to talk to. This is only needed for read/write locks. + + + + hive.zookeeper.client.port + + The port of zookeeper servers to talk to. This is only needed for read/write locks. + + + + hive.zookeeper.session.timeout + + Zookeeper client's session timeout. The client is disconnected, and as a result, all locks released, if a heartbeat is not sent in the timeout. + + + fs.har.impl org.apache.hadoop.hive.shims.HiveHarFileSystem The implementation for accessing Hadoop Archives. Note that this won't be applicable to Hadoop vers less than 0.20 Index: jdbc/src/test/org/apache/hadoop/hive/jdbc/TestJdbcDriver.java =================================================================== --- jdbc/src/test/org/apache/hadoop/hive/jdbc/TestJdbcDriver.java (revision 984642) +++ jdbc/src/test/org/apache/hadoop/hive/jdbc/TestJdbcDriver.java (working copy) @@ -127,6 +127,8 @@ res = stmt.executeQuery("create view " + viewName + " comment '"+viewComment +"' as select * from "+ tableName); assertFalse(res.next()); + + stmt.executeQuery("set hive.support.concurrency = false"); } protected void tearDown() throws Exception { Index: lib/hbase-0.20.3-test.jar =================================================================== Cannot display: file marked as a binary type. svn:mime-type = application/octet-stream Property changes on: lib/hbase-0.20.3-test.jar ___________________________________________________________________ Added: svn:mime-type + application/octet-stream Index: lib/hbase-0.20.3.jar =================================================================== Cannot display: file marked as a binary type. svn:mime-type = application/octet-stream Property changes on: lib/hbase-0.20.3.jar ___________________________________________________________________ Added: svn:mime-type + application/octet-stream Index: lib/zookeeper-3.2.2.jar =================================================================== Cannot display: file marked as a binary type. svn:mime-type = application/octet-stream Property changes on: lib/zookeeper-3.2.2.jar ___________________________________________________________________ Added: svn:mime-type + application/octet-stream Index: data/conf/hive-site.xml =================================================================== --- data/conf/hive-site.xml (revision 984642) +++ data/conf/hive-site.xml (working copy) @@ -146,6 +146,12 @@ + hive.support.concurrency + true + Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks. + + + fs.pfile.impl org.apache.hadoop.fs.ProxyLocalFileSystem A proxy for local file system used for cross file system testing Index: hwi/src/test/org/apache/hadoop/hive/hwi/TestHWISessionManager.java =================================================================== --- hwi/src/test/org/apache/hadoop/hive/hwi/TestHWISessionManager.java (revision 984642) +++ hwi/src/test/org/apache/hadoop/hive/hwi/TestHWISessionManager.java (working copy) @@ -86,6 +86,10 @@ assertEquals(hsm.findAllSessionsForUser(user2).size(), 1); assertEquals(hsm.findAllSessionItems().size(), 3); + user1_item1.addQuery("set hive.support.concurrency = false"); + user1_item2.addQuery("set hive.support.concurrency = false"); + user2_item1.addQuery("set hive.support.concurrency = false"); + HWISessionItem searchItem = hsm.findSessionItemByName(user1, "session1"); assertEquals(searchItem, user1_item1); @@ -105,10 +109,12 @@ zero.add(0); zero.add(0); zero.add(0); + zero.add(0); ArrayList zero3 = new ArrayList(); zero3.add(0); zero3.add(0); zero3.add(0); + zero3.add(0); ArrayList zero1 = new ArrayList(); zero1.add(0); assertEquals(zero, searchItem.getQueryRet()); @@ -194,6 +200,7 @@ // cleanup HWISessionItem cleanup = hsm.createSession(user1, "cleanup"); + cleanup.addQuery("set hive.support.concurrency = false"); cleanup.addQuery("drop table " + tableName); cleanup.clientStart(); Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 984642) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -93,6 +93,7 @@ DYNAMICPARTITIONMAXPARTSPERNODE("hive.exec.max.dynamic.partitions.pernode", 100), MAXCREATEDFILES("hive.exec.max.created.files", 100000L), DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__"), + DEFAULT_ZOOKEEPER_PARTITION_NAME("hive.lockmgr.zookeeper.default.partition.name", "__HIVE_DEFAULT_ZOOKEEPER_PARTITION__"), // should hive determine whether to run in local mode automatically ? @@ -263,14 +264,25 @@ HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true), + HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false), + HIVE_LOCK_MANAGER("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager"), + HIVE_LOCK_NUMRETRIES("hive.lock.numretries", 100), + HIVE_LOCK_SLEEP_BETWEEN_RETRIES("hive.lock.sleep.between.retries", 60), + + HIVE_ZOOKEEPER_QUORUM("hive.zookeeper.quorum", ""), + HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", ""), + HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 600*1000), + // For HBase storage handler HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true), // For har files HIVEARCHIVEENABLED("hive.archive.enabled", false), HIVEHARPARENTDIRSETTABLE("hive.archive.har.parentdir.settable", false), + ; + public final String varname; public final String defaultVal; public final int defaultIntVal; Index: service/src/test/org/apache/hadoop/hive/service/TestHiveServer.java =================================================================== --- service/src/test/org/apache/hadoop/hive/service/TestHiveServer.java (revision 984642) +++ service/src/test/org/apache/hadoop/hive/service/TestHiveServer.java (working copy) @@ -50,6 +50,7 @@ @Override protected void setUp() throws Exception { super.setUp(); + if (standAloneServer) { try { transport = new TSocket(host, port); @@ -74,6 +75,7 @@ public void testExecute() throws Exception { try { + client.execute("set hive.support.concurrency = false"); client.execute("drop table " + tableName); } catch (Exception ex) { } @@ -106,6 +108,7 @@ public void notestExecute() throws Exception { try { + client.execute("set hive.support.concurrency = false"); client.execute("drop table " + tableName); } catch (Exception ex) { } @@ -122,6 +125,7 @@ public void testNonHiveCommand() throws Exception { try { + client.execute("set hive.support.concurrency = false"); client.execute("drop table " + tableName); } catch (Exception ex) { } @@ -173,6 +177,7 @@ */ public void testMetastore() throws Exception { try { + client.execute("set hive.support.concurrency = false"); client.execute("drop table " + tableName); } catch (Exception ex) { } @@ -198,12 +203,13 @@ || clusterStatus.getState() == JobTrackerState.RUNNING); } - /** + /** * */ public void testFetch() throws Exception { // create and populate a table with 500 rows. try { + client.execute("set hive.support.concurrency = false"); client.execute("drop table " + tableName); } catch (Exception ex) { } @@ -239,6 +245,7 @@ public void testDynamicSerde() throws Exception { try { + client.execute("set hive.support.concurrency = false"); client.execute("drop table " + tableName); } catch (Exception ex) { } Index: cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java =================================================================== --- cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (revision 984642) +++ cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (working copy) @@ -59,9 +59,9 @@ public static final String prompt = "hive"; public static final String prompt2 = " "; // when ';' is not yet seen - + public static final String HIVERCFILE = ".hiverc"; - + private final LogHelper console; private final Configuration conf; @@ -140,6 +140,7 @@ ret = qp.run(cmd).getResponseCode(); if (ret != 0) { qp.close(); + // qp.destroy(); return ret; } @@ -171,6 +172,7 @@ console.printInfo("Time taken: " + timeTaken + " seconds", null); } + // qp.destroy(); } else { ret = proc.run(cmd_1).getResponseCode(); } @@ -197,13 +199,16 @@ } ret = processCmd(command); + command = ""; lastRet = ret; boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS); if (ret != 0 && !ignoreErrors) { + CommandProcessorFactory.clean(); return ret; } } + CommandProcessorFactory.clean(); return lastRet; } @@ -261,7 +266,7 @@ } ss.setIsSilent(saveSilent); } - + public static void main(String[] args) throws Exception { OptionsProcessor oproc = new OptionsProcessor(); @@ -311,7 +316,7 @@ // Execute -i init files (always in silent mode) cli.processInitFiles(ss); - + if (ss.execString != null) { System.exit(cli.processLine(ss.execString)); } Index: ql/src/test/results/clientnegative/lockneg3.q.out =================================================================== --- ql/src/test/results/clientnegative/lockneg3.q.out (revision 0) +++ ql/src/test/results/clientnegative/lockneg3.q.out (revision 0) @@ -0,0 +1,4 @@ +PREHOOK: query: UNLOCK TABLE srcpart PARTITION(ds='2008-04-08', hr='11') +PREHOOK: type: UNLOCKTABLE +FAILED: Error in metadata: Table srcpart is not locked +FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask Index: ql/src/test/results/clientnegative/lockneg2.q.out =================================================================== --- ql/src/test/results/clientnegative/lockneg2.q.out (revision 0) +++ ql/src/test/results/clientnegative/lockneg2.q.out (revision 0) @@ -0,0 +1,4 @@ +PREHOOK: query: UNLOCK TABLE src +PREHOOK: type: UNLOCKTABLE +FAILED: Error in metadata: Table src is not locked +FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask Index: ql/src/test/results/clientnegative/lockneg1.q.out =================================================================== --- ql/src/test/results/clientnegative/lockneg1.q.out (revision 0) +++ ql/src/test/results/clientnegative/lockneg1.q.out (revision 0) @@ -0,0 +1,12 @@ +PREHOOK: query: LOCK TABLE src SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE src SHARED +POSTHOOK: type: LOCKTABLE +PREHOOK: query: LOCK TABLE src SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE src SHARED +POSTHOOK: type: LOCKTABLE +PREHOOK: query: LOCK TABLE src EXCLUSIVE +PREHOOK: type: LOCKTABLE +conflicting lock present +FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask Index: ql/src/test/results/clientpositive/lock1.q.out =================================================================== --- ql/src/test/results/clientpositive/lock1.q.out (revision 0) +++ ql/src/test/results/clientpositive/lock1.q.out (revision 0) @@ -0,0 +1,44 @@ +PREHOOK: query: LOCK TABLE src SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE src SHARED +POSTHOOK: type: LOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +src SHARED +PREHOOK: query: UNLOCK TABLE src +PREHOOK: type: UNLOCKTABLE +POSTHOOK: query: UNLOCK TABLE src +POSTHOOK: type: UNLOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +PREHOOK: query: LOCK TABLE src SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE src SHARED +POSTHOOK: type: LOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +src SHARED +PREHOOK: query: LOCK TABLE src SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE src SHARED +POSTHOOK: type: LOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +src SHARED +src SHARED +PREHOOK: query: UNLOCK TABLE src +PREHOOK: type: UNLOCKTABLE +POSTHOOK: query: UNLOCK TABLE src +POSTHOOK: type: UNLOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS Index: ql/src/test/results/clientpositive/lock2.q.out =================================================================== --- ql/src/test/results/clientpositive/lock2.q.out (revision 0) +++ ql/src/test/results/clientpositive/lock2.q.out (revision 0) @@ -0,0 +1,46 @@ +PREHOOK: query: LOCK TABLE src SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE src SHARED +POSTHOOK: type: LOCKTABLE +PREHOOK: query: LOCK TABLE srcpart SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE srcpart SHARED +POSTHOOK: type: LOCKTABLE +PREHOOK: query: LOCK TABLE srcpart PARTITION(ds='2008-04-08', hr='11') EXCLUSIVE +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE srcpart PARTITION(ds='2008-04-08', hr='11') EXCLUSIVE +POSTHOOK: type: LOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +src SHARED +srcpart SHARED +srcpart@ds=2008-04-08/hr=11 EXCLUSIVE +PREHOOK: query: UNLOCK TABLE src +PREHOOK: type: UNLOCKTABLE +POSTHOOK: query: UNLOCK TABLE src +POSTHOOK: type: UNLOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +srcpart SHARED +srcpart@ds=2008-04-08/hr=11 EXCLUSIVE +PREHOOK: query: UNLOCK TABLE srcpart +PREHOOK: type: UNLOCKTABLE +POSTHOOK: query: UNLOCK TABLE srcpart +POSTHOOK: type: UNLOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +srcpart@ds=2008-04-08/hr=11 EXCLUSIVE +PREHOOK: query: UNLOCK TABLE srcpart PARTITION(ds='2008-04-08', hr='11') +PREHOOK: type: UNLOCKTABLE +POSTHOOK: query: UNLOCK TABLE srcpart PARTITION(ds='2008-04-08', hr='11') +POSTHOOK: type: UNLOCKTABLE +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS Index: ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java (revision 984642) +++ ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java (working copy) @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.tools.LineageInfo; import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.hive.ql.QTestUtil.QTestSetup; /** * TestHiveHistory. @@ -53,6 +54,7 @@ private static Path tmppath = new Path(tmpdir); private static Hive db; private static FileSystem fs; + private QTestSetup setup; /* * intialize the tables @@ -75,6 +77,9 @@ } } + setup = new QTestSetup(); + setup.preTest(conf); + // copy the test files into hadoop if required. int i = 0; Path[] hadoopDataFile = new Path[2]; @@ -109,6 +114,19 @@ } } + @Override + protected void tearDown() { + try { + setup.tearDown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + /** * Check history file output for this query. */ @@ -133,7 +151,7 @@ SessionState.start(ss); String cmd = "select a.key from src a"; - Driver d = new Driver(); + Driver d = new Driver(conf); int ret = d.run(cmd).getResponseCode(); if (ret != 0) { fail("Failed"); Index: ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java (revision 984642) +++ ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java (working copy) @@ -46,7 +46,7 @@ qfiles[i] = new File(inpDir, testNames[i]); } - boolean success = QTestUtil.queryListRunner(qfiles, resDirs, logDirs, true); + boolean success = QTestUtil.queryListRunner(qfiles, resDirs, logDirs, true, this); if (!success) { fail("One or more queries failed"); } Index: ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (revision 984642) +++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (working copy) @@ -38,6 +38,7 @@ import java.util.TreeMap; import java.util.regex.Matcher; import java.util.regex.Pattern; +import junit.framework.Test; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -69,6 +70,8 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.hadoop.hbase.MiniZooKeeperCluster; +import org.apache.zookeeper.ZooKeeper; /** * QTestUtil. @@ -103,6 +106,7 @@ private HadoopShims.MiniDFSShim dfs = null; private boolean miniMr = false; private String hadoopVer = null; + private QTestSetup setup = null; public boolean deleteDirectory(File path) { if (path.exists()) { @@ -198,10 +202,10 @@ .concat("/build/ql/test/data/warehouse/")); conf.set("mapred.job.tracker", "localhost:" + mr.getJobTrackerPort()); } - } - public QTestUtil(String outDir, String logDir, boolean miniMr, String hadoopVer) throws Exception { + public QTestUtil(String outDir, String logDir, boolean miniMr, String hadoopVer) + throws Exception { this.outDir = outDir; this.logDir = logDir; conf = new HiveConf(Driver.class); @@ -227,6 +231,8 @@ overWrite = true; } + setup = new QTestSetup(); + setup.preTest(conf); init(); } @@ -242,6 +248,8 @@ mr.shutdown(); mr = null; } + + drv.destroy(); } public void addFile(String qFile) throws Exception { @@ -312,6 +320,7 @@ // modify conf by using 'set' commands conf = new HiveConf (Driver.class); initConf(); + setup.preTest(conf); } @@ -329,6 +338,7 @@ } FunctionRegistry.unregisterTemporaryUDF("test_udaf"); FunctionRegistry.unregisterTemporaryUDF("test_error"); + setup.tearDown(); } private void runLoadCmd(String loadCmd) throws Exception { @@ -909,6 +919,47 @@ } /** + * QTestSetup defines test fixtures which are reused across testcases, + * and are needed before any test can be run + */ + public static class QTestSetup + { + private MiniZooKeeperCluster zooKeeperCluster = null; + private int zkPort; + private ZooKeeper zooKeeper; + + public QTestSetup() { + } + + public void preTest(HiveConf conf) throws Exception { + + if (zooKeeperCluster == null) { + String tmpdir = System.getProperty("user.dir")+"/../build/ql/tmp"; + zooKeeperCluster = new MiniZooKeeperCluster(); + zkPort = zooKeeperCluster.startup(new File(tmpdir, "zookeeper")); + } + + if (zooKeeper != null) { + zooKeeper.close(); + } + + int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + zooKeeper = new ZooKeeper("localhost:" + zkPort, sessionTimeout, null); + + String zkServer = "localhost"; + conf.set("hive.zookeeper.quorum", zkServer); + conf.set("hive.zookeeper.client.port", "" + zkPort); + } + + public void tearDown() throws Exception { + if (zooKeeperCluster != null) { + zooKeeperCluster.shutdown(); + zooKeeperCluster = null; + } + } + } + + /** * QTRunner: Runnable class for running a a single query file. * **/ @@ -955,17 +1006,18 @@ * (in terms of destination tables) */ public static boolean queryListRunner(File[] qfiles, String[] resDirs, - String[] logDirs, boolean mt) { + String[] logDirs, boolean mt, Test test) { assert (qfiles.length == resDirs.length); assert (qfiles.length == logDirs.length); boolean failed = false; - try { QTestUtil[] qt = new QTestUtil[qfiles.length]; + QTestSetup[] qsetup = new QTestSetup[qfiles.length]; for (int i = 0; i < qfiles.length; i++) { - qt[i] = new QTestUtil(resDirs[i], logDirs[i]); + qt[i] = new QTestUtil(resDirs[i], logDirs[i], false, "0.20"); qt[i].addFile(qfiles[i]); + qt[i].clearTestSideEffects(); } if (mt) { @@ -973,6 +1025,7 @@ qt[0].cleanUp(); qt[0].createSources(); + qt[0].clearTestSideEffects(); QTRunner[] qtRunners = new QTestUtil.QTRunner[qfiles.length]; Thread[] qtThread = new Thread[qfiles.length]; Index: ql/src/test/queries/clientnegative/lockneg2.q =================================================================== --- ql/src/test/queries/clientnegative/lockneg2.q (revision 0) +++ ql/src/test/queries/clientnegative/lockneg2.q (revision 0) @@ -0,0 +1 @@ +UNLOCK TABLE src; Index: ql/src/test/queries/clientnegative/lockneg3.q =================================================================== --- ql/src/test/queries/clientnegative/lockneg3.q (revision 0) +++ ql/src/test/queries/clientnegative/lockneg3.q (revision 0) @@ -0,0 +1 @@ +UNLOCK TABLE srcpart PARTITION(ds='2008-04-08', hr='11'); Index: ql/src/test/queries/clientnegative/lockneg1.q =================================================================== --- ql/src/test/queries/clientnegative/lockneg1.q (revision 0) +++ ql/src/test/queries/clientnegative/lockneg1.q (revision 0) @@ -0,0 +1,3 @@ +LOCK TABLE src SHARED; +LOCK TABLE src SHARED; +LOCK TABLE src EXCLUSIVE; Index: ql/src/test/queries/clientpositive/lock1.q =================================================================== --- ql/src/test/queries/clientpositive/lock1.q (revision 0) +++ ql/src/test/queries/clientpositive/lock1.q (revision 0) @@ -0,0 +1,10 @@ +LOCK TABLE src SHARED; +SHOW LOCKS; +UNLOCK TABLE src; +SHOW LOCKS; +LOCK TABLE src SHARED; +SHOW LOCKS; +LOCK TABLE src SHARED; +SHOW LOCKS; +UNLOCK TABLE src; +SHOW LOCKS; Index: ql/src/test/queries/clientpositive/lock2.q =================================================================== --- ql/src/test/queries/clientpositive/lock2.q (revision 0) +++ ql/src/test/queries/clientpositive/lock2.q (revision 0) @@ -0,0 +1,10 @@ +LOCK TABLE src SHARED; +LOCK TABLE srcpart SHARED; +LOCK TABLE srcpart PARTITION(ds='2008-04-08', hr='11') EXCLUSIVE; +SHOW LOCKS; +UNLOCK TABLE src; +SHOW LOCKS; +UNLOCK TABLE srcpart; +SHOW LOCKS; +UNLOCK TABLE srcpart PARTITION(ds='2008-04-08', hr='11'); +SHOW LOCKS; Index: ql/src/test/templates/TestNegativeCliDriver.vm =================================================================== --- ql/src/test/templates/TestNegativeCliDriver.vm (revision 984642) +++ ql/src/test/templates/TestNegativeCliDriver.vm (working copy) @@ -16,13 +16,13 @@ public class $className extends TestCase { private static QTestUtil qt; + static { try { - qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()"); + qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", false, "0.20"); // do a one time initialization qt.cleanUp(); qt.createSources(); - } catch (Exception e) { System.out.println("Exception: " + e.getMessage()); e.printStackTrace(); @@ -47,6 +47,20 @@ } } + @Override + protected void tearDown() { + try { + if (getName().equals("testNegativeCliDriver_shutdown")) + qt.shutdown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + public static Test suite() { TestSuite suite = new TestSuite(); #foreach ($qf in $qfiles) @@ -55,9 +69,17 @@ #set ($tname = $fname.substring(0, $eidx)) suite.addTest(new $className("testNegativeCliDriver_$tname")); #end + suite.addTest(new $className("testNegativeCliDriver_shutdown")); return suite; } + /** + * Dummy last test. This is only meant to shutdown qt + */ + public void testNegativeCliDriver_shutdown() { + System.out.println ("Cleaning up " + "$className"); + } + static String debugHint = "\nSee build/ql/tmp/hive.log, " + "or try \"ant test ... -Dtest.silent=false\" to get more logs."; @@ -75,7 +97,7 @@ System.out.println("Test $fname skipped"); return; } - + qt.cliInit("$fname", false); int ecode = qt.executeClient("$fname"); if (ecode == 0) { Index: ql/src/test/templates/TestParse.vm =================================================================== --- ql/src/test/templates/TestParse.vm (revision 984642) +++ ql/src/test/templates/TestParse.vm (working copy) @@ -22,7 +22,7 @@ @Override protected void setUp() { try { - qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()"); + qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", false, "0.20"); } catch (Exception e) { System.out.println("Exception: " + e.getMessage()); @@ -32,17 +32,40 @@ } } + @Override + protected void tearDown() { + try { + if (getName().equals("testParse_shutdown")) + qt.shutdown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + public static Test suite() { TestSuite suite = new TestSuite(); + #foreach ($qf in $qfiles) #set ($fname = $qf.getName()) #set ($eidx = $fname.length() - 2) #set ($tname = $fname.substring(0, $eidx)) suite.addTest(new $className("testParse_$tname")); #end + suite.addTest(new $className("testParse_shutdown")); return suite; } + /** + * Dummy last test. This is only meant to shutdown qt + */ + public void testParse_shutdown() { + System.out.println ("Cleaning up " + "$className"); + } + static String debugHint = "\nSee build/ql/tmp/hive.log, " + "or try \"ant test ... -Dtest.silent=false\" to get more logs."; Index: ql/src/test/templates/TestParseNegative.vm =================================================================== --- ql/src/test/templates/TestParseNegative.vm (revision 984642) +++ ql/src/test/templates/TestParseNegative.vm (working copy) @@ -22,7 +22,7 @@ @Override protected void setUp() { try { - qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()"); + qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", false, "0.20"); } catch (Exception e) { System.out.println("Exception: " + e.getMessage()); @@ -32,14 +32,37 @@ } } + @Override + protected void tearDown() { + try { + if (getName().equals("testParseNegative_shutdown")) + qt.shutdown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + + /** + * Dummy last test. This is only meant to shutdown qt + */ + public void testParseNegative_shutdown() { + System.out.println ("Cleaning up " + "$className"); + } + public static Test suite() { TestSuite suite = new TestSuite(); + #foreach ($qf in $qfiles) #set ($fname = $qf.getName()) #set ($eidx = $fname.length() - 2) #set ($tname = $fname.substring(0, $eidx)) suite.addTest(new $className("testParseNegative_$tname")); #end + suite.addTest(new $className("testParseNegative_shutdown")); return suite; } Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/DummyPartition.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/metadata/DummyPartition.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/DummyPartition.java (revision 0) @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.metadata; + +import java.io.Serializable; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TMemoryBuffer; + +/** + * A Hive Table Partition: is a fundamental storage unit within a Table. Currently, Hive does not support + * hierarchical partitions - For eg: if partition ds=1, hr=1 exists, there is no way to access ds=1 + * + * Hierarchical partitions are needed in some cases, for eg. locking. For now, create a dummy partition to + * satisfy this + */ +public class DummyPartition extends Partition { + + @SuppressWarnings("nls") + static final private Log LOG = LogFactory + .getLog("hive.ql.metadata.DummyPartition"); + + private String name; + public DummyPartition() { + } + + public DummyPartition(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (revision 984642) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (working copy) @@ -35,6 +35,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Collections; +import java.util.Comparator; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -59,6 +61,7 @@ import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; @@ -84,11 +87,18 @@ import org.apache.hadoop.hive.ql.plan.DropTableDesc; import org.apache.hadoop.hive.ql.plan.MsckDesc; import org.apache.hadoop.hive.ql.plan.ShowFunctionsDesc; +import org.apache.hadoop.hive.ql.plan.ShowLocksDesc; +import org.apache.hadoop.hive.ql.plan.LockTableDesc; +import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; import org.apache.hadoop.hive.ql.plan.ShowPartitionsDesc; import org.apache.hadoop.hive.ql.plan.ShowTableStatusDesc; import org.apache.hadoop.hive.ql.plan.ShowTablesDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes; import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; @@ -225,6 +235,21 @@ return showFunctions(showFuncs); } + ShowLocksDesc showLocks = work.getShowLocksDesc(); + if (showLocks != null) { + return showLocks(showLocks); + } + + LockTableDesc lockTbl = work.getLockTblDesc(); + if (lockTbl != null) { + return lockTable(lockTbl); + } + + UnlockTableDesc unlockTbl = work.getUnlockTblDesc(); + if (unlockTbl != null) { + return unlockTable(unlockTbl); + } + ShowPartitionsDesc showParts = work.getShowPartsDesc(); if (showParts != null) { return showPartitions(db, showParts); @@ -1130,6 +1155,163 @@ } /** + * Write a list of the current locks to a file. + * + * @param showLocks + * the locks we're interested in. + * @return Returns 0 when execution succeeds and above 0 if it fails. + * @throws HiveException + * Throws this exception if an unexpected error occurs. + */ + private int showLocks(ShowLocksDesc showLocks) throws HiveException { + Context ctx = driverContext.getCtx(); + HiveLockManager lockMgr = ctx.getHiveLockMgr(); + if (lockMgr == null) { + throw new HiveException("show Locks LockManager not specified"); + } + + // write the results in the file + try { + Path resFile = new Path(showLocks.getResFile()); + FileSystem fs = resFile.getFileSystem(conf); + DataOutput outStream = fs.create(resFile); + List locks = lockMgr.getLocks(); + + Collections.sort(locks, new Comparator() { + + @Override + public int compare(HiveLock o1, HiveLock o2) { + int cmp = o1.getHiveLockObject().getName().compareTo(o2.getHiveLockObject().getName()); + if (cmp == 0) { + if (o1.getHiveLockMode() == o2.getHiveLockMode()) { + return cmp; + } + // EXCLUSIVE locks occur before SHARED locks + if (o1.getHiveLockMode() == HiveLockMode.EXCLUSIVE) { + return -1; + } + return +1; + } + return cmp; + } + + }); + + Iterator locksIter = locks.iterator(); + + while (locksIter.hasNext()) { + HiveLock lock = locksIter.next(); + outStream.writeBytes(lock.getHiveLockObject().getName(false)); + outStream.write(separator); + outStream.writeBytes(lock.getHiveLockMode().toString()); + outStream.write(terminator); + } + ((FSDataOutputStream) outStream).close(); + } catch (FileNotFoundException e) { + LOG.warn("show function: " + stringifyException(e)); + return 1; + } catch (IOException e) { + LOG.warn("show function: " + stringifyException(e)); + return 1; + } catch (Exception e) { + throw new HiveException(e.toString()); + } + return 0; + } + + /** + * Lock the table/partition specified + * + * @param lockTbl + * the table/partition to be locked along with the mode + * @return Returns 0 when execution succeeds and above 0 if it fails. + * @throws HiveException + * Throws this exception if an unexpected error occurs. + */ + private int lockTable(LockTableDesc lockTbl) throws HiveException { + Context ctx = driverContext.getCtx(); + HiveLockManager lockMgr = ctx.getHiveLockMgr(); + if (lockMgr == null) { + throw new HiveException("lock Table LockManager not specified"); + } + + HiveLockMode mode = HiveLockMode.valueOf(lockTbl.getMode()); + String tabName = lockTbl.getTableName(); + Table tbl = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tabName); + if (tbl == null) { + throw new HiveException("Table " + tabName + " does not exist "); + } + + Map partSpec = lockTbl.getPartSpec(); + if (partSpec == null) { + HiveLock lck = lockMgr.lock(new HiveLockObject(tbl), mode); + if (lck == null) { + return 1; + } + return 0; + } + + Partition par = db.getPartition(tbl, partSpec, false); + if (par == null) { + throw new HiveException("Partition " + partSpec + " for table " + tabName + " does not exist"); + } + HiveLock lck = lockMgr.lock(new HiveLockObject(par), mode); + if (lck == null) { + return 1; + } + return 0; + } + + /** + * Unlock the table/partition specified + * + * @param unlockTbl + * the table/partition to be unlocked + * @return Returns 0 when execution succeeds and above 0 if it fails. + * @throws HiveException + * Throws this exception if an unexpected error occurs. + */ + private int unlockTable(UnlockTableDesc unlockTbl) throws HiveException { + Context ctx = driverContext.getCtx(); + HiveLockManager lockMgr = ctx.getHiveLockMgr(); + if (lockMgr == null) { + throw new HiveException("unlock Table LockManager not specified"); + } + + String tabName = unlockTbl.getTableName(); + Table tbl = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tabName); + if (tbl == null) { + throw new HiveException("Table " + tabName + " does not exist "); + } + + Map partSpec = unlockTbl.getPartSpec(); + HiveLockObject obj = null; + + if (partSpec == null) { + obj = new HiveLockObject(tbl); + } + else { + Partition par = db.getPartition(tbl, partSpec, false); + if (par == null) { + throw new HiveException("Partition " + partSpec + " for table " + tabName + " does not exist"); + } + obj = new HiveLockObject(par); + } + + List locks = lockMgr.getLocks(obj); + if ((locks == null) || (locks.isEmpty())) { + throw new HiveException("Table " + tabName + " is not locked "); + } + Iterator locksIter = locks.iterator(); + while (locksIter.hasNext()) { + HiveLock lock = locksIter.next(); + lockMgr.unlock(lock); + } + + return 0; + } + + /** * Shows a description of a function. * * @param descFunc @@ -1584,7 +1766,7 @@ // alter the table Table tbl = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, alterTbl .getOldName()); - + Partition part = null; if(alterTbl.getPartSpec() != null) { part = db.getPartition(tbl, alterTbl.getPartSpec(), false); @@ -1772,7 +1954,7 @@ if (part != null) { part.setProtectMode(mode); } else { - tbl.setProtectMode(mode); + tbl.setProtectMode(mode); } } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDCLUSTERSORTCOLUMN) { @@ -1852,12 +2034,12 @@ part.getParameters().put("last_modified_time", Long.toString(System .currentTimeMillis() / 1000)); } - + try { if (part == null) { db.alterTable(alterTbl.getOldName(), tbl); } else { - db.alterPartition(tbl.getTableName(), part); + db.alterPartition(tbl.getTableName(), part); } } catch (InvalidOperationException e) { console.printError("Invalid alter operation: " + e.getMessage()); Index: ql/src/java/org/apache/hadoop/hive/ql/plan/UnlockTableDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/UnlockTableDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/UnlockTableDesc.java (revision 0) @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.Map; + +import org.apache.hadoop.fs.Path; + +/** + * UnlockTableDesc. + * + */ +@Explain(displayName = "Unlock Table") +public class UnlockTableDesc extends DDLDesc implements Serializable { + private static final long serialVersionUID = 1L; + + private String tableName; + private Map partSpec; + + public UnlockTableDesc() { + } + + public UnlockTableDesc(String tableName, Map partSpec) { + this.tableName = tableName; + this.partSpec = partSpec; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public Map getPartSpec() { + return partSpec; + } + + public void setPartSpec(Map partSpec) { + this.partSpec = partSpec; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java (revision 984642) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java (working copy) @@ -38,7 +38,10 @@ private DropTableDesc dropTblDesc; private AlterTableDesc alterTblDesc; private ShowTablesDesc showTblsDesc; + private LockTableDesc lockTblDesc; + private UnlockTableDesc unlockTblDesc; private ShowFunctionsDesc showFuncsDesc; + private ShowLocksDesc showLocksDesc; private DescFunctionDesc descFunctionDesc; private ShowPartitionsDesc showPartsDesc; private DescTableDesc descTblDesc; @@ -143,6 +146,26 @@ } /** + * @param lockTblDesc + */ + public DDLWork(HashSet inputs, HashSet outputs, + LockTableDesc lockTblDesc) { + this(inputs, outputs); + + this.lockTblDesc = lockTblDesc; + } + + /** + * @param unlockTblDesc + */ + public DDLWork(HashSet inputs, HashSet outputs, + UnlockTableDesc unlockTblDesc) { + this(inputs, outputs); + + this.unlockTblDesc = unlockTblDesc; + } + + /** * @param showFuncsDesc */ public DDLWork(HashSet inputs, HashSet outputs, @@ -153,6 +176,16 @@ } /** + * @param showLocksDesc + */ + public DDLWork(HashSet inputs, HashSet outputs, + ShowLocksDesc showLocksDesc) { + this(inputs, outputs); + + this.showLocksDesc = showLocksDesc; + } + + /** * @param descFuncDesc */ public DDLWork(HashSet inputs, HashSet outputs, @@ -331,6 +364,30 @@ } /** + * @return the showLocksDesc + */ + @Explain(displayName = "Show Lock Operator") + public ShowLocksDesc getShowLocksDesc() { + return showLocksDesc; + } + + /** + * @return the lockTblDesc + */ + @Explain(displayName = "Lock Table Operator") + public LockTableDesc getLockTblDesc() { + return lockTblDesc; + } + + /** + * @return the unlockTblDesc + */ + @Explain(displayName = "Unlock Table Operator") + public UnlockTableDesc getUnlockTblDesc() { + return unlockTblDesc; + } + + /** * @return the descFuncDesc */ @Explain(displayName = "Show Function Operator") @@ -347,6 +404,30 @@ } /** + * @param showLocksDesc + * the showLocksDesc to set + */ + public void setShowLocksDesc(ShowLocksDesc showLocksDesc) { + this.showLocksDesc = showLocksDesc; + } + + /** + * @param lockTblDesc + * the lockTblDesc to set + */ + public void setLockTblDesc(LockTableDesc lockTblDesc) { + this.lockTblDesc = lockTblDesc; + } + + /** + * @param unlockTblDesc + * the unlockTblDesc to set + */ + public void setUnlockTblDesc(UnlockTableDesc unlockTblDesc) { + this.unlockTblDesc = unlockTblDesc; + } + + /** * @param descFuncDesc * the showFuncsDesc to set */ Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java (revision 0) @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; + +import org.apache.hadoop.fs.Path; + +/** + * ShowLocksDesc. + * + */ +@Explain(displayName = "Show Locks") +public class ShowLocksDesc extends DDLDesc implements Serializable { + private static final long serialVersionUID = 1L; + String resFile; + + /** + * table name for the result of show locks. + */ + private static final String table = "showlocks"; + /** + * thrift ddl for the result of show locks. + */ + private static final String schema = "tab_name,mode#string:string"; + + public String getTable() { + return table; + } + + public String getSchema() { + return schema; + } + + public ShowLocksDesc() { + } + + /** + * @param resFile + */ + public ShowLocksDesc(Path resFile) { + this.resFile = resFile.toString(); + } + + /** + * @return the resFile + */ + @Explain(displayName = "result file", normalExplain = false) + public String getResFile() { + return resFile; + } + + /** + * @param resFile + * the resFile to set + */ + public void setResFile(String resFile) { + this.resFile = resFile; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/LockTableDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/LockTableDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/LockTableDesc.java (revision 0) @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.Map; + +import org.apache.hadoop.fs.Path; + +/** + * LockTableDesc. + * + */ +@Explain(displayName = "Lock Table") +public class LockTableDesc extends DDLDesc implements Serializable { + private static final long serialVersionUID = 1L; + + private String tableName; + private String mode; + private Map partSpec; + + public LockTableDesc() { + } + + public LockTableDesc(String tableName, String mode, Map partSpec) { + this.tableName = tableName; + this.mode = mode; + this.partSpec = partSpec; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public void setMode(String mode) { + this.mode = mode; + } + + public String getMode() { + return mode; + } + + public Map getPartSpec() { + return partSpec; + } + + public void setPartSpec(Map partSpec) { + this.partSpec = partSpec; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (revision 0) @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +import java.util.List; + +public interface HiveLockManager { + + public void setContext(HiveLockManagerCtx ctx) throws LockException; + public HiveLock lock(HiveLockObject key, HiveLockMode mode) throws LockException; + public void unlock(HiveLock hiveLock) throws LockException; + + public List getLocks() throws LockException; + public List getLocks(HiveLockObject key) throws LockException; + public void close() throws LockException; +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (revision 0) @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr.zookeeper; + +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs.Ids; +import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.regex.Pattern; +import java.util.regex.Matcher; +import org.apache.commons.lang.StringEscapeUtils; + +import org.apache.hadoop.hive.ql.parse.ErrorMsg; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.DummyPartition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; + +public class ZooKeeperHiveLockManager implements HiveLockManager { + HiveLockManagerCtx ctx; + public static final Log LOG = LogFactory.getLog("ZooKeeperHiveLockManager"); + static final private LogHelper console = new LogHelper(LOG); + + private ZooKeeper zooKeeper; + + public ZooKeeperHiveLockManager() { + } + + private String getQuorumServers(HiveConf conf) { + StringBuilder hostPortBuilder = new StringBuilder(); + + String hosts = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM); + String[] hostarr = hosts.split(","); + String port = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT); + int pos = 0; + for (String host : hostarr) { + if (pos > 0) { + hostPortBuilder.append(','); + } + hostPortBuilder.append(host); + hostPortBuilder.append(':'); + hostPortBuilder.append(port); + } + + return hostPortBuilder.toString(); + } + + public void setContext(HiveLockManagerCtx ctx) throws LockException { + this.ctx = ctx; + HiveConf conf = ctx.getConf(); + int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + String quorumServers = getQuorumServers(conf); + + try { + zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, null); + } catch (IOException e) { + LOG.error("Failed to create ZooKeeper object: " + e); + throw new LockException(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg()); + } + } + + private String getObjectName(HiveLockObject key, HiveLockMode mode) { + return "/" + key.getName().replaceAll("/", ctx.getConf().getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME)) + "-" + mode + "-"; + } + + public ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode) throws LockException { + String name = getObjectName(key, mode); + String res; + + try { + res = zooKeeper.create(name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + + int seqNo = getSequenceNumber(res, name); + if (seqNo == -1) { + zooKeeper.delete(res, -1); + return null; + } + + List children = zooKeeper.getChildren("/", false); + + String exLock = getObjectName(key, HiveLockMode.EXCLUSIVE); + String shLock = getObjectName(key, HiveLockMode.SHARED); + + for (String child : children) { + child = "/" + child; + + // Is there a conflicting lock on the same object with a lower sequence number + int childSeq = seqNo; + if (child.startsWith(exLock)) { + childSeq = getSequenceNumber(child, exLock); + } + if ((mode == HiveLockMode.EXCLUSIVE) && child.startsWith(shLock)) { + childSeq = getSequenceNumber(child, shLock); + } + + if ((childSeq >= 0) && (childSeq < seqNo)) { + zooKeeper.delete(res, -1); + console.printError("conflicting lock present "); + return null; + } + } + + } catch (Exception e) { + LOG.error("Failed to get ZooKeeper lock: " + e); + throw new LockException(e); + } + + return new ZooKeeperHiveLock(res, key, mode); + } + + public void unlock(HiveLock hiveLock) throws LockException { + ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock; + try { + zooKeeper.delete(zLock.getPath(), -1); + } catch (Exception e) { + LOG.error("Failed to release ZooKeeper lock: " + e); + throw new LockException(e); + } + } + + public List getLocks() throws LockException { + return getLocks(null); + } + + public List getLocks(HiveLockObject key) throws LockException { + List locks = new ArrayList(); + List children; + + try { + children = zooKeeper.getChildren("/", false); + } catch (Exception e) { + LOG.error("Failed to get ZooKeeper children: " + e); + throw new LockException(e); + } + + for (String child : children) { + child = "/" + child; + + HiveLockMode mode = getLockMode(child); + if (mode == null) { + continue; + } + + HiveLockObject obj = getLockObject(child, mode); + if ((key == null) || (obj.getName().equals(key.getName()))) { + HiveLock lck = (HiveLock)(new ZooKeeperHiveLock(child, obj, mode)); + locks.add(lck); + } + } + + return locks; + } + + public void close() throws LockException { + try { + if (zooKeeper != null) { + zooKeeper.close(); + } + } catch (Exception e) { + LOG.error("Failed to close zooKeeper client: " + e); + throw new LockException(e); + } + } + + private int getSequenceNumber(String resPath, String path) { + String tst = resPath.substring(path.length()); + try { + return (new Integer(tst)).intValue(); + } catch (Exception e) { + return -1; // invalid number + } + } + + private HiveLockObject getLockObject(String path, HiveLockMode mode) throws LockException { + try { + Hive db = Hive.get(ctx.getConf()); + int indx = path.lastIndexOf(mode.toString()); + String objName = path.substring(0, indx-1); + + String[] names = objName.split("@"); + Table tab = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, + names[1], false); // do not throw exception if table does not exist + assert (tab != null); + + if (names.length == 2) { + return new HiveLockObject(tab); + } + + String[] parts = names[2].split(ctx.getConf().getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME)); + + Map partSpec = new HashMap(); + for (indx = 0; indx < parts.length; indx++) { + String[] partVals = parts[indx].split("="); + partSpec.put(partVals[0], partVals[1]); + } + + Partition partn = db.getPartition(tab, partSpec, false); + if (partn == null) { + return new HiveLockObject(new DummyPartition(objName)); + } + + return new HiveLockObject(partn); + } catch (Exception e) { + LOG.error("Failed to create ZooKeeper object: " + e); + throw new LockException(e); + } + } + + private static Pattern shMode = Pattern.compile("^.*-(SHARED)-([0-9]+)$"); + private static Pattern exMode = Pattern.compile("^.*-(EXCLUSIVE)-([0-9]+)$"); + + private HiveLockMode getLockMode(String path) { + + Matcher shMatcher = shMode.matcher(path); + Matcher exMatcher = exMode.matcher(path); + + if (shMatcher.matches()) + return HiveLockMode.SHARED; + + if (exMatcher.matches()) { + return HiveLockMode.EXCLUSIVE; + } + + return null; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java (revision 0) @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr.zookeeper; + +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; + +public class ZooKeeperHiveLock extends HiveLock { + private String path; + private HiveLockObject obj; + private HiveLockMode mode; + + public ZooKeeperHiveLock(String path, HiveLockObject obj, HiveLockMode mode) { + this.path = path; + this.obj = obj; + this.mode = mode; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + public HiveLockObject getHiveLockObject() { + return obj; + } + + public void setHiveLockObject(HiveLockObject obj) { + this.obj = obj; + } + + public HiveLockMode getHiveLockMode() { + return mode; + } + + public void setHiveLockMode(HiveLockMode mode) { + this.mode = mode; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/package-info.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/package-info.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/package-info.java (revision 0) @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Hive Lock Manager interfaces and some custom implmentations + */ +package org.apache.hadoop.hive.ql.lockmgr; Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLock.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLock.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLock.java (revision 0) @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +public abstract class HiveLock { + public abstract HiveLockObject getHiveLockObject(); + public abstract HiveLockMode getHiveLockMode(); +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java (revision 0) @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + +/** + * Exception from lock manager. + */ + +public class LockException extends HiveException { + + private static final long serialVersionUID = 1L; + + public LockException() { + super(); + } + + public LockException(String message) { + super(message); + } + + public LockException(Throwable cause) { + super(cause); + } + + public LockException(String message, Throwable cause) { + super(message, cause); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java (revision 0) @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +import org.apache.hadoop.hive.conf.HiveConf; + +public enum HiveLockMode { + SHARED, EXCLUSIVE; +} + Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (revision 0) @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; + +public class HiveLockObject { + /** + * The table. + */ + private Table t; + + /** + * The partition. This is null for a non partitioned table. + */ + private Partition p; + + public HiveLockObject() { + this.t = null; + this.p = null; + } + + public HiveLockObject(Table t) { + this.t = t; + this.p = null; + } + + public HiveLockObject(Partition p) { + this.t = null; + this.p = p; + } + + public Table getTable() { + return t; + } + + public void setTable (Table t) { + this.t = t; + } + + public Partition getPartition() { + return p; + } + + public void setPartition (Partition p) { + this.p = p; + } + + public String getName() { + return getName(true); + } + + public String getName(boolean includeDB) { + + if (t != null) { + if (includeDB) { + return t.getDbName() + "@" + t.getTableName(); + } + else { + return t.getTableName(); + } + } + + if (p != null) { + if (includeDB) { + return p.getTable().getDbName() + "@" + p.getTable().getTableName() + "@" + p.getName(); + } + else { + return p.getTable().getTableName() + "@" + p.getName(); + } + } + + return ""; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManagerCtx.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManagerCtx.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManagerCtx.java (revision 0) @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +import org.apache.hadoop.hive.conf.HiveConf; + +public class HiveLockManagerCtx { + HiveConf conf; + + public HiveLockManagerCtx() { + } + + public HiveLockManagerCtx(HiveConf conf) { + this.conf = conf; + } + + public HiveConf getConf() { + return conf; + } + + public void setConf(HiveConf conf) { + this.conf = conf; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/processors/AddResourceProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/processors/AddResourceProcessor.java (revision 984642) +++ ql/src/java/org/apache/hadoop/hive/ql/processors/AddResourceProcessor.java (working copy) @@ -37,6 +37,9 @@ public void init() { } + public void destroy() { + } + public CommandProcessorResponse run(String command) { SessionState ss = SessionState.get(); String[] tokens = command.split("\\s+"); Index: ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java (revision 984642) +++ ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java (working copy) @@ -19,9 +19,12 @@ package org.apache.hadoop.hive.ql.processors; import static org.apache.commons.lang.StringUtils.isBlank; +import java.util.Map; +import java.util.HashMap; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.conf.HiveConf; /** * CommandProcessorFactory. @@ -33,6 +36,8 @@ // prevent instantiation } + static Map mapDrivers = new HashMap(); + public static CommandProcessor get(String cmd) { String cmdl = cmd.toLowerCase(); @@ -46,9 +51,25 @@ } else if ("delete".equals(cmdl)) { return new DeleteResourceProcessor(); } else if (!isBlank(cmd)) { - return new Driver(); + HiveConf conf = SessionState.get().getConf(); + Driver drv = mapDrivers.get(conf); + if (drv == null) { + drv = new Driver(); + mapDrivers.put(conf, drv); + } + drv.init(); + return drv; } return null; } + public static void clean() { + HiveConf conf = SessionState.get().getConf(); + Driver drv = mapDrivers.get(conf); + if (drv != null) { + drv.destroy(); + } + + mapDrivers.remove(conf); + } } Index: ql/src/java/org/apache/hadoop/hive/ql/processors/DeleteResourceProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/processors/DeleteResourceProcessor.java (revision 984642) +++ ql/src/java/org/apache/hadoop/hive/ql/processors/DeleteResourceProcessor.java (working copy) @@ -36,6 +36,9 @@ public void init() { } + public void destroy() { + } + public CommandProcessorResponse run(String command) { SessionState ss = SessionState.get(); String[] tokens = command.split("\\s+"); Index: ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java (revision 984642) +++ ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java (working copy) @@ -72,6 +72,9 @@ public void init() { } + public void destroy() { + } + public CommandProcessorResponse run(String command) { SessionState ss = SessionState.get(); Index: ql/src/java/org/apache/hadoop/hive/ql/processors/DfsProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/processors/DfsProcessor.java (revision 984642) +++ ql/src/java/org/apache/hadoop/hive/ql/processors/DfsProcessor.java (working copy) @@ -45,6 +45,9 @@ public void init() { } + public void destroy() { + } + public CommandProcessorResponse run(String command) { String[] tokens = command.split("\\s+"); Index: ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessor.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessor.java (revision 984642) +++ ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessor.java (working copy) @@ -20,6 +20,6 @@ public interface CommandProcessor { public void init(); - public CommandProcessorResponse run(String command); + public void destroy(); } Index: ql/src/java/org/apache/hadoop/hive/ql/Context.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Context.java (revision 984642) +++ ql/src/java/org/apache/hadoop/hive/ql/Context.java (working copy) @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.List; import org.antlr.runtime.TokenRewriteStream; import org.apache.commons.logging.Log; @@ -41,6 +42,8 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; import org.apache.hadoop.conf.Configuration; /** @@ -75,6 +78,10 @@ String executionId; + // List of Locks for this query + protected List hiveLocks; + protected HiveLockManager hiveLockMgr; + public Context(Configuration conf) throws IOException { this(conf, generateExecutionId()); } @@ -86,7 +93,7 @@ public Context(Configuration conf, String executionId) { this.conf = conf; this.executionId = executionId; - + // non-local tmp location is configurable. however it is the same across // all external file systems nonLocalScratchPath = @@ -106,7 +113,7 @@ public void setExplain(boolean value) { explain = value; } - + /** * Find whether the current query is an explain query * @return true if the query is an explain query, false if not @@ -119,7 +126,7 @@ /** * Get a tmp directory on specified URI * - * @param scheme Scheme of the target FS + * @param scheme Scheme of the target FS * @param authority Authority of the target FS * @param mkdir create the directory if true * @param scratchdir path of tmp directory @@ -166,7 +173,7 @@ /** * Create a map-reduce scratch directory on demand and return it. - * + * */ public String getMRScratchDir() { @@ -231,7 +238,7 @@ /** * Get a path to store map-reduce intermediate data in. - * + * * @return next available path for map-red intermediate data */ public String getMRTmpFileURI() { @@ -241,8 +248,8 @@ /** - * Given a URI for mapreduce intermediate output, swizzle the - * it to point to the local file system. This can be called in + * Given a URI for mapreduce intermediate output, swizzle the + * it to point to the local file system. This can be called in * case the caller decides to run in local mode (in which case * all intermediate data can be stored locally) * @@ -259,7 +266,7 @@ ("Invalid URI: " + originalURI + ", cannot relativize against" + mrbase.toString()); - return getLocalScratchDir(!explain) + Path.SEPARATOR + + return getLocalScratchDir(!explain) + Path.SEPARATOR + relURI.getPath(); } @@ -458,6 +465,22 @@ return HiveConf.getVar(conf, HiveConf.ConfVars.HADOOPJT).equals("local"); } + public List getHiveLocks() { + return hiveLocks; + } + + public void setHiveLocks(List hiveLocks) { + this.hiveLocks = hiveLocks; + } + + public HiveLockManager getHiveLockMgr() { + return hiveLockMgr; + } + + public void setHiveLockMgr(HiveLockManager hiveLockMgr) { + this.hiveLockMgr = hiveLockMgr; + } + public void setOriginalTracker(String originalTracker) { this.originalTracker = originalTracker; } @@ -474,7 +497,7 @@ pathToCS = new HashMap (); pathToCS.put(path, cs); } - + public ContentSummary getCS(String path) { if(pathToCS == null) pathToCS = new HashMap (); Index: ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (revision 984642) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (working copy) @@ -117,6 +117,9 @@ TOK_SHOWFUNCTIONS; TOK_SHOWPARTITIONS; TOK_SHOW_TABLESTATUS; +TOK_SHOWLOCKS; +TOK_LOCKTABLE; +TOK_UNLOCKTABLE; TOK_DROPTABLE; TOK_TABCOLLIST; TOK_TABCOL; @@ -237,6 +240,8 @@ | dropIndexStatement | alterIndexRebuild | dropFunctionStatement + | lockStatement + | unlockStatement ; ifNotExists @@ -577,8 +582,27 @@ | KW_SHOW KW_PARTITIONS Identifier partitionSpec? -> ^(TOK_SHOWPARTITIONS Identifier partitionSpec?) | KW_SHOW KW_TABLE KW_EXTENDED ((KW_FROM|KW_IN) db_name=Identifier)? KW_LIKE showStmtIdentifier partitionSpec? -> ^(TOK_SHOW_TABLESTATUS showStmtIdentifier $db_name? partitionSpec?) + | KW_SHOW KW_LOCKS -> ^(TOK_SHOWLOCKS) ; +lockStatement +@init { msgs.push("lock statement"); } +@after { msgs.pop(); } + : KW_LOCK KW_TABLE Identifier partitionSpec? lockMode -> ^(TOK_LOCKTABLE Identifier lockMode partitionSpec?) + ; + +lockMode +@init { msgs.push("lock mode"); } +@after { msgs.pop(); } + : KW_SHARED | KW_EXCLUSIVE + ; + +unlockStatement +@init { msgs.push("unlock statement"); } +@after { msgs.pop(); } + : KW_UNLOCK KW_TABLE Identifier partitionSpec? -> ^(TOK_UNLOCKTABLE Identifier partitionSpec?) + ; + metastoreCheck @init { msgs.push("metastore check statement"); } @after { msgs.pop(); } @@ -1012,7 +1036,7 @@ @init { msgs.push("select clause"); } @after { msgs.pop(); } : - KW_SELECT hintClause? (((KW_ALL | dist=KW_DISTINCT)? selectList) + KW_SELECT hintClause? (((KW_ALL | dist=KW_DISTINCT)? selectList) | (transform=KW_TRANSFORM selectTrfmClause)) -> {$transform == null && $dist == null}? ^(TOK_SELECT hintClause? selectList) -> {$transform == null && $dist != null}? ^(TOK_SELECTDI hintClause? selectList) @@ -1027,7 +1051,7 @@ : selectItem ( COMMA selectItem )* -> selectItem+ ; - + selectTrfmClause @init { msgs.push("transform clause"); } @after { msgs.pop(); } @@ -1770,7 +1794,10 @@ KW_SSL: 'SSL'; KW_UNDO: 'UNDO'; KW_LOCK: 'LOCK'; +KW_LOCKS: 'LOCKS'; KW_UNLOCK: 'UNLOCK'; +KW_SHARED: 'SHARED'; +KW_EXCLUSIVE: 'EXCLUSIVE'; KW_PROCEDURE: 'PROCEDURE'; KW_UNSIGNED: 'UNSIGNED'; KW_WHILE: 'WHILE'; Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (revision 984642) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (working copy) @@ -150,6 +150,10 @@ + "STRING instead."), CREATE_NON_NATIVE_AS("CREATE TABLE AS SELECT cannot be used for a non-native table"), LOAD_INTO_NON_NATIVE("A non-native table cannot be used as target for LOAD"), + LOCKMGR_NOT_SPECIFIED("lock manager not specified correctly, set hive.lock.manager"), + LOCKMGR_NOT_INITIALIZED("lock manager could not be initialized, check hive.lock.manager "), + LOCK_CANNOT_BE_ACQUIRED("locks on the underlying objects cannot be acquired. retry after some time"), + ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED("Check hive.zookeeper.quorum and hive.zookeeper.client.port"), OVERWRITE_ARCHIVED_PART("Cannot overwrite an archived partition. " + "Unarchive before running this command."), ARCHIVE_METHODS_DISABLED("Archiving methods are currently disabled. " + Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java (revision 984642) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java (working copy) @@ -56,6 +56,7 @@ commandType.put(HiveParser.TOK_SHOW_TABLESTATUS, "SHOW_TABLESTATUS"); commandType.put(HiveParser.TOK_SHOWFUNCTIONS, "SHOWFUNCTIONS"); commandType.put(HiveParser.TOK_SHOWPARTITIONS, "SHOWPARTITIONS"); + commandType.put(HiveParser.TOK_SHOWLOCKS, "SHOWLOCKS"); commandType.put(HiveParser.TOK_CREATEFUNCTION, "CREATEFUNCTION"); commandType.put(HiveParser.TOK_DROPFUNCTION, "DROPFUNCTION"); commandType.put(HiveParser.TOK_CREATEVIEW, "CREATEVIEW"); @@ -65,6 +66,8 @@ commandType.put(HiveParser.TOK_ALTERINDEX_REBUILD, "ALTERINDEX_REBUILD"); commandType.put(HiveParser.TOK_ALTERVIEW_PROPERTIES, "ALTERVIEW_PROPERTIES"); commandType.put(HiveParser.TOK_QUERY, "QUERY"); + commandType.put(HiveParser.TOK_LOCKTABLE, "LOCKTABLE"); + commandType.put(HiveParser.TOK_UNLOCKTABLE, "UNLOCKTABLE"); } static { @@ -109,12 +112,15 @@ case HiveParser.TOK_SHOW_TABLESTATUS: case HiveParser.TOK_SHOWFUNCTIONS: case HiveParser.TOK_SHOWPARTITIONS: + case HiveParser.TOK_SHOWLOCKS: case HiveParser.TOK_CREATEINDEX: case HiveParser.TOK_DROPINDEX: case HiveParser.TOK_ALTERTABLE_CLUSTER_SORT: case HiveParser.TOK_ALTERTABLE_TOUCH: case HiveParser.TOK_ALTERTABLE_ARCHIVE: case HiveParser.TOK_ALTERTABLE_UNARCHIVE: + case HiveParser.TOK_LOCKTABLE: + case HiveParser.TOK_UNLOCKTABLE: return new DDLSemanticAnalyzer(conf); case HiveParser.TOK_ALTERTABLE_PARTITION: String commandType = null; Index: ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (revision 984642) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (working copy) @@ -69,6 +69,9 @@ import org.apache.hadoop.hive.ql.plan.ShowPartitionsDesc; import org.apache.hadoop.hive.ql.plan.ShowTableStatusDesc; import org.apache.hadoop.hive.ql.plan.ShowTablesDesc; +import org.apache.hadoop.hive.ql.plan.ShowLocksDesc; +import org.apache.hadoop.hive.ql.plan.LockTableDesc; +import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes; import org.apache.hadoop.hive.serde.Constants; @@ -129,6 +132,8 @@ super(conf); // Partition can't have this name reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME)); + reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME)); + // Partition value can't end in this suffix reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_ORIGINAL)); reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_ARCHIVED)); @@ -168,6 +173,9 @@ } else if (ast.getToken().getType() == HiveParser.TOK_SHOWFUNCTIONS) { ctx.setResFile(new Path(ctx.getLocalTmpFileURI())); analyzeShowFunctions(ast); + } else if (ast.getToken().getType() == HiveParser.TOK_SHOWLOCKS) { + ctx.setResFile(new Path(ctx.getLocalTmpFileURI())); + analyzeShowLocks(ast); } else if (ast.getToken().getType() == HiveParser.TOK_DESCFUNCTION) { ctx.setResFile(new Path(ctx.getLocalTmpFileURI())); analyzeDescFunction(ast); @@ -209,6 +217,10 @@ } else if (ast.getToken().getType() == HiveParser.TOK_SHOWPARTITIONS) { ctx.setResFile(new Path(ctx.getLocalTmpFileURI())); analyzeShowPartitions(ast); + } else if (ast.getToken().getType() == HiveParser.TOK_LOCKTABLE) { + analyzeLockTable(ast); + } else if (ast.getToken().getType() == HiveParser.TOK_UNLOCKTABLE) { + analyzeUnlockTable(ast); } else { throw new SemanticException("Unsupported command."); } @@ -718,6 +730,75 @@ /** * Add the task according to the parsed command tree. This is used for the CLI + * command "SHOW LOCKS;". + * + * @param ast + * The parsed command tree. + * @throws SemanticException + * Parsing failed + */ + private void analyzeShowLocks(ASTNode ast) throws SemanticException { + ShowLocksDesc showLocksDesc = new ShowLocksDesc(ctx.getResFile()); + rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), + showLocksDesc), conf)); + setFetchTask(createFetchTask(showLocksDesc.getSchema())); + } + + /** + * Add the task according to the parsed command tree. This is used for the CLI + * command "LOCK TABLE ..;". + * + * @param ast + * The parsed command tree. + * @throws SemanticException + * Parsing failed + */ + private void analyzeLockTable(ASTNode ast) + throws SemanticException { + String tableName = unescapeIdentifier(ast.getChild(0).getText()); + String mode = unescapeIdentifier(ast.getChild(1).getText()); + List> partSpecs = getPartitionSpecs(ast); + + // We only can have a single partition spec + assert(partSpecs.size() <= 1); + Map partSpec = null; + if(partSpecs.size() > 0) { + partSpec = partSpecs.get(0); + } + + LockTableDesc lockTblDesc = new LockTableDesc(tableName, mode, partSpec); + rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), + lockTblDesc), conf)); + } + + /** + * Add the task according to the parsed command tree. This is used for the CLI + * command "UNLOCK TABLE ..;". + * + * @param ast + * The parsed command tree. + * @throws SemanticException + * Parsing failed + */ + private void analyzeUnlockTable(ASTNode ast) + throws SemanticException { + String tableName = unescapeIdentifier(ast.getChild(0).getText()); + List> partSpecs = getPartitionSpecs(ast); + + // We only can have a single partition spec + assert(partSpecs.size() <= 1); + Map partSpec = null; + if(partSpecs.size() > 0) { + partSpec = partSpecs.get(0); + } + + UnlockTableDesc unlockTblDesc = new UnlockTableDesc(tableName, partSpec); + rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), + unlockTblDesc), conf)); + } + + /** + * Add the task according to the parsed command tree. This is used for the CLI * command "DESCRIBE FUNCTION;". * * @param ast Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 984642) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -24,6 +24,8 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -53,6 +55,14 @@ import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.hooks.PostExecute; import org.apache.hadoop.hive.ql.hooks.PreExecute; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; +import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ErrorMsg; @@ -71,6 +81,10 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UnixUserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.DummyPartition; +import org.apache.hadoop.hive.ql.metadata.Table; public class Driver implements CommandProcessor { @@ -85,6 +99,7 @@ private Context ctx; private QueryPlan plan; private Schema schema; + private HiveLockManager hiveLockMgr; private String errorMessage; private String SQLState; @@ -93,6 +108,24 @@ private int maxthreads; private final int sleeptime = 2000; + private void setLockManager() throws SemanticException { + boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); + if (supportConcurrency) { + String lockMgr = conf.getVar(HiveConf.ConfVars.HIVE_LOCK_MANAGER); + if ((lockMgr == null) || (lockMgr.isEmpty())) { + throw new SemanticException(ErrorMsg.LOCKMGR_NOT_SPECIFIED.getMsg()); + } + + try { + hiveLockMgr = (HiveLockManager) + ReflectionUtils.newInstance(conf.getClassByName(lockMgr), conf); + hiveLockMgr.setContext(new HiveLockManagerCtx(conf)); + } catch (Exception e) { + throw new SemanticException(ErrorMsg.LOCKMGR_NOT_INITIALIZED.getMsg() + e.getMessage()); + } + } + } + public void init() { Operator.resetId(); } @@ -233,6 +266,7 @@ this.conf = conf; try { UnixUserGroupInformation.login(conf, true); + setLockManager(); } catch (Exception e) { LOG.warn("Ignoring " + e.getMessage()); } @@ -243,6 +277,7 @@ conf = SessionState.get().getConf(); try { UnixUserGroupInformation.login(conf, true); + setLockManager(); } catch (Exception e) { LOG.warn("Ignoring " + e.getMessage()); } @@ -344,6 +379,178 @@ return plan; } + public static class LockObject { + HiveLockObject obj; + HiveLockMode mode; + + public LockObject(HiveLockObject obj, HiveLockMode mode) { + this.obj = obj; + this.mode = mode; + } + + public HiveLockObject getObj() { + return obj; + } + + public HiveLockMode getMode() { + return mode; + } + + public String getName() { + return obj.getName(); + } + } + + private List getLockObjects(Table t, Partition p, HiveLockMode mode) { + List locks = new LinkedList(); + + if (t != null) { + locks.add(new LockObject(new HiveLockObject(t), mode)); + return locks; + } + + if (p != null) { + locks.add(new LockObject(new HiveLockObject(p), mode)); + String partName = p.getName(); + + String partialName = ""; + String[] partns = p.getName().split("/"); + for (String partn: partns) { + partialName += partialName + partn; + if (partName.equals(partialName)) { + locks.add(new LockObject(new HiveLockObject(new DummyPartition(p.getTable().getDbName() + "@" + p.getTable().getTableName() + "@" + partialName)), mode)); + } + else { + partialName += partialName + "/"; + } + } + + locks.add(new LockObject(new HiveLockObject(p.getTable()), mode)); + } + return locks; + } + + public int acquireReadWriteLocks() { + try { + int tryNum = 1; + int sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000; + int numRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES); + + boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); + if (!supportConcurrency) { + return 0; + } + + ctx.setHiveLockMgr(hiveLockMgr); + List lockObjects = new ArrayList(); + + // Sort all the inputs, outputs. + // If a lock needs to be acquired on any partition, a read lock needs to be acquired on all its parents also + for (ReadEntity input : plan.getInputs()) { + lockObjects.addAll(getLockObjects(input.getTable(), input.getPartition(), HiveLockMode.SHARED)); + } + + for (WriteEntity output : plan.getOutputs()) { + lockObjects.addAll(getLockObjects(output.getTable(), output.getPartition(), HiveLockMode.EXCLUSIVE)); + } + + Collections.sort(lockObjects, new Comparator() { + + @Override + public int compare(LockObject o1, LockObject o2) { + int cmp = o1.getName().compareTo(o2.getName()); + if (cmp == 0) { + if (o1.getMode() == o2.getMode()) { + return cmp; + } + // EXCLUSIVE locks occur before SHARED locks + if (o1.getMode() == HiveLockMode.EXCLUSIVE) { + return -1; + } + return +1; + } + return cmp; + } + + }); + + // walk the list and acquire the locks - if any lock cant be acquired, release all locks, sleep and retry + while (true) { + List hiveLocks = getLocks(lockObjects); + + if (hiveLocks == null) { + if (tryNum == numRetries) { + throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); + } + tryNum++; + + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + } + } + else { + ctx.setHiveLocks(hiveLocks); + break; + } + } + return (0); + } catch (SemanticException e) { + errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); + SQLState = ErrorMsg.findSQLState(e.getMessage()); + console.printError(errorMessage, "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + return (10); + } + } + + private List getLocks(List lockObjects) throws SemanticException { + // walk the list and acquire the locks - if any lock cant be acquired, release all locks, sleep and retry + LockObject prevLockObj = null; + List hiveLocks = new ArrayList(); + + for (LockObject lockObject: lockObjects) { + // No need to acquire a lock twice on the same object + if ((prevLockObj != null) && (prevLockObj.getName().equals(lockObject.getName()))) { + prevLockObj = lockObject; + continue; + } + + HiveLock lock = null; + try { + lock = ctx.getHiveLockMgr().lock(lockObject.getObj(), lockObject.getMode()); + } catch (LockException e) { + lock = null; + } + + if (lock == null) { + releaseLocks(hiveLocks); + return null; + } + + hiveLocks.add(lock); + prevLockObj = lockObject; + } + + return hiveLocks; + } + + public void releaseLocks() throws SemanticException { + releaseLocks(ctx.getHiveLocks()); + } + + public void releaseLocks(List hiveLocks) throws SemanticException { + if (hiveLocks != null) { + for (HiveLock hiveLock: hiveLocks) { + try { + ctx.getHiveLockMgr().unlock(hiveLock); + } catch (LockException e) { + // The lock may have been released. Ignore and continue + } + } + } + } + public CommandProcessorResponse run(String command) { errorMessage = null; SQLState = null; @@ -353,6 +560,11 @@ return new CommandProcessorResponse(ret, errorMessage, SQLState); } + ret = acquireReadWriteLocks(); + if (ret != 0) { + return new CommandProcessorResponse(ret, errorMessage, SQLState); + } + ret = execute(); if (ret != 0) { return new CommandProcessorResponse(ret, errorMessage, SQLState); @@ -709,6 +921,7 @@ public int close() { try { + releaseLocks(); if (ctx != null) { ctx.clear(); } @@ -721,6 +934,16 @@ return 0; } + public void destroy() { + if (ctx != null && ctx.getHiveLockMgr() != null) { + try { + ctx.getHiveLockMgr().close(); + } catch (LockException e) { + } + } + + } + public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan() throws IOException { return plan.getQueryPlan();