Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 988314) +++ conf/hive-default.xml (working copy) @@ -607,6 +607,48 @@ + hive.support.concurrency + false + Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks. + + + + hive.concurrency.manager + org.apache.hadoop.hive.ql.lockmgr.ZooKeeperLockMgr + The concurrency manager for hive. + + + + hive.lock.numretries + 100 + The number of times you want to try to get all the locks + + + + hive.lock.sleep.between.retries + 60 + The sleep time (in seconds) between various retries + + + + hive.zookeeper.quorum + + The list of zookeeper servers to talk to. This is only needed for read/write locks. + + + + hive.zookeeper.client.port + 2181 + The port of zookeeper servers to talk to. This is only needed for read/write locks. + + + + hive.zookeeper.session.timeout + + Zookeeper client's session timeout. The client is disconnected, and as a result, all locks released, if a heartbeat is not sent in the timeout. + + + fs.har.impl org.apache.hadoop.hive.shims.HiveHarFileSystem The implementation for accessing Hadoop Archives. Note that this won't be applicable to Hadoop vers less than 0.20 Index: hbase-handler/src/test/templates/TestHBaseCliDriver.vm =================================================================== --- hbase-handler/src/test/templates/TestHBaseCliDriver.vm (revision 988314) +++ hbase-handler/src/test/templates/TestHBaseCliDriver.vm (working copy) @@ -88,6 +88,7 @@ try { System.out.println("Begin query: " + "$fname"); qt.cliInit("$fname"); + qt.clearTestSideEffects(); int ecode = qt.executeClient("$fname"); if (ecode != 0) { fail("Client Execution failed with error code = " + ecode); @@ -112,6 +113,8 @@ if (ecode != 0) { fail("Client execution results failed with error code = " + ecode); } + qt.clearPostTestEffects(); + } catch (Throwable e) { System.out.println("Exception: " + e.getMessage()); e.printStackTrace(); Index: jdbc/src/test/org/apache/hadoop/hive/jdbc/TestJdbcDriver.java =================================================================== --- jdbc/src/test/org/apache/hadoop/hive/jdbc/TestJdbcDriver.java (revision 988314) +++ jdbc/src/test/org/apache/hadoop/hive/jdbc/TestJdbcDriver.java (working copy) @@ -79,6 +79,8 @@ Statement stmt = con.createStatement(); assertNotNull("Statement is null", stmt); + stmt.executeQuery("set hive.support.concurrency = false"); + // drop table. ignore error. try { stmt.executeQuery("drop table " + tableName); Index: data/conf/hive-site.xml =================================================================== --- data/conf/hive-site.xml (revision 988314) +++ data/conf/hive-site.xml (working copy) @@ -146,6 +146,12 @@ + hive.support.concurrency + true + Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks. + + + fs.pfile.impl org.apache.hadoop.fs.ProxyLocalFileSystem A proxy for local file system used for cross file system testing Index: hwi/src/test/org/apache/hadoop/hive/hwi/TestHWISessionManager.java =================================================================== --- hwi/src/test/org/apache/hadoop/hive/hwi/TestHWISessionManager.java (revision 988314) +++ hwi/src/test/org/apache/hadoop/hive/hwi/TestHWISessionManager.java (working copy) @@ -86,6 +86,10 @@ assertEquals(hsm.findAllSessionsForUser(user2).size(), 1); assertEquals(hsm.findAllSessionItems().size(), 3); + user1_item1.addQuery("set hive.support.concurrency = false"); + user1_item2.addQuery("set hive.support.concurrency = false"); + user2_item1.addQuery("set hive.support.concurrency = false"); + HWISessionItem searchItem = hsm.findSessionItemByName(user1, "session1"); assertEquals(searchItem, user1_item1); @@ -105,10 +109,12 @@ zero.add(0); zero.add(0); zero.add(0); + zero.add(0); ArrayList zero3 = new ArrayList(); zero3.add(0); zero3.add(0); zero3.add(0); + zero3.add(0); ArrayList zero1 = new ArrayList(); zero1.add(0); assertEquals(zero, searchItem.getQueryRet()); @@ -194,6 +200,7 @@ // cleanup HWISessionItem cleanup = hsm.createSession(user1, "cleanup"); + cleanup.addQuery("set hive.support.concurrency = false"); cleanup.addQuery("drop table " + tableName); cleanup.clientStart(); Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 988314) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -93,6 +93,7 @@ DYNAMICPARTITIONMAXPARTSPERNODE("hive.exec.max.dynamic.partitions.pernode", 100), MAXCREATEDFILES("hive.exec.max.created.files", 100000L), DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__"), + DEFAULT_ZOOKEEPER_PARTITION_NAME("hive.lockmgr.zookeeper.default.partition.name", "__HIVE_DEFAULT_ZOOKEEPER_PARTITION__"), // Whether to show a link to the most failed task + debugging tips SHOW_JOB_FAIL_DEBUG_INFO("hive.exec.show.job.failure.debug.info", true), @@ -265,14 +266,25 @@ HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true), + HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false), + HIVE_LOCK_MANAGER("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager"), + HIVE_LOCK_NUMRETRIES("hive.lock.numretries", 100), + HIVE_LOCK_SLEEP_BETWEEN_RETRIES("hive.lock.sleep.between.retries", 60), + + HIVE_ZOOKEEPER_QUORUM("hive.zookeeper.quorum", ""), + HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", ""), + HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 600*1000), + // For HBase storage handler HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true), // For har files HIVEARCHIVEENABLED("hive.archive.enabled", false), HIVEHARPARENTDIRSETTABLE("hive.archive.har.parentdir.settable", false), + ; + public final String varname; public final String defaultVal; public final int defaultIntVal; Index: service/src/test/org/apache/hadoop/hive/service/TestHiveServer.java =================================================================== --- service/src/test/org/apache/hadoop/hive/service/TestHiveServer.java (revision 988314) +++ service/src/test/org/apache/hadoop/hive/service/TestHiveServer.java (working copy) @@ -50,6 +50,7 @@ @Override protected void setUp() throws Exception { super.setUp(); + if (standAloneServer) { try { transport = new TSocket(host, port); @@ -74,6 +75,7 @@ public void testExecute() throws Exception { try { + client.execute("set hive.support.concurrency = false"); client.execute("drop table " + tableName); } catch (Exception ex) { } @@ -106,6 +108,7 @@ public void notestExecute() throws Exception { try { + client.execute("set hive.support.concurrency = false"); client.execute("drop table " + tableName); } catch (Exception ex) { } @@ -122,6 +125,7 @@ public void testNonHiveCommand() throws Exception { try { + client.execute("set hive.support.concurrency = false"); client.execute("drop table " + tableName); } catch (Exception ex) { } @@ -173,6 +177,7 @@ */ public void testMetastore() throws Exception { try { + client.execute("set hive.support.concurrency = false"); client.execute("drop table " + tableName); } catch (Exception ex) { } @@ -198,12 +203,13 @@ || clusterStatus.getState() == JobTrackerState.RUNNING); } - /** + /** * */ public void testFetch() throws Exception { // create and populate a table with 500 rows. try { + client.execute("set hive.support.concurrency = false"); client.execute("drop table " + tableName); } catch (Exception ex) { } @@ -239,6 +245,7 @@ public void testDynamicSerde() throws Exception { try { + client.execute("set hive.support.concurrency = false"); client.execute("drop table " + tableName); } catch (Exception ex) { } Index: service/src/java/org/apache/hadoop/hive/service/HiveServer.java =================================================================== --- service/src/java/org/apache/hadoop/hive/service/HiveServer.java (revision 988314) +++ service/src/java/org/apache/hadoop/hive/service/HiveServer.java (working copy) @@ -110,6 +110,7 @@ CommandProcessorResponse response = null; if (proc != null) { if (proc instanceof Driver) { + ((Driver)proc).destroy(); isHiveQuery = true; response = driver.run(cmd); } else { Index: cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java =================================================================== --- cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (revision 988314) +++ cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (working copy) @@ -59,9 +59,9 @@ public static final String prompt = "hive"; public static final String prompt2 = " "; // when ';' is not yet seen - + public static final String HIVERCFILE = ".hiverc"; - + private final LogHelper console; private final Configuration conf; @@ -130,7 +130,7 @@ } } else { - CommandProcessor proc = CommandProcessorFactory.get(tokens[0]); + CommandProcessor proc = CommandProcessorFactory.get(tokens[0], (HiveConf)conf); if (proc != null) { if (proc instanceof Driver) { Driver qp = (Driver) proc; @@ -201,9 +201,11 @@ lastRet = ret; boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS); if (ret != 0 && !ignoreErrors) { + CommandProcessorFactory.clean((HiveConf)conf); return ret; } } + CommandProcessorFactory.clean((HiveConf)conf); return lastRet; } @@ -261,7 +263,7 @@ } ss.setIsSilent(saveSilent); } - + public static void main(String[] args) throws Exception { OptionsProcessor oproc = new OptionsProcessor(); @@ -311,7 +313,7 @@ // Execute -i init files (always in silent mode) cli.processInitFiles(ss); - + if (ss.execString != null) { System.exit(cli.processLine(ss.execString)); } Index: ql/src/test/results/clientnegative/lockneg3.q.out =================================================================== --- ql/src/test/results/clientnegative/lockneg3.q.out (revision 0) +++ ql/src/test/results/clientnegative/lockneg3.q.out (revision 0) @@ -0,0 +1,25 @@ +PREHOOK: query: drop table tstsrcpart +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table tstsrcpart +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table tstsrcpart like srcpart +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table tstsrcpart like srcpart +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tstsrcpart +PREHOOK: query: insert overwrite table tstsrcpart partition (ds='2008-04-08', hr='11') +select key, value from srcpart where ds='2008-04-08' and hr='11' +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Output: default@tstsrcpart@ds=2008-04-08/hr=11 +POSTHOOK: query: insert overwrite table tstsrcpart partition (ds='2008-04-08', hr='11') +select key, value from srcpart where ds='2008-04-08' and hr='11' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +POSTHOOK: Output: default@tstsrcpart@ds=2008-04-08/hr=11 +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: UNLOCK TABLE tstsrcpart PARTITION(ds='2008-04-08', hr='11') +PREHOOK: type: UNLOCKTABLE +FAILED: Error in metadata: Table tstsrcpart is not locked +FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask Index: ql/src/test/results/clientnegative/lockneg2.q.out =================================================================== --- ql/src/test/results/clientnegative/lockneg2.q.out (revision 0) +++ ql/src/test/results/clientnegative/lockneg2.q.out (revision 0) @@ -0,0 +1,23 @@ +PREHOOK: query: drop table tstsrc +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table tstsrc +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table tstsrc like src +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table tstsrc like src +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tstsrc +PREHOOK: query: insert overwrite table tstsrc select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tstsrc +POSTHOOK: query: insert overwrite table tstsrc select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tstsrc +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: UNLOCK TABLE tstsrc +PREHOOK: type: UNLOCKTABLE +FAILED: Error in metadata: Table tstsrc is not locked +FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask Index: ql/src/test/results/clientnegative/lockneg1.q.out =================================================================== --- ql/src/test/results/clientnegative/lockneg1.q.out (revision 0) +++ ql/src/test/results/clientnegative/lockneg1.q.out (revision 0) @@ -0,0 +1,35 @@ +PREHOOK: query: drop table tstsrc +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table tstsrc +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table tstsrc like src +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table tstsrc like src +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tstsrc +PREHOOK: query: insert overwrite table tstsrc select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tstsrc +POSTHOOK: query: insert overwrite table tstsrc select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tstsrc +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: LOCK TABLE tstsrc SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE tstsrc SHARED +POSTHOOK: type: LOCKTABLE +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: LOCK TABLE tstsrc SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE tstsrc SHARED +POSTHOOK: type: LOCKTABLE +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: LOCK TABLE tstsrc EXCLUSIVE +PREHOOK: type: LOCKTABLE +conflicting lock present +FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask Index: ql/src/test/results/clientpositive/lock1.q.out =================================================================== --- ql/src/test/results/clientpositive/lock1.q.out (revision 0) +++ ql/src/test/results/clientpositive/lock1.q.out (revision 0) @@ -0,0 +1,99 @@ +PREHOOK: query: drop table tstsrc +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table tstsrc +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table tstsrc like src +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table tstsrc like src +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tstsrc +PREHOOK: query: insert overwrite table tstsrc select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tstsrc +POSTHOOK: query: insert overwrite table tstsrc select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tstsrc +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: LOCK TABLE tstsrc shared +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE tstsrc shared +POSTHOOK: type: LOCKTABLE +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +default@tstsrc SHARED +PREHOOK: query: UNLOCK TABLE tstsrc +PREHOOK: type: UNLOCKTABLE +POSTHOOK: query: UNLOCK TABLE tstsrc +POSTHOOK: type: UNLOCKTABLE +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: lock TABLE tstsrc SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: lock TABLE tstsrc SHARED +POSTHOOK: type: LOCKTABLE +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +default@tstsrc SHARED +PREHOOK: query: LOCK TABLE tstsrc SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE tstsrc SHARED +POSTHOOK: type: LOCKTABLE +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +default@tstsrc SHARED +default@tstsrc SHARED +PREHOOK: query: UNLOCK TABLE tstsrc +PREHOOK: type: UNLOCKTABLE +POSTHOOK: query: UNLOCK TABLE tstsrc +POSTHOOK: type: UNLOCKTABLE +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: drop table tstsrc +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tstsrc +PREHOOK: Output: default@tstsrc +POSTHOOK: query: drop table tstsrc +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tstsrc +POSTHOOK: Output: default@tstsrc +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] Index: ql/src/test/results/clientpositive/lock2.q.out =================================================================== --- ql/src/test/results/clientpositive/lock2.q.out (revision 0) +++ ql/src/test/results/clientpositive/lock2.q.out (revision 0) @@ -0,0 +1,156 @@ +PREHOOK: query: drop table tstsrc +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table tstsrc +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table tstsrc like src +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table tstsrc like src +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tstsrc +PREHOOK: query: insert overwrite table tstsrc select key, value from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@tstsrc +POSTHOOK: query: insert overwrite table tstsrc select key, value from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@tstsrc +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: drop table tstsrcpart +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table tstsrcpart +POSTHOOK: type: DROPTABLE +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: create table tstsrcpart like srcpart +PREHOOK: type: CREATETABLE +POSTHOOK: query: create table tstsrcpart like srcpart +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tstsrcpart +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table tstsrcpart partition (ds='2008-04-08', hr='11') +select key, value from srcpart where ds='2008-04-08' and hr='11' +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Output: default@tstsrcpart@ds=2008-04-08/hr=11 +POSTHOOK: query: insert overwrite table tstsrcpart partition (ds='2008-04-08', hr='11') +select key, value from srcpart where ds='2008-04-08' and hr='11' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +POSTHOOK: Output: default@tstsrcpart@ds=2008-04-08/hr=11 +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: LOCK TABLE tstsrc SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE tstsrc SHARED +POSTHOOK: type: LOCKTABLE +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: LOCK TABLE tstsrcpart SHARED +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE tstsrcpart SHARED +POSTHOOK: type: LOCKTABLE +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: LOCK TABLE tstsrcpart PARTITION(ds='2008-04-08', hr='11') EXCLUSIVE +PREHOOK: type: LOCKTABLE +POSTHOOK: query: LOCK TABLE tstsrcpart PARTITION(ds='2008-04-08', hr='11') EXCLUSIVE +POSTHOOK: type: LOCKTABLE +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +default@tstsrc SHARED +default@tstsrcpart SHARED +default@tstsrcpart@ds=2008-04-08/hr=11 EXCLUSIVE +PREHOOK: query: UNLOCK TABLE tstsrc +PREHOOK: type: UNLOCKTABLE +POSTHOOK: query: UNLOCK TABLE tstsrc +POSTHOOK: type: UNLOCKTABLE +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +default@tstsrcpart SHARED +default@tstsrcpart@ds=2008-04-08/hr=11 EXCLUSIVE +PREHOOK: query: UNLOCK TABLE tstsrcpart +PREHOOK: type: UNLOCKTABLE +POSTHOOK: query: UNLOCK TABLE tstsrcpart +POSTHOOK: type: UNLOCKTABLE +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +default@tstsrcpart@ds=2008-04-08/hr=11 EXCLUSIVE +PREHOOK: query: UNLOCK TABLE tstsrcpart PARTITION(ds='2008-04-08', hr='11') +PREHOOK: type: UNLOCKTABLE +POSTHOOK: query: UNLOCK TABLE tstsrcpart PARTITION(ds='2008-04-08', hr='11') +POSTHOOK: type: UNLOCKTABLE +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: SHOW LOCKS +PREHOOK: type: SHOWLOCKS +POSTHOOK: query: SHOW LOCKS +POSTHOOK: type: SHOWLOCKS +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: drop table tstsrc +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tstsrc +PREHOOK: Output: default@tstsrc +POSTHOOK: query: drop table tstsrc +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tstsrc +POSTHOOK: Output: default@tstsrc +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: drop table tstsrcpart +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tstsrcpart +PREHOOK: Output: default@tstsrcpart +POSTHOOK: query: drop table tstsrcpart +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tstsrcpart +POSTHOOK: Output: default@tstsrcpart +POSTHOOK: Lineage: tstsrc.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrc.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).key SIMPLE [(srcpart)srcpart.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: tstsrcpart PARTITION(ds=2008-04-08,hr=11).value SIMPLE [(srcpart)srcpart.FieldSchema(name:value, type:string, comment:default), ] Index: ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java (revision 988314) +++ ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java (working copy) @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.tools.LineageInfo; import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.hive.ql.QTestUtil.QTestSetup; /** * TestHiveHistory. @@ -53,6 +54,7 @@ private static Path tmppath = new Path(tmpdir); private static Hive db; private static FileSystem fs; + private QTestSetup setup; /* * intialize the tables @@ -75,6 +77,9 @@ } } + setup = new QTestSetup(); + setup.preTest(conf); + // copy the test files into hadoop if required. int i = 0; Path[] hadoopDataFile = new Path[2]; @@ -109,6 +114,19 @@ } } + @Override + protected void tearDown() { + try { + setup.tearDown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + /** * Check history file output for this query. */ @@ -133,7 +151,7 @@ SessionState.start(ss); String cmd = "select a.key from src a"; - Driver d = new Driver(); + Driver d = new Driver(conf); int ret = d.run(cmd).getResponseCode(); if (ret != 0) { fail("Failed"); Index: ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (revision 988314) +++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (working copy) @@ -38,6 +38,7 @@ import java.util.TreeMap; import java.util.regex.Matcher; import java.util.regex.Pattern; +import junit.framework.Test; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -69,6 +70,9 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.hadoop.hbase.MiniZooKeeperCluster; +import org.apache.zookeeper.ZooKeeper; +import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager; /** * QTestUtil. @@ -103,6 +107,7 @@ private HadoopShims.MiniDFSShim dfs = null; private boolean miniMr = false; private String hadoopVer = null; + private QTestSetup setup = null; public boolean deleteDirectory(File path) { if (path.exists()) { @@ -198,10 +203,10 @@ .concat("/build/ql/test/data/warehouse/")); conf.set("mapred.job.tracker", "localhost:" + mr.getJobTrackerPort()); } - } - public QTestUtil(String outDir, String logDir, boolean miniMr, String hadoopVer) throws Exception { + public QTestUtil(String outDir, String logDir, boolean miniMr, String hadoopVer) + throws Exception { this.outDir = outDir; this.logDir = logDir; conf = new HiveConf(Driver.class); @@ -227,6 +232,8 @@ overWrite = true; } + setup = new QTestSetup(); + setup.preTest(conf); init(); } @@ -301,6 +308,13 @@ /** * Clear out any side effects of running tests */ + public void clearPostTestEffects () throws Exception { + setup.postTest(conf); + } + + /** + * Clear out any side effects of running tests + */ public void clearTestSideEffects () throws Exception { // delete any tables other than the source tables for (String s: db.getAllTables()) { @@ -312,9 +326,9 @@ // modify conf by using 'set' commands conf = new HiveConf (Driver.class); initConf(); + setup.preTest(conf); } - public void cleanUp() throws Exception { String warehousePath = ((new URI(testWarehouse)).getPath()); // Drop any tables that remain due to unsuccessful runs @@ -329,6 +343,7 @@ } FunctionRegistry.unregisterTemporaryUDF("test_udaf"); FunctionRegistry.unregisterTemporaryUDF("test_error"); + setup.tearDown(); } private void runLoadCmd(String loadCmd) throws Exception { @@ -916,6 +931,59 @@ } /** + * QTestSetup defines test fixtures which are reused across testcases, + * and are needed before any test can be run + */ + public static class QTestSetup + { + private MiniZooKeeperCluster zooKeeperCluster = null; + private int zkPort; + private ZooKeeper zooKeeper; + + public QTestSetup() { + } + + public void preTest(HiveConf conf) throws Exception { + + if (zooKeeperCluster == null) { + String tmpdir = System.getProperty("user.dir")+"/../build/ql/tmp"; + zooKeeperCluster = new MiniZooKeeperCluster(); + zkPort = zooKeeperCluster.startup(new File(tmpdir, "zookeeper")); + } + + if (zooKeeper != null) { + zooKeeper.close(); + } + + int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + zooKeeper = new ZooKeeper("localhost:" + zkPort, sessionTimeout, null); + + String zkServer = "localhost"; + conf.set("hive.zookeeper.quorum", zkServer); + conf.set("hive.zookeeper.client.port", "" + zkPort); + } + + public void postTest(HiveConf conf) throws Exception { + if (zooKeeperCluster == null) { + return; + } + + if (zooKeeper != null) { + zooKeeper.close(); + } + + ZooKeeperHiveLockManager.releaseAllLocks(conf); + } + + public void tearDown() throws Exception { + if (zooKeeperCluster != null) { + zooKeeperCluster.shutdown(); + zooKeeperCluster = null; + } + } + } + + /** * QTRunner: Runnable class for running a a single query file. * **/ @@ -962,17 +1030,18 @@ * (in terms of destination tables) */ public static boolean queryListRunner(File[] qfiles, String[] resDirs, - String[] logDirs, boolean mt) { + String[] logDirs, boolean mt, Test test) { assert (qfiles.length == resDirs.length); assert (qfiles.length == logDirs.length); boolean failed = false; - try { QTestUtil[] qt = new QTestUtil[qfiles.length]; + QTestSetup[] qsetup = new QTestSetup[qfiles.length]; for (int i = 0; i < qfiles.length; i++) { - qt[i] = new QTestUtil(resDirs[i], logDirs[i]); + qt[i] = new QTestUtil(resDirs[i], logDirs[i], false, "0.20"); qt[i].addFile(qfiles[i]); + qt[i].clearTestSideEffects(); } if (mt) { @@ -980,6 +1049,7 @@ qt[0].cleanUp(); qt[0].createSources(); + qt[0].clearTestSideEffects(); QTRunner[] qtRunners = new QTestUtil.QTRunner[qfiles.length]; Thread[] qtThread = new Thread[qfiles.length]; Index: ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java (revision 988314) +++ ql/src/test/org/apache/hadoop/hive/ql/TestMTQueries.java (working copy) @@ -46,7 +46,7 @@ qfiles[i] = new File(inpDir, testNames[i]); } - boolean success = QTestUtil.queryListRunner(qfiles, resDirs, logDirs, true); + boolean success = QTestUtil.queryListRunner(qfiles, resDirs, logDirs, true, this); if (!success) { fail("One or more queries failed"); } Index: ql/src/test/queries/clientnegative/lockneg2.q =================================================================== --- ql/src/test/queries/clientnegative/lockneg2.q (revision 0) +++ ql/src/test/queries/clientnegative/lockneg2.q (revision 0) @@ -0,0 +1,5 @@ +drop table tstsrc; +create table tstsrc like src; +insert overwrite table tstsrc select key, value from src; + +UNLOCK TABLE tstsrc; Index: ql/src/test/queries/clientnegative/lockneg3.q =================================================================== --- ql/src/test/queries/clientnegative/lockneg3.q (revision 0) +++ ql/src/test/queries/clientnegative/lockneg3.q (revision 0) @@ -0,0 +1,7 @@ +drop table tstsrcpart; +create table tstsrcpart like srcpart; + +insert overwrite table tstsrcpart partition (ds='2008-04-08', hr='11') +select key, value from srcpart where ds='2008-04-08' and hr='11'; + +UNLOCK TABLE tstsrcpart PARTITION(ds='2008-04-08', hr='11'); Index: ql/src/test/queries/clientnegative/lockneg1.q =================================================================== --- ql/src/test/queries/clientnegative/lockneg1.q (revision 0) +++ ql/src/test/queries/clientnegative/lockneg1.q (revision 0) @@ -0,0 +1,7 @@ +drop table tstsrc; +create table tstsrc like src; +insert overwrite table tstsrc select key, value from src; + +LOCK TABLE tstsrc SHARED; +LOCK TABLE tstsrc SHARED; +LOCK TABLE tstsrc EXCLUSIVE; Index: ql/src/test/queries/clientpositive/lock1.q =================================================================== --- ql/src/test/queries/clientpositive/lock1.q (revision 0) +++ ql/src/test/queries/clientpositive/lock1.q (revision 0) @@ -0,0 +1,18 @@ +drop table tstsrc; +create table tstsrc like src; +insert overwrite table tstsrc select key, value from src; + +SHOW LOCKS; + +LOCK TABLE tstsrc shared; +SHOW LOCKS; +UNLOCK TABLE tstsrc; +SHOW LOCKS; +lock TABLE tstsrc SHARED; +SHOW LOCKS; +LOCK TABLE tstsrc SHARED; +SHOW LOCKS; +UNLOCK TABLE tstsrc; +SHOW LOCKS; + +drop table tstsrc; Index: ql/src/test/queries/clientpositive/lock2.q =================================================================== --- ql/src/test/queries/clientpositive/lock2.q (revision 0) +++ ql/src/test/queries/clientpositive/lock2.q (revision 0) @@ -0,0 +1,24 @@ +drop table tstsrc; +create table tstsrc like src; +insert overwrite table tstsrc select key, value from src; + +drop table tstsrcpart; +create table tstsrcpart like srcpart; + +insert overwrite table tstsrcpart partition (ds='2008-04-08', hr='11') +select key, value from srcpart where ds='2008-04-08' and hr='11'; + +LOCK TABLE tstsrc SHARED; +LOCK TABLE tstsrcpart SHARED; +LOCK TABLE tstsrcpart PARTITION(ds='2008-04-08', hr='11') EXCLUSIVE; +SHOW LOCKS; +UNLOCK TABLE tstsrc; +SHOW LOCKS; +UNLOCK TABLE tstsrcpart; +SHOW LOCKS; +UNLOCK TABLE tstsrcpart PARTITION(ds='2008-04-08', hr='11'); +SHOW LOCKS; + + +drop table tstsrc; +drop table tstsrcpart; Index: ql/src/test/templates/TestCliDriver.vm =================================================================== --- ql/src/test/templates/TestCliDriver.vm (revision 988314) +++ ql/src/test/templates/TestCliDriver.vm (working copy) @@ -71,6 +71,7 @@ @Override protected void tearDown() { try { + qt.clearPostTestEffects(); if (getName().equals("testCliDriver_shutdown")) qt.shutdown(); } @@ -110,7 +111,7 @@ if (qt.shouldBeSkipped("$fname")) { return; } - + qt.cliInit("$fname", false); int ecode = qt.executeClient("$fname"); if (ecode != 0) { Index: ql/src/test/templates/TestNegativeCliDriver.vm =================================================================== --- ql/src/test/templates/TestNegativeCliDriver.vm (revision 988314) +++ ql/src/test/templates/TestNegativeCliDriver.vm (working copy) @@ -16,13 +16,13 @@ public class $className extends TestCase { private static QTestUtil qt; + static { try { - qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()"); + qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", false, "0.20"); // do a one time initialization qt.cleanUp(); qt.createSources(); - } catch (Exception e) { System.out.println("Exception: " + e.getMessage()); e.printStackTrace(); @@ -47,6 +47,21 @@ } } + @Override + protected void tearDown() { + try { + qt.clearPostTestEffects(); + if (getName().equals("testNegativeCliDriver_shutdown")) + qt.shutdown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + public static Test suite() { TestSuite suite = new TestSuite(); #foreach ($qf in $qfiles) @@ -55,9 +70,17 @@ #set ($tname = $fname.substring(0, $eidx)) suite.addTest(new $className("testNegativeCliDriver_$tname")); #end + suite.addTest(new $className("testNegativeCliDriver_shutdown")); return suite; } + /** + * Dummy last test. This is only meant to shutdown qt + */ + public void testNegativeCliDriver_shutdown() { + System.out.println ("Cleaning up " + "$className"); + } + static String debugHint = "\nSee build/ql/tmp/hive.log, " + "or try \"ant test ... -Dtest.silent=false\" to get more logs."; @@ -75,7 +98,7 @@ System.out.println("Test $fname skipped"); return; } - + qt.cliInit("$fname", false); int ecode = qt.executeClient("$fname"); if (ecode == 0) { Index: ql/src/test/templates/TestParse.vm =================================================================== --- ql/src/test/templates/TestParse.vm (revision 988314) +++ ql/src/test/templates/TestParse.vm (working copy) @@ -48,17 +48,41 @@ } } + @Override + protected void tearDown() { + try { + qt.clearPostTestEffects(); + if (getName().equals("testParse_shutdown")) + qt.shutdown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + public static Test suite() { TestSuite suite = new TestSuite(); + #foreach ($qf in $qfiles) #set ($fname = $qf.getName()) #set ($eidx = $fname.length() - 2) #set ($tname = $fname.substring(0, $eidx)) suite.addTest(new $className("testParse_$tname")); #end + suite.addTest(new $className("testParse_shutdown")); return suite; } + /** + * Dummy last test. This is only meant to shutdown qt + */ + public void testParse_shutdown() { + System.out.println ("Cleaning up " + "$className"); + } + static String debugHint = "\nSee build/ql/tmp/hive.log, " + "or try \"ant test ... -Dtest.silent=false\" to get more logs."; Index: ql/src/test/templates/TestParseNegative.vm =================================================================== --- ql/src/test/templates/TestParseNegative.vm (revision 988314) +++ ql/src/test/templates/TestParseNegative.vm (working copy) @@ -28,7 +28,7 @@ fail("Unexpected exception in static initialization"); } } - + public $className(String name) { super(name); qt = null; @@ -37,7 +37,7 @@ @Override protected void setUp() { try { - qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", + qt = new QTestUtil("$resultsDir.getCanonicalPath()", "$logDir.getCanonicalPath()", miniMR, hadoopVer); } catch (Exception e) { @@ -48,14 +48,38 @@ } } + @Override + protected void tearDown() { + try { + qt.clearPostTestEffects(); + if (getName().equals("testParseNegative_shutdown")) + qt.shutdown(); + } + catch (Exception e) { + System.out.println("Exception: " + e.getMessage()); + e.printStackTrace(); + System.out.flush(); + fail("Unexpected exception in tearDown"); + } + } + + /** + * Dummy last test. This is only meant to shutdown qt + */ + public void testParseNegative_shutdown() { + System.out.println ("Cleaning up " + "$className"); + } + public static Test suite() { TestSuite suite = new TestSuite(); + #foreach ($qf in $qfiles) #set ($fname = $qf.getName()) #set ($eidx = $fname.length() - 2) #set ($tname = $fname.substring(0, $eidx)) suite.addTest(new $className("testParseNegative_$tname")); #end + suite.addTest(new $className("testParseNegative_shutdown")); return suite; } Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (revision 988314) +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (working copy) @@ -520,4 +520,12 @@ ProtectMode mode = getProtectMode(); return (!mode.offline && !mode.readOnly); } + + /** + * @return include the db name + */ + public String getCompleteName() { + return getTable().getCompleteName() + "@" + getName(); + } + } Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/DummyPartition.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/metadata/DummyPartition.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/DummyPartition.java (revision 0) @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.metadata; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A Hive Table Partition: is a fundamental storage unit within a Table. Currently, Hive does not support + * hierarchical partitions - For eg: if partition ds=1, hr=1 exists, there is no way to access ds=1 + * + * Hierarchical partitions are needed in some cases, for eg. locking. For now, create a dummy partition to + * satisfy this + */ +public class DummyPartition extends Partition { + + @SuppressWarnings("nls") + static final private Log LOG = LogFactory + .getLog("hive.ql.metadata.DummyPartition"); + + private String name; + public DummyPartition() { + } + + public DummyPartition(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getCompleteName() { + return getName(); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (revision 988314) +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (working copy) @@ -794,4 +794,11 @@ ProtectMode mode = getProtectMode(); return (!mode.offline && !mode.readOnly); } + + /** + * @return include the db name + */ + public String getCompleteName() { + return getDbName() + "@" + getTableName(); + } }; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (revision 988314) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (working copy) @@ -35,6 +35,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Collections; +import java.util.Comparator; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -59,6 +61,7 @@ import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; @@ -84,11 +87,18 @@ import org.apache.hadoop.hive.ql.plan.DropTableDesc; import org.apache.hadoop.hive.ql.plan.MsckDesc; import org.apache.hadoop.hive.ql.plan.ShowFunctionsDesc; +import org.apache.hadoop.hive.ql.plan.ShowLocksDesc; +import org.apache.hadoop.hive.ql.plan.LockTableDesc; +import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; import org.apache.hadoop.hive.ql.plan.ShowPartitionsDesc; import org.apache.hadoop.hive.ql.plan.ShowTableStatusDesc; import org.apache.hadoop.hive.ql.plan.ShowTablesDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes; import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; @@ -225,6 +235,21 @@ return showFunctions(showFuncs); } + ShowLocksDesc showLocks = work.getShowLocksDesc(); + if (showLocks != null) { + return showLocks(showLocks); + } + + LockTableDesc lockTbl = work.getLockTblDesc(); + if (lockTbl != null) { + return lockTable(lockTbl); + } + + UnlockTableDesc unlockTbl = work.getUnlockTblDesc(); + if (unlockTbl != null) { + return unlockTable(unlockTbl); + } + ShowPartitionsDesc showParts = work.getShowPartsDesc(); if (showParts != null) { return showPartitions(db, showParts); @@ -1130,6 +1155,163 @@ } /** + * Write a list of the current locks to a file. + * + * @param showLocks + * the locks we're interested in. + * @return Returns 0 when execution succeeds and above 0 if it fails. + * @throws HiveException + * Throws this exception if an unexpected error occurs. + */ + private int showLocks(ShowLocksDesc showLocks) throws HiveException { + Context ctx = driverContext.getCtx(); + HiveLockManager lockMgr = ctx.getHiveLockMgr(); + if (lockMgr == null) { + throw new HiveException("show Locks LockManager not specified"); + } + + // write the results in the file + try { + Path resFile = new Path(showLocks.getResFile()); + FileSystem fs = resFile.getFileSystem(conf); + DataOutput outStream = fs.create(resFile); + List locks = lockMgr.getLocks(); + + Collections.sort(locks, new Comparator() { + + @Override + public int compare(HiveLock o1, HiveLock o2) { + int cmp = o1.getHiveLockObject().getName().compareTo(o2.getHiveLockObject().getName()); + if (cmp == 0) { + if (o1.getHiveLockMode() == o2.getHiveLockMode()) { + return cmp; + } + // EXCLUSIVE locks occur before SHARED locks + if (o1.getHiveLockMode() == HiveLockMode.EXCLUSIVE) { + return -1; + } + return +1; + } + return cmp; + } + + }); + + Iterator locksIter = locks.iterator(); + + while (locksIter.hasNext()) { + HiveLock lock = locksIter.next(); + outStream.writeBytes(lock.getHiveLockObject().getName()); + outStream.write(separator); + outStream.writeBytes(lock.getHiveLockMode().toString()); + outStream.write(terminator); + } + ((FSDataOutputStream) outStream).close(); + } catch (FileNotFoundException e) { + LOG.warn("show function: " + stringifyException(e)); + return 1; + } catch (IOException e) { + LOG.warn("show function: " + stringifyException(e)); + return 1; + } catch (Exception e) { + throw new HiveException(e.toString()); + } + return 0; + } + + /** + * Lock the table/partition specified + * + * @param lockTbl + * the table/partition to be locked along with the mode + * @return Returns 0 when execution succeeds and above 0 if it fails. + * @throws HiveException + * Throws this exception if an unexpected error occurs. + */ + private int lockTable(LockTableDesc lockTbl) throws HiveException { + Context ctx = driverContext.getCtx(); + HiveLockManager lockMgr = ctx.getHiveLockMgr(); + if (lockMgr == null) { + throw new HiveException("lock Table LockManager not specified"); + } + + HiveLockMode mode = HiveLockMode.valueOf(lockTbl.getMode()); + String tabName = lockTbl.getTableName(); + Table tbl = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tabName); + if (tbl == null) { + throw new HiveException("Table " + tabName + " does not exist "); + } + + Map partSpec = lockTbl.getPartSpec(); + if (partSpec == null) { + HiveLock lck = lockMgr.lock(new HiveLockObject(tbl), mode, true); + if (lck == null) { + return 1; + } + return 0; + } + + Partition par = db.getPartition(tbl, partSpec, false); + if (par == null) { + throw new HiveException("Partition " + partSpec + " for table " + tabName + " does not exist"); + } + HiveLock lck = lockMgr.lock(new HiveLockObject(par), mode, true); + if (lck == null) { + return 1; + } + return 0; + } + + /** + * Unlock the table/partition specified + * + * @param unlockTbl + * the table/partition to be unlocked + * @return Returns 0 when execution succeeds and above 0 if it fails. + * @throws HiveException + * Throws this exception if an unexpected error occurs. + */ + private int unlockTable(UnlockTableDesc unlockTbl) throws HiveException { + Context ctx = driverContext.getCtx(); + HiveLockManager lockMgr = ctx.getHiveLockMgr(); + if (lockMgr == null) { + throw new HiveException("unlock Table LockManager not specified"); + } + + String tabName = unlockTbl.getTableName(); + Table tbl = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tabName); + if (tbl == null) { + throw new HiveException("Table " + tabName + " does not exist "); + } + + Map partSpec = unlockTbl.getPartSpec(); + HiveLockObject obj = null; + + if (partSpec == null) { + obj = new HiveLockObject(tbl); + } + else { + Partition par = db.getPartition(tbl, partSpec, false); + if (par == null) { + throw new HiveException("Partition " + partSpec + " for table " + tabName + " does not exist"); + } + obj = new HiveLockObject(par); + } + + List locks = lockMgr.getLocks(obj); + if ((locks == null) || (locks.isEmpty())) { + throw new HiveException("Table " + tabName + " is not locked "); + } + Iterator locksIter = locks.iterator(); + while (locksIter.hasNext()) { + HiveLock lock = locksIter.next(); + lockMgr.unlock(lock); + } + + return 0; + } + + /** * Shows a description of a function. * * @param descFunc Index: ql/src/java/org/apache/hadoop/hive/ql/plan/UnlockTableDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/UnlockTableDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/UnlockTableDesc.java (revision 0) @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.Map; + +import org.apache.hadoop.fs.Path; + +/** + * UnlockTableDesc. + * + */ +@Explain(displayName = "Unlock Table") +public class UnlockTableDesc extends DDLDesc implements Serializable { + private static final long serialVersionUID = 1L; + + private String tableName; + private Map partSpec; + + public UnlockTableDesc() { + } + + public UnlockTableDesc(String tableName, Map partSpec) { + this.tableName = tableName; + this.partSpec = partSpec; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public Map getPartSpec() { + return partSpec; + } + + public void setPartSpec(Map partSpec) { + this.partSpec = partSpec; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java (revision 988314) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java (working copy) @@ -38,7 +38,10 @@ private DropTableDesc dropTblDesc; private AlterTableDesc alterTblDesc; private ShowTablesDesc showTblsDesc; + private LockTableDesc lockTblDesc; + private UnlockTableDesc unlockTblDesc; private ShowFunctionsDesc showFuncsDesc; + private ShowLocksDesc showLocksDesc; private DescFunctionDesc descFunctionDesc; private ShowPartitionsDesc showPartsDesc; private DescTableDesc descTblDesc; @@ -143,6 +146,26 @@ } /** + * @param lockTblDesc + */ + public DDLWork(HashSet inputs, HashSet outputs, + LockTableDesc lockTblDesc) { + this(inputs, outputs); + + this.lockTblDesc = lockTblDesc; + } + + /** + * @param unlockTblDesc + */ + public DDLWork(HashSet inputs, HashSet outputs, + UnlockTableDesc unlockTblDesc) { + this(inputs, outputs); + + this.unlockTblDesc = unlockTblDesc; + } + + /** * @param showFuncsDesc */ public DDLWork(HashSet inputs, HashSet outputs, @@ -153,6 +176,16 @@ } /** + * @param showLocksDesc + */ + public DDLWork(HashSet inputs, HashSet outputs, + ShowLocksDesc showLocksDesc) { + this(inputs, outputs); + + this.showLocksDesc = showLocksDesc; + } + + /** * @param descFuncDesc */ public DDLWork(HashSet inputs, HashSet outputs, @@ -331,6 +364,30 @@ } /** + * @return the showLocksDesc + */ + @Explain(displayName = "Show Lock Operator") + public ShowLocksDesc getShowLocksDesc() { + return showLocksDesc; + } + + /** + * @return the lockTblDesc + */ + @Explain(displayName = "Lock Table Operator") + public LockTableDesc getLockTblDesc() { + return lockTblDesc; + } + + /** + * @return the unlockTblDesc + */ + @Explain(displayName = "Unlock Table Operator") + public UnlockTableDesc getUnlockTblDesc() { + return unlockTblDesc; + } + + /** * @return the descFuncDesc */ @Explain(displayName = "Show Function Operator") @@ -347,6 +404,30 @@ } /** + * @param showLocksDesc + * the showLocksDesc to set + */ + public void setShowLocksDesc(ShowLocksDesc showLocksDesc) { + this.showLocksDesc = showLocksDesc; + } + + /** + * @param lockTblDesc + * the lockTblDesc to set + */ + public void setLockTblDesc(LockTableDesc lockTblDesc) { + this.lockTblDesc = lockTblDesc; + } + + /** + * @param unlockTblDesc + * the unlockTblDesc to set + */ + public void setUnlockTblDesc(UnlockTableDesc unlockTblDesc) { + this.unlockTblDesc = unlockTblDesc; + } + + /** * @param descFuncDesc * the showFuncsDesc to set */ Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ShowLocksDesc.java (revision 0) @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; + +import org.apache.hadoop.fs.Path; + +/** + * ShowLocksDesc. + * + */ +@Explain(displayName = "Show Locks") +public class ShowLocksDesc extends DDLDesc implements Serializable { + private static final long serialVersionUID = 1L; + String resFile; + + /** + * table name for the result of show locks. + */ + private static final String table = "showlocks"; + /** + * thrift ddl for the result of show locks. + */ + private static final String schema = "tab_name,mode#string:string"; + + public String getTable() { + return table; + } + + public String getSchema() { + return schema; + } + + public ShowLocksDesc() { + } + + /** + * @param resFile + */ + public ShowLocksDesc(Path resFile) { + this.resFile = resFile.toString(); + } + + /** + * @return the resFile + */ + @Explain(displayName = "result file", normalExplain = false) + public String getResFile() { + return resFile; + } + + /** + * @param resFile + * the resFile to set + */ + public void setResFile(String resFile) { + this.resFile = resFile; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/plan/LockTableDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/LockTableDesc.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/LockTableDesc.java (revision 0) @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; +import java.util.Map; + +import org.apache.hadoop.fs.Path; + +/** + * LockTableDesc. + * + */ +@Explain(displayName = "Lock Table") +public class LockTableDesc extends DDLDesc implements Serializable { + private static final long serialVersionUID = 1L; + + private String tableName; + private String mode; + private Map partSpec; + + public LockTableDesc() { + } + + public LockTableDesc(String tableName, String mode, Map partSpec) { + this.tableName = tableName; + this.mode = mode; + this.partSpec = partSpec; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public void setMode(String mode) { + this.mode = mode; + } + + public String getMode() { + return mode; + } + + public Map getPartSpec() { + return partSpec; + } + + public void setPartSpec(Map partSpec) { + this.partSpec = partSpec; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java (revision 0) @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +import java.util.List; + +public interface HiveLockManager { + + public void setContext(HiveLockManagerCtx ctx) throws LockException; + + /** + * @param key object to be locked + * @param mode mode of the lock (SHARED/EXCLUSIVE) + * @param keepAlive if the lock needs to be persisted after the statement + */ + public HiveLock lock(HiveLockObject key, HiveLockMode mode, boolean keepAlive) + throws LockException; + public void unlock(HiveLock hiveLock) throws LockException; + + public List getLocks() throws LockException; + public List getLocks(HiveLockObject key) throws LockException; + public void close() throws LockException; +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (revision 0) @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr.zookeeper; + +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs.Ids; +import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.regex.Pattern; +import java.util.regex.Matcher; +import org.apache.commons.lang.StringEscapeUtils; + +import org.apache.hadoop.hive.ql.parse.ErrorMsg; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.DummyPartition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; + +public class ZooKeeperHiveLockManager implements HiveLockManager { + HiveLockManagerCtx ctx; + public static final Log LOG = LogFactory.getLog("ZooKeeperHiveLockManager"); + static final private LogHelper console = new LogHelper(LOG); + + private ZooKeeper zooKeeper; + + public ZooKeeperHiveLockManager() { + } + + /** + * @param conf The hive configuration + * Get the quorum server address from the configuration. The format is: + * host1:port, host2:port.. + **/ + private static String getQuorumServers(HiveConf conf) { + String hosts = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM); + String port = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT); + return hosts + ":" + port; + } + + /** + * @param ctx The lock manager context (containing the Hive configuration file) + * Start the ZooKeeper client based on the zookeeper cluster specified in the conf. + **/ + public void setContext(HiveLockManagerCtx ctx) throws LockException { + this.ctx = ctx; + HiveConf conf = ctx.getConf(); + int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + String quorumServers = ZooKeeperHiveLockManager.getQuorumServers(conf); + + try { + if (zooKeeper != null) { + zooKeeper.close(); + } + + zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, new DummyWatcher()); + } catch (Exception e) { + LOG.error("Failed to create ZooKeeper object: " + e); + throw new LockException(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg()); + } + } + + /** + * Since partition names can contain "/", which need all the parent directories to be created by ZooKeeper, + * replace "/" by a dummy name to ensure a single hierarchy. + **/ + private String getObjectName(HiveLockObject key, HiveLockMode mode) { + return "/" + key.getName().replaceAll("/", ctx.getConf().getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME)) + "-" + mode + "-"; + } + + /** + * @param key The object to be locked + * @param mode The mode of the lock + * @param keepAlive Whether the lock is to be persisted after the statement + * Acuire the lock. Return null if a conflicting lock is present. + **/ + public ZooKeeperHiveLock lock(HiveLockObject key, HiveLockMode mode, boolean keepAlive) + throws LockException { + String name = getObjectName(key, mode); + String res; + + try { + if (keepAlive) { + res = zooKeeper.create(name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); + } + else { + res = zooKeeper.create(name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + } + + int seqNo = getSequenceNumber(res, name); + if (seqNo == -1) { + zooKeeper.delete(res, -1); + return null; + } + + List children = zooKeeper.getChildren("/", false); + + String exLock = getObjectName(key, HiveLockMode.EXCLUSIVE); + String shLock = getObjectName(key, HiveLockMode.SHARED); + + for (String child : children) { + child = "/" + child; + + // Is there a conflicting lock on the same object with a lower sequence number + int childSeq = seqNo; + if (child.startsWith(exLock)) { + childSeq = getSequenceNumber(child, exLock); + } + if ((mode == HiveLockMode.EXCLUSIVE) && child.startsWith(shLock)) { + childSeq = getSequenceNumber(child, shLock); + } + + if ((childSeq >= 0) && (childSeq < seqNo)) { + zooKeeper.delete(res, -1); + console.printError("conflicting lock present "); + return null; + } + } + + } catch (Exception e) { + LOG.error("Failed to get ZooKeeper lock: " + e); + throw new LockException(e); + } + + return new ZooKeeperHiveLock(res, key, mode); + } + + /* Remove the lock specified */ + public void unlock(HiveLock hiveLock) throws LockException { + unlock(ctx.getConf(), zooKeeper, hiveLock); + } + + /* Remove the lock specified */ + private static void unlock(HiveConf conf, ZooKeeper zkpClient, HiveLock hiveLock) throws LockException { + ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock; + try { + zkpClient.delete(zLock.getPath(), -1); + } catch (Exception e) { + LOG.error("Failed to release ZooKeeper lock: " + e); + throw new LockException(e); + } + } + + /* Release all locks - including PERSISTENT locks */ + public static void releaseAllLocks(HiveConf conf) throws Exception { + try { + int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + String quorumServers = getQuorumServers(conf); + ZooKeeper zkpClient = new ZooKeeper(quorumServers, sessionTimeout, new DummyWatcher()); + List locks = getLocks(conf, zkpClient, null); + if (locks != null) { + for (HiveLock lock : locks) { + unlock(conf, zkpClient, lock); + } + } + + zkpClient.close(); + zkpClient = null; + } catch (Exception e) { + LOG.error("Failed to release all locks: " + e.getMessage()); + throw new Exception(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg()); + } + } + + /* Get all locks */ + public List getLocks() throws LockException { + return getLocks(ctx.getConf(), zooKeeper, null); + } + + /* Get all locks for a particular object */ + public List getLocks(HiveLockObject key) throws LockException { + return getLocks(ctx.getConf(), zooKeeper, key); + } + + /** + * @param conf Hive configuration + * @param zkpClient The ZooKeeper client + * @param key The object to be compared against - if key is null, then get all locks + **/ + private static List getLocks(HiveConf conf, ZooKeeper zkpClient, HiveLockObject key) throws LockException { + List locks = new ArrayList(); + List children; + + try { + children = zkpClient.getChildren("/", false); + } catch (Exception e) { + LOG.error("Failed to get ZooKeeper children: " + e); + throw new LockException(e); + } + + for (String child : children) { + child = "/" + child; + + HiveLockMode mode = getLockMode(conf, child); + if (mode == null) { + continue; + } + + HiveLockObject obj = getLockObject(conf, child, mode); + if ((key == null) || (obj.getName().equals(key.getName()))) { + HiveLock lck = (HiveLock)(new ZooKeeperHiveLock(child, obj, mode)); + locks.add(lck); + } + } + + return locks; + } + + /* Release all transient locks, by simply closing the client */ + public void close() throws LockException { + try { + if (zooKeeper != null) { + zooKeeper.close(); + zooKeeper = null; + } + } catch (Exception e) { + LOG.error("Failed to close zooKeeper client: " + e); + throw new LockException(e); + } + } + + /** + * Get the sequence number from the path. The sequence number is always at the end of the path. + **/ + private int getSequenceNumber(String resPath, String path) { + String tst = resPath.substring(path.length()); + try { + return (new Integer(tst)).intValue(); + } catch (Exception e) { + return -1; // invalid number + } + } + + /** + * Get the object from the path of the lock. + * The object may correspond to a table, a partition or a parent to a partition. + * For eg: if Table T is partitioned by ds, hr and ds=1/hr=1 is a valid partition, + * the lock may also correspond to T@ds=1, which is not a valid object + **/ + private static HiveLockObject getLockObject(HiveConf conf, String path, HiveLockMode mode) throws LockException { + try { + Hive db = Hive.get(conf); + int indx = path.lastIndexOf(mode.toString()); + String objName = path.substring(1, indx-1); + + String[] names = objName.split("@"); + Table tab = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, + names[1], false); // do not throw exception if table does not exist + assert (tab != null); + + if (names.length == 2) { + return new HiveLockObject(tab); + } + + String[] parts = names[2].split(conf.getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME)); + + Map partSpec = new HashMap(); + for (indx = 0; indx < parts.length; indx++) { + String[] partVals = parts[indx].split("="); + partSpec.put(partVals[0], partVals[1]); + } + + Partition partn; + try { + partn = db.getPartition(tab, partSpec, false); + } catch (HiveException e) { + partn =null; + } + + if (partn == null) { + return new HiveLockObject(new DummyPartition( + objName.replaceAll(conf.getVar(HiveConf.ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME), "/"))); + } + + return new HiveLockObject(partn); + } catch (Exception e) { + LOG.error("Failed to create ZooKeeper object: " + e); + throw new LockException(e); + } + } + + private static Pattern shMode = Pattern.compile("^.*-(SHARED)-([0-9]+)$"); + private static Pattern exMode = Pattern.compile("^.*-(EXCLUSIVE)-([0-9]+)$"); + + /* Get the mode of the lock encoded in the path */ + private static HiveLockMode getLockMode(HiveConf conf, String path) { + + Matcher shMatcher = shMode.matcher(path); + Matcher exMatcher = exMode.matcher(path); + + if (shMatcher.matches()) + return HiveLockMode.SHARED; + + if (exMatcher.matches()) { + return HiveLockMode.EXCLUSIVE; + } + + return null; + } + + public static class DummyWatcher implements Watcher { + public void process(org.apache.zookeeper.WatchedEvent event) { + } + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLock.java (revision 0) @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr.zookeeper; + +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; + +public class ZooKeeperHiveLock extends HiveLock { + private String path; + private HiveLockObject obj; + private HiveLockMode mode; + + public ZooKeeperHiveLock(String path, HiveLockObject obj, HiveLockMode mode) { + this.path = path; + this.obj = obj; + this.mode = mode; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + public HiveLockObject getHiveLockObject() { + return obj; + } + + public void setHiveLockObject(HiveLockObject obj) { + this.obj = obj; + } + + public HiveLockMode getHiveLockMode() { + return mode; + } + + public void setHiveLockMode(HiveLockMode mode) { + this.mode = mode; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLock.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLock.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLock.java (revision 0) @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +public abstract class HiveLock { + public abstract HiveLockObject getHiveLockObject(); + public abstract HiveLockMode getHiveLockMode(); +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/package-info.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/package-info.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/package-info.java (revision 0) @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Hive Lock Manager interfaces and some custom implmentations + */ +package org.apache.hadoop.hive.ql.lockmgr; Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/LockException.java (revision 0) @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + +/** + * Exception from lock manager. + */ + +public class LockException extends HiveException { + + private static final long serialVersionUID = 1L; + + public LockException() { + super(); + } + + public LockException(String message) { + super(message); + } + + public LockException(Throwable cause) { + super(cause); + } + + public LockException(String message, Throwable cause) { + super(message, cause); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java (revision 0) @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +import org.apache.hadoop.hive.conf.HiveConf; + +public enum HiveLockMode { + SHARED, EXCLUSIVE; +} + Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java (revision 0) @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; + +public class HiveLockObject { + /** + * The table. + */ + private Table t; + + /** + * The partition. This is null for a non partitioned table. + */ + private Partition p; + + public HiveLockObject() { + this.t = null; + this.p = null; + } + + public HiveLockObject(Table t) { + this.t = t; + this.p = null; + } + + public HiveLockObject(Partition p) { + this.t = null; + this.p = p; + } + + public Table getTable() { + return t; + } + + public void setTable (Table t) { + this.t = t; + } + + public Partition getPartition() { + return p; + } + + public void setPartition (Partition p) { + this.p = p; + } + + public String getName() { + if (t != null) { + return t.getCompleteName(); + } + else { + return p.getCompleteName(); + } + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManagerCtx.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManagerCtx.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManagerCtx.java (revision 0) @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lockmgr; + +import org.apache.hadoop.hive.conf.HiveConf; + +public class HiveLockManagerCtx { + HiveConf conf; + + public HiveLockManagerCtx() { + } + + public HiveLockManagerCtx(HiveConf conf) { + this.conf = conf; + } + + public HiveConf getConf() { + return conf; + } + + public void setConf(HiveConf conf) { + this.conf = conf; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java (revision 988314) +++ ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java (working copy) @@ -19,9 +19,12 @@ package org.apache.hadoop.hive.ql.processors; import static org.apache.commons.lang.StringUtils.isBlank; +import java.util.Map; +import java.util.HashMap; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.conf.HiveConf; /** * CommandProcessorFactory. @@ -33,7 +36,12 @@ // prevent instantiation } + static Map mapDrivers = new HashMap(); public static CommandProcessor get(String cmd) { + return get(cmd, null); + } + + public static CommandProcessor get(String cmd, HiveConf conf) { String cmdl = cmd.toLowerCase(); if ("set".equals(cmdl)) { @@ -46,9 +54,28 @@ } else if ("delete".equals(cmdl)) { return new DeleteResourceProcessor(); } else if (!isBlank(cmd)) { - return new Driver(); + if (conf == null) { + return new Driver(); + } + + Driver drv = mapDrivers.get(conf); + if (drv == null) { + drv = new Driver(); + mapDrivers.put(conf, drv); + } + drv.init(); + return drv; } + return null; } + public static void clean(HiveConf conf) { + Driver drv = mapDrivers.get(conf); + if (drv != null) { + drv.destroy(); + } + + mapDrivers.remove(conf); + } } Index: ql/src/java/org/apache/hadoop/hive/ql/Context.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Context.java (revision 988314) +++ ql/src/java/org/apache/hadoop/hive/ql/Context.java (working copy) @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.List; import org.antlr.runtime.TokenRewriteStream; import org.apache.commons.logging.Log; @@ -41,6 +42,8 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; import org.apache.hadoop.conf.Configuration; /** @@ -67,7 +70,6 @@ // Keeps track of scratch directories created for different scheme/authority private final Map fsScratchDirs = new HashMap(); - private Configuration conf; protected int pathid = 10000; protected boolean explain = false; @@ -75,6 +77,12 @@ String executionId; + // List of Locks for this query + protected List hiveLocks; + protected HiveLockManager hiveLockMgr; + + private boolean needLockMgr; + public Context(Configuration conf) throws IOException { this(conf, generateExecutionId()); } @@ -86,7 +94,7 @@ public Context(Configuration conf, String executionId) { this.conf = conf; this.executionId = executionId; - + // non-local tmp location is configurable. however it is the same across // all external file systems nonLocalScratchPath = @@ -106,7 +114,7 @@ public void setExplain(boolean value) { explain = value; } - + /** * Find whether the current query is an explain query * @return true if the query is an explain query, false if not @@ -119,7 +127,7 @@ /** * Get a tmp directory on specified URI * - * @param scheme Scheme of the target FS + * @param scheme Scheme of the target FS * @param authority Authority of the target FS * @param mkdir create the directory if true * @param scratchdir path of tmp directory @@ -166,7 +174,7 @@ /** * Create a map-reduce scratch directory on demand and return it. - * + * */ public String getMRScratchDir() { @@ -231,7 +239,7 @@ /** * Get a path to store map-reduce intermediate data in. - * + * * @return next available path for map-red intermediate data */ public String getMRTmpFileURI() { @@ -241,8 +249,8 @@ /** - * Given a URI for mapreduce intermediate output, swizzle the - * it to point to the local file system. This can be called in + * Given a URI for mapreduce intermediate output, swizzle the + * it to point to the local file system. This can be called in * case the caller decides to run in local mode (in which case * all intermediate data can be stored locally) * @@ -259,7 +267,7 @@ ("Invalid URI: " + originalURI + ", cannot relativize against" + mrbase.toString()); - return getLocalScratchDir(!explain) + Path.SEPARATOR + + return getLocalScratchDir(!explain) + Path.SEPARATOR + relURI.getPath(); } @@ -343,6 +351,7 @@ } removeScratchDir(); originalTracker = null; + setNeedLockMgr(false); } public DataInput getStream() { @@ -458,6 +467,22 @@ return HiveConf.getVar(conf, HiveConf.ConfVars.HADOOPJT).equals("local"); } + public List getHiveLocks() { + return hiveLocks; + } + + public void setHiveLocks(List hiveLocks) { + this.hiveLocks = hiveLocks; + } + + public HiveLockManager getHiveLockMgr() { + return hiveLockMgr; + } + + public void setHiveLockMgr(HiveLockManager hiveLockMgr) { + this.hiveLockMgr = hiveLockMgr; + } + public void setOriginalTracker(String originalTracker) { this.originalTracker = originalTracker; } @@ -474,7 +499,7 @@ pathToCS = new HashMap (); pathToCS.put(path, cs); } - + public ContentSummary getCS(String path) { if(pathToCS == null) pathToCS = new HashMap (); @@ -517,4 +542,12 @@ } paths.addAll(toAdd); } + + public boolean isNeedLockMgr() { + return needLockMgr; + } + + public void setNeedLockMgr(boolean needLockMgr) { + this.needLockMgr = needLockMgr; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (revision 988314) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (working copy) @@ -150,6 +150,10 @@ + "STRING instead."), CREATE_NON_NATIVE_AS("CREATE TABLE AS SELECT cannot be used for a non-native table"), LOAD_INTO_NON_NATIVE("A non-native table cannot be used as target for LOAD"), + LOCKMGR_NOT_SPECIFIED("lock manager not specified correctly, set hive.lock.manager"), + LOCKMGR_NOT_INITIALIZED("lock manager could not be initialized, check hive.lock.manager "), + LOCK_CANNOT_BE_ACQUIRED("locks on the underlying objects cannot be acquired. retry after some time"), + ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED("Check hive.zookeeper.quorum and hive.zookeeper.client.port"), OVERWRITE_ARCHIVED_PART("Cannot overwrite an archived partition. " + "Unarchive before running this command."), ARCHIVE_METHODS_DISABLED("Archiving methods are currently disabled. " + Index: ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (revision 988314) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (working copy) @@ -117,6 +117,9 @@ TOK_SHOWFUNCTIONS; TOK_SHOWPARTITIONS; TOK_SHOW_TABLESTATUS; +TOK_SHOWLOCKS; +TOK_LOCKTABLE; +TOK_UNLOCKTABLE; TOK_DROPTABLE; TOK_TABCOLLIST; TOK_TABCOL; @@ -237,6 +240,8 @@ | dropIndexStatement | alterIndexRebuild | dropFunctionStatement + | lockStatement + | unlockStatement ; ifNotExists @@ -577,8 +582,27 @@ | KW_SHOW KW_PARTITIONS Identifier partitionSpec? -> ^(TOK_SHOWPARTITIONS Identifier partitionSpec?) | KW_SHOW KW_TABLE KW_EXTENDED ((KW_FROM|KW_IN) db_name=Identifier)? KW_LIKE showStmtIdentifier partitionSpec? -> ^(TOK_SHOW_TABLESTATUS showStmtIdentifier $db_name? partitionSpec?) + | KW_SHOW KW_LOCKS -> ^(TOK_SHOWLOCKS) ; +lockStatement +@init { msgs.push("lock statement"); } +@after { msgs.pop(); } + : KW_LOCK KW_TABLE Identifier partitionSpec? lockMode -> ^(TOK_LOCKTABLE Identifier lockMode partitionSpec?) + ; + +lockMode +@init { msgs.push("lock mode"); } +@after { msgs.pop(); } + : KW_SHARED | KW_EXCLUSIVE + ; + +unlockStatement +@init { msgs.push("unlock statement"); } +@after { msgs.pop(); } + : KW_UNLOCK KW_TABLE Identifier partitionSpec? -> ^(TOK_UNLOCKTABLE Identifier partitionSpec?) + ; + metastoreCheck @init { msgs.push("metastore check statement"); } @after { msgs.pop(); } @@ -1012,7 +1036,7 @@ @init { msgs.push("select clause"); } @after { msgs.pop(); } : - KW_SELECT hintClause? (((KW_ALL | dist=KW_DISTINCT)? selectList) + KW_SELECT hintClause? (((KW_ALL | dist=KW_DISTINCT)? selectList) | (transform=KW_TRANSFORM selectTrfmClause)) -> {$transform == null && $dist == null}? ^(TOK_SELECT hintClause? selectList) -> {$transform == null && $dist != null}? ^(TOK_SELECTDI hintClause? selectList) @@ -1027,7 +1051,7 @@ : selectItem ( COMMA selectItem )* -> selectItem+ ; - + selectTrfmClause @init { msgs.push("transform clause"); } @after { msgs.pop(); } @@ -1770,7 +1794,10 @@ KW_SSL: 'SSL'; KW_UNDO: 'UNDO'; KW_LOCK: 'LOCK'; +KW_LOCKS: 'LOCKS'; KW_UNLOCK: 'UNLOCK'; +KW_SHARED: 'SHARED'; +KW_EXCLUSIVE: 'EXCLUSIVE'; KW_PROCEDURE: 'PROCEDURE'; KW_UNSIGNED: 'UNSIGNED'; KW_WHILE: 'WHILE'; Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java (revision 988314) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java (working copy) @@ -56,6 +56,7 @@ commandType.put(HiveParser.TOK_SHOW_TABLESTATUS, "SHOW_TABLESTATUS"); commandType.put(HiveParser.TOK_SHOWFUNCTIONS, "SHOWFUNCTIONS"); commandType.put(HiveParser.TOK_SHOWPARTITIONS, "SHOWPARTITIONS"); + commandType.put(HiveParser.TOK_SHOWLOCKS, "SHOWLOCKS"); commandType.put(HiveParser.TOK_CREATEFUNCTION, "CREATEFUNCTION"); commandType.put(HiveParser.TOK_DROPFUNCTION, "DROPFUNCTION"); commandType.put(HiveParser.TOK_CREATEVIEW, "CREATEVIEW"); @@ -65,6 +66,8 @@ commandType.put(HiveParser.TOK_ALTERINDEX_REBUILD, "ALTERINDEX_REBUILD"); commandType.put(HiveParser.TOK_ALTERVIEW_PROPERTIES, "ALTERVIEW_PROPERTIES"); commandType.put(HiveParser.TOK_QUERY, "QUERY"); + commandType.put(HiveParser.TOK_LOCKTABLE, "LOCKTABLE"); + commandType.put(HiveParser.TOK_UNLOCKTABLE, "UNLOCKTABLE"); } static { @@ -109,12 +112,15 @@ case HiveParser.TOK_SHOW_TABLESTATUS: case HiveParser.TOK_SHOWFUNCTIONS: case HiveParser.TOK_SHOWPARTITIONS: + case HiveParser.TOK_SHOWLOCKS: case HiveParser.TOK_CREATEINDEX: case HiveParser.TOK_DROPINDEX: case HiveParser.TOK_ALTERTABLE_CLUSTER_SORT: case HiveParser.TOK_ALTERTABLE_TOUCH: case HiveParser.TOK_ALTERTABLE_ARCHIVE: case HiveParser.TOK_ALTERTABLE_UNARCHIVE: + case HiveParser.TOK_LOCKTABLE: + case HiveParser.TOK_UNLOCKTABLE: return new DDLSemanticAnalyzer(conf); case HiveParser.TOK_ALTERTABLE_PARTITION: String commandType = null; Index: ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (revision 988314) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (working copy) @@ -69,6 +69,9 @@ import org.apache.hadoop.hive.ql.plan.ShowPartitionsDesc; import org.apache.hadoop.hive.ql.plan.ShowTableStatusDesc; import org.apache.hadoop.hive.ql.plan.ShowTablesDesc; +import org.apache.hadoop.hive.ql.plan.ShowLocksDesc; +import org.apache.hadoop.hive.ql.plan.LockTableDesc; +import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes; import org.apache.hadoop.hive.serde.Constants; @@ -131,7 +134,7 @@ super(conf); // Partition can't have this name reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME)); - + reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.DEFAULT_ZOOKEEPER_PARTITION_NAME)); // Partition value can't end in this suffix reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_ORIGINAL)); reservedPartitionValues.add(HiveConf.getVar(conf, ConfVars.METASTORE_INT_ARCHIVED)); @@ -171,6 +174,9 @@ } else if (ast.getToken().getType() == HiveParser.TOK_SHOWFUNCTIONS) { ctx.setResFile(new Path(ctx.getLocalTmpFileURI())); analyzeShowFunctions(ast); + } else if (ast.getToken().getType() == HiveParser.TOK_SHOWLOCKS) { + ctx.setResFile(new Path(ctx.getLocalTmpFileURI())); + analyzeShowLocks(ast); } else if (ast.getToken().getType() == HiveParser.TOK_DESCFUNCTION) { ctx.setResFile(new Path(ctx.getLocalTmpFileURI())); analyzeDescFunction(ast); @@ -212,6 +218,10 @@ } else if (ast.getToken().getType() == HiveParser.TOK_SHOWPARTITIONS) { ctx.setResFile(new Path(ctx.getLocalTmpFileURI())); analyzeShowPartitions(ast); + } else if (ast.getToken().getType() == HiveParser.TOK_LOCKTABLE) { + analyzeLockTable(ast); + } else if (ast.getToken().getType() == HiveParser.TOK_UNLOCKTABLE) { + analyzeUnlockTable(ast); } else { throw new SemanticException("Unsupported command."); } @@ -808,6 +818,84 @@ /** * Add the task according to the parsed command tree. This is used for the CLI + * command "SHOW LOCKS;". + * + * @param ast + * The parsed command tree. + * @throws SemanticException + * Parsing failed + */ + private void analyzeShowLocks(ASTNode ast) throws SemanticException { + ShowLocksDesc showLocksDesc = new ShowLocksDesc(ctx.getResFile()); + rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), + showLocksDesc), conf)); + setFetchTask(createFetchTask(showLocksDesc.getSchema())); + + // Need to initialize the lock manager + ctx.setNeedLockMgr(true); + } + + /** + * Add the task according to the parsed command tree. This is used for the CLI + * command "LOCK TABLE ..;". + * + * @param ast + * The parsed command tree. + * @throws SemanticException + * Parsing failed + */ + private void analyzeLockTable(ASTNode ast) + throws SemanticException { + String tableName = unescapeIdentifier(ast.getChild(0).getText().toLowerCase()); + String mode = unescapeIdentifier(ast.getChild(1).getText().toUpperCase()); + List> partSpecs = getPartitionSpecs(ast); + + // We only can have a single partition spec + assert(partSpecs.size() <= 1); + Map partSpec = null; + if (partSpecs.size() > 0) { + partSpec = partSpecs.get(0); + } + + LockTableDesc lockTblDesc = new LockTableDesc(tableName, mode, partSpec); + rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), + lockTblDesc), conf)); + + // Need to initialize the lock manager + ctx.setNeedLockMgr(true); + } + + /** + * Add the task according to the parsed command tree. This is used for the CLI + * command "UNLOCK TABLE ..;". + * + * @param ast + * The parsed command tree. + * @throws SemanticException + * Parsing failed + */ + private void analyzeUnlockTable(ASTNode ast) + throws SemanticException { + String tableName = unescapeIdentifier(ast.getChild(0).getText().toLowerCase()); + List> partSpecs = getPartitionSpecs(ast); + + // We only can have a single partition spec + assert(partSpecs.size() <= 1); + Map partSpec = null; + if (partSpecs.size() > 0) { + partSpec = partSpecs.get(0); + } + + UnlockTableDesc unlockTblDesc = new UnlockTableDesc(tableName, partSpec); + rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), + unlockTblDesc), conf)); + + // Need to initialize the lock manager + ctx.setNeedLockMgr(true); + } + + /** + * Add the task according to the parsed command tree. This is used for the CLI * command "DESCRIBE FUNCTION;". * * @param ast Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 988314) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -24,6 +24,8 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -53,6 +55,14 @@ import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.hooks.PostExecute; import org.apache.hadoop.hive.ql.hooks.PreExecute; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; +import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.io.IOPrepareCache; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; @@ -72,6 +82,10 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UnixUserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.DummyPartition; +import org.apache.hadoop.hive.ql.metadata.Table; public class Driver implements CommandProcessor { @@ -86,6 +100,7 @@ private Context ctx; private QueryPlan plan; private Schema schema; + private HiveLockManager hiveLockMgr; private String errorMessage; private String SQLState; @@ -94,6 +109,40 @@ private int maxthreads; private final int sleeptime = 2000; + private int checkLockManager() { + boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); + if (supportConcurrency && (hiveLockMgr == null)) { + try { + setLockManager(); + } catch (SemanticException e) { + errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage(); + SQLState = ErrorMsg.findSQLState(e.getMessage()); + console.printError(errorMessage, "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + return (12); + } + } + return (0); + } + + private void setLockManager() throws SemanticException { + boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); + if (supportConcurrency) { + String lockMgr = conf.getVar(HiveConf.ConfVars.HIVE_LOCK_MANAGER); + if ((lockMgr == null) || (lockMgr.isEmpty())) { + throw new SemanticException(ErrorMsg.LOCKMGR_NOT_SPECIFIED.getMsg()); + } + + try { + hiveLockMgr = (HiveLockManager) + ReflectionUtils.newInstance(conf.getClassByName(lockMgr), conf); + hiveLockMgr.setContext(new HiveLockManagerCtx(conf)); + } catch (Exception e) { + throw new SemanticException(ErrorMsg.LOCKMGR_NOT_INITIALIZED.getMsg() + e.getMessage()); + } + } + } + public void init() { Operator.resetId(); } @@ -345,20 +394,256 @@ return plan; } + public static class LockObject { + HiveLockObject obj; + HiveLockMode mode; + + public LockObject(HiveLockObject obj, HiveLockMode mode) { + this.obj = obj; + this.mode = mode; + } + + public HiveLockObject getObj() { + return obj; + } + + public HiveLockMode getMode() { + return mode; + } + + public String getName() { + return obj.getName(); + } + } + + /** + * @param t The table to be locked + * @param p The partition to be locked + * @param mode The mode of the lock (SHARED/EXCLUSIVE) + * Get the list of objects to be locked. If a partition needs to be locked (in any mode), all its parents + * should also be locked in SHARED mode. + **/ + private List getLockObjects(Table t, Partition p, HiveLockMode mode) { + List locks = new LinkedList(); + + if (t != null) { + locks.add(new LockObject(new HiveLockObject(t), mode)); + return locks; + } + + if (p != null) { + locks.add(new LockObject(new HiveLockObject(p), mode)); + + // All the parents are locked in shared mode + mode = HiveLockMode.SHARED; + + String partName = p.getName(); + String partialName = ""; + String[] partns = p.getName().split("/"); + for (int idx = 0; idx < partns.length -1; idx++) { + String partn = partns[idx]; + partialName += partialName + partn; + locks.add(new LockObject(new HiveLockObject(new DummyPartition(p.getTable().getDbName() + "@" + p.getTable().getTableName() + "@" + partialName)), mode)); + partialName += "/"; + } + + locks.add(new LockObject(new HiveLockObject(p.getTable()), mode)); + } + return locks; + } + + /** + * Acquire read and write locks needed by the statement. The list of objects to be locked are obtained + * from he inputs and outputs populated by the compiler. The lock acuisition scheme is pretty simple. + * If all the locks cannot be obtained, error out. Deadlock is avoided by making sure that the locks + * are lexicographically sorted. + **/ + public int acquireReadWriteLocks() { + try { + int tryNum = 1; + int sleepTime = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES) * 1000; + int numRetries = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES); + + boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); + if (!supportConcurrency) { + return 0; + } + + List lockObjects = new ArrayList(); + + // Sort all the inputs, outputs. + // If a lock needs to be acquired on any partition, a read lock needs to be acquired on all its parents also + for (ReadEntity input : plan.getInputs()) { + if (input.getType() == ReadEntity.Type.TABLE) { + lockObjects.addAll(getLockObjects(input.getTable(), null, HiveLockMode.SHARED)); + } + else { + lockObjects.addAll(getLockObjects(null, input.getPartition(), HiveLockMode.SHARED)); + } + } + + for (WriteEntity output : plan.getOutputs()) { + if (output.getTyp() == WriteEntity.Type.TABLE) { + lockObjects.addAll(getLockObjects(output.getTable(), null, HiveLockMode.EXCLUSIVE)); + } + else if (output.getTyp() == WriteEntity.Type.PARTITION) { + lockObjects.addAll(getLockObjects(null, output.getPartition(), HiveLockMode.EXCLUSIVE)); + } + } + + if (lockObjects.isEmpty() && !ctx.isNeedLockMgr()) { + return 0; + } + + int ret = checkLockManager(); + if (ret != 0) { + return ret; + } + + + ctx.setHiveLockMgr(hiveLockMgr); + + Collections.sort(lockObjects, new Comparator() { + + @Override + public int compare(LockObject o1, LockObject o2) { + int cmp = o1.getName().compareTo(o2.getName()); + if (cmp == 0) { + if (o1.getMode() == o2.getMode()) { + return cmp; + } + // EXCLUSIVE locks occur before SHARED locks + if (o1.getMode() == HiveLockMode.EXCLUSIVE) { + return -1; + } + return +1; + } + return cmp; + } + + }); + + // walk the list and acquire the locks - if any lock cant be acquired, release all locks, sleep and retry + while (true) { + List hiveLocks = acquireLocks(lockObjects); + + if (hiveLocks == null) { + if (tryNum == numRetries) { + throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); + } + tryNum++; + + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + } + } + else { + ctx.setHiveLocks(hiveLocks); + break; + } + } + return (0); + } catch (SemanticException e) { + errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); + SQLState = ErrorMsg.findSQLState(e.getMessage()); + console.printError(errorMessage, "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + return (10); + } + } + + /** + * @param lockObjects The list of objects to be locked + * Lock the objects specified in the list. The same object is not locked twice, and the list passed is sorted + * such that EXCLUSIVE locks occur before SHARED locks. + **/ + private List acquireLocks(List lockObjects) throws SemanticException { + // walk the list and acquire the locks - if any lock cant be acquired, release all locks, sleep and retry + LockObject prevLockObj = null; + List hiveLocks = new ArrayList(); + + for (LockObject lockObject: lockObjects) { + // No need to acquire a lock twice on the same object + // It is ensured that EXCLUSIVE locks occur before SHARED locks on the same object + if ((prevLockObj != null) && (prevLockObj.getName().equals(lockObject.getName()))) { + prevLockObj = lockObject; + continue; + } + + HiveLock lock = null; + try { + lock = ctx.getHiveLockMgr().lock(lockObject.getObj(), lockObject.getMode(), false); + } catch (LockException e) { + lock = null; + } + + if (lock == null) { + releaseLocks(hiveLocks); + return null; + } + + hiveLocks.add(lock); + prevLockObj = lockObject; + } + + return hiveLocks; + } + + /** + * Release all the locks acquired implicitly by the statement. Note that the locks acquired + * with 'keepAlive' set to True are not released. + **/ + private void releaseLocks() { + if (ctx != null && ctx.getHiveLockMgr() != null) { + try { + ctx.getHiveLockMgr().close(); + ctx.setHiveLocks(null); + } catch (LockException e) { + } + } + } + + /** + * @param hiveLocks list of hive locks to be released + * Release all the locks specified. If some of the locks have already been released, ignore them + **/ + private void releaseLocks(List hiveLocks) { + if (hiveLocks != null) { + for (HiveLock hiveLock: hiveLocks) { + try { + ctx.getHiveLockMgr().unlock(hiveLock); + } catch (LockException e) { + // The lock may have been released. Ignore and continue + } + } + ctx.setHiveLocks(null); + } + } + public CommandProcessorResponse run(String command) { errorMessage = null; SQLState = null; int ret = compile(command); if (ret != 0) { + releaseLocks(ctx.getHiveLocks()); return new CommandProcessorResponse(ret, errorMessage, SQLState); } + ret = acquireReadWriteLocks(); + if (ret != 0) { + releaseLocks(ctx.getHiveLocks()); + return new CommandProcessorResponse(ret, errorMessage, SQLState); + } + ret = execute(); if (ret != 0) { + releaseLocks(ctx.getHiveLocks()); return new CommandProcessorResponse(ret, errorMessage, SQLState); } + releaseLocks(ctx.getHiveLocks()); return new CommandProcessorResponse(ret); } @@ -722,6 +1007,10 @@ return 0; } + public void destroy() { + releaseLocks(); + } + public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan() throws IOException { return plan.getQueryPlan();