diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 9db72ae..1be72eb 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -18,26 +18,6 @@ package org.apache.hadoop.hive.conf; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintStream; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import javax.security.auth.login.LoginException; - import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,6 +27,14 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Shell; +import javax.security.auth.login.LoginException; +import java.io.*; +import java.net.URL; +import java.util.*; +import java.util.Map.Entry; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + /** * Hive Configuration. */ @@ -134,7 +122,13 @@ HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN, HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES, HiveConf.ConfVars.USERS_IN_ADMIN_ROLE, - HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER + HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + HiveConf.ConfVars.HIVE_TXN_MANAGER, + HiveConf.ConfVars.HIVE_TXN_JDBC_DRIVER, + HiveConf.ConfVars.HIVE_TXN_JDBC_CONNECT_STRING, + HiveConf.ConfVars.HIVE_TXN_TIMEOUT, + HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, + HiveConf.ConfVars.HIVE_TXN_TESTING }; /** @@ -693,6 +687,52 @@ HIVE_ZOOKEEPER_NAMESPACE("hive.zookeeper.namespace", "hive_zookeeper_namespace"), HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES("hive.zookeeper.clean.extra.nodes", false), + // Transactions + HIVE_TXN_MANAGER("hive.txn.manager", + "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"), + HIVE_TXN_JDBC_DRIVER("hive.txn.driver", ""), + HIVE_TXN_JDBC_CONNECT_STRING("hive.txn.connection.string", ""), + // time after which transactions are declared aborted if the client has + // not sent a heartbeat, in seconds. + HIVE_TXN_TIMEOUT("hive.txn.timeout", 300), + + // Maximum number of transactions that can be fetched in one call to + // open_txns(). + // Increasing this will decrease the number of delta files created when + // streaming data into Hive. But it will also increase the number of + // open transactions at any given time, possibly impacting read + // performance. + HIVE_TXN_MAX_OPEN_BATCH("hive.txn.max.open.batch", 1000), + + // Hack to handle setting up the transaction/lock database for qfile + // testing. + HIVE_TXN_TESTING("hive.txn.testing", false), + + // Whether to run the compactor's initiator thread in this metastore instance or not. + HIVE_COMPACTOR_INITIATOR_ON("hive.compactor.initiator.on", false), + + // Number of compactor worker threads to run on this metastore instance. + HIVE_COMPACTOR_WORKER_THREADS("hive.compactor.worker.threads", 0), + + // Time, in seconds, before a given compaction in working state is declared a failure and + // returned to the initiated state. 
+ HIVE_COMPACTOR_WORKER_TIMEOUT("hive.compactor.worker.timeout", 86400L), + + // Time in seconds between checks to see if any partitions need compacted. This should be + // kept high because each check for compaction requires many calls against the NameNode. + HIVE_COMPACTOR_CHECK_INTERVAL("hive.compactor.check.interval", 300L), + + // Number of delta files that must exist in a directory before the compactor will attempt a + // minor compaction. + HIVE_COMPACTOR_DELTA_NUM_THRESHOLD("hive.compactor.delta.num.threshold", 10), + + // Percentage (by size) of base that deltas can be before major compaction is initiated. + HIVE_COMPACTOR_DELTA_PCT_THRESHOLD("hive.compactor.delta.pct.threshold", 0.1f), + + // Number of aborted transactions involving a particular table or partition before major + // compaction is initiated. + HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD("hive.compactor.abortedtxn.threshold", 1000), + // For HBase storage handler HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true), diff --git metastore/if/hive_metastore.thrift metastore/if/hive_metastore.thrift index 58b2357..ae7c3c9 100755 --- metastore/if/hive_metastore.thrift +++ metastore/if/hive_metastore.thrift @@ -70,6 +70,37 @@ enum PartitionEventType { LOAD_DONE = 1, } +// Enums for transaction and lock management +enum TxnState { + COMMITTED = 1, + ABORTED = 2, + OPEN = 3, +} + +enum LockLevel { + DB = 1, + TABLE = 2, + PARTITION = 3, +} + +enum LockState { + ACQUIRED = 1, + WAITING = 2, + ABORT = 3, + NOT_ACQUIRED = 4, +} + +enum LockType { + SHARED_READ = 1, + SHARED_WRITE = 2, + EXCLUSIVE = 3, +} + +enum CompactionType { + MINOR = 1, + MAJOR = 2, +} + struct HiveObjectRef{ 1: HiveObjectType objectType, 2: string dbName, @@ -383,6 +414,122 @@ struct Function { 8: list resourceUris, } +// Structs for transaction and locks +struct TxnInfo { + 1: required i64 id, + 2: required TxnState state, + 3: required string user, + 4: required string hostname, +} + +struct GetOpenTxnsInfoResponse { + 1: required i64 txn_high_water_mark, + 2: required list open_txns, +} + +struct GetOpenTxnsResponse { + 1: required i64 txn_high_water_mark, + 2: required set open_txns, +} + +struct OpenTxnRequest { + 1: required i32 num_txns, + 2: required string user, + 3: required string hostname, +} + +struct OpenTxnsResponse { + 1: required list txn_ids, +} + +struct AbortTxnRequest { + 1: required i64 txnid, +} + +struct CommitTxnRequest { + 1: required i64 txnid, +} + +struct LockComponent { + 1: required LockType type, + 2: required LockLevel level, + 3: required string dbname, + 4: required string tablename, + 5: required string partitionname, +} + +struct LockRequest { + 1: required list component, + 2: optional i64 txnid, + 3: required string user, + 4: required string hostname, +} + +struct LockResponse { + 1: required i64 lockid, + 2: required LockState state, +} + +struct CheckLockRequest { + 1: required i64 lockid, +} + +struct UnlockRequest { + 1: required i64 lockid, +} + +struct ShowLocksRequest { +} + +struct ShowLocksResponseElement { + 1: required i64 lockid, + 2: required string dbname, + 3: optional string tablename, + 4: optional string partname, + 5: required LockState state, + 6: required LockType type, + 7: optional i64 txnid, + 8: required i64 lastheartbeat, + 9: optional i64 acquiredat, + 10: required string user, + 11: required string hostname, +} + +struct ShowLocksResponse { + 1: list locks, +} + +struct HeartbeatRequest { + 1: optional i64 lockid, + 2: optional i64 txnid +} + +struct CompactionRequest { + 1: required string 
dbname, + 2: required string tablename, + 3: optional string partitionname, + 4: required CompactionType type, + 5: optional string runas, +} + +struct ShowCompactRequest { +} + +struct ShowCompactResponseElement { + 1: required string dbname, + 2: required string tablename, + 3: required string partitionname, + 4: required CompactionType type, + 5: required string state, + 6: required string workerid, + 7: required i64 start, + 8: required string runAs, +} + +struct ShowCompactResponse { + 1: required list compacts, +} + exception MetaException { 1: string message } @@ -431,6 +578,23 @@ exception InvalidInputException { 1: string message } +// Transaction and lock exceptions +exception NoSuchTxnException { + 1: string message +} + +exception TxnAbortedException { + 1: string message +} + +exception TxnOpenException { + 1: string message +} + +exception NoSuchLockException { + 1: string message +} + /** * This interface is live. */ @@ -776,6 +940,23 @@ service ThriftHiveMetastore extends fb303.FacebookService // method to cancel delegation token obtained from metastore server void cancel_delegation_token(1:string token_str_form) throws (1:MetaException o1) + + // Transaction and lock management calls + // Get just list of open transactions + GetOpenTxnsResponse get_open_txns() + // Get list of open transactions with state (open, aborted) + GetOpenTxnsInfoResponse get_open_txns_info() + OpenTxnsResponse open_txns(1:OpenTxnRequest rqst) + void abort_txn(1:AbortTxnRequest rqst) throws (1:NoSuchTxnException o1) + void commit_txn(1:CommitTxnRequest rqst) throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2) + LockResponse lock(1:LockRequest rqst) throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2) + LockResponse check_lock(1:CheckLockRequest rqst) + throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2, 3:NoSuchLockException o3) + void unlock(1:UnlockRequest rqst) throws (1:NoSuchLockException o1, 2:TxnOpenException o2) + ShowLocksResponse show_locks(1:ShowLocksRequest rqst) + void heartbeat(1:HeartbeatRequest ids) throws (1:NoSuchLockException o1, 2:NoSuchTxnException o2, 3:TxnAbortedException o3) + void compact(1:CompactionRequest rqst) + ShowCompactResponse show_compact(1:ShowCompactRequest rqst) } // * Note about the DDL_TIME: When creating or altering a table or a partition, diff --git metastore/pom.xml metastore/pom.xml index 5f7b4ad..98878eb 100644 --- metastore/pom.xml +++ metastore/pom.xml @@ -54,7 +54,6 @@ com.jolbox bonecp ${bonecp.version} - runtime commons-cli diff --git metastore/scripts/upgrade/derby/hive-schema-0.13.0.derby.sql metastore/scripts/upgrade/derby/hive-schema-0.13.0.derby.sql index f70a7c0..a2b5980 100644 --- metastore/scripts/upgrade/derby/hive-schema-0.13.0.derby.sql +++ metastore/scripts/upgrade/derby/hive-schema-0.13.0.derby.sql @@ -297,3 +297,8 @@ ALTER TABLE "APP"."IDXS" ADD CONSTRAINT "SQL110318025504980" CHECK (DEFERRED_REB ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SQL110318025505550" CHECK (IS_COMPRESSED IN ('Y','N')); INSERT INTO "APP"."VERSION" (VER_ID, SCHEMA_VERSION, VERSION_COMMENT) VALUES (1, '0.13.0', 'Hive release version 0.13.0'); + +------------------------------ +-- Transaction and Lock Tables +------------------------------ +RUN 'hive-txn-schema-0.13.0.derby.sql'; diff --git metastore/scripts/upgrade/derby/upgrade-0.12.0-to-0.13.0.derby.sql metastore/scripts/upgrade/derby/upgrade-0.12.0-to-0.13.0.derby.sql index 1f1e830..3b481e3 100644 --- metastore/scripts/upgrade/derby/upgrade-0.12.0-to-0.13.0.derby.sql +++ 
metastore/scripts/upgrade/derby/upgrade-0.12.0-to-0.13.0.derby.sql @@ -1,2 +1,3 @@ -- Upgrade MetaStore schema from 0.11.0 to 0.12.0 +RUN 'hive-txn-schema-0.13.0.derby.sql'; UPDATE "APP".VERSION SET SCHEMA_VERSION='0.13.0', VERSION_COMMENT='Hive release version 0.13.0' where VER_ID=1; diff --git metastore/scripts/upgrade/mysql/hive-schema-0.13.0.mysql.sql metastore/scripts/upgrade/mysql/hive-schema-0.13.0.mysql.sql index 2cd8db8..3d4e54d 100644 --- metastore/scripts/upgrade/mysql/hive-schema-0.13.0.mysql.sql +++ metastore/scripts/upgrade/mysql/hive-schema-0.13.0.mysql.sql @@ -761,7 +761,12 @@ CREATE TABLE IF NOT EXISTS `VERSION` ( PRIMARY KEY (`VER_ID`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; -INSERT INTO VERSION (VER_ID, SCHEMA_VERSION, VERSION_COMMENT) VALUES (1, '0.13.0', 'Hive release version 0.13.0'); +INSERT INTO VERSION (VER_ID, SCHEMA_VERSION, VERSION_COMMENT) VALUES (1, '0.12.0', 'Hive release version 0.12.0'); + +------------------------------ +-- Transaction and Lock Tables +------------------------------ +SOURCE hive-txn-schema-0.13.0.mysql.sql; /*!40101 SET character_set_client = @saved_cs_client */; /*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; diff --git metastore/scripts/upgrade/mysql/upgrade-0.12.0-to-0.13.0.mysql.sql metastore/scripts/upgrade/mysql/upgrade-0.12.0-to-0.13.0.mysql.sql index 0296ac5..c58a6ad 100644 --- metastore/scripts/upgrade/mysql/upgrade-0.12.0-to-0.13.0.mysql.sql +++ metastore/scripts/upgrade/mysql/upgrade-0.12.0-to-0.13.0.mysql.sql @@ -1,6 +1,7 @@ SELECT 'Upgrading MetaStore schema from 0.12.0 to 0.13.0' AS ' '; SOURCE 015-HIVE-5700.mysql.sql; +SOURCE hive-txn-schema-0.13.0.mysql.sql; UPDATE VERSION SET SCHEMA_VERSION='0.13.0', VERSION_COMMENT='Hive release version 0.13.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 0.12.0 to 0.13.0' AS ' '; diff --git metastore/scripts/upgrade/oracle/hive-schema-0.13.0.oracle.sql metastore/scripts/upgrade/oracle/hive-schema-0.13.0.oracle.sql index fa483bf..f475231 100644 --- metastore/scripts/upgrade/oracle/hive-schema-0.13.0.oracle.sql +++ metastore/scripts/upgrade/oracle/hive-schema-0.13.0.oracle.sql @@ -716,3 +716,7 @@ CREATE INDEX PARTITIONEVENTINDEX ON PARTITION_EVENTS (PARTITION_NAME); INSERT INTO VERSION (VER_ID, SCHEMA_VERSION, VERSION_COMMENT) VALUES (1, '0.13.0', 'Hive release version 0.13.0'); +------------------------------ +-- Transaction and lock tables +------------------------------ +@hive-txn-schema-0.13.0.oracle.sql; diff --git metastore/scripts/upgrade/oracle/upgrade-0.12.0-to-0.13.0.oracle.sql metastore/scripts/upgrade/oracle/upgrade-0.12.0-to-0.13.0.oracle.sql index 4898537..703566b 100644 --- metastore/scripts/upgrade/oracle/upgrade-0.12.0-to-0.13.0.oracle.sql +++ metastore/scripts/upgrade/oracle/upgrade-0.12.0-to-0.13.0.oracle.sql @@ -1,6 +1,7 @@ SELECT 'Upgrading MetaStore schema from 0.12.0 to 0.13.0' AS Status from dual; @015-HIVE-5700.oracle.sql; +@hive-txn-schema-0.13.0.oracle.sql; UPDATE VERSION SET SCHEMA_VERSION='0.13.0', VERSION_COMMENT='Hive release version 0.13.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 0.12.0 to 0.13.0' AS Status from dual; diff --git metastore/scripts/upgrade/postgres/hive-schema-0.13.0.postgres.sql metastore/scripts/upgrade/postgres/hive-schema-0.13.0.postgres.sql index 678d876..e06f7c7 100644 --- metastore/scripts/upgrade/postgres/hive-schema-0.13.0.postgres.sql +++ metastore/scripts/upgrade/postgres/hive-schema-0.13.0.postgres.sql @@ -1404,3 +1404,7 @@ INSERT INTO "VERSION" ("VER_ID", "SCHEMA_VERSION", "VERSION_COMMENT") 
VALUES (1, -- PostgreSQL database dump complete -- +------------------------------ +-- Transaction and lock tables +------------------------------ +\i hive-txn-schema-0.13.0.postgres.sql; diff --git metastore/scripts/upgrade/postgres/upgrade-0.12.0-to-0.13.0.postgres.sql metastore/scripts/upgrade/postgres/upgrade-0.12.0-to-0.13.0.postgres.sql index e96d4fd..fe7cf74 100644 --- metastore/scripts/upgrade/postgres/upgrade-0.12.0-to-0.13.0.postgres.sql +++ metastore/scripts/upgrade/postgres/upgrade-0.12.0-to-0.13.0.postgres.sql @@ -1,8 +1,7 @@ SELECT 'Upgrading MetaStore schema from 0.12.0 to 0.13.0'; +\i hive-txn-schema-0.13.0.postgres.sql; \i 015-HIVE-5700.postgres.sql; UPDATE "VERSION" SET "SCHEMA_VERSION"='0.13.0', "VERSION_COMMENT"='Hive release version 0.13.0' where "VER_ID"=1; SELECT 'Finished upgrading MetaStore schema from 0.12.0 to 0.13.0'; - - diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 7c74e79..829a21b 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.metastore; +import com.facebook.fb303.FacebookBase; +import com.facebook.fb303.fb_status; import static org.apache.commons.lang.StringUtils.join; import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_COMMENT; import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME; @@ -60,12 +62,16 @@ import org.apache.hadoop.hive.common.metrics.Metrics; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; import org.apache.hadoop.hive.metastore.api.AddPartitionsRequest; import org.apache.hadoop.hive.metastore.api.AddPartitionsResult; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.DropPartitionsExpr; @@ -74,6 +80,9 @@ import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; import org.apache.hadoop.hive.metastore.api.HiveObjectRef; import org.apache.hadoop.hive.metastore.api.HiveObjectType; @@ -83,8 +92,14 @@ import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.MetaException; 
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest; @@ -97,16 +112,24 @@ import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo; import org.apache.hadoop.hive.metastore.api.RequestPartsSpec; import org.apache.hadoop.hive.metastore.api.Role; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableStatsRequest; import org.apache.hadoop.hive.metastore.api.TableStatsResult; -import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; import org.apache.hadoop.hive.metastore.api.Type; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterTableEvent; @@ -149,14 +172,8 @@ import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TServerSocket; -import org.apache.thrift.transport.TServerTransport; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportFactory; +import org.apache.thrift.transport.*; -import com.facebook.fb303.FacebookBase; -import com.facebook.fb303.fb_status; import com.google.common.base.Splitter; import com.google.common.collect.Lists; @@ -218,6 +235,13 @@ protected synchronized RawStore initialValue() { } }; + private final ThreadLocal threadLocalTxn = new ThreadLocal() { + @Override + protected synchronized TxnHandler initialValue() { + return null; + } + }; + // Thread local configuration is needed as many threads could make changes // to the conf using the connection hook private final ThreadLocal threadLocalConf = @@ -443,6 +467,15 @@ public RawStore getMS() throws MetaException { return ms; } + private TxnHandler getTxnHandler() { + TxnHandler txn = threadLocalTxn.get(); + if (txn == null) { + txn = new TxnHandler(hiveConf); + threadLocalTxn.set(txn); + } + return txn; + } + private RawStore newRawStore() throws MetaException { LOG.info(addPrefix("Opening raw store with implemenation class:" + rawStoreClassName)); @@ -4703,6 +4736,70 @@ public 
Function get_function(String dbName, String funcName) return func; } + + // Transaction and locking methods + @Override + public GetOpenTxnsResponse get_open_txns() { + return getTxnHandler().getOpenTxns(); + } + + // Transaction and locking methods + @Override + public GetOpenTxnsInfoResponse get_open_txns_info() { + return getTxnHandler().getOpenTxnsInfo(); + } + + @Override + public OpenTxnsResponse open_txns(OpenTxnRequest rqst) { + return getTxnHandler().openTxns(rqst); + } + + @Override + public void abort_txn(AbortTxnRequest rqst) throws NoSuchTxnException { + getTxnHandler().abortTxn(rqst); + } + + @Override + public void commit_txn(CommitTxnRequest rqst) throws NoSuchTxnException, TxnAbortedException { + getTxnHandler().commitTxn(rqst); + } + + @Override + public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException { + return getTxnHandler().lock(rqst); + } + + @Override + public LockResponse check_lock(CheckLockRequest rqst) + throws NoSuchTxnException, TxnAbortedException, NoSuchLockException { + return getTxnHandler().checkLock(rqst); + } + + @Override + public void unlock(UnlockRequest rqst) throws NoSuchLockException, TxnOpenException { + getTxnHandler().unlock(rqst); + } + + @Override + public ShowLocksResponse show_locks(ShowLocksRequest rqst) { + return getTxnHandler().showLocks(rqst); + } + + @Override + public void heartbeat(HeartbeatRequest ids) + throws NoSuchLockException, NoSuchTxnException, TxnAbortedException { + getTxnHandler().heartbeat(ids); + } + + @Override + public void compact(CompactionRequest rqst) { + getTxnHandler().compact(rqst); + } + + @Override + public ShowCompactResponse show_compact(ShowCompactRequest rqst) { + return getTxnHandler().showCompact(rqst); + } } public static IHMSHandler newHMSHandler(String name, HiveConf hiveConf) throws MetaException { diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 51464cf..8d5b225 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -26,8 +26,8 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; -import java.net.URI; -import java.net.URISyntaxException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -46,11 +46,16 @@ import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; import org.apache.hadoop.hive.metastore.api.AddPartitionsRequest; import org.apache.hadoop.hive.metastore.api.AddPartitionsResult; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.DropPartitionsExpr; 
@@ -59,6 +64,9 @@ import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.HeartbeatRequest; import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; import org.apache.hadoop.hive.metastore.api.HiveObjectRef; import org.apache.hadoop.hive.metastore.api.Index; @@ -66,8 +74,14 @@ import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest; @@ -78,13 +92,20 @@ import org.apache.hadoop.hive.metastore.api.PrivilegeBag; import org.apache.hadoop.hive.metastore.api.RequestPartsSpec; import org.apache.hadoop.hive.metastore.api.Role; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableStatsRequest; import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; import org.apache.hadoop.hive.metastore.api.Type; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge; @@ -98,6 +119,12 @@ import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import java.net.URI; +import java.net.URISyntaxException; + +import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME; +import static org.apache.hadoop.hive.metastore.MetaStoreUtils.isIndexTable; + /** * Hive Metastore Client. 
*/ @@ -665,7 +692,6 @@ public boolean dropPartition(String db_name, String tbl_name, List part_ * @param name * @param dbname * @throws NoSuchObjectException - * @throws ExistingDependentsException * @throws MetaException * @throws TException * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#drop_table(java.lang.String, @@ -689,7 +715,6 @@ public void dropTable(String tableName, boolean deleteData) * @param deleteData * delete the underlying data or just delete the table in metadata * @throws NoSuchObjectException - * @throws ExistingDependentsException * @throws MetaException * @throws TException * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#drop_table(java.lang.String, @@ -920,7 +945,8 @@ public Partition getPartitionWithAuthInfo(String db_name, String tbl_name, List part_vals, String user_name, List group_names) throws MetaException, UnknownTableException, NoSuchObjectException, TException { - return deepCopy(client.get_partition_with_auth(db_name, tbl_name, part_vals, user_name, group_names)); + return deepCopy(client.get_partition_with_auth(db_name, tbl_name, part_vals, user_name, + group_names)); } /** @@ -1444,6 +1470,167 @@ public void cancelDelegationToken(String tokenStrForm) throws MetaException, TEx client.cancel_delegation_token(tokenStrForm); } + public static class ValidTxnListImpl implements ValidTxnList { + + private GetOpenTxnsResponse txns; + + public ValidTxnListImpl() { + } + + public ValidTxnListImpl(GetOpenTxnsResponse t) { + txns = t; + } + + @Override + public boolean isTxnCommitted(long txnid) { + if (txns.getTxn_high_water_mark() < txnid) return false; + return !txns.getOpen_txns().contains(txnid); + } + + @Override + public RangeResponse isTxnRangeCommitted(long minTxnId, long maxTxnId) { + if (txns.getTxn_high_water_mark() < minTxnId) return RangeResponse.NONE; + + RangeResponse rc = RangeResponse.ALL; + boolean foundCommitted = false; + for (long id = minTxnId; id <= maxTxnId; id++) { + if (isTxnCommitted(id)) foundCommitted = true; + else rc = RangeResponse.SOME; + } + if (!foundCommitted) rc = RangeResponse.NONE; + return rc; + } + + @Override + public GetOpenTxnsResponse getOpenTxns() { + return txns; + } + + @Override + public String toString() { + StringBuffer buf = new StringBuffer(); + buf.append(getOpenTxns().getTxn_high_water_mark()); + Set openTxns = getOpenTxns().getOpen_txns(); + if (openTxns != null && openTxns.size() > 0) { + for (long txn : openTxns) { + buf.append(':'); + buf.append(txn); + } + } else { + buf.append(':'); + } + return buf.toString(); + } + + @Override + public void fromString(String src) { + // Make sure we have a non-null value in txns so that any future calls to this don't NPE. 
+ txns = new GetOpenTxnsResponse(); + if (src == null) { + txns.setTxn_high_water_mark(Long.MAX_VALUE); + txns.setOpen_txns(new HashSet()); + return; + } + + String[] tString = src.split(":"); + txns.setTxn_high_water_mark(Long.valueOf(tString[0])); + Set openTxns = new HashSet(); + for (int i = 1; i < tString.length; i++) openTxns.add(Long.valueOf(tString[i])); + txns.setOpen_txns(openTxns); + } + } + + @Override + public ValidTxnList getValidTxns() throws TException { + GetOpenTxnsResponse txns = client.get_open_txns(); + return new ValidTxnListImpl(txns); + } + + @Override + public long openTxn(String user) throws TException { + OpenTxnsResponse txns = openTxns(user, 1); + return txns.getTxn_ids().get(0); + } + + @Override + public OpenTxnsResponse openTxns(String user, int numTxns) throws TException { + String hostname = null; + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOG.error("Unable to resolve my host name " + e.getMessage()); + throw new RuntimeException(e); + } + return client.open_txns(new OpenTxnRequest(numTxns, user, hostname)); + } + + @Override + public void rollbackTxn(long txnid) throws NoSuchTxnException, TException { + client.abort_txn(new AbortTxnRequest(txnid)); + } + + @Override + public void commitTxn(long txnid) + throws NoSuchTxnException, TxnAbortedException, TException { + client.commit_txn(new CommitTxnRequest(txnid)); + } + + @Override + public GetOpenTxnsInfoResponse showTxns() throws TException { + return client.get_open_txns_info(); + } + + @Override + public LockResponse lock(LockRequest request) + throws NoSuchTxnException, TxnAbortedException, TException { + return client.lock(request); + } + + @Override + public LockResponse checkLock(long lockid) + throws NoSuchTxnException, TxnAbortedException, NoSuchLockException, + TException { + return client.check_lock(new CheckLockRequest(lockid)); + } + + @Override + public void unlock(long lockid) + throws NoSuchLockException, TxnOpenException, TException { + client.unlock(new UnlockRequest(lockid)); + } + + @Override + public ShowLocksResponse showLocks() throws TException { + return client.show_locks(new ShowLocksRequest()); + } + + @Override + public void heartbeat(long txnid, long lockid) + throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, + TException { + HeartbeatRequest hb = new HeartbeatRequest(); + hb.setLockid(lockid); + hb.setTxnid(txnid); + client.heartbeat(hb); + } + + @Override + public void compact(String dbname, String tableName, String partitionName, CompactionType type) + throws TException { + CompactionRequest cr = new CompactionRequest(); + if (dbname == null) cr.setDbname(DEFAULT_DATABASE_NAME); + else cr.setDbname(dbname); + cr.setTablename(tableName); + if (partitionName != null) cr.setPartitionname(partitionName); + cr.setType(type); + client.compact(cr); + } + + @Override + public ShowCompactResponse showCompactions() throws TException { + return client.show_compact(new ShowCompactRequest()); + } + /** * Creates a synchronized wrapper for any {@link IMetaStoreClient}. 
* This may be used by multi-threaded applications until we have @@ -1483,7 +1670,8 @@ public Object invoke(Object proxy, Method method, Object [] args) @Override public void markPartitionForEvent(String db_name, String tbl_name, Map partKVs, PartitionEventType eventType) - throws MetaException, TException, NoSuchObjectException, UnknownDBException, UnknownTableException, + throws MetaException, TException, NoSuchObjectException, UnknownDBException, + UnknownTableException, InvalidPartitionException, UnknownPartitionException { assert db_name != null; assert tbl_name != null; diff --git metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index 3515ae5..612a1a7 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -18,6 +18,20 @@ package org.apache.hadoop.hive.metastore; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; +import org.apache.thrift.TException; + import java.util.List; import java.util.Map; @@ -159,7 +173,6 @@ * The table wasn't found. * @throws TException * A thrift communication error occurred - * @throws ExistingDependentsException */ public void dropTable(String dbname, String tableName, boolean deleteData, boolean ignoreUknownTab) throws MetaException, TException, @@ -361,7 +374,9 @@ public Partition getPartition(String tblName, String dbName, List partVals) throws NoSuchObjectException, MetaException, TException; /** - * @param partition + * @param partitionSpecs + * @param sourceDb + * @param sourceTable * @param destdb * @param destTableName * @return partition object @@ -1020,6 +1035,262 @@ public Function getFunction(String dbName, String funcName) public List getFunctions(String dbName, String pattern) throws MetaException, TException; + // Transaction and locking methods + public interface ValidTxnList { + + /** + * Key used to store valid txn list in a {@link org.apache.hadoop.conf.Configuration} object. + */ + public static final String VALID_TXNS_KEY = "hive.txn.valid.txns"; + + /** + * The response to a range query. NONE means no values in this range match, + * SOME mean that some do, and ALL means that every value does. + */ + public enum RangeResponse {NONE, SOME, ALL}; + + /** + * Indicates whether a given transaction has been committed and should be + * viewed as valid for read. + * @param txnid id for the transaction + * @return true if committed, false otherwise + */ + public boolean isTxnCommitted(long txnid); + + /** + * Find out if a range of transaction ids have been committed. 
+     * @param minTxnId minimum txnid to look for, inclusive
+     * @param maxTxnId maximum txnid to look for, inclusive
+     * @return Indicate whether none, some, or all of these transactions have been committed.
+     */
+    public RangeResponse isTxnRangeCommitted(long minTxnId, long maxTxnId);
+
+    /**
+     * Get at the underlying OpenTxn structure. This is useful if the user
+     * wishes to get a list of all open transactions for more efficient
+     * filtering.
+     * @return open transactions
+     */
+    public GetOpenTxnsResponse getOpenTxns();
+
+    /**
+     * Write this validTxnList into a string. All implementations will already
+     * implement this, but it is being called out specifically here to make clear that the
+     * implementation needs to override the default implementation. This should produce a string
+     * that can be used by {@link #fromString(String)} to populate a validTxnList.
+     */
+    public String toString();
+
+    /**
+     * Populate this validTxnList from the string. It is assumed that the string was created via
+     * {@link #toString()}.
+     * @param src source string.
+     */
+    public void fromString(String src);
+  }
+
+  /**
+   * Get a structure that details valid transactions.
+   * @return list of valid transactions
+   * @throws TException
+   */
+  public ValidTxnList getValidTxns() throws TException;
+
+  /**
+   * Initiate a transaction.
+   * @param user User who is opening this transaction. This is the Hive user,
+   * not necessarily the OS user. It is assumed that this user has already been
+   * authenticated and authorized at this point.
+   * @return transaction identifier
+   * @throws TException
+   */
+  public long openTxn(String user) throws TException;
+
+  /**
+   * Initiate a batch of transactions. It is not guaranteed that the
+   * requested number of transactions will be instantiated. The system has a
+   * maximum number instantiated per request, controlled by
+   * hive.txn.max.open.batch in hive-site.xml. If the user requests more than this
+   * value, only the configured max will be returned.
+   *
+   * Increasing the number of transactions requested in the batch will
+   * allow applications that stream data into Hive to place more commits in a
+   * single file, thus reducing load on the namenode and making reads of the
+   * data more efficient. However, opening more transactions in a batch will
+   * also result in readers needing to keep a larger list of open
+   * transactions to ignore, potentially slowing their reads. Users will
+   * need to test in their system to understand the optimal number of
+   * transactions to request in a batch.
+   *
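+   * A minimal illustrative sketch of opening and committing a batch from a
+   * streaming client (how the client is constructed, the user name, and the
+   * batch size of 10 are assumptions made only for this example):
+   * <pre>
+   *   IMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
+   *   OpenTxnsResponse batch = client.openTxns("hive", 10);
+   *   for (long txnId : batch.getTxn_ids()) {
+   *     // write the data for this transaction, then commit it
+   *     client.commitTxn(txnId);
+   *   }
+   * </pre>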

+   * @param user User who is opening this transaction. This is the Hive user,
+   * not necessarily the OS user. It is assumed that this user has already been
+   * authenticated and authorized at this point.
+   * @param numTxns number of requested transactions to open
+   * @return list of opened txn ids. As noted above, this may be less than
+   * requested, so the user should check how many were returned rather than
+   * optimistically assuming that the result matches the request.
+   * @throws TException
+   */
+  public OpenTxnsResponse openTxns(String user, int numTxns) throws TException;
+
+  /**
+   * Rollback a transaction. This will also unlock any locks associated with
+   * this transaction.
+   * @param txnid id of transaction to be rolled back.
+   * @throws NoSuchTxnException if the requested transaction does not exist.
+   * Note that this can result from the transaction having timed out and been
+   * deleted.
+   * @throws TException
+   */
+  public void rollbackTxn(long txnid) throws NoSuchTxnException, TException;
+
+  /**
+   * Commit a transaction. This will also unlock any locks associated with
+   * this transaction.
+   * @param txnid id of transaction to be committed.
+   * @throws NoSuchTxnException if the requested transaction does not exist.
+   * This can result from the transaction having timed out and been deleted by
+   * the compactor.
+   * @throws TxnAbortedException if the requested transaction has been
+   * aborted. This can result from the transaction timing out.
+   * @throws TException
+   */
+  public void commitTxn(long txnid)
+      throws NoSuchTxnException, TxnAbortedException, TException;
+
+  /**
+   * Show the list of currently open transactions. This is for use by "show transactions" in the
+   * grammar, not for applications that want to find a list of current transactions to work with.
+   * Those wishing the latter should call {@link #getValidTxns()}.
+   * @return List of currently open transactions, including aborted ones.
+   * @throws TException
+   */
+  public GetOpenTxnsInfoResponse showTxns() throws TException;
+
+  /**
+   * Request a set of locks. All locks needed for a particular query, DML,
+   * or DDL operation should be batched together and requested in one lock
+   * call. This avoids deadlocks. It also avoids blocking other users who
+   * only require some of the locks required by this user.
+   *
+   * If the operation requires a transaction (INSERT, UPDATE,
+   * or DELETE), that transaction id must be provided as part of this lock
+   * request. All locks associated with a transaction will be released when
+   * that transaction is committed or rolled back.
+   *
+   * Once a lock is acquired, {@link #heartbeat(long, long)} must be called
+   * on a regular basis to avoid the lock being timed out by the system.
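+   *
+   * An illustrative sketch of acquiring, heartbeating, and releasing a shared
+   * read lock on a table (the Thrift-generated setter and getter names are
+   * assumed to follow the setXxx/getXxx pattern used by the other request
+   * objects above; the client, user name, hostname, and poll interval are
+   * placeholders, not recommendations):
+   * <pre>
+   *   LockComponent comp = new LockComponent();
+   *   comp.setType(LockType.SHARED_READ);
+   *   comp.setLevel(LockLevel.TABLE);
+   *   comp.setDbname("default");
+   *   comp.setTablename("t");
+   *   LockRequest req = new LockRequest();
+   *   req.setComponent(Collections.singletonList(comp));
+   *   req.setUser("hive");
+   *   req.setHostname("client-host");
+   *   LockResponse resp = client.lock(req);
+   *   while (resp.getState() == LockState.WAITING) {
+   *     Thread.sleep(1000);
+   *     resp = client.checkLock(resp.getLockid());
+   *   }
+   *   // no open transaction in this example, so txnid is passed as 0
+   *   client.heartbeat(0, resp.getLockid());
+   *   client.unlock(resp.getLockid());
+   * </pre>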

+   * @param request The lock request. {@link LockRequestBuilder} can be used to
+   * construct this request.
+   * @return a lock response, which will provide two things,
+   * the id of the lock (to be used in all further calls regarding this lock)
+   * as well as a state of the lock. If the state is ACQUIRED then the user
+   * can proceed. If it is WAITING the user should wait and call
+   * {@link #checkLock(long)} before proceeding. All components of the lock
+   * will have the same state.
+   * @throws NoSuchTxnException if the requested transaction does not exist.
+   * This can result from the transaction having timed out and been deleted by
+   * the compactor.
+   * @throws TxnAbortedException if the requested transaction has been
+   * aborted. This can result from the transaction timing out.
+   * @throws TException
+   */
+  public LockResponse lock(LockRequest request)
+      throws NoSuchTxnException, TxnAbortedException, TException;
+
+  /**
+   * Check the status of a set of locks requested via a
+   * {@link #lock(org.apache.hadoop.hive.metastore.api.LockRequest)} call.
+   * Once a lock is acquired, {@link #heartbeat(long, long)} must be called
+   * on a regular basis to avoid the lock being timed out by the system.
+   * @param lockid lock id returned by lock().
+   * @return a lock response, which will provide two things,
+   * the id of the lock (to be used in all further calls regarding this lock)
+   * as well as a state of the lock. If the state is ACQUIRED then the user
+   * can proceed. If it is WAITING the user should wait and call
+   * this method again before proceeding. All components of the lock
+   * will have the same state.
+   * @throws NoSuchTxnException if the requested transaction does not exist.
+   * This can result from the transaction having timed out and been deleted by
+   * the compactor.
+   * @throws TxnAbortedException if the requested transaction has been
+   * aborted. This can result from the transaction timing out.
+   * @throws NoSuchLockException if the requested lockid does not exist.
+   * This can result from the lock timing out and being unlocked by the system.
+   * @throws TException
+   */
+  public LockResponse checkLock(long lockid)
+      throws NoSuchTxnException, TxnAbortedException, NoSuchLockException,
+      TException;
+
+  /**
+   * Unlock a set of locks. This can only be called when the locks are not
+   * associated with a transaction.
+   * @param lockid lock id returned by
+   * {@link #lock(org.apache.hadoop.hive.metastore.api.LockRequest)}
+   * @throws NoSuchLockException if the requested lockid does not exist.
+   * This can result from the lock timing out and being unlocked by the system.
+   * @throws TxnOpenException if the locks are associated with a
+   * transaction.
+   * @throws TException
+   */
+  public void unlock(long lockid)
+      throws NoSuchLockException, TxnOpenException, TException;
+
+  /**
+   * Show all currently held and waiting locks.
+   * @return List of currently held and waiting locks.
+   * @throws TException
+   */
+  public ShowLocksResponse showLocks() throws TException;
+
+  /**
+   * Send a heartbeat to indicate that the client holding these locks (if
+   * any) and that opened this transaction (if one exists) is still alive.
+   * The default timeout for transactions and locks is 300 seconds,
+   * though it is configurable. To determine how often to heartbeat you will
+   * need to ask your system administrator how the metastore thrift service
+   * has been configured.
+   * @param txnid the id of the open transaction. If no transaction is open
+   * (it is a DDL or query) then this can be set to 0.
+   * @param lockid the id of the locks obtained. If no locks have been
+   * obtained then this can be set to 0.
+   * @throws NoSuchTxnException if the requested transaction does not exist.
+   * This can result from the transaction having timed out and been deleted by
+   * the compactor.
+   * @throws TxnAbortedException if the requested transaction has been
+   * aborted. This can result from the transaction timing out.
+   * @throws NoSuchLockException if the requested lockid does not exist.
+   * This can result from the lock timing out and being unlocked by the system.
+   * @throws TException
+   */
+  public void heartbeat(long txnid, long lockid)
+      throws NoSuchLockException, NoSuchTxnException, TxnAbortedException,
+      TException;
+
+  /**
+   * Send a request to compact a table or partition. This will not block until the compaction is
+   * complete. It will instead put a request on the queue for that table or partition to be
+   * compacted. No checking is done on the dbname, tableName, or partitionName to make sure they
+   * refer to valid objects. It is assumed this has already been done by the caller.
+   * @param dbname Name of the database the table is in. If null, this will be assumed to be
+   * 'default'.
+   * @param tableName Name of the table to be compacted. This cannot be null. If partitionName
+   * is null, this must be a non-partitioned table.
+   * @param partitionName Name of the partition to be compacted
+   * @param type Whether this is a major or minor compaction.
+   * @throws TException
+   */
+  public void compact(String dbname, String tableName, String partitionName, CompactionType type)
+      throws TException;
+
+  /**
+   * Get a list of all current compactions.
+   * @return List of all current compactions. This includes compactions waiting to happen,
+   * in progress, and finished but waiting to clean the existing files.
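+   * As a sketch, requesting a major compaction and then listing the queue
+   * (the client is the same assumed client as in the examples above, and the
+   * partition name "ds=2014-01-01" is only a placeholder; this API does not
+   * validate it):
+   * <pre>
+   *   client.compact("default", "t", "ds=2014-01-01", CompactionType.MAJOR);
+   *   ShowCompactResponse compactions = client.showCompactions();
+   * </pre>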
+ * @throws TException + */ + public ShowCompactResponse showCompactions() throws TException; public class IncompatibleMetastoreException extends MetaException { public IncompatibleMetastoreException(String message) { diff --git ql/src/java/org/apache/hadoop/hive/ql/Context.java ql/src/java/org/apache/hadoop/hive/ql/Context.java index e699c6b..b34978c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -18,18 +18,6 @@ package org.apache.hadoop.hive.ql; -import java.io.DataInput; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; - import org.antlr.runtime.TokenRewriteStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,10 +34,23 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLock; import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.util.StringUtils; +import java.io.DataInput; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; + /** * Context for Semantic Analyzers. Usage: not reusable - construct a new one for * each query should call clear() at end of use to remove temporary folders @@ -93,6 +94,9 @@ protected List hiveLocks; protected HiveLockManager hiveLockMgr; + // Transaction manager for this query + protected HiveTxnManager hiveTxnManager; + private boolean needLockMgr; // Keep track of the mapping from load table desc to the output and the lock @@ -533,15 +537,12 @@ public void setHiveLocks(List hiveLocks) { this.hiveLocks = hiveLocks; } - public HiveLockManager getHiveLockMgr() { - if (hiveLockMgr != null) { - hiveLockMgr.refresh(); - } - return hiveLockMgr; + public HiveTxnManager getHiveTxnManager() { + return hiveTxnManager; } - public void setHiveLockMgr(HiveLockManager hiveLockMgr) { - this.hiveLockMgr = hiveLockMgr; + public void setHiveTxnManager(HiveTxnManager txnMgr) { + hiveTxnManager = txnMgr; } public void setOriginalTracker(String originalTracker) { diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 332cadb..efb8439 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -19,40 +19,18 @@ package org.apache.hadoop.hive.ql; -import java.io.DataInput; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; - import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import 
org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; -import org.apache.hadoop.hive.ql.exec.ConditionalTask; -import org.apache.hadoop.hive.ql.exec.FetchTask; -import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.TableScanOperator; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.exec.TaskResult; -import org.apache.hadoop.hive.ql.exec.TaskRunner; -import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.*; import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; @@ -63,39 +41,15 @@ import org.apache.hadoop.hive.ql.hooks.PreExecute; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; -import org.apache.hadoop.hive.ql.lockmgr.HiveLock; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; +import org.apache.hadoop.hive.ql.lockmgr.*; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; -import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.metadata.AuthorizationException; -import org.apache.hadoop.hive.ql.metadata.DummyPartition; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.metadata.*; import org.apache.hadoop.hive.ql.metadata.formatting.JsonMetaDataFormatter; import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils; import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatter; import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; -import org.apache.hadoop.hive.ql.parse.ASTNode; -import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook; -import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext; -import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl; -import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.ParseDriver; -import org.apache.hadoop.hive.ql.parse.ParseUtils; -import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; -import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory; -import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.parse.VariableSubstitution; +import org.apache.hadoop.hive.ql.parse.*; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -112,7 +66,13 @@ import 
org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.util.ReflectionUtils; + +import java.io.DataInput; +import java.io.IOException; +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; + public class Driver implements CommandProcessor { @@ -130,11 +90,10 @@ private Context ctx; private QueryPlan plan; private Schema schema; - private HiveLockManager hiveLockMgr; - private String errorMessage; private String SQLState; private Throwable downstreamError; + private HiveTxnManager txnMgr; // A limit on the number of threads that can be launched private int maxthreads; @@ -143,46 +102,35 @@ private String userName; + private boolean createTxnManager() { + if (txnMgr == null) { + try { + txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + } catch (LockException e) { + errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage(); + SQLState = ErrorMsg.findSQLState(e.getMessage()); + downstreamError = e; + console.printError(errorMessage, "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + return false; + } + } + return true; + } + private boolean checkConcurrency() throws SemanticException { boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); if (!supportConcurrency) { LOG.info("Concurrency mode is disabled, not creating a lock manager"); return false; } - createLockManager(); - // the reason that we set the lock manager for the cxt here is because each - // query has its own ctx object. The hiveLockMgr is shared accross the + createTxnManager(); + // the reason that we set the txn manager for the cxt here is because each + // query has its own ctx object. The txn mgr is shared across the // same instance of Driver, which can run multiple queries. - ctx.setHiveLockMgr(hiveLockMgr); - return true; - } + ctx.setHiveTxnManager(txnMgr); - private void createLockManager() throws SemanticException { - if (hiveLockMgr != null) { - return; - } - String lockMgr = conf.getVar(HiveConf.ConfVars.HIVE_LOCK_MANAGER); - LOG.info("Creating lock manager of type " + lockMgr); - if ((lockMgr == null) || (lockMgr.isEmpty())) { - throw new SemanticException(ErrorMsg.LOCKMGR_NOT_SPECIFIED.getMsg()); - } - try { - hiveLockMgr = (HiveLockManager) ReflectionUtils.newInstance(conf.getClassByName(lockMgr), - conf); - hiveLockMgr.setContext(new HiveLockManagerCtx(conf)); - } catch (Exception e1) { - // set hiveLockMgr to null just in case this invalid manager got set to - // next query's ctx. - if (hiveLockMgr != null) { - try { - hiveLockMgr.close(); - } catch (LockException e2) { - //nothing can do here - } - hiveLockMgr = null; - } - throw new SemanticException(ErrorMsg.LOCKMGR_NOT_INITIALIZED.getMsg() + e1.getMessage(), e1); - } + return true; } @Override @@ -858,25 +806,20 @@ public QueryPlan getPlan() { return locks; } - /** - * Dedup the list of lock objects so that there is only one lock per table/partition. - * If there is both a shared and exclusive lock for the same object, this will deduped - * to just a single exclusive lock. 
-   * @param lockObjects
-   */
-  static void dedupLockObjects(List lockObjects) {
-    Map lockMap = new HashMap();
-    for (HiveLockObj lockObj : lockObjects) {
-      String lockName = lockObj.getName();
-      HiveLockObj foundLock = lockMap.get(lockName);
-      if (foundLock == null || lockObj.getMode() == HiveLockMode.EXCLUSIVE) {
-        lockMap.put(lockName, lockObj);
-      }
-    }
-    // copy set of deduped locks back to original list
-    lockObjects.clear();
-    for (HiveLockObj lockObj : lockMap.values()) {
-      lockObjects.add(lockObj);
+  // Write the current set of valid transactions into the conf file so that it can be read by
+  // the input format.
+  private int recordValidTxns() {
+    try {
+      IMetaStoreClient.ValidTxnList txns = txnMgr.getValidTxns();
+      ctx.getConf().set(IMetaStoreClient.ValidTxnList.VALID_TXNS_KEY, txns.toString());
+      return 0;
+    } catch (LockException e) {
+      errorMessage = "FAILED: Error in determining valid transactions: " + e.getMessage();
+      SQLState = ErrorMsg.findSQLState(e.getMessage());
+      downstreamError = e;
+      console.printError(errorMessage, "\n"
+          + org.apache.hadoop.util.StringUtils.stringifyException(e));
+      return 10;
     }
   }
@@ -886,96 +829,21 @@ static void dedupLockObjects(List lockObjects) {
    * pretty simple. If all the locks cannot be obtained, error out. Deadlock is avoided by making
    * sure that the locks are lexicographically sorted.
    **/
-  public int acquireReadWriteLocks() {
+  private int acquireReadWriteLocks() {
     PerfLogger perfLogger = PerfLogger.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
-    try {
-      boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
-      if (!supportConcurrency) {
-        return 0;
-      }
-
-      List lockObjects = new ArrayList();
-
-      // Sort all the inputs, outputs.
-      // If a lock needs to be acquired on any partition, a read lock needs to be acquired on all
-      // its parents also
-      for (ReadEntity input : plan.getInputs()) {
-        if (input.getType() == ReadEntity.Type.DATABASE) {
-          lockObjects.addAll(getLockObjects(input.getDatabase(), null, null, HiveLockMode.SHARED));
-        } else if (input.getType() == ReadEntity.Type.TABLE) {
-          lockObjects.addAll(getLockObjects(null, input.getTable(), null, HiveLockMode.SHARED));
-        } else {
-          lockObjects.addAll(getLockObjects(null, null, input.getPartition(), HiveLockMode.SHARED));
-        }
-      }
-
-      for (WriteEntity output : plan.getOutputs()) {
-        List lockObj = null;
-        if (output.getType() == WriteEntity.Type.DATABASE) {
-          lockObjects.addAll(getLockObjects(output.getDatabase(), null, null,
-              output.isComplete() ? HiveLockMode.EXCLUSIVE : HiveLockMode.SHARED));
-        } else if (output.getTyp() == WriteEntity.Type.TABLE) {
-          lockObj = getLockObjects(null, output.getTable(), null,
-              output.isComplete() ?
HiveLockMode.EXCLUSIVE : HiveLockMode.SHARED); - } else if (output.getTyp() == WriteEntity.Type.PARTITION) { - lockObj = getLockObjects(null, null, output.getPartition(), HiveLockMode.EXCLUSIVE); - } - // In case of dynamic queries, it is possible to have incomplete dummy partitions - else if (output.getTyp() == WriteEntity.Type.DUMMYPARTITION) { - lockObj = getLockObjects(null, null, output.getPartition(), HiveLockMode.SHARED); - } - - if(lockObj != null) { - lockObjects.addAll(lockObj); - ctx.getOutputLockObjects().put(output, lockObj); - } - } - if (lockObjects.isEmpty() && !ctx.isNeedLockMgr()) { - return 0; - } - - HiveLockObjectData lockData = - new HiveLockObjectData(plan.getQueryId(), - String.valueOf(System.currentTimeMillis()), - "IMPLICIT", - plan.getQueryStr()); - - // Lock the database also - String currentDb = SessionState.get().getCurrentDatabase(); - lockObjects.add( - new HiveLockObj( - new HiveLockObject(currentDb, lockData), - HiveLockMode.SHARED - ) - ); - - dedupLockObjects(lockObjects); - List hiveLocks = ctx.getHiveLockMgr().lock(lockObjects, false); - - if (hiveLocks == null) { - throw new SemanticException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); - } else { - ctx.setHiveLocks(hiveLocks); - } - - return (0); - } catch (SemanticException e) { - errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); - SQLState = ErrorMsg.findSQLState(e.getMessage()); - downstreamError = e; - console.printError(errorMessage, "\n" - + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return (10); + try { + txnMgr.acquireLocks(plan, ctx); + return 0; } catch (LockException e) { errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); SQLState = ErrorMsg.findSQLState(e.getMessage()); downstreamError = e; console.printError(errorMessage, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return (10); + return 10; } finally { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS); } @@ -986,12 +854,12 @@ else if (output.getTyp() == WriteEntity.Type.DUMMYPARTITION) { * list of hive locks to be released Release all the locks specified. If some of the * locks have already been released, ignore them **/ - private void releaseLocks(List hiveLocks) { + private void releaseLocks(List hiveLocks) throws LockException { PerfLogger perfLogger = PerfLogger.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS); if (hiveLocks != null) { - ctx.getHiveLockMgr().releaseLocks(hiveLocks); + ctx.getHiveTxnManager().getLockManager().releaseLocks(hiveLocks); } ctx.setHiveLocks(null); @@ -1078,7 +946,12 @@ private int compileInternal(String command) { ret = compile(command); } if (ret != 0) { - releaseLocks(ctx.getHiveLocks()); + try { + releaseLocks(ctx.getHiveLocks()); + } catch (LockException e) { + LOG.warn("Exception in releasing locks. 
" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + } } return ret; } @@ -1163,10 +1036,17 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp } } + ret = recordValidTxns(); + if (ret != 0) return new CommandProcessorResponse(ret, errorMessage, SQLState); + if (requireLock) { ret = acquireReadWriteLocks(); if (ret != 0) { - releaseLocks(ctx.getHiveLocks()); + try { + releaseLocks(ctx.getHiveLocks()); + } catch (LockException e) { + // Not much to do here + } return new CommandProcessorResponse(ret, errorMessage, SQLState); } } @@ -1174,12 +1054,25 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp ret = execute(); if (ret != 0) { //if needRequireLock is false, the release here will do nothing because there is no lock - releaseLocks(ctx.getHiveLocks()); + try { + releaseLocks(ctx.getHiveLocks()); + } catch (LockException e) { + // Nothing to do here + } return new CommandProcessorResponse(ret, errorMessage, SQLState); } //if needRequireLock is false, the release here will do nothing because there is no lock - releaseLocks(ctx.getHiveLocks()); + try { + releaseLocks(ctx.getHiveLocks()); + } catch (LockException e) { + errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e); + SQLState = ErrorMsg.findSQLState(e.getMessage()); + downstreamError = e; + console.printError(errorMessage + "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + return new CommandProcessorResponse(12, errorMessage, SQLState); + } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_RUN); perfLogger.close(LOG, plan); @@ -1731,17 +1624,16 @@ public int close() { public void destroy() { if (ctx != null) { - releaseLocks(ctx.getHiveLocks()); - } - - if (hiveLockMgr != null) { try { - hiveLockMgr.close(); - } catch(LockException e) { - LOG.warn("Exception in closing hive lock manager. " - + org.apache.hadoop.util.StringUtils.stringifyException(e)); + releaseLocks(ctx.getHiveLocks()); + } catch (LockException e) { + LOG.warn("Exception when releasing locking in destroy: " + + e.getMessage()); } } + if (txnMgr != null) { + txnMgr.closeTxnManager(); + } } public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan() throws IOException { diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 1ddee49..ecd4c5d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -18,6 +18,11 @@ package org.apache.hadoop.hive.ql; +import org.antlr.runtime.tree.Tree; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.ASTNodeOrigin; + import java.text.MessageFormat; import java.util.HashMap; import java.util.Map; @@ -371,12 +376,35 @@ INVALID_HDFS_URI(10251, "{0} is not a hdfs uri", true), INVALID_DIR(10252, "{0} is not a directory", true), NO_VALID_LOCATIONS(10253, "Could not find any valid location to place the jars. 
" + - "Please update hive.jar.directory or hive.user.install.directory with a valid location", false), + "Please update hive.jar.directory or hive.user.install.directory with a valid location", false), UNNSUPPORTED_AUTHORIZATION_PRINCIPAL_TYPE_GROUP(10254, "Principal type GROUP is not supported in this authorization setting", "28000"), INVALID_TABLE_NAME(10255, "Invalid table name {0}", true), INSERT_INTO_IMMUTABLE_TABLE(10256, "Inserting into a non-empty immutable table is not allowed"), + TXNMGR_NOT_SPECIFIED(10260, "Transaction manager not specified correctly, " + + "set hive.txn.manager"), + TXNMGR_NOT_INSTANTIATED(10261, "Transaction manager could not be " + + "instantiated, check hive.txn.manager"), + TXN_NO_SUCH_TRANSACTION(10262, "No record of transaction could be found, " + + "may have timed out"), + TXN_ABORTED(10263, "Transaction manager has aborted the transaction."), + + LOCK_NO_SUCH_LOCK(10270, "No record of lock could be found, " + + "may have timed out"), + LOCK_REQUEST_UNSUPPORTED(10271, "Current transaction manager does not " + + "support explicit lock requests. Transaction manager: "), + + METASTORE_COMMUNICATION_FAILED(10280, "Error communicating with the " + + "metastore"), + METASTORE_COULD_NOT_INITIATE(10281, "Unable to initiate connection to the " + + "metastore."), + INVALID_COMPACTION_TYPE(10282, "Invalid compaction type, supported values are 'major' and " + + "'minor'"), + NO_COMPACTION_PARTITION(10283, "You must specify a partition to compact for partitioned tables"), + TOO_MANY_COMPACTION_PARTITIONS(10284, "Compaction can only be requested on one partition at a " + + "time."), + //========================== 20000 range starts here ========================// SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."), SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. 
" diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 03ba50b..f435ff9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -47,11 +47,8 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FsShell; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -59,24 +56,7 @@ import org.apache.hadoop.hive.metastore.ProtectMode; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; -import org.apache.hadoop.hive.metastore.api.HiveObjectRef; -import org.apache.hadoop.hive.metastore.api.HiveObjectType; -import org.apache.hadoop.hive.metastore.api.Index; -import org.apache.hadoop.hive.metastore.api.InvalidOperationException; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.Order; -import org.apache.hadoop.hive.metastore.api.PrincipalType; -import org.apache.hadoop.hive.metastore.api.PrivilegeBag; -import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo; -import org.apache.hadoop.hive.metastore.api.Role; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.SkewedInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -88,29 +68,16 @@ import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork; -import org.apache.hadoop.hive.ql.lockmgr.HiveLock; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; +import org.apache.hadoop.hive.ql.lockmgr.*; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; -import org.apache.hadoop.hive.ql.metadata.CheckResult; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.HiveMetaStoreChecker; -import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; -import org.apache.hadoop.hive.ql.metadata.HiveUtils; -import org.apache.hadoop.hive.ql.metadata.InvalidTableException; +import org.apache.hadoop.hive.ql.metadata.*; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils; import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatter; import 
org.apache.hadoop.hive.ql.parse.AlterTablePartMergeFilesDesc; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; -import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; -import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; -import org.apache.hadoop.hive.ql.plan.AlterIndexDesc; -import org.apache.hadoop.hive.ql.plan.AlterTableAlterPartDesc; -import org.apache.hadoop.hive.ql.plan.AlterTableDesc; +import org.apache.hadoop.hive.ql.plan.*; import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes; import org.apache.hadoop.hive.ql.plan.AlterTableExchangePartition; import org.apache.hadoop.hive.ql.plan.AlterTableSimpleDesc; @@ -1054,7 +1021,7 @@ private int createIndex(Hive db, CreateIndexDesc crtIndex) throws HiveException MetaStoreUtils.getIndexTableName(SessionState.get().getCurrentDatabase(), crtIndex.getTableName(), crtIndex.getIndexName()); Table indexTable = db.getTable(indexTableName); - work.getOutputs().add(new WriteEntity(indexTable)); + work.getOutputs().add(new WriteEntity(indexTable, WriteEntity.WriteType.DDL)); } return 0; } @@ -1147,7 +1114,7 @@ private int alterIndex(Hive db, AlterIndexDesc alterIndex) throws HiveException private int addPartitions(Hive db, AddPartitionDesc addPartitionDesc) throws HiveException { List parts = db.createPartitions(addPartitionDesc); for (Partition part : parts) { - work.getOutputs().add(new WriteEntity(part)); + work.getOutputs().add(new WriteEntity(part, WriteEntity.WriteType.INSERT)); } return 0; } @@ -1173,7 +1140,7 @@ private int renamePartition(Hive db, RenamePartitionDesc renamePartitionDesc) th Partition newPart = db .getPartition(tbl, renamePartitionDesc.getNewPartSpec(), false); work.getInputs().add(new ReadEntity(oldPart)); - work.getOutputs().add(new WriteEntity(newPart)); + work.getOutputs().add(new WriteEntity(newPart, WriteEntity.WriteType.DDL)); return 0; } @@ -1215,7 +1182,7 @@ private int alterTableAlterPart(Hive db, AlterTableAlterPartDesc alterPartitionD } work.getInputs().add(new ReadEntity(tbl)); - work.getOutputs().add(new WriteEntity(tbl)); + work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL)); return 0; } @@ -1244,7 +1211,7 @@ private int touch(Hive db, AlterTableSimpleDesc touchDesc) throw new HiveException("Uable to update table"); } work.getInputs().add(new ReadEntity(tbl)); - work.getOutputs().add(new WriteEntity(tbl)); + work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_METADATA_ONLY)); } else { Partition part = db.getPartition(tbl, touchDesc.getPartSpec(), false); if (part == null) { @@ -1256,7 +1223,7 @@ private int touch(Hive db, AlterTableSimpleDesc touchDesc) throw new HiveException(e); } work.getInputs().add(new ReadEntity(part)); - work.getOutputs().add(new WriteEntity(part)); + work.getOutputs().add(new WriteEntity(part, WriteEntity.WriteType.DDL_METADATA_ONLY)); } return 0; } @@ -2496,7 +2463,11 @@ private int showFunctions(ShowFunctionsDesc showFuncs) throws HiveException { */ private int showLocks(ShowLocksDesc showLocks) throws HiveException { Context ctx = driverContext.getCtx(); - HiveLockManager lockMgr = ctx.getHiveLockMgr(); + HiveTxnManager txnManager = ctx.getHiveTxnManager(); + HiveLockManager lockMgr = txnManager.getLockManager(); + + if (txnManager.useNewShowLocksFormat()) return showLocksNewFormat(showLocks, lockMgr); + boolean isExt = showLocks.isExt(); if (lockMgr == null) { throw new HiveException("show Locks LockManager not specified"); @@ -2511,9 +2482,12 @@ private int showLocks(ShowLocksDesc showLocks) throws HiveException { 
     List locks = null;
     if (showLocks.getTableName() == null) {
+      // TODO should be doing security check here. Users should not be
+      // able to see each other's locks.
       locks = lockMgr.getLocks(false, isExt);
     }
     else {
+      // TODO make this work
       locks = lockMgr.getLocks(getHiveObject(showLocks.getTableName(),
                                              showLocks.getPartSpec()),
                                true, isExt);
@@ -2577,7 +2551,95 @@ public int compare(HiveLock o1, HiveLock o2) {
     return 0;
   }
-  /**
+  private int showLocksNewFormat(ShowLocksDesc showLocks, HiveLockManager lm)
+      throws HiveException {
+
+    DbLockManager lockMgr;
+    if (!(lm instanceof DbLockManager)) {
+      throw new RuntimeException("New lock format only supported with db lock manager.");
+    }
+    lockMgr = (DbLockManager)lm;
+
+    ShowLocksResponse rsp = lockMgr.getLocks();
+
+    // write the results in the file
+    DataOutputStream os = null;
+    try {
+      Path resFile = new Path(showLocks.getResFile());
+      FileSystem fs = resFile.getFileSystem(conf);
+      os = fs.create(resFile);
+
+      // Write a header
+      os.writeBytes("Lock ID");
+      os.write(separator);
+      os.writeBytes("Database");
+      os.write(separator);
+      os.writeBytes("Table");
+      os.write(separator);
+      os.writeBytes("Partition");
+      os.write(separator);
+      os.writeBytes("State");
+      os.write(separator);
+      os.writeBytes("Type");
+      os.write(separator);
+      os.writeBytes("Transaction ID");
+      os.write(separator);
+      os.writeBytes("Last Heartbeat");
+      os.write(separator);
+      os.writeBytes("Acquired At");
+      os.write(separator);
+      os.writeBytes("User");
+      os.write(separator);
+      os.writeBytes("Hostname");
+      os.write(terminator);
+
+      List locks = rsp.getLocks();
+      if (locks != null) {
+        for (ShowLocksResponseElement lock : locks) {
+          os.writeBytes(Long.toString(lock.getLockid()));
+          os.write(separator);
+          os.writeBytes(lock.getDbname());
+          os.write(separator);
+          os.writeBytes((lock.getTablename() == null) ? "NULL" : lock.getTablename());
+          os.write(separator);
+          os.writeBytes((lock.getPartname() == null) ? "NULL" : lock.getPartname());
+          os.write(separator);
+          os.writeBytes(lock.getState().toString());
+          os.write(separator);
+          os.writeBytes(lock.getType().toString());
+          os.write(separator);
+          os.writeBytes((lock.getTxnid() == 0) ? "NULL" : Long.toString(lock.getTxnid()));
+          os.write(separator);
+          os.writeBytes(Long.toString(lock.getLastheartbeat()));
+          os.write(separator);
+          os.writeBytes((lock.getAcquiredat() == 0) ?
"NULL" : Long.toString(lock.getAcquiredat())); + os.write(separator); + os.writeBytes(lock.getUser()); + os.write(separator); + os.writeBytes(lock.getHostname()); + os.write(separator); + os.write(terminator); + } + + } + + os.close(); + os = null; + } catch (FileNotFoundException e) { + LOG.warn("show function: " + stringifyException(e)); + return 1; + } catch (IOException e) { + LOG.warn("show function: " + stringifyException(e)); + return 1; + } catch (Exception e) { + throw new HiveException(e.toString()); + } finally { + IOUtils.closeStream((FSDataOutputStream) os); + } + return 0; + } + + /** * Lock the table/partition specified * * @param lockTbl @@ -2588,7 +2650,12 @@ public int compare(HiveLock o1, HiveLock o2) { */ private int lockTable(LockTableDesc lockTbl) throws HiveException { Context ctx = driverContext.getCtx(); - HiveLockManager lockMgr = ctx.getHiveLockMgr(); + HiveTxnManager txnManager = ctx.getHiveTxnManager(); + if (!txnManager.supportsExplicitLock()) { + throw new HiveException(ErrorMsg.LOCK_REQUEST_UNSUPPORTED, + conf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER)); + } + HiveLockManager lockMgr = txnManager.getLockManager(); if (lockMgr == null) { throw new HiveException("lock Table LockManager not specified"); } @@ -2637,7 +2704,12 @@ private int lockTable(LockTableDesc lockTbl) throws HiveException { */ private int lockDatabase(LockDatabaseDesc lockDb) throws HiveException { Context ctx = driverContext.getCtx(); - HiveLockManager lockMgr = ctx.getHiveLockMgr(); + HiveTxnManager txnManager = ctx.getHiveTxnManager(); + if (!txnManager.supportsExplicitLock()) { + throw new HiveException(ErrorMsg.LOCK_REQUEST_UNSUPPORTED, + conf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER)); + } + HiveLockManager lockMgr = txnManager.getLockManager(); if (lockMgr == null) { throw new HiveException("lock Database LockManager not specified"); } @@ -2673,7 +2745,12 @@ private int lockDatabase(LockDatabaseDesc lockDb) throws HiveException { */ private int unlockDatabase(UnlockDatabaseDesc unlockDb) throws HiveException { Context ctx = driverContext.getCtx(); - HiveLockManager lockMgr = ctx.getHiveLockMgr(); + HiveTxnManager txnManager = ctx.getHiveTxnManager(); + if (!txnManager.supportsExplicitLock()) { + throw new HiveException(ErrorMsg.LOCK_REQUEST_UNSUPPORTED, + conf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER)); + } + HiveLockManager lockMgr = txnManager.getLockManager(); if (lockMgr == null) { throw new HiveException("unlock Database LockManager not specified"); } @@ -2731,7 +2808,12 @@ private HiveLockObject getHiveObject(String tabName, */ private int unlockTable(UnlockTableDesc unlockTbl) throws HiveException { Context ctx = driverContext.getCtx(); - HiveLockManager lockMgr = ctx.getHiveLockMgr(); + HiveTxnManager txnManager = ctx.getHiveTxnManager(); + if (!txnManager.supportsExplicitLock()) { + throw new HiveException(ErrorMsg.LOCK_REQUEST_UNSUPPORTED, + conf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER)); + } + HiveLockManager lockMgr = txnManager.getLockManager(); if (lockMgr == null) { throw new HiveException("unlock Table LockManager not specified"); } @@ -3475,7 +3557,7 @@ private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException { } tbl.setNumBuckets(alterTbl.getNumberBuckets()); } - } else { + } else { throw new HiveException(ErrorMsg.UNSUPPORTED_ALTER_TBL_OP, alterTbl.getOp().toString()); } @@ -3512,17 +3594,17 @@ private int alterTable(Hive db, AlterTableDesc alterTbl) throws HiveException { // passed if(part != null) { work.getInputs().add(new 
ReadEntity(part)); - work.getOutputs().add(new WriteEntity(part)); + work.getOutputs().add(new WriteEntity(part, WriteEntity.WriteType.DDL)); } else if (allPartitions != null ){ for (Partition tmpPart: allPartitions) { work.getInputs().add(new ReadEntity(tmpPart)); - work.getOutputs().add(new WriteEntity(tmpPart)); + work.getOutputs().add(new WriteEntity(tmpPart, WriteEntity.WriteType.DDL)); } } else { work.getInputs().add(new ReadEntity(oldTbl)); - work.getOutputs().add(new WriteEntity(tbl)); + work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL)); } return 0; } @@ -3560,7 +3642,7 @@ private void dropPartitions(Hive db, Table tbl, DropTableDesc dropTbl) throws Hi dropTbl.getPartSpecs(), true, dropTbl.getIgnoreProtection(), true); for (Partition partition : droppedParts) { console.printInfo("Dropped the partition " + partition.getName()); - work.getOutputs().add(new WriteEntity(partition)); + work.getOutputs().add(new WriteEntity(partition, WriteEntity.WriteType.DDL)); }; } @@ -3614,7 +3696,7 @@ private void dropTable(Hive db, Table tbl, DropTableDesc dropTbl) throws HiveExc // drop the table db.dropTable(dropTbl.getTableName()); if (tbl != null) { - work.getOutputs().add(new WriteEntity(tbl)); + work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL)); } } @@ -3889,7 +3971,7 @@ private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException { // create the table db.createTable(tbl, crtTbl.getIfNotExists()); - work.getOutputs().add(new WriteEntity(tbl)); + work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL)); return 0; } @@ -3997,7 +4079,7 @@ private int createTableLike(Hive db, CreateTableLikeDesc crtTbl) throws HiveExce // create the table db.createTable(tbl, crtTbl.getIfNotExists()); - work.getOutputs().add(new WriteEntity(tbl)); + work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL)); return 0; } @@ -4033,7 +4115,7 @@ private int createView(Hive db, CreateViewDesc crtView) throws HiveException { } catch (InvalidOperationException e) { throw new HiveException(e); } - work.getOutputs().add(new WriteEntity(oldview)); + work.getOutputs().add(new WriteEntity(oldview, WriteEntity.WriteType.DDL)); } else { // create new view Table tbl = db.newTable(crtView.getViewName()); @@ -4060,7 +4142,7 @@ private int createView(Hive db, CreateViewDesc crtView) throws HiveException { } db.createTable(tbl, crtView.getIfNotExists()); - work.getOutputs().add(new WriteEntity(tbl)); + work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL)); } return 0; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index ed7787d..a190155 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -18,16 +18,6 @@ package org.apache.hadoop.hive.ql.exec; -import java.io.IOException; -import java.io.Serializable; -import java.security.AccessControlException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileStatus; @@ -56,17 +46,16 @@ import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.BucketCol; import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; 
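The hunks above thread a WriteEntity.WriteType through every output that DDLTask and MoveTask register, so a lock manager can derive the lock it needs from the kind of write rather than from the statement text. The mapping itself lives in the new transaction manager and is not part of this patch excerpt; the sketch below only illustrates the idea under assumed rules, and the class name WriteTypeLockMapping is hypothetical.

import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;

// Illustrative only: one plausible mapping from WriteType to a lock mode.
public final class WriteTypeLockMapping {
  private WriteTypeLockMapping() {}

  public static HiveLockMode lockModeFor(WriteEntity.WriteType type) {
    switch (type) {
      case DDL:
        // DDL that rewrites data has to block readers and writers.
        return HiveLockMode.EXCLUSIVE;
      case INSERT_OVERWRITE:
      case UPDATE:
      case DELETE:
        // Writers that conflict with each other but can run alongside plain
        // readers are what the new SEMI_SHARED mode expresses.
        return HiveLockMode.SEMI_SHARED;
      case INSERT:
        return HiveLockMode.SHARED;
      case DDL_METADATA_ONLY:
      default:
        // Metadata-only DDL is documented above as not needing a lock at all;
        // a caller would normally skip lock acquisition for it entirely.
        return HiveLockMode.SHARED;
    }
  }
}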
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; -import org.apache.hadoop.hive.ql.plan.LoadFileDesc; -import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; -import org.apache.hadoop.hive.ql.plan.LoadTableDesc; -import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.MapredWork; -import org.apache.hadoop.hive.ql.plan.MoveWork; +import org.apache.hadoop.hive.ql.plan.*; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.util.StringUtils; +import java.io.IOException; +import java.io.Serializable; +import java.security.AccessControlException; +import java.util.*; + /** * MoveTask implementation. **/ @@ -175,7 +164,7 @@ private void releaseLocks(LoadTableDesc ltd) throws HiveException { } Context ctx = driverContext.getCtx(); - HiveLockManager lockMgr = ctx.getHiveLockMgr(); + HiveLockManager lockMgr = ctx.getHiveTxnManager().getLockManager(); WriteEntity output = ctx.getLoadTableOutputMap().get(ltd); List lockObjects = ctx.getOutputLockObjects().get(output); if (lockObjects == null) { @@ -284,7 +273,9 @@ public int execute(DriverContext driverContext) { db.loadTable(tbd.getSourcePath(), tbd.getTable() .getTableName(), tbd.getReplace(), tbd.getHoldDDLTime()); if (work.getOutputs() != null) { - work.getOutputs().add(new WriteEntity(table)); + work.getOutputs().add(new WriteEntity(table, + (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE : + WriteEntity.WriteType.INSERT))); } } else { LOG.info("Partition is: " + tbd.getPartitionSpec().toString()); @@ -376,7 +367,9 @@ public int execute(DriverContext driverContext) { updatePartitionBucketSortColumns(table, partn, bucketCols, numBuckets, sortCols); } - WriteEntity enty = new WriteEntity(partn); + WriteEntity enty = new WriteEntity(partn, + (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE : + WriteEntity.WriteType.INSERT)); if (work.getOutputs() != null) { work.getOutputs().add(enty); } @@ -417,7 +410,9 @@ public int execute(DriverContext driverContext) { dc = new DataContainer(table.getTTable(), partn.getTPartition()); // add this partition to post-execution hook if (work.getOutputs() != null) { - work.getOutputs().add(new WriteEntity(partn)); + work.getOutputs().add(new WriteEntity(partn, + (tbd.getReplace() ? 
WriteEntity.WriteType.INSERT_OVERWRITE + : WriteEntity.WriteType.INSERT))); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index cc840be..0c737b3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -2090,6 +2090,9 @@ public static void copyTableJobPropertiesToConf(TableDesc tbl, JobConf job) { for (Map.Entry entry : jobProperties.entrySet()) { job.set(entry.getKey(), entry.getValue()); } + // copy the bucket count + job.set(hive_metastoreConstants.BUCKET_COUNT, + tbl.getProperties().getProperty(hive_metastoreConstants.BUCKET_COUNT)); } private static final Object INPUT_SUMMARY_LOCK = new Object(); diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java index ae8b4bb..44a3924 100644 --- ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java @@ -18,14 +18,14 @@ package org.apache.hadoop.hive.ql.hooks; -import java.io.Serializable; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.metadata.DummyPartition; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; +import java.io.Serializable; + /** * This class encapsulates an object that is being written to by the query. This * object may be a table, partition, dfs directory or a local directory. @@ -34,6 +34,18 @@ private boolean isTempURI = false; + public static enum WriteType { + DDL, // for use in DDL statements that will touch data, + // will result in an exclusive lock, + DDL_METADATA_ONLY, // for use in DDL statements that touch only + // metadata and don't need a lock + INSERT, + INSERT_OVERWRITE, + UPDATE, + DELETE}; + + private WriteType writeType; + /** * Only used by serialization. */ @@ -41,8 +53,9 @@ public WriteEntity() { super(); } - public WriteEntity(Database database) { + public WriteEntity(Database database, WriteType type) { super(database, true); + writeType = type; } /** @@ -51,12 +64,14 @@ public WriteEntity(Database database) { * @param t * Table that is written to. */ - public WriteEntity(Table t) { - this(t, true); + public WriteEntity(Table t, WriteType type) { + super(t, true); + writeType = type; } - public WriteEntity(Table t, boolean complete) { + public WriteEntity(Table t, WriteType type, boolean complete) { super(t, complete); + writeType = type; } /** @@ -65,12 +80,14 @@ public WriteEntity(Table t, boolean complete) { * @param p * Partition that is written to. */ - public WriteEntity(Partition p) { + public WriteEntity(Partition p, WriteType type) { super(p, true); + writeType = type; } - public WriteEntity(DummyPartition p, boolean complete) { + public WriteEntity(DummyPartition p, WriteType type, boolean complete) { super(p, complete); + writeType = type; } /** @@ -101,6 +118,15 @@ public WriteEntity(Path d, boolean islocal, boolean isTemp) { } /** + * Determine which type of write this is. This is needed by the lock + * manager so it can understand what kind of lock to acquire. + * @return write type + */ + public WriteType getWriteType() { + return writeType; + } + + /** * Equals function. 
*/ @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java index 9550001..11434a0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java @@ -18,24 +18,14 @@ package org.apache.hadoop.hive.ql.lockmgr; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Stack; -import java.util.concurrent.locks.ReentrantLock; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; -import org.apache.hadoop.hive.ql.metadata.DummyPartition; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.metadata.*; + +import java.util.*; +import java.util.concurrent.locks.ReentrantLock; /** * shared lock manager for dedicated hive server. all locks are managed in memory diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java index eb15993..b2eb997 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java @@ -20,6 +20,11 @@ import java.util.List; +/** + * Manager for locks in Hive. Users should not instantiate a lock manager + * directly. Instead they should get an instance from their instance of + * {@link HiveTxnManager}. 
+ */ public interface HiveLockManager { public void setContext(HiveLockManagerCtx ctx) throws LockException; diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java index a8fbf26..37af243 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockMode.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.lockmgr; public enum HiveLockMode { - SHARED, EXCLUSIVE; + SHARED, + EXCLUSIVE, + SEMI_SHARED; // SEMI_SHARED can share with SHARED but not with itself } diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java index 093be45..65b9136 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java @@ -18,36 +18,14 @@ package org.apache.hadoop.hive.ql.lockmgr.zookeeper; -import java.io.IOException; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.lockmgr.HiveLock; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; +import org.apache.hadoop.hive.ql.lockmgr.*; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; -import org.apache.hadoop.hive.ql.lockmgr.LockException; -import org.apache.hadoop.hive.ql.metadata.DummyPartition; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.metadata.*; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -55,7 +33,11 @@ import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.net.InetAddress; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class ZooKeeperHiveLockManager implements HiveLockManager { HiveLockManagerCtx ctx; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 2484576..5450228 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -70,9 +70,10 @@ import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.lib.Node; -import 
org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; +import org.apache.hadoop.hive.ql.metadata.*; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.authorization.HiveAuthorizationTaskFactory; @@ -703,7 +704,7 @@ private void analyzeDropDatabase(ASTNode ast) throws SemanticException { } inputs.add(new ReadEntity(database)); - outputs.add(new WriteEntity(database)); + outputs.add(new WriteEntity(database, WriteEntity.WriteType.DDL)); DropDatabaseDesc dropDatabaseDesc = new DropDatabaseDesc(dbName, ifExists, ifCascade); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), dropDatabaseDesc), conf)); @@ -729,7 +730,7 @@ private void analyzeDropTable(ASTNode ast, boolean expectView) Table tab = getTable(tableName, throwException); if (tab != null) { inputs.add(new ReadEntity(tab)); - outputs.add(new WriteEntity(tab)); + outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL)); } DropTableDesc dropTblDesc = new DropTableDesc(tableName, expectView, ifExists); @@ -754,19 +755,19 @@ private void analyzeTruncateTable(ASTNode ast) throws SemanticException { Map partSpec = getPartSpec((ASTNode) root.getChild(1)); if (partSpec == null) { if (!table.isPartitioned()) { - outputs.add(new WriteEntity(table)); + outputs.add(new WriteEntity(table, WriteEntity.WriteType.DDL)); } else { for (Partition partition : getPartitions(table, null, false)) { - outputs.add(new WriteEntity(partition)); + outputs.add(new WriteEntity(partition, WriteEntity.WriteType.DDL)); } } } else { if (isFullSpec(table, partSpec)) { Partition partition = getPartition(table, partSpec, true); - outputs.add(new WriteEntity(partition)); + outputs.add(new WriteEntity(partition, WriteEntity.WriteType.DDL)); } else { for (Partition partition : getPartitions(table, partSpec, false)) { - outputs.add(new WriteEntity(partition)); + outputs.add(new WriteEntity(partition, WriteEntity.WriteType.DDL)); } } } @@ -1305,17 +1306,17 @@ private void addInputsOutputsAlterTable(String tableName, Map pa Table tab = getTable(tableName, true); if (partSpec == null || partSpec.isEmpty()) { inputs.add(new ReadEntity(tab)); - outputs.add(new WriteEntity(tab)); + outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL)); } else { inputs.add(new ReadEntity(tab)); if (desc == null || desc.getOp() != AlterTableDesc.AlterTableTypes.ALTERPROTECTMODE) { Partition part = getPartition(tab, partSpec, true); - outputs.add(new WriteEntity(part)); + outputs.add(new WriteEntity(part, WriteEntity.WriteType.DDL)); } else { for (Partition part : getPartitions(tab, partSpec, true)) { - outputs.add(new WriteEntity(part)); + outputs.add(new WriteEntity(part, WriteEntity.WriteType.DDL)); } } } @@ -2194,7 +2195,7 @@ private void analyzeShowDbLocks(ASTNode ast) throws SemanticException { ctx.setNeedLockMgr(true); } - /** + /** * Add the task according to the parsed command tree. This is used for the CLI * command "LOCK TABLE ..;". 
* @@ -2524,7 +2525,7 @@ private void analyzeAlterTableAddParts(CommonTree ast, boolean expectView) Table tab = getTable(tblName, true); boolean isView = tab.isView(); validateAlterTableType(tab, AlterTableTypes.ADDPARTITION, expectView); - outputs.add(new WriteEntity(tab)); + outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL)); int numCh = ast.getChildCount(); int start = ifNotExists ? 2 : 1; @@ -2624,7 +2625,7 @@ private Partition getPartitionForOutput(Table tab, Map currentPa try { Partition partition = db.getPartition(tab, currentPart, false); if (partition != null) { - outputs.add(new WriteEntity(partition)); + outputs.add(new WriteEntity(partition, WriteEntity.WriteType.INSERT)); } return partition; } catch (HiveException e) { @@ -2658,7 +2659,7 @@ private void analyzeAlterTableTouch(CommonTree ast) AlterTableSimpleDesc touchDesc = new AlterTableSimpleDesc( SessionState.get().getCurrentDatabase(), tblName, null, AlterTableDesc.AlterTableTypes.TOUCH); - outputs.add(new WriteEntity(tab)); + outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL_METADATA_ONLY)); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), touchDesc), conf)); } else { @@ -2923,7 +2924,7 @@ private void addTablePartsOutputs(String tblName, List> part } } for (Partition p : parts) { - outputs.add(new WriteEntity(p)); + outputs.add(new WriteEntity(p, WriteEntity.WriteType.DDL)); } } } @@ -2967,7 +2968,7 @@ private void addTableDropPartsOutputs(Table tab, throw new SemanticException( ErrorMsg.DROP_COMMAND_NOT_ALLOWED_FOR_PARTITION.getMsg(p.getCompleteName())); } - outputs.add(new WriteEntity(p)); + outputs.add(new WriteEntity(p, WriteEntity.WriteType.DELETE)); } } } @@ -2991,7 +2992,7 @@ private void analyzeAltertableSkewedby(ASTNode ast) throws SemanticException { Table tab = getTable(tableName, true); inputs.add(new ReadEntity(tab)); - outputs.add(new WriteEntity(tab)); + outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL)); validateAlterTableType(tab, AlterTableTypes.ADDSKEWEDBY); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index ceb4c8a..92ec334 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -18,18 +18,6 @@ package org.apache.hadoop.hive.ql.parse; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - import org.antlr.runtime.tree.Tree; import org.apache.commons.lang.ObjectUtils; import org.apache.hadoop.fs.FileStatus; @@ -50,15 +38,15 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; -import org.apache.hadoop.hive.ql.plan.CopyWork; -import org.apache.hadoop.hive.ql.plan.CreateTableDesc; -import org.apache.hadoop.hive.ql.plan.DDLWork; -import org.apache.hadoop.hive.ql.plan.LoadTableDesc; -import org.apache.hadoop.hive.ql.plan.MoveWork; +import org.apache.hadoop.hive.ql.plan.*; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.serdeConstants; +import java.io.IOException; +import 
java.net.URI; +import java.net.URISyntaxException; +import java.util.*; + /** * ImportSemanticAnalyzer. * @@ -237,7 +225,8 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { .toString())); loadTable(fromURI, table); } - outputs.add(new WriteEntity(table)); + // Set this to read because we can't overwrite any existing partitions + outputs.add(new WriteEntity(table, WriteEntity.WriteType.DDL_METADATA_ONLY)); } catch (InvalidTableException e) { LOG.debug("table " + tblDesc.getTableName() + " does not exist"); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index 59aeb96..9ec22ca 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -18,14 +18,6 @@ package org.apache.hadoop.hive.ql.parse; -import java.io.IOException; -import java.io.Serializable; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - import org.antlr.runtime.tree.Tree; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileStatus; @@ -47,6 +39,14 @@ import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.StatsWork; +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + /** * LoadSemanticAnalyzer. * @@ -238,7 +238,9 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { Map partSpec = ts.getPartSpec(); if (partSpec == null) { partSpec = new LinkedHashMap(); - outputs.add(new WriteEntity(ts.tableHandle)); + outputs.add(new WriteEntity(ts.tableHandle, + (isOverWrite ? WriteEntity.WriteType.INSERT_OVERWRITE : + WriteEntity.WriteType.INSERT))); } else { try{ Partition part = Hive.get().getPartition(ts.tableHandle, partSpec, false); @@ -247,9 +249,13 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { throw new SemanticException(ErrorMsg.OFFLINE_TABLE_OR_PARTITION. getMsg(ts.tableName + ":" + part.getName())); } - outputs.add(new WriteEntity(part)); + outputs.add(new WriteEntity(part, + (isOverWrite ? WriteEntity.WriteType.INSERT_OVERWRITE : + WriteEntity.WriteType.INSERT))); } else { - outputs.add(new WriteEntity(ts.tableHandle)); + outputs.add(new WriteEntity(ts.tableHandle, + (isOverWrite ? WriteEntity.WriteType.INSERT_OVERWRITE : + WriteEntity.WriteType.INSERT))); } } catch(HiveException e) { throw new SemanticException(e); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index a01aa0e..6420f91 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -5544,7 +5544,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) // in the case of DP, we will register WriteEntity in MoveTask when the // list of dynamically created partitions are known. 
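Both LoadSemanticAnalyzer above and genFileSinkPlan below make the same overwrite-versus-append decision when they tag their outputs. Here is a minimal sketch of that selection using the two-argument WriteEntity constructor this patch introduces; the helper class LoadOutputs is hypothetical and simply restates the pattern used in the surrounding hunks.

import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Table;

public final class LoadOutputs {
  private LoadOutputs() {}

  // LOAD ... OVERWRITE maps to INSERT_OVERWRITE, a plain LOAD maps to INSERT.
  public static WriteEntity forLoad(Table target, boolean isOverwrite) {
    WriteEntity.WriteType type = isOverwrite
        ? WriteEntity.WriteType.INSERT_OVERWRITE
        : WriteEntity.WriteType.INSERT;
    return new WriteEntity(target, type);
  }
}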
if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) { - output = new WriteEntity(dest_tab); + output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable)); if (!outputs.add(output)) { throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES .getMsg(dest_tab.getTableName())); @@ -5553,7 +5553,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) { // No static partition specified if (dpCtx.getNumSPCols() == 0) { - output = new WriteEntity(dest_tab, false); + output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false); outputs.add(output); } // part of the partition specified @@ -5567,7 +5567,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) new DummyPartition(dest_tab, dest_tab.getDbName() + "@" + dest_tab.getTableName() + "@" + ppath, partSpec); - output = new WriteEntity(p, false); + output = new WriteEntity(p, WriteEntity.WriteType.INSERT, false); outputs.add(output); } catch (HiveException e) { throw new SemanticException(e.getMessage(), e); @@ -5654,7 +5654,9 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input) ltd.setHoldDDLTime(true); } loadTableWork.add(ltd); - if (!outputs.add(new WriteEntity(dest_part))) { + if (!outputs.add(new WriteEntity(dest_part, (ltd.getReplace() ? + WriteEntity.WriteType.INSERT_OVERWRITE : + WriteEntity.WriteType.INSERT)))) { throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES .getMsg(dest_tab.getTableName() + "@" + dest_part.getName())); } @@ -8751,7 +8753,7 @@ private void setupStats(TableScanDesc tsDesc, QBParseInfo qbp, Table tab, String tsDesc.setStatsAggPrefix(tab.getDbName()+"."+k); // set up WritenEntity for replication - outputs.add(new WriteEntity(tab)); + outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL_METADATA_ONLY)); // add WriteEntity for each matching partition if (tab.isPartitioned()) { @@ -8762,7 +8764,7 @@ private void setupStats(TableScanDesc tsDesc, QBParseInfo qbp, Table tab, String if (partitions != null) { for (Partition partn : partitions) { // inputs.add(new ReadEntity(partn)); // is this needed at all? - outputs.add(new WriteEntity(partn)); + outputs.add(new WriteEntity(partn, WriteEntity.WriteType.DDL_METADATA_ONLY)); } } } @@ -9809,7 +9811,7 @@ private ASTNode analyzeCreateTable(ASTNode ast, QB qb) String[] qualified = Hive.getQualifiedNames(tableName); String dbName = qualified.length == 1 ? SessionState.get().getCurrentDatabase() : qualified[0]; Database database = getDatabase(dbName); - outputs.add(new WriteEntity(database)); + outputs.add(new WriteEntity(database, WriteEntity.WriteType.DDL_METADATA_ONLY)); // Handle different types of CREATE TABLE command CreateTableDesc crtTblDesc = null; switch (command_type) { @@ -11187,4 +11189,13 @@ private void addAlternateGByKeyMappings(ASTNode gByExpr, ColumnInfo colInfo, gByRR.put(tab_alias, col_alias, colInfo); } } + + private WriteEntity.WriteType determineWriteType(LoadTableDesc ltd, boolean isNonNativeTable) { + // Don't know the characteristics of non-native tables, + // and don't have a rational way to guess, so assume the most + // conservative case. + if (isNonNativeTable) return WriteEntity.WriteType.INSERT_OVERWRITE; + else return (ltd.getReplace() ? 
WriteEntity.WriteType.INSERT_OVERWRITE : + WriteEntity.WriteType.INSERT); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/authorization/HiveAuthorizationTaskFactoryImpl.java ql/src/java/org/apache/hadoop/hive/ql/parse/authorization/HiveAuthorizationTaskFactoryImpl.java index 4dbe78c..ec31926 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/authorization/HiveAuthorizationTaskFactoryImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/authorization/HiveAuthorizationTaskFactoryImpl.java @@ -273,9 +273,9 @@ private PrivilegeObjectDesc analyzePrivilegeObject(ASTNode ast, Table tbl = getTable(SessionState.get().getCurrentDatabase(), subject.getObject()); if (subject.getPartSpec() != null) { Partition part = getPartition(tbl, subject.getPartSpec()); - outputs.add(new WriteEntity(part)); + outputs.add(new WriteEntity(part, WriteEntity.WriteType.DDL_METADATA_ONLY)); } else { - outputs.add(new WriteEntity(tbl)); + outputs.add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_METADATA_ONLY)); } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestDriver.java ql/src/test/org/apache/hadoop/hive/ql/TestDriver.java deleted file mode 100644 index cadb293..0000000 --- ql/src/test/org/apache/hadoop/hive/ql/TestDriver.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql; - -import java.util.*; -import junit.framework.TestCase; - -import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; - -public class TestDriver extends TestCase { - public void testDedupLockObjects() { - List lockObjs = new ArrayList(); - String path1 = "path1"; - String path2 = "path2"; - HiveLockObjectData lockData1 = new HiveLockObjectData( - "query1", "1", "IMPLICIT", "drop table table1"); - HiveLockObjectData lockData2 = new HiveLockObjectData( - "query1", "1", "IMPLICIT", "drop table table1"); - - // Start with the following locks: - // [path1, shared] - // [path1, exclusive] - // [path2, shared] - // [path2, shared] - // [path2, shared] - lockObjs.add(new HiveLockObj(new HiveLockObject(path1, lockData1), HiveLockMode.SHARED)); - String name1 = lockObjs.get(lockObjs.size() - 1).getName(); - lockObjs.add(new HiveLockObj(new HiveLockObject(path1, lockData1), HiveLockMode.EXCLUSIVE)); - lockObjs.add(new HiveLockObj(new HiveLockObject(path2, lockData2), HiveLockMode.SHARED)); - String name2 = lockObjs.get(lockObjs.size() - 1).getName(); - lockObjs.add(new HiveLockObj(new HiveLockObject(path2, lockData2), HiveLockMode.SHARED)); - lockObjs.add(new HiveLockObj(new HiveLockObject(path2, lockData2), HiveLockMode.SHARED)); - - Driver.dedupLockObjects(lockObjs); - - // After dedup we should be left with 2 locks: - // [path1, exclusive] - // [path2, shared] - assertEquals("Locks should be deduped", 2, lockObjs.size()); - - Comparator cmp = new Comparator() { - public int compare(HiveLockObj lock1, HiveLockObj lock2) { - return lock1.getName().compareTo(lock2.getName()); - } - }; - Collections.sort(lockObjs, cmp); - - HiveLockObj lockObj = lockObjs.get(0); - assertEquals(name1, lockObj.getName()); - assertEquals(HiveLockMode.EXCLUSIVE, lockObj.getMode()); - - lockObj = lockObjs.get(1); - assertEquals(name2, lockObj.getName()); - assertEquals(HiveLockMode.SHARED, lockObj.getMode()); - } -}
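With TestDriver.java removed along with Driver.dedupLockObjects, lock planning is no longer the Driver's concern. Condensing the Driver hunks earlier in the patch, the per-query flow they establish looks roughly like the sketch below. The wrapper class TxnLifecycleSketch is hypothetical; error handling, PerfLogger calls, and the concurrency check are omitted; only methods the patch itself calls (getTxnManager, getValidTxns, acquireLocks, getLockManager().releaseLocks, closeTxnManager) are used, and step 5 corresponds to Driver.destroy() rather than to each query.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;

public final class TxnLifecycleSketch {
  private TxnLifecycleSketch() {}

  public static void run(HiveConf conf, QueryPlan plan, Context ctx) throws LockException {
    // 1. Obtain the transaction manager configured via hive.txn.manager.
    HiveTxnManager txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
    try {
      // 2. Record the valid transaction list in the conf so the input format can read it.
      ctx.getConf().set(IMetaStoreClient.ValidTxnList.VALID_TXNS_KEY,
          txnMgr.getValidTxns().toString());
      // 3. Acquire whatever locks the plan's read and write entities require.
      txnMgr.acquireLocks(plan, ctx);
      // ... execute the plan's tasks here ...
      // 4. Release locks through the lock manager owned by the transaction manager.
      txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks());
    } finally {
      // 5. Tear down the transaction manager (the Driver does this in destroy()).
      txnMgr.closeTxnManager();
    }
  }
}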