diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java index 49b737ecf9..4b2d95eee2 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java @@ -29,6 +29,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -292,4 +293,25 @@ public String addEscapeCharacters(String s) { return s; } + /** + * Executes the query statements in order using the java.sql.Statement. The queries are executed in + * batches for most of the databases, executed on Oracle using "begin q1;q2;q3;end;" so only 1 actual + * JDBC call is executed. The caller should make it sure that the lenght of the query string will + * not cause any problems + * @param statement The statement used to execute the calls + * @param queries The actual queries to execute in order + * @throws SQLException If there is a problem during execution + */ + public void executeInBatch(Statement statement, String ... queries) throws SQLException { + if (dbProduct == DatabaseProduct.ORACLE) { + String finalQuery = "begin " + String.join(";", queries) + "; end;"; + LOG.debug("Going to execute complex query <" + finalQuery + ">"); + statement.execute(finalQuery); + } else { + for(String query : queries) { + statement.addBatch(query); + } + statement.executeBatch(); + } + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index ef88240a79..6d7ea913f4 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -629,4 +629,14 @@ static String getEpochFn(DatabaseProduct dbProduct) throws MetaException { throw new MetaException(msg); } } + + /** + * Removes every little bit of suspicious characters from the strings, so no sql injections is + * possible + * @param original The original string + * @return Every special charater orther than '.' is removed from the original + */ + static String cleanString(String original) { + return original.replaceAll("[^\\w\\.]", "_"); + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 74ef88545e..d8ae511825 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -597,57 +597,72 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { txnType = TxnType.REPL_CREATED; } + long firstTxnId; String s = sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""); LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction database not properly " + - "configured, can't find next transaction id."); + try (ResultSet nextIdRs = stmt.executeQuery(s)) { + if (!nextIdRs.next()) { + throw new MetaException("Transaction database not properly " + + "configured, can't find next transaction id."); + } + firstTxnId = nextIdRs.getLong(1); } - long first = rs.getLong(1); - s = "UPDATE \"NEXT_TXN_ID\" SET \"NTXN_NEXT\" = " + (first + numTxns); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); List txnIds = new ArrayList<>(numTxns); - - List rows = new ArrayList<>(); - List params = new ArrayList<>(); - params.add(rqst.getUser()); - params.add(rqst.getHostname()); - List> paramsList = new ArrayList<>(numTxns); - - for (long i = first; i < first + numTxns; i++) { - txnIds.add(i); - rows.add(i + "," + quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + "," - + TxnDbUtil.getEpochFn(dbProduct) + ",?,?," + txnType.getValue()); - paramsList.add(params); - } - insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "\"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " - + "\"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\")", - rows, paramsList); - for (PreparedStatement pst : insertPreparedStmts) { - pst.execute(); - } + String cleanUserName = TxnDbUtil.cleanString(rqst.getUser()); + String cleanHostName = TxnDbUtil.cleanString(rqst.getHostname()); // Need to register minimum open txnid for current transactions into MIN_HISTORY table. // For a single txn we can do it in a single insert. With multiple txns calculating the // minOpenTxnId for every insert is not cost effective, so caching the value - if (txnIds.size() == 1) { - s = "INSERT INTO \"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\",\"MHL_MIN_OPEN_TXNID\") " + - "SELECT ?, MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); - LOG.debug("Going to execute query <" + s + ">"); - try (PreparedStatement pstmt = dbConn.prepareStatement(s)) { - pstmt.setLong(1, txnIds.get(0)); - pstmt.execute(); - } + // Also updating nextTxnId in the same batch for reducing the number of round-trips to the RDBMS + String updateNextTxnId = "UPDATE \"NEXT_TXN_ID\" SET \"NTXN_NEXT\" = " + (firstTxnId + numTxns); + if (numTxns == 1) { + LOG.debug("Going to execute update <" + updateNextTxnId + ">"); + String insertQuery = "INSERT INTO \"TXNS\" " + + "(\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\") " + + "VALUES(" + firstTxnId + "," + quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + "," + + TxnDbUtil.getEpochFn(dbProduct) + ",'" + cleanUserName + "','" + cleanHostName + "'," + txnType.getValue() + ")"; + LOG.debug("Going to execute insert <" + insertQuery + ">"); + String minHistoryQuery = "INSERT INTO \"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\",\"MHL_MIN_OPEN_TXNID\") " + + "SELECT " + firstTxnId + ", MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); + LOG.debug("Going to execute minHistory insert <" + minHistoryQuery + ">"); + sqlGenerator.executeInBatch(stmt, updateNextTxnId, insertQuery, minHistoryQuery); + txnIds.add(firstTxnId); LOG.info("Added entries to MIN_HISTORY_LEVEL with a single query for current txn: " + txnIds); } else { + String insertQuery = "INSERT INTO \"TXNS\" " + + "(\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\") " + + "VALUES(?,?," + TxnDbUtil.getEpochFn(dbProduct) + "," + TxnDbUtil.getEpochFn(dbProduct) + ",?,?,?)"; + int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE); + LOG.debug("Going to execute insert <" + insertQuery + ">"); + try (PreparedStatement ps = dbConn.prepareStatement(insertQuery)) { + long i; + for (i = 0; i < numTxns; ++i) { + txnIds.add(i); + ps.setLong(1, firstTxnId + i); + ps.setString(2, Character.toString(TXN_OPEN)); + ps.setString(3, cleanUserName); + ps.setString(4, cleanHostName); + ps.setInt(5, txnType.getValue()); + ps.addBatch(); + + if ((i + 1) % batchSize == 0) { + ps.executeBatch(); + } + } + if (i % batchSize != 0) { + ps.executeBatch(); + } + } + + LOG.debug("Going to execute update <" + updateNextTxnId + ">"); + stmt.execute(updateNextTxnId); + s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); LOG.debug("Going to execute query <" + s + ">"); - long minOpenTxnId = -1L; - try(ResultSet minOpenTxnIdRs = stmt.executeQuery(s)) { + long minOpenTxnId; + try (ResultSet minOpenTxnIdRs = stmt.executeQuery(s)) { if (!minOpenTxnIdRs.next()) { throw new IllegalStateException("Scalar query returned no rows?!?!!"); } @@ -657,30 +672,33 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { } assert (minOpenTxnId > 0); - rows.clear(); - for (long txnId = first; txnId < first + numTxns; txnId++) { - rows.add(txnId + ", " + minOpenTxnId); - } - // Insert transaction entries into MIN_HISTORY_LEVEL. - List inserts = sqlGenerator.createInsertValuesStmt( - "\"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\", \"MHL_MIN_OPEN_TXNID\")", rows); - for (String insert : inserts) { - LOG.debug("Going to execute insert <" + insert + ">"); - stmt.execute(insert); + String minHistoryQuery = + "INSERT INTO \"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\",\"MHL_MIN_OPEN_TXNID\") VALUES (?, ?)"; + LOG.debug("Going to execute minHistory insert <" + minHistoryQuery + ">"); + try (PreparedStatement ps = dbConn.prepareStatement(minHistoryQuery)) { + long i; + for (i = firstTxnId; i < firstTxnId + numTxns; i++) { + ps.setLong(1, i); + ps.setLong(2, minOpenTxnId); + ps.addBatch(); + + if (i % batchSize == 0) { + ps.executeBatch(); + } + } + if (i % batchSize != 0) { + ps.executeBatch(); + } } LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds + ") with min_open_txn: " + minOpenTxnId); } if (rqst.isSetReplPolicy()) { - List rowsRepl = new ArrayList<>(); - for (PreparedStatement pst : insertPreparedStmts) { - pst.close(); - } - insertPreparedStmts.clear(); - params.clear(); - paramsList.clear(); + List rowsRepl = new ArrayList<>(numTxns); + List params = new ArrayList<>(1); + List> paramsList = new ArrayList<>(numTxns); params.add(rqst.getReplPolicy()); for (int i = 0; i < numTxns; i++) { rowsRepl.add( "?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i)); diff --git standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java index 60be0f9c22..3158cbff0f 100644 --- standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java +++ standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java @@ -225,6 +225,11 @@ public void testSQLGenerator() throws Exception { Assert.assertEquals("select MT_COMMENT from AUX_TABLE with (updlock) where MT_KEY1='CheckLock' and MT_KEY2=0", modSql); } + @Test + public void testCleanString() { + Assert.assertEquals("a_b_c_d_._e_f_g", "a;b,c-d_.(e'f\"g"); + } + @Before public void setUp() throws Exception { conf = MetastoreConf.newMetastoreConf();