diff --git metastore/scripts/upgrade/postgres/hive-txn-schema-0.13.0.postgres.sql metastore/scripts/upgrade/postgres/hive-txn-schema-0.13.0.postgres.sql index 2ebd3b0..20c93d6 100644 --- metastore/scripts/upgrade/postgres/hive-txn-schema-0.13.0.postgres.sql +++ metastore/scripts/upgrade/postgres/hive-txn-schema-0.13.0.postgres.sql @@ -17,73 +17,73 @@ -- Tables for transaction management -- -CREATE TABLE TXNS ( - TXN_ID bigint PRIMARY KEY, - TXN_STATE char(1) NOT NULL, - TXN_STARTED bigint NOT NULL, - TXN_LAST_HEARTBEAT bigint NOT NULL, - TXN_USER varchar(128) NOT NULL, - TXN_HOST varchar(128) NOT NULL +CREATE TABLE "TXNS" ( + "TXN_ID" bigint PRIMARY KEY, + "TXN_STATE" char(1) NOT NULL, + "TXN_STARTED" bigint NOT NULL, + "TXN_LAST_HEARTBEAT" bigint NOT NULL, + "TXN_USER" varchar(128) NOT NULL, + "TXN_HOST" varchar(128) NOT NULL ); -CREATE TABLE TXN_COMPONENTS ( - TC_TXNID bigint REFERENCES TXNS (TXN_ID), - TC_DATABASE varchar(128) NOT NULL, - TC_TABLE varchar(128), - TC_PARTITION varchar(767) DEFAULT NULL +CREATE TABLE "TXN_COMPONENTS" ( + "TC_TXNID" bigint REFERENCES "TXNS" ("TXN_ID"), + "TC_DATABASE" varchar(128) NOT NULL, + "TC_TABLE" varchar(128), + "TC_PARTITION" varchar(767) DEFAULT NULL ); -CREATE TABLE COMPLETED_TXN_COMPONENTS ( - CTC_TXNID bigint, - CTC_DATABASE varchar(128) NOT NULL, - CTC_TABLE varchar(128), - CTC_PARTITION varchar(767) +CREATE TABLE "COMPLETED_TXN_COMPONENTS" ( + "CTC_TXNID" bigint, + "CTC_DATABASE" varchar(128) NOT NULL, + "CTC_TABLE" varchar(128), + "CTC_PARTITION" varchar(767) ); -CREATE TABLE NEXT_TXN_ID ( - NTXN_NEXT bigint NOT NULL +CREATE TABLE "NEXT_TXN_ID" ( + "NTXN_NEXT" bigint NOT NULL ); -INSERT INTO NEXT_TXN_ID VALUES(1); +INSERT INTO "NEXT_TXN_ID" VALUES(1); -CREATE TABLE HIVE_LOCKS ( - HL_LOCK_EXT_ID bigint NOT NULL, - HL_LOCK_INT_ID bigint NOT NULL, - HL_TXNID bigint, - HL_DB varchar(128) NOT NULL, - HL_TABLE varchar(128), - HL_PARTITION varchar(767) DEFAULT NULL, - HL_LOCK_STATE char(1) NOT NULL, - HL_LOCK_TYPE char(1) NOT NULL, - HL_LAST_HEARTBEAT bigint NOT NULL, - HL_ACQUIRED_AT bigint, - HL_USER varchar(128) NOT NULL, - HL_HOST varchar(128) NOT NULL, - PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID) +CREATE TABLE "HIVE_LOCKS" ( + "HL_LOCK_EXT_ID" bigint NOT NULL, + "HL_LOCK_INT_ID" bigint NOT NULL, + "HL_TXNID" bigint, + "HL_DB" varchar(128) NOT NULL, + "HL_TABLE" varchar(128), + "HL_PARTITION" varchar(767) DEFAULT NULL, + "HL_LOCK_STATE" char(1) NOT NULL, + "HL_LOCK_TYPE" char(1) NOT NULL, + "HL_LAST_HEARTBEAT" bigint NOT NULL, + "HL_ACQUIRED_AT" bigint, + "HL_USER" varchar(128) NOT NULL, + "HL_HOST" varchar(128) NOT NULL, + PRIMARY KEY("HL_LOCK_EXT_ID", "HL_LOCK_INT_ID") ); -CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS USING hash (HL_TXNID); +CREATE INDEX "HL_TXNID_INDEX" ON "HIVE_LOCKS" USING hash ("HL_TXNID"); -CREATE TABLE NEXT_LOCK_ID ( - NL_NEXT bigint NOT NULL +CREATE TABLE "NEXT_LOCK_ID" ( + "NL_NEXT" bigint NOT NULL ); -INSERT INTO NEXT_LOCK_ID VALUES(1); +INSERT INTO "NEXT_LOCK_ID" VALUES(1); -CREATE TABLE COMPACTION_QUEUE ( - CQ_ID bigint PRIMARY KEY, - CQ_DATABASE varchar(128) NOT NULL, - CQ_TABLE varchar(128) NOT NULL, - CQ_PARTITION varchar(767), - CQ_STATE char(1) NOT NULL, - CQ_TYPE char(1) NOT NULL, - CQ_WORKER_ID varchar(128), - CQ_START bigint, - CQ_RUN_AS varchar(128) +CREATE TABLE "COMPACTION_QUEUE" ( + "CQ_ID" bigint PRIMARY KEY, + "CQ_DATABASE" varchar(128) NOT NULL, + "CQ_TABLE" varchar(128) NOT NULL, + "CQ_PARTITION" varchar(767), + "CQ_STATE" char(1) NOT NULL, + "CQ_TYPE" char(1) NOT NULL, + "CQ_WORKER_ID" varchar(128), + "CQ_START" bigint, + "CQ_RUN_AS" varchar(128) ); -CREATE TABLE NEXT_COMPACTION_QUEUE_ID ( - NCQ_NEXT bigint NOT NULL +CREATE TABLE "NEXT_COMPACTION_QUEUE_ID" ( + "NCQ_NEXT" bigint NOT NULL ); -INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1); +INSERT INTO "NEXT_COMPACTION_QUEUE_ID" VALUES(1); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index d3aa66f..fa46e75 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -56,10 +56,14 @@ public CompactionTxnHandler(HiveConf conf) { Set response = new HashSet(); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); // Check for completed transactions - String s = "select distinct ctc_database, ctc_table, " + - "ctc_partition from COMPLETED_TXN_COMPONENTS"; + String s = "SELECT DISTINCT " + + TxnDbUtil.getEscape("CTC_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CTC_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CTC_PARTITION", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("COMPLETED_TXN_COMPONENTS", identifierQuoteString) + ""; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); while (rs.next()) { @@ -71,11 +75,21 @@ public CompactionTxnHandler(HiveConf conf) { } // Check for aborted txns - s = "select tc_database, tc_table, tc_partition " + - "from TXNS, TXN_COMPONENTS " + - "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " + - "group by tc_database, tc_table, tc_partition " + - "having count(*) > " + maxAborted; + s = "SELECT " + + TxnDbUtil.getEscape("TC_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_PARTITION", identifierQuoteString) + " " + + "FROM " + + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TXN_COMPONENTS", identifierQuoteString) + " " + + "WHERE " + + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " = " + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + + " AND " + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + " = '" + TXN_ABORTED + "' " + + "GROUP BY " + + TxnDbUtil.getEscape("TC_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_PARTITION", identifierQuoteString) + " " + + "HAVING COUNT(*) > " + maxAborted; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -110,8 +124,11 @@ public void setRunAs(long cq_id, String user) throws MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id; + String s = "UPDATE " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + " SET " + + TxnDbUtil.getEscape("CQ_RUN_AS", identifierQuoteString) + " = '" + user + "' " + + "WHERE " + TxnDbUtil.getEscape("CQ_ID", identifierQuoteString) + " = " + cq_id; LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) != 1) { LOG.error("Unable to update compaction record"); @@ -152,9 +169,15 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; + String s = "SELECT " + TxnDbUtil.getEscape("CQ_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_PARTITION", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_TYPE", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + " " + + "WHERE " + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + INITIATED_STATE + "'"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -174,8 +197,12 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { // Now, update this record as being worked on by this worker. long now = getDbTime(dbConn); - s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + - "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id; + s = "UPDATE " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + " " + + "SET " + + TxnDbUtil.getEscape("CQ_WORKER_ID", identifierQuoteString) + " = '" + workerId + "', " + + TxnDbUtil.getEscape("CQ_START", identifierQuoteString) + " = " + now + ", " + + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + WORKING_STATE + "' " + + "WHERE " + TxnDbUtil.getEscape("CQ_ID", identifierQuoteString) + " = " + info.id; LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) != 1) { LOG.error("Unable to update compaction record"); @@ -216,9 +243,13 @@ public void markCompacted(CompactionInfo info) throws MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + - "cq_worker_id = null where cq_id = " + info.id; + String s = "UPDATE " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + + " SET " + + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + READY_FOR_CLEANING + "', " + + TxnDbUtil.getEscape("CQ_WORKER_ID", identifierQuoteString) + " = null " + + " WHERE " + TxnDbUtil.getEscape("CQ_ID", identifierQuoteString) + " = " + info.id; LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) != 1) { LOG.error("Unable to update compaction record"); @@ -259,9 +290,17 @@ public void markCompacted(CompactionInfo info) throws MetaException { Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'"; + String s = "SELECT " + + TxnDbUtil.getEscape("CQ_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_PARTITION", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_TYPE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_RUN_AS", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + " " + + "WHERE " + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + READY_FOR_CLEANING + "'"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); while (rs.next()) { @@ -306,8 +345,11 @@ public void markCleaned(CompactionInfo info) throws MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id; + // Check for completed transactions + String s = "DELETE FROM " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + " " + + "WHERE " + TxnDbUtil.getEscape("CQ_ID", identifierQuoteString) + " = " + info.id; LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) != 1) { LOG.error("Unable to delete compaction record"); @@ -317,10 +359,11 @@ public void markCleaned(CompactionInfo info) throws MetaException { // Remove entries from completed_txn_components as well, so we don't start looking there // again. - s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + info.dbname + "' and " + - "ctc_table = '" + info.tableName + "'"; + s = "DELETE FROM " + TxnDbUtil.getEscape("COMPLETED_TXN_COMPONENTS", identifierQuoteString) + " " + + "WHERE " + TxnDbUtil.getEscape("CTC_DATABASE", identifierQuoteString) + " = '" + info.dbname + "' and " + + TxnDbUtil.getEscape("CTC_TABLE", identifierQuoteString) + " = '" + info.tableName + "'"; if (info.partName != null) { - s += " and ctc_partition = '" + info.partName + "'"; + s += " AND " + TxnDbUtil.getEscape("CTC_PARTITION", identifierQuoteString) + " = '" + info.partName + "'"; } LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) < 1) { @@ -329,10 +372,16 @@ public void markCleaned(CompactionInfo info) throws MetaException { } - s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + - TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" + - info.tableName + "'"; - if (info.partName != null) s += " and tc_partition = '" + info.partName + "'"; + s = "SELECT " + + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TXN_COMPONENTS", identifierQuoteString) + " " + + "WHERE " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " = " + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + " AND " + + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + " = '" + TXN_ABORTED + "' AND " + + TxnDbUtil.getEscape("TC_DATABASE", identifierQuoteString) + " = '" + info.dbname + "' AND " + + TxnDbUtil.getEscape("TC_TABLE", identifierQuoteString) + " = '" + info.tableName + "'"; + if (info.partName != null) + s += " AND " + TxnDbUtil.getEscape("TC_PARTITION", identifierQuoteString) + " = '" + info.partName + "'"; LOG.debug("Going to execute update <" + s + ">"); ResultSet rs = stmt.executeQuery(s); Set txnids = new HashSet(); @@ -341,7 +390,8 @@ public void markCleaned(CompactionInfo info) throws MetaException { // Remove entries from txn_components, as there may be aborted txn components StringBuffer buf = new StringBuffer(); - buf.append("delete from TXN_COMPONENTS where tc_txnid in ("); + buf.append("DELETE FROM " + TxnDbUtil.getEscape("TXN_COMPONENTS", identifierQuoteString) + " "); + buf.append("WHERE " + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + " IN ("); boolean first = true; for (long id : txnids) { if (first) first = false; @@ -349,13 +399,13 @@ public void markCleaned(CompactionInfo info) throws MetaException { buf.append(id); } - buf.append(") and tc_database = '"); + buf.append(") and " + TxnDbUtil.getEscape("TC_DATABASE", identifierQuoteString) + " = '"); buf.append(info.dbname); - buf.append("' and tc_table = '"); + buf.append("' and " + TxnDbUtil.getEscape("TC_TABLE", identifierQuoteString) + " = '"); buf.append(info.tableName); buf.append("'"); if (info.partName != null) { - buf.append(" and tc_partition = '"); + buf.append(" and " + TxnDbUtil.getEscape("TC_PARTITION", identifierQuoteString) + " = '"); buf.append(info.partName); buf.append("'"); } @@ -399,16 +449,21 @@ public void cleanEmptyAbortedTxns() throws MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select txn_id from TXNS where " + - "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + - "txn_state = '" + TXN_ABORTED + "'"; + String s = "SELECT " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + " " + + "WHERE " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " NOT IN (" + + "SELECT " + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + " FROM " + + TxnDbUtil.getEscape("TXN_COMPONENTS", identifierQuoteString) + ") AND " + + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + " = '" + TXN_ABORTED + "'"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); Set txnids = new HashSet(); while (rs.next()) txnids.add(rs.getLong(1)); if (txnids.size() > 0) { - StringBuffer buf = new StringBuffer("delete from TXNS where txn_id in ("); + StringBuffer buf = new StringBuffer("DELETE FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " IN ("); boolean first = true; for (long tid : txnids) { if (first) first = false; @@ -457,10 +512,15 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" - + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '" - + hostname + "%'"; + String s = "UPDATE " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + " " + + " SET " + + TxnDbUtil.getEscape("CQ_WORKER_ID", identifierQuoteString) + " = null, " + + TxnDbUtil.getEscape("CQ_START", identifierQuoteString) + " = null, " + + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + INITIATED_STATE + "' " + + "WHERE " + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + WORKING_STATE + "' and " + + TxnDbUtil.getEscape("CQ_WORKER_ID", identifierQuoteString) + " LIKE '" + hostname + "%'"; LOG.debug("Going to execute update <" + s + ">"); // It isn't an error if the following returns no rows, as the local workers could have died // with nothing assigned to them. @@ -504,10 +564,15 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { long latestValidStart = getDbTime(dbConn) - timeout; Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" - + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < " - + latestValidStart; + String s = "UPDATE " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString) + " " + + " SET " + + TxnDbUtil.getEscape("CQ_WORKER_ID", identifierQuoteString) + " = null, " + + TxnDbUtil.getEscape("CQ_START", identifierQuoteString) + " = null, " + + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + INITIATED_STATE + "' " + + "WHERE " + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + " = '" + WORKING_STATE + "' and " + + TxnDbUtil.getEscape("CQ_START", identifierQuoteString) + " < " + latestValidStart; LOG.debug("Going to execute update <" + s + ">"); // It isn't an error if the following returns no rows, as the local workers could have died // with nothing assigned to them. @@ -547,10 +612,18 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { Statement stmt = null; ResultSet rs = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS") - + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'" - + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'"); + String s = "SELECT " + TxnDbUtil.getEscape("COLUMN_NAME", identifierQuoteString); + if (ci.partName == null) { + s += " FROM " + TxnDbUtil.getEscape("TAB_COL_STATS", identifierQuoteString); + } else { + s += " FROM " + TxnDbUtil.getEscape("PART_COL_STATS", identifierQuoteString); + } + s += " WHERE " + TxnDbUtil.getEscape("DB_NAME", identifierQuoteString) + "='" + ci.dbname + "' AND " + + TxnDbUtil.getEscape("TABLE_NAME", identifierQuoteString) + "='" + ci.tableName + "'"; + if (ci.partName != null) + s += " AND " + TxnDbUtil.getEscape("PARTITION_NAME", identifierQuoteString) + "='" + ci.partName + "'"; LOG.debug("Going to execute <" + s + ">"); rs = stmt.executeQuery(s); List columns = new ArrayList(); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index df183a0..88dadd7 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -63,55 +63,58 @@ public static void prepDb() throws Exception { try { conn = getConnection(); stmt = conn.createStatement(); - stmt.execute("CREATE TABLE TXNS (" + - " TXN_ID bigint PRIMARY KEY," + - " TXN_STATE char(1) NOT NULL," + - " TXN_STARTED bigint NOT NULL," + - " TXN_LAST_HEARTBEAT bigint NOT NULL," + - " TXN_USER varchar(128) NOT NULL," + - " TXN_HOST varchar(128) NOT NULL)"); + String identifierQuoteString = conn.getMetaData().getIdentifierQuoteString(); + stmt.execute("CREATE TABLE " + getEscape("TXNS", identifierQuoteString) + " (" + + " " + getEscape("TXN_ID", identifierQuoteString) + " bigint PRIMARY KEY," + + " " + getEscape("TXN_STATE", identifierQuoteString) + " char(1) NOT NULL," + + " " + getEscape("TXN_STARTED", identifierQuoteString) + " bigint NOT NULL," + + " " + getEscape("TXN_LAST_HEARTBEAT", identifierQuoteString) + " bigint NOT NULL," + + " " + getEscape("TXN_USER", identifierQuoteString) + " varchar(128) NOT NULL," + + " " + getEscape("TXN_HOST", identifierQuoteString) + " varchar(128) NOT NULL)"); stmt.execute("CREATE TABLE TXN_COMPONENTS (" + - " TC_TXNID bigint REFERENCES TXNS (TXN_ID)," + - " TC_DATABASE varchar(128) NOT NULL," + - " TC_TABLE varchar(128)," + - " TC_PARTITION varchar(767))"); + " TC_TXNID bigint REFERENCES TXNS (TXN_ID)," + + " TC_DATABASE varchar(128) NOT NULL," + + " TC_TABLE varchar(128)," + + " TC_PARTITION varchar(767))"); stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" + - " CTC_TXNID bigint," + - " CTC_DATABASE varchar(128) NOT NULL," + - " CTC_TABLE varchar(128)," + - " CTC_PARTITION varchar(767))"); - stmt.execute("CREATE TABLE NEXT_TXN_ID (" + " NTXN_NEXT bigint NOT NULL)"); + " CTC_TXNID bigint," + + " CTC_DATABASE varchar(128) NOT NULL," + + " CTC_TABLE varchar(128)," + + " CTC_PARTITION varchar(767))"); + stmt.execute("CREATE TABLE NEXT_TXN_ID (" + + " NTXN_NEXT bigint NOT NULL)"); stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)"); stmt.execute("CREATE TABLE HIVE_LOCKS (" + - " HL_LOCK_EXT_ID bigint NOT NULL," + - " HL_LOCK_INT_ID bigint NOT NULL," + - " HL_TXNID bigint," + - " HL_DB varchar(128) NOT NULL," + - " HL_TABLE varchar(128)," + - " HL_PARTITION varchar(767)," + - " HL_LOCK_STATE char(1) NOT NULL," + - " HL_LOCK_TYPE char(1) NOT NULL," + - " HL_LAST_HEARTBEAT bigint NOT NULL," + - " HL_ACQUIRED_AT bigint," + - " HL_USER varchar(128) NOT NULL," + - " HL_HOST varchar(128) NOT NULL," + - " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))"); + " HL_LOCK_EXT_ID bigint NOT NULL," + + " HL_LOCK_INT_ID bigint NOT NULL," + + " HL_TXNID bigint," + + " HL_DB varchar(128) NOT NULL," + + " HL_TABLE varchar(128)," + + " HL_PARTITION varchar(767)," + + " HL_LOCK_STATE char(1) NOT NULL," + + " HL_LOCK_TYPE char(1) NOT NULL," + + " HL_LAST_HEARTBEAT bigint NOT NULL," + + " HL_ACQUIRED_AT bigint," + + " HL_USER varchar(128) NOT NULL," + + " HL_HOST varchar(128) NOT NULL," + + " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))"); stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)"); - stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT NULL)"); + stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + + " NL_NEXT bigint NOT NULL)"); stmt.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)"); stmt.execute("CREATE TABLE COMPACTION_QUEUE (" + - " CQ_ID bigint PRIMARY KEY," + - " CQ_DATABASE varchar(128) NOT NULL," + - " CQ_TABLE varchar(128) NOT NULL," + - " CQ_PARTITION varchar(767)," + - " CQ_STATE char(1) NOT NULL," + - " CQ_TYPE char(1) NOT NULL," + - " CQ_WORKER_ID varchar(128)," + - " CQ_START bigint," + - " CQ_RUN_AS varchar(128))"); + " CQ_ID bigint PRIMARY KEY," + + " CQ_DATABASE varchar(128) NOT NULL," + + " CQ_TABLE varchar(128) NOT NULL," + + " CQ_PARTITION varchar(767)," + + " CQ_STATE char(1) NOT NULL," + + " CQ_TYPE char(1) NOT NULL," + + " CQ_WORKER_ID varchar(128)," + + " CQ_START bigint," + + " CQ_RUN_AS varchar(128))"); stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)"); stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)"); @@ -173,7 +176,9 @@ public static int countLockComponents(long lockId) throws Exception { ResultSet rs = null; try { conn = getConnection(); - stmt = conn.prepareStatement("SELECT count(*) FROM hive_locks WHERE hl_lock_ext_id = ?"); + String identifierQuoteString = conn.getMetaData().getIdentifierQuoteString(); + stmt = conn.prepareStatement("select count(*) from " + getEscape("HIVE_LOCKS", identifierQuoteString) + + " where " + getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + " = ?"); stmt.setLong(1, lockId); rs = stmt.executeQuery(); if (!rs.next()) { @@ -191,8 +196,10 @@ public static int findNumCurrentLocks() throws Exception { ResultSet rs = null; try { conn = getConnection(); + String identifierQuoteString = conn.getMetaData().getIdentifierQuoteString(); stmt = conn.createStatement(); - rs = stmt.executeQuery("select count(*) from hive_locks"); + rs = stmt.executeQuery("select count(*) from " + + getEscape("HIVE_LOCKS", identifierQuoteString)); if (!rs.next()) { return 0; } @@ -246,4 +253,9 @@ private static void closeResources(Connection conn, Statement stmt, ResultSet rs } } } + + public static String getEscape(String name, String identifierQuoteString) + { + return identifierQuoteString + name + identifierQuoteString; + } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index f1697bb..5e39aeb 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -133,8 +133,10 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + String s = "SELECT " + TxnDbUtil.getEscape("NTXN_NEXT", identifierQuoteString) + " - 1 " + + "FROM " + TxnDbUtil.getEscape("NEXT_TXN_ID", identifierQuoteString); LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -148,7 +150,11 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { } List txnInfo = new ArrayList(); - s = "select txn_id, txn_state, txn_user, txn_host from TXNS"; + s = "SELECT " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TXN_USER", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TXN_HOST", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString); LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); while (rs.next()) { @@ -195,8 +201,10 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { Statement stmt = null; try { timeOutTxns(dbConn); + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + String s = "SELECT " + TxnDbUtil.getEscape("NTXN_NEXT", identifierQuoteString) + " - 1 " + + "FROM " + TxnDbUtil.getEscape("NEXT_TXN_ID", identifierQuoteString); LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -210,7 +218,8 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { } Set openList = new HashSet(); - s = "select txn_id from TXNS"; + s = "SELECT " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + + " FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString); LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); while (rs.next()) { @@ -255,8 +264,10 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH); if (numTxns > maxTxns) numTxns = maxTxns; + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select ntxn_next from NEXT_TXN_ID"; + String s = "SELECT " + TxnDbUtil.getEscape("NTXN_NEXT", identifierQuoteString) + + " FROM " + TxnDbUtil.getEscape("NEXT_TXN_ID", identifierQuoteString) + ""; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -264,13 +275,19 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { "configured, can't find next transaction id."); } long first = rs.getLong(1); - s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns); + s = "UPDATE " + TxnDbUtil.getEscape("NEXT_TXN_ID", identifierQuoteString) + + " SET " + TxnDbUtil.getEscape("NTXN_NEXT", identifierQuoteString) + " = " + (first + numTxns); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); long now = getDbTime(dbConn); - s = "insert into TXNS (txn_id, txn_state, txn_started, " + - "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + ", " + - now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')"; + s = "INSERT INTO " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + " (" + + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + "," + + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + "," + + TxnDbUtil.getEscape("TXN_STARTED", identifierQuoteString) + "," + + TxnDbUtil.getEscape("TXN_LAST_HEARTBEAT", identifierQuoteString) + "," + + TxnDbUtil.getEscape("TXN_USER", identifierQuoteString) + "," + + TxnDbUtil.getEscape("TXN_HOST", identifierQuoteString) + + ") VALUES (?, 'o', " + now + ", " + now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')"; LOG.debug("Going to prepare statement <" + s + ">"); PreparedStatement ps = dbConn.prepareStatement(s); List txnIds = new ArrayList(numTxns); @@ -344,6 +361,7 @@ public void commitTxn(CommitTxnRequest rqst) Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); // Before we do the commit heartbeat the txn. This is slightly odd in that we're going to // commit it, but it does two things. One, it makes sure the transaction is still valid. @@ -354,8 +372,16 @@ public void commitTxn(CommitTxnRequest rqst) // Move the record from txn_components into completed_txn_components so that the compactor // knows where to look to compact. - String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " + - "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid; + String s = "INSERT INTO " + TxnDbUtil.getEscape("COMPLETED_TXN_COMPONENTS", identifierQuoteString) + " " + + " SELECT " + + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_PARTITION", identifierQuoteString) + " " + + " FROM " + + TxnDbUtil.getEscape("TXN_COMPONENTS", identifierQuoteString) + + " WHERE " + + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + " = " + txnid; LOG.debug("Going to execute insert <" + s + ">"); if (stmt.executeUpdate(s) < 1) { LOG.warn("Expected to move at least one record from txn_components to " + @@ -363,14 +389,17 @@ public void commitTxn(CommitTxnRequest rqst) } // Always access TXN_COMPONENTS before HIVE_LOCKS; - s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid; + s = "DELETE FROM " + TxnDbUtil.getEscape("TXN_COMPONENTS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + " = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); // Always access HIVE_LOCKS before TXNS - s = "delete from HIVE_LOCKS where hl_txnid = " + txnid; + s = "DELETE FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("HL_TXNID", identifierQuoteString) + " = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); - s = "delete from TXNS where txn_id = " + txnid; + s = "DELETE FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.debug("Going to commit"); @@ -505,8 +534,10 @@ public void unlock(UnlockRequest rqst) LOG.error(msg); throw new TxnOpenException(msg); } + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId; + String s = "DELETE FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + " = " + extLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -542,10 +573,22 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { List elems = new ArrayList(); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " + - "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host from HIVE_LOCKS"; + String s = "SELECT " + + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_TXNID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_DB", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_PARTITION", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_STATE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_TYPE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LAST_HEARTBEAT", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_ACQUIRED_AT", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_USER", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_HOST", identifierQuoteString) + " " + + " FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString); LOG.debug("Doing to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); while (rs.next()) { @@ -657,10 +700,12 @@ public void compact(CompactionRequest rqst) throws MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); // Get the id for the next entry in the queue - String s = "select ncq_next from NEXT_COMPACTION_QUEUE_ID"; + String s = "SELECT " + TxnDbUtil.getEscape("NCQ_NEXT", identifierQuoteString) + + " FROM " + TxnDbUtil.getEscape("NEXT_COMPACTION_QUEUE_ID", identifierQuoteString) + ""; LOG.debug("going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -670,16 +715,21 @@ public void compact(CompactionRequest rqst) throws MetaException { "no record found in next_compaction_queue_id"); } long id = rs.getLong(1); - s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1); + s = "UPDATE " + TxnDbUtil.getEscape("NEXT_COMPACTION_QUEUE_ID", identifierQuoteString) + + " SET " + TxnDbUtil.getEscape("NCQ_NEXT", identifierQuoteString) + " = " + (id + 1); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); - StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " + - "cq_table, "); + StringBuilder buf = new StringBuilder("INSERT INTO " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString)); + buf.append(" ("); + buf.append(TxnDbUtil.getEscape("CQ_ID", identifierQuoteString) + ", "); + buf.append(TxnDbUtil.getEscape("CQ_DATABASE", identifierQuoteString) + ", "); + buf.append(TxnDbUtil.getEscape("CQ_TABLE", identifierQuoteString) + ", "); String partName = rqst.getPartitionname(); - if (partName != null) buf.append("cq_partition, "); - buf.append("cq_state, cq_type"); - if (rqst.getRunas() != null) buf.append(", cq_run_as"); + if (partName != null) buf.append(TxnDbUtil.getEscape("CQ_PARTITION", identifierQuoteString) + ", "); + buf.append(TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + ", "); + buf.append(TxnDbUtil.getEscape("CQ_TYPE", identifierQuoteString) + ""); + if (rqst.getRunas() != null) buf.append(", " + TxnDbUtil.getEscape("CQ_RUN_AS", identifierQuoteString) + ""); buf.append(") values ("); buf.append(id); buf.append(", '"); @@ -742,9 +792,17 @@ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaExcep Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " + - "cq_start, cq_run_as from COMPACTION_QUEUE"; + String s = "SELECT " + TxnDbUtil.getEscape("CQ_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_PARTITION", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_STATE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_TYPE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_WORKER_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_START", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("CQ_RUN_AS", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("COMPACTION_QUEUE", identifierQuoteString); LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); while (rs.next()) { @@ -792,8 +850,9 @@ int numLocksInLockTable() throws SQLException, MetaException { Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select count(*) from HIVE_LOCKS"; + String s = "SELECT COUNT(*) FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString); LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); rs.next(); @@ -1109,10 +1168,12 @@ private int abortTxns(Connection dbConn, List txnids) throws SQLException Statement stmt = null; int updateCnt = 0; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS - StringBuilder buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in ("); + StringBuilder buf = new StringBuilder("DELETE FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("HL_TXNID", identifierQuoteString) + " IN ("); boolean first = true; for (Long id : txnids) { if (first) first = false; @@ -1123,7 +1184,9 @@ private int abortTxns(Connection dbConn, List txnids) throws SQLException LOG.debug("Going to execute update <" + buf.toString() + ">"); stmt.executeUpdate(buf.toString()); - buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id in ("); + buf = new StringBuilder("UPDATE " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + + " SET " + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + " = '" + TXN_ABORTED + "' " + + " WHERE " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " IN ("); first = true; for (Long id : txnids) { if (first) first = false; @@ -1174,10 +1237,12 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) timeOutLocks(dbConn); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); // Get the next lock id. - String s = "select nl_next from NEXT_LOCK_ID"; + String s = "SELECT " + TxnDbUtil.getEscape("NL_NEXT", identifierQuoteString) + + " FROM " + TxnDbUtil.getEscape("NEXT_LOCK_ID", identifierQuoteString); LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -1187,7 +1252,8 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) "initialized, no record found in next_lock_id"); } long extLockId = rs.getLong(1); - s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); + s = "UPDATE " + TxnDbUtil.getEscape("NEXT_LOCK_ID", identifierQuoteString) + + " SET " + TxnDbUtil.getEscape("NL_NEXT", identifierQuoteString) + " = " + (extLockId + 1); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.debug("Going to commit."); @@ -1206,9 +1272,12 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) String dbName = lc.getDbname(); String tblName = lc.getTablename(); String partName = lc.getPartitionname(); - s = "insert into TXN_COMPONENTS " + - "(tc_txnid, tc_database, tc_table, tc_partition) " + - "values (" + txnid + ", '" + dbName + "', " + + s = "INSERT INTO " + TxnDbUtil.getEscape("TXN_COMPONENTS", identifierQuoteString) + " " + + "(" + TxnDbUtil.getEscape("TC_TXNID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_DATABASE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("TC_PARTITION", identifierQuoteString) + ") " + + "VALUES (" + txnid + ", '" + dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'") + ", " + (partName == null ? "null" : "'" + partName + "'") + ")"; LOG.debug("Going to execute update <" + s + ">"); @@ -1230,10 +1299,21 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) case SHARED_WRITE: lockChar = LOCK_SEMI_SHARED; break; } long now = getDbTime(dbConn); - s = "insert into HIVE_LOCKS " + - " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + - " values (" + extLockId + ", " + + + s = "INSERT INTO " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + " " + + " (" + + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_INT_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_TXNID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_DB", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_PARTITION", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_STATE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_TYPE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LAST_HEARTBEAT", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_USER", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_HOST", identifierQuoteString) + ")" + + " VALUES (" + extLockId + ", " + + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" + dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" ) + ", " + (partName == null ? "null" : "'" + partName + "'") + @@ -1269,9 +1349,17 @@ private LockResponse checkLock(Connection dbConn, LOG.debug("Setting savepoint"); Savepoint save = dbConn.setSavepoint(); - StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + - "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + - "hl_lock_type from HIVE_LOCKS where hl_db in ("); + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); + StringBuilder query = new StringBuilder("SELECT "); + query.append(TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + ", "); + query.append(TxnDbUtil.getEscape("HL_LOCK_INT_ID", identifierQuoteString) + ", "); + query.append(TxnDbUtil.getEscape("HL_DB", identifierQuoteString) + ", "); + query.append(TxnDbUtil.getEscape("HL_TABLE", identifierQuoteString) + ", "); + query.append(TxnDbUtil.getEscape("HL_PARTITION", identifierQuoteString) + ", "); + query.append(TxnDbUtil.getEscape("HL_LOCK_STATE", identifierQuoteString) + ", "); + query.append(TxnDbUtil.getEscape("HL_LOCK_TYPE", identifierQuoteString) + " "); + query.append(" FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString)); + query.append(" WHERE " + TxnDbUtil.getEscape("HL_DB", identifierQuoteString) + " IN ("); Set strings = new HashSet(locksBeingChecked.size()); for (LockInfo info : locksBeingChecked) { @@ -1300,7 +1388,8 @@ private LockResponse checkLock(Connection dbConn, } } if (!sawNull) { - query.append(" and (hl_table is null or hl_table in("); + query.append(" AND (" + TxnDbUtil.getEscape("HL_TABLE", identifierQuoteString) + " IS NULL OR "); + query.append(TxnDbUtil.getEscape("HL_TABLE", identifierQuoteString) + " IN ("); first = true; for (String s : strings) { if (first) first = false; @@ -1324,7 +1413,8 @@ private LockResponse checkLock(Connection dbConn, } } if (!sawNull) { - query.append(" and (hl_partition is null or hl_partition in("); + query.append(" and (" + TxnDbUtil.getEscape("HL_PARTITION", identifierQuoteString) + " IS NULL OR "); + query.append(TxnDbUtil.getEscape("HL_PARTITION", identifierQuoteString) + " IN ("); first = true; for (String s : strings) { if (first) first = false; @@ -1453,9 +1543,15 @@ private void wait(Connection dbConn, Savepoint save) throws SQLException { private void acquire(Connection dbConn, Statement stmt, long extLockId, long intLockId) throws SQLException, NoSuchLockException, MetaException { long now = getDbTime(dbConn); - String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " + - "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " + - extLockId + " and hl_lock_int_id = " + intLockId; + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); + String s = "UPDATE " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " SET " + + TxnDbUtil.getEscape("HL_LOCK_STATE", identifierQuoteString) + " = '" + LOCK_ACQUIRED + "', " + + TxnDbUtil.getEscape("HL_LAST_HEARTBEAT", identifierQuoteString) + " = " + now + ", " + + TxnDbUtil.getEscape("HL_ACQUIRED_AT", identifierQuoteString) + " = " + now + + " WHERE " + + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + " = " + extLockId + " and " + + TxnDbUtil.getEscape("HL_LOCK_INT_ID", identifierQuoteString) + " = " + intLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -1476,11 +1572,13 @@ private void heartbeatLock(Connection dbConn, long extLockId) if (extLockId == 0) return; Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); long now = getDbTime(dbConn); - String s = "update HIVE_LOCKS set hl_last_heartbeat = " + - now + " where hl_lock_ext_id = " + extLockId; + String s = "UPDATE " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " SET " + TxnDbUtil.getEscape("HL_LAST_HEARTBEAT", identifierQuoteString) + " = " + now + + " WHERE " + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + " = " + extLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -1502,10 +1600,13 @@ private void heartbeatTxn(Connection dbConn, long txnid) if (txnid == 0) return; Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); long now = getDbTime(dbConn); // We need to check whether this transaction is valid and open - String s = "select txn_state from TXNS where txn_id = " + txnid; + String s = "SELECT " + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + + " FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " = " + txnid; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -1519,8 +1620,9 @@ private void heartbeatTxn(Connection dbConn, long txnid) throw new TxnAbortedException("Transaction " + txnid + " already aborted"); } - s = "update TXNS set txn_last_heartbeat = " + now + - " where txn_id = " + txnid; + s = "UPDATE " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + + " SET " + TxnDbUtil.getEscape("TXN_LAST_HEARTBEAT", identifierQuoteString) + " = " + now + + " WHERE " + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.debug("Going to commit"); @@ -1535,9 +1637,11 @@ private long getTxnIdFromLockId(Connection dbConn, long extLockId) throws NoSuchLockException, MetaException, SQLException { Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " + - extLockId; + String s = "SELECT " + TxnDbUtil.getEscape("HL_TXNID", identifierQuoteString) + + " FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + " = " + extLockId; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -1557,10 +1661,20 @@ private long getTxnIdFromLockId(Connection dbConn, long extLockId) throws NoSuchLockException, MetaException, SQLException { Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " + - "hl_lock_ext_id = " + extLockId; + String s = "SELECT " + + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_INT_ID", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_DB", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_TABLE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_PARTITION", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_STATE", identifierQuoteString) + ", " + + TxnDbUtil.getEscape("HL_LOCK_TYPE", identifierQuoteString) + " " + + " FROM " + + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " WHERE " + + TxnDbUtil.getEscape("HL_LOCK_EXT_ID", identifierQuoteString) + " = " + extLockId; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); boolean sawAtLeastOne = false; @@ -1586,10 +1700,11 @@ private void timeOutLocks(Connection dbConn) throws SQLException, MetaException long now = getDbTime(dbConn); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); // Remove any timed out locks from the table. - String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + - (now - timeout); + String s = "DELETE FROM " + TxnDbUtil.getEscape("HIVE_LOCKS", identifierQuoteString) + + " WHERE " + TxnDbUtil.getEscape("HL_LAST_HEARTBEAT", identifierQuoteString) + " < " + (now - timeout); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.debug("Going to commit"); @@ -1606,10 +1721,13 @@ private void timeOutTxns(Connection dbConn) throws SQLException, MetaException { long now = getDbTime(dbConn); Statement stmt = null; try { + String identifierQuoteString = dbConn.getMetaData().getIdentifierQuoteString(); stmt = dbConn.createStatement(); - // Abort any timed out locks from the table. - String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN + - "' and txn_last_heartbeat < " + (now - timeout); + String s = "SELECT " + + TxnDbUtil.getEscape("TXN_ID", identifierQuoteString) + " " + + "FROM " + TxnDbUtil.getEscape("TXNS", identifierQuoteString) + + "WHERE " + TxnDbUtil.getEscape("TXN_STATE", identifierQuoteString) + " = '" + TXN_OPEN + "' and " + + TxnDbUtil.getEscape("TXN_LAST_HEARTBEAT", identifierQuoteString) + " < " + (now - timeout); LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); List deadTxns = new ArrayList(); diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 264052f..f641a96 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -244,7 +244,7 @@ public void rollbackTxn() throws LockException { @Override public void heartbeat() throws LockException { LOG.debug("Heartbeating lock and transaction " + txnId); - List locks = lockMgr.getLocks(false, false); + List locks = getLockManager().getLocks(false, false); if (locks.size() == 0) { if (txnId == 0) { // No locks, no txn, we outta here. diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java index b074ca9..62afd80 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java @@ -90,11 +90,12 @@ public Void run(PreparedStatement stmt) throws SQLException { for (int failures = 0;; failures++) { try { conn = Utilities.connectWithRetry(connectionString, waitWindow, maxRetries); + String identifierQuoteString = conn.getMetaData().getIdentifierQuoteString(); for (String statType : JDBCStatsUtils.getSupportedStatistics()) { // prepare statements PreparedStatement selStmt = Utilities.prepareWithRetry(conn, - JDBCStatsUtils.getSelectAggr(statType, comment), waitWindow, maxRetries); + JDBCStatsUtils.getSelectAggr(statType, comment, identifierQuoteString), waitWindow, maxRetries); columnMapping.put(statType, selStmt); // set query timeout Utilities.executeWithRetry(setQueryTimeout, selStmt, waitWindow, failures); @@ -216,11 +217,11 @@ public Void run(PreparedStatement stmt) throws SQLException { } }; try { - + String identifierQuoteString = conn.getMetaData().getIdentifierQuoteString(); String keyPrefix = Utilities.escapeSqlLike(rowID) + "%"; PreparedStatement delStmt = Utilities.prepareWithRetry(conn, - JDBCStatsUtils.getDeleteAggr(rowID, comment), waitWindow, maxRetries); + JDBCStatsUtils.getDeleteAggr(rowID, comment, identifierQuoteString), waitWindow, maxRetries); delStmt.setString(1, keyPrefix); delStmt.setString(2, Character.toString(Utilities.sqlEscapeChar)); diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java index 5e317ab..bdff526 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java @@ -27,11 +27,13 @@ import java.sql.SQLIntegrityConstraintViolationException; import java.sql.SQLRecoverableException; import java.sql.Statement; +import java.sql.Types; import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -89,11 +91,12 @@ public Void run(PreparedStatement stmt) throws SQLException { for (int failures = 0;; failures++) { try { conn = Utilities.connectWithRetry(connectionString, waitWindow, maxRetries); + String identifierQuoteString = conn.getMetaData().getIdentifierQuoteString(); // prepare statements - updStmt = Utilities.prepareWithRetry(conn, JDBCStatsUtils.getUpdate(comment), waitWindow, + updStmt = Utilities.prepareWithRetry(conn, JDBCStatsUtils.getUpdate(comment, identifierQuoteString), waitWindow, maxRetries); - insStmt = Utilities.prepareWithRetry(conn, JDBCStatsUtils.getInsert(comment), waitWindow, + insStmt = Utilities.prepareWithRetry(conn, JDBCStatsUtils.getInsert(comment, identifierQuoteString), waitWindow, maxRetries); // set query timeout @@ -155,7 +158,13 @@ public Void run(PreparedStatement stmt) throws SQLException { try { insStmt.setString(1, fileID); for (int i = 0; i < JDBCStatsUtils.getSupportedStatistics().size(); i++) { - insStmt.setString(i + 2, stats.get(supportedStatistics.get(i))); + String valLong = StringUtils.trim(stats.get(supportedStatistics.get(i))); + if (StringUtils.isNotBlank(valLong)) + { + insStmt.setLong(i + 2, Long.parseLong(valLong)); + } else { + insStmt.setNull(i + 2, Types.BIGINT); + } } Utilities.executeWithRetry(execUpdate, insStmt, waitWindow, maxRetries); return true; @@ -282,7 +291,7 @@ public boolean init(Configuration hconf) { rs = dbm.getTables(null, null, JDBCStatsUtils.getStatTableName(), null); boolean tblExists = rs.next(); if (!tblExists) { // Table does not exist, create it - String createTable = JDBCStatsUtils.getCreate(""); + String createTable = JDBCStatsUtils.getCreate("", conn.getMetaData().getIdentifierQuoteString()); stmt.executeUpdate(createTable); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsUtils.java ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsUtils.java index 4625d27..9dd609c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsUtils.java @@ -123,12 +123,12 @@ public static String getBasicStat() { /** * Prepares CREATE TABLE query */ - public static String getCreate(String comment) { - String create = "CREATE TABLE /* " + comment + " */ " + JDBCStatsUtils.getStatTableName() + - " (" + getTimestampColumnName() + " TIMESTAMP DEFAULT CURRENT_TIMESTAMP, " + - JDBCStatsUtils.getIdColumnName() + " VARCHAR(255) PRIMARY KEY "; + public static String getCreate(String comment, String identifierQuoteString) { + String create = "CREATE TABLE /* " + comment + " */ " + getEscape(getStatTableName(), identifierQuoteString) + + " (" + getEscape(getTimestampColumnName(), identifierQuoteString) + " TIMESTAMP DEFAULT CURRENT_TIMESTAMP, " + + getEscape(getIdColumnName(), identifierQuoteString) + " VARCHAR(255) PRIMARY KEY "; for (int i = 0; i < supportedStats.size(); i++) { - create += ", " + getStatColumnName(supportedStats.get(i)) + " BIGINT "; + create += ", " + getEscape(getStatColumnName(supportedStats.get(i)), identifierQuoteString) + " BIGINT "; } create += ")"; return create; @@ -137,15 +137,17 @@ public static String getCreate(String comment) { /** * Prepares UPDATE statement issued when updating existing statistics */ - public static String getUpdate(String comment) { - String update = "UPDATE /* " + comment + " */ " + getStatTableName() + " SET "; + public static String getUpdate(String comment, String identifierQuoteString) { + String update = "UPDATE /* " + comment + " */ " + getEscape(getStatTableName(), identifierQuoteString); + update += " SET "; for (int i = 0; i < supportedStats.size(); i++) { - update += columnNameMapping.get(supportedStats.get(i)) + " = ? , "; + update += getEscape(columnNameMapping.get(supportedStats.get(i)), identifierQuoteString) + " = ? , "; } - update += getTimestampColumnName() + " = CURRENT_TIMESTAMP"; - update += " WHERE " + JDBCStatsUtils.getIdColumnName() + " = ? AND ? > ( SELECT TEMP." + update += getEscape(getTimestampColumnName(), identifierQuoteString) + " = CURRENT_TIMESTAMP"; + update += " WHERE " + getEscape(getIdColumnName(), identifierQuoteString) + " = ? AND ? > ( SELECT TEMP." + getStatColumnName(getBasicStat()) + " FROM ( " + - " SELECT " + getStatColumnName(getBasicStat()) + " FROM " + getStatTableName() + " WHERE " + " SELECT " + getEscape(getStatColumnName(getBasicStat()), identifierQuoteString) + + " FROM " + getEscape(getStatTableName(), identifierQuoteString) + " WHERE " + getIdColumnName() + " = ? ) TEMP )"; return update; } @@ -153,15 +155,15 @@ public static String getUpdate(String comment) { /** * Prepares INSERT statement for statistic publishing. */ - public static String getInsert(String comment) { - String columns = JDBCStatsUtils.getIdColumnName(); + public static String getInsert(String comment, String identifierQuoteString) { + String columns = getEscape(getIdColumnName(), identifierQuoteString); String values = "?"; for (int i = 0; i < supportedStats.size(); i++) { - columns += ", " + getStatColumnName(supportedStats.get(i)); + columns += ", " + getEscape(getStatColumnName(supportedStats.get(i)), identifierQuoteString); values += ", ?"; } - String insert = "INSERT INTO /* " + comment + " */ " + getStatTableName() + "(" + columns + + String insert = "INSERT INTO /* " + comment + " */ " + getEscape(getStatTableName(), identifierQuoteString) + "(" + columns + ") VALUES (" + values + ")"; return insert; } @@ -174,21 +176,26 @@ public static String getInsert(String comment) { * @param comment * @return aggregated value for the given statistic */ - public static String getSelectAggr(String statType, String comment) { + public static String getSelectAggr(String statType, String comment, String identifierQuoteString) { String select = "SELECT /* " + comment + " */ " + "SUM( " - + getStatColumnName(statType) + " ) " + " FROM " - + getStatTableName() + " WHERE " + JDBCStatsUtils.getIdColumnName() + " LIKE ? ESCAPE ?"; + + getEscape(getStatColumnName(statType), identifierQuoteString) + " ) " + + " FROM " + getEscape(getStatTableName(), identifierQuoteString) + + " WHERE " + getEscape(getIdColumnName(), identifierQuoteString) + " LIKE ? ESCAPE ?"; return select; } /** * Prepares DELETE statement for cleanup. */ - public static String getDeleteAggr(String rowID, String comment) { + public static String getDeleteAggr(String rowID, String comment, String identifierQuoteString) { String delete = "DELETE /* " + comment + " */ " + - " FROM " + getStatTableName() + " WHERE " + JDBCStatsUtils.getIdColumnName() + - " LIKE ? ESCAPE ?"; + " FROM " + getEscape(getStatTableName(), identifierQuoteString) + + " WHERE " + getEscape(getIdColumnName(), identifierQuoteString) + " LIKE ? ESCAPE ?"; return delete; } + private static String getEscape(String name, String identifierQuoteString) + { + return identifierQuoteString + name + identifierQuoteString; + } }