diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 06defdb910..6ba4d2f5be 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -185,6 +185,11 @@ private static DataSource connPoolMutex; static private boolean doRetryOnConnPool = false; + // Query definitions + private static final String TXNS_INSERT_QRY = "INSERT INTO \"TXNS\" " + + "(\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\") " + + "VALUES(?,?,%s,%s,?,?,?)"; + private List transactionalListeners; /** @@ -592,57 +597,65 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { txnType = TxnType.REPL_CREATED; } + long first; String s = sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""); LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction database not properly " + - "configured, can't find next transaction id."); + try (ResultSet nextIdRs = stmt.executeQuery(s)) { + if (!nextIdRs.next()) { + throw new MetaException("Transaction database not properly " + + "configured, can't find next transaction id."); + } + first = nextIdRs.getLong(1); } - long first = rs.getLong(1); - s = "UPDATE \"NEXT_TXN_ID\" SET \"NTXN_NEXT\" = " + (first + numTxns); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); List txnIds = new ArrayList<>(numTxns); - - List rows = new ArrayList<>(); - List params = new ArrayList<>(); - params.add(rqst.getUser()); - params.add(rqst.getHostname()); - List> paramsList = new ArrayList<>(numTxns); - String dbEpochString = getDbEpochString(); - for (long i = first; i < first + numTxns; i++) { - txnIds.add(i); - rows.add(i + "," + quoteChar(TXN_OPEN) + "," + dbEpochString + "," + dbEpochString + ",?,?," - + txnType.getValue()); - paramsList.add(params); - } - insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "\"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " - + "\"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\")", - rows, paramsList); - for (PreparedStatement pst : insertPreparedStmts) { - pst.execute(); + String currentTime = getDbEpochString(); + String insertQuery = String.format(TXNS_INSERT_QRY, currentTime, currentTime); + int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE); + LOG.debug("Going to execute insert <" + insertQuery + ">"); + try (PreparedStatement ps = dbConn.prepareStatement(insertQuery)) { + long i; + for (i = first; i < first + numTxns; i++) { + txnIds.add(i); + ps.setLong(1, i); + ps.setString(2, Character.toString(TXN_OPEN)); + ps.setString(3, rqst.getUser()); + ps.setString(4, rqst.getHostname()); + ps.setInt(5, txnType.getValue()); + ps.addBatch(); + + if (i % batchSize == 0) { + ps.executeBatch(); + } + } + if (i % batchSize != 0) { + ps.executeBatch(); + } } // Need to register minimum open txnid for current transactions into MIN_HISTORY table. // For a single txn we can do it in a single insert. With multiple txns calculating the // minOpenTxnId for every insert is not cost effective, so caching the value + // Also updating nextTxnId in the same batch for reducing the number of round-trips to the RDBMS + String updateNextTxnId = "UPDATE \"NEXT_TXN_ID\" SET \"NTXN_NEXT\" = " + (first + numTxns); if (txnIds.size() == 1) { - s = "INSERT INTO \"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\",\"MHL_MIN_OPEN_TXNID\") " + - "SELECT ?, MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); - LOG.debug("Going to execute query <" + s + ">"); - try (PreparedStatement pstmt = dbConn.prepareStatement(s)) { - pstmt.setLong(1, txnIds.get(0)); - pstmt.execute(); - } + LOG.debug("Going to execute update <" + updateNextTxnId + ">"); + stmt.addBatch(updateNextTxnId); + + String minHistoryQuery = "INSERT INTO \"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\",\"MHL_MIN_OPEN_TXNID\") " + + "SELECT " + first + ", MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); + LOG.debug("Going to execute insert <" + minHistoryQuery + ">"); + stmt.addBatch(minHistoryQuery); + stmt.executeBatch(); LOG.info("Added entries to MIN_HISTORY_LEVEL with a single query for current txn: " + txnIds); } else { + LOG.debug("Going to execute update <" + updateNextTxnId + ">"); + stmt.execute(updateNextTxnId); + s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); LOG.debug("Going to execute query <" + s + ">"); - long minOpenTxnId = -1L; - try(ResultSet minOpenTxnIdRs = stmt.executeQuery(s)) { + long minOpenTxnId; + try (ResultSet minOpenTxnIdRs = stmt.executeQuery(s)) { if (!minOpenTxnIdRs.next()) { throw new IllegalStateException("Scalar query returned no rows?!?!!"); } @@ -652,30 +665,33 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { } assert (minOpenTxnId > 0); - rows.clear(); - for (long txnId = first; txnId < first + numTxns; txnId++) { - rows.add(txnId + ", " + minOpenTxnId); - } - // Insert transaction entries into MIN_HISTORY_LEVEL. - List inserts = sqlGenerator.createInsertValuesStmt( - "\"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\", \"MHL_MIN_OPEN_TXNID\")", rows); - for (String insert : inserts) { - LOG.debug("Going to execute insert <" + insert + ">"); - stmt.execute(insert); + String minHistoryQuery = + "INSERT INTO \"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\",\"MHL_MIN_OPEN_TXNID\") VALUES (?, ?)"; + LOG.debug("Going to execute insert <" + minHistoryQuery + ">"); + try (PreparedStatement ps = dbConn.prepareStatement(minHistoryQuery)) { + long i; + for (i = first; i < first + numTxns; i++) { + ps.setLong(1, i); + ps.setLong(2, minOpenTxnId); + ps.addBatch(); + + if (i % batchSize == 0) { + ps.executeBatch(); + } + } + if (i % batchSize != 0) { + ps.executeBatch(); + } } LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds + ") with min_open_txn: " + minOpenTxnId); } if (rqst.isSetReplPolicy()) { - List rowsRepl = new ArrayList<>(); - for (PreparedStatement pst : insertPreparedStmts) { - pst.close(); - } - insertPreparedStmts.clear(); - params.clear(); - paramsList.clear(); + List rowsRepl = new ArrayList<>(numTxns); + List params = new ArrayList<>(1); + List> paramsList = new ArrayList<>(numTxns); params.add(rqst.getReplPolicy()); for (int i = 0; i < numTxns; i++) { rowsRepl.add( "?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));