diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 524a7a4..54d4ebe 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -56,10 +56,14 @@ public CompactionTxnHandler(HiveConf conf) { Set response = new HashSet(); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); // Check for completed transactions - String s = "select distinct ctc_database, ctc_table, " + - "ctc_partition from COMPLETED_TXN_COMPONENTS"; + String s = "SELECT DISTINCT " + + TxnDbUtil.getEscape("CTC_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CTC_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CTC_PARTITION", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("COMPLETED_TXN_COMPONENTS", identifierQuoteString) + ""; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); while (rs.next()) { @@ -71,11 +75,21 @@ public CompactionTxnHandler(HiveConf conf) { } // Check for aborted txns - s = "select tc_database, tc_table, tc_partition " + - "from TXNS, TXN_COMPONENTS " + - "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " + - "group by tc_database, tc_table, tc_partition " + - "having count(*) > " + maxAborted; + s = "SELECT " + + TxnDbUtil.getEscape("TC_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_PARTITION", identifierQuoteString) + " " + + "FROM " + + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TXN_COMPONENTS", identifierQuoteString) + " " + + "WHERE " + + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " = " + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + + " AND " + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + " = '" + TXN_ABORTED + "' " + + "GROUP BY " + + TxnDbUtil.getEscape("TC_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_PARTITION", identifierQuoteString) + " " + + "HAVING COUNT(*) > " + maxAborted; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -110,8 +124,11 @@ public void setRunAs(long cq_id, String user) throws MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id; + String s = "UPDATE " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + " SET " + + TxnDbUtil.getEscape("CQ_RUN_AS", identifierQuoteString) + " = '" + user + "' " + + "WHERE " + TxnDbUtil.getEscape("CQ_ID", identifierQuoteString) + " = " + cq_id; LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) != 1) { LOG.error("Unable to update compaction record"); @@ -152,9 +169,15 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; + String s = "SELECT " + TxnDbUtil.getEscape("CQ_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_PARTITION", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_TYPE", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + " " + + "WHERE " + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + INITIATED_STATE + "'"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -174,8 +197,12 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { // Now, update this record as being worked on by this worker. long now = getDbTime(dbConn); - s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + - "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id; + s = "UPDATE " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + " " + + "SET " + + TxnDbUtil.getEscape("CQ_WORKER_ID", identifierQuoteString) + " = '" + workerId + "', " + + TxnDbUtil.getEscape("CQ_START", identifierQuoteString) + " = " + now + ", " + + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + WORKING_STATE + "' " + + "WHERE " + TxnDbUtil.getEscape("CQ_ID", identifierQuoteString) + " = " + info.id; LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) != 1) { LOG.error("Unable to update compaction record"); @@ -216,9 +243,13 @@ public void markCompacted(CompactionInfo info) throws MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + - "cq_worker_id = null where cq_id = " + info.id; + String s = "UPDATE " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + + " SET " + + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + READY_FOR_CLEANING + "', " + + TxnDbUtil.getEscape("CQ_WORKER_ID", identifierQuoteString) + " = null " + + " WHERE " + TxnDbUtil.getEscape("CQ_ID", identifierQuoteString) + " = " + info.id; LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) != 1) { LOG.error("Unable to update compaction record"); @@ -259,9 +290,17 @@ public void markCompacted(CompactionInfo info) throws MetaException { Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'"; + String s = "SELECT " + + TxnDbUtil.getEscape("CQ_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_PARTITION", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_TYPE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_RUN_AS", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + " " + + "WHERE " + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + READY_FOR_CLEANING + "'"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); while (rs.next()) { @@ -306,8 +345,11 @@ public void markCleaned(CompactionInfo info) throws MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id; + // Check for completed transactions + String s = "DELETE FROM " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + " " + + "WHERE " + TxnDbUtil.getEscape("CQ_ID", identifierQuoteString) + " = " + info.id; LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) != 1) { LOG.error("Unable to delete compaction record"); @@ -317,10 +359,11 @@ public void markCleaned(CompactionInfo info) throws MetaException { // Remove entries from completed_txn_components as well, so we don't start looking there // again. - s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + info.dbname + "' and " + - "ctc_table = '" + info.tableName + "'"; + s = "DELETE FROM " + TxnDbUtil.getEscape("COMPLETED_TXN_COMPONENTS", identifierQuoteString) + " " + + "WHERE " + TxnDbUtil.getEscape("CTC_DATABASE", identifierQuoteString) + " = '" + info.dbname + "' and " + + TxnDbUtil.getEscape("CTC_TABLE", identifierQuoteString) + " = '" + info.tableName + "'"; if (info.partName != null) { - s += " and ctc_partition = '" + info.partName + "'"; + s += " AND " + TxnDbUtil.getEscape("CTC_PARTITION", identifierQuoteString) + " = '" + info.partName + "'"; } LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) < 1) { @@ -329,10 +372,16 @@ public void markCleaned(CompactionInfo info) throws MetaException { } - s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + - TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" + - info.tableName + "'"; - if (info.partName != null) s += " and tc_partition = '" + info.partName + "'"; + s = "SELECT " + + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TXN_COMPONENTS", identifierQuoteString) + " " + + "WHERE " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " = " + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + " AND " + + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + " = '" + TXN_ABORTED + "' AND " + + TxnDbUtil.getEscape("TC_DATABASE", identifierQuoteString) + " = '" + info.dbname + "' AND " + + TxnDbUtil.getEscape("TC_TABLE", identifierQuoteString) + " = '" + info.tableName + "'"; + if (info.partName != null) + s += " AND " + TxnDbUtil.getEscape("TC_PARTITION", identifierQuoteString) + " = '" + info.partName + "'"; LOG.debug("Going to execute update <" + s + ">"); ResultSet rs = stmt.executeQuery(s); Set txnids = new HashSet(); @@ -341,7 +390,8 @@ public void markCleaned(CompactionInfo info) throws MetaException { // Remove entries from txn_components, as there may be aborted txn components StringBuffer buf = new StringBuffer(); - buf.append("delete from TXN_COMPONENTS where tc_txnid in ("); + buf.append("DELETE FROM " + TxnDbUtil.getEscape("TXN_COMPONENTS", identifierQuoteString) + " "); + buf.append("WHERE " + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + " IN ("); boolean first = true; for (long id : txnids) { if (first) first = false; @@ -349,13 +399,13 @@ public void markCleaned(CompactionInfo info) throws MetaException { buf.append(id); } - buf.append(") and tc_database = '"); + buf.append(") and " + TxnDbUtil.getEscape("TC_DATABASE", identifierQuoteString) + " = '"); buf.append(info.dbname); - buf.append("' and tc_table = '"); + buf.append("' and " + TxnDbUtil.getEscape("TC_TABLE", identifierQuoteString) + " = '"); buf.append(info.tableName); buf.append("'"); if (info.partName != null) { - buf.append(" and tc_partition = '"); + buf.append(" and " + TxnDbUtil.getEscape("TC_PARTITION", identifierQuoteString) + " = '"); buf.append(info.partName); buf.append("'"); } @@ -399,16 +449,21 @@ public void cleanEmptyAbortedTxns() throws MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select txn_id from TXNS where " + - "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + - "txn_state = '" + TXN_ABORTED + "'"; + String s = "SELECT " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + " " + + "WHERE " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " NOT IN (" + + "SELECT " + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + " FROM " + + TxnDbUtil.getEscape("TXN_COMPONENTS", identifierQuoteString) + ") AND " + + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + " = '" + TXN_ABORTED + "'"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); Set txnids = new HashSet(); while (rs.next()) txnids.add(rs.getLong(1)); if (txnids.size() > 0) { - StringBuffer buf = new StringBuffer("delete from TXNS where txn_id in ("); + StringBuffer buf = new StringBuffer("DELETE FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " IN ("); boolean first = true; for (long tid : txnids) { if (first) first = false; @@ -457,10 +512,15 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" - + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '" - + hostname + "%'"; + String s = "UPDATE " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + " " + + " SET " + + TxnDbUtil.getEscape("CQ_WORKER_ID", identifierQuoteString) + " = null, " + + TxnDbUtil.getEscape("CQ_START", identifierQuoteString) + " = null, " + + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + INITIATED_STATE + "' " + + "WHERE " + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + WORKING_STATE + "' and " + + TxnDbUtil.getEscape("CQ_WORKER_ID", identifierQuoteString) + " LIKE '" + hostname + "%'"; LOG.debug("Going to execute update <" + s + ">"); // It isn't an error if the following returns no rows, as the local workers could have died // with nothing assigned to them. @@ -504,10 +564,15 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { long latestValidStart = getDbTime(dbConn) - timeout; Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" - + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < " - + latestValidStart; + String s = "UPDATE " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + " " + + " SET " + + TxnDbUtil.getEscape("CQ_WORKER_ID", identifierQuoteString) + " = null, " + + TxnDbUtil.getEscape("CQ_START", identifierQuoteString) + " = null, " + + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + INITIATED_STATE + "' " + + "WHERE " + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + WORKING_STATE + "' and " + + TxnDbUtil.getEscape("CQ_START", identifierQuoteString) + " < " + latestValidStart; LOG.debug("Going to execute update <" + s + ">"); // It isn't an error if the following returns no rows, as the local workers could have died // with nothing assigned to them. diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 30cf814..a8c400d 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -174,9 +174,11 @@ public static void cleanDb() throws Exception { */ public static int countLockComponents(long lockId) throws Exception { Connection conn = getConnection(); + String identifierQuoteString = conn.getMetaData().getIdentifierQuoteString(); Statement s = conn.createStatement(); - ResultSet rs = s.executeQuery("select count(*) from hive_locks where " + - "hl_lock_ext_id = " + lockId); + String query = "SELECT COUNT(*) FROM " + getEscape("HIVE_LOCKS", identifierQuoteString) + + " WHERE " + getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + " = " + lockId; + ResultSet rs = s.executeQuery(query); if (!rs.next()) return 0; int rc = rs.getInt(1); conn.rollback(); @@ -186,8 +188,9 @@ public static int countLockComponents(long lockId) throws Exception { public static int findNumCurrentLocks() throws Exception { Connection conn = getConnection(); + String identifierQuoteString = conn.getMetaData().getIdentifierQuoteString(); Statement s = conn.createStatement(); - ResultSet rs = s.executeQuery("select count(*) from hive_locks"); + ResultSet rs = s.executeQuery("SELECT COUNT(*) FROM " + getEscape("HIVE_LOCKS", identifierQuoteString)); if (!rs.next()) return 0; int rc = rs.getInt(1); conn.rollback(); @@ -209,4 +212,8 @@ private static Connection getConnection() throws Exception { return driver.connect(driverUrl, prop); } + public static String getEscape(String name, String identifierQuoteString) + { + return identifierQuoteString + name + identifierQuoteString; + } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 063dee6..e15e7ec 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -132,8 +132,10 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + String s = "SELECT " + TxnDbUtil.getEscape("NTXN_NEXT", identifierQuoteString) + " - 1 " + + "FROM " + TxnDbUtil.getEscape("NEXT_TXN_ID", identifierQuoteString); LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -147,7 +149,11 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { } List txnInfo = new ArrayList(); - s = "select txn_id, txn_state, txn_user, txn_host from TXNS"; + s = "SELECT " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TXN_USER", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TXN_HOST", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString); LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); while (rs.next()) { @@ -194,8 +200,10 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { Statement stmt = null; try { timeOutTxns(dbConn); + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + String s = "SELECT " + TxnDbUtil.getEscape("NTXN_NEXT", identifierQuoteString) + " - 1 " + + "FROM " + TxnDbUtil.getEscape("NEXT_TXN_ID", identifierQuoteString); LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -209,7 +217,8 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { } Set openList = new HashSet(); - s = "select txn_id from TXNS"; + s = "SELECT " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + + " FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString); LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); while (rs.next()) { @@ -254,8 +263,10 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH); if (numTxns > maxTxns) numTxns = maxTxns; + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select ntxn_next from NEXT_TXN_ID"; + String s = "SELECT " + TxnDbUtil.getEscape("NTXN_NEXT", identifierQuoteString) + + " FROM " + TxnDbUtil.getEscape("NEXT_TXN_ID", identifierQuoteString) + ""; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -263,13 +274,19 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { "configured, can't find next transaction id."); } long first = rs.getLong(1); - s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns); + s = "UPDATE " + TxnDbUtil.getEscape("NEXT_TXN_ID", identifierQuoteString) + + " SET " + TxnDbUtil.getEscape("NTXN_NEXT", identifierQuoteString) + " = " + (first + numTxns); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); long now = getDbTime(dbConn); - s = "insert into TXNS (txn_id, txn_state, txn_started, " + - "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + ", " + - now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')"; + s = "INSERT INTO " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + " (" + + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + "," + + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + "," + + TxnDbUtil.getEscape("TXN_STARTED", identifierQuoteString) + "," + + TxnDbUtil.getEscape("TXN_LAST_HEARTBEAT", identifierQuoteString) + "," + + TxnDbUtil.getEscape("TXN_USER", identifierQuoteString) + "," + + TxnDbUtil.getEscape("TXN_HOST", identifierQuoteString) + + ") VALUES (?, 'o', " + now + ", " + now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')"; LOG.debug("Going to prepare statement <" + s + ">"); PreparedStatement ps = dbConn.prepareStatement(s); List txnIds = new ArrayList(numTxns); @@ -343,6 +360,7 @@ public void commitTxn(CommitTxnRequest rqst) Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); // Before we do the commit heartbeat the txn. This is slightly odd in that we're going to // commit it, but it does two things. One, it makes sure the transaction is still valid. @@ -353,8 +371,16 @@ public void commitTxn(CommitTxnRequest rqst) // Move the record from txn_components into completed_txn_components so that the compactor // knows where to look to compact. - String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " + - "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid; + String s = "INSERT INTO " + TxnDbUtil.getEscape("COMPLETED_TXN_COMPONENTS", identifierQuoteString) + " " + + " SELECT " + + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_PARTITION", identifierQuoteString) + " " + + " FROM " + + TxnDbUtil.getEscape("TXN_COMPONENTS", identifierQuoteString) + + " WHERE " + + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + " = " + txnid; LOG.debug("Going to execute insert <" + s + ">"); if (stmt.executeUpdate(s) < 1) { LOG.warn("Expected to move at least one record from txn_components to " + @@ -362,14 +388,17 @@ public void commitTxn(CommitTxnRequest rqst) } // Always access TXN_COMPONENTS before HIVE_LOCKS; - s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; + s = "DELETE FROM " + TxnDbUtil.getEscape("TXN_COMPONENTS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + " = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); // Always access HIVE_LOCKS before TXNS - s = "delete from HIVE_LOCKS where hl_txnid = " + txnid; + s = "DELETE FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("HL_TXNID", identifierQuoteString) + " = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); - s = "delete from TXNS where txn_id = " + txnid; + s = "DELETE FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.debug("Going to commit"); @@ -504,8 +533,10 @@ public void unlock(UnlockRequest rqst) LOG.error(msg); throw new TxnOpenException(msg); } + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId; + String s = "DELETE FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + " = " + extLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -541,10 +572,22 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { List elems = new ArrayList(); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " + - "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host from HIVE_LOCKS"; + String s = "SELECT " + + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_TXNID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_DB", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_PARTITION", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_STATE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_TYPE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LAST_HEARTBEAT", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_ACQUIRED_AT", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_USER", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_HOST", identifierQuoteString) + " " + + " FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString); LOG.debug("Doing to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); while (rs.next()) { @@ -656,10 +699,12 @@ public void compact(CompactionRequest rqst) throws MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); // Get the id for the next entry in the queue - String s = "select ncq_next from NEXT_COMPACTION_QUEUE_ID"; + String s = "SELECT " + TxnDbUtil.getEscape("NCQ_NEXT", identifierQuoteString) + + " FROM " + TxnDbUtil.getEscape("NEXT_COMPACTION_QUEUE_ID", identifierQuoteString) + ""; LOG.debug("going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -669,16 +714,21 @@ public void compact(CompactionRequest rqst) throws MetaException { "no record found in next_compaction_queue_id"); } long id = rs.getLong(1); - s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1); + s = "UPDATE " + TxnDbUtil.getEscape("NEXT_COMPACTION_QUEUE_ID", identifierQuoteString) + + " SET " + TxnDbUtil.getEscape("NCQ_NEXT", identifierQuoteString) + " = " + (id + 1); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); - StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " + - "cq_table, "); + StringBuilder buf = new StringBuilder("INSERT INTO " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString)); + buf.append(" ("); + buf.append(TxnDbUtil.getEscape("CQ_ID", identifierQuoteString) + ", "); + buf.append(TxnDbUtil.getEscape("CQ_DATABASE", identifierQuoteString) + ", "); + buf.append(TxnDbUtil.getEscape("CQ_TABLE", identifierQuoteString) + ", "); String partName = rqst.getPartitionname(); - if (partName != null) buf.append("cq_partition, "); - buf.append("cq_state, cq_type"); - if (rqst.getRunas() != null) buf.append(", cq_run_as"); + if (partName != null) buf.append(TxnDbUtil.getEscape("CQ_PARTITION", identifierQuoteString) + ", "); + buf.append(TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + ", "); + buf.append(TxnDbUtil.getEscape("CQ_TYPE", identifierQuoteString) + ""); + if (rqst.getRunas() != null) buf.append(", " + TxnDbUtil.getEscape("CQ_RUN_AS", identifierQuoteString) + ""); buf.append(") values ("); buf.append(id); buf.append(", '"); @@ -741,9 +791,17 @@ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaExcep Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " + - "cq_start, cq_run_as from COMPACTION_QUEUE"; + String s = "SELECT " + TxnDbUtil.getEscape("CQ_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_PARTITION", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_TYPE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_WORKER_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_START", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_RUN_AS", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString); LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); while (rs.next()) { @@ -791,8 +849,9 @@ int numLocksInLockTable() throws SQLException, MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select count(*) from HIVE_LOCKS"; + String s = "SELECT COUNT(*) FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString); LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); rs.next(); @@ -1085,10 +1144,12 @@ private int abortTxns(Connection dbConn, List txnids) throws SQLException Statement stmt = null; int updateCnt = 0; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS - StringBuilder buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in ("); + StringBuilder buf = new StringBuilder("DELETE FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("HL_TXNID", identifierQuoteString) + " IN ("); boolean first = true; for (Long id : txnids) { if (first) first = false; @@ -1099,7 +1160,9 @@ private int abortTxns(Connection dbConn, List txnids) throws SQLException LOG.debug("Going to execute update <" + buf.toString() + ">"); stmt.executeUpdate(buf.toString()); - buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id in ("); + buf = new StringBuilder("UPDATE " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + + " SET " + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + " = '" + TXN_ABORTED + "' " + + " WHERE " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " IN ("); first = true; for (Long id : txnids) { if (first) first = false; @@ -1150,10 +1213,12 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) timeOutLocks(dbConn); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); // Get the next lock id. - String s = "select nl_next from NEXT_LOCK_ID"; + String s = "SELECT " + TxnDbUtil.getEscape("NL_NEXT", identifierQuoteString) + + " FROM " + TxnDbUtil.getEscape("NEXT_LOCK_ID", identifierQuoteString); LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -1163,7 +1228,8 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) "initialized, no record found in next_lock_id"); } long extLockId = rs.getLong(1); - s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); + s = "UPDATE " + TxnDbUtil.getEscape("NEXT_LOCK_ID", identifierQuoteString) + + " SET " + TxnDbUtil.getEscape("NL_NEXT", identifierQuoteString) + " = " + (extLockId + 1); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.debug("Going to commit."); @@ -1182,9 +1248,12 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) String dbName = lc.getDbname(); String tblName = lc.getTablename(); String partName = lc.getPartitionname(); - s = "insert into TXN_COMPONENTS " + - "(tc_txnid, tc_database, tc_table, tc_partition) " + - "values (" + txnid + ", '" + dbName + "', " + + s = "INSERT INTO " + TxnDbUtil.getEscape("TXN_COMPONENTS", identifierQuoteString) + " " + + "(" + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_PARTITION", identifierQuoteString) + ") " + + "VALUES (" + txnid + ", '" + dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'") + ", " + (partName == null ? "null" : "'" + partName + "'") + ")"; LOG.debug("Going to execute update <" + s + ">"); @@ -1206,10 +1275,21 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) case SHARED_WRITE: lockChar = LOCK_SEMI_SHARED; break; } long now = getDbTime(dbConn); - s = "insert into HIVE_LOCKS " + - " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + - " values (" + extLockId + ", " + + + s = "INSERT INTO " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + " " + + " (" + + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_INT_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_TXNID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_DB", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_PARTITION", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_STATE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_TYPE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LAST_HEARTBEAT", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_USER", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_HOST", identifierQuoteString) + ")" + + " VALUES (" + extLockId + ", " + + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" + dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" ) + ", " + (partName == null ? "null" : "'" + partName + "'") + @@ -1245,9 +1325,17 @@ private LockResponse checkLock(Connection dbConn, LOG.debug("Setting savepoint"); Savepoint save = dbConn.setSavepoint(); - StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + - "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + - "hl_lock_type from HIVE_LOCKS where hl_db in ("); + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); + StringBuilder query = new StringBuilder("SELECT "); + query.append(TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + ", "); + query.append(TxnDbUtil.getEscape("HL_LOCK_INT_ID", identifierQuoteString) + ", "); + query.append(TxnDbUtil.getEscape("HL_DB", identifierQuoteString) + ", "); + query.append(TxnDbUtil.getEscape("HL_TABLE", identifierQuoteString) + ", "); + query.append(TxnDbUtil.getEscape("HL_PARTITION", identifierQuoteString) + ", "); + query.append(TxnDbUtil.getEscape("HL_LOCK_STATE", identifierQuoteString) + ", "); + query.append(TxnDbUtil.getEscape("HL_LOCK_TYPE", identifierQuoteString) + " "); + query.append(" FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString)); + query.append(" WHERE " + TxnDbUtil.getEscape("HL_DB", identifierQuoteString) + " IN ("); Set strings = new HashSet(locksBeingChecked.size()); for (LockInfo info : locksBeingChecked) { @@ -1276,7 +1364,8 @@ private LockResponse checkLock(Connection dbConn, } } if (!sawNull) { - query.append(" and (hl_table is null or hl_table in("); + query.append(" AND (" + TxnDbUtil.getEscape("HL_TABLE", identifierQuoteString) + " IS NULL OR "); + query.append(TxnDbUtil.getEscape("HL_TABLE", identifierQuoteString) + " IN ("); first = true; for (String s : strings) { if (first) first = false; @@ -1300,7 +1389,8 @@ private LockResponse checkLock(Connection dbConn, } } if (!sawNull) { - query.append(" and (hl_partition is null or hl_partition in("); + query.append(" and (" + TxnDbUtil.getEscape("HL_PARTITION", identifierQuoteString) + " IS NULL OR "); + query.append(TxnDbUtil.getEscape("HL_PARTITION", identifierQuoteString) + " IN ("); first = true; for (String s : strings) { if (first) first = false; @@ -1429,9 +1519,15 @@ private void wait(Connection dbConn, Savepoint save) throws SQLException { private void acquire(Connection dbConn, Statement stmt, long extLockId, long intLockId) throws SQLException, NoSuchLockException, MetaException { long now = getDbTime(dbConn); - String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " + - "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " + - extLockId + " and hl_lock_int_id = " + intLockId; + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); + String s = "UPDATE " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " SET " + + TxnDbUtil.getEscape("HL_LOCK_STATE", identifierQuoteString) + " = '" + LOCK_ACQUIRED + "', " + + TxnDbUtil.getEscape("HL_LAST_HEARTBEAT", identifierQuoteString) + " = " + now + ", " + + TxnDbUtil.getEscape("HL_ACQUIRED_AT", identifierQuoteString) + " = " + now + + " WHERE " + + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + " = " + extLockId + " and " + + TxnDbUtil.getEscape("HL_LOCK_INT_ID", identifierQuoteString) + " = " + intLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -1452,11 +1548,13 @@ private void heartbeatLock(Connection dbConn, long extLockId) if (extLockId == 0) return; Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); long now = getDbTime(dbConn); - String s = "update HIVE_LOCKS set hl_last_heartbeat = " + - now + " where hl_lock_ext_id = " + extLockId; + String s = "UPDATE " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " SET " + TxnDbUtil.getEscape("HL_LAST_HEARTBEAT", identifierQuoteString) + " = " + now + + " WHERE " + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + " = " + extLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -1478,10 +1576,13 @@ private void heartbeatTxn(Connection dbConn, long txnid) if (txnid == 0) return; Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); long now = getDbTime(dbConn); // We need to check whether this transaction is valid and open - String s = "select txn_state from TXNS where txn_id = " + txnid; + String s = "SELECT " + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + + " FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " = " + txnid; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -1495,8 +1596,9 @@ private void heartbeatTxn(Connection dbConn, long txnid) throw new TxnAbortedException("Transaction " + txnid + " already aborted"); } - s = "update TXNS set txn_last_heartbeat = " + now + - " where txn_id = " + txnid; + s = "UPDATE " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + + " SET " + TxnDbUtil.getEscape("TXN_LAST_HEARTBEAT", identifierQuoteString) + " = " + now + + " WHERE " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.debug("Going to commit"); @@ -1511,9 +1613,11 @@ private long getTxnIdFromLockId(Connection dbConn, long extLockId) throws NoSuchLockException, MetaException, SQLException { Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " + - extLockId; + String s = "SELECT " + TxnDbUtil.getEscape("HL_TXNID", identifierQuoteString) + + " FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + " = " + extLockId; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -1533,10 +1637,20 @@ private long getTxnIdFromLockId(Connection dbConn, long extLockId) throws NoSuchLockException, MetaException, SQLException { Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " + - "hl_lock_ext_id = " + extLockId; + String s = "SELECT " + + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_INT_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_DB", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_PARTITION", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_STATE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_TYPE", identifierQuoteString) + " " + + " FROM " + + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " WHERE " + + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + " = " + extLockId; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); boolean sawAtLeastOne = false; @@ -1562,10 +1676,11 @@ private void timeOutLocks(Connection dbConn) throws SQLException, MetaException long now = getDbTime(dbConn); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); // Remove any timed out locks from the table. - String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + - (now - timeout); + String s = "DELETE FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("HL_LAST_HEARTBEAT", identifierQuoteString) + " < " + (now - timeout); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.debug("Going to commit"); @@ -1582,10 +1697,13 @@ private void timeOutTxns(Connection dbConn) throws SQLException, MetaException { long now = getDbTime(dbConn); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - // Abort any timed out locks from the table. - String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN + - "' and txn_last_heartbeat < " + (now - timeout); + String s = "SELECT " + + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + + "WHERE " + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + " = '" + TXN_OPEN + "' and " + + TxnDbUtil.getEscape("TXN_LAST_HEARTBEAT", identifierQuoteString) + " < " + (now - timeout); LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); List deadTxns = new ArrayList(); diff --git metastore/scripts/upgrade/postgres/hive-txn-schema-0.13.0.postgres.sql metastore/scripts/upgrade/postgres/hive-txn-schema-0.13.0.postgres.sql index 2ebd3b0..20c93d6 100644 --- metastore/scripts/upgrade/postgres/hive-txn-schema-0.13.0.postgres.sql +++ metastore/scripts/upgrade/postgres/hive-txn-schema-0.13.0.postgres.sql @@ -17,73 +17,73 @@ -- Tables for transaction management -- -CREATE TABLE TXNS ( - TXN_ID bigint PRIMARY KEY, - TXN_STATE char(1) NOT NULL, - TXN_STARTED bigint NOT NULL, - TXN_LAST_HEARTBEAT bigint NOT NULL, - TXN_USER varchar(128) NOT NULL, - TXN_HOST varchar(128) NOT NULL +CREATE TABLE "TXNS" ( + "TXN_ID" bigint PRIMARY KEY, + "TXN_STATE" char(1) NOT NULL, + "TXN_STARTED" bigint NOT NULL, + "TXN_LAST_HEARTBEAT" bigint NOT NULL, + "TXN_USER" varchar(128) NOT NULL, + "TXN_HOST" varchar(128) NOT NULL ); -CREATE TABLE TXN_COMPONENTS ( - TC_TXNID bigint REFERENCES TXNS (TXN_ID), - TC_DATABASE varchar(128) NOT NULL, - TC_TABLE varchar(128), - TC_PARTITION varchar(767) DEFAULT NULL +CREATE TABLE "TXN_COMPONENTS" ( + "TC_TXNID" bigint REFERENCES "TXNS" ("TXN_ID"), + "TC_DATABASE" varchar(128) NOT NULL, + "TC_TABLE" varchar(128), + "TC_PARTITION" varchar(767) DEFAULT NULL ); -CREATE TABLE COMPLETED_TXN_COMPONENTS ( - CTC_TXNID bigint, - CTC_DATABASE varchar(128) NOT NULL, - CTC_TABLE varchar(128), - CTC_PARTITION varchar(767) +CREATE TABLE "COMPLETED_TXN_COMPONENTS" ( + "CTC_TXNID" bigint, + "CTC_DATABASE" varchar(128) NOT NULL, + "CTC_TABLE" varchar(128), + "CTC_PARTITION" varchar(767) ); -CREATE TABLE NEXT_TXN_ID ( - NTXN_NEXT bigint NOT NULL +CREATE TABLE "NEXT_TXN_ID" ( + "NTXN_NEXT" bigint NOT NULL ); -INSERT INTO NEXT_TXN_ID VALUES(1); +INSERT INTO "NEXT_TXN_ID" VALUES(1); -CREATE TABLE HIVE_LOCKS ( - HL_LOCK_EXT_ID bigint NOT NULL, - HL_LOCK_INT_ID bigint NOT NULL, - HL_TXNID bigint, - HL_DB varchar(128) NOT NULL, - HL_TABLE varchar(128), - HL_PARTITION varchar(767) DEFAULT NULL, - HL_LOCK_STATE char(1) NOT NULL, - HL_LOCK_TYPE char(1) NOT NULL, - HL_LAST_HEARTBEAT bigint NOT NULL, - HL_ACQUIRED_AT bigint, - HL_USER varchar(128) NOT NULL, - HL_HOST varchar(128) NOT NULL, - PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID) +CREATE TABLE "HIVE_LOCKS" ( + "HL_LOCK_EXT_ID" bigint NOT NULL, + "HL_LOCK_INT_ID" bigint NOT NULL, + "HL_TXNID" bigint, + "HL_DB" varchar(128) NOT NULL, + "HL_TABLE" varchar(128), + "HL_PARTITION" varchar(767) DEFAULT NULL, + "HL_LOCK_STATE" char(1) NOT NULL, + "HL_LOCK_TYPE" char(1) NOT NULL, + "HL_LAST_HEARTBEAT" bigint NOT NULL, + "HL_ACQUIRED_AT" bigint, + "HL_USER" varchar(128) NOT NULL, + "HL_HOST" varchar(128) NOT NULL, + PRIMARY KEY("HL_LOCK_EXT_ID", "HL_LOCK_INT_ID") ); -CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS USING hash (HL_TXNID); +CREATE INDEX "HL_TXNID_INDEX" ON "HIVE_LOCKS" USING hash ("HL_TXNID"); -CREATE TABLE NEXT_LOCK_ID ( - NL_NEXT bigint NOT NULL +CREATE TABLE "NEXT_LOCK_ID" ( + "NL_NEXT" bigint NOT NULL ); -INSERT INTO NEXT_LOCK_ID VALUES(1); +INSERT INTO "NEXT_LOCK_ID" VALUES(1); -CREATE TABLE COMPACTION_QUEUE ( - CQ_ID bigint PRIMARY KEY, - CQ_DATABASE varchar(128) NOT NULL, - CQ_TABLE varchar(128) NOT NULL, - CQ_PARTITION varchar(767), - CQ_STATE char(1) NOT NULL, - CQ_TYPE char(1) NOT NULL, - CQ_WORKER_ID varchar(128), - CQ_START bigint, - CQ_RUN_AS varchar(128) +CREATE TABLE "COMPACTION_QUEUE" ( + "CQ_ID" bigint PRIMARY KEY, + "CQ_DATABASE" varchar(128) NOT NULL, + "CQ_TABLE" varchar(128) NOT NULL, + "CQ_PARTITION" varchar(767), + "CQ_STATE" char(1) NOT NULL, + "CQ_TYPE" char(1) NOT NULL, + "CQ_WORKER_ID" varchar(128), + "CQ_START" bigint, + "CQ_RUN_AS" varchar(128) ); -CREATE TABLE NEXT_COMPACTION_QUEUE_ID ( - NCQ_NEXT bigint NOT NULL +CREATE TABLE "NEXT_COMPACTION_QUEUE_ID" ( + "NCQ_NEXT" bigint NOT NULL ); -INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1); +INSERT INTO "NEXT_COMPACTION_QUEUE_ID" VALUES(1);