diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 5360ed4..7ff8ea8 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1763,6 +1763,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"), HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s", new TimeValidator(TimeUnit.MILLISECONDS), "Time interval describing how often the reaper runs"), + WRITE_SET_REAPER_INTERVAL("hive.writeset.reaper.interval", "60s", + new TimeValidator(TimeUnit.MILLISECONDS), "Frequency of WriteSet reaper runs"), // For HBase storage handler HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true, diff --git metastore/scripts/upgrade/derby/035-HIVE-13395.derby.sql metastore/scripts/upgrade/derby/035-HIVE-13395.derby.sql new file mode 100644 index 0000000..df33b95 --- /dev/null +++ metastore/scripts/upgrade/derby/035-HIVE-13395.derby.sql @@ -0,0 +1,11 @@ +CREATE TABLE WRITE_SET ( + WS_DATABASE varchar(128) NOT NULL, + WS_TABLE varchar(128) NOT NULL, + WS_PARTITION varchar(767), + WS_TXNID bigint NOT NULL, + WS_COMMIT_ID bigint NOT NULL, + WS_OPERATION_TYPE char(1) NOT NULL +); +ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1); + + diff --git metastore/scripts/upgrade/derby/hive-schema-2.1.0.derby.sql metastore/scripts/upgrade/derby/hive-schema-2.1.0.derby.sql index 1d00499..dc27afc 100644 --- metastore/scripts/upgrade/derby/hive-schema-2.1.0.derby.sql +++ metastore/scripts/upgrade/derby/hive-schema-2.1.0.derby.sql @@ -338,7 +338,7 @@ ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SQL110318025505550" CHECK (IS_COMPRESSED -- ---------------------------- -- Transaction and Lock Tables -- ---------------------------- -RUN 'hive-txn-schema-2.0.0.derby.sql'; +RUN 'hive-txn-schema-2.1.0.derby.sql'; -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script diff --git metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql index 13f3340..480c19e 100644 --- metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql +++ metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql @@ -32,7 +32,8 @@ CREATE TABLE TXN_COMPONENTS ( TC_TXNID bigint REFERENCES TXNS (TXN_ID), TC_DATABASE varchar(128) NOT NULL, TC_TABLE varchar(128), - TC_PARTITION varchar(767) + TC_PARTITION varchar(767), + TC_OPERATION_TYPE char(1) NOT NULL ); CREATE TABLE COMPLETED_TXN_COMPONENTS ( @@ -117,3 +118,11 @@ CREATE TABLE AUX_TABLE ( PRIMARY KEY(MT_KEY1, MT_KEY2) ); +CREATE TABLE WRITE_SET ( + WS_DATABASE varchar(128) NOT NULL, + WS_TABLE varchar(128) NOT NULL, + WS_PARTITION varchar(767), + WS_TXNID bigint NOT NULL, + WS_COMMIT_ID bigint NOT NULL, + WS_OPERATION_TYPE char(1) NOT NULL +); diff --git metastore/scripts/upgrade/derby/hive-txn-schema-2.1.0.derby.sql metastore/scripts/upgrade/derby/hive-txn-schema-2.1.0.derby.sql new file mode 100644 index 0000000..11d86ca --- /dev/null +++ metastore/scripts/upgrade/derby/hive-txn-schema-2.1.0.derby.sql @@ -0,0 +1,130 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. 
+-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the License); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an AS IS BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- +-- Tables for transaction management +-- +CREATE TABLE TXNS ( + TXN_ID bigint PRIMARY KEY, + TXN_STATE char(1) NOT NULL, + TXN_STARTED bigint NOT NULL, + TXN_LAST_HEARTBEAT bigint NOT NULL, + TXN_USER varchar(128) NOT NULL, + TXN_HOST varchar(128) NOT NULL, + TXN_AGENT_INFO varchar(128), + TXN_META_INFO varchar(128), + TXN_HEARTBEAT_COUNT integer +); + +CREATE TABLE TXN_COMPONENTS ( + TC_TXNID bigint REFERENCES TXNS (TXN_ID), + TC_DATABASE varchar(128) NOT NULL, + TC_TABLE varchar(128), + TC_PARTITION varchar(767), + TC_OPERATION_TYPE char(1) NOT NULL +); + +CREATE TABLE COMPLETED_TXN_COMPONENTS ( + CTC_TXNID bigint, + CTC_DATABASE varchar(128) NOT NULL, + CTC_TABLE varchar(128), + CTC_PARTITION varchar(767) +); + +CREATE TABLE NEXT_TXN_ID ( + NTXN_NEXT bigint NOT NULL +); +INSERT INTO NEXT_TXN_ID VALUES(1); + +CREATE TABLE HIVE_LOCKS ( + HL_LOCK_EXT_ID bigint NOT NULL, + HL_LOCK_INT_ID bigint NOT NULL, + HL_TXNID bigint, + HL_DB varchar(128) NOT NULL, + HL_TABLE varchar(128), + HL_PARTITION varchar(767), + HL_LOCK_STATE char(1) NOT NULL, + HL_LOCK_TYPE char(1) NOT NULL, + HL_LAST_HEARTBEAT bigint NOT NULL, + HL_ACQUIRED_AT bigint, + HL_USER varchar(128) NOT NULL, + HL_HOST varchar(128) NOT NULL, + HL_HEARTBEAT_COUNT integer, + HL_AGENT_INFO varchar(128), + HL_BLOCKEDBY_EXT_ID bigint, + HL_BLOCKEDBY_INT_ID bigint, + PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID) +); + +CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID); + +CREATE TABLE NEXT_LOCK_ID ( + NL_NEXT bigint NOT NULL +); +INSERT INTO NEXT_LOCK_ID VALUES(1); + +CREATE TABLE COMPACTION_QUEUE ( + CQ_ID bigint PRIMARY KEY, + CQ_DATABASE varchar(128) NOT NULL, + CQ_TABLE varchar(128) NOT NULL, + CQ_PARTITION varchar(767), + CQ_STATE char(1) NOT NULL, + CQ_TYPE char(1) NOT NULL, + CQ_WORKER_ID varchar(128), + CQ_START bigint, + CQ_RUN_AS varchar(128), + CQ_HIGHEST_TXN_ID bigint, + CQ_META_INFO varchar(2048) for bit data, + CQ_HADOOP_JOB_ID varchar(32) +); + +CREATE TABLE NEXT_COMPACTION_QUEUE_ID ( + NCQ_NEXT bigint NOT NULL +); +INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1); + +CREATE TABLE COMPLETED_COMPACTIONS ( + CC_ID bigint PRIMARY KEY, + CC_DATABASE varchar(128) NOT NULL, + CC_TABLE varchar(128) NOT NULL, + CC_PARTITION varchar(767), + CC_STATE char(1) NOT NULL, + CC_TYPE char(1) NOT NULL, + CC_WORKER_ID varchar(128), + CC_START bigint, + CC_END bigint, + CC_RUN_AS varchar(128), + CC_HIGHEST_TXN_ID bigint, + CC_META_INFO varchar(2048) for bit data, + CC_HADOOP_JOB_ID varchar(32) +); + +CREATE TABLE AUX_TABLE ( + MT_KEY1 varchar(128) NOT NULL, + MT_KEY2 bigint NOT NULL, + MT_COMMENT varchar(255), + PRIMARY KEY(MT_KEY1, MT_KEY2) +); + +--1st 4 cols make up a PK but since WS_PARTITION is nullable we can't declare such PK +--This is a good candidate for Index orgainzed table +CREATE TABLE WRITE_SET ( + WS_DATABASE varchar(128) NOT NULL, + WS_TABLE varchar(128) NOT NULL, + WS_PARTITION varchar(767), + WS_TXNID bigint NOT NULL, + WS_COMMIT_ID bigint NOT 
NULL, + WS_OPERATION_TYPE char(1) NOT NULL +); diff --git metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql index 74ecac2..6b90b73 100644 --- metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql +++ metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql @@ -10,5 +10,6 @@ RUN '029-HIVE-12822.derby.sql'; RUN '030-HIVE-12823.derby.sql'; RUN '031-HIVE-12831.derby.sql'; RUN '032-HIVE-12832.derby.sql'; +RUN '035-HIVE-13395.derby.sql'; UPDATE "APP".VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1; diff --git metastore/scripts/upgrade/derby/upgrade-2.0.0-to-2.1.0.derby.sql metastore/scripts/upgrade/derby/upgrade-2.0.0-to-2.1.0.derby.sql index dde8c45..94c686b 100644 --- metastore/scripts/upgrade/derby/upgrade-2.0.0-to-2.1.0.derby.sql +++ metastore/scripts/upgrade/derby/upgrade-2.0.0-to-2.1.0.derby.sql @@ -1,5 +1,6 @@ -- Upgrade MetaStore schema from 2.0.0 to 2.1.0 RUN '033-HIVE-12892.derby.sql'; RUN '034-HIVE-13076.derby.sql'; +RUN '035-HIVE-13395.derby.sql'; UPDATE "APP".VERSION SET SCHEMA_VERSION='2.1.0', VERSION_COMMENT='Hive release version 2.1.0' where VER_ID=1; diff --git metastore/scripts/upgrade/mssql/020-HIVE-13395.mssql.sql metastore/scripts/upgrade/mssql/020-HIVE-13395.mssql.sql new file mode 100644 index 0000000..281014c --- /dev/null +++ metastore/scripts/upgrade/mssql/020-HIVE-13395.mssql.sql @@ -0,0 +1,9 @@ +CREATE TABLE WRITE_SET ( + WS_DATABASE nvarchar(128) NOT NULL, + WS_TABLE nvarchar(128) NOT NULL, + WS_PARTITION nvarchar(767), + WS_TXNID bigint NOT NULL, + WS_COMMIT_ID bigint NOT NULL, + WS_OPERATION_TYPE char(1) NOT NULL +); +ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1) NULL; \ No newline at end of file diff --git metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql index 57d2343..a184f24 100644 --- metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql +++ metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql @@ -964,7 +964,8 @@ CREATE TABLE TXN_COMPONENTS( TC_TXNID bigint NULL, TC_DATABASE nvarchar(128) NOT NULL, TC_TABLE nvarchar(128) NULL, - TC_PARTITION nvarchar(767) NULL + TC_PARTITION nvarchar(767) NULL, + TC_OPERATION_TYPE char(1) NOT NULL ); ALTER TABLE TXN_COMPONENTS WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID); @@ -980,6 +981,15 @@ CREATE TABLE AUX_TABLE ( ) ); +CREATE TABLE WRITE_SET ( + WS_DATABASE nvarchar(128) NOT NULL, + WS_TABLE nvarchar(128) NOT NULL, + WS_PARTITION nvarchar(767), + WS_TXNID bigint NOT NULL, + WS_COMMIT_ID bigint NOT NULL, + WS_OPERATION_TYPE char(1) NOT NULL +); + -- ----------------------------------------------------------------- -- Record schema version. 
Should be the last step in the init script diff --git metastore/scripts/upgrade/mssql/hive-schema-2.1.0.mssql.sql metastore/scripts/upgrade/mssql/hive-schema-2.1.0.mssql.sql index 2d9cf76..d9194ff 100644 --- metastore/scripts/upgrade/mssql/hive-schema-2.1.0.mssql.sql +++ metastore/scripts/upgrade/mssql/hive-schema-2.1.0.mssql.sql @@ -977,7 +977,8 @@ CREATE TABLE TXN_COMPONENTS( TC_TXNID bigint NULL, TC_DATABASE nvarchar(128) NOT NULL, TC_TABLE nvarchar(128) NULL, - TC_PARTITION nvarchar(767) NULL + TC_PARTITION nvarchar(767) NULL, + TC_OPERATION_TYPE char(1) NOT NULL ); ALTER TABLE TXN_COMPONENTS WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID); @@ -1011,6 +1012,15 @@ ALTER TABLE KEY_CONSTRAINTS ADD CONSTRAINT CONSTRAINTS_PK PRIMARY KEY (CONSTRAIN CREATE INDEX CONSTRAINTS_PARENT_TBL_ID__INDEX ON KEY_CONSTRAINTS(PARENT_TBL_ID); +CREATE TABLE WRITE_SET ( + WS_DATABASE nvarchar(128) NOT NULL, + WS_TABLE nvarchar(128) NOT NULL, + WS_PARTITION nvarchar(767), + WS_TXNID bigint NOT NULL, + WS_COMMIT_ID bigint NOT NULL, + WS_OPERATION_TYPE char(1) NOT NULL +); + -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script diff --git metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql index b0f28bb..251e621 100644 --- metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql +++ metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql @@ -11,6 +11,7 @@ SELECT 'Upgrading MetaStore schema from 1.2.0 to 1.3.0' AS MESSAGE; :r 015-HIVE-12823.mssql.sql; :r 016-HIVE-12831.mssql.sql; :r 017-HIVE-12832.mssql.sql; +:r 020-HIVE-13395.mssql.sql; UPDATE VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0' AS MESSAGE; diff --git metastore/scripts/upgrade/mssql/upgrade-2.0.0-to-2.1.0.mssql.sql metastore/scripts/upgrade/mssql/upgrade-2.0.0-to-2.1.0.mssql.sql index 3e5cb30..c796126 100644 --- metastore/scripts/upgrade/mssql/upgrade-2.0.0-to-2.1.0.mssql.sql +++ metastore/scripts/upgrade/mssql/upgrade-2.0.0-to-2.1.0.mssql.sql @@ -2,6 +2,7 @@ SELECT 'Upgrading MetaStore schema from 2.0.0 to 2.1.0' AS MESSAGE; :r 018-HIVE-12892.mssql.sql; :r 019-HIVE-13076.mssql.sql; +:r 020-HIVE-13395.mssql.sql; UPDATE VERSION SET SCHEMA_VERSION='2.1.0', VERSION_COMMENT='Hive release version 2.1.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 2.0.0 to 2.1.0' AS MESSAGE; diff --git metastore/scripts/upgrade/mysql/035-HIVE-13395.mysql.sql metastore/scripts/upgrade/mysql/035-HIVE-13395.mysql.sql new file mode 100644 index 0000000..586caef --- /dev/null +++ metastore/scripts/upgrade/mysql/035-HIVE-13395.mysql.sql @@ -0,0 +1,10 @@ +CREATE TABLE WRITE_SET ( + WS_DATABASE varchar(128) NOT NULL, + WS_TABLE varchar(128) NOT NULL, + WS_PARTITION varchar(767), + WS_TXNID bigint NOT NULL, + WS_COMMIT_ID bigint NOT NULL, + WS_OPERATION_TYPE char(1) NOT NULL +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1); diff --git metastore/scripts/upgrade/mysql/hive-schema-2.1.0.mysql.sql metastore/scripts/upgrade/mysql/hive-schema-2.1.0.mysql.sql index 466e950..a6b783c 100644 --- metastore/scripts/upgrade/mysql/hive-schema-2.1.0.mysql.sql +++ metastore/scripts/upgrade/mysql/hive-schema-2.1.0.mysql.sql @@ -839,7 +839,7 @@ CREATE INDEX `CONSTRAINTS_PARENT_TABLE_ID_INDEX` ON KEY_CONSTRAINTS (`PARENT_TBL 
-- ---------------------------- -- Transaction and Lock Tables -- ---------------------------- -SOURCE hive-txn-schema-2.0.0.mysql.sql; +SOURCE hive-txn-schema-2.1.0.mysql.sql; -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script diff --git metastore/scripts/upgrade/mysql/hive-txn-schema-2.1.0.mysql.sql metastore/scripts/upgrade/mysql/hive-txn-schema-2.1.0.mysql.sql new file mode 100644 index 0000000..369d6bb --- /dev/null +++ metastore/scripts/upgrade/mysql/hive-txn-schema-2.1.0.mysql.sql @@ -0,0 +1,131 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- +-- Tables for transaction management +-- + +CREATE TABLE TXNS ( + TXN_ID bigint PRIMARY KEY, + TXN_STATE char(1) NOT NULL, + TXN_STARTED bigint NOT NULL, + TXN_LAST_HEARTBEAT bigint NOT NULL, + TXN_USER varchar(128) NOT NULL, + TXN_HOST varchar(128) NOT NULL, + TXN_AGENT_INFO varchar(128), + TXN_META_INFO varchar(128), + TXN_HEARTBEAT_COUNT int +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +CREATE TABLE TXN_COMPONENTS ( + TC_TXNID bigint NOT NULL, + TC_DATABASE varchar(128) NOT NULL, + TC_TABLE varchar(128) NOT NULL, + TC_PARTITION varchar(767), + TC_OPERATION_TYPE char(1) NOT NULL, + FOREIGN KEY (TC_TXNID) REFERENCES TXNS (TXN_ID) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +CREATE TABLE COMPLETED_TXN_COMPONENTS ( + CTC_TXNID bigint NOT NULL, + CTC_DATABASE varchar(128) NOT NULL, + CTC_TABLE varchar(128), + CTC_PARTITION varchar(767) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +CREATE TABLE NEXT_TXN_ID ( + NTXN_NEXT bigint NOT NULL +) ENGINE=InnoDB DEFAULT CHARSET=latin1; +INSERT INTO NEXT_TXN_ID VALUES(1); + +CREATE TABLE HIVE_LOCKS ( + HL_LOCK_EXT_ID bigint NOT NULL, + HL_LOCK_INT_ID bigint NOT NULL, + HL_TXNID bigint, + HL_DB varchar(128) NOT NULL, + HL_TABLE varchar(128), + HL_PARTITION varchar(767), + HL_LOCK_STATE char(1) not null, + HL_LOCK_TYPE char(1) not null, + HL_LAST_HEARTBEAT bigint NOT NULL, + HL_ACQUIRED_AT bigint, + HL_USER varchar(128) NOT NULL, + HL_HOST varchar(128) NOT NULL, + HL_HEARTBEAT_COUNT int, + HL_AGENT_INFO varchar(128), + HL_BLOCKEDBY_EXT_ID bigint, + HL_BLOCKEDBY_INT_ID bigint, + PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID), + KEY HIVE_LOCK_TXNID_INDEX (HL_TXNID) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +CREATE INDEX HL_TXNID_IDX ON HIVE_LOCKS (HL_TXNID); + +CREATE TABLE NEXT_LOCK_ID ( + NL_NEXT bigint NOT NULL +) ENGINE=InnoDB DEFAULT CHARSET=latin1; +INSERT INTO NEXT_LOCK_ID VALUES(1); + +CREATE TABLE COMPACTION_QUEUE ( + CQ_ID bigint PRIMARY KEY, + CQ_DATABASE varchar(128) NOT NULL, + CQ_TABLE varchar(128) NOT NULL, + CQ_PARTITION varchar(767), + CQ_STATE char(1) NOT NULL, + CQ_TYPE char(1) NOT NULL, + CQ_WORKER_ID varchar(128), + CQ_START bigint, + CQ_RUN_AS varchar(128), + 
CQ_HIGHEST_TXN_ID bigint, + CQ_META_INFO varbinary(2048), + CQ_HADOOP_JOB_ID varchar(32) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +CREATE TABLE COMPLETED_COMPACTIONS ( + CC_ID bigint PRIMARY KEY, + CC_DATABASE varchar(128) NOT NULL, + CC_TABLE varchar(128) NOT NULL, + CC_PARTITION varchar(767), + CC_STATE char(1) NOT NULL, + CC_TYPE char(1) NOT NULL, + CC_WORKER_ID varchar(128), + CC_START bigint, + CC_END bigint, + CC_RUN_AS varchar(128), + CC_HIGHEST_TXN_ID bigint, + CC_META_INFO varbinary(2048), + CC_HADOOP_JOB_ID varchar(32) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +CREATE TABLE NEXT_COMPACTION_QUEUE_ID ( + NCQ_NEXT bigint NOT NULL +) ENGINE=InnoDB DEFAULT CHARSET=latin1; +INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1); + +CREATE TABLE AUX_TABLE ( + MT_KEY1 varchar(128) NOT NULL, + MT_KEY2 bigint NOT NULL, + MT_COMMENT varchar(255), + PRIMARY KEY(MT_KEY1, MT_KEY2) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +CREATE TABLE WRITE_SET ( + WS_DATABASE varchar(128) NOT NULL, + WS_TABLE varchar(128) NOT NULL, + WS_PARTITION varchar(767), + WS_TXNID bigint NOT NULL, + WS_COMMIT_ID bigint NOT NULL, + WS_OPERATION_TYPE char(1) NOT NULL +) ENGINE=InnoDB DEFAULT CHARSET=latin1; diff --git metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql index 477c10b..b65aee5 100644 --- metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql +++ metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql @@ -11,6 +11,7 @@ SOURCE 029-HIVE-12822.mysql.sql; SOURCE 030-HIVE-12823.mysql.sql; SOURCE 031-HIVE-12831.mysql.sql; SOURCE 032-HIVE-12832.mysql.sql; +SOURCE 035-HIVE-13395.mysql.sql; UPDATE VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0' AS ' '; diff --git metastore/scripts/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql metastore/scripts/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql index eb21f73..c3f83b3 100644 --- metastore/scripts/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql +++ metastore/scripts/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql @@ -2,6 +2,7 @@ SELECT 'Upgrading MetaStore schema from 2.0.0 to 2.1.0' AS ' '; SOURCE 033-HIVE-12892.mysql.sql; SOURCE 034-HIVE-13076.mysql.sql; +SOURCE 035-HIVE-13395.mysql.sql; UPDATE VERSION SET SCHEMA_VERSION='2.1.0', VERSION_COMMENT='Hive release version 2.1.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 2.0.0 to 2.1.0' AS ' '; diff --git metastore/scripts/upgrade/oracle/035-HIVE-13395.oracle.sql metastore/scripts/upgrade/oracle/035-HIVE-13395.oracle.sql new file mode 100644 index 0000000..ad1bbd9 --- /dev/null +++ metastore/scripts/upgrade/oracle/035-HIVE-13395.oracle.sql @@ -0,0 +1,10 @@ +CREATE TABLE WRITE_SET ( + WS_DATABASE varchar2(128) NOT NULL, + WS_TABLE varchar2(128) NOT NULL, + WS_PARTITION varchar2(767), + WS_TXNID number(19) NOT NULL, + WS_COMMIT_ID number(19) NOT NULL, + WS_OPERATION_TYPE char(1) NOT NULL +); + +ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1); diff --git metastore/scripts/upgrade/oracle/hive-schema-2.1.0.oracle.sql metastore/scripts/upgrade/oracle/hive-schema-2.1.0.oracle.sql index f57e588..d003a16 100644 --- metastore/scripts/upgrade/oracle/hive-schema-2.1.0.oracle.sql +++ metastore/scripts/upgrade/oracle/hive-schema-2.1.0.oracle.sql @@ -808,7 +808,7 @@ CREATE INDEX CONSTRAINTS_PARENT_TBL_ID_INDEX ON KEY_CONSTRAINTS(PARENT_TBL_ID); ------------------------------ -- Transaction and lock tables
------------------------------ -@hive-txn-schema-2.0.0.oracle.sql; +@hive-txn-schema-2.1.0.oracle.sql; -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script diff --git metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql index 788741a..199ff4c 100644 --- metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql +++ metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql @@ -33,7 +33,8 @@ CREATE TABLE TXN_COMPONENTS ( TC_TXNID NUMBER(19) REFERENCES TXNS (TXN_ID), TC_DATABASE VARCHAR2(128) NOT NULL, TC_TABLE VARCHAR2(128), - TC_PARTITION VARCHAR2(767) NULL + TC_PARTITION VARCHAR2(767) NULL, + TC_OPERATION_TYPE char(1) NOT NULL ) ROWDEPENDENCIES; CREATE TABLE COMPLETED_TXN_COMPONENTS ( @@ -118,3 +119,12 @@ CREATE TABLE AUX_TABLE ( PRIMARY KEY(MT_KEY1, MT_KEY2) ); +CREATE TABLE WRITE_SET ( + WS_DATABASE varchar2(128) NOT NULL, + WS_TABLE varchar2(128) NOT NULL, + WS_PARTITION varchar2(767), + WS_TXNID number(19) NOT NULL, + WS_COMMIT_ID number(19) NOT NULL, + WS_OPERATION_TYPE char(1) NOT NULL +); + diff --git metastore/scripts/upgrade/oracle/hive-txn-schema-2.1.0.oracle.sql metastore/scripts/upgrade/oracle/hive-txn-schema-2.1.0.oracle.sql new file mode 100644 index 0000000..d39baab --- /dev/null +++ metastore/scripts/upgrade/oracle/hive-txn-schema-2.1.0.oracle.sql @@ -0,0 +1,129 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the License); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an AS IS BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+ +-- +-- Tables for transaction management +-- + +CREATE TABLE TXNS ( + TXN_ID NUMBER(19) PRIMARY KEY, + TXN_STATE char(1) NOT NULL, + TXN_STARTED NUMBER(19) NOT NULL, + TXN_LAST_HEARTBEAT NUMBER(19) NOT NULL, + TXN_USER varchar(128) NOT NULL, + TXN_HOST varchar(128) NOT NULL, + TXN_AGENT_INFO varchar2(128), + TXN_META_INFO varchar2(128), + TXN_HEARTBEAT_COUNT number(10) +) ROWDEPENDENCIES; + +CREATE TABLE TXN_COMPONENTS ( + TC_TXNID NUMBER(19) REFERENCES TXNS (TXN_ID), + TC_DATABASE VARCHAR2(128) NOT NULL, + TC_TABLE VARCHAR2(128), + TC_PARTITION VARCHAR2(767) NULL, + TC_OPERATION_TYPE char(1) NOT NULL +) ROWDEPENDENCIES; + +CREATE TABLE COMPLETED_TXN_COMPONENTS ( + CTC_TXNID NUMBER(19), + CTC_DATABASE varchar(128) NOT NULL, + CTC_TABLE varchar(128), + CTC_PARTITION varchar(767) +) ROWDEPENDENCIES; + +CREATE TABLE NEXT_TXN_ID ( + NTXN_NEXT NUMBER(19) NOT NULL +); +INSERT INTO NEXT_TXN_ID VALUES(1); + +CREATE TABLE HIVE_LOCKS ( + HL_LOCK_EXT_ID NUMBER(19) NOT NULL, + HL_LOCK_INT_ID NUMBER(19) NOT NULL, + HL_TXNID NUMBER(19), + HL_DB VARCHAR2(128) NOT NULL, + HL_TABLE VARCHAR2(128), + HL_PARTITION VARCHAR2(767), + HL_LOCK_STATE CHAR(1) NOT NULL, + HL_LOCK_TYPE CHAR(1) NOT NULL, + HL_LAST_HEARTBEAT NUMBER(19) NOT NULL, + HL_ACQUIRED_AT NUMBER(19), + HL_USER varchar(128) NOT NULL, + HL_HOST varchar(128) NOT NULL, + HL_HEARTBEAT_COUNT number(10), + HL_AGENT_INFO varchar2(128), + HL_BLOCKEDBY_EXT_ID number(19), + HL_BLOCKEDBY_INT_ID number(19), + PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID) +) ROWDEPENDENCIES; + +CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID); + +CREATE TABLE NEXT_LOCK_ID ( + NL_NEXT NUMBER(19) NOT NULL +); +INSERT INTO NEXT_LOCK_ID VALUES(1); + +CREATE TABLE COMPACTION_QUEUE ( + CQ_ID NUMBER(19) PRIMARY KEY, + CQ_DATABASE varchar(128) NOT NULL, + CQ_TABLE varchar(128) NOT NULL, + CQ_PARTITION varchar(767), + CQ_STATE char(1) NOT NULL, + CQ_TYPE char(1) NOT NULL, + CQ_WORKER_ID varchar(128), + CQ_START NUMBER(19), + CQ_RUN_AS varchar(128), + CQ_HIGHEST_TXN_ID NUMBER(19), + CQ_META_INFO BLOB, + CQ_HADOOP_JOB_ID varchar2(32) +) ROWDEPENDENCIES; + +CREATE TABLE NEXT_COMPACTION_QUEUE_ID ( + NCQ_NEXT NUMBER(19) NOT NULL +); +INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1); + +CREATE TABLE COMPLETED_COMPACTIONS ( + CC_ID NUMBER(19) PRIMARY KEY, + CC_DATABASE varchar(128) NOT NULL, + CC_TABLE varchar(128) NOT NULL, + CC_PARTITION varchar(767), + CC_STATE char(1) NOT NULL, + CC_TYPE char(1) NOT NULL, + CC_WORKER_ID varchar(128), + CC_START NUMBER(19), + CC_END NUMBER(19), + CC_RUN_AS varchar(128), + CC_HIGHEST_TXN_ID NUMBER(19), + CC_META_INFO BLOB, + CC_HADOOP_JOB_ID varchar2(32) +) ROWDEPENDENCIES; + +CREATE TABLE AUX_TABLE ( + MT_KEY1 varchar2(128) NOT NULL, + MT_KEY2 number(19) NOT NULL, + MT_COMMENT varchar2(255), + PRIMARY KEY(MT_KEY1, MT_KEY2) +); + +CREATE TABLE WRITE_SET ( + WS_DATABASE varchar2(128) NOT NULL, + WS_TABLE varchar2(128) NOT NULL, + WS_PARTITION varchar2(767), + WS_TXNID number(19) NOT NULL, + WS_COMMIT_ID number(19) NOT NULL, + WS_OPERATION_TYPE char(1) NOT NULL +); diff --git metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql index 94ee2c4..5939b34 100644 --- metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql +++ metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql @@ -11,6 +11,7 @@ SELECT 'Upgrading MetaStore schema from 1.2.0 to 1.3.0' AS Status from dual; @030-HIVE-12823.oracle.sql; @031-HIVE-12381.oracle.sql; @032-HIVE-12832.oracle.sql; 
+@035-HIVE-13395.oracle.sql; UPDATE VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0' AS Status from dual; diff --git metastore/scripts/upgrade/oracle/upgrade-2.0.0-to-2.1.0.oracle.sql metastore/scripts/upgrade/oracle/upgrade-2.0.0-to-2.1.0.oracle.sql index 8c065a1..a226d9a 100644 --- metastore/scripts/upgrade/oracle/upgrade-2.0.0-to-2.1.0.oracle.sql +++ metastore/scripts/upgrade/oracle/upgrade-2.0.0-to-2.1.0.oracle.sql @@ -2,6 +2,7 @@ SELECT 'Upgrading MetaStore schema from 2.0.0 to 2.1.0' AS Status from dual; @033-HIVE-12892.oracle.sql; @034-HIVE-13076.oracle.sql; +@035-HIVE-13395.oracle.sql; UPDATE VERSION SET SCHEMA_VERSION='2.1.0', VERSION_COMMENT='Hive release version 2.1.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 2.0.0 to 2.1.0' AS Status from dual; diff --git metastore/scripts/upgrade/postgres/034-HIVE-13395.postgres.sql metastore/scripts/upgrade/postgres/034-HIVE-13395.postgres.sql new file mode 100644 index 0000000..4dda283 --- /dev/null +++ metastore/scripts/upgrade/postgres/034-HIVE-13395.postgres.sql @@ -0,0 +1,10 @@ +CREATE TABLE WRITE_SET ( + WS_DATABASE varchar(128) NOT NULL, + WS_TABLE varchar(128) NOT NULL, + WS_PARTITION varchar(767), + WS_TXNID bigint NOT NULL, + WS_COMMIT_ID bigint NOT NULL, + WS_OPERATION_TYPE char(1) NOT NULL +); + +ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1); \ No newline at end of file diff --git metastore/scripts/upgrade/postgres/hive-schema-2.1.0.postgres.sql metastore/scripts/upgrade/postgres/hive-schema-2.1.0.postgres.sql index e209489..43e984c 100644 --- metastore/scripts/upgrade/postgres/hive-schema-2.1.0.postgres.sql +++ metastore/scripts/upgrade/postgres/hive-schema-2.1.0.postgres.sql @@ -1480,7 +1480,7 @@ GRANT ALL ON SCHEMA public TO PUBLIC; ------------------------------ -- Transaction and lock tables ------------------------------ -\i hive-txn-schema-2.0.0.postgres.sql; +\i hive-txn-schema-2.1.0.postgres.sql; -- ----------------------------------------------------------------- -- Record schema version. Should be the last step in the init script diff --git metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql index b2fc1a8..b606f81 100644 --- metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql +++ metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql @@ -33,7 +33,8 @@ CREATE TABLE TXN_COMPONENTS ( TC_TXNID bigint REFERENCES TXNS (TXN_ID), TC_DATABASE varchar(128) NOT NULL, TC_TABLE varchar(128), - TC_PARTITION varchar(767) DEFAULT NULL + TC_PARTITION varchar(767) DEFAULT NULL, + TC_OPERATION_TYPE char(1) NOT NULL ); CREATE TABLE COMPLETED_TXN_COMPONENTS ( @@ -118,4 +119,12 @@ CREATE TABLE AUX_TABLE ( PRIMARY KEY(MT_KEY1, MT_KEY2) ); +CREATE TABLE WRITE_SET ( + WS_DATABASE varchar(128) NOT NULL, + WS_TABLE varchar(128) NOT NULL, + WS_PARTITION varchar(767), + WS_TXNID bigint NOT NULL, + WS_COMMIT_ID bigint NOT NULL, + WS_OPERATION_TYPE char(1) NOT NULL +); diff --git metastore/scripts/upgrade/postgres/hive-txn-schema-2.1.0.postgres.sql metastore/scripts/upgrade/postgres/hive-txn-schema-2.1.0.postgres.sql new file mode 100644 index 0000000..262b93e --- /dev/null +++ metastore/scripts/upgrade/postgres/hive-txn-schema-2.1.0.postgres.sql @@ -0,0 +1,129 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. 
See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- +-- Tables for transaction management +-- + +CREATE TABLE TXNS ( + TXN_ID bigint PRIMARY KEY, + TXN_STATE char(1) NOT NULL, + TXN_STARTED bigint NOT NULL, + TXN_LAST_HEARTBEAT bigint NOT NULL, + TXN_USER varchar(128) NOT NULL, + TXN_HOST varchar(128) NOT NULL, + TXN_AGENT_INFO varchar(128), + TXN_META_INFO varchar(128), + TXN_HEARTBEAT_COUNT integer +); + +CREATE TABLE TXN_COMPONENTS ( + TC_TXNID bigint REFERENCES TXNS (TXN_ID), + TC_DATABASE varchar(128) NOT NULL, + TC_TABLE varchar(128), + TC_PARTITION varchar(767) DEFAULT NULL, + TC_OPERATION_TYPE char(1) NOT NULL +); + +CREATE TABLE COMPLETED_TXN_COMPONENTS ( + CTC_TXNID bigint, + CTC_DATABASE varchar(128) NOT NULL, + CTC_TABLE varchar(128), + CTC_PARTITION varchar(767) +); + +CREATE TABLE NEXT_TXN_ID ( + NTXN_NEXT bigint NOT NULL +); +INSERT INTO NEXT_TXN_ID VALUES(1); + +CREATE TABLE HIVE_LOCKS ( + HL_LOCK_EXT_ID bigint NOT NULL, + HL_LOCK_INT_ID bigint NOT NULL, + HL_TXNID bigint, + HL_DB varchar(128) NOT NULL, + HL_TABLE varchar(128), + HL_PARTITION varchar(767) DEFAULT NULL, + HL_LOCK_STATE char(1) NOT NULL, + HL_LOCK_TYPE char(1) NOT NULL, + HL_LAST_HEARTBEAT bigint NOT NULL, + HL_ACQUIRED_AT bigint, + HL_USER varchar(128) NOT NULL, + HL_HOST varchar(128) NOT NULL, + HL_HEARTBEAT_COUNT integer, + HL_AGENT_INFO varchar(128), + HL_BLOCKEDBY_EXT_ID bigint, + HL_BLOCKEDBY_INT_ID bigint, + PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID) +); + +CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS USING hash (HL_TXNID); + +CREATE TABLE NEXT_LOCK_ID ( + NL_NEXT bigint NOT NULL +); +INSERT INTO NEXT_LOCK_ID VALUES(1); + +CREATE TABLE COMPACTION_QUEUE ( + CQ_ID bigint PRIMARY KEY, + CQ_DATABASE varchar(128) NOT NULL, + CQ_TABLE varchar(128) NOT NULL, + CQ_PARTITION varchar(767), + CQ_STATE char(1) NOT NULL, + CQ_TYPE char(1) NOT NULL, + CQ_WORKER_ID varchar(128), + CQ_START bigint, + CQ_RUN_AS varchar(128), + CQ_HIGHEST_TXN_ID bigint, + CQ_META_INFO bytea, + CQ_HADOOP_JOB_ID varchar(32) +); + +CREATE TABLE NEXT_COMPACTION_QUEUE_ID ( + NCQ_NEXT bigint NOT NULL +); +INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1); + +CREATE TABLE COMPLETED_COMPACTIONS ( + CC_ID bigint PRIMARY KEY, + CC_DATABASE varchar(128) NOT NULL, + CC_TABLE varchar(128) NOT NULL, + CC_PARTITION varchar(767), + CC_STATE char(1) NOT NULL, + CC_TYPE char(1) NOT NULL, + CC_WORKER_ID varchar(128), + CC_START bigint, + CC_END bigint, + CC_RUN_AS varchar(128), + CC_HIGHEST_TXN_ID bigint, + CC_META_INFO bytea, + CC_HADOOP_JOB_ID varchar(32) +); + +CREATE TABLE AUX_TABLE ( + MT_KEY1 varchar(128) NOT NULL, + MT_KEY2 bigint NOT NULL, + MT_COMMENT varchar(255), + PRIMARY KEY(MT_KEY1, MT_KEY2) +); + +CREATE TABLE WRITE_SET ( + WS_DATABASE varchar(128) NOT NULL, + WS_TABLE varchar(128) NOT NULL, + WS_PARTITION varchar(767), + WS_TXNID bigint NOT NULL, + WS_COMMIT_ID bigint NOT NULL, + WS_OPERATION_TYPE char(1) 
NOT NULL +); diff --git metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql index 6eb5620..b1bcac0 100644 --- metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql +++ metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql @@ -11,6 +11,7 @@ SELECT 'Upgrading MetaStore schema from 1.2.0 to 1.3.0'; \i 029-HIVE-12823.postgres.sql; \i 030-HIVE-12831.postgres.sql; \i 031-HIVE-12832.postgres.sql; +\i 034-HIVE-13395.postgres.sql; UPDATE "VERSION" SET "SCHEMA_VERSION"='1.3.0', "VERSION_COMMENT"='Hive release version 1.3.0' where "VER_ID"=1; SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0'; diff --git metastore/scripts/upgrade/postgres/upgrade-2.0.0-to-2.1.0.postgres.sql metastore/scripts/upgrade/postgres/upgrade-2.0.0-to-2.1.0.postgres.sql index e96a6ec..7fc603f 100644 --- metastore/scripts/upgrade/postgres/upgrade-2.0.0-to-2.1.0.postgres.sql +++ metastore/scripts/upgrade/postgres/upgrade-2.0.0-to-2.1.0.postgres.sql @@ -2,6 +2,7 @@ SELECT 'Upgrading MetaStore schema from 2.0.0 to 2.1.0'; \i 032-HIVE-12892.postgres.sql; \i 033-HIVE-13076.postgres.sql; +\i 034-HIVE-13395.postgres.sql; UPDATE "VERSION" SET "SCHEMA_VERSION"='2.1.0', "VERSION_COMMENT"='Hive release version 2.1.0' where "VER_ID"=1; SELECT 'Finished upgrading MetaStore schema from 2.0.0 to 2.1.0'; diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index ed2057a..2ab0133 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -6719,6 +6719,7 @@ private static void startHouseKeeperService(HiveConf conf) throws Exception { } startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService")); startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService")); + startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidWriteSetService")); } private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception { //todo: when metastore adds orderly-shutdown logic, houseKeeper.stop() diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index c82d23a..9720339 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -21,11 +21,13 @@ import java.sql.Driver; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.SQLTransactionRollbackException; import java.sql.Statement; import java.util.Properties; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -82,7 +84,8 @@ public static void prepDb() throws Exception { " TC_TXNID bigint REFERENCES TXNS (TXN_ID)," + " TC_DATABASE varchar(128) NOT NULL," + " TC_TABLE varchar(128)," + - " TC_PARTITION varchar(767))"); + " TC_PARTITION varchar(767)," + + " TC_OPERATION_TYPE char(1))"); stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" + " CTC_TXNID bigint," + " CTC_DATABASE varchar(128) NOT NULL," + @@ -146,18 +149,24 @@ public static void prepDb() throws Exception 
{ " CC_HADOOP_JOB_ID varchar(32))"); stmt.execute("CREATE TABLE AUX_TABLE (" + - " MT_KEY1 varchar(128) NOT NULL," + - " MT_KEY2 bigint NOT NULL," + - " MT_COMMENT varchar(255)," + - " PRIMARY KEY(MT_KEY1, MT_KEY2)" + - ")"); - - conn.commit(); + " MT_KEY1 varchar(128) NOT NULL," + + " MT_KEY2 bigint NOT NULL," + + " MT_COMMENT varchar(255)," + + " PRIMARY KEY(MT_KEY1, MT_KEY2))"); + + stmt.execute("CREATE TABLE WRITE_SET (" + + " WS_DATABASE varchar(128) NOT NULL," + + " WS_TABLE varchar(128) NOT NULL," + + " WS_PARTITION varchar(767)," + + " WS_TXNID bigint NOT NULL," + + " WS_COMMIT_ID bigint NOT NULL," + + " WS_OPERATION_TYPE char(1) NOT NULL)" + ); } catch (SQLException e) { try { conn.rollback(); } catch (SQLException re) { - System.err.println("Error rolling back: " + re.getMessage()); + LOG.error("Error rolling back: " + re.getMessage()); } // This might be a deadlock, if so, let's retry @@ -174,41 +183,60 @@ public static void prepDb() throws Exception { } public static void cleanDb() throws Exception { - Connection conn = null; - Statement stmt = null; - try { - conn = getConnection(); - stmt = conn.createStatement(); - - // We want to try these, whether they succeed or fail. + int retryCount = 0; + while(++retryCount <= 3) { + boolean success = true; + Connection conn = null; + Statement stmt = null; try { - stmt.execute("DROP INDEX HL_TXNID_INDEX"); - } catch (Exception e) { - System.err.println("Unable to drop index HL_TXNID_INDEX " + e.getMessage()); - } + conn = getConnection(); + stmt = conn.createStatement(); - dropTable(stmt, "TXN_COMPONENTS"); - dropTable(stmt, "COMPLETED_TXN_COMPONENTS"); - dropTable(stmt, "TXNS"); - dropTable(stmt, "NEXT_TXN_ID"); - dropTable(stmt, "HIVE_LOCKS"); - dropTable(stmt, "NEXT_LOCK_ID"); - dropTable(stmt, "COMPACTION_QUEUE"); - dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID"); - dropTable(stmt, "COMPLETED_COMPACTIONS"); - dropTable(stmt, "AUX_TABLE"); - conn.commit(); - } finally { - closeResources(conn, stmt, null); + // We want to try these, whether they succeed or fail. 
+ try { + stmt.execute("DROP INDEX HL_TXNID_INDEX"); + } catch (SQLException e) { + if(!("42X65".equals(e.getSQLState()) && 30000 == e.getErrorCode())) { + //42X65/3000 means index doesn't exist + LOG.error("Unable to drop index HL_TXNID_INDEX " + e.getMessage() + + "State=" + e.getSQLState() + " code=" + e.getErrorCode() + " retryCount=" + retryCount); + success = false; + } + } + + success &= dropTable(stmt, "TXN_COMPONENTS", retryCount); + success &= dropTable(stmt, "COMPLETED_TXN_COMPONENTS", retryCount); + success &= dropTable(stmt, "TXNS", retryCount); + success &= dropTable(stmt, "NEXT_TXN_ID", retryCount); + success &= dropTable(stmt, "HIVE_LOCKS", retryCount); + success &= dropTable(stmt, "NEXT_LOCK_ID", retryCount); + success &= dropTable(stmt, "COMPACTION_QUEUE", retryCount); + success &= dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID", retryCount); + success &= dropTable(stmt, "COMPLETED_COMPACTIONS", retryCount); + success &= dropTable(stmt, "AUX_TABLE", retryCount); + success &= dropTable(stmt, "WRITE_SET", retryCount); + } finally { + closeResources(conn, stmt, null); + } + if(success) { + return; + } } } - private static void dropTable(Statement stmt, String name) { + private static boolean dropTable(Statement stmt, String name, int retryCount) throws SQLException { try { stmt.execute("DROP TABLE " + name); - } catch (Exception e) { - System.err.println("Unable to drop table " + name + ": " + e.getMessage()); + return true; + } catch (SQLException e) { + if("42Y55".equals(e.getSQLState()) && 30000 == e.getErrorCode()) { + //failed because object doesn't exist + return true; + } + LOG.error("Unable to drop table " + name + ": " + e.getMessage() + + " State=" + e.getSQLState() + " code=" + e.getErrorCode() + " retryCount=" + retryCount); } + return false; } /** @@ -259,6 +287,32 @@ public static int countQueryAgent(String countQuery) throws Exception { closeResources(conn, stmt, rs); } } + @VisibleForTesting + public static String queryToString(String query) throws Exception { + Connection conn = null; + Statement stmt = null; + ResultSet rs = null; + StringBuilder sb = new StringBuilder(); + try { + conn = getConnection(); + stmt = conn.createStatement(); + rs = stmt.executeQuery(query); + ResultSetMetaData rsmd = rs.getMetaData(); + for(int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) { + sb.append(rsmd.getColumnName(colPos)).append(" "); + } + sb.append('\n'); + while(rs.next()) { + for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) { + sb.append(rs.getObject(colPos)).append(" "); + } + sb.append('\n'); + } + } finally { + closeResources(conn, stmt, rs); + } + return sb.toString(); + } static Connection getConnection() throws Exception { HiveConf conf = new HiveConf(); @@ -272,7 +326,7 @@ static Connection getConnection() throws Exception { prop.setProperty("user", user); prop.setProperty("password", passwd); Connection conn = driver.connect(driverUrl, prop); - conn.setAutoCommit(false); + conn.setAutoCommit(true); return conn; } @@ -281,7 +335,7 @@ static void closeResources(Connection conn, Statement stmt, ResultSet rs) { try { rs.close(); } catch (SQLException e) { - System.err.println("Error closing ResultSet: " + e.getMessage()); + LOG.error("Error closing ResultSet: " + e.getMessage()); } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index df6591f..97fb722 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java 
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.common.jsonexplain.tez.Op; import org.apache.hadoop.hive.metastore.Warehouse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,6 +122,41 @@ static private DataSource connPool; static private boolean doRetryOnConnPool = false; + + private enum OpertaionType { + INSERT('i'), UPDATE('u'), DELETE('d'); + private final char sqlConst; + OpertaionType(char sqlConst) { + this.sqlConst = sqlConst; + } + public String toString() { + return Character.toString(sqlConst); + } + public static OpertaionType fromString(char sqlConst) { + switch (sqlConst) { + case 'i': + return INSERT; + case 'u': + return UPDATE; + case 'd': + return DELETE; + default: + throw new IllegalArgumentException(quoteChar(sqlConst)); + } + } + //we should instead just pass in OpertaionType from client + @Deprecated + public static OpertaionType fromLockType(LockType lockType) { + switch (lockType) { + case SHARED_READ: + return INSERT; + case SHARED_WRITE: + return UPDATE; + default: + throw new IllegalArgumentException("Unexpected lock type: " + lockType); + } + } + } /** * Number of consecutive deadlocks we have seen @@ -449,6 +485,30 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept } } + /** + * Concurrency/isolation notes: + * This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)} + * operations using select4update on NEXT_TXN_ID. Also, mutexes on TXNS table for specific txnid:X + * see more notes below. + * In order to prevent lost updates, we need to determine if any 2 transactions overlap. Each txn + * is viewed as an interval [M,N]. M is the txnid and N is taken from the same NEXT_TXN_ID sequence + * so that we can compare commit time of txn T with start time of txn S. This sequence can be thought of + * as a logical time counter. If S.commitTime < T.startTime, T and S do NOT overlap. + * + * Motivating example: + * Suppose we have multi-statement transactions T and S both of which are attempting x = x + 1 + * In order to prevent the lost update problem, the non-overlapping txns must lock in the snapshot + * that they read appropriately. In particular, if txns do not overlap, then one follows the other + * (assuming they write the same entity), and thus the 2nd must see changes of the 1st. We ensure + * this by locking in snapshot after + * {@link #openTxns(OpenTxnRequest)} call is made (see {@link org.apache.hadoop.hive.ql.Driver#acquireLocksAndOpenTxn()}) + * and mutexing openTxn() with commit(). In other words, once a S.commit() starts we must ensure + * that txn T which will be considered a later txn, locks in a snapshot that includes the result + * of S's commit (assuming no other txns). + * As a counter example, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions + * were running in parallel). If T and S both locked in the same snapshot (for example commit of txnid:2) + * 'x' would be updated to the same value by both, i.e. lost update.
+ */ public void commitTxn(CommitTxnRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { long txnid = rqst.getTxnid(); @@ -456,40 +516,113 @@ public void commitTxn(CommitTxnRequest rqst) Connection dbConn = null; Statement stmt = null; ResultSet lockHandle = null; + ResultSet commitIdRs = null, rs; try { lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + /** + * This S4U will mutex with other commitTxn() and openTxns(). + * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial + * Note: it's possible to have several txns have the same commit id. Suppose 3 txns start + * at the same time and no new txns start until all 3 commit. + * We could've incremented the sequence for commitId as well but it doesn't add anything functionally. + */ + commitIdRs = stmt.executeQuery(addForUpdateClause("select ntxn_next - 1 from NEXT_TXN_ID")); + if(!commitIdRs.next()) { + throw new IllegalStateException("No rows found in NEXT_TXN_ID"); + } + long commitId = commitIdRs.getLong(1); /** * Runs at READ_COMMITTED with S4U on TXNS row for "txnid". S4U ensures that no other * operation can change this txn (such acquiring locks). While lock() and commitTxn() * should not normally run concurrently (for same txn) but could due to bugs in the client * which could then corrupt internal transaction manager state. Also competes with abortTxn(). */ - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); if(lockHandle == null) { //this also ensures that txn is still there and in expected state (hasn't been timed out) ensureValidTxn(dbConn, txnid, stmt); shouldNeverHappen(txnid); } - + Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); + int numCompsWritten = stmt.executeUpdate("insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" + + " select tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + + "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")"); + if(numCompsWritten == 0) { + /** + * current txn didn't update/delete anything (may have inserted), so just proceed with commit + * + * We only care about commit id for write txns, so for RO (when supported) txns we don't + * have to mutex on NEXT_TXN_ID. + * Consider: if RO txn is after a W txn, then RO's openTxns() will be mutexed with W's + * commitTxn() because both do S4U on NEXT_TXN_ID and thus RO will see result of W txn. + * If RO < W, then there is no reads-from relationship. + */ + } + else { + /** + * see if there are any overlapping txns that wrote the same element, i.e. have a conflict + * Since entire commit operation is mutexed wrt other start/commit ops, + * committed.ws_commit_id <= current.ws_commit_id for all txns + * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap + * For example, [17,20] is committed, [6,80] is being committed right now - these overlap + * [17,20] committed and [21,21] committing now - these do not overlap.
+ * [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running) + */ + rs = stmt.executeQuery + (addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," + + "committed.ws_table, committed.ws_partition, cur.ws_commit_id " + + "from WRITE_SET committed INNER JOIN WRITE_SET cur " + + "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " + + "and (committed.ws_partition=cur.ws_partition or (committed.ws_partition is null and cur.ws_partition is null)) " + + "where cur.ws_txnid <= committed.ws_commit_id" + //txns overlap; could replace ws_txnid + // with txnid, though any decent DB should infer this + " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as + // part of this commit() op + " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns + //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all + " and (committed.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + + " OR cur.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + ")")); + if(rs.next()) { + //found a conflict + String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]"; + StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4)); + String partitionName = rs.getString(5); + if(partitionName != null) { + resource.append('/').append(partitionName); + } + String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource + + " committed by " + committedTxn; + close(rs); + //remove WRITE_SET info for current txn since it's about to abort + dbConn.rollback(undoWriteSetForCurrentTxn); + LOG.info(msg); + //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this + if(abortTxns(dbConn, Collections.singletonList(txnid)) != 1) { + throw new IllegalStateException(msg + " FAILED!"); + } + dbConn.commit(); + close(null, stmt, dbConn); + throw new TxnAbortedException(msg); + } + else { + //no conflicting operations, proceed with the rest of commit sequence + } + } // Move the record from txn_components into completed_txn_components so that the compactor // knows where to look to compact. String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " + "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid; LOG.debug("Going to execute insert <" + s + ">"); if (stmt.executeUpdate(s) < 1) { - //this can be reasonable for an empty txn START/COMMIT + //this can be reasonable for an empty txn START/COMMIT or read-only txn LOG.info("Expected to move at least one record from txn_components to " + "completed_txn_components when committing txn! 
" + JavaUtils.txnIdToString(txnid)); } - - // Always access TXN_COMPONENTS before HIVE_LOCKS; s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); - // Always access HIVE_LOCKS before TXNS s = "delete from HIVE_LOCKS where hl_txnid = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); @@ -505,6 +638,7 @@ public void commitTxn(CommitTxnRequest rqst) throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { + close(commitIdRs); close(lockHandle, stmt, dbConn); unlockInternal(); } @@ -512,7 +646,50 @@ public void commitTxn(CommitTxnRequest rqst) commitTxn(rqst); } } - + @Override + public void performWriteSetGC() { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID"); + if(!rs.next()) { + throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted"); + } + long highestAllocatedTxnId = rs.getLong(1); + close(rs); + rs = stmt.executeQuery("select min(txn_id) from TXNS where txn_state=" + quoteChar(TXN_OPEN)); + if(!rs.next()) { + throw new IllegalStateException("Scalar query returned no rows?!?!!"); + } + long commitHighWaterMark; + long lowestOpenTxnId = rs.getLong(1); + if(rs.wasNull()) { + //if here then there are no Open txns and highestAllocatedTxnId must be + //resolved (i.e. committed or aborted), either way + //there are no open txns with id <= highestAllocatedTxnId + //the +1 is there because "delete ..." below has < (which is correct for the case when + //there is an open txn + //Concurrency: even if new txn starts (or starts + commits) it is still true that + //there are no currently open txns that overlap with any committed txn with + //commitId <= commitHighWaterMark (as set on next line). So plain READ_COMMITTED is enough. + commitHighWaterMark = highestAllocatedTxnId + 1; + } + else { + commitHighWaterMark = lowestOpenTxnId; + } + int delCnt = stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + commitHighWaterMark); + LOG.info("Deleted " + delCnt + " obsolete rows from WRTIE_SET"); + dbConn.commit(); + } catch (SQLException ex) { + LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex); + } + finally { + close(rs, stmt, dbConn); + } + } /** * As much as possible (i.e. in absence of retries) we want both operations to be done on the same * connection (but separate transactions). This avoid some flakiness in BONECP where if you @@ -540,7 +717,7 @@ private ConnectionLockIdPair(Connection dbConn, long extLockId) { /** * Note that by definition select for update is divorced from update, i.e. you executeQuery() to read - * and then executeUpdate(). One other alternative would be to actually update the row in TXNX but + * and then executeUpdate(). One other alternative would be to actually update the row in TXNS but * to the same value as before thus forcing db to acquire write lock for duration of the transaction. 
* * There is no real reason to return the ResultSet here other than to make sure the reference to it @@ -611,6 +788,18 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc stmt.executeUpdate(s); if (txnid > 0) { + /**DBTxnManager#acquireLocks() knows if it's I/U/D (that's how it decides what lock to get) + * So if we add that to LockRequest we'll know that here + * Should probably add it to LockComponent so that if in the future we decide wo allow 1 LockRequest + * to contain LockComponent for multiple operations + * + * QueryPlan has BaseSemanticAnalyzer which has acidFileSinks list of FileSinkDesc + * FileSinkDesc.table is ql.metadata.Table + * Table.tableSpec which is TableSpec, which has specType which is SpecType + * So maybe this can work to know that this is part of dynamic partition insert in which case + * we'll get addDynamicPartitions() call and should not write TXN_COMPONENTS here. + * In any case, that's an optimization for now; will be required when adding multi-stmt txns + */ // For each component in this lock request, // add an entry to the txn_components table // This must be done before HIVE_LOCKS is accessed @@ -619,10 +808,11 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc String tblName = lc.getTablename(); String partName = lc.getPartitionname(); s = "insert into TXN_COMPONENTS " + - "(tc_txnid, tc_database, tc_table, tc_partition) " + + "(tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) " + "values (" + txnid + ", '" + dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'") + ", " + - (partName == null ? "null" : "'" + partName + "'") + ")"; + (partName == null ? "null" : "'" + partName + "'")+ "," + + quoteString(OpertaionType.fromLockType(lc.getType()).toString()) + ")"; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); } @@ -693,9 +883,8 @@ private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long lockInternal(); if(dbConn.isClosed()) { //should only get here if retrying this op - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); } - dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); return checkLock(dbConn, extLockId); } catch (SQLException e) { LOG.debug("Going to rollback"); @@ -751,7 +940,6 @@ public LockResponse checkLock(CheckLockRequest rqst) //todo: strictly speaking there is a bug here. heartbeat*() commits but both heartbeat and //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retired //extra heartbeat is logically harmless, but ... 
- dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); return checkLock(dbConn, extLockId); } catch (SQLException e) { LOG.debug("Going to rollback"); @@ -1157,11 +1345,29 @@ private static void shouldNeverHappen(long txnid, long extLockId, long intLockId throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid) + " " + JavaUtils.lockIdToString(extLockId) + " " + intLockId); } + + /** + * todo: need to know if this is I/U/D since (at least currently) + * U conflicts with U/D, + * and others are conflict free + * (D cannot conflict with I, you can't delete something that hasn't been inserted yet) + * (D+D don't conflict in a material way) + * + * MoveTask knows if it's I/U/D + * MoveTask calls Hive.loadDynamicPartitions() which calls HiveMetaStoreClient.addDynamicPartitions() + * which ends up here so we'd need to add a field to AddDynamicPartitions. + * + * @param rqst dynamic partition info. + * @throws NoSuchTxnException + * @throws TxnAbortedException + * @throws MetaException + */ public void addDynamicPartitions(AddDynamicPartitions rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { Connection dbConn = null; Statement stmt = null; ResultSet lockHandle = null; + ResultSet rs = null; try { try { lockInternal(); @@ -1173,18 +1379,32 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) ensureValidTxn(dbConn, rqst.getTxnid(), stmt); shouldNeverHappen(rqst.getTxnid()); } + //we should be able to get this from AddDynamicPartitions object longer term; in fact we'd have to + //for multi stmt txns if same table is written more than once per tx + String findOperationType = " tc_operation_type from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() + + " and tc_database=" + quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename()); + //do limit 1 on this; currently they will all have the same operation type + rs = stmt.executeQuery(addLimitClause(1, findOperationType)); + if(!rs.next()) { + throw new IllegalStateException("Unable to determine tc_operation_type for " + JavaUtils.txnIdToString(rqst.getTxnid())); + } + OpertaionType ot = OpertaionType.fromString(rs.getString(1).charAt(0)); + + //what if a txn writes the same table > 1 time... let's go with this for now, but really + //need to not write this in the first place, i.e. make this delete not needed + //see enqueueLockWithRetry() - that's where we write to TXN_COMPONENTS + String deleteSql = "delete from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() + " and tc_database=" + + quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename()); + //we delete the entries made by enqueueLockWithRetry() since those are based on lock information which is + //much "wider" than necessary in a lot of cases. Here on the other hand, we know exactly which + //partitions have been written to.
w/o this WRITE_SET would contain entries for partitions not actually + //written to + stmt.executeUpdate(deleteSql); for (String partName : rqst.getPartitionnames()) { - StringBuilder buff = new StringBuilder(); - buff.append("insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition) values ("); - buff.append(rqst.getTxnid()); - buff.append(", '"); - buff.append(rqst.getDbname()); - buff.append("', '"); - buff.append(rqst.getTablename()); - buff.append("', '"); - buff.append(partName); - buff.append("')"); - String s = buff.toString(); + String s = + "insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) values (" + + rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) + + "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + ")"; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); } @@ -1903,35 +2123,112 @@ private static boolean isValidTxn(long txnId) { return txnId != 0; } /** + * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller + * hl_lock_ext_id by only checking earlier locks. + * + * For any given SQL statment all locks required by it are grouped under single extLockId and are + * granted all at once or all locks wait. + * + * This is expected to run at READ_COMMITTED. + * * Note: this calls acquire() for (extLockId,intLockId) but extLockId is the same and we either take * all locks for given extLockId or none. Would be more efficient to update state on all locks - * at once. Semantics are the same since this is all part of the same txn@serializable. + * at once. Semantics are the same since this is all part of the same txn. * - * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller - * hl_lock_ext_id by only checking earlier locks. + * If there is a concurrent commitTxn/rollbackTxn, those can only remove rows from HIVE_LOCKS. + * If they happen to be for the same txnid, there will be a WW conflict (in MS DB), if different txnid, + * checkLock() will in the worst case keep locks in Waiting state a little longer. */ - private LockResponse checkLock(Connection dbConn, - long extLockId) + private LockResponse checkLock(Connection dbConn, long extLockId) throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { - if(dbConn.getTransactionIsolation() != Connection.TRANSACTION_SERIALIZABLE) { - //longer term we should instead use AUX_TABLE/S4U to serialize all checkLock() operations - //that would be less prone to deadlocks - throw new IllegalStateException("Unexpected Isolation Level: " + dbConn.getTransactionIsolation()); - } - List locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now + TxnStore.MutexAPI.LockHandle handle = null; + Statement stmt = null; + ResultSet rs = null; LockResponse response = new LockResponse(); - response.setLockid(extLockId); + /** + * Longer term we should pass this from client somehow - this would be an optimization; once + * that is in place make sure to build and test "writeSet" below using OperationType not LockType + */ + boolean isPartOfDynamicPartitionInsert = true; + try { + /** + * checkLock() must be mutexed against any other checkLock to make sure 2 conflicting locks + * are not granted by parallel checkLock() calls. 
+ */ + handle = getMutexAPI().acquireLock(MUTEX_KEY.CheckLock.name()); + List locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now + response.setLockid(extLockId); - LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId)); - Savepoint save = dbConn.setSavepoint();//todo: get rid of this - StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + - "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + - "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in ("); + LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId)); + Savepoint save = dbConn.setSavepoint();//todo: get rid of this + StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + + "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + + "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in ("); + + Set strings = new HashSet(locksBeingChecked.size()); + + //This is the set of entities that the statement represented by extLockId wants to update + List writeSet = new ArrayList<>(); + + for (LockInfo info : locksBeingChecked) { + strings.add(info.db); + if(!isPartOfDynamicPartitionInsert && info.type == LockType.SHARED_WRITE) { + writeSet.add(info); + } + } + if(!writeSet.isEmpty()) { + if(writeSet.get(0).txnId == 0) { + //Write operations always start a txn + throw new IllegalStateException("Found Write lock for " + JavaUtils.lockIdToString(extLockId) + " but no txnid"); + } + stmt = dbConn.createStatement(); + StringBuilder sb = new StringBuilder(" ws_database, ws_table, ws_partition, " + + "ws_txnid, ws_commit_id " + + "from WRITE_SET where ws_commit_id >= " + writeSet.get(0).txnId + " and (");//see commitTxn() for more info on this inequality + for(LockInfo info : writeSet) { + sb.append("(ws_database = ").append(quoteString(info.db)).append(" and ws_table = ") + .append(quoteString(info.table)).append(" and ws_partition ") + .append(info.partition == null ? "is null" : "= " + quoteString(info.partition)).append(") or "); + } + sb.setLength(sb.length() - 4);//nuke trailing " or " + sb.append(")"); + //1 row is sufficient to know we have to kill the query + rs = stmt.executeQuery(addLimitClause(1, sb.toString())); + if(rs.next()) { + /** + * if here, it means we found an already committed txn which overlaps with the current one and + * it updated the same resource the current txn wants to update. By First-committer-wins + * rule, current txn will not be allowed to commit so may as well kill it now; This is just an + * optimization to prevent wasting cluster resources to run a query which is known to be DOA. + * {@link #commitTxn(CommitTxnRequest)} has the primary responsibility to ensure this. + * checkLock() runs at READ_COMMITTED so you could have another (Hive) txn running commitTxn() + * in parallel and thus writing to WRITE_SET. commitTxn() logic is properly mutexed to ensure + * that we don't "miss" any WW conflicts. We could've mutexed the checkLock() and commitTxn() + * as well but this reduces concurrency for very little gain. + * Note that update/delete (which runs as dynamic partition insert) acquires a lock on the table, + * but WRITE_SET has entries for actual partitions updated. Thus this optimization will "miss" + * the WW conflict but it will be caught in commitTxn() where actual partitions written are known. + * This is OK since we want 2 concurrent updates that update different sets of partitions to both commit.
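 * For illustration only (hypothetical values; the real statement is assembled above and wrapped by addLimitClause()):
 *   for a single SHARED_WRITE lock taken by txnid 7 on default.tab_part, partition p=blah, the conflict probe is roughly
 *   select ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id from WRITE_SET
 *   where ws_commit_id >= 7 and ((ws_database = 'default' and ws_table = 'tab_part' and ws_partition = 'p=blah'))
 *   i.e. "did some transaction commit after this txn started and write an entity this txn also wants to write?"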
+ */ + String resourceName = rs.getString(1) + '/' + rs.getString(2); + String partName = rs.getString(3); + if(partName != null) { + resourceName += '/' + partName; + } + + String msg = "Aborting " + JavaUtils.txnIdToString(writeSet.get(0).txnId) + + " since a concurrent committed transaction [" + JavaUtils.txnIdToString(rs.getLong(4)) + "," + rs.getLong(5) + + "] has already updated resource '" + resourceName + "'"; + LOG.info(msg); + if(abortTxns(dbConn, Collections.singletonList(writeSet.get(0).txnId)) != 1) { + throw new IllegalStateException(msg + " FAILED!"); + } + dbConn.commit(); + throw new TxnAbortedException(msg); + } + close(rs, stmt, null); + } - Set strings = new HashSet(locksBeingChecked.size()); - for (LockInfo info : locksBeingChecked) { - strings.add(info.db); - } boolean first = true; for (String s : strings) { if (first) first = false; @@ -1994,9 +2291,6 @@ private LockResponse checkLock(Connection dbConn, query.append(" and hl_lock_ext_id <= ").append(extLockId); LOG.debug("Going to execute query <" + query.toString() + ">"); - Statement stmt = null; - ResultSet rs = null; - try { stmt = dbConn.createStatement(); rs = stmt.executeQuery(query.toString()); SortedSet lockSet = new TreeSet(new LockInfoComparator()); @@ -2112,6 +2406,9 @@ private LockResponse checkLock(Connection dbConn, response.setState(LockState.ACQUIRED); } finally { close(rs, stmt, null); + if(handle != null) { + handle.releaseLocks(); + } } return response; } @@ -2153,7 +2450,7 @@ private void acquire(Connection dbConn, Statement stmt, long extLockId, LockInfo String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " + //if lock is part of txn, heartbeat info is in txn record "hl_last_heartbeat = " + (isValidTxn(lockInfo.txnId) ? 0 : now) + - ", hl_acquired_at = " + now + " where hl_lock_ext_id = " + + ", hl_acquired_at = " + now + ",HL_BLOCKEDBY_EXT_ID=NULL,HL_BLOCKEDBY_INT_ID=null" + " where hl_lock_ext_id = " + extLockId + " and hl_lock_int_id = " + lockInfo.intLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); @@ -2233,6 +2530,8 @@ private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt //todo: add LIMIT 1 instead of count - should be more efficient s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid; ResultSet rs2 = stmt.executeQuery(s); + //todo: strictly speaking you can commit an empty txn, thus 2nd conjunct is wrong but only + //possible for multi-stmt txns boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0; LOG.debug("Going to rollback"); dbConn.rollback(); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 927e9bc..f9cac18 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -47,7 +47,7 @@ @InterfaceStability.Evolving public interface TxnStore { - public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory} + public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock, WriteSetCleaner} // Compactor states (Should really be enum) static final public String INITIATED_RESPONSE = "initiated"; static final public String WORKING_RESPONSE = "working"; @@ -321,6 +321,12 @@ public void cleanupRecords(HiveObjectType type, Database db, Table table, public void purgeCompactionHistory() throws MetaException; /** + * WriteSet
tracking is used to ensure proper transaction isolation. This method deletes the + * transaction metadata once it becomes unnecessary. + */ + public void performWriteSetGC(); + + /** * Determine if there are enough consecutive failures compacting a table or partition that no * new automatic compactions should be scheduled. User initiated compactions do not do this * check. diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index cc9e583..b829d9d 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -69,6 +69,8 @@ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long * @return a valid txn list. */ public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) { + //todo: this could be more efficient: using select min(txn_id) from TXNS where txn_state=" + + // quoteChar(TXN_OPEN) to compute compute HWM... long highWater = txns.getTxn_high_water_mark(); long minOpenTxn = Long.MAX_VALUE; long[] exceptions = new long[txns.getOpen_txnsSize()]; diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 2c1560b..80e3cd6 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -413,7 +413,7 @@ public void addDynamicPartitions() throws Exception { lc.setTablename(tableName); LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost"); lr.setTxnid(txnId); - LockResponse lock = txnHandler.lock(new LockRequest(Arrays.asList(lc), "me", "localhost")); + LockResponse lock = txnHandler.lock(lr); assertEquals(LockState.ACQUIRED, lock.getState()); txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnId, dbName, tableName, @@ -429,8 +429,8 @@ public void addDynamicPartitions() throws Exception { assertEquals(dbName, ci.dbname); assertEquals(tableName, ci.tableName); switch (i++) { - case 0: assertEquals("ds=today", ci.partName); break; - case 1: assertEquals("ds=yesterday", ci.partName); break; + case 0: assertEquals("ds=today", ci.partName); break; + case 1: assertEquals("ds=yesterday", ci.partName); break; default: throw new RuntimeException("What?"); } } diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 28d0269..1a118a9 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -483,6 +483,7 @@ public void testLockSRSW() throws Exception { components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); res = txnHandler.lock(req); assertTrue(res.getState() == LockState.ACQUIRED); } @@ -514,6 +515,7 @@ public void testLockESRSW() throws Exception { components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); res = txnHandler.lock(req); assertTrue(res.getState() == LockState.WAITING); } @@ -580,6 +582,7 @@ public void testLockSWSR() throws Exception { List components = new ArrayList(1); components.add(comp); LockRequest req = new 
LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); LockResponse res = txnHandler.lock(req); assertTrue(res.getState() == LockState.ACQUIRED); @@ -602,6 +605,7 @@ public void testLockSWSWSR() throws Exception { List components = new ArrayList(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); LockResponse res = txnHandler.lock(req); assertTrue(res.getState() == LockState.ACQUIRED); @@ -611,6 +615,7 @@ public void testLockSWSWSR() throws Exception { components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); res = txnHandler.lock(req); assertTrue(res.getState() == LockState.WAITING); @@ -633,6 +638,7 @@ public void testLockSWSWSW() throws Exception { List components = new ArrayList(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); LockResponse res = txnHandler.lock(req); assertTrue(res.getState() == LockState.ACQUIRED); @@ -642,6 +648,7 @@ public void testLockSWSWSW() throws Exception { components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); res = txnHandler.lock(req); assertTrue(res.getState() == LockState.WAITING); @@ -651,6 +658,7 @@ public void testLockSWSWSW() throws Exception { components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); res = txnHandler.lock(req); assertTrue(res.getState() == LockState.WAITING); } @@ -682,6 +690,7 @@ public void testLockEESW() throws Exception { components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); res = txnHandler.lock(req); assertTrue(res.getState() == LockState.WAITING); } @@ -725,6 +734,8 @@ public void testCheckLockAcquireAfterWaiting() throws Exception { List components = new ArrayList(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); + long txnId = openTxn(); + req.setTxnid(txnId); LockResponse res = txnHandler.lock(req); long lockid1 = res.getLockid(); assertTrue(res.getState() == LockState.ACQUIRED); @@ -735,11 +746,12 @@ public void testCheckLockAcquireAfterWaiting() throws Exception { components.clear(); components.add(comp); req = new LockRequest(components, "me", "localhost"); + req.setTxnid(openTxn()); res = txnHandler.lock(req); long lockid2 = res.getLockid(); assertTrue(res.getState() == LockState.WAITING); - txnHandler.unlock(new UnlockRequest(lockid1)); + txnHandler.abortTxn(new AbortTxnRequest(txnId)); res = txnHandler.checkLock(new CheckLockRequest(lockid2)); assertTrue(res.getState() == LockState.ACQUIRED); } @@ -1070,16 +1082,14 @@ public void testCompactMinorNoPartition() throws Exception { @Test public void showLocks() throws Exception { long begining = System.currentTimeMillis(); - long txnid = openTxn(); LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); List components = new ArrayList(1); components.add(comp); LockRequest req = new LockRequest(components, "me", "localhost"); - req.setTxnid(txnid); LockResponse res = txnHandler.lock(req); // Open txn - txnid = openTxn(); + long txnid = openTxn(); comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "mydb"); comp.setTablename("mytable"); components = new ArrayList(1); @@ -1090,7 +1100,7 @@ public void showLocks() throws Exception { // Locks not associated with a txn components = new 
ArrayList(1); - comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "yourdb"); + comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "yourdb"); comp.setTablename("yourtable"); comp.setPartitionname("yourpartition"); components.add(comp); @@ -1104,14 +1114,13 @@ public void showLocks() throws Exception { for (int i = 0; i < saw.length; i++) saw[i] = false; for (ShowLocksResponseElement lock : locks) { if (lock.getLockid() == 1) { - assertEquals(1, lock.getTxnid()); + assertEquals(0, lock.getTxnid()); assertEquals("mydb", lock.getDbname()); assertNull(lock.getTablename()); assertNull(lock.getPartname()); assertEquals(LockState.ACQUIRED, lock.getState()); assertEquals(LockType.EXCLUSIVE, lock.getType()); - assertTrue(lock.toString(), 0 == lock.getLastheartbeat() && - lock.getTxnid() != 0); + assertTrue(lock.toString(), 0 != lock.getLastheartbeat()); assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining + " and " + System.currentTimeMillis(), begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat()); @@ -1119,7 +1128,7 @@ public void showLocks() throws Exception { assertEquals("localhost", lock.getHostname()); saw[0] = true; } else if (lock.getLockid() == 2) { - assertEquals(2, lock.getTxnid()); + assertEquals(1, lock.getTxnid()); assertEquals("mydb", lock.getDbname()); assertEquals("mytable", lock.getTablename()); assertNull(lock.getPartname()); @@ -1137,7 +1146,7 @@ public void showLocks() throws Exception { assertEquals("yourtable", lock.getTablename()); assertEquals("yourpartition", lock.getPartname()); assertEquals(LockState.ACQUIRED, lock.getState()); - assertEquals(LockType.SHARED_WRITE, lock.getType()); + assertEquals(LockType.SHARED_READ, lock.getType()); assertTrue(lock.toString(), begining <= lock.getLastheartbeat() && System.currentTimeMillis() >= lock.getLastheartbeat()); assertTrue(begining <= lock.getAcquiredat() && diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 1de3309..52dadb7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -377,7 +377,7 @@ "instantiated, check hive.txn.manager"), TXN_NO_SUCH_TRANSACTION(10262, "No record of transaction {0} could be found, " + "may have timed out", true), - TXN_ABORTED(10263, "Transaction manager has aborted the transaction {0}.", true), + TXN_ABORTED(10263, "Transaction manager has aborted the transaction {0}. 
Reason: {1}", true), DBTXNMGR_REQUIRES_CONCURRENCY(10264, "To use DbTxnManager you must set hive.support.concurrency=true"), TXNMGR_NOT_ACID(10265, "This command is not allowed on an ACID table {0}.{1} with a non-ACID transaction manager", true), diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 7fa57d6..18ed864 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -172,8 +172,9 @@ else if(l.txnId == 0) { LOG.error("Metastore could not find " + JavaUtils.txnIdToString(lock.getTxnid())); throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(lock.getTxnid())); } catch (TxnAbortedException e) { - LOG.error("Transaction " + JavaUtils.txnIdToString(lock.getTxnid()) + " already aborted."); - throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(lock.getTxnid())); + LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(lock.getTxnid()), e.getMessage()); + LOG.error(le.getMessage()); + throw le; } catch (TException e) { throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e); diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index e8ebe55..b6b8b2b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -106,6 +106,8 @@ void setHiveConf(HiveConf conf) { @Override public long openTxn(String user) throws LockException { + //todo: why don't we lock the snapshot here??? Instead of having client make an explicit call + //whenever it chooses init(); if(isTxnOpen()) { throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId)); @@ -131,8 +133,17 @@ public HiveLockManager getLockManager() throws LockException { @Override public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException { - acquireLocks(plan, ctx, username, true); - startHeartbeat(); + try { + acquireLocks(plan, ctx, username, true); + startHeartbeat(); + } + catch(LockException e) { + if(e.getCause() instanceof TxnAbortedException) { + txnId = 0; + statementId = -1; + } + throw e; + } } /** @@ -156,7 +167,7 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB // For each source to read, get a shared lock for (ReadEntity input : plan.getInputs()) { if (!input.needsLock() || input.isUpdateOrDelete()) { - // We don't want to acquire readlocks during update or delete as we'll be acquiring write + // We don't want to acquire read locks during update or delete as we'll be acquiring write // locks instead. 
continue; } @@ -308,8 +319,9 @@ public void commitTxn() throws LockException { LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId)); throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId)); } catch (TxnAbortedException e) { - LOG.error("Transaction " + JavaUtils.txnIdToString(txnId) + " aborted"); - throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId)); + LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId), e.getMessage()); + LOG.error(le.getMessage()); + throw le; } catch (TException e) { throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e); @@ -377,8 +389,9 @@ public void heartbeat() throws LockException { LOG.error("Unable to find transaction " + JavaUtils.txnIdToString(txnId)); throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId)); } catch (TxnAbortedException e) { - LOG.error("Transaction aborted " + JavaUtils.txnIdToString(txnId)); - throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId)); + LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId), e.getMessage()); + LOG.error(le.getMessage()); + throw le; } catch (TException e) { throw new LockException( ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg() + "(" + JavaUtils.txnIdToString(txnId) diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java new file mode 100644 index 0000000..9085a6a --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java @@ -0,0 +1,61 @@ +package org.apache.hadoop.hive.ql.txn; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Periodically cleans WriteSet tracking information used in Transaction management + */ +public class AcidWriteSetService extends HouseKeeperServiceBase { + private static final Logger LOG = LoggerFactory.getLogger(AcidWriteSetService.class); + @Override + protected long getStartDelayMs() { + return 0; + } + @Override + protected long getIntervalMs() { + return hiveConf.getTimeVar(HiveConf.ConfVars.WRITE_SET_REAPER_INTERVAL, TimeUnit.MILLISECONDS); + } + @Override + protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) { + return new WriteSetReaper(hiveConf, isAliveCounter); + } + @Override + public String getServiceDescription() { + return "Periodically cleans obsolete WriteSet tracking information used in Transaction management"; + } + private static final class WriteSetReaper implements Runnable { + private final TxnStore txnHandler; + private final AtomicInteger isAliveCounter; + private WriteSetReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) { + txnHandler = TxnUtils.getTxnStore(hiveConf); + this.isAliveCounter = isAliveCounter; + } + @Override + public void run() { + TxnStore.MutexAPI.LockHandle handle = null; + try { + handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.WriteSetCleaner.name()); + long startTime = System.currentTimeMillis(); + txnHandler.performWriteSetGC(); + int count = isAliveCounter.incrementAndGet(); + 
LOG.info("cleaner ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count); + } + catch(Throwable t) { + LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); + } + finally { + if(handle != null) { + handle.releaseLocks(); + } + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java index 947f17c..caab10d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java @@ -81,7 +81,7 @@ public int getIsAliveCounter() { */ protected abstract long getStartDelayMs(); /** - * Determines how fequently the service is running its task. + * Determines how frequently the service is running its task. */ protected abstract long getIntervalMs(); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index abbe5d4..949cbd5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -147,7 +147,7 @@ public void run() { if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded); } catch (Throwable t) { LOG.error("Caught exception while trying to determine if we should compact " + - ci + ". Marking clean to avoid repeated failures, " + + ci + ". Marking failed to avoid repeated failures, " + "" + StringUtils.stringifyException(t)); txnHandler.markFailed(ci); } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 6238e2b..767c10c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -182,7 +182,7 @@ public Object run() throws Exception { txnHandler.markCompacted(ci); } catch (Exception e) { LOG.error("Caught exception while trying to compact " + ci + - ". Marking clean to avoid repeated failures, " + StringUtils.stringifyException(e)); + ". 
Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e)); txnHandler.markFailed(ci); } } catch (Throwable t) { diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 04c1d17..bdbd042 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -647,7 +647,7 @@ public static void runCleaner(HiveConf hiveConf) throws MetaException { t.run(); } - private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception { + public static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception { int lastCount = houseKeeperService.getIsAliveCounter(); houseKeeperService.start(conf); int maxIter = 10; diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index f87dd14..83a2ba3 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -65,6 +65,26 @@ public void testCreateFilename() throws Exception { assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023", AcidUtils.createFilename(p, options).toString()); } + @Test + public void testCreateFilenameLargeIds() throws Exception { + Path p = new Path("/tmp"); + Configuration conf = new Configuration(); + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) + .setOldStyle(true).bucket(123456789); + assertEquals("/tmp/123456789_0", + AcidUtils.createFilename(p, options).toString()); + options.bucket(23) + .minimumTransactionId(1234567880) + .maximumTransactionId(1234567890) + .writingBase(true) + .setOldStyle(false); + assertEquals("/tmp/base_1234567890/bucket_00023", + AcidUtils.createFilename(p, options).toString()); + options.writingBase(false); + assertEquals("/tmp/delta_1234567880_1234567890_0000/bucket_00023", + AcidUtils.createFilename(p, options).toString()); + } + @Test public void testParsing() throws Exception { diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 836b507..d8f468c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -17,7 +17,13 @@ */ package org.apache.hadoop.hive.ql.lockmgr; -import junit.framework.Assert; +import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.TestTxnCommands2; +import org.apache.hadoop.hive.ql.txn.AcidWriteSetService; +import org.junit.After; +import org.junit.Assert; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.LockType; @@ -29,23 +35,32 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; -import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** * See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager} * Tests here are 
"end-to-end"ish and simulate concurrent queries. + * + * The general approach is to use an instance of Driver to use Driver.run() to create tables + * Use Driver.compile() to generate QueryPlan which can then be passed to HiveTxnManager.acquireLocks(). + * Same HiveTxnManager is used to openTxn()/commitTxn() etc. This can exercise almost the entire + * code path that CLI would but with the advantage that you can create a 2nd HiveTxnManager and then + * simulate interleaved transactional/locking operations but all from within a single thread. + * The later not only controls concurrency precisely but is the only way to run in UT env with DerbyDB. */ public class TestDbTxnManager2 { private static HiveConf conf = new HiveConf(Driver.class); private HiveTxnManager txnMgr; private Context ctx; private Driver driver; + TxnStore txnHandler; @BeforeClass public static void setUpClass() throws Exception { @@ -60,15 +75,17 @@ public void setUp() throws Exception { driver.init(); TxnDbUtil.cleanDb(); TxnDbUtil.prepDb(); - txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + SessionState ss = SessionState.get(); + ss.initTxnMgr(conf); + txnMgr = ss.getTxnMgr(); Assert.assertTrue(txnMgr instanceof DbTxnManager); + txnHandler = TxnUtils.getTxnStore(conf); + } @After public void tearDown() throws Exception { driver.close(); if (txnMgr != null) txnMgr.closeTxnManager(); - TxnDbUtil.cleanDb(); - TxnDbUtil.prepDb(); } @Test public void createTable() throws Exception { @@ -159,22 +176,24 @@ public void lockConflictDbTable() throws Exception { checkCmdOnDriver(cpr); cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6"); checkCmdOnDriver(cpr); + txnMgr.openTxn("Fifer"); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); - List updateLocks = ctx.getHiveLocks(); - cpr = driver.compileAndRespond("drop database if exists temp"); - LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);//gets SS lock on T7 + checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp")); + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + //txnMgr2.openTxn("Fiddler"); + ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);//gets SS lock on T7 List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "temp", "T7", null, locks.get(0)); checkLock(LockType.EXCLUSIVE, LockState.WAITING, "temp", null, null, locks.get(1)); - txnMgr.getLockManager().releaseLocks(updateLocks); - lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid()); + txnMgr.commitTxn(); + ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid()); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "temp", null, null, locks.get(0)); List xLock = new ArrayList(0); xLock.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - txnMgr.getLockManager().releaseLocks(xLock); + txnMgr2.getLockManager().releaseLocks(xLock); } @Test public void updateSelectUpdate() throws Exception { @@ -182,29 +201,27 @@ public void updateSelectUpdate() throws Exception { checkCmdOnDriver(cpr); cpr = driver.compileAndRespond("delete from T8 where b = 89"); checkCmdOnDriver(cpr); + txnMgr.openTxn("Fifer"); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8 - List deleteLocks = ctx.getHiveLocks(); cpr 
= driver.compileAndRespond("select a from T8");//gets S lock on T8 checkCmdOnDriver(cpr); - txnMgr.acquireLocks(driver.getPlan(), ctx, "Fiddler"); - cpr = driver.compileAndRespond("update T8 set a = 1 where b = 1"); - checkCmdOnDriver(cpr); - LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + txnMgr2.openTxn("Fiddler"); + txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fiddler"); + checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1")); + ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 3, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks.get(0)); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks.get(1)); checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "T8", null, locks.get(2)); - txnMgr.getLockManager().releaseLocks(deleteLocks); - lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid()); + txnMgr.rollbackTxn(); + ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(2).getLockid()); locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks.get(0)); checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks.get(1)); - List relLocks = new ArrayList(2); - relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid())); - relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid())); - txnMgr.getLockManager().releaseLocks(relLocks); + txnMgr2.commitTxn(); cpr = driver.run("drop table if exists T6"); locks = getLocks(); Assert.assertEquals("Unexpected number of locks found", 0, locks.size()); @@ -503,12 +520,12 @@ public void testMetastoreTablesCleanup() throws Exception { Assert.assertEquals(0, count); } - private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) { - Assert.assertEquals(l.toString(),l.getType(), type); - Assert.assertEquals(l.toString(),l.getState(), state); - Assert.assertEquals(l.toString(), normalizeCase(l.getDbname()), normalizeCase(db)); - Assert.assertEquals(l.toString(), normalizeCase(l.getTablename()), normalizeCase(table)); - Assert.assertEquals(l.toString(), normalizeCase(l.getPartname()), normalizeCase(partition)); + private void checkLock(LockType expectedType, LockState expectedState, String expectedDb, String expectedTable, String expectedPartition, ShowLocksResponseElement actual) { + Assert.assertEquals(actual.toString(), expectedType, actual.getType()); + Assert.assertEquals(actual.toString(), expectedState,actual.getState()); + Assert.assertEquals(actual.toString(), normalizeCase(expectedDb), normalizeCase(actual.getDbname())); + Assert.assertEquals(actual.toString(), normalizeCase(expectedTable), normalizeCase(actual.getTablename())); + Assert.assertEquals(actual.toString(), normalizeCase(expectedPartition), normalizeCase(actual.getPartname())); } private void checkCmdOnDriver(CommandProcessorResponse cpr) { Assert.assertTrue(cpr.toString(), cpr.getResponseCode() == 0); @@ -523,4 +540,541 @@ private String normalizeCase(String s) { ShowLocksResponse rsp = 
((DbLockManager)txnMgr.getLockManager()).getLocks(); return rsp.getLocks(); } + + /** + * txns update same resource but do not overlap in time - no conflict + */ + @Test + public void testWriteSetTracking1() throws Exception { + CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " + + "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + + checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART")); + txnMgr.openTxn("Nicholas"); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + txnMgr.acquireLocks(driver.getPlan(), ctx, "Nicholas"); + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + txnMgr.commitTxn(); + txnMgr2.openTxn("Alexandra"); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + txnMgr2.acquireLocks(driver.getPlan(), ctx, "Nicholas"); + txnMgr2.commitTxn(); + } + /** + * txns overlap in time but do not update same resource - no conflict + */ + @Test + public void testWriteSetTracking2() throws Exception { + CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " + + "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + cpr = driver.run("create table if not exists TAB2 (a int, b int) partitioned by (p string) " + + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + txnMgr.openTxn("Peter"); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + txnMgr.acquireLocks(driver.getPlan(), ctx, "Peter"); + txnMgr2.openTxn("Catherine"); + List locks = getLocks(txnMgr); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + //note that "update" uses dynamic partitioning thus lock is on the table not partition + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0)); + txnMgr.commitTxn(); + checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 9 where p = 'doh'")); + txnMgr2.acquireLocks(driver.getPlan(), ctx, "Catherine"); + txnMgr2.commitTxn(); + } + + /** + * txns overlap and update the same resource - can't commit 2nd txn + */ + @Test + public void testWriteSetTracking3() throws Exception { + CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " + + "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + + txnMgr.openTxn("Known"); + txnMgr2.openTxn("Unknown"); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + txnMgr.acquireLocks(driver.getPlan(), ctx, "Known"); + List locks = getLocks(txnMgr); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0)); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false); + locks = getLocks(txnMgr2);//should not matter which txnMgr is used here + 
Assert.assertEquals("Unexpected lock count", 2, locks.size()); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0)); + checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks.get(1)); + txnMgr.commitTxn(); + LockException expectedException = null; + try { + txnMgr2.commitTxn(); + } + catch (LockException e) { + expectedException = e; + } + Assert.assertTrue("Didn't get exception", expectedException != null); + Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg()); + Assert.assertEquals("Exception msg didn't match", + "Aborting [txnid:2,2] due to a write conflict on default/tab_part committed by [txnid:1,2]", + expectedException.getCause().getMessage()); + } + /** + * txns overlap, update same resource, simulate multi-stmt txn case + * Also tests that we kill txn when it tries to acquire lock if we already know it will not be committed + */ + @Test + public void testWriteSetTracking4() throws Exception { + Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); + CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " + + "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + cpr = driver.run("create table if not exists TAB2 (a int, b int) partitioned by (p string) " + + "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + + txnMgr.openTxn("Long Running"); + checkCmdOnDriver(driver.compileAndRespond("select a from TAB_PART where p = 'blah'")); + txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running"); + List locks = getLocks(txnMgr); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + //for some reason this just locks the table; if I alter table to add this partition, then + //we end up locking both table and partition with share_read. (Plan has 2 ReadEntities)...? 
+ //same for other locks below + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0)); + + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + txnMgr2.openTxn("Short Running"); + checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'blah'"));//no such partition + txnMgr2.acquireLocks(driver.getPlan(), ctx, "Short Running"); + locks = getLocks(txnMgr); + Assert.assertEquals("Unexpected lock count", 2, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0)); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1)); + //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list + Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); + txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), + "default", "tab2", Collections.EMPTY_LIST)); + txnMgr2.commitTxn(); + //Short Running updated nothing, so we expect 0 rows in WRITE_SET + Assert.assertEquals( 0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); + + txnMgr2.openTxn("T3"); + checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'two'"));//pretend this partition exists + txnMgr2.acquireLocks(driver.getPlan(), ctx, "T3"); + locks = getLocks(txnMgr); + Assert.assertEquals("Unexpected lock count", 2, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0)); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));//since TAB2 is empty + //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list + Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); + txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), + "default", "tab2", Collections.singletonList("p=two")));//simulate partition update + txnMgr2.commitTxn(); + Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"), + 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); + + AcidWriteSetService houseKeeper = new AcidWriteSetService(); + TestTxnCommands2.runHouseKeeperService(houseKeeper, conf); + //since T3 overlaps with Long Running (still open) GC does nothing + Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); + checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 1"));//no rows match + txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running"); + //so generate empty Dyn Part call + txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), + "default", "tab2", Collections.EMPTY_LIST)); + txnMgr.commitTxn(); + + locks = getLocks(txnMgr); + Assert.assertEquals("Unexpected lock count", 0, locks.size()); + TestTxnCommands2.runHouseKeeperService(houseKeeper, conf); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); + } + /** + * overlapping txns updating the same resource but 1st one rolls back; 2nd commits + * @throws Exception + */ + @Test + public void testWriteSetTracking5() throws Exception { + Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); + CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " + + "partitioned by (p string) clustered by (a) 
into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + + txnMgr.openTxn("Known"); + txnMgr2.openTxn("Unknown"); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + txnMgr.acquireLocks(driver.getPlan(), ctx, "Known"); + List locks = getLocks(txnMgr); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0)); + checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'")); + ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false); + locks = getLocks(txnMgr2);//should not matter which txnMgr is used here + Assert.assertEquals("Unexpected lock count", 2, locks.size()); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0)); + checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks.get(1)); + txnMgr.rollbackTxn(); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); + txnMgr2.commitTxn();//since conflicting txn rolled back, commit succeeds + Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); + } + /** + * check that read query concurrent with txn works ok + */ + @Test + public void testWriteSetTracking6() throws Exception { + Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); + CommandProcessorResponse cpr = driver.run("create table if not exists TAB2(a int, b int) clustered " + + "by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.compileAndRespond("select * from TAB2 where a = 113")); + txnMgr.acquireLocks(driver.getPlan(), ctx, "Works"); + List locks = getLocks(txnMgr); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks.get(0)); + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + txnMgr2.openTxn("Horton"); + checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101")); + txnMgr2.acquireLocks(driver.getPlan(), ctx, "Horton"); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); + locks = getLocks(txnMgr); + Assert.assertEquals("Unexpected lock count", 2, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks.get(0)); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1)); + txnMgr2.commitTxn();//no conflict + Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); + locks = getLocks(txnMgr); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks.get(0)); + TestTxnCommands2.runHouseKeeperService(new AcidWriteSetService(), conf); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); + } + + /** + * 2 concurrent txns update different partitions of the same table and succeed + * @throws Exception + */ + @Test + public void testWriteSetTracking7() throws Exception { + Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET")); + CommandProcessorResponse cpr = 
driver.run("create table if not exists tab2 (a int, b int) " + + "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.run("insert into tab2 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1 + HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + + //test with predicates such that partition pruning works + txnMgr2.openTxn("T2"); + checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'")); + txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2"); + List locks = getLocks(txnMgr2); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks.get(0)); + + //now start concurrent txn + txnMgr.openTxn("T3"); + checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'")); + ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false); + locks = getLocks(txnMgr); + Assert.assertEquals("Unexpected lock count", 2, locks.size()); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks.get(0)); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(1)); + + //this simulates the completion of txnid:2 + txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2", + Collections.singletonList("p=two"))); + txnMgr2.commitTxn();//txnid:2 + + locks = getLocks(txnMgr2); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(0)); + //completion of txnid:3 + txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2", + Collections.singletonList("p=one"))); + txnMgr.commitTxn();//txnid:3 + //now both txns concurrently updated TAB2 but different partitions. 
+
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u'"));
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u'"));
+    //2 from txnid:1, 1 from txnid:2, 1 from txnid:3
+    Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab2' and ctc_partition is not null"));
+
+    //================
+    //test with predicates such that partition pruning doesn't kick in
+    cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+      "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:4
+    txnMgr2.openTxn("T5");
+    checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "T5");
+    locks = getLocks(txnMgr2);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+    //now start concurrent txn
+    txnMgr.openTxn("T6");
+    checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2"));
+    ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false);
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 4, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=one", locks.get(3));
+
+    //this simulates the completion of txnid:5
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=one")));
+    txnMgr2.commitTxn();//txnid:5
+
+    ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+    //completion of txnid:6
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two")));
+    txnMgr.commitTxn();//txnid:6
+
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+    //2 from insert + 1 for each update stmt
+    Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+  }
+  /**
+   * Concurrent updates with partition pruning predicate and w/o one
+   */
+  @Test
+  public void testWriteSetTracking8() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+      "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr2.openTxn("T2");
+    checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+    List locks = getLocks(txnMgr2);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+    //now start concurrent txn
+    txnMgr.openTxn("T3");
+    checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'"));
+    ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+    //this simulates the completion of txnid:2
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=one")));
+    txnMgr2.commitTxn();//txnid:2
+
+    ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    //completion of txnid:3
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two")));
+    txnMgr.commitTxn();//txnid:3
+
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+    Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+  }
+  /**
+   * Concurrent update/delete of different partitions - should pass
+   */
+  @Test
+  public void testWriteSetTracking9() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+      "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr2.openTxn("T2");
+    checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+    List locks = getLocks(txnMgr2);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+    //now start concurrent txn
+    txnMgr.openTxn("T3");
+    checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+    ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+    //this simulates the completion of txnid:2
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=one")));
+    txnMgr2.commitTxn();//txnid:2
+
+    ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    //completion of txnid:3
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two")));
+    txnMgr.commitTxn();//txnid:3
+
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+    Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+  }
+  /**
+   * Concurrent update/delete of same partition - should fail to commit
+   */
+  @Test
+  public void testWriteSetTracking10() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+      "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr2.openTxn("T2");
+    checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+    List locks = getLocks(txnMgr2);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+    //now start concurrent txn
+    txnMgr.openTxn("T3");
+    checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+    ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+    //this simulates the completion of txnid:2
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two")));
+    txnMgr2.commitTxn();//txnid:2
+
+    ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    //completion of txnid:3
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two")));
+    LockException exception = null;
+    try {
+      txnMgr.commitTxn();//txnid:3
+    }
+    catch(LockException e) {
+      exception = e;
+    }
+    Assert.assertNotEquals("Expected exception", null, exception);
+    Assert.assertEquals("Exception msg doesn't match",
+      "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]",
+      exception.getCause().getMessage());
+
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+    Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      3, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+  }
+  /**
+   * Concurrent delete/delete of same partition - should pass
+   * This test doesn't work yet, because we don't yet pass in operation type
+   *
+   * todo: Concurrent insert/update of same partition - should pass
+   */
+  @Ignore("HIVE-13622")
+  @Test
+  public void testWriteSetTracking11() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+      "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr2.openTxn("T2");
+    checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+    List locks = getLocks(txnMgr2);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+    //now start concurrent txn
+    txnMgr.openTxn("T3");
+    checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+    ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+    //this simulates the completion of txnid:2
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two")));
+    txnMgr2.commitTxn();//txnid:2
+
+    ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    //completion of txnid:3
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two")));
+    LockException exception = null;
+    try {
+      txnMgr.commitTxn();//txnid:3
+    }
+    catch(LockException e) {
+      exception = e;
+    }
+    Assert.assertNotEquals("Expected exception", null, exception);
+    Assert.assertEquals("Exception msg doesn't match",
+      "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]",
+      exception.getCause().getMessage());
+
+    //todo: this currently fails since we don't yet set operation type properly
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
+    Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index a247065..1578bfb 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -26,6 +26,8 @@
 import org.apache.hadoop.hive.metastore.api.LockRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
@@ -261,6 +263,8 @@ public void blockedByLockPartition() throws Exception {
     List components = new ArrayList(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
+    OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(1, "Dracula", "Transylvania"));
+    req.setTxnid(resp.getTxn_ids().get(0));
     LockResponse res = txnHandler.lock(req);
     startCleaner();
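
For context, a sketch of the kind of commit-time check over WRITE_SET that the tests above exercise (the "write conflict on default/tab1/p=two" abort). This is illustrative only and assumes the committing txn's writes have already been recorded in WRITE_SET; the actual statement issued by the metastore's TxnHandler may be shaped differently.

-- hypothetical conflict probe: did any txn that committed after the current txn started
-- write to the same db/table/partition?
SELECT committed.ws_txnid, committed.ws_commit_id,
       committed.ws_database, committed.ws_table, committed.ws_partition
FROM WRITE_SET committed
JOIN WRITE_SET cur
  ON  committed.ws_database = cur.ws_database
  AND committed.ws_table    = cur.ws_table
  AND (committed.ws_partition = cur.ws_partition
       OR (committed.ws_partition IS NULL AND cur.ws_partition IS NULL))
WHERE cur.ws_txnid = 3                           -- txn attempting to commit, e.g. [txnid:3,3] above
  AND committed.ws_txnid <> cur.ws_txnid
  AND committed.ws_commit_id >= cur.ws_txnid;    -- writer committed after cur began (txn ids and commit ids come from the same sequence)
-- Any row returned signals an overlapping write (e.g. txnid:2 with commit id 3 vs txnid:3 on p=two)
-- and the commit is aborted. A refinement would also consult ws_operation_type, since, per
-- testWriteSetTracking11, concurrent deletes of the same partition are not meant to conflict.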