diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 369d9a4..b287d43 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -23,7 +23,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -748,11 +748,14 @@ private void addWriteNotificationLog(NotificationEvent event, AcidWriteEvent aci String s = sqlGenerator.addForUpdateClause("select \"WNL_FILES\", \"WNL_ID\" from" + " \"TXN_WRITE_NOTIFICATION_LOG\" " + - "where \"WNL_DATABASE\" = " + quoteString(dbName) + - "and \"WNL_TABLE\" = " + quoteString(tblName) + " and \"WNL_PARTITION\" = " + - quoteString(partition) + " and \"WNL_TXNID\" = " + Long.toString(acidWriteEvent.getTxnId())); - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); + "where \"WNL_DATABASE\" = ? " + + "and \"WNL_TABLE\" = ? " + " and \"WNL_PARTITION\" = ? " + + "and \"WNL_TXNID\" = " + Long.toString(acidWriteEvent.getTxnId())); + List params = Arrays.asList(dbName, tblName, partition); + pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); + LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", + quoteString(dbName), quoteString(tblName), quoteString(partition)); + rs = pst.executeQuery(); if (!rs.next()) { // if rs is empty then no lock is taken and thus it can not cause deadlock. long nextNLId = getNextNLId(stmt, sqlGenerator, @@ -761,6 +764,7 @@ private void addWriteNotificationLog(NotificationEvent event, AcidWriteEvent aci "(\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\", \"WNL_DATABASE\", \"WNL_TABLE\", " + "\"WNL_PARTITION\", \"WNL_TABLE_OBJ\", \"WNL_PARTITION_OBJ\", " + "\"WNL_FILES\", \"WNL_EVENT_TIME\") VALUES (?,?,?,?,?,?,?,?,?,?)"; + closeStmt(pst); int currentTime = now(); pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s)); pst.setLong(1, nextNLId); @@ -793,6 +797,7 @@ private void addWriteNotificationLog(NotificationEvent event, AcidWriteEvent aci " \"WNL_FILES\" = ? ," + " \"WNL_EVENT_TIME\" = ?" + " where \"WNL_ID\" = ?"; + closeStmt(pst); pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s)); pst.setString(1, tableObj); pst.setString(2, partitionObj); @@ -826,6 +831,7 @@ private void addNotificationLog(NotificationEvent event, ListenerEvent listenerE return; } Statement stmt = null; + PreparedStatement pst = null; ResultSet rs = null; try { stmt = dbConn.createStatement(); @@ -852,21 +858,20 @@ private void addNotificationLog(NotificationEvent event, ListenerEvent listenerE long nextNLId = getNextNLId(stmt, sqlGenerator, "org.apache.hadoop.hive.metastore.model.MNotificationLog"); - List insert = new ArrayList<>(); + String insertVal = "(" + nextNLId + "," + nextEventId + "," + now() + ", ?, ?," + + quoteString(" ") + ",?, ?)"; - insert.add(0, nextNLId + "," + nextEventId + "," + now() + "," + - quoteString(event.getEventType()) + "," + quoteString(event.getDbName()) + "," + - quoteString(" ") + "," + quoteString(event.getMessage()) + "," + - quoteString(event.getMessageFormat())); + s = "insert into \"NOTIFICATION_LOG\" (\"NL_ID\", \"EVENT_ID\", \"EVENT_TIME\", " + + " \"EVENT_TYPE\", \"DB_NAME\", " + + " \"TBL_NAME\", \"MESSAGE\", \"MESSAGE_FORMAT\") VALUES " + insertVal; + List params = Arrays.asList( + event.getEventType(), event.getDbName(), event.getMessage(), event.getMessageFormat()); + pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); - List sql = sqlGenerator.createInsertValuesStmt( - "\"NOTIFICATION_LOG\" (\"NL_ID\", \"EVENT_ID\", \"EVENT_TIME\", " + - " \"EVENT_TYPE\", \"DB_NAME\"," + - " \"TBL_NAME\", \"MESSAGE\", \"MESSAGE_FORMAT\")", insert); - for (String q : sql) { - LOG.info("Going to execute insert <" + q + ">"); - stmt.execute(q); - } + LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">", + quoteString(event.getEventType()), quoteString(event.getDbName()), + quoteString(event.getMessage()), quoteString(event.getMessageFormat())); + pst.execute(); // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners. if (event.isSetEventId()) { @@ -879,6 +884,7 @@ private void addNotificationLog(NotificationEvent event, ListenerEvent listenerE throw e; } finally { closeStmt(stmt); + closeStmt(pst); close(rs); } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java index 2576ba2..cc86afe 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java @@ -98,8 +98,8 @@ public void testSingleReadTable() throws Exception { public void testSingleReadPartition() throws Exception { addPartitionInput(newTable(true)); QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY); - txnMgr.openTxn(ctx, null); - txnMgr.acquireLocks(qp, ctx, null); + txnMgr.openTxn(ctx, "fred"); + txnMgr.acquireLocks(qp, ctx, "fred"); List locks = ctx.getHiveLocks(); Assert.assertEquals(1, locks.size()); Assert.assertEquals(1, diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java index d0ac7db..49b737e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java @@ -26,6 +26,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -47,18 +50,80 @@ public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) { } /** - * Genereates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB + * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB + * + * @param tblColumns e.g. "T(a,b,c)" + * @param rows e.g. list of Strings like 3,4,'d' + * @param paramsList List of parameters which in turn is list of Strings to be set in PreparedStatement object + * @return List PreparedStatement objects for fully formed INSERT INTO ... statements + */ + public List createInsertValuesPreparedStmt(Connection dbConn, + String tblColumns, List rows, + List> paramsList) + throws SQLException { + if (rows == null || rows.size() == 0) { + return Collections.emptyList(); + } + assert((paramsList == null) || (rows.size() == paramsList.size())); + + List rowsCountInStmts = new ArrayList<>(); + List insertStmts = createInsertValuesStmt(tblColumns, rows, rowsCountInStmts); + assert(insertStmts.size() == rowsCountInStmts.size()); + + List preparedStmts = new ArrayList<>(); + int paramsListFromIdx = 0; + try { + for (int stmtIdx = 0; stmtIdx < insertStmts.size(); stmtIdx++) { + String sql = insertStmts.get(stmtIdx); + PreparedStatement pStmt = prepareStmtWithParameters(dbConn, sql, null); + if (paramsList != null) { + int paramIdx = 1; + int paramsListToIdx = paramsListFromIdx + rowsCountInStmts.get(stmtIdx); + for (int paramsListIdx = paramsListFromIdx; paramsListIdx < paramsListToIdx; paramsListIdx++) { + List params = paramsList.get(paramsListIdx); + for (int i = 0; i < params.size(); i++, paramIdx++) { + pStmt.setString(paramIdx, params.get(i)); + } + } + paramsListFromIdx = paramsListToIdx; + } + preparedStmts.add(pStmt); + } + } catch (SQLException e) { + for (PreparedStatement pst : preparedStmts) { + pst.close(); + } + throw e; + } + return preparedStmts; + } + + /** + * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB * * @param tblColumns e.g. "T(a,b,c)" * @param rows e.g. list of Strings like 3,4,'d' * @return fully formed INSERT INTO ... statements */ public List createInsertValuesStmt(String tblColumns, List rows) { + return createInsertValuesStmt(tblColumns, rows, null); + } + + /** + * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB + * + * @param tblColumns e.g. "T(a,b,c)" + * @param rows e.g. list of Strings like 3,4,'d' + * @param rowsCountInStmts Output the number of rows in each insert statement returned. + * @return fully formed INSERT INTO ... statements + */ + private List createInsertValuesStmt(String tblColumns, List rows, List rowsCountInStmts) { if (rows == null || rows.size() == 0) { return Collections.emptyList(); } List insertStmts = new ArrayList<>(); StringBuilder sb = new StringBuilder(); + int numRowsInCurrentStmt = 0; switch (dbProduct) { case ORACLE: if (rows.size() > 1) { @@ -69,15 +134,23 @@ public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) { if (numRows > 0) { sb.append(" select * from dual"); insertStmts.add(sb.toString()); + if (rowsCountInStmts != null) { + rowsCountInStmts.add(numRowsInCurrentStmt); + } + numRowsInCurrentStmt = 0; } sb.setLength(0); sb.append("insert all "); } sb.append("into ").append(tblColumns).append(" values(").append(rows.get(numRows)) .append(") "); + numRowsInCurrentStmt++; } sb.append("select * from dual"); insertStmts.add(sb.toString()); + if (rowsCountInStmts != null) { + rowsCountInStmts.add(numRowsInCurrentStmt); + } return insertStmts; } //fall through @@ -89,13 +162,21 @@ public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) { if (numRows % MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) { if (numRows > 0) { insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma + if (rowsCountInStmts != null) { + rowsCountInStmts.add(numRowsInCurrentStmt); + } + numRowsInCurrentStmt = 0; } sb.setLength(0); sb.append("insert into ").append(tblColumns).append(" values"); } sb.append('(').append(rows.get(numRows)).append("),"); + numRowsInCurrentStmt++; } insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma + if (rowsCountInStmts != null) { + rowsCountInStmts.add(numRowsInCurrentStmt); + } return insertStmts; default: String msg = "Unrecognized database product name <" + dbProduct + ">"; @@ -171,6 +252,33 @@ public String addLimitClause(int numRows, String noSelectsqlQuery) throws MetaEx } } + /** + * Make PreparedStatement object with list of String type parameters to be set. + * It is assumed the input sql string have the number of "?" equal to number of parameters + * passed as input. + * @param dbConn - Connection object + * @param sql - SQL statement with "?" for input parameters. + * @param parameters - List of String type parameters to be set in PreparedStatement object + * @return PreparedStatement type object + * @throws SQLException + */ + public PreparedStatement prepareStmtWithParameters(Connection dbConn, String sql, List parameters) + throws SQLException { + PreparedStatement pst = dbConn.prepareStatement(addEscapeCharacters(sql)); + if ((parameters == null) || parameters.isEmpty()) { + return pst; + } + try { + for (int i = 1; i <= parameters.size(); i++) { + pst.setString(i, parameters.get(i - 1)); + } + } catch (SQLException e) { + pst.close(); + throw e; + } + return pst; + } + public DatabaseProduct getDbProduct() { return dbProduct; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index d76049e..1df1ebc 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.sql.Connection; import java.sql.Driver; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; @@ -570,10 +571,11 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { throws SQLException, MetaException { int numTxns = rqst.getNum_txns(); ResultSet rs = null; + List insertPreparedStmts = null; TxnType txnType = TxnType.DEFAULT; try { if (rqst.isSetReplPolicy()) { - List targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), stmt); + List targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), dbConn); if (!targetTxnIdList.isEmpty()) { if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) { @@ -603,16 +605,20 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { List txnIds = new ArrayList<>(numTxns); List rows = new ArrayList<>(); + List params = new ArrayList<>(); + params.add(rqst.getUser()); + params.add(rqst.getHostname()); + List> paramsList = new ArrayList<>(numTxns); for (long i = first; i < first + numTxns; i++) { txnIds.add(i); - rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + "," - + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname()) + "," + txnType.getValue()); + rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + ",?,?," + txnType.getValue()); + paramsList.add(params); } - List queries = sqlGenerator.createInsertValuesStmt( - "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host, txn_type)", rows); - for (String q : queries) { - LOG.debug("Going to execute update <" + q + ">"); - stmt.execute(q); + insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, + "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host, txn_type)", + rows, paramsList); + for (PreparedStatement pst : insertPreparedStmts) { + pst.execute(); } // Need to register minimum open txnid for current transactions into MIN_HISTORY table. @@ -644,18 +650,23 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { if (rqst.isSetReplPolicy()) { List rowsRepl = new ArrayList<>(); - + for (PreparedStatement pst : insertPreparedStmts) { + pst.close(); + } + insertPreparedStmts.clear(); + params.clear(); + paramsList.clear(); + params.add(rqst.getReplPolicy()); for (int i = 0; i < numTxns; i++) { - rowsRepl.add( - quoteString(rqst.getReplPolicy()) + "," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i)); + rowsRepl.add( "?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i)); + paramsList.add(params); } - List queriesRepl = sqlGenerator.createInsertValuesStmt( - "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID)", rowsRepl); - - for (String query : queriesRepl) { - LOG.info("Going to execute insert <" + query + ">"); - stmt.execute(query); + insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, + "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID)", rowsRepl, + paramsList); + for (PreparedStatement pst : insertPreparedStmts) { + pst.execute(); } } @@ -665,12 +676,18 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { } return txnIds; } finally { + if (insertPreparedStmts != null) { + for (PreparedStatement pst : insertPreparedStmts) { + pst.close(); + } + } close(rs); } } - private List getTargetTxnIdList(String replPolicy, List sourceTxnIdList, Statement stmt) + private List getTargetTxnIdList(String replPolicy, List sourceTxnIdList, Connection dbConn) throws SQLException { + PreparedStatement pst = null; ResultSet rs = null; try { List inQueries = new ArrayList<>(); @@ -678,15 +695,18 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { StringBuilder suffix = new StringBuilder(); List targetTxnIdList = new ArrayList<>(); prefix.append("select RTM_TARGET_TXN_ID from REPL_TXN_MAP where "); - suffix.append(" and RTM_REPL_POLICY = " + quoteString(replPolicy)); + suffix.append(" and RTM_REPL_POLICY = ?"); TxnUtils.buildQueryWithINClause(conf, inQueries, prefix, suffix, sourceTxnIdList, "RTM_SRC_TXN_ID", false, false); + List params = Arrays.asList(replPolicy); for (String query : inQueries) { - LOG.debug("Going to execute select <" + query + ">"); - rs = stmt.executeQuery(query); + LOG.debug("Going to execute select <" + query.replaceAll("\\?", "{}") + ">", quoteString(replPolicy)); + pst = sqlGenerator.prepareStmtWithParameters(dbConn, query, params); + rs = pst.executeQuery(); while (rs.next()) { targetTxnIdList.add(rs.getLong(1)); } + closeStmt(pst); } LOG.debug("targetTxnid for srcTxnId " + sourceTxnIdList.toString() + " is " + targetTxnIdList.toString()); return targetTxnIdList; @@ -694,6 +714,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { LOG.warn("failed to get target txn ids " + e.getMessage()); throw e; } finally { + closeStmt(pst); close(rs); } } @@ -703,12 +724,10 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { public long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException { try { Connection dbConn = null; - Statement stmt = null; try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - List targetTxnIds = getTargetTxnIdList(replPolicy, Collections.singletonList(sourceTxnId), stmt); + List targetTxnIds = getTargetTxnIdList(replPolicy, Collections.singletonList(sourceTxnId), dbConn); if (targetTxnIds.isEmpty()) { LOG.info("Txn {} not present for repl policy {}", sourceTxnId, replPolicy); return -1; @@ -722,7 +741,7 @@ public long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaExcep throw new MetaException("Unable to get target transaction id " + StringUtils.stringifyException(e)); } finally { - close(null, stmt, dbConn); + closeDbConn(dbConn); unlockInternal(); } } catch (RetryException e) { @@ -730,6 +749,14 @@ public long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaExcep } } + private void deleteReplTxnMapEntry(Connection dbConn, long sourceTxnId, String replPolicy) throws SQLException { + String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId + " and RTM_REPL_POLICY = ?"; + try (PreparedStatement pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(replPolicy))) { + LOG.info("Going to execute <" + s.replaceAll("\\?", "{}") + ">", quoteString(replPolicy)); + pst.executeUpdate(); + } + } + @Override @RetrySemantics.Idempotent public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException { @@ -746,7 +773,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept if (rqst.isSetReplPolicy()) { sourceTxnId = rqst.getTxnid(); List targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(), - Collections.singletonList(sourceTxnId), stmt); + Collections.singletonList(sourceTxnId), dbConn); if (targetTxnIds.isEmpty()) { // Idempotent case where txn was already closed or abort txn event received without // corresponding open txn event. @@ -764,10 +791,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept if (rqst.isSetReplPolicy()) { // in case of replication, idempotent is taken care by getTargetTxnId LOG.warn("Invalid state ABORTED for transactions started using replication replay task"); - String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId + - " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy()); - LOG.info("Going to execute <" + s + ">"); - stmt.executeUpdate(s); + deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy()); } LOG.info("abortTxn(" + JavaUtils.txnIdToString(txnid) + ") requested by it is already " + TxnStatus.ABORTED); @@ -777,10 +801,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept } if (rqst.isSetReplPolicy()) { - String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId + - " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy()); - LOG.info("Going to execute <" + s + ">"); - stmt.executeUpdate(s); + deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy()); } if (transactionalListeners != null) { @@ -879,6 +900,7 @@ public void commitTxn(CommitTxnRequest rqst) try { Connection dbConn = null; Statement stmt = null; + List insertPreparedStmts = null; ResultSet lockHandle = null; ResultSet commitIdRs = null, rs; try { @@ -889,7 +911,7 @@ public void commitTxn(CommitTxnRequest rqst) if (rqst.isSetReplPolicy()) { sourceTxnId = rqst.getTxnid(); List targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(), - Collections.singletonList(sourceTxnId), stmt); + Collections.singletonList(sourceTxnId), dbConn); if (targetTxnIds.isEmpty()) { // Idempotent case where txn was already closed or commit txn event received without // corresponding open txn event. @@ -1048,25 +1070,26 @@ public void commitTxn(CommitTxnRequest rqst) } else { if (rqst.isSetWriteEventInfos()) { List rows = new ArrayList<>(); + List> paramsList = new ArrayList<>(); for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) { - rows.add(txnid + "," + quoteString(writeEventInfo.getDatabase()) + "," + - quoteString(writeEventInfo.getTable()) + "," + - quoteString(writeEventInfo.getPartition()) + "," + + rows.add(txnid + ", ?, ?, ?," + writeEventInfo.getWriteId() + "," + - "'" + isUpdateDelete + "'"); + quoteChar(isUpdateDelete)); + List params = new ArrayList<>(); + params.add(writeEventInfo.getDatabase()); + params.add(writeEventInfo.getTable()); + params.add(writeEventInfo.getPartition()); + paramsList.add(params); } - List queries = sqlGenerator.createInsertValuesStmt("COMPLETED_TXN_COMPONENTS " + - "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, ctc_writeid, ctc_update_delete)", rows); - for (String q : queries) { - LOG.debug("Going to execute insert <" + q + "> "); - stmt.execute(q); + insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, + "COMPLETED_TXN_COMPONENTS " + + "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, ctc_writeid, ctc_update_delete)", + rows, paramsList); + for (PreparedStatement pst : insertPreparedStmts) { + pst.execute(); } } - - s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId + - " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy()); - LOG.info("Repl going to execute <" + s + ">"); - stmt.executeUpdate(s); + deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy()); } // cleanup all txn related metadata @@ -1103,6 +1126,11 @@ public void commitTxn(CommitTxnRequest rqst) throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { + if (insertPreparedStmts != null) { + for (PreparedStatement pst : insertPreparedStmts) { + closeStmt(pst); + } + } close(commitIdRs); close(lockHandle, stmt, dbConn); unlockInternal(); @@ -1130,8 +1158,11 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx try { Connection dbConn = null; Statement stmt = null; + PreparedStatement pStmt = null; + List insertPreparedStmts = null; ResultSet rs = null; TxnStore.MutexAPI.LockHandle handle = null; + List params = Arrays.asList(dbName, tblName); try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); @@ -1139,11 +1170,11 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx // Check if this txn state is already replicated for this given table. If yes, then it is // idempotent case and just return. - String sql = "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName) - + " and nwi_table = " + quoteString(tblName); - LOG.debug("Going to execute query <" + sql + ">"); - - rs = stmt.executeQuery(sql); + String sql = "select nwi_next from NEXT_WRITE_ID where nwi_database = ? and nwi_table = ?"; + pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, sql, params); + LOG.debug("Going to execute query <" + sql.replaceAll("\\?", "{}") + ">", + quoteString(dbName), quoteString(tblName)); + rs = pStmt.executeQuery(); if (rs.next()) { LOG.info("Idempotent flow: WriteId state <" + validWriteIdList + "> is already applied for the table: " + dbName + "." + tblName); @@ -1159,19 +1190,21 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx // Map each aborted write id with each allocated txn. List rows = new ArrayList<>(); + List> paramsList = new ArrayList<>(); int i = 0; for (long txn : txnIds) { long writeId = abortedWriteIds.get(i++); - rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId); + rows.add(txn + ", ?, ?, " + writeId); + paramsList.add(params); LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn); } // Insert entries to TXN_TO_WRITE_ID for aborted write ids - List inserts = sqlGenerator.createInsertValuesStmt( - "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows); - for (String insert : inserts) { - LOG.debug("Going to execute insert <" + insert + ">"); - stmt.execute(insert); + insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, + "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows, + paramsList); + for (PreparedStatement pst : insertPreparedStmts) { + pst.execute(); } // Abort all the allocated txns so that the mapped write ids are referred as aborted ones. @@ -1186,11 +1219,13 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx long nextWriteId = validWriteIdList.getHighWatermark() + 1; // First allocation of write id (hwm+1) should add the table to the next_write_id meta table. - sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (" - + quoteString(dbName) + "," + quoteString(tblName) + "," + sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (?, ?, " + Long.toString(nextWriteId) + ")"; - LOG.debug("Going to execute insert <" + sql + ">"); - stmt.execute(sql); + closeStmt(pStmt); + pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, sql, params); + LOG.debug("Going to execute insert <" + sql.replaceAll("\\?", "{}") + ">", + quoteString(dbName), quoteString(tblName)); + pStmt.execute(); LOG.info("WriteId state <" + validWriteIdList + "> is applied for the table: " + dbName + "." + tblName); LOG.debug("Going to commit"); @@ -1202,6 +1237,12 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { + if (insertPreparedStmts != null) { + for (PreparedStatement pst : insertPreparedStmts) { + closeStmt(pst); + } + } + closeStmt(pStmt); close(rs, stmt, dbConn); if(handle != null) { handle.releaseLocks(); @@ -1244,13 +1285,11 @@ private ValidTxnList getValidTxnList(Connection dbConn, String fullTableName, Lo try { String[] names = TxnUtils.getDbTableName(fullTableName); assert names.length == 2; - String s = "select t2w_txnid from TXN_TO_WRITE_ID where t2w_database = ? and t2w_table = ? and t2w_writeid = ?"; - pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s)); - pst.setString(1, names[0]); - pst.setString(2, names[1]); - pst.setLong(3, writeId); + List params = Arrays.asList(names[0], names[1]); + String s = "select t2w_txnid from TXN_TO_WRITE_ID where t2w_database = ? and t2w_table = ? and t2w_writeid = " + writeId; + pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", quoteString(names[0]), - quoteString(names[1]), writeId); + quoteString(names[1])); rs = pst.executeQuery(); if (rs.next()) { return TxnCommonUtils.createValidReadTxnList(getOpenTxns(), rs.getLong(1)); @@ -1266,7 +1305,6 @@ private ValidTxnList getValidTxnList(Connection dbConn, String fullTableName, Lo public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) throws MetaException { try { Connection dbConn = null; - Statement stmt = null; ValidTxnList validTxnList; try { @@ -1274,7 +1312,6 @@ public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) t * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); // We should prepare the valid write ids list based on validTxnList of current txn. // If no txn exists in the caller, then they would pass null for validTxnList and so it is @@ -1292,7 +1329,7 @@ public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) t // Get the valid write id list for all the tables read by the current txn List tblValidWriteIdsList = new ArrayList<>(); for (String fullTableName : rqst.getFullTableNames()) { - tblValidWriteIdsList.add(getValidWriteIdsForTable(stmt, fullTableName, validTxnList)); + tblValidWriteIdsList.add(getValidWriteIdsForTable(dbConn, fullTableName, validTxnList)); } LOG.debug("Going to rollback"); @@ -1306,7 +1343,7 @@ public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) t throw new MetaException("Unable to select from transaction database, " + StringUtils.stringifyException(e)); } finally { - close(null, stmt, dbConn); + closeDbConn(dbConn); } } catch (RetryException e) { return getValidWriteIds(rqst); @@ -1315,10 +1352,13 @@ public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) t // Method to get the Valid write ids list for the given table // Input fullTableName is expected to be of format . - private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullTableName, + private TableValidWriteIds getValidWriteIdsForTable(Connection dbConn, String fullTableName, ValidTxnList validTxnList) throws SQLException { + PreparedStatement pst = null; ResultSet rs = null; String[] names = TxnUtils.getDbTableName(fullTableName); + assert(names.length == 2); + List params = Arrays.asList(names[0], names[1]); try { // Need to initialize to 0 to make sure if nobody modified this table, then current txn // shouldn't read any data. @@ -1333,11 +1373,12 @@ private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullT // Find the writeId high water mark based upon txnId high water mark. If found, then, need to // traverse through all write Ids less than writeId HWM to make exceptions list. // The writeHWM = min(NEXT_WRITE_ID.nwi_next-1, max(TXN_TO_WRITE_ID.t2w_writeid under txnHwm)) - String s = "select max(t2w_writeid) from TXN_TO_WRITE_ID where t2w_txnid <= " + txnHwm - + " and t2w_database = " + quoteString(names[0]) - + " and t2w_table = " + quoteString(names[1]); - LOG.debug("Going to execute query<" + s + ">"); - rs = stmt.executeQuery(s); + String s = "select max(t2w_writeid) from TXN_TO_WRITE_ID where t2w_txnid <= " + Long.toString(txnHwm) + + " and t2w_database = ? and t2w_table = ?"; + pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); + LOG.debug("Going to execute query<" + s.replaceAll("\\?", "{}") + ">", + quoteString(names[0]), quoteString(names[1])); + rs = pst.executeQuery(); if (rs.next()) { writeIdHwm = rs.getLong(1); } @@ -1346,10 +1387,12 @@ private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullT if (writeIdHwm <= 0) { // Need to subtract 1 as nwi_next would be the next write id to be allocated but we need highest // allocated write id. - s = "select nwi_next-1 from NEXT_WRITE_ID where nwi_database = " + quoteString(names[0]) - + " and nwi_table = " + quoteString(names[1]); - LOG.debug("Going to execute query<" + s + ">"); - rs = stmt.executeQuery(s); + s = "select nwi_next-1 from NEXT_WRITE_ID where nwi_database = ? and nwi_table = ?"; + closeStmt(pst); + pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); + LOG.debug("Going to execute query<" + s.replaceAll("\\?", "{}") + ">", + quoteString(names[0]), quoteString(names[1])); + rs = pst.executeQuery(); if (rs.next()) { long maxWriteId = rs.getLong(1); if (maxWriteId > 0) { @@ -1363,13 +1406,13 @@ private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullT // then will be added to invalid list. The results should be sorted in ascending order based // on write id. The sorting is needed as exceptions list in ValidWriteIdList would be looked-up // using binary search. - s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_writeid <= " + writeIdHwm - + " and t2w_database = " + quoteString(names[0]) - + " and t2w_table = " + quoteString(names[1]) - + " order by t2w_writeid asc"; - - LOG.debug("Going to execute query<" + s + ">"); - rs = stmt.executeQuery(s); + s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_writeid <= " + Long.toString(writeIdHwm) + + " and t2w_database = ? and t2w_table = ? order by t2w_writeid asc"; + closeStmt(pst); + pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); + LOG.debug("Going to execute query<" + s.replaceAll("\\?", "{}") + ">", + quoteString(names[0]), quoteString(names[1])); + rs = pst.executeQuery(); while (rs.next()) { long txnId = rs.getLong(1); long writeId = rs.getLong(2); @@ -1395,6 +1438,7 @@ private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullT } return owi; } finally { + closeStmt(pst); close(rs); } } @@ -1409,6 +1453,8 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds try { Connection dbConn = null; Statement stmt = null; + PreparedStatement pStmt = null; + List insertPreparedStmts = null; ResultSet rs = null; TxnStore.MutexAPI.LockHandle handle = null; List txnToWriteIds = new ArrayList<>(); @@ -1428,7 +1474,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds for (TxnToWriteId txnToWriteId : srcTxnToWriteIds) { srcTxnIds.add(txnToWriteId.getTxnId()); } - txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, stmt); + txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, dbConn); if (srcTxnIds.size() != txnIds.size()) { // Idempotent case where txn was already closed but gets allocate write id event. // So, just ignore it and return empty list. @@ -1459,8 +1505,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds // The write id would have been already allocated in case of multi-statement txns where // first write on a table will allocate write id and rest of the writes should re-use it. prefix.append("select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where" - + " t2w_database = " + quoteString(dbName) - + " and t2w_table = " + quoteString(tblName) + " and "); + + " t2w_database = ? and t2w_table = ?" + " and "); suffix.append(""); TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds, "t2w_txnid", false, false); @@ -1468,9 +1513,12 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds long allocatedTxnsCount = 0; long txnId; long writeId = 0; + List params = Arrays.asList(dbName, tblName); for (String query : queries) { - LOG.debug("Going to execute query <" + query + ">"); - rs = stmt.executeQuery(query); + pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, query, params); + LOG.debug("Going to execute query <" + query.replaceAll("\\?", "{}") + ">", + quoteString(dbName), quoteString(tblName)); + rs = pStmt.executeQuery(); while (rs.next()) { // If table write ID is already allocated for the given transaction, then just use it txnId = rs.getLong(1); @@ -1479,6 +1527,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds allocatedTxnsCount++; LOG.info("Reused already allocated writeID: " + writeId + " for txnId: " + txnId); } + closeStmt(pStmt); } // Batch allocation should always happen atomically. Either write ids for all txns is allocated or none. @@ -1503,58 +1552,69 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds // Get the next write id for the given table and update it with new next write id. // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID String s = sqlGenerator.addForUpdateClause( - "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName) - + " and nwi_table = " + quoteString(tblName)); - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); + "select nwi_next from NEXT_WRITE_ID where nwi_database = ? and nwi_table = ?"); + closeStmt(pStmt); + pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); + LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", + quoteString(dbName), quoteString(tblName)); + rs = pStmt.executeQuery(); if (!rs.next()) { // First allocation of write id should add the table to the next_write_id meta table // The initial value for write id should be 1 and hence we add 1 with number of write ids allocated here // For repl flow, we need to force set the incoming write id. writeId = (srcWriteId > 0) ? srcWriteId : 1; - s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (" - + quoteString(dbName) + "," + quoteString(tblName) + "," + (writeId + numOfWriteIds) + ")"; - LOG.debug("Going to execute insert <" + s + ">"); - stmt.execute(s); + s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (?, ?, " + + Long.toString(writeId + numOfWriteIds) + ")"; + closeStmt(pStmt); + pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); + LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">", + quoteString(dbName), quoteString(tblName)); + pStmt.execute(); } else { long nextWriteId = rs.getLong(1); writeId = (srcWriteId > 0) ? srcWriteId : nextWriteId; // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated - s = "update NEXT_WRITE_ID set nwi_next = " + (writeId + numOfWriteIds) - + " where nwi_database = " + quoteString(dbName) - + " and nwi_table = " + quoteString(tblName); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); + s = "update NEXT_WRITE_ID set nwi_next = " + Long.toString(writeId + numOfWriteIds) + + " where nwi_database = ? and nwi_table = ?"; + closeStmt(pStmt); + pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); + LOG.debug("Going to execute update <" + s.replaceAll("\\?", "{}") + ">", + quoteString(dbName), quoteString(tblName)); + pStmt.executeUpdate(); // For repl flow, if the source write id is mismatching with target next write id, then current // metadata in TXN_TO_WRITE_ID is stale for this table and hence need to clean-up TXN_TO_WRITE_ID. // This is possible in case of first incremental repl after bootstrap where concurrent write // and drop table was performed at source during bootstrap dump. if ((srcWriteId > 0) && (srcWriteId != nextWriteId)) { - s = "delete from TXN_TO_WRITE_ID where t2w_database = " + quoteString(dbName) - + " and t2w_table = " + quoteString(tblName); - LOG.debug("Going to execute delete <" + s + ">"); - stmt.executeUpdate(s); + s = "delete from TXN_TO_WRITE_ID where t2w_database = ? and t2w_table = ?"; + closeStmt(pStmt); + pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); + LOG.debug("Going to execute delete <" + s.replaceAll("\\?", "{}") + ">", + quoteString(dbName), quoteString(tblName)); + pStmt.executeUpdate(); } } // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated // write ids List rows = new ArrayList<>(); + List> paramsList = new ArrayList<>(); for (long txn : txnIds) { - rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId); + rows.add(txn + ", ?, ?, " + writeId); txnToWriteIds.add(new TxnToWriteId(txn, writeId)); + paramsList.add(params); LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn); writeId++; } // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids - List inserts = sqlGenerator.createInsertValuesStmt( - "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows); - for (String insert : inserts) { - LOG.debug("Going to execute insert <" + insert + ">"); - stmt.execute(insert); + insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, + "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows, + paramsList); + for (PreparedStatement pst : insertPreparedStmts) { + pst.execute(); } if (transactionalListeners != null) { @@ -1574,6 +1634,12 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { + if (insertPreparedStmts != null) { + for (PreparedStatement pst : insertPreparedStmts) { + closeStmt(pst); + } + } + closeStmt(pStmt); close(rs, stmt, dbConn); if(handle != null) { handle.releaseLocks(); @@ -1589,12 +1655,11 @@ public void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst) throws MetaException { try { Connection dbConn = null; - Statement stmt = null; + PreparedStatement pst = null; TxnStore.MutexAPI.LockHandle handle = null; try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name()); //since this is on conversion from non-acid to acid, NEXT_WRITE_ID should not have an entry @@ -1603,11 +1668,12 @@ public void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst) // First allocation of write id should add the table to the next_write_id meta table // The initial value for write id should be 1 and hence we add 1 with number of write ids // allocated here - String s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (" - + quoteString(rqst.getDbName()) + "," + quoteString(rqst.getTblName()) + "," + - Long.toString(rqst.getSeeWriteId() + 1) + ")"; - LOG.debug("Going to execute insert <" + s + ">"); - stmt.execute(s); + String s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (?, ?, " + + Long.toString(rqst.getSeeWriteId() + 1) + ")"; + pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(rqst.getDbName(), rqst.getTblName())); + LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">", + quoteString(rqst.getDbName()), quoteString(rqst.getTblName())); + pst.execute(); LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { @@ -1617,7 +1683,7 @@ public void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst) throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { - close(null, stmt, dbConn); + close(null, pst, dbConn); if(handle != null) { handle.releaseLocks(); } @@ -1737,12 +1803,11 @@ public Materialization getMaterializationInvalidationInfo( // We are composing a query that returns a single row if an update happened after // the materialization was created. Otherwise, query returns 0 rows. Connection dbConn = null; - Statement stmt = null; + PreparedStatement pst = null; ResultSet rs = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - stmt.setMaxRows(1); + List params = new ArrayList<>(); StringBuilder query = new StringBuilder(); // compose a query that select transactions containing an update... query.append("select ctc_update_delete from COMPLETED_TXN_COMPONENTS where ctc_update_delete='Y' AND ("); @@ -1754,7 +1819,10 @@ public Materialization getMaterializationInvalidationInfo( query.append("OR"); } String[] names = TxnUtils.getDbTableName(fullyQualifiedName); - query.append(" (ctc_database=" + quoteString(names[0]) + " AND ctc_table=" + quoteString(names[1])); + assert(names.length == 2); + query.append(" (ctc_database=? AND ctc_table=?"); + params.add(names[0]); + params.add(names[1]); ValidWriteIdList tblValidWriteIdList = validReaderWriteIdList.getTableValidWriteIdList(fullyQualifiedName); if (tblValidWriteIdList == null) { @@ -1780,7 +1848,9 @@ public Materialization getMaterializationInvalidationInfo( if (LOG.isDebugEnabled()) { LOG.debug("Going to execute query <" + s + ">"); } - rs = stmt.executeQuery(s); + pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); + pst.setMaxRows(1); + rs = pst.executeQuery(); return new Materialization(rs.next()); } catch (SQLException ex) { @@ -1788,7 +1858,7 @@ public Materialization getMaterializationInvalidationInfo( throw new MetaException("Unable to retrieve materialization invalidation information due to " + StringUtils.stringifyException(ex)); } finally { - close(rs, stmt, dbConn); + close(rs, pst, dbConn); } } @@ -1802,7 +1872,7 @@ public LockResponse lockMaterializationRebuild(String dbName, String tableName, TxnStore.MutexAPI.LockHandle handle = null; Connection dbConn = null; - Statement stmt = null; + PreparedStatement pst = null; ResultSet rs = null; try { lockInternal(); @@ -1813,13 +1883,14 @@ public LockResponse lockMaterializationRebuild(String dbName, String tableName, */ handle = getMutexAPI().acquireLock(MUTEX_KEY.MaterializationRebuild.name()); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); + List params = Arrays.asList(dbName, tableName); String selectQ = "select mrl_txn_id from MATERIALIZATION_REBUILD_LOCKS where" + - " mrl_db_name =" + quoteString(dbName) + - " AND mrl_tbl_name=" + quoteString(tableName); - LOG.debug("Going to execute query <" + selectQ + ">"); - rs = stmt.executeQuery(selectQ); + " mrl_db_name = ? AND mrl_tbl_name = ?"; + pst = sqlGenerator.prepareStmtWithParameters(dbConn, selectQ, params); + LOG.debug("Going to execute query <" + selectQ.replaceAll("\\?", "{}") + ">", + quoteString(dbName), quoteString(tableName)); + rs = pst.executeQuery(); if(rs.next()) { LOG.info("Ignoring request to rebuild " + dbName + "/" + tableName + " since it is already being rebuilt"); @@ -1827,9 +1898,12 @@ public LockResponse lockMaterializationRebuild(String dbName, String tableName, } String insertQ = "insert into MATERIALIZATION_REBUILD_LOCKS " + "(mrl_txn_id, mrl_db_name, mrl_tbl_name, mrl_last_heartbeat) values (" + txnId + - ", '" + dbName + "', '" + tableName + "', " + Instant.now().toEpochMilli() + ")"; - LOG.debug("Going to execute update <" + insertQ + ">"); - stmt.executeUpdate(insertQ); + ", ?, ?, " + Instant.now().toEpochMilli() + ")"; + closeStmt(pst); + pst = sqlGenerator.prepareStmtWithParameters(dbConn, insertQ, params); + LOG.debug("Going to execute update <" + insertQ.replaceAll("\\?", "{}") + ">", + quoteString(dbName), quoteString(tableName)); + pst.executeUpdate(); LOG.debug("Going to commit"); dbConn.commit(); return new LockResponse(txnId, LockState.ACQUIRED); @@ -1838,7 +1912,7 @@ public LockResponse lockMaterializationRebuild(String dbName, String tableName, throw new MetaException("Unable to retrieve materialization invalidation information due to " + StringUtils.stringifyException(ex)); } finally { - close(rs, stmt, dbConn); + close(rs, pst, dbConn); if(handle != null) { handle.releaseLocks(); } @@ -1851,19 +1925,19 @@ public boolean heartbeatLockMaterializationRebuild(String dbName, String tableNa throws MetaException { try { Connection dbConn = null; - Statement stmt = null; - ResultSet rs = null; + PreparedStatement pst = null; try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); String s = "update MATERIALIZATION_REBUILD_LOCKS" + " set mrl_last_heartbeat = " + Instant.now().toEpochMilli() + " where mrl_txn_id = " + txnId + - " AND mrl_db_name =" + quoteString(dbName) + - " AND mrl_tbl_name=" + quoteString(tableName); - LOG.debug("Going to execute update <" + s + ">"); - int rc = stmt.executeUpdate(s); + " AND mrl_db_name = ?" + + " AND mrl_tbl_name = ?"; + pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tableName)); + LOG.debug("Going to execute update <" + s.replaceAll("\\?", "{}") + ">", + quoteString(dbName), quoteString(tableName)); + int rc = pst.executeUpdate(); if (rc < 1) { LOG.debug("Going to rollback"); dbConn.rollback(); @@ -1884,7 +1958,7 @@ public boolean heartbeatLockMaterializationRebuild(String dbName, String tableNa throw new MetaException("Unable to heartbeat rebuild lock due to " + StringUtils.stringifyException(e)); } finally { - close(rs, stmt, dbConn); + close(null, pst, dbConn); unlockInternal(); } } catch (RetryException e) { @@ -2019,6 +2093,8 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc Connection dbConn = null; try { Statement stmt = null; + PreparedStatement pStmt = null; + List insertPreparedStmts = null; ResultSet rs = null; ResultSet lockHandle = null; try { @@ -2056,6 +2132,7 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc if (txnid > 0) { List rows = new ArrayList<>(); + List> paramsList = new ArrayList<>(); // For each component in this lock request, // add an entry to the txn_components table for (LockComponent lc : rqst.getComponent()) { @@ -2115,30 +2192,41 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc // may return empty result sets. // Get the write id allocated by this txn for the given table writes s = "select t2w_writeid from TXN_TO_WRITE_ID where" - + " t2w_database = " + quoteString(dbName) - + " and t2w_table = " + quoteString(tblName) - + " and t2w_txnid = " + txnid; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); + + " t2w_database = ? and t2w_table = ? and t2w_txnid = " + txnid; + pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tblName)); + LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", + quoteString(dbName), quoteString(tblName)); + rs = pStmt.executeQuery(); if (rs.next()) { writeId = rs.getLong(1); } } - rows.add(txnid + ", '" + dbName + "', " + - (tblName == null ? "null" : "'" + tblName + "'") + ", " + - (partName == null ? "null" : "'" + partName + "'")+ "," + + rows.add(txnid + ", ?, " + + (tblName == null ? "null" : "?") + ", " + + (partName == null ? "null" : "?")+ "," + quoteString(OpertaionType.fromDataOperationType(lc.getOperationType()).toString())+ "," + (writeId == null ? "null" : writeId)); + List params = new ArrayList<>(); + params.add(dbName); + if (tblName != null) { + params.add(tblName); + } + if (partName != null) { + params.add(partName); + } + paramsList.add(params); } - List queries = sqlGenerator.createInsertValuesStmt( - "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", rows); - for(String query : queries) { - LOG.debug("Going to execute update <" + query + ">"); - int modCount = stmt.executeUpdate(query); + insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, + "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", + rows, paramsList); + for(PreparedStatement pst : insertPreparedStmts) { + int modCount = pst.executeUpdate(); + closeStmt(pst); } + insertPreparedStmts = null; } - List rows = new ArrayList<>(); + List> paramsList = new ArrayList<>(); long intLockId = 0; for (LockComponent lc : rqst.getComponent()) { if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET && @@ -2170,24 +2258,40 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc break; } long now = getDbTime(dbConn); - rows.add(extLockId + ", " + intLockId + "," + txnid + ", " + - quoteString(dbName) + ", " + - valueOrNullLiteral(tblName) + ", " + - valueOrNullLiteral(partName) + ", " + + rows.add(extLockId + ", " + intLockId + "," + txnid + ", ?, " + + ((tblName == null) ? "null" : "?") + ", " + + ((partName == null) ? "null" : "?") + ", " + quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " + //for locks associated with a txn, we always heartbeat txn and timeout based on that (isValidTxn(txnid) ? 0 : now) + ", " + - valueOrNullLiteral(rqst.getUser()) + ", " + - valueOrNullLiteral(rqst.getHostname()) + ", " + - valueOrNullLiteral(rqst.getAgentInfo()));// + ")"; + ((rqst.getUser() == null) ? "null" : "?") + ", " + + ((rqst.getHostname() == null) ? "null" : "?") + ", " + + ((rqst.getAgentInfo() == null) ? "null" : "?"));// + ")"; + List params = new ArrayList<>(); + params.add(dbName); + if (tblName != null) { + params.add(tblName); + } + if (partName != null) { + params.add(partName); + } + if (rqst.getUser() != null) { + params.add(rqst.getUser()); + } + if (rqst.getHostname() != null) { + params.add(rqst.getHostname()); + } + if (rqst.getAgentInfo() != null) { + params.add(rqst.getAgentInfo()); + } + paramsList.add(params); } - List queries = sqlGenerator.createInsertValuesStmt( + insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, "HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, " + "hl_table, hl_partition,hl_lock_state, hl_lock_type, " + - "hl_last_heartbeat, hl_user, hl_host, hl_agent_info)", rows); - for(String query : queries) { - LOG.debug("Going to execute update <" + query + ">"); - int modCount = stmt.executeUpdate(query); + "hl_last_heartbeat, hl_user, hl_host, hl_agent_info)", rows, paramsList); + for(PreparedStatement pst : insertPreparedStmts) { + int modCount = pst.executeUpdate(); } dbConn.commit(); success = true; @@ -2199,7 +2303,13 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { + if (insertPreparedStmts != null) { + for (PreparedStatement pst : insertPreparedStmts) { + closeStmt(pst); + } + } close(lockHandle); + closeStmt(pStmt); close(rs, stmt, null); if (!success) { /* This needs to return a "live" connection to be used by operation that follows it. @@ -2399,10 +2509,9 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { ShowLocksResponse rsp = new ShowLocksResponse(); List elems = new ArrayList<>(); List sortedList = new ArrayList<>(); - Statement stmt = null; + PreparedStatement pst = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " + "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," + @@ -2412,22 +2521,26 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { String dbName = rqst.getDbname(); String tableName = rqst.getTablename(); String partName = rqst.getPartname(); + List params = new ArrayList<>(); StringBuilder filter = new StringBuilder(); if (dbName != null && !dbName.isEmpty()) { - filter.append("hl_db=").append(quoteString(dbName)); + filter.append("hl_db=?"); + params.add(dbName); } if (tableName != null && !tableName.isEmpty()) { if (filter.length() > 0) { filter.append(" and "); } - filter.append("hl_table=").append(quoteString(tableName)); + filter.append("hl_table=?"); + params.add(tableName); } if (partName != null && !partName.isEmpty()) { if (filter.length() > 0) { filter.append(" and "); } - filter.append("hl_partition=").append(quoteString(partName)); + filter.append("hl_partition=?"); + params.add(partName); } String whereClause = filter.toString(); @@ -2435,8 +2548,9 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { s = s + " where " + whereClause; } - LOG.debug("Doing to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = pst.executeQuery(); while (rs.next()) { ShowLocksResponseElement e = new ShowLocksResponseElement(); e.setLockid(rs.getLong(1)); @@ -2481,7 +2595,7 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { - closeStmt(stmt); + closeStmt(pst); closeDbConn(dbConn); } //this ensures that "SHOW LOCKS" prints the locks in the same order as they are examined @@ -2615,20 +2729,19 @@ public long getTxnIdForWriteId( String dbName, String tblName, long writeId) throws MetaException { try { Connection dbConn = null; - Statement stmt = null; + PreparedStatement pst = null; try { /** * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); String query = "select t2w_txnid from TXN_TO_WRITE_ID where" - + " t2w_database = " + quoteString(dbName) - + " and t2w_table = " + quoteString(tblName) - + " and t2w_writeid = " + writeId; - LOG.debug("Going to execute query <" + query + ">"); - ResultSet rs = stmt.executeQuery(query); + + " t2w_database = ? and t2w_table = ? and t2w_writeid = " + writeId; + pst = sqlGenerator.prepareStmtWithParameters(dbConn, query, Arrays.asList(dbName, tblName)); + LOG.debug("Going to execute query <" + query.replaceAll("\\?", "{}") + ">", + quoteString(dbName), quoteString(tblName)); + ResultSet rs = pst.executeQuery(); long txnId = -1; if (rs.next()) { txnId = rs.getLong(1); @@ -2642,7 +2755,7 @@ public long getTxnIdForWriteId( throw new MetaException("Unable to select from transaction database, " + StringUtils.stringifyException(e)); } finally { - close(null, stmt, dbConn); + close(null, pst, dbConn); } } catch (RetryException e) { return getTxnIdForWriteId(dbName, tblName, writeId); @@ -2656,6 +2769,7 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException { try { Connection dbConn = null; Statement stmt = null; + PreparedStatement pst = null; TxnStore.MutexAPI.LockHandle handle = null; try { lockInternal(); @@ -2670,20 +2784,24 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException { long id = generateCompactionQueueId(stmt); + List params = new ArrayList<>(); StringBuilder sb = new StringBuilder("select cq_id, cq_state from COMPACTION_QUEUE where"). append(" cq_state IN(").append(quoteChar(INITIATED_STATE)). append(",").append(quoteChar(WORKING_STATE)). - append(") AND cq_database=").append(quoteString(rqst.getDbname())). - append(" AND cq_table=").append(quoteString(rqst.getTablename())).append(" AND "); + append(") AND cq_database=?"). + append(" AND cq_table=?").append(" AND "); + params.add(rqst.getDbname()); + params.add(rqst.getTablename()); if(rqst.getPartitionname() == null) { sb.append("cq_partition is null"); - } - else { - sb.append("cq_partition=").append(quoteString(rqst.getPartitionname())); + } else { + sb.append("cq_partition=?"); + params.add(rqst.getPartitionname()); } + pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(), params); LOG.debug("Going to execute query <" + sb.toString() + ">"); - ResultSet rs = stmt.executeQuery(sb.toString()); + ResultSet rs = pst.executeQuery(); if(rs.next()) { long enqueuedId = rs.getLong(1); String state = compactorStateToResponse(rs.getString(2).charAt(0)); @@ -2693,6 +2811,8 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException { return new CompactionResponse(enqueuedId, state, false); } close(rs); + closeStmt(pst); + params.clear(); StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " + "cq_table, "); String partName = rqst.getPartitionname(); @@ -2704,14 +2824,16 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException { if (rqst.getRunas() != null) buf.append(", cq_run_as"); buf.append(") values ("); buf.append(id); - buf.append(", '"); - buf.append(rqst.getDbname()); - buf.append("', '"); - buf.append(rqst.getTablename()); - buf.append("', '"); + buf.append(", ?"); + buf.append(", ?"); + buf.append(", "); + params.add(rqst.getDbname()); + params.add(rqst.getTablename()); if (partName != null) { - buf.append(partName); - buf.append("', '"); + buf.append("?, '"); + params.add(partName); + } else { + buf.append("'"); } buf.append(INITIATED_STATE); buf.append("', '"); @@ -2729,18 +2851,20 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException { dbConn.rollback(); throw new MetaException("Unexpected compaction type " + rqst.getType().toString()); } + buf.append("'"); if (rqst.getProperties() != null) { - buf.append("', '"); - buf.append(new StringableMap(rqst.getProperties()).toString()); + buf.append(", ?"); + params.add(new StringableMap(rqst.getProperties()).toString()); } if (rqst.getRunas() != null) { - buf.append("', '"); - buf.append(rqst.getRunas()); + buf.append(", ?"); + params.add(rqst.getRunas()); } - buf.append("')"); + buf.append(")"); String s = buf.toString(); + pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); + pst.executeUpdate(); LOG.debug("Going to commit"); dbConn.commit(); return new CompactionResponse(id, INITIATED_RESPONSE, true); @@ -2751,6 +2875,7 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException { throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { + closeStmt(pst); closeStmt(stmt); closeDbConn(dbConn); if(handle != null) { @@ -2859,6 +2984,7 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) Connection dbConn = null; Statement stmt = null; ResultSet lockHandle = null; + List insertPreparedStmts = null; try { try { lockInternal(); @@ -2878,18 +3004,22 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) Long writeId = rqst.getWriteid(); List rows = new ArrayList<>(); + List> paramsList = new ArrayList<>(); for (String partName : rqst.getPartitionnames()) { - rows.add(rqst.getTxnid() + "," + quoteString(normalizeCase(rqst.getDbname())) - + "," + quoteString(normalizeCase(rqst.getTablename())) + - "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + "," + writeId); + rows.add(rqst.getTxnid() + ",?,?,?," + quoteChar(ot.sqlConst) + "," + writeId); + List params = new ArrayList<>(); + params.add(normalizeCase(rqst.getDbname())); + params.add(normalizeCase(rqst.getTablename())); + params.add(partName); + paramsList.add(params); } int modCount = 0; //record partitions that were written to - List queries = sqlGenerator.createInsertValuesStmt( - "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", rows); - for(String query : queries) { - LOG.debug("Going to execute update <" + query + ">"); - modCount = stmt.executeUpdate(query); + insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, + "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", + rows, paramsList); + for(PreparedStatement pst : insertPreparedStmts) { + modCount = pst.executeUpdate(); } LOG.debug("Going to commit"); dbConn.commit(); @@ -2900,6 +3030,11 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) throw new MetaException("Unable to insert into from transaction database " + StringUtils.stringifyException(e)); } finally { + if (insertPreparedStmts != null) { + for(PreparedStatement pst : insertPreparedStmts) { + closeStmt(pst); + } + } close(lockHandle, stmt, dbConn); unlockInternal(); }