Index: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (revision 01faca2f9d7dcb0f5feabfcb07fa5ea12b79c5b9) +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (date 1578317069449) @@ -137,7 +137,7 @@ * It's imperative that any operation on a txn (e.g. commit), ensure (atomically) that this txn is * still valid and active. In the code this is usually achieved at the same time the txn record * is locked for some operation. - * + * * Note on retry logic: * Metastore has retry logic in both {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient} * and {@link org.apache.hadoop.hive.metastore.RetryingHMSHandler}. 
The retry logic there is very @@ -368,7 +368,7 @@ */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + String s = "SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { @@ -383,8 +383,8 @@ close(rs); List txnInfos = new ArrayList<>(); //need the WHERE clause below to ensure consistent results with READ_COMMITTED - s = "select txn_id, txn_state, txn_user, txn_host, txn_started, txn_last_heartbeat from " + - "TXNS where txn_id <= " + hwm; + s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\" FROM " + + "\"TXNS\" WHERE \"TXN_ID\" <= " + hwm; LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); while (rs.next()) { @@ -442,7 +442,7 @@ */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + String s = "SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { @@ -457,7 +457,7 @@ close(rs); List openList = new ArrayList<>(); //need the WHERE clause below to ensure consistent results with READ_COMMITTED - s = "select txn_id, txn_state, txn_type from TXNS where txn_id <= " + hwm + " order by txn_id"; + s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_ID\" <= " + hwm + " ORDER BY \"TXN_ID\""; LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); long minOpenTxn = Long.MAX_VALUE; @@ -594,7 +594,7 @@ txnType = TxnType.REPL_CREATED; } - String s = sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID"); + String s = sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""); LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) 
{ @@ -602,7 +602,7 @@ "configured, can't find next transaction id."); } long first = rs.getLong(1); - s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns); + s = "UPDATE \"NEXT_TXN_ID\" SET \"NTXN_NEXT\" = " + (first + numTxns); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); @@ -620,14 +620,15 @@ paramsList.add(params); } insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host, txn_type)", + "\"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " + + "\"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\")", rows, paramsList); for (PreparedStatement pst : insertPreparedStmts) { pst.execute(); } // Need to register minimum open txnid for current transactions into MIN_HISTORY table. - s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_OPEN); + s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { @@ -645,7 +646,7 @@ // Insert transaction entries into MIN_HISTORY_LEVEL. 
List inserts = sqlGenerator.createInsertValuesStmt( - "MIN_HISTORY_LEVEL (mhl_txnid, mhl_min_open_txnid)", rows); + "\"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\", \"MHL_MIN_OPEN_TXNID\")", rows); for (String insert : inserts) { LOG.debug("Going to execute insert <" + insert + ">"); stmt.execute(insert); @@ -668,7 +669,7 @@ } insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID)", rowsRepl, + "\"REPL_TXN_MAP\" (\"RTM_REPL_POLICY\", \"RTM_SRC_TXN_ID\", \"RTM_TARGET_TXN_ID\")", rowsRepl, paramsList); for (PreparedStatement pst : insertPreparedStmts) { pst.execute(); @@ -699,10 +700,10 @@ StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); List targetTxnIdList = new ArrayList<>(); - prefix.append("select RTM_TARGET_TXN_ID from REPL_TXN_MAP where "); - suffix.append(" and RTM_REPL_POLICY = ?"); + prefix.append("SELECT \"RTM_TARGET_TXN_ID\" FROM \"REPL_TXN_MAP\" WHERE "); + suffix.append(" AND \"RTM_REPL_POLICY\" = ?"); TxnUtils.buildQueryWithINClause(conf, inQueries, prefix, suffix, sourceTxnIdList, - "RTM_SRC_TXN_ID", false, false); + "\"RTM_SRC_TXN_ID\"", false, false); List params = Arrays.asList(replPolicy); for (String query : inQueries) { LOG.debug("Going to execute select <" + query.replaceAll("\\?", "{}") + ">", quoteString(replPolicy)); @@ -755,7 +756,7 @@ } private void deleteReplTxnMapEntry(Connection dbConn, long sourceTxnId, String replPolicy) throws SQLException { - String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId + " and RTM_REPL_POLICY = ?"; + String s = "DELETE FROM \"REPL_TXN_MAP\" WHERE \"RTM_SRC_TXN_ID\" = " + sourceTxnId + " AND \"RTM_REPL_POLICY\" = ?"; try (PreparedStatement pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(replPolicy))) { LOG.info("Going to execute <" + s.replaceAll("\\?", "{}") + ">", quoteString(replPolicy)); pst.executeUpdate(); @@ -845,12 +846,12 @@ stmt = 
dbConn.createStatement(); List queries = new ArrayList<>(); - StringBuilder prefix = new StringBuilder("select TXN_ID, TXN_TYPE from TXNS where TXN_STATE = ") - .append(quoteChar(TXN_OPEN)).append(" and TXN_TYPE != ").append(TxnType.READ_ONLY.getValue()) + StringBuilder prefix = new StringBuilder("SELECT \"TXN_ID\", \"TXN_TYPE\" from \"TXNS\" where \"TXN_STATE\" = ") + .append(quoteChar(TXN_OPEN)).append(" and \"TXN_TYPE\" != ").append(TxnType.READ_ONLY.getValue()) .append(" and "); TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(), - txnIds, "TXN_ID", false, false); + txnIds, "\"TXN_ID\"", false, false); Map nonReadOnlyTxns = new HashMap<>(); for (String query : queries) { @@ -928,13 +929,13 @@ pst.close(); // not used select for update as it will be updated by single thread only from repl load - rs = stmt.executeQuery("select \"PARAM_VALUE\" from \"DATABASE_PARAMS\" where \"PARAM_KEY\" = " + - "'repl.last.id' and \"DB_ID\" = " + dbId); + rs = stmt.executeQuery("SELECT \"PARAM_VALUE\" FROM \"DATABASE_PARAMS\" WHERE \"PARAM_KEY\" = " + + "'repl.last.id' AND \"DB_ID\" = " + dbId); if (!rs.next()) { - query = "insert into \"DATABASE_PARAMS\" values ( " + dbId + " , 'repl.last.id' , ? )"; + query = "INSERT INTO \"DATABASE_PARAMS\" VALUES ( " + dbId + " , 'repl.last.id' , ? )"; } else { - query = "update \"DATABASE_PARAMS\" set \"PARAM_VALUE\" = ? where \"DB_ID\" = " + dbId + - " and \"PARAM_KEY\" = 'repl.last.id'"; + query = "UPDATE \"DATABASE_PARAMS\" SET \"PARAM_VALUE\" = ? WHERE \"DB_ID\" = " + dbId + + " AND \"PARAM_KEY\" = 'repl.last.id'"; } close(rs); params = Arrays.asList(lastReplId); @@ -951,7 +952,7 @@ return; } - query = "select \"TBL_ID\" from \"TBLS\" where \"TBL_NAME\" = ? and \"DB_ID\" = " + dbId; + query = "SELECT \"TBL_ID\" FROM \"TBLS\" WHERE \"TBL_NAME\" = ? 
AND \"DB_ID\" = " + dbId; params = Arrays.asList(table); pst = sqlGenerator.prepareStmtWithParameters(dbConn, query, params); LOG.debug("Going to execute query <" + query.replaceAll("\\?", "{}") + ">", quoteString(table)); @@ -965,13 +966,13 @@ pst.close(); // select for update is not required as only one task will update this during repl load. - rs = stmt.executeQuery("select \"PARAM_VALUE\" from \"TABLE_PARAMS\" where \"PARAM_KEY\" = " + - "'repl.last.id' and \"TBL_ID\" = " + tblId); + rs = stmt.executeQuery("SELECT \"PARAM_VALUE\" FROM \"TABLE_PARAMS\" WHERE \"PARAM_KEY\" = " + + "'repl.last.id' AND \"TBL_ID\" = " + tblId); if (!rs.next()) { - query = "insert into \"TABLE_PARAMS\" values ( " + tblId + " , 'repl.last.id' , ? )"; + query = "INSERT INTO \"TABLE_PARAMS\" VALUES ( " + tblId + " , 'repl.last.id' , ? )"; } else { - query = "update \"TABLE_PARAMS\" set \"PARAM_VALUE\" = ? where \"TBL_ID\" = " + tblId + - " and \"PARAM_KEY\" = 'repl.last.id'"; + query = "UPDATE \"TABLE_PARAMS\" SET \"PARAM_VALUE\" = ? 
WHERE \"TBL_ID\" = " + tblId + + " AND \"PARAM_KEY\" = 'repl.last.id'"; } rs.close(); @@ -996,7 +997,7 @@ List queries = new ArrayList<>(); StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); - prefix.append("select \"PART_ID\" from \"PARTITIONS\" where \"TBL_ID\" = " + tblId + " and "); + prefix.append("SELECT \"PART_ID\" FROM \"PARTITIONS\" WHERE \"TBL_ID\" = " + tblId + " and "); // Populate the complete query with provided prefix and suffix List counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, @@ -1018,13 +1019,13 @@ prs = pst.executeQuery(); while (prs.next()) { long partId = prs.getLong(1); - rs = stmt.executeQuery("select \"PARAM_VALUE\" from \"PARTITION_PARAMS\" where \"PARAM_KEY\" " + - " = 'repl.last.id' and \"PART_ID\" = " + partId); + rs = stmt.executeQuery("SELECT \"PARAM_VALUE\" FROM \"PARTITION_PARAMS\" WHERE \"PARAM_KEY\" " + + " = 'repl.last.id' AND \"PART_ID\" = " + partId); if (!rs.next()) { - query = "insert into \"PARTITION_PARAMS\" values ( " + partId + " , 'repl.last.id' , ? )"; + query = "INSERT INTO \"PARTITION_PARAMS\" VALUES ( " + partId + " , 'repl.last.id' , ? )"; } else { - query = "update \"PARTITION_PARAMS\" set \"PARAM_VALUE\" = ? " + - " where \"PART_ID\" = " + partId + " and \"PARAM_KEY\" = 'repl.last.id'"; + query = "UPDATE \"PARTITION_PARAMS\" SET \"PARAM_VALUE\" = ? " + + " WHERE \"PART_ID\" = " + partId + " AND \"PARAM_KEY\" = 'repl.last.id'"; } rs.close(); @@ -1068,7 +1069,7 @@ * In order to prevent lost update problem, the the non-overlapping txns must lock in the snapshot * that they read appropriately. In particular, if txns do not overlap, then one follows the other * (assumig they write the same entity), and thus the 2nd must see changes of the 1st. 
We ensure - * this by locking in snapshot after + * this by locking in snapshot after * {@link #openTxns(OpenTxnRequest)} call is made (see org.apache.hadoop.hive.ql.Driver.acquireLocksAndOpenTxn) * and mutexing openTxn() with commit(). In other words, once a S.commit() starts we must ensure * that txn T which will be considered a later txn, locks in a snapshot that includes the result @@ -1076,7 +1077,7 @@ * As a counter example, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions * were running in parallel). If T and S both locked in the same snapshot (for example commit of * txnid:2, which is possible if commitTxn() and openTxnx() is not mutexed) - * 'x' would be updated to the same value by both, i.e. lost update. + * 'x' would be updated to the same value by both, i.e. lost update. */ @Override @RetrySemantics.Idempotent("No-op if already committed") @@ -1144,23 +1145,23 @@ if (rqst.isSetReplPolicy()) { rs = null; } else { - conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + + conflictSQLSuffix = "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"=" + txnid + " AND \"TC_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + "," + quoteChar(OperationType.DELETE.sqlConst) + ")"; rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, - "tc_operation_type " + conflictSQLSuffix)); + "\"TC_OPERATION_TYPE\" " + conflictSQLSuffix)); } if (rs != null && rs.next()) { isUpdateDelete = 'Y'; close(rs); //if here it means currently committing txn performed update/delete and we should check WW conflict /** - * This S4U will mutex with other commitTxn() and openTxns(). + * This S4U will mutex with other commitTxn() and openTxns(). * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial * Note: it's possible to have several txns have the same commit id. Suppose 3 txns start * at the same time and no new txns start until all 3 commit. 
* We could've incremented the sequence for commitId is well but it doesn't add anything functionally. */ - commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("select ntxn_next - 1 from NEXT_TXN_ID")); + commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\"")); if (!commitIdRs.next()) { throw new IllegalStateException("No rows found in NEXT_TXN_ID"); } @@ -1175,8 +1176,8 @@ * even if it includes all of it's columns */ stmt.executeUpdate( - "insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" + - " select distinct tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix); + "INSERT INTO \"WRITE_SET\" (\"WS_DATABASE\", \"WS_TABLE\", \"WS_PARTITION\", \"WS_TXNID\", \"WS_COMMIT_ID\", \"WS_OPERATION_TYPE\")" + + " SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_TXNID\", " + commitId + ", \"TC_OPERATION_TYPE\" " + conflictSQLSuffix); /** * see if there are any overlapping txns wrote the same element, i.e. 
have a conflict * Since entire commit operation is mutexed wrt other start/commit ops, @@ -1187,20 +1188,20 @@ * [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running) */ rs = stmt.executeQuery - (sqlGenerator.addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," + - "committed.ws_table, committed.ws_partition, cur.ws_commit_id cur_ws_commit_id, " + - "cur.ws_operation_type cur_op, committed.ws_operation_type committed_op " + - "from WRITE_SET committed INNER JOIN WRITE_SET cur " + - "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " + + (sqlGenerator.addLimitClause(1, "\"COMMITTED\".\"WS_TXNID\", \"COMMITTED\".\"WS_COMMIT_ID\", " + + "\"COMMITTED\".\"WS_DATABASE\", \"COMMITTED\".\"WS_TABLE\", \"COMMITTED\".\"WS_PARTITION\", " + + "\"CUR\".\"WS_COMMIT_ID\" \"CUR_WS_COMMIT_ID\", \"CUR\".\"WS_OPERATION_TYPE\" \"CUR_OP\", " + + "\"COMMITTED\".\"WS_OPERATION_TYPE\" \"COMMITTED_OP\" FROM \"WRITE_SET\" \"COMMITTED\" INNER JOIN \"WRITE_SET\" \"CUR\" " + + "ON \"COMMITTED\".\"WS_DATABASE\"=\"CUR\".\"WS_DATABASE\" AND \"COMMITTED\".\"WS_TABLE\"=\"CUR\".\"WS_TABLE\" " + //For partitioned table we always track writes at partition level (never at table) //and for non partitioned - always at table level, thus the same table should never //have entries with partition key and w/o - "and (committed.ws_partition=cur.ws_partition or (committed.ws_partition is null and cur.ws_partition is null)) " + - "where cur.ws_txnid <= committed.ws_commit_id" + //txns overlap; could replace ws_txnid + "AND (\"COMMITTED\".\"WS_PARTITION\"=\"CUR\".\"WS_PARTITION\" OR (\"COMMITTED\".\"WS_PARTITION\" IS NULL AND \"CUR\".\"WS_PARTITION\" IS NULL)) " + + "WHERE \"CUR\".\"WS_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\"" + //txns overlap; could replace ws_txnid // with txnid, though any decent DB should infer this - " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just 
inserted as + " AND \"CUR\".\"WS_TXNID\"=" + txnid + //make sure RHS of join only has rows we just inserted as // part of this commitTxn() op - " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns + " AND \"COMMITTED\".\"WS_TXNID\" <> " + txnid + //and LHS only has committed txns //U+U and U+D and D+D is a conflict and we don't currently track I in WRITE_SET at all //it may seem like D+D should not be in conflict but consider 2 multi-stmt txns //where each does "delete X + insert X, where X is a row with the same PK. This is @@ -1208,9 +1209,9 @@ //The same happens when Hive splits U=I+D early so it looks like 2 branches of a //multi-insert stmt (an Insert and a Delete branch). It also 'feels' // un-serializable to allow concurrent deletes - " and (committed.ws_operation_type IN(" + quoteChar(OperationType.UPDATE.sqlConst) + + " and (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + ", " + quoteChar(OperationType.DELETE.sqlConst) + - ") AND cur.ws_operation_type IN(" + quoteChar(OperationType.UPDATE.sqlConst) + ", " + ") AND \"CUR\".\"WS_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + ", " + quoteChar(OperationType.DELETE.sqlConst) + "))")); if (rs.next()) { //found a conflict @@ -1254,13 +1255,13 @@ if (!rqst.isSetReplPolicy()) { // Move the record from txn_components into completed_txn_components so that the compactor // knows where to look to compact. 
- s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " + - "ctc_table, ctc_partition, ctc_writeid, ctc_update_delete) select tc_txnid," + - " tc_database, tc_table, tc_partition, tc_writeid, '" + isUpdateDelete + - "' from TXN_COMPONENTS where tc_txnid = " + txnid + + s = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" (\"CTC_TXNID\", \"CTC_DATABASE\", " + + "\"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\") SELECT \"TC_TXNID\"," + + " \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_WRITEID\", '" + isUpdateDelete + + "' FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid + //we only track compactor activity in TXN_COMPONENTS to handle the case where the //compactor txn aborts - so don't bother copying it to COMPLETED_TXN_COMPONENTS - " AND tc_operation_type <> " + quoteChar(OperationType.COMPACT.sqlConst); + " AND \"TC_OPERATION_TYPE\" <> " + quoteChar(OperationType.COMPACT.sqlConst); LOG.debug("Going to execute insert <" + s + ">"); if ((stmt.executeUpdate(s)) < 1) { @@ -1284,8 +1285,8 @@ paramsList.add(params); } insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "COMPLETED_TXN_COMPONENTS " + - "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, ctc_writeid, ctc_update_delete)", + "\"COMPLETED_TXN_COMPONENTS\" " + + "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")", rows, paramsList); for (PreparedStatement pst : insertPreparedStmts) { pst.execute(); @@ -1295,21 +1296,21 @@ } // cleanup all txn related metadata - s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; + s = "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); - s = "delete from HIVE_LOCKS where hl_txnid = " + txnid; + s = "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); - s = "delete from 
TXNS where txn_id = " + txnid; + s = "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); - s = "delete from MIN_HISTORY_LEVEL where mhl_txnid = " + txnid; + s = "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL"); - s = "delete from MATERIALIZATION_REBUILD_LOCKS where mrl_txn_id = " + txnid; + s = "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); @@ -1326,10 +1327,10 @@ LOG.warn(errorMsg); throw new IllegalArgumentException(errorMsg); } - s = "UPDATE TABLE_PARAMS SET" - + " PARAM_VALUE = " + quoteString(rqst.getKeyValue().getValue()) - + " WHERE TBL_ID = " + rqst.getKeyValue().getTableId() - + " AND PARAM_KEY = " + quoteString(rqst.getKeyValue().getKey()); + s = "UPDATE \"TABLE_PARAMS\" SET" + + " \"PARAM_VALUE\" = " + quoteString(rqst.getKeyValue().getValue()) + + " WHERE \"TBL_ID\" = " + rqst.getKeyValue().getTableId() + + " AND \"PARAM_KEY\" = " + quoteString(rqst.getKeyValue().getKey()); LOG.debug("Going to execute update <" + s + ">"); int affectedRows = stmt.executeUpdate(s); if (affectedRows != 1) { @@ -1403,7 +1404,7 @@ // Check if this txn state is already replicated for this given table. If yes, then it is // idempotent case and just return. - String sql = "select nwi_next from NEXT_WRITE_ID where nwi_database = ? and nwi_table = ?"; + String sql = "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\" = ? 
AND \"NWI_TABLE\" = ?"; pst = sqlGenerator.prepareStmtWithParameters(dbConn, sql, params); LOG.debug("Going to execute query <" + sql.replaceAll("\\?", "{}") + ">", quoteString(dbName), quoteString(tblName)); @@ -1434,7 +1435,7 @@ // Insert entries to TXN_TO_WRITE_ID for aborted write ids insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows, + "\"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", \"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\")", rows, paramsList); for (PreparedStatement pst : insertPreparedStmts) { pst.execute(); @@ -1452,7 +1453,7 @@ long nextWriteId = validWriteIdList.getHighWatermark() + 1; // First allocation of write id (hwm+1) should add the table to the next_write_id meta table. - sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (?, ?, " + sql = "INSERT INTO \"NEXT_WRITE_ID\" (\"NWI_DATABASE\", \"NWI_TABLE\", \"NWI_NEXT\") VALUES (?, ?, " + Long.toString(nextWriteId) + ")"; closeStmt(pStmt); pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, sql, params); @@ -1519,7 +1520,7 @@ String[] names = TxnUtils.getDbTableName(fullTableName); assert names.length == 2; List params = Arrays.asList(names[0], names[1]); - String s = "select t2w_txnid from TXN_TO_WRITE_ID where t2w_database = ? and t2w_table = ? and t2w_writeid = " + writeId; + String s = "SELECT \"T2W_TXNID\" FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_WRITEID\" = " + writeId; pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", quoteString(names[0]), quoteString(names[1])); @@ -1606,8 +1607,8 @@ // Find the writeId high water mark based upon txnId high water mark. If found, then, need to // traverse through all write Ids less than writeId HWM to make exceptions list. 
// The writeHWM = min(NEXT_WRITE_ID.nwi_next-1, max(TXN_TO_WRITE_ID.t2w_writeid under txnHwm)) - String s = "select max(t2w_writeid) from TXN_TO_WRITE_ID where t2w_txnid <= " + Long.toString(txnHwm) - + " and t2w_database = ? and t2w_table = ?"; + String s = "SELECT MAX(\"T2W_WRITEID\") FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" <= " + Long.toString(txnHwm) + + " AND \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ?"; pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute query<" + s.replaceAll("\\?", "{}") + ">", quoteString(names[0]), quoteString(names[1])); @@ -1620,7 +1621,7 @@ if (writeIdHwm <= 0) { // Need to subtract 1 as nwi_next would be the next write id to be allocated but we need highest // allocated write id. - s = "select nwi_next-1 from NEXT_WRITE_ID where nwi_database = ? and nwi_table = ?"; + s = "SELECT \"NWI_NEXT\"-1 FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?"; closeStmt(pst); pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute query<" + s.replaceAll("\\?", "{}") + ">", @@ -1639,8 +1640,8 @@ // then will be added to invalid list. The results should be sorted in ascending order based // on write id. The sorting is needed as exceptions list in ValidWriteIdList would be looked-up // using binary search. - s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_writeid <= " + Long.toString(writeIdHwm) - + " and t2w_database = ? and t2w_table = ? order by t2w_writeid asc"; + s = "SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_WRITEID\" <= " + Long.toString(writeIdHwm) + + " AND \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? ORDER BY \"T2W_WRITEID\" ASC"; closeStmt(pst); pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute query<" + s.replaceAll("\\?", "{}") + ">", @@ -1747,10 +1748,10 @@ // write id for the same db.table. 
If yes, then need to reuse it else have to allocate new one // The write id would have been already allocated in case of multi-statement txns where // first write on a table will allocate write id and rest of the writes should re-use it. - prefix.append("select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where" - + " t2w_database = ? and t2w_table = ?" + " and "); + prefix.append("SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE" + + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ?" + " AND "); TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, - txnIds, "t2w_txnid", false, false); + txnIds, "\"T2W_TXNID\"", false, false); long allocatedTxnsCount = 0; long txnId; @@ -1794,7 +1795,7 @@ // Get the next write id for the given table and update it with new next write id. // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID String s = sqlGenerator.addForUpdateClause( - "select nwi_next from NEXT_WRITE_ID where nwi_database = ? and nwi_table = ?"); + "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?"); closeStmt(pStmt); pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", @@ -1805,7 +1806,7 @@ // The initial value for write id should be 1 and hence we add 1 with number of write ids allocated here // For repl flow, we need to force set the incoming write id. writeId = (srcWriteId > 0) ? srcWriteId : 1; - s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (?, ?, " + s = "INSERT INTO \"NEXT_WRITE_ID\" (\"NWI_DATABASE\", \"NWI_TABLE\", \"NWI_NEXT\") VALUES (?, ?, " + Long.toString(writeId + numOfWriteIds) + ")"; closeStmt(pStmt); pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); @@ -1817,8 +1818,8 @@ writeId = (srcWriteId > 0) ? 
srcWriteId : nextWriteId; // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated - s = "update NEXT_WRITE_ID set nwi_next = " + Long.toString(writeId + numOfWriteIds) - + " where nwi_database = ? and nwi_table = ?"; + s = "UPDATE \"NEXT_WRITE_ID\" SET \"NWI_NEXT\" = " + Long.toString(writeId + numOfWriteIds) + + " WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?"; closeStmt(pStmt); pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute update <" + s.replaceAll("\\?", "{}") + ">", @@ -1830,7 +1831,7 @@ // This is possible in case of first incremental repl after bootstrap where concurrent write // and drop table was performed at source during bootstrap dump. if ((srcWriteId > 0) && (srcWriteId != nextWriteId)) { - s = "delete from TXN_TO_WRITE_ID where t2w_database = ? and t2w_table = ?"; + s = "DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ?"; closeStmt(pStmt); pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute delete <" + s.replaceAll("\\?", "{}") + ">", @@ -1853,7 +1854,7 @@ // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows, + "\"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", \"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\")", rows, paramsList); for (PreparedStatement pst : insertPreparedStmts) { pst.execute(); @@ -1911,7 +1912,7 @@ // First allocation of write id should add the table to the next_write_id meta table // The initial value for write id should be 1 and hence we add 1 with number of write ids // allocated here - String s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (?, ?, " + String s = "INSERT INTO \"NEXT_WRITE_ID\" (\"NWI_DATABASE\", \"NWI_TABLE\", \"NWI_NEXT\") VALUES (?, ?, " + Long.toString(rqst.getSeeWriteId() 
+ 1) + ")"; pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(rqst.getDbName(), rqst.getTblName())); LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">", @@ -1983,13 +1984,13 @@ try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID"); + rs = stmt.executeQuery("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""); if(!rs.next()) { throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted"); } long highestAllocatedTxnId = rs.getLong(1); close(rs); - rs = stmt.executeQuery("select min(txn_id) from TXNS where txn_state=" + quoteChar(TXN_OPEN)); + rs = stmt.executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + quoteChar(TXN_OPEN)); if(!rs.next()) { throw new IllegalStateException("Scalar query returned no rows?!?!!"); } @@ -2002,14 +2003,14 @@ //the +1 is there because "delete ..." below has < (which is correct for the case when //there is an open txn //Concurrency: even if new txn starts (or starts + commits) it is still true that - //there are no currently open txns that overlap with any committed txn with + //there are no currently open txns that overlap with any committed txn with //commitId <= commitHighWaterMark (as set on next line). So plain READ_COMMITTED is enough. commitHighWaterMark = highestAllocatedTxnId + 1; } else { commitHighWaterMark = lowestOpenTxnId; } - int delCnt = stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + commitHighWaterMark); + int delCnt = stmt.executeUpdate("DELETE FROM \"WRITE_SET\" WHERE \"WS_COMMIT_ID\" < " + commitHighWaterMark); LOG.info("Deleted {} obsolete rows from WRITE_SET", delCnt); dbConn.commit(); } catch (SQLException ex) { @@ -2060,7 +2061,7 @@ List params = new ArrayList<>(); StringBuilder query = new StringBuilder(); // compose a query that select transactions containing an update... 
- query.append("select ctc_update_delete from COMPLETED_TXN_COMPONENTS where ctc_update_delete='Y' AND ("); + query.append("SELECT \"CTC_UPDATE_DELETE\" FROM \"COMPLETED_TXN_COMPONENTS\" WHERE \"CTC_UPDATE_DELETE\" ='Y' AND ("); int i = 0; for (String fullyQualifiedName : creationMetadata.getTablesUsed()) { ValidWriteIdList tblValidWriteIdList = @@ -2091,21 +2092,21 @@ } String[] names = TxnUtils.getDbTableName(fullyQualifiedName); assert(names.length == 2); - query.append(" (ctc_database=? AND ctc_table=?"); + query.append(" (\"CTC_DATABASE\"=? AND \"CTC_TABLE\"=?"); params.add(names[0]); params.add(names[1]); - query.append(" AND (ctc_writeid > " + tblValidWriteIdList.getHighWatermark()); + query.append(" AND (\"CTC_WRITEID\" > " + tblValidWriteIdList.getHighWatermark()); query.append(tblValidWriteIdList.getInvalidWriteIds().length == 0 ? ") " : - " OR ctc_writeid IN(" + StringUtils.join(",", + " OR \"CTC_WRITEID\" IN(" + StringUtils.join(",", Arrays.asList(ArrayUtils.toObject(tblValidWriteIdList.getInvalidWriteIds()))) + ") "); query.append(") "); i++; } // ... and where the transaction has already been committed as per snapshot taken // when we are running current query - query.append(") AND ctc_txnid <= " + currentValidTxnList.getHighWatermark()); + query.append(") AND \"CTC_TXNID\" <= " + currentValidTxnList.getHighWatermark()); query.append(currentValidTxnList.getInvalidTransactions().length == 0 ? " " : - " AND ctc_txnid NOT IN(" + StringUtils.join(",", + " AND \"CTC_TXNID\" NOT IN(" + StringUtils.join(",", Arrays.asList(ArrayUtils.toObject(currentValidTxnList.getInvalidTransactions()))) + ") "); // Execute query @@ -2150,8 +2151,8 @@ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); List params = Arrays.asList(dbName, tableName); - String selectQ = "select mrl_txn_id from MATERIALIZATION_REBUILD_LOCKS where" + - " mrl_db_name = ? 
AND mrl_tbl_name = ?"; + String selectQ = "SELECT \"MRL_TXN_ID\" FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE" + + " \"MRL_DB_NAME\" = ? AND \"MRL_TBL_NAME\" = ?"; pst = sqlGenerator.prepareStmtWithParameters(dbConn, selectQ, params); LOG.debug("Going to execute query <" + selectQ.replaceAll("\\?", "{}") + ">", quoteString(dbName), quoteString(tableName)); @@ -2161,8 +2162,8 @@ " since it is already being rebuilt"); return new LockResponse(txnId, LockState.NOT_ACQUIRED); } - String insertQ = "insert into MATERIALIZATION_REBUILD_LOCKS " + - "(mrl_txn_id, mrl_db_name, mrl_tbl_name, mrl_last_heartbeat) values (" + txnId + + String insertQ = "INSERT INTO \"MATERIALIZATION_REBUILD_LOCKS\" " + + "(\"MRL_TXN_ID\", \"MRL_DB_NAME\", \"MRL_TBL_NAME\", \"MRL_LAST_HEARTBEAT\") VALUES (" + txnId + ", ?, ?, " + Instant.now().toEpochMilli() + ")"; closeStmt(pst); pst = sqlGenerator.prepareStmtWithParameters(dbConn, insertQ, params); @@ -2194,11 +2195,11 @@ try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - String s = "update MATERIALIZATION_REBUILD_LOCKS" + - " set mrl_last_heartbeat = " + Instant.now().toEpochMilli() + - " where mrl_txn_id = " + txnId + - " AND mrl_db_name = ?" + - " AND mrl_tbl_name = ?"; + String s = "UPDATE \"MATERIALIZATION_REBUILD_LOCKS\"" + + " SET \"MRL_LAST_HEARTBEAT\" = " + Instant.now().toEpochMilli() + + " WHERE \"MRL_TXN_ID\" = " + txnId + + " AND \"MRL_DB_NAME\" = ?" 
+ + " AND \"MRL_TBL_NAME\" = ?"; pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tableName)); LOG.debug("Going to execute update <" + s.replaceAll("\\?", "{}") + ">", quoteString(dbName), quoteString(tableName)); @@ -2247,7 +2248,7 @@ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String selectQ = "select mrl_txn_id, mrl_last_heartbeat from MATERIALIZATION_REBUILD_LOCKS"; + String selectQ = "SELECT \"MRL_TXN_ID\", \"MRL_LAST_HEARTBEAT\" FROM \"MATERIALIZATION_REBUILD_LOCKS\""; LOG.debug("Going to execute query <" + selectQ + ">"); rs = stmt.executeQuery(selectQ); while(rs.next()) { @@ -2263,8 +2264,8 @@ } } if (!txnIds.isEmpty()) { - String deleteQ = "delete from MATERIALIZATION_REBUILD_LOCKS where" + - " mrl_txn_id IN(" + StringUtils.join(",", txnIds) + ") "; + String deleteQ = "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE" + + " \"MRL_TXN_ID\" IN(" + StringUtils.join(",", txnIds) + ") "; LOG.debug("Going to execute update <" + deleteQ + ">"); cnt = stmt.executeUpdate(deleteQ); } @@ -2291,9 +2292,9 @@ * connection (but separate transactions). This avoid some flakiness in BONECP where if you * perform an operation on 1 connection and immediately get another from the pool, the 2nd one * doesn't see results of the first. - * + * * Retry-by-caller note: If the call to lock is from a transaction, then in the worst case - * there will be a duplicate set of locks but both sets will belong to the same txn so they + * there will be a duplicate set of locks but both sets will belong to the same txn so they * will not conflict with each other. For locks w/o txn context (i.e. read-only query), this * may lead to deadlock (at least a long wait). (e.g. 1st call creates locks in {@code LOCK_WAITING} * mode and response gets lost. 
Then {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient} @@ -2334,8 +2335,8 @@ * @throws MetaException */ private TxnRecord lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException { - String query = "select TXN_TYPE from TXNS where TXN_ID = " + txnId - + (txnState != null ? " and TXN_STATE = " + quoteChar(txnState) : ""); + String query = "SELECT \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnId + + (txnState != null ? " AND \"TXN_STATE\" = " + quoteChar(txnState) : ""); try (ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query))) { return rs.next() ? new TxnRecord(rs.getInt(1)) : null; } @@ -2385,7 +2386,7 @@ * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and acquires the locks. Then 7 unblocks, * and add it's W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)} * doesn't block on locks acquired later than one it's checking*/ - String s = sqlGenerator.addForUpdateClause("select nl_next from NEXT_LOCK_ID"); + String s = sqlGenerator.addForUpdateClause("SELECT \"NL_NEXT\" FROM \"NEXT_LOCK_ID\""); LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { @@ -2395,7 +2396,7 @@ "initialized, no record found in next_lock_id"); } long extLockId = rs.getLong(1); - s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); + s = "UPDATE \"NEXT_LOCK_ID\" SET \"NL_NEXT\" = " + (extLockId + 1); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); @@ -2443,7 +2444,7 @@ updateTxnComponents = false; break; default: - //since we have an open transaction, only 4 values above are expected + //since we have an open transaction, only 4 values above are expected throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType() + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid)); } @@ -2460,8 +2461,8 @@ // the acid tables. 
However, DDL operatons won't allocate write id and hence this query // may return empty result sets. // Get the write id allocated by this txn for the given table writes - s = "select t2w_writeid from TXN_TO_WRITE_ID where" - + " t2w_database = ? and t2w_table = ? and t2w_txnid = " + txnid; + s = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE" + + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = " + txnid; pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tblName)); LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", quoteString(dbName), quoteString(tblName)); @@ -2486,7 +2487,7 @@ paramsList.add(params); } insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", + "\"TXN_COMPONENTS\" (\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")", rows, paramsList); for(PreparedStatement pst : insertPreparedStmts) { int modCount = pst.executeUpdate(); @@ -2556,9 +2557,9 @@ paramsList.add(params); } insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, " + - "hl_table, hl_partition,hl_lock_state, hl_lock_type, " + - "hl_last_heartbeat, hl_user, hl_host, hl_agent_info)", rows, paramsList); + "\"HIVE_LOCKS\" (\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", " + + "\"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", " + + "\"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\")", rows, paramsList); for(PreparedStatement pst : insertPreparedStmts) { int modCount = pst.executeUpdate(); } @@ -2634,7 +2635,7 @@ * {@link #checkLock(java.sql.Connection, long)} must run at SERIALIZABLE (make sure some lock we are checking * against doesn't move from W to A in another txn) but this method can heartbeat in * separate txn at 
READ_COMMITTED. - * + * * Retry-by-caller note: * Retryable because {@link #checkLock(Connection, long)} is */ @@ -2710,8 +2711,8 @@ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); //hl_txnid <> 0 means it's associated with a transaction - String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND (hl_txnid = 0 OR" + - " (hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "'))"; + String s = "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = " + extLockId + " AND (\"HL_TXNID\" = 0 OR" + + " (\"HL_TXNID\" <> 0 AND \"HL_LOCK_STATE\" = '" + LOCK_WAITING + "'))"; //(hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "') is for multi-statement txns where //some query attempted to lock (thus LOCK_WAITING state) but is giving up due to timeout for example LOG.debug("Going to execute update <" + s + ">"); @@ -2781,9 +2782,9 @@ try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " + - "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," + - "hl_blockedby_ext_id, hl_blockedby_int_id, hl_agent_info from HIVE_LOCKS"; + String s = "SELECT \"HL_LOCK_EXT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", " + + "\"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", \"HL_ACQUIRED_AT\", \"HL_USER\", \"HL_HOST\", \"HL_LOCK_INT_ID\"," + + "\"HL_BLOCKEDBY_EXT_ID\", \"HL_BLOCKEDBY_INT_ID\", \"HL_AGENT_INFO\" FROM \"HIVE_LOCKS\""; // Some filters may have been specified in the SHOW LOCKS statement. Add them to the query. 
String dbName = rqst.getDbname(); @@ -2793,21 +2794,21 @@ StringBuilder filter = new StringBuilder(); if (dbName != null && !dbName.isEmpty()) { - filter.append("hl_db=?"); + filter.append("\"HL_DB\"=?"); params.add(dbName); } if (tableName != null && !tableName.isEmpty()) { if (filter.length() > 0) { filter.append(" and "); } - filter.append("hl_table=?"); + filter.append("\"HL_TABLE\"=?"); params.add(tableName); } if (partName != null && !partName.isEmpty()) { if (filter.length() > 0) { filter.append(" and "); } - filter.append("hl_partition=?"); + filter.append("\"HL_PARTITION\"=?"); params.add(partName); } String whereClause = filter.toString(); @@ -2935,9 +2936,9 @@ txnIds.add(txn); } TxnUtils.buildQueryWithINClause(conf, queries, - new StringBuilder("update TXNS set txn_last_heartbeat = " + getDbTime(dbConn) + - " where txn_state = " + quoteChar(TXN_OPEN) + " and "), - new StringBuilder(""), txnIds, "txn_id", true, false); + new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + getDbTime(dbConn) + + " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND "), + new StringBuilder(""), txnIds, "\"TXN_ID\"", true, false); int updateCnt = 0; for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); @@ -2976,7 +2977,7 @@ long generateCompactionQueueId(Statement stmt) throws SQLException, MetaException { // Get the id for the next entry in the queue - String s = sqlGenerator.addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID"); + String s = sqlGenerator.addForUpdateClause("SELECT \"NCQ_NEXT\" FROM \"NEXT_COMPACTION_QUEUE_ID\""); LOG.debug("going to execute query <" + s + ">"); try (ResultSet rs = stmt.executeQuery(s)) { if (!rs.next()) { @@ -2984,7 +2985,7 @@ + "no record found in next_compaction_queue_id"); } long id = rs.getLong(1); - s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1); + s = "UPDATE \"NEXT_COMPACTION_QUEUE_ID\" SET \"NCQ_NEXT\" = " + (id + 1); LOG.debug("Going to execute update 
<" + s + ">"); stmt.executeUpdate(s); return id; @@ -3004,8 +3005,8 @@ */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - String query = "select t2w_txnid from TXN_TO_WRITE_ID where" - + " t2w_database = ? and t2w_table = ? and t2w_writeid = " + writeId; + String query = "SELECT \"T2W_TXNID\" FROM \"TXN_TO_WRITE_ID\" WHERE" + + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_WRITEID\" = " + writeId; pst = sqlGenerator.prepareStmtWithParameters(dbConn, query, Arrays.asList(dbName, tblName)); LOG.debug("Going to execute query <" + query.replaceAll("\\?", "{}") + ">", quoteString(dbName), quoteString(tblName)); @@ -3053,17 +3054,17 @@ long id = generateCompactionQueueId(stmt); List params = new ArrayList<>(); - StringBuilder sb = new StringBuilder("select cq_id, cq_state from COMPACTION_QUEUE where"). - append(" cq_state IN(").append(quoteChar(INITIATED_STATE)). + StringBuilder sb = new StringBuilder("SELECT \"CQ_ID\", \"CQ_STATE\" FROM \"COMPACTION_QUEUE\" WHERE"). + append(" \"CQ_STATE\" IN(").append(quoteChar(INITIATED_STATE)). append(",").append(quoteChar(WORKING_STATE)). - append(") AND cq_database=?"). - append(" AND cq_table=?").append(" AND "); + append(") AND \"CQ_DATABASE\"=?"). 
+ append(" AND \"CQ_TABLE\"=?").append(" AND "); params.add(rqst.getDbname()); params.add(rqst.getTablename()); if(rqst.getPartitionname() == null) { - sb.append("cq_partition is null"); + sb.append("\"CQ_PARTITION\" IS NULL"); } else { - sb.append("cq_partition=?"); + sb.append("\"CQ_PARTITION\"=?"); params.add(rqst.getPartitionname()); } @@ -3081,15 +3082,15 @@ close(rs); closeStmt(pst); params.clear(); - StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " + - "cq_table, "); + StringBuilder buf = new StringBuilder("INSERT INTO \"COMPACTION_QUEUE\" (\"CQ_ID\", \"CQ_DATABASE\", " + + "\"CQ_TABLE\", "); String partName = rqst.getPartitionname(); - if (partName != null) buf.append("cq_partition, "); - buf.append("cq_state, cq_type"); + if (partName != null) buf.append("\"CQ_PARTITION\", "); + buf.append("\"CQ_STATE\", \"CQ_TYPE\""); if (rqst.getProperties() != null) { - buf.append(", cq_tblproperties"); + buf.append(", \"CQ_TBLPROPERTIES\""); } - if (rqst.getRunas() != null) buf.append(", cq_run_as"); + if (rqst.getRunas() != null) buf.append(", \"CQ_RUN_AS\""); buf.append(") values ("); buf.append(id); buf.append(", ?"); @@ -3177,11 +3178,11 @@ try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " + + String s = "SELECT \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", \"CQ_STATE\", \"CQ_TYPE\", \"CQ_WORKER_ID\", " + //-1 because 'null' literal doesn't work for all DBs...
- "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " + - "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " + - "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS"; //todo: sort by cq_id? + "\"CQ_START\", -1 \"CC_END\", \"CQ_RUN_AS\", \"CQ_HADOOP_JOB_ID\", \"CQ_ID\" FROM \"COMPACTION_QUEUE\" UNION ALL " + + "SELECT \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", \"CC_WORKER_ID\", " + + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HADOOP_JOB_ID\", \"CC_ID\" FROM \"COMPLETED_COMPACTIONS\""; //todo: sort by cq_id? //what I want is order by cc_end desc, cc_start asc (but derby has a bug https://issues.apache.org/jira/browse/DERBY-6013) //to sort so that currently running jobs are at the end of the list (bottom of screen) //and currently running ones are in sorted by start time @@ -3283,7 +3284,7 @@ int modCount = 0; //record partitions that were written to insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", + "\"TXN_COMPONENTS\" (\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")", rows, paramsList); for(PreparedStatement pst : insertPreparedStmts) { modCount = pst.executeUpdate(); @@ -3348,37 +3349,37 @@ return; } - buff.append("delete from TXN_COMPONENTS where tc_database='"); + buff.append("DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_DATABASE\"='"); buff.append(dbName); buff.append("'"); queries.add(buff.toString()); buff.setLength(0); - buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='"); + buff.append("DELETE FROM \"COMPLETED_TXN_COMPONENTS\" WHERE \"CTC_DATABASE\"='"); buff.append(dbName); buff.append("'"); queries.add(buff.toString()); buff.setLength(0); - buff.append("delete from COMPACTION_QUEUE where cq_database='"); + buff.append("DELETE FROM 
\"COMPACTION_QUEUE\" WHERE \"CQ_DATABASE\"='"); buff.append(dbName); buff.append("'"); queries.add(buff.toString()); buff.setLength(0); - buff.append("delete from COMPLETED_COMPACTIONS where cc_database='"); + buff.append("DELETE FROM \"COMPLETED_COMPACTIONS\" WHERE \"CC_DATABASE\"='"); buff.append(dbName); buff.append("'"); queries.add(buff.toString()); buff.setLength(0); - buff.append("delete from TXN_TO_WRITE_ID where t2w_database='"); + buff.append("DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\"='"); buff.append(dbName.toLowerCase()); buff.append("'"); queries.add(buff.toString()); buff.setLength(0); - buff.append("delete from NEXT_WRITE_ID where nwi_database='"); + buff.append("DELETE FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\"='"); buff.append(dbName.toLowerCase()); buff.append("'"); queries.add(buff.toString()); @@ -3394,49 +3395,49 @@ return; } - buff.append("delete from TXN_COMPONENTS where tc_database='"); + buff.append("DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_DATABASE\"='"); buff.append(dbName); - buff.append("' and tc_table='"); + buff.append("' AND \"TC_TABLE\"='"); buff.append(tblName); buff.append("'"); queries.add(buff.toString()); buff.setLength(0); - buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='"); + buff.append("DELETE FROM \"COMPLETED_TXN_COMPONENTS\" WHERE \"CTC_DATABASE\"='"); buff.append(dbName); - buff.append("' and ctc_table='"); + buff.append("' AND \"CTC_TABLE\"='"); buff.append(tblName); buff.append("'"); queries.add(buff.toString()); buff.setLength(0); - buff.append("delete from COMPACTION_QUEUE where cq_database='"); + buff.append("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_DATABASE\"='"); buff.append(dbName); - buff.append("' and cq_table='"); + buff.append("' AND \"CQ_TABLE\"='"); buff.append(tblName); buff.append("'"); queries.add(buff.toString()); buff.setLength(0); - buff.append("delete from COMPLETED_COMPACTIONS where cc_database='"); + buff.append("DELETE FROM \"COMPLETED_COMPACTIONS\" WHERE 
\"CC_DATABASE\"='"); buff.append(dbName); - buff.append("' and cc_table='"); + buff.append("' AND \"CC_TABLE\"='"); buff.append(tblName); buff.append("'"); queries.add(buff.toString()); buff.setLength(0); - buff.append("delete from TXN_TO_WRITE_ID where t2w_database='"); + buff.append("DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\"='"); buff.append(dbName.toLowerCase()); - buff.append("' and t2w_table='"); + buff.append("' AND \"T2W_TABLE\"='"); buff.append(tblName.toLowerCase()); buff.append("'"); queries.add(buff.toString()); buff.setLength(0); - buff.append("delete from NEXT_WRITE_ID where nwi_database='"); + buff.append("DELETE FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\"='"); buff.append(dbName.toLowerCase()); - buff.append("' and nwi_table='"); + buff.append("' AND \"NWI_TABLE\"='"); buff.append(tblName.toLowerCase()); buff.append("'"); queries.add(buff.toString()); @@ -3461,41 +3462,41 @@ partVals = p.getValues(); partName = Warehouse.makePartName(partCols, partVals); - buff.append("delete from TXN_COMPONENTS where tc_database='"); + buff.append("DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_DATABASE\"='"); buff.append(dbName); - buff.append("' and tc_table='"); + buff.append("' AND \"TC_TABLE\"='"); buff.append(tblName); - buff.append("' and tc_partition='"); + buff.append("' AND \"TC_PARTITION\"='"); buff.append(partName); buff.append("'"); queries.add(buff.toString()); buff.setLength(0); - buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='"); + buff.append("DELETE FROM \"COMPLETED_TXN_COMPONENTS\" WHERE \"CTC_DATABASE\"='"); buff.append(dbName); - buff.append("' and ctc_table='"); + buff.append("' AND \"CTC_TABLE\"='"); buff.append(tblName); - buff.append("' and ctc_partition='"); + buff.append("' AND \"CTC_PARTITION\"='"); buff.append(partName); buff.append("'"); queries.add(buff.toString()); buff.setLength(0); - buff.append("delete from COMPACTION_QUEUE where cq_database='"); + buff.append("DELETE FROM \"COMPACTION_QUEUE\" WHERE 
\"CQ_DATABASE\"='"); buff.append(dbName); - buff.append("' and cq_table='"); + buff.append("' AND \"CQ_TABLE\"='"); buff.append(tblName); - buff.append("' and cq_partition='"); + buff.append("' AND \"CQ_PARTITION\"='"); buff.append(partName); buff.append("'"); queries.add(buff.toString()); buff.setLength(0); - buff.append("delete from COMPLETED_COMPACTIONS where cc_database='"); + buff.append("DELETE FROM \"COMPLETED_COMPACTIONS\" WHERE \"CC_DATABASE\"='"); buff.append(dbName); - buff.append("' and cc_table='"); + buff.append("' AND \"CC_TABLE\"='"); buff.append(tblName); - buff.append("' and cc_partition='"); + buff.append("' AND \"CC_PARTITION\"='"); buff.append(partName); buff.append("'"); queries.add(buff.toString()); @@ -3562,31 +3563,31 @@ stmt = dbConn.createStatement(); List queries = new ArrayList<>(); - String update = "update TXN_COMPONENTS set "; - String where = " where "; + String update = "UPDATE \"TXN_COMPONENTS\" SET "; + String where = " WHERE "; if(oldPartName != null) { - update += "TC_PARTITION = " + quoteString(newPartName) + ", "; - where += "TC_PARTITION = " + quoteString(oldPartName) + " AND "; + update += "\"TC_PARTITION\" = " + quoteString(newPartName) + ", "; + where += "\"TC_PARTITION\" = " + quoteString(oldPartName) + " AND "; } if(oldTabName != null) { - update += "TC_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; - where += "TC_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND "; + update += "\"TC_TABLE\" = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "\"TC_TABLE\" = " + quoteString(normalizeCase(oldTabName)) + " AND "; } if(oldDbName != null) { - update += "TC_DATABASE = " + quoteString(normalizeCase(newDbName)); - where += "TC_DATABASE = " + quoteString(normalizeCase(oldDbName)); + update += "\"TC_DATABASE\" = " + quoteString(normalizeCase(newDbName)); + where += "\"TC_DATABASE\" = " + quoteString(normalizeCase(oldDbName)); } queries.add(update + where); - update = "update 
COMPLETED_TXN_COMPONENTS set "; - where = " where "; + update = "UPDATE \"COMPLETED_TXN_COMPONENTS\" SET "; + where = " WHERE "; if(oldPartName != null) { - update += "CTC_PARTITION = " + quoteString(newPartName) + ", "; - where += "CTC_PARTITION = " + quoteString(oldPartName) + " AND "; + update += "\"CTC_PARTITION\" = " + quoteString(newPartName) + ", "; + where += "\"CTC_PARTITION\" = " + quoteString(oldPartName) + " AND "; } if(oldTabName != null) { - update += "CTC_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; - where += "CTC_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND "; + update += "\"CTC_TABLE\" = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "\"CTC_TABLE\" = " + quoteString(normalizeCase(oldTabName)) + " AND "; } if(oldDbName != null) { update += "CTC_DATABASE = " + quoteString(normalizeCase(newDbName)); @@ -3594,91 +3595,91 @@ } queries.add(update + where); - update = "update HIVE_LOCKS set "; - where = " where "; + update = "UPDATE \"HIVE_LOCKS\" SET "; + where = " WHERE "; if(oldPartName != null) { - update += "HL_PARTITION = " + quoteString(newPartName) + ", "; - where += "HL_PARTITION = " + quoteString(oldPartName) + " AND "; + update += "\"HL_PARTITION\" = " + quoteString(newPartName) + ", "; + where += "\"HL_PARTITION\" = " + quoteString(oldPartName) + " AND "; } if(oldTabName != null) { - update += "HL_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; - where += "HL_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND "; + update += "\"HL_TABLE\" = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "\"HL_TABLE\" = " + quoteString(normalizeCase(oldTabName)) + " AND "; } if(oldDbName != null) { - update += "HL_DB = " + quoteString(normalizeCase(newDbName)); - where += "HL_DB = " + quoteString(normalizeCase(oldDbName)); + update += "\"HL_DB\" = " + quoteString(normalizeCase(newDbName)); + where += "\"HL_DB\" = " + quoteString(normalizeCase(oldDbName)); } queries.add(update + where); 
- update = "update COMPACTION_QUEUE set "; - where = " where "; + update = "UPDATE \"COMPACTION_QUEUE\" SET "; + where = " WHERE "; if(oldPartName != null) { - update += "CQ_PARTITION = " + quoteString(newPartName) + ", "; - where += "CQ_PARTITION = " + quoteString(oldPartName) + " AND "; + update += "\"CQ_PARTITION\" = " + quoteString(newPartName) + ", "; + where += "\"CQ_PARTITION\" = " + quoteString(oldPartName) + " AND "; } if(oldTabName != null) { - update += "CQ_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; - where += "CQ_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND "; + update += "\"CQ_TABLE\" = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "\"CQ_TABLE\" = " + quoteString(normalizeCase(oldTabName)) + " AND "; } if(oldDbName != null) { - update += "CQ_DATABASE = " + quoteString(normalizeCase(newDbName)); - where += "CQ_DATABASE = " + quoteString(normalizeCase(oldDbName)); + update += "\"CQ_DATABASE\" = " + quoteString(normalizeCase(newDbName)); + where += "\"CQ_DATABASE\" = " + quoteString(normalizeCase(oldDbName)); } queries.add(update + where); - update = "update COMPLETED_COMPACTIONS set "; - where = " where "; + update = "UPDATE \"COMPLETED_COMPACTIONS\" SET "; + where = " WHERE "; if(oldPartName != null) { - update += "CC_PARTITION = " + quoteString(newPartName) + ", "; - where += "CC_PARTITION = " + quoteString(oldPartName) + " AND "; + update += "\"CC_PARTITION\" = " + quoteString(newPartName) + ", "; + where += "\"CC_PARTITION\" = " + quoteString(oldPartName) + " AND "; } if(oldTabName != null) { - update += "CC_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; - where += "CC_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND "; + update += "\"CC_TABLE\" = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "\"CC_TABLE\" = " + quoteString(normalizeCase(oldTabName)) + " AND "; } if(oldDbName != null) { - update += "CC_DATABASE = " + quoteString(normalizeCase(newDbName)); - where += 
"CC_DATABASE = " + quoteString(normalizeCase(oldDbName)); + update += "\"CC_DATABASE\" = " + quoteString(normalizeCase(newDbName)); + where += "\"CC_DATABASE\" = " + quoteString(normalizeCase(oldDbName)); } queries.add(update + where); - update = "update WRITE_SET set "; - where = " where "; + update = "UPDATE \"WRITE_SET\" SET "; + where = " WHERE "; if(oldPartName != null) { - update += "WS_PARTITION = " + quoteString(newPartName) + ", "; - where += "WS_PARTITION = " + quoteString(oldPartName) + " AND "; + update += "\"WS_PARTITION\" = " + quoteString(newPartName) + ", "; + where += "\"WS_PARTITION\" = " + quoteString(oldPartName) + " AND "; } if(oldTabName != null) { - update += "WS_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; - where += "WS_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND "; + update += "\"WS_TABLE\" = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "\"WS_TABLE\" = " + quoteString(normalizeCase(oldTabName)) + " AND "; } if(oldDbName != null) { - update += "WS_DATABASE = " + quoteString(normalizeCase(newDbName)); - where += "WS_DATABASE = " + quoteString(normalizeCase(oldDbName)); + update += "\"WS_DATABASE\" = " + quoteString(normalizeCase(newDbName)); + where += "\"WS_DATABASE\" = " + quoteString(normalizeCase(oldDbName)); } queries.add(update + where); - update = "update TXN_TO_WRITE_ID set "; - where = " where "; + update = "UPDATE \"TXN_TO_WRITE_ID\" SET "; + where = " WHERE "; if(oldTabName != null) { - update += "T2W_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; - where += "T2W_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND "; + update += "\"T2W_TABLE\" = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "\"T2W_TABLE\" = " + quoteString(normalizeCase(oldTabName)) + " AND "; } if(oldDbName != null) { - update += "T2W_DATABASE = " + quoteString(normalizeCase(newDbName)); - where += "T2W_DATABASE = " + quoteString(normalizeCase(oldDbName)); + update += 
"\"T2W_DATABASE\" = " + quoteString(normalizeCase(newDbName)); + where += "\"T2W_DATABASE\" = " + quoteString(normalizeCase(oldDbName)); } queries.add(update + where); - update = "update NEXT_WRITE_ID set "; - where = " where "; + update = "UPDATE \"NEXT_WRITE_ID\" SET "; + where = " WHERE "; if(oldTabName != null) { - update += "NWI_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; - where += "NWI_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND "; + update += "\"NWI_TABLE\" = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "\"NWI_TABLE\" = " + quoteString(normalizeCase(oldTabName)) + " AND "; } if(oldDbName != null) { - update += "NWI_DATABASE = " + quoteString(normalizeCase(newDbName)); - where += "NWI_DATABASE = " + quoteString(normalizeCase(oldDbName)); + update += "\"NWI_DATABASE\" = " + quoteString(normalizeCase(newDbName)); + where += "\"NWI_DATABASE\" = " + quoteString(normalizeCase(oldDbName)); } queries.add(update + where); @@ -3718,7 +3719,7 @@ try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "select count(*) from HIVE_LOCKS"; + String s = "SELECT COUNT(*) FROM \"HIVE_LOCKS\""; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); rs.next(); @@ -4009,27 +4010,27 @@ // Assumes the result set is set to a valid row LockInfo(ResultSet rs) throws SQLException, MetaException { - extLockId = rs.getLong("hl_lock_ext_id"); // can't be null - intLockId = rs.getLong("hl_lock_int_id"); // can't be null - db = rs.getString("hl_db"); // can't be null - String t = rs.getString("hl_table"); + extLockId = rs.getLong("HL_LOCK_EXT_ID"); // can't be null + intLockId = rs.getLong("HL_LOCK_INT_ID"); // can't be null + db = rs.getString("HL_DB"); // can't be null + String t = rs.getString("HL_TABLE"); table = (rs.wasNull() ? null : t); - String p = rs.getString("hl_partition"); + String p = rs.getString("HL_PARTITION"); partition = (rs.wasNull() ? 
null : p); - switch (rs.getString("hl_lock_state").charAt(0)) { + switch (rs.getString("HL_LOCK_STATE").charAt(0)) { case LOCK_WAITING: state = LockState.WAITING; break; case LOCK_ACQUIRED: state = LockState.ACQUIRED; break; default: - throw new MetaException("Unknown lock state " + rs.getString("hl_lock_state").charAt(0)); + throw new MetaException("Unknown lock state " + rs.getString("HL_LOCK_STATE").charAt(0)); } - switch (rs.getString("hl_lock_type").charAt(0)) { + switch (rs.getString("HL_LOCK_TYPE").charAt(0)) { case LOCK_EXCLUSIVE: type = LockType.EXCLUSIVE; break; case LOCK_SHARED: type = LockType.SHARED_READ; break; case LOCK_SEMI_SHARED: type = LockType.SHARED_WRITE; break; default: - throw new MetaException("Unknown lock type " + rs.getString("hl_lock_type").charAt(0)); + throw new MetaException("Unknown lock type " + rs.getString("HL_LOCK_TYPE").charAt(0)); } - txnId = rs.getLong("hl_txnid");//returns 0 if value is NULL + txnId = rs.getLong("HL_TXNID");//returns 0 if value is NULL } LockInfo(ShowLocksResponseElement e) { extLockId = e.getLockid(); @@ -4213,15 +4214,15 @@ StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); - prefix.append("update TXNS set txn_state = " + quoteChar(TXN_ABORTED) + - " where txn_state = " + quoteChar(TXN_OPEN) + " and "); + prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) + + " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND "); if(max_heartbeat > 0) { - suffix.append(" and txn_last_heartbeat < ").append(max_heartbeat); + suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ").append(max_heartbeat); } else { suffix.append(""); } - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", true, false); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", true, false); for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); @@ -4236,10 +4237,10 @@ prefix.setLength(0); 
suffix.setLength(0); - prefix.append("delete from MIN_HISTORY_LEVEL where "); + prefix.append("DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE "); suffix.append(""); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "mhl_txnid", false, false); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"MHL_TXNID\"", false, false); for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); @@ -4262,10 +4263,10 @@ prefix.setLength(0); suffix.setLength(0); - prefix.append("delete from HIVE_LOCKS where "); + prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE "); suffix.append(""); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "hl_txnid", false, false); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false); for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); @@ -4282,8 +4283,8 @@ return txnId != 0; } /** - * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller * hl_lock_ext_id by only checking earlier locks. + * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller * * For any given SQL statement all locks required by it are grouped under single extLockId and are * granted all at once or all locks wait. @@ -4323,9 +4324,9 @@ LOG.debug("checkLock(): Setting savepoint. 
extLockId=" + JavaUtils.lockIdToString(extLockId)); Savepoint save = dbConn.setSavepoint(); - StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + - "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + - "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in ("); + StringBuilder query = new StringBuilder("SELECT \"HL_LOCK_EXT_ID\", " + + "\"HL_LOCK_INT_ID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", " + + "\"HL_LOCK_TYPE\", \"HL_TXNID\" FROM \"HIVE_LOCKS\" WHERE \"HL_DB\" IN ("); Set strings = new HashSet<>(locksBeingChecked.size()); @@ -4344,13 +4345,13 @@ throw new IllegalStateException("Found Write lock for " + JavaUtils.lockIdToString(extLockId) + " but no txnid"); } stmt = dbConn.createStatement(); - StringBuilder sb = new StringBuilder(" ws_database, ws_table, ws_partition, " + - "ws_txnid, ws_commit_id " + - "from WRITE_SET where ws_commit_id >= " + writeSet.get(0).txnId + " and (");//see commitTxn() for more info on this inequality + StringBuilder sb = new StringBuilder(" \"WS_DATABASE\", \"WS_TABLE\", \"WS_PARTITION\", " + + "\"WS_TXNID\", \"WS_COMMIT_ID\" " + + "FROM \"WRITE_SET\" WHERE \"WS_COMMIT_ID\" >= " + writeSet.get(0).txnId + " AND (");//see commitTxn() for more info on this inequality for(LockInfo info : writeSet) { - sb.append("(ws_database = ").append(quoteString(info.db)).append(" and ws_table = ") - .append(quoteString(info.table)).append(" and ws_partition ") - .append(info.partition == null ? "is null" : "= " + quoteString(info.partition)).append(") or "); + sb.append("(\"WS_DATABASE\" = ").append(quoteString(info.db)).append(" AND \"WS_TABLE\" = ") + .append(quoteString(info.table)).append(" AND \"WS_PARTITION\" ") + .append(info.partition == null ?
"IS NULL" : "= " + quoteString(info.partition)).append(") OR "); } sb.setLength(sb.length() - 4);//nuke trailing " or " sb.append(")"); @@ -4414,7 +4415,7 @@ } } if (!sawNull) { - query.append(" and (hl_table is null or hl_table in("); + query.append(" AND (\"HL_TABLE\" IS NULL OR \"HL_TABLE\" IN("); first = true; for (String s : strings) { if (first) first = false; @@ -4438,7 +4439,7 @@ } } if (!sawNull) { - query.append(" and (hl_partition is null or hl_partition in("); + query.append(" AND (\"HL_PARTITION\" IS NULL OR \"HL_PARTITION\" IN("); first = true; for (String s : strings) { if (first) first = false; @@ -4450,7 +4451,7 @@ query.append("))"); } } - query.append(" and hl_lock_ext_id < ").append(extLockId); + query.append(" AND \"HL_LOCK_EXT_ID\" < ").append(extLockId); LOG.debug("Going to execute query <" + query.toString() + ">"); stmt = dbConn.createStatement(); @@ -4516,10 +4517,10 @@ * this (metastore db) transaction and then we record which lock blocked the lock * we were testing ('info').*/ wait(dbConn, save); - String sqlText = "update HIVE_LOCKS" + - " set HL_BLOCKEDBY_EXT_ID=" + locks[i].extLockId + - ", HL_BLOCKEDBY_INT_ID=" + locks[i].intLockId + - " where HL_LOCK_EXT_ID=" + info.extLockId + " and HL_LOCK_INT_ID=" + info.intLockId; + String sqlText = "UPDATE \"HIVE_LOCKS\"" + + " SET \"HL_BLOCKEDBY_EXT_ID\"=" + locks[i].extLockId + + ", \"HL_BLOCKEDBY_INT_ID\"=" + locks[i].intLockId + + " WHERE \"HL_LOCK_EXT_ID\"=" + info.extLockId + " AND \"HL_LOCK_INT_ID\"=" + info.intLockId; LOG.debug("Executing sql: " + sqlText); int updCnt = stmt.executeUpdate(sqlText); if(updCnt != 1) { @@ -4564,11 +4565,11 @@ long txnId = locksBeingChecked.get(0).txnId; long extLockId = locksBeingChecked.get(0).extLockId; long now = getDbTime(dbConn); - String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " + + String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_STATE\" = '" + LOCK_ACQUIRED + "', " + //if lock is part of txn, heartbeat info is in txn 
record - "hl_last_heartbeat = " + (isValidTxn(txnId) ? 0 : now) + - ", hl_acquired_at = " + now + ",HL_BLOCKEDBY_EXT_ID=NULL,HL_BLOCKEDBY_INT_ID=null" + - " where hl_lock_ext_id = " + extLockId; + "\"HL_LAST_HEARTBEAT\" = " + (isValidTxn(txnId) ? 0 : now) + + ", \"HL_ACQUIRED_AT\" = " + now + ",\"HL_BLOCKEDBY_EXT_ID\"=NULL,\"HL_BLOCKEDBY_INT_ID\"=NULL" + + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < locksBeingChecked.size()) { @@ -4576,7 +4577,7 @@ dbConn.rollback(); /*select all locks for this ext ID and see which ones are missing*/ StringBuilder sb = new StringBuilder("No such lock(s): (" + JavaUtils.lockIdToString(extLockId) + ":"); - ResultSet rs = stmt.executeQuery("select hl_lock_int_id from HIVE_LOCKS where hl_lock_ext_id = " + extLockId); + ResultSet rs = stmt.executeQuery("SELECT \"HL_LOCK_INT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = " + extLockId); while(rs.next()) { int intLockId = rs.getInt(1); int idx = 0; @@ -4653,8 +4654,8 @@ stmt = dbConn.createStatement(); long now = getDbTime(dbConn); - String s = "update HIVE_LOCKS set hl_last_heartbeat = " + - now + " where hl_lock_ext_id = " + extLockId; + String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LAST_HEARTBEAT\" = " + + now + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -4678,8 +4679,8 @@ try { stmt = dbConn.createStatement(); long now = getDbTime(dbConn); - String s = "update TXNS set txn_last_heartbeat = " + now + - " where txn_id = " + txnid + " and txn_state = '" + TXN_OPEN + "'"; + String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + now + + " WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = '" + TXN_OPEN + "'"; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -4701,12 +4702,12 @@ * 2. 
txnid was committed but it didn't modify anything (nothing in COMPLETED_TXN_COMPONENTS) */ private TxnStatus findTxnState(long txnid, Statement stmt) throws SQLException, MetaException { - String s = "select txn_state from TXNS where txn_id = " + txnid; + String s = "SELECT \"TXN_STATE\" FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid; LOG.debug("Going to execute query <" + s + ">"); try (ResultSet rs = stmt.executeQuery(s)) { if (!rs.next()) { s = - sqlGenerator.addLimitClause(1, "1 from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + sqlGenerator.addLimitClause(1, "1 FROM \"COMPLETED_TXN_COMPONENTS\" WHERE \"CTC_TXNID\" = " + txnid); LOG.debug("Going to execute query <" + s + ">"); try (ResultSet rs2 = stmt.executeQuery(s)) { @@ -4738,11 +4739,11 @@ // Get the count of txns from the given list that are in open state and not read-only. // If the returned count is same as the input number of txns, then all txns are in open state and not read-only. - prefix.append("select count(*) from TXNS where txn_state = '" + TXN_OPEN - + "' and txn_type != " + TxnType.READ_ONLY.getValue() + " and "); + prefix.append("SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN + + "' AND \"TXN_TYPE\" != " + TxnType.READ_ONLY.getValue() + " AND "); TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(), - txnIds, "txn_id", false, false); + txnIds, "\"TXN_ID\"", false, false); long count = 0; for (String query : queries) { @@ -4766,9 +4767,9 @@ StringBuilder prefix = new StringBuilder(); // Check if any of the txns in the list are either aborted or read-only. 
- prefix.append("select txn_id, txn_state, txn_type from TXNS where "); + prefix.append("SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\" FROM \"TXNS\" WHERE "); TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(), - txnIds, "txn_id", false, false); + txnIds, "\"TXN_ID\"", false, false); StringBuilder txnInfo = new StringBuilder(); for (String query : queries) { @@ -4800,9 +4801,9 @@ StringBuilder prefix = new StringBuilder(); // Check if any of the txns in the list are committed. - prefix.append("select ctc_txnid from COMPLETED_TXN_COMPONENTS where "); + prefix.append("SELECT \"CTC_TXNID\" FROM \"COMPLETED_TXN_COMPONENTS\" WHERE "); TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(), - txnIds, "ctc_txnid", false, false); + txnIds, "\"CTC_TXNID\"", false, false); StringBuilder txnInfo = new StringBuilder(); for (String query : queries) { @@ -4821,7 +4822,7 @@ * Used to raise an informative error when the caller expected a txn in a particular TxnStatus * but found it in some other status */ - private static void raiseTxnUnexpectedState(TxnStatus actualStatus, long txnid) + private static void raiseTxnUnexpectedState(TxnStatus actualStatus, long txnid) throws NoSuchTxnException, TxnAbortedException { switch (actualStatus) { case ABORTED: @@ -4842,12 +4843,12 @@ private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt) throws SQLException, NoSuchTxnException, TxnAbortedException { // We need to check whether this transaction is valid and open - String s = "select txn_state from TXNS where txn_id = " + txnid; + String s = "SELECT \"TXN_STATE\" FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid; LOG.debug("Going to execute query <" + s + ">"); try (ResultSet rs = stmt.executeQuery(s)) { if (!rs.next()) { // todo: add LIMIT 1 instead of count - should be more efficient - s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid; + s = "SELECT COUNT(*) FROM \"COMPLETED_TXN_COMPONENTS\" WHERE 
\"CTC_TXNID\" = " + txnid; try (ResultSet rs2 = stmt.executeQuery(s)) { // todo: strictly speaking you can commit an empty txn, thus 2nd conjunct is wrong but // only @@ -4879,9 +4880,9 @@ ResultSet rs = null; try { stmt = dbConn.createStatement(); - String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type, hl_txnid from HIVE_LOCKS where " + - "hl_lock_ext_id = " + extLockId; + String s = "SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_DB\", \"HL_TABLE\", " + + "\"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" FROM \"HIVE_LOCKS\" WHERE " + + "\"HL_LOCK_EXT_ID\" = " + extLockId; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { @@ -4902,9 +4903,9 @@ Statement stmt = null; try { stmt = dbConn.createStatement(); - String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type, hl_txnid from HIVE_LOCKS where " + - "hl_lock_ext_id = " + extLockId; + String s = "SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_DB\", \"HL_TABLE\", " + + "\"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" FROM \"HIVE_LOCKS\" WHERE " + + "\"HL_LOCK_EXT_ID\" = " + extLockId; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); boolean sawAtLeastOne = false; @@ -4934,8 +4935,8 @@ stmt = dbConn.createStatement(); long maxHeartbeatTime = now - timeout; //doing a SELECT first is less efficient but makes it easier to debug things - String s = "select distinct hl_lock_ext_id from HIVE_LOCKS where hl_last_heartbeat < " + - maxHeartbeatTime + " and hl_txnid = 0";//when txnid is <> 0, the lock is + String s = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < " + + maxHeartbeatTime + " AND \"HL_TXNID\" = 0";//when txnid is <> 0, the lock is //associated with a txn and is handled by performTimeOuts() //want to avoid expiring locks for a 
txn w/o expiring the txn itself List extLockIDs = new ArrayList<>(); @@ -4955,12 +4956,12 @@ StringBuilder suffix = new StringBuilder(); //include same hl_last_heartbeat condition in case someone heartbeated since the select - prefix.append("delete from HIVE_LOCKS where hl_last_heartbeat < "); + prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < "); prefix.append(maxHeartbeatTime); - prefix.append(" and hl_txnid = 0 and "); + prefix.append(" AND \"HL_TXNID\" = 0 AND "); suffix.append(""); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "hl_lock_ext_id", true, false); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "\"HL_LOCK_EXT_ID\"", true, false); int deletedLocks = 0; for (String query : queries) { @@ -5014,8 +5015,8 @@ timeOutLocks(dbConn, now); while(true) { stmt = dbConn.createStatement(); - String s = " txn_id from TXNS where txn_state = '" + TXN_OPEN + - "' and txn_last_heartbeat < " + (now - timeout) + " and txn_type != " + TxnType.REPL_CREATED.getValue(); + String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN + + "' AND \"TXN_LAST_HEARTBEAT\" < " + (now - timeout) + " AND \"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue(); //safety valve for extreme cases s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s); LOG.debug("Going to execute query <" + s + ">"); @@ -5075,7 +5076,7 @@ try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "select count(*) from TXNS where txn_state = '" + TXN_OPEN + "'"; + String s = "SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN + "'"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { @@ -5367,7 +5368,7 @@ ResultSet rs = null; try { try { - String sqlStmt = sqlGenerator.addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1=" + quoteString(key) + " and MT_KEY2=0"); + String sqlStmt 
= sqlGenerator.addForUpdateClause("SELECT \"MT_COMMENT\" FROM \"AUX_TABLE\" WHERE \"MT_KEY1\"=" + quoteString(key) + " AND \"MT_KEY2\"=0"); lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex); stmt = dbConn.createStatement(); @@ -5378,7 +5379,7 @@ if (!rs.next()) { close(rs); try { - stmt.executeUpdate("insert into AUX_TABLE(MT_KEY1,MT_KEY2) values(" + quoteString(key) + ", 0)"); + stmt.executeUpdate("INSERT INTO \"AUX_TABLE\" (\"MT_KEY1\", \"MT_KEY2\") VALUES(" + quoteString(key) + ", 0)"); dbConn.commit(); } catch (SQLException ex) { if (!isDuplicateKeyError(ex)) { @@ -5449,7 +5450,7 @@ //would need a list of (stmt,rs) pairs - 1 for each key throw new NotImplementedException(); } - + @Override public void releaseLocks() { rollbackDBConn(dbConn); Index: ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java (revision 01faca2f9d7dcb0f5feabfcb07fa5ea12b79c5b9) +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java (date 1578316443456) @@ -1388,12 +1388,12 @@ Connection conn = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE); Statement stmt = conn.createStatement(); long now = tHndlr.getDbTime(conn); - stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " + + stmt.executeUpdate("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " + "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " + "'scooby.com')"); - stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " + - "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " + - "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" + +
stmt.executeUpdate("INSERT INTO \"HIVE_LOCKS\" (\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", " + + "\"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", " + + "\"HL_USER\", \"HL_HOST\") VALUES (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" + tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " + "'scooby.com')"); conn.commit(); @@ -1496,7 +1496,7 @@ * conf.setVar(HiveConf.ConfVars.METASTOREPWD, "hive"); * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver"); * 3. Remove TxnDbUtil.prepDb(); in TxnHandler.checkQFileTestHack() - * + * */ @Ignore("multiple threads wedge Derby") @Test @@ -1505,7 +1505,7 @@ final AtomicInteger stepTracker = new AtomicInteger(0); /** * counter = 0; - * Thread1 counter=1, lock, wait 3s, check counter(should be 2), counter=3, unlock + * Thread1 counter=1, lock, wait 3s, check counter(should be 2), counter=3, unlock + * Thread2 counter=2, lock (and block), inc counter, should be 4 */ Thread t1 = new Thread("MutexTest1") { @@ -1600,8 +1600,8 @@ OpenTxnsResponse openedTxns = txnHandler.openTxns(rqst); List txnList = openedTxns.getTxn_ids(); assertEquals(txnList.size(), numTxn); - int numTxnPresentNow = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXNS where TXN_ID >= " + - txnList.get(0) + " and TXN_ID <= " + txnList.get(numTxn - 1)); + int numTxnPresentNow = TxnDbUtil.countQueryAgent(conf, "SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_ID\" >= " + + txnList.get(0) + " AND \"TXN_ID\" <= " + txnList.get(numTxn - 1)); assertEquals(numTxn, numTxnPresentNow); checkReplTxnForTest(startId, lastId, replPolicy, txnList); @@ -1620,9 +1620,9 @@ private void checkReplTxnForTest(Long startTxnId, Long endTxnId, String replPolicy, List targetTxnId) throws Exception { - String[] output = TxnDbUtil.queryToString(conf, "select RTM_TARGET_TXN_ID from REPL_TXN_MAP where " + - " RTM_SRC_TXN_ID >= " + startTxnId + "and RTM_SRC_TXN_ID <= " +
endTxnId + - " and RTM_REPL_POLICY = \'" + replPolicy + "\'").split("\n"); + String[] output = TxnDbUtil.queryToString(conf, "SELECT \"RTM_TARGET_TXN_ID\" FROM \"REPL_TXN_MAP\" WHERE " + + " \"RTM_SRC_TXN_ID\" >= " + startTxnId + " AND \"RTM_SRC_TXN_ID\" <= " + endTxnId + + " AND \"RTM_REPL_POLICY\" = \'" + replPolicy + "\'").split("\n"); assertEquals(output.length - 1, targetTxnId.size()); for (int idx = 1; idx < output.length; idx++) { long txnId = Long.parseLong(output[idx].trim()); @@ -1633,7 +1633,7 @@ @Test public void testReplOpenTxn() throws Exception { int numTxn = 50000; - String[] output = TxnDbUtil.queryToString(conf, "select ntxn_next from NEXT_TXN_ID").split("\n"); + String[] output = TxnDbUtil.queryToString(conf, "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"").split("\n"); long startTxnId = Long.parseLong(output[1].trim()); List txnList = replOpenTxnForTest(startTxnId, numTxn, "default.*"); assert(txnList.size() == numTxn); @@ -1643,7 +1643,7 @@ @Test public void testReplAllocWriteId() throws Exception { int numTxn = 2; - String[] output = TxnDbUtil.queryToString(conf, "select ntxn_next from NEXT_TXN_ID").split("\n"); + String[] output = TxnDbUtil.queryToString(conf, "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"").split("\n"); long startTxnId = Long.parseLong(output[1].trim()); List srcTxnIdList = LongStream.rangeClosed(startTxnId, numTxn+startTxnId-1) .boxed().collect(Collectors.toList()); @@ -1705,12 +1705,12 @@ private void updateTxns(Connection conn) throws SQLException { Statement stmt = conn.createStatement(); - stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1"); + stmt.executeUpdate("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = \"TXN_LAST_HEARTBEAT\" + 1"); } private void updateLocks(Connection conn) throws SQLException { Statement stmt = conn.createStatement(); - stmt.executeUpdate("update HIVE_LOCKS set hl_last_heartbeat = hl_last_heartbeat + 1"); + stmt.executeUpdate("UPDATE \"HIVE_LOCKS\" SET \"HL_LAST_HEARTBEAT\" = 
\"HL_LAST_HEARTBEAT\" + 1"); } @Before