diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index aded6f5486..45e1d1d880 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -75,20 +75,20 @@ public CompactionTxnHandler() {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
       stmt = dbConn.createStatement();
       // Check for completed transactions
-      String s = "select distinct tc.ctc_database, tc.ctc_table, tc.ctc_partition " +
-        "from COMPLETED_TXN_COMPONENTS tc " + (checkInterval > 0 ?
-        "left join ( " +
-        "  select c1.* from COMPLETED_COMPACTIONS c1 " +
-        "  inner join ( " +
-        "    select max(cc_id) cc_id from COMPLETED_COMPACTIONS " +
-        "    group by cc_database, cc_table, cc_partition" +
-        "  ) c2 " +
-        "  on c1.cc_id = c2.cc_id " +
-        "  where c1.cc_state IN (" + quoteChar(ATTEMPTED_STATE) + "," + quoteChar(FAILED_STATE) + ")" +
-        ") c " +
-        "on tc.ctc_database = c.cc_database and tc.ctc_table = c.cc_table " +
-        "  and (tc.ctc_partition = c.cc_partition or (tc.ctc_partition is null and c.cc_partition is null)) " +
-        "where c.cc_id is not null or " + isWithinCheckInterval("tc.ctc_timestamp", checkInterval) : "");
+      String s = "SELECT DISTINCT \"TC\".\"CTC_DATABASE\", \"TC\".\"CTC_TABLE\", \"TC\".\"CTC_PARTITION\" " +
+        "FROM \"COMPLETED_TXN_COMPONENTS\" TC " + (checkInterval > 0 ?
+        "LEFT JOIN ( " +
+        "  SELECT \"C1\".* FROM \"COMPLETED_COMPACTIONS\" \"C1\" " +
+        "  INNER JOIN ( " +
+        "    SELECT MAX(\"CC_ID\") \"CC_ID\" FROM \"COMPLETED_COMPACTIONS\" " +
+        "    GROUP BY \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\"" +
+        "  ) \"C2\" " +
+        "  ON \"C1\".\"CC_ID\" = \"C2\".\"CC_ID\" " +
+        "  WHERE \"C1\".\"CC_STATE\" IN (" + quoteChar(ATTEMPTED_STATE) + "," + quoteChar(FAILED_STATE) + ")" +
+        ") \"C\" " +
+        "ON \"TC\".\"CTC_DATABASE\" = \"C\".\"CC_DATABASE\" AND \"TC\".\"CTC_TABLE\" = \"C\".\"CC_TABLE\" " +
+        "  AND (\"TC\".\"CTC_PARTITION\" = \"C\".\"CC_PARTITION\" OR (\"TC\".\"CTC_PARTITION\" IS NULL AND \"C\".\"CC_PARTITION\" IS NULL)) " +
+        "WHERE \"C\".\"CC_ID\" IS NOT NULL OR " + isWithinCheckInterval("\"TC\".\"CTC_TIMESTAMP\"", checkInterval) : "");
 
       LOG.debug("Going to execute query <" + s + ">");
       rs = stmt.executeQuery(s);
@@ -102,11 +102,11 @@ public CompactionTxnHandler() {
       rs.close();
 
       // Check for aborted txns
-      s = "select tc_database, tc_table, tc_partition " +
-        "from TXNS, TXN_COMPONENTS " +
-        "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " +
-        "group by tc_database, tc_table, tc_partition " +
-        "having count(*) > " + abortedThreshold;
+      s = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " +
+        "FROM \"TXNS\", \"TXN_COMPONENTS\" " +
+        "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = '" + TXN_ABORTED + "' " +
+        "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " +
+        "HAVING COUNT(*) > " + abortedThreshold;
 
       LOG.debug("Going to execute query <" + s + ">");
       rs = stmt.executeQuery(s);
@@ -151,8 +151,8 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException {
     try {
      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
      stmt = dbConn.createStatement();
-      String s = "select cq_id, cq_database, cq_table, cq_partition, " +
-        "cq_type, cq_tblproperties from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
+      String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " +
+        "\"CQ_TYPE\", \"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" + INITIATED_STATE + "'";
       LOG.debug("Going to execute query <" + s + ">");
       rs = stmt.executeQuery(s);
       if (!rs.next()) {
@@ -171,9 +171,9 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException {
         info.properties = rs.getString(6);
         // Now, update this record as being worked on by this worker.
         long now = getDbTime(dbConn);
-        s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
-          "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id +
-          " AND cq_state='" + INITIATED_STATE + "'";
+        s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = '" + workerId + "', " +
+          "\"CQ_START\" = " + now + ", \"CQ_STATE\" = '" + WORKING_STATE + "' WHERE \"CQ_ID\" = " + info.id +
+          " AND \"CQ_STATE\"='" + INITIATED_STATE + "'";
         LOG.debug("Going to execute update <" + s + ">");
         int updCount = updStmt.executeUpdate(s);
         if(updCount == 1) {
@@ -221,8 +221,8 @@ public void markCompacted(CompactionInfo info) throws MetaException {
     try {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
       stmt = dbConn.createStatement();
-      String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " +
-        "cq_worker_id = null where cq_id = " + info.id;
+      String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', " +
+        "\"CQ_WORKER_ID\" = NULL WHERE \"CQ_ID\" = " + info.id;
       LOG.debug("Going to execute update <" + s + ">");
       int updCnt = stmt.executeUpdate(s);
       if (updCnt != 1) {
@@ -265,8 +265,8 @@ public void markCompacted(CompactionInfo info) throws MetaException {
     try {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
       stmt = dbConn.createStatement();
-      String s = "select cq_id, cq_database, cq_table, cq_partition, "
-          + "cq_type, cq_run_as, cq_highest_write_id from COMPACTION_QUEUE where cq_state = '"
+      String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " +
+          "\"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '"
           + READY_FOR_CLEANING + "'";
       LOG.debug("Going to execute query <" + s + ">");
       rs = stmt.executeQuery(s);
@@ -333,7 +333,7 @@ public long findMinOpenTxnId() throws MetaException {
    * which deletes rows from MIN_HISTORY_LEVEL which can only allow minOpenTxn to move higher)
    */
   private long findMinOpenTxnGLB(Statement stmt) throws MetaException, SQLException {
-    String s = "select ntxn_next from NEXT_TXN_ID";
+    String s = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"";
     LOG.debug("Going to execute query <" + s + ">");
     ResultSet rs = stmt.executeQuery(s);
     if (!rs.next()) {
@@ -341,7 +341,7 @@ private long findMinOpenTxnGLB(Statement stmt) throws MetaException, SQLExceptio
           "initialized, no record found in next_txn_id");
     }
     long hwm = rs.getLong(1);
-    s = "select min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL";
+    s = "SELECT MIN(\"MHL_MIN_OPEN_TXNID\") FROM \"MIN_HISTORY_LEVEL\"";
     LOG.debug("Going to execute query <" + s + ">");
     rs = stmt.executeQuery(s);
     rs.next();
@@ -369,7 +369,10 @@ public void markCleaned(CompactionInfo info) throws MetaException {
     ResultSet rs = null;
     try {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
+      pStmt = dbConn.prepareStatement("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " +
+          "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", " +
+          "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\" " +
+          "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?");
       pStmt.setLong(1, info.id);
       rs = pStmt.executeQuery();
       if(rs.next()) {
@@ -389,7 +392,10 @@ public void markCleaned(CompactionInfo info) throws MetaException {
         LOG.debug("Going to rollback");
         dbConn.rollback();
       }
-      pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
+      pStmt = dbConn.prepareStatement("INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\", \"CC_DATABASE\", " +
+          "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", \"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", " +
+          "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", \"CC_HADOOP_JOB_ID\")" +
+          " VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
       info.state = SUCCEEDED_STATE;
       CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn));
       updCount = pStmt.executeUpdate();
@@ -397,13 +403,13 @@ public void markCleaned(CompactionInfo info) throws MetaException {
       // Remove entries from completed_txn_components as well, so we don't start looking there
       // again but only up to the highest write ID include in this compaction job.
       //highestWriteId will be NULL in upgrade scenarios
-      s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? and " +
-          "ctc_table = ?";
+      s = "DELETE FROM \"COMPLETED_TXN_COMPONENTS\" WHERE \"CTC_DATABASE\" = ? AND " +
+          "\"CTC_TABLE\" = ?";
       if (info.partName != null) {
-        s += " and ctc_partition = ?";
+        s += " AND \"CTC_PARTITION\" = ?";
       }
       if(info.highestWriteId != 0) {
-        s += " and ctc_writeid <= ?";
+        s += " AND \"CTC_WRITEID\" <= ?";
       }
       pStmt = dbConn.prepareStatement(s);
       int paramCount = 1;
@@ -426,10 +432,10 @@ public void markCleaned(CompactionInfo info) throws MetaException {
        * aborted TXN_COMPONENTS above tc_writeid (and consequently about aborted txns).
        * See {@link ql.txn.compactor.Cleaner.removeFiles()}
        */
-      s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
-        TXN_ABORTED + "' and tc_database = ? and tc_table = ?";
-      if (info.highestWriteId != 0) s += " and tc_writeid <= ?";
-      if (info.partName != null) s += " and tc_partition = ?";
+      s = "SELECT DISTINCT \"TXN_ID\" FROM \"TXNS\", \"TXN_COMPONENTS\" WHERE \"TXN_ID\" = \"TC_TXNID\" " +
+        "AND \"TXN_STATE\" = '" + TXN_ABORTED + "' AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ?";
+      if (info.highestWriteId != 0) s += " AND \"TC_WRITEID\" <= ?";
+      if (info.partName != null) s += " AND \"TC_PARTITION\" = ?";
       pStmt = dbConn.prepareStatement(s);
       paramCount = 1;
 
@@ -459,18 +465,18 @@ public void markCleaned(CompactionInfo info) throws MetaException {
 
       StringBuilder prefix = new StringBuilder();
       StringBuilder suffix = new StringBuilder();
-      prefix.append("delete from TXN_COMPONENTS where ");
+      prefix.append("DELETE FROM \"TXN_COMPONENTS\" WHERE ");
 
       //because 1 txn may include different partitions/tables even in auto commit mode
-      suffix.append(" and tc_database = ?");
-      suffix.append(" and tc_table = ?");
+      suffix.append(" AND \"TC_DATABASE\" = ?");
+      suffix.append(" AND \"TC_TABLE\" = ?");
       if (info.partName != null) {
-        suffix.append(" and tc_partition = ?");
+        suffix.append(" AND \"TC_PARTITION\" = ?");
       }
 
       // Populate the complete query with provided prefix and suffix
       List<Integer> counts = TxnUtils
-          .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "tc_txnid",
+          .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "\"TC_TXNID\"",
               true, false);
       int totalCount = 0;
       for (int i = 0; i < queries.size(); i++) {
@@ -539,7 +545,7 @@ public void cleanTxnToWriteIdTable() throws MetaException {
 
       // If there are aborted txns, then the minimum aborted txnid could be the min_uncommitted_txnid
      // if lesser than both NEXT_TXN_ID.ntxn_next and min(MIN_HISTORY_LEVEL .mhl_min_open_txnid).
-      String s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_ABORTED);
+      String s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_ABORTED);
       LOG.debug("Going to execute query <" + s + ">");
       rs = stmt.executeQuery(s);
       if (rs.next()) {
@@ -550,7 +556,7 @@ public void cleanTxnToWriteIdTable() throws MetaException {
       }
       // As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed
       // to cleanup the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table.
-      s = "delete from TXN_TO_WRITE_ID where t2w_txnid < " + minUncommittedTxnId;
+      s = "DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < " + minUncommittedTxnId;
       LOG.debug("Going to execute delete <" + s + ">");
       int rc = stmt.executeUpdate(s);
       LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommittedTxnId);
@@ -589,9 +595,9 @@ public void cleanEmptyAbortedTxns() throws MetaException {
       //after that, so READ COMMITTED is sufficient.
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
       stmt = dbConn.createStatement();
-      String s = "select txn_id from TXNS where " +
-        "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
-        "txn_state = '" + TXN_ABORTED + "'";
+      String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " +
+        "\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " +
+        "\"TXN_STATE\" = '" + TXN_ABORTED + "'";
       LOG.debug("Going to execute query <" + s + ">");
       rs = stmt.executeQuery(s);
       List<Long> txnids = new ArrayList<>();
@@ -607,10 +613,10 @@ public void cleanEmptyAbortedTxns() throws MetaException {
       StringBuilder suffix = new StringBuilder();
 
       // Delete from TXNS.
-      prefix.append("delete from TXNS where ");
+      prefix.append("DELETE FROM \"TXNS\" WHERE ");
       suffix.append("");
 
-      TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false);
+      TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", false, false);
 
       for (String query : queries) {
         LOG.debug("Going to execute update <" + query + ">");
@@ -653,8 +659,8 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException {
     try {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
       stmt = dbConn.createStatement();
-      String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
-          + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '"
+      String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL, \"CQ_START\" = NULL, \"CQ_STATE\" = '"
+          + INITIATED_STATE+ "' WHERE \"CQ_STATE\" = '" + WORKING_STATE + "' AND \"CQ_WORKER_ID\" LIKE '"
           + hostname + "%'";
       LOG.debug("Going to execute update <" + s + ">");
       // It isn't an error if the following returns no rows, as the local workers could have died
@@ -698,8 +704,8 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
       long latestValidStart = getDbTime(dbConn) - timeout;
       stmt = dbConn.createStatement();
-      String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
-        + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < "
+      String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL, \"CQ_START\" = NULL, \"CQ_STATE\" = '"
+        + INITIATED_STATE+ "' WHERE \"CQ_STATE\" = '" + WORKING_STATE + "' AND \"CQ_START\" < "
        + latestValidStart;
       LOG.debug("Going to execute update <" + s + ">");
       // It isn't an error if the following returns no rows, as the local workers could have died
@@ -795,9 +801,9 @@ public void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws
     try {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
       stmt = dbConn.createStatement();
-      String sqlText = "UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_WRITE_ID = " +
-          ci.highestWriteId + ", cq_run_as = " + quoteString(ci.runAs) +
-          " WHERE CQ_ID = " + ci.id;
+      String sqlText = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_HIGHEST_WRITE_ID\" = " +
+          ci.highestWriteId + ", \"CQ_RUN_AS\" = " + quoteString(ci.runAs) +
+          " WHERE \"CQ_ID\" = " + ci.id;
       if(LOG.isDebugEnabled()) {
         LOG.debug("About to execute: " + sqlText);
       }
@@ -813,13 +819,13 @@ public void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws
        * a new write id (so as not to invalidate result set caches/materialized views) but
        * we need to set it to something to that markCleaned() only cleans TXN_COMPONENTS up to
        * the level to which aborted files/data has been cleaned.*/
-      sqlText = "insert into TXN_COMPONENTS(" +
-        "TC_TXNID, " +
-        "TC_DATABASE, " +
-        "TC_TABLE, " +
-        (ci.partName == null ? "" : "TC_PARTITION, ") +
-        "TC_WRITEID, " +
-        "TC_OPERATION_TYPE)" +
+      sqlText = "INSERT INTO \"TXN_COMPONENTS\"(" +
+        "\"TC_TXNID\", " +
+        "\"TC_DATABASE\", " +
+        "\"TC_TABLE\", " +
+        (ci.partName == null ? "" : "\"TC_PARTITION\", ") +
+        "\"TC_WRITEID\", " +
+        "\"TC_OPERATION_TYPE\")" +
         " VALUES(" +
         compactionTxnId + "," +
         quoteString(ci.dbname) + "," +
@@ -902,8 +908,8 @@ public void purgeCompactionHistory() throws MetaException {
       stmt = dbConn.createStatement();
       /*cc_id is monotonically increasing so for any entity sorts in order of compaction history,
       thus this query groups by entity and withing group sorts most recent first*/
-      rs = stmt.executeQuery("select cc_id, cc_database, cc_table, cc_partition, cc_state from " +
-        "COMPLETED_COMPACTIONS order by cc_database, cc_table, cc_partition, cc_id desc");
+      rs = stmt.executeQuery("SELECT \"CC_ID\", \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\" " +
+        "FROM \"COMPLETED_COMPACTIONS\" ORDER BY \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_ID\" DESC");
       String lastCompactedEntity = null;
       /*In each group, walk from most recent and count occurences of each state type. Once you
        * have counted enough (for each state) to satisfy retention policy, delete all other
@@ -929,14 +935,15 @@ public void purgeCompactionHistory() throws MetaException {
 
       StringBuilder prefix = new StringBuilder();
       StringBuilder suffix = new StringBuilder();
-      prefix.append("delete from COMPLETED_COMPACTIONS where ");
+      prefix.append("DELETE FROM \"COMPLETED_COMPACTIONS\" WHERE ");
       suffix.append("");
 
       List<String> questions = new ArrayList<>(deleteSet.size());
       for (int i = 0; i < deleteSet.size(); i++) {
         questions.add("?");
       }
-      List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "cc_id", false, false);
+      List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions,
+          "\"CC_ID\"", false, false);
       int totalCount = 0;
       for (int i = 0; i < queries.size(); i++) {
         String query = queries.get(i);
@@ -998,11 +1005,11 @@ public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException {
     try {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        pStmt = dbConn.prepareStatement("select CC_STATE from COMPLETED_COMPACTIONS where " +
-          "CC_DATABASE = ? and " +
-          "CC_TABLE = ? " +
-          (ci.partName != null ? "and CC_PARTITION = ?" : "") +
-          " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc");
+        pStmt = dbConn.prepareStatement("SELECT \"CC_STATE\" FROM \"COMPLETED_COMPACTIONS\" WHERE " +
+          "\"CC_DATABASE\" = ? AND " +
+          "\"CC_TABLE\" = ? " +
+          (ci.partName != null ? "AND \"CC_PARTITION\" = ?" : "") +
+          " AND \"CC_STATE\" != " + quoteChar(ATTEMPTED_STATE) + " ORDER BY \"CC_ID\" DESC");
        pStmt.setString(1, ci.dbname);
        pStmt.setString(2, ci.tableName);
        if (ci.partName != null) {
@@ -1054,12 +1061,15 @@ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this sho
     try {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
       stmt = dbConn.createStatement();
-      pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
+      pStmt = dbConn.prepareStatement("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " +
+          "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", " +
+          "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\" " +
+          "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?");
       pStmt.setLong(1, ci.id);
       rs = pStmt.executeQuery();
       if(rs.next()) {
         ci = CompactionInfo.loadFullFromCompactionQueue(rs);
-        String s = "delete from COMPACTION_QUEUE where cq_id = ?";
+        String s = "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";
         pStmt = dbConn.prepareStatement(s);
         pStmt.setLong(1, ci.id);
         LOG.debug("Going to execute update <" + s + ">");
@@ -1088,7 +1098,10 @@ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this sho
       close(rs, stmt, null);
       closeStmt(pStmt);
 
-      pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
+      pStmt = dbConn.prepareStatement("INSERT INTO \"COMPLETED_COMPACTIONS\" " +
+          "(\"CC_ID\", \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", " +
+          "\"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", \"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HIGHEST_WRITE_ID\"," +
+          " \"CC_META_INFO\", \"CC_HADOOP_JOB_ID\") VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
       CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
       int updCount = pStmt.executeUpdate();
       LOG.debug("Going to commit");
@@ -1122,7 +1135,8 @@ public void setHadoopJobId(String hadoopJobId, long id) {
     try {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
       stmt = dbConn.createStatement();
-      String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + quoteString(hadoopJobId) + " WHERE CQ_ID = " + id;
+      String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_HADOOP_JOB_ID\" = " + quoteString(hadoopJobId) +
+          " WHERE \"CQ_ID\" = " + id;
       LOG.debug("Going to execute <" + s + ">");
       int updateCount = stmt.executeUpdate(s);
       LOG.debug("Going to commit");