diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 7d0db0c3a0..89f99ff8c4 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -203,6 +203,9 @@ private static final String COMPL_TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" " + "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")" + " VALUES (%s, ?, ?, ?, ?, %s)"; + private static final String TXNS_INSERT_QRY = "INSERT INTO \"TXNS\" " + + "(\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\") " + + "VALUES(?,?,%s,%s,?,?,?)"; private List transactionalListeners; @@ -611,57 +614,64 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { txnType = TxnType.REPL_CREATED; } + long firstTxnId; String s = sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""); LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction database not properly " + - "configured, can't find next transaction id."); + try (ResultSet nextIdRs = stmt.executeQuery(s)) { + if (!nextIdRs.next()) { + throw new MetaException("Transaction database not properly " + + "configured, can't find next transaction id."); + } + firstTxnId = nextIdRs.getLong(1); } - long first = rs.getLong(1); - s = "UPDATE \"NEXT_TXN_ID\" SET \"NTXN_NEXT\" = " + (first + numTxns); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); List txnIds = new ArrayList<>(numTxns); - List rows = new ArrayList<>(); - List params = new ArrayList<>(); - params.add(rqst.getUser()); - params.add(rqst.getHostname()); - List> paramsList = new ArrayList<>(numTxns); - - for (long i = first; i < first + numTxns; i++) { - txnIds.add(i); - rows.add(i + "," + quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + "," - + TxnDbUtil.getEpochFn(dbProduct) + ",?,?," + txnType.getValue()); - paramsList.add(params); - } - insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "\"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " - + "\"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\")", - rows, paramsList); - for (PreparedStatement pst : insertPreparedStmts) { - pst.execute(); + String insertQuery = String.format(TXNS_INSERT_QRY, TxnDbUtil.getEpochFn(dbProduct), TxnDbUtil.getEpochFn(dbProduct)); + int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE); + LOG.debug("Going to execute insert <" + insertQuery + ">"); + try (PreparedStatement ps = dbConn.prepareStatement(insertQuery)) { + for (long i = firstTxnId; i < firstTxnId + numTxns; ++i) { + txnIds.add(i); + ps.setLong(1, i); + ps.setString(2, Character.toString(TXN_OPEN)); + ps.setString(3, rqst.getUser()); + ps.setString(4, rqst.getHostname()); + ps.setInt(5, txnType.getValue()); + ps.addBatch(); + + if ((i - firstTxnId + 1) % batchSize == 0) { + ps.executeBatch(); + } + } + if (numTxns % batchSize != 0) { + ps.executeBatch(); + } } // Need to register minimum open txnid for current transactions into MIN_HISTORY table. // For a single txn we can do it in a single insert. With multiple txns calculating the // minOpenTxnId for every insert is not cost effective, so caching the value + // Also updating nextTxnId in the same batch for reducing the number of round-trips to the RDBMS + String updateNextTxnId = "UPDATE \"NEXT_TXN_ID\" SET \"NTXN_NEXT\" = " + (firstTxnId + numTxns); if (txnIds.size() == 1) { - s = "INSERT INTO \"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\",\"MHL_MIN_OPEN_TXNID\") " + - "SELECT ?, MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); - LOG.debug("Going to execute query <" + s + ">"); - try (PreparedStatement pstmt = dbConn.prepareStatement(s)) { - pstmt.setLong(1, txnIds.get(0)); - pstmt.execute(); - } + LOG.debug("Going to execute update <" + updateNextTxnId + ">"); + stmt.addBatch(updateNextTxnId); + + String minHistoryQuery = "INSERT INTO \"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\",\"MHL_MIN_OPEN_TXNID\") " + + "SELECT " + firstTxnId + ", MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); + LOG.debug("Going to execute insert <" + minHistoryQuery + ">"); + stmt.addBatch(minHistoryQuery); + stmt.executeBatch(); LOG.info("Added entries to MIN_HISTORY_LEVEL with a single query for current txn: " + txnIds); } else { + LOG.debug("Going to execute update <" + updateNextTxnId + ">"); + stmt.execute(updateNextTxnId); + s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); LOG.debug("Going to execute query <" + s + ">"); - long minOpenTxnId = -1L; - try(ResultSet minOpenTxnIdRs = stmt.executeQuery(s)) { + long minOpenTxnId; + try (ResultSet minOpenTxnIdRs = stmt.executeQuery(s)) { if (!minOpenTxnIdRs.next()) { throw new IllegalStateException("Scalar query returned no rows?!?!!"); } @@ -671,30 +681,32 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { } assert (minOpenTxnId > 0); - rows.clear(); - for (long txnId = first; txnId < first + numTxns; txnId++) { - rows.add(txnId + ", " + minOpenTxnId); - } - // Insert transaction entries into MIN_HISTORY_LEVEL. - List inserts = sqlGenerator.createInsertValuesStmt( - "\"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\", \"MHL_MIN_OPEN_TXNID\")", rows); - for (String insert : inserts) { - LOG.debug("Going to execute insert <" + insert + ">"); - stmt.execute(insert); + String minHistoryQuery = + "INSERT INTO \"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\",\"MHL_MIN_OPEN_TXNID\") VALUES (?, ?)"; + LOG.debug("Going to execute insert <" + minHistoryQuery + ">"); + try (PreparedStatement ps = dbConn.prepareStatement(minHistoryQuery)) { + for (long i = firstTxnId; i < firstTxnId + numTxns; ++i) { + ps.setLong(1, i); + ps.setLong(2, minOpenTxnId); + ps.addBatch(); + + if ((i - firstTxnId + 1) % batchSize == 0) { + ps.executeBatch(); + } + } + if (numTxns % batchSize != 0) { + ps.executeBatch(); + } } LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds + ") with min_open_txn: " + minOpenTxnId); } if (rqst.isSetReplPolicy()) { - List rowsRepl = new ArrayList<>(); - for (PreparedStatement pst : insertPreparedStmts) { - pst.close(); - } - insertPreparedStmts.clear(); - params.clear(); - paramsList.clear(); + List rowsRepl = new ArrayList<>(numTxns); + List params = new ArrayList<>(1); + List> paramsList = new ArrayList<>(numTxns); params.add(rqst.getReplPolicy()); for (int i = 0; i < numTxns; i++) { rowsRepl.add( "?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));