diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index a1bc10955a..e120e160a4 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -20,7 +20,6 @@ import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.util.StringUtils; @@ -38,6 +37,10 @@ import java.util.List; import java.util.Set; +import static org.apache.hadoop.hive.metastore.txn.TxnHandler.TxnStatus.ABORTED; +import static org.apache.hadoop.hive.metastore.txn.TxnHandler.TxnStatus.COMMITTED; +import static org.apache.hadoop.hive.metastore.txn.TxnHandler.TxnStatus.OPEN; + /** * Extends the transaction handler with methods needed only by the compactor threads. These * methods are not available through the thrift interface. @@ -104,8 +107,8 @@ public CompactionTxnHandler() { // Check for aborted txns s = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " + "FROM \"TXNS\", \"TXN_COMPONENTS\" " + - "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = '" + TXN_ABORTED + "' " + - "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " + + "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + quoteChar(ABORTED) + + " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " + "HAVING COUNT(*) > " + abortedThreshold; LOG.debug("Going to execute query <" + s + ">"); @@ -385,7 +388,7 @@ public void markCleaned(CompactionInfo info) throws MetaException { * See {@link ql.txn.compactor.Cleaner.removeFiles()} */ s = "SELECT DISTINCT \"TXN_ID\" FROM \"TXNS\", \"TXN_COMPONENTS\" WHERE \"TXN_ID\" = \"TC_TXNID\" " - + "AND \"TXN_STATE\" = '" + TXN_ABORTED + "' AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ?"; + + "AND \"TXN_STATE\" = " + quoteChar(ABORTED) + " AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ?"; if (info.highestWriteId != 0) s += " AND \"TC_WRITEID\" <= ?"; if (info.partName != null) s += " AND \"TC_PARTITION\" = ?"; @@ -498,8 +501,8 @@ public void cleanTxnToWriteIdTable() throws MetaException { "UNION " + "SELECT MIN(\"WS_COMMIT_ID\") AS \"ID\" FROM \"WRITE_SET\" " + "UNION " + - "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) + - " OR \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + + "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(ABORTED) + + " OR \"TXN_STATE\" = " + quoteChar(OPEN) + ") \"RES\""; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -561,7 +564,7 @@ public void cleanEmptyAbortedAndCommittedTxns() throws MetaException { String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " + "\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " + - " (\"TXN_STATE\" = '" + TXN_ABORTED + "' OR \"TXN_STATE\" = '" + TXN_COMMITTED + "') AND " + " (\"TXN_STATE\" = " + quoteChar(ABORTED) + " OR \"TXN_STATE\" = " + quoteChar(COMMITTED) + ") AND " + " \"TXN_ID\" < " + lowWaterMark; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -1142,7 +1145,7 @@ public long findMinOpenTxnIdForCleaner() throws MetaException{ try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String query = "SELECT COUNT(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); + String query = "SELECT COUNT(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(OPEN); LOG.debug("Going to execute query <" + query + ">"); rs = stmt.executeQuery(query); if (!rs.next()) { @@ -1151,7 +1154,7 @@ public long findMinOpenTxnIdForCleaner() throws MetaException{ long numOpenTxns = rs.getLong(1); if (numOpenTxns > 0) { query = "SELECT MIN(\"RES\".\"ID\") FROM (" + - "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + + "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(OPEN) + " UNION " + "SELECT MAX(\"CQ_NEXT_TXN_ID\") AS \"ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = " + quoteChar(READY_FOR_CLEANING) + diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 8fded608d0..ab411b0530 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -140,6 +140,8 @@ import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; import org.apache.hadoop.hive.metastore.tools.SQLGenerator; +import org.apache.hadoop.hive.metastore.txn.TxnUtils.OpenTxn; +import org.apache.hadoop.hive.metastore.txn.TxnUtils.OpenTxnList; import org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.apache.hadoop.hive.metastore.utils.LockTypeUtil; import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; @@ -152,6 +154,9 @@ import static org.apache.hadoop.hive.metastore.DatabaseProduct.MYSQL; import static org.apache.hadoop.hive.metastore.txn.TxnDbUtil.executeQueriesInBatch; import static org.apache.hadoop.hive.metastore.txn.TxnDbUtil.executeQueriesInBatchNoCount; +import static org.apache.hadoop.hive.metastore.txn.TxnHandler.TxnStatus.ABORTED; +import static org.apache.hadoop.hive.metastore.txn.TxnHandler.TxnStatus.COMMITTED; +import static org.apache.hadoop.hive.metastore.txn.TxnHandler.TxnStatus.OPEN; import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier; @@ -226,15 +231,58 @@ static final protected char MAJOR_TYPE = 'a'; static final protected char MINOR_TYPE = 'i'; - // Transaction states - protected static final char TXN_ABORTED = 'a'; - protected static final char TXN_OPEN = 'o'; - protected static final char TXN_COMMITTED = 'c'; - private static final char TXN_TMP = '_'; + private static final String TXN_TMP_STATE = "_"; - //todo: make these like OperationType and remove above char constants - enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN} + enum TxnStatus { + OPEN('o'), ABORTED('a'), COMMITTED('c'), UNKNOWN('u'); + private final char sqlConst; + + TxnStatus(char sqlConst) { + this.sqlConst = sqlConst; + } + + public String toString() { + return Character.toString(sqlConst); + } + + public char getSqlConst() { + return sqlConst; + } + + public TxnState toTxnState() throws MetaException { + switch (this) { + case OPEN: + return TxnState.OPEN; + case ABORTED: + return TxnState.ABORTED; + case COMMITTED: + return TxnState.COMMITTED; + case UNKNOWN: + default: + throw new MetaException("Unexpected transaction state " + sqlConst); + } + } + + public static TxnStatus fromString(String sqlConst) { + return fromString(sqlConst.charAt(0)); + } + + public static TxnStatus fromString(char sqlConst) { + switch (sqlConst) { + case 'o': + return OPEN; + case 'a': + return ABORTED; + case 'c': + return COMMITTED; + case 'u': + return UNKNOWN; + default: + throw new IllegalArgumentException(quoteChar(sqlConst)); + } + } + } // Lock states static final protected char LOCK_ACQUIRED = 'a'; @@ -439,9 +487,49 @@ public Configuration getConf() { return conf; } + @Override @RetrySemantics.ReadOnly public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { + OpenTxnList openTxnsList = getOpenTxnsList(); + List txnInfos = new ArrayList<>(); + for (OpenTxn openTxn : openTxnsList.getOpenTxnList()) { + TxnInfo info = + new TxnInfo(openTxn.getTxnId(), openTxn.getStatus().toTxnState(), openTxn.getUser(), openTxn.getHost()); + info.setStartedTime(openTxn.getStartedTime()); + info.setLastHeartbeatTime(openTxn.getLastHeartBeatTime()); + txnInfos.add(info); + } + return new GetOpenTxnsInfoResponse(openTxnsList.getHwm(), txnInfos); + } + + @Override + @RetrySemantics.ReadOnly + public GetOpenTxnsResponse getOpenTxns() throws MetaException { + OpenTxnList openTxnsList = getOpenTxnsList(); + List openList = new ArrayList<>(); + long minOpenTxn = Long.MAX_VALUE; + BitSet abortedBits = new BitSet(); + for (OpenTxn openTxn : openTxnsList.getOpenTxnList()) { + if (openTxn.getStatus() == OPEN) { + minOpenTxn = Math.min(minOpenTxn, openTxn.getTxnId()); + } + if (openTxn.getType() != TxnType.READ_ONLY) { + openList.add(openTxn.getTxnId()); + if (openTxn.getStatus() == ABORTED) { + abortedBits.set(openList.size() - 1); + } + } + } + ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray()); + GetOpenTxnsResponse otr = new GetOpenTxnsResponse(openTxnsList.getHwm(), openList, byteBuffer); + if (minOpenTxn < Long.MAX_VALUE) { + otr.setMin_open_txn(minOpenTxn); + } + return otr; + } + + private OpenTxnList getOpenTxnsList() throws MetaException { try { // We need to figure out the HighWaterMark and the list of open transactions. Connection dbConn = null; @@ -459,12 +547,12 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - List txnInfos = new ArrayList<>(); + List txnInfos = new ArrayList<>(); String s = - "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " - + "(" + TxnDbUtil.getEpochFn(dbProduct) + " - \"TXN_STARTED\")" - + "FROM \"TXNS\" ORDER BY \"TXN_ID\""; + "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", " + + "\"TXN_LAST_HEARTBEAT\", (" + TxnDbUtil.getEpochFn(dbProduct) + " - \"TXN_STARTED\")" + + " FROM \"TXNS\" ORDER BY \"TXN_ID\""; LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); /* @@ -486,127 +574,35 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { openTxnLowBoundary++; while (txnId > openTxnLowBoundary) { // Add an empty open transaction for every missing value - txnInfos.add(new TxnInfo(openTxnLowBoundary, TxnState.OPEN, null, null)); + txnInfos.add(new OpenTxn(openTxnLowBoundary, OPEN, TxnType.DEFAULT)); + LOG.debug("Open transaction added for missing value in TXNS {}", JavaUtils.txnIdToString(openTxnLowBoundary)); openTxnLowBoundary++; } } else { openTxnLowBoundary = txnId; } - char c = rs.getString(2).charAt(0); - TxnState state; - switch (c) { - case TXN_COMMITTED: + TxnStatus state = TxnStatus.fromString(rs.getString(2)); + if (state == COMMITTED) { // This is only here, to avoid adding this txnId as possible gap continue; - - case TXN_ABORTED: - state = TxnState.ABORTED; - break; - - case TXN_OPEN: - state = TxnState.OPEN; - break; - - default: - throw new MetaException("Unexpected transaction state " + c + " found in txns table"); } - TxnInfo txnInfo = new TxnInfo(txnId, state, rs.getString(3), rs.getString(4)); - txnInfo.setStartedTime(rs.getLong(5)); - txnInfo.setLastHeartbeatTime(rs.getLong(6)); + OpenTxn txnInfo = new OpenTxn(txnId, state, TxnType.findByValue(rs.getInt(3)), rs.getString(4), + rs.getString(5), rs.getLong(6), rs.getLong(7)); txnInfos.add(txnInfo); } - LOG.debug("Going to rollback"); dbConn.rollback(); - return new GetOpenTxnsInfoResponse(hwm, txnInfos); + LOG.debug("Got OpenTxnList with hwm: {} and openTxnList size {}.", hwm, txnInfos.size()); + return new OpenTxnList(hwm, txnInfos); } catch (SQLException e) { - LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "getOpenTxnsInfo"); + checkRetryable(dbConn, e, "getOpenTxnsList"); throw new MetaException( "Unable to select from transaction database: " + getMessage(e) + StringUtils.stringifyException(e)); } finally { close(rs, stmt, dbConn); } } catch (RetryException e) { - return getOpenTxnsInfo(); - } - } - - @Override - @RetrySemantics.ReadOnly - public GetOpenTxnsResponse getOpenTxns() throws MetaException { - try { - // We need to figure out the current transaction number and the list of open transactions. - Connection dbConn = null; - Statement stmt = null; - ResultSet rs = null; - try { - /* - * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} - */ - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - - List openList = new ArrayList<>(); - String s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\", " - + "(" + TxnDbUtil.getEpochFn(dbProduct) + " - \"TXN_STARTED\")" - + " FROM \"TXNS\" ORDER BY \"TXN_ID\""; - LOG.debug("Going to execute query<" + s + ">"); - rs = stmt.executeQuery(s); - long hwm = 0; - long openTxnLowBoundary = 0; - long minOpenTxn = Long.MAX_VALUE; - BitSet abortedBits = new BitSet(); - while (rs.next()) { - long txnId = rs.getLong(1); - long age = rs.getLong(4); - hwm = txnId; - if (age < getOpenTxnTimeOutMillis()) { - // We will consider every gap as an open transaction from the previous txnId - openTxnLowBoundary++; - while (txnId > openTxnLowBoundary) { - // Add an empty open transaction for every missing value - openList.add(openTxnLowBoundary); - minOpenTxn = Math.min(minOpenTxn, openTxnLowBoundary); - openTxnLowBoundary++; - } - } else { - openTxnLowBoundary = txnId; - } - char txnState = rs.getString(2).charAt(0); - if (txnState == TXN_COMMITTED) { - continue; - } - if (txnState == TXN_OPEN) { - minOpenTxn = Math.min(minOpenTxn, txnId); - } - TxnType txnType = TxnType.findByValue(rs.getInt(3)); - if (txnType != TxnType.READ_ONLY) { - openList.add(txnId); - if (txnState == TXN_ABORTED) { - abortedBits.set(openList.size() - 1); - } - } - } - LOG.debug("Going to rollback"); - dbConn.rollback(); - ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray()); - GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer); - if (minOpenTxn < Long.MAX_VALUE) { - otr.setMin_open_txn(minOpenTxn); - } - return otr; - } catch (SQLException e) { - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "getOpenTxns"); - throw new MetaException("Unable to select from transaction database, " - + StringUtils.stringifyException(e)); - } finally { - close(rs, stmt, dbConn); - } - } catch (RetryException e) { - return getOpenTxns(); + return getOpenTxnsList(); } } @@ -759,7 +755,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { TxnDbUtil.getEpochFn(dbProduct)); LOG.debug("Going to execute insert <" + insertQuery + ">"); try (PreparedStatement ps = dbConn.prepareStatement(insertQuery, new String[] {"TXN_ID"})) { - String state = genKeySupport ? Character.toString(TXN_OPEN) : Character.toString(TXN_TMP); + String state = genKeySupport ? OPEN.toString() : TXN_TMP_STATE; if (numTxns == 1) { ps.setString(1, state); ps.setString(2, rqst.getUser()); @@ -836,7 +832,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { } else { try (PreparedStatement pstmt = dbConn.prepareStatement("SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = ?")) { - pstmt.setString(1, Character.toString(TXN_TMP)); + pstmt.setString(1, TXN_TMP_STATE); try (ResultSet rs = pstmt.executeQuery()) { while (rs.next()) { txnIds.add(rs.getLong(1)); @@ -845,8 +841,8 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { } try (PreparedStatement pstmt = dbConn .prepareStatement("UPDATE \"TXNS\" SET \"TXN_STATE\" = ? WHERE \"TXN_STATE\" = ?")) { - pstmt.setString(1, Character.toString(TXN_OPEN)); - pstmt.setString(2, Character.toString(TXN_TMP)); + pstmt.setString(1, OPEN.toString()); + pstmt.setString(2, TXN_TMP_STATE); pstmt.executeUpdate(); } } @@ -1034,7 +1030,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept txnid = targetTxnIds.get(0); } - TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN); + TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, OPEN); if (txnRecord == null) { TxnStatus status = findTxnState(txnid, stmt); if (status == TxnStatus.ABORTED) { @@ -1090,7 +1086,7 @@ public void abortTxns(AbortTxnsRequest rqst) throws MetaException { List queries = new ArrayList<>(); StringBuilder prefix = new StringBuilder("SELECT \"TXN_ID\", \"TXN_TYPE\" from \"TXNS\" where \"TXN_STATE\" = ") - .append(quoteChar(TXN_OPEN)).append(" and \"TXN_TYPE\" != ").append(TxnType.READ_ONLY.getValue()) + .append(quoteChar(OPEN)).append(" and \"TXN_TYPE\" != ").append(TxnType.READ_ONLY.getValue()) .append(" and "); TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(), @@ -1364,11 +1360,11 @@ public void commitTxn(CommitTxnRequest rqst) * should not normally run concurrently (for same txn) but could due to bugs in the client * which could then corrupt internal transaction manager state. Also competes with abortTxn(). */ - TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN); + TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, OPEN); if (txnRecord == null) { //if here, txn was not found (in expected state) TxnStatus actualTxnStatus = findTxnState(txnid, stmt); - if (actualTxnStatus == TxnStatus.COMMITTED) { + if (actualTxnStatus == COMMITTED) { if (rqst.isSetReplPolicy()) { // in case of replication, idempotent is taken care by getTargetTxnId LOG.warn("Invalid state COMMITTED for transactions started using replication replay task"); @@ -1589,7 +1585,7 @@ private void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnT } queryBatch.add("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid); // DO NOT remove the transaction from the TXN table, the cleaner will remove it when appropriate - queryBatch.add("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_COMMITTED) + " WHERE \"TXN_ID\" = " + txnid); + queryBatch.add("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(COMMITTED) + " WHERE \"TXN_ID\" = " + txnid); queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid); // execute all in one batch @@ -2253,7 +2249,7 @@ public void performWriteSetGC() throws MetaException { stmt = dbConn.createStatement(); long minOpenTxn; - rs = stmt.executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + quoteChar(TXN_OPEN)); + rs = stmt.executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + quoteChar(OPEN)); if (!rs.next()) { throw new IllegalStateException("Scalar query returned no rows?!?!!"); } @@ -2391,7 +2387,8 @@ public LockResponse lockMaterializationRebuild(String dbName, String tableName, throws MetaException { if (LOG.isDebugEnabled()) { - LOG.debug("Acquiring lock for materialization rebuild with txnId={} for {}", txnId, Warehouse.getQualifiedName(dbName,tableName)); + LOG.debug("Acquiring lock for materialization rebuild with {} for {}", + JavaUtils.txnIdToString(txnId), TableName.getDbTable(dbName, tableName)); } TxnStore.MutexAPI.LockHandle handle = null; @@ -2465,7 +2462,7 @@ public boolean heartbeatLockMaterializationRebuild(String dbName, String tableNa if (rc < 1) { LOG.debug("Going to rollback"); dbConn.rollback(); - LOG.info("No lock found for rebuild of " + Warehouse.getQualifiedName(dbName, tableName) + + LOG.info("No lock found for rebuild of " + TableName.getDbTable(dbName, tableName) + " when trying to heartbeat"); // It could not be renewed, return that information return false; @@ -2478,7 +2475,7 @@ public boolean heartbeatLockMaterializationRebuild(String dbName, String tableNa LOG.debug("Going to rollback"); rollbackDBConn(dbConn); checkRetryable(dbConn, e, - "heartbeatLockMaterializationRebuild(" + Warehouse.getQualifiedName(dbName, tableName) + ", " + txnId + ")"); + "heartbeatLockMaterializationRebuild(" + TableName.getDbTable(dbName, tableName) + ", " + txnId + ")"); throw new MetaException("Unable to heartbeat rebuild lock due to " + StringUtils.stringifyException(e)); } finally { @@ -2591,7 +2588,7 @@ private ConnectionLockIdPair(Connection dbConn, long extLockId) { * @throws SQLException * @throws MetaException */ - private TxnRecord lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException { + private TxnRecord lockTransactionRecord(Statement stmt, long txnId, TxnStatus txnState) throws SQLException, MetaException { String query = "SELECT \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnId + (txnState != null ? " AND \"TXN_STATE\" = " + quoteChar(txnState) : ""); try (ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query))) { @@ -2614,7 +2611,7 @@ private TxnRecord(int txnType) { * 1. We use S4U (withe read_committed) to generate the next (ext) lock id. This serializes * any 2 {@code enqueueLockWithRetry()} calls. * 2. We use S4U on the relevant TXNS row to block any concurrent abort/commit/etc operations - * @see #checkLockWithRetry(Connection, long, long) + * @see #checkLockWithRetry(Connection, long, long, boolean) */ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { boolean success = false; @@ -2628,7 +2625,7 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc stmt = dbConn.createStatement(); if (isValidTxn(txnid)) { //this also ensures that txn is still there in expected state - TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN); + TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, OPEN); if (txnRecord == null) { ensureValidTxn(dbConn, txnid, stmt); shouldNeverHappen(txnid); @@ -2912,7 +2909,7 @@ private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long * separate txn at READ_COMMITTED. * * Retry-by-caller note: - * Retryable because {@link #checkLock(Connection, long, long)} is + * Retryable because {@link #checkLock(Connection, long, long, boolean)} is */ @Override @RetrySemantics.SafeToRetry @@ -3211,7 +3208,7 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst } TxnUtils.buildQueryWithINClause(conf, queries, new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + TxnDbUtil.getEpochFn(dbProduct) + - " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND "), + " WHERE \"TXN_STATE\" = " + quoteChar(OPEN) + " AND "), new StringBuilder(""), txnIds, "\"TXN_ID\"", true, false); int updateCnt = 0; for (String query : queries) { @@ -3534,7 +3531,7 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - TxnRecord txnRecord = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN); + TxnRecord txnRecord = lockTransactionRecord(stmt, rqst.getTxnid(), OPEN); if (txnRecord == null) { //ensures txn is still there and in expected state ensureValidTxn(dbConn, rqst.getTxnid(), stmt); @@ -4433,8 +4430,8 @@ private int abortTxns(Connection dbConn, List txnids, boolean checkHeartbe StringBuilder suffix = new StringBuilder(); // add update txns queries to query list - prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = ").append(quoteChar(TXN_ABORTED)) - .append(" WHERE \"TXN_STATE\" = ").append(quoteChar(TXN_OPEN)).append(" AND "); + prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = ").append(quoteChar(ABORTED)) + .append(" WHERE \"TXN_STATE\" = ").append(quoteChar(OPEN)).append(" AND "); if (checkHeartbeat) { suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ") .append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout); @@ -4744,7 +4741,7 @@ private void heartbeatTxn(Connection dbConn, long txnid) } try (Statement stmt = dbConn.createStatement()) { String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + TxnDbUtil.getEpochFn(dbProduct) + - " WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = '" + TXN_OPEN + "'"; + " WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = " + quoteChar(OPEN); LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -4774,22 +4771,14 @@ private TxnStatus findTxnState(long txnid, Statement stmt) throws SQLException, LOG.debug("Going to execute query <" + s + ">"); try (ResultSet rs2 = stmt.executeQuery(s)) { if (rs2.next()) { - return TxnStatus.COMMITTED; + return COMMITTED; } } // could also check WRITE_SET but that seems overkill return TxnStatus.UNKNOWN; } - char txnState = rs.getString(1).charAt(0); - if (txnState == TXN_ABORTED) { - return TxnStatus.ABORTED; - } - if (txnState == TXN_COMMITTED) { - return TxnStatus.COMMITTED; - } - assert txnState == TXN_OPEN : "we found it in TXNS but it's not ABORTED, so must be OPEN"; + return TxnStatus.fromString(rs.getString(1)); } - return TxnStatus.OPEN; } /** @@ -4804,8 +4793,8 @@ private boolean isTxnsOpenAndNotReadOnly(List txnIds, Statement stmt) thro // Get the count of txns from the given list that are in open state and not read-only. // If the returned count is same as the input number of txns, then all txns are in open state and not read-only. - prefix.append("SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN - + "' AND \"TXN_TYPE\" != " + TxnType.READ_ONLY.getValue() + " AND "); + prefix.append("SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(OPEN) + + " AND \"TXN_TYPE\" != " + TxnType.READ_ONLY.getValue() + " AND "); TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(), txnIds, "\"TXN_ID\"", false, false); @@ -4842,10 +4831,10 @@ private String getAbortedAndReadOnlyTxns(List txnIds, Statement stmt) thro try (ResultSet rs = stmt.executeQuery(query)) { while (rs.next()) { long txnId = rs.getLong(1); - char txnState = rs.getString(2).charAt(0); + TxnStatus txnState = TxnStatus.fromString(rs.getString(2)); TxnType txnType = TxnType.findByValue(rs.getInt(3)); - if (txnState != TXN_OPEN) { + if (txnState != OPEN) { txnInfo.append("{").append(txnId).append(",").append(txnState).append("}"); } else if (txnType == TxnType.READ_ONLY) { txnInfo.append("{").append(txnId).append(",read-only}"); @@ -4929,7 +4918,7 @@ private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid)); } } - if (rs.getString(1).charAt(0) == TXN_ABORTED) { + if (TxnStatus.fromString(rs.getString(1)) == ABORTED) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) @@ -5056,8 +5045,8 @@ public void performTimeOuts() { timeOutLocks(dbConn); while(true) { stmt = dbConn.createStatement(); - String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN + - "' AND \"TXN_LAST_HEARTBEAT\" < " + TxnDbUtil.getEpochFn(dbProduct) + "-" + timeout + + String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(OPEN) + + " AND \"TXN_LAST_HEARTBEAT\" < " + TxnDbUtil.getEpochFn(dbProduct) + "-" + timeout + " AND \"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue(); //safety valve for extreme cases s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s); @@ -5117,7 +5106,7 @@ public void countOpenTxns() throws MetaException { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN + "'"; + String s = "SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(OPEN); LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { @@ -5238,6 +5227,9 @@ static String quoteString(String input) { static String quoteChar(char c) { return "'" + c + "'"; } + static String quoteChar(TxnStatus c) { + return "'" + c.getSqlConst() + "'"; + } static CompactionType dbCompactionType2ThriftType(char dbValue) { switch (dbValue) { case MAJOR_TYPE: @@ -5508,6 +5500,6 @@ public int getLoginTimeout() throws SQLException { public boolean isWrapperFor(Class iface) throws SQLException { throw new UnsupportedOperationException(); } - }; + } } diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 4ee1a45aae..d2aebe995b 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -25,9 +25,11 @@ import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; +import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.txn.TxnHandler.TxnStatus; import org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -391,4 +393,77 @@ private static int querySizeExpected(int sizeSoFar, return size; } + + /** + * Class to represent one row in the TXNS table. + */ + public static class OpenTxn { + + private long txnId; + private TxnStatus status; + private TxnType type; + private long startedTime; + private long lastHeartBeatTime; + private String user; + private String host; + + public OpenTxn(long txnId, TxnStatus status, TxnType type) { + this(txnId, status, type, null, null, 0L, 0L); + } + + public OpenTxn(long txnId, TxnStatus status, TxnType type, String user, String host, long startedTime, long lastHeartBeatTime) { + this.txnId = txnId; + this.status = status; + this.type = type; + this.user = user; + this.host = host; + this.startedTime = startedTime; + this.lastHeartBeatTime = lastHeartBeatTime; + } + + public long getTxnId() { + return txnId; + } + + public TxnStatus getStatus() { + return status; + } + + public long getStartedTime() { + return startedTime; + } + + public long getLastHeartBeatTime() { + return lastHeartBeatTime; + } + + public TxnType getType() { + return type; + } + + public String getUser() { + return user; + } + + public String getHost() { + return host; + } + } + public static class OpenTxnList { + private long hwm; + private List openTxnList; + + public OpenTxnList(long hwm, List openTxnList) { + this.hwm = hwm; + this.openTxnList = openTxnList; + } + + public long getHwm() { + return hwm; + } + + public List getOpenTxnList() { + return openTxnList; + } + } }