diff --git common/src/java/org/apache/hadoop/hive/common/classification/RetrySemantics.java common/src/java/org/apache/hadoop/hive/common/classification/RetrySemantics.java new file mode 100644 index 0000000..e778cd5 --- /dev/null +++ common/src/java/org/apache/hadoop/hive/common/classification/RetrySemantics.java @@ -0,0 +1,27 @@ +package org.apache.hadoop.hive.common.classification; + + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * These annotations are meant to indicate how to handle retry logic. + * Initially meant for Metastore API. + */ +@InterfaceStability.Evolving +@InterfaceAudience.LimitedPrivate("Hive developer") +public class RetrySemantics { + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface Idempotent { + String[] value() default ""; + } + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface ReadOnly {} + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface CannotRetry {} +} diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 75a58c6..36a944f 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -30,10 +30,10 @@ import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.HouseKeeperService; import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.thrift.TException; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; import org.apache.commons.dbcp.PoolingDataSource; @@ -101,6 +101,18 @@ * It's imperative that any operation on a txn (e.g. commit), ensure (atomically) that this txn is * still valid and active. In the code this is usually achieved at the same time the txn record * is locked for some operation. + * + * Note on retry logic: + * Metastore has retry logic in both {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient} + * and {@link org.apache.hadoop.hive.metastore.RetryingHMSHandler}. The retry logic there is very + * generic and is not aware whether the operations are idempotent or not. (This is separate from + * retry logic here in TxnHandler which can/does retry DB errors intelligently). The worst case is + * when an op here issues a successful commit against the RDBMS but the calling stack doesn't + * receive the ack and retries. (If an op fails before commit, it's trivially idempotent) + * Thus the ops here need to be made idempotent as much as possible or + * the metastore call stack should have logic not to retry. There are {@link RetrySemantics} + * annotations to document the behavior. {@link RetrySemantics.Idempotent} is used somewhat loosely + * to mean a more general safe-to-retry. 
*/ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -120,6 +132,8 @@ // Transaction states static final protected char TXN_ABORTED = 'a'; static final protected char TXN_OPEN = 'o'; + //todo: make these like OperationType and remove above char constants + enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN} // Lock states static final protected char LOCK_ACQUIRED = 'a'; @@ -263,7 +277,8 @@ public void setConf(HiveConf conf) { dumpConfig = false; } } - + @Override + @RetrySemantics.ReadOnly public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { try { // We need to figure out the current transaction number and the list of @@ -339,7 +354,8 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { return getOpenTxnsInfo(); } } - + @Override + @RetrySemantics.ReadOnly public GetOpenTxnsResponse getOpenTxns() throws MetaException { try { // We need to figure out the current transaction number and the list of @@ -414,6 +430,12 @@ private static void startHouseKeeperService(HiveConf conf, Class c){ } } + /** + * Retry-by-caller note: + * Worst case, it will leave an open txn which will timeout. 
+ */ + @Override + @RetrySemantics.Idempotent public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { if (openTxnsCounter == null) { synchronized (TxnHandler.class) { @@ -515,7 +537,8 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { return openTxns(rqst); } } - + @Override + @RetrySemantics.Idempotent public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException { long txnid = rqst.getTxnid(); try { @@ -525,10 +548,14 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) { - LOG.debug("Going to rollback"); - dbConn.rollback(); stmt = dbConn.createStatement(); - ensureValidTxn(dbConn, txnid, stmt); + TxnStatus status = findTxnState(txnid,stmt); + if(status == TxnStatus.ABORTED) { + LOG.info("abortTxn(" + JavaUtils.txnIdToString(txnid) + + ") requested by it is already " + TxnStatus.ABORTED); + return; + } + raiseTxnUnexpectedState(status, txnid); } LOG.debug("Going to commit"); @@ -547,7 +574,8 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept abortTxn(rqst); } } - + @Override + @RetrySemantics.Idempotent public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException { List txnids = rqst.getTxn_ids(); try { @@ -556,7 +584,7 @@ public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaExce dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); int numAborted = abortTxns(dbConn, txnids, false); if (numAborted != txnids.size()) { - LOG.warn("Abort Transactions command only abort " + numAborted + " out of " + + LOG.warn("Abort Transactions command only aborted " + numAborted + " out of " + txnids.size() + " transactions. 
It's possible that the other " + (txnids.size() - numAborted) + " transactions have been aborted or committed, or the transaction ids are invalid."); @@ -602,6 +630,8 @@ public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaExce * txnid:2, which is possible if commitTxn() and openTxnx() is not mutexed) * 'x' would be updated to the same value by both, i.e. lost update. */ + @Override + @RetrySemantics.Idempotent("No-op if already committed") public void commitTxn(CommitTxnRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { long txnid = rqst.getTxnid(); @@ -622,9 +652,19 @@ public void commitTxn(CommitTxnRequest rqst) */ lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); if (lockHandle == null) { - //this also ensures that txn is still there and in expected state (hasn't been timed out) - ensureValidTxn(dbConn, txnid, stmt); + //if here, txn was not found (in expected state) + TxnStatus actualTxnStatus = findTxnState(txnid, stmt); + if(actualTxnStatus == TxnStatus.COMMITTED) { + /** + * This makes the operation idempotent + * (assume that this is most likely due to retry logic) + */ + LOG.info("Nth commitTxn(" + JavaUtils.txnIdToString(txnid) + ") msg"); + return; + } + raiseTxnUnexpectedState(actualTxnStatus, txnid); shouldNeverHappen(txnid); + //dbConn is rolled back in finally{} } String conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")"; @@ -645,9 +685,17 @@ public void commitTxn(CommitTxnRequest rqst) } long commitId = commitIdRs.getLong(1); Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); + /** + * "select distinct" is used below because + * 1. once we get to multi-statement txns, we only care to record that something was updated once + * 2. 
if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried by caller it may create + * duplicate entries in TXN_COMPONENTS + * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct + * even if it includes all of its columns + */ int numCompsWritten = stmt.executeUpdate( "insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" + - " select tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix); + " select distinct tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix); /** * see if there are any overlapping txns wrote the same element, i.e. have a conflict * Since entire commit operation is mutexed wrt other start/commit ops, @@ -750,6 +798,7 @@ public void commitTxn(CommitTxnRequest rqst) } } @Override + @RetrySemantics.Idempotent public void performWriteSetGC() { Connection dbConn = null; Statement stmt = null; @@ -798,7 +847,17 @@ public void performWriteSetGC() { * connection (but separate transactions). This avoid some flakiness in BONECP where if you * perform an operation on 1 connection and immediately get another fron the pool, the 2nd one * doesn't see results of the first. + * + * Retry-by-caller note: If the call to lock is from a transaction, then in the worst case + * there will be a duplicate set of locks but both sets will belong to the same txn so they + * will not conflict with each other. For locks w/o txn context (i.e. read-only query), this + * may lead to deadlock (at least a long wait). (e.g. 1st call creates locks in {@code LOCK_WAITING} + * mode and response gets lost. Then {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient} + * retries, and enqueues another set of locks in LOCK_WAITING. The 2nd LockResponse is delivered + * to the DbLockManager, which will keep doing {@link #checkLock(CheckLockRequest)} until the 1st + * set of locks times out.) 
*/ + @RetrySemantics.CannotRetry public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst); try { @@ -1063,7 +1122,12 @@ private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long * {@link #checkLock(java.sql.Connection, long)} must run at SERIALIZABLE (make sure some lock we are checking * against doesn't move from W to A in another txn) but this method can heartbeat in * separate txn at READ_COMMITTED. + * + * Retry-by-caller note: + * Retryable because {@link #checkLock(Connection, long)} is */ + @Override + @RetrySemantics.Idempotent public LockResponse checkLock(CheckLockRequest rqst) throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { try { @@ -1112,6 +1176,7 @@ public LockResponse checkLock(CheckLockRequest rqst) * heartbeat/performTimeout which are update/delete of HIVE_LOCKS thus will be locked as needed by db. * since this only removes from HIVE_LOCKS at worst some lock acquire is delayed */ + @RetrySemantics.Idempotent public void unlock(UnlockRequest rqst) throws NoSuchLockException, TxnOpenException, MetaException { try { @@ -1146,11 +1211,15 @@ public void unlock(UnlockRequest rqst) if(info == null) { //didn't find any lock with extLockId but at ReadCommitted there is a possibility that //it existed when above delete ran but it didn't have the expected state. - LOG.error("No lock in " + LOCK_WAITING + " mode found for unlock(" + rqst + ")"); - throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); + LOG.info("No lock in " + LOCK_WAITING + " mode found for unlock(" + + JavaUtils.lockIdToString(rqst.getLockid()) + ")"); + //bail here to make the operation idempotent + return; } if(info.txnId != 0) { String msg = "Unlocking locks associated with transaction not permitted. 
" + info; + //if a lock is associated with a txn we can only "unlock" if if it's in WAITING state + // which really means that the caller wants to give up waiting for the lock LOG.error(msg); throw new TxnOpenException(msg); } @@ -1189,6 +1258,7 @@ public void unlock(UnlockRequest rqst) this.e = e; } } + @RetrySemantics.ReadOnly public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { try { Connection dbConn = null; @@ -1297,6 +1367,8 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { * {@code ids} should only have txnid or lockid but not both, ideally. * Currently DBTxnManager.heartbeat() enforces this. */ + @Override + @RetrySemantics.Idempotent public void heartbeat(HeartbeatRequest ids) throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { try { @@ -1318,7 +1390,8 @@ public void heartbeat(HeartbeatRequest ids) heartbeat(ids); } } - + @Override + @RetrySemantics.Idempotent public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst) throws MetaException { try { @@ -1379,6 +1452,7 @@ long generateCompactionQueueId(Statement stmt) throws SQLException, MetaExceptio return id; } @Override + @RetrySemantics.Idempotent public CompactionResponse compact(CompactionRequest rqst) throws MetaException { // Put a compaction request in the queue. try { @@ -1503,6 +1577,7 @@ private static String compactorStateToResponse(char s) { return Character.toString(s); } } + @RetrySemantics.ReadOnly public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException { ShowCompactResponse response = new ShowCompactResponse(new ArrayList()); Connection dbConn = null; @@ -1574,6 +1649,13 @@ private static void shouldNeverHappen(long txnid, long extLockId, long intLockId + JavaUtils.lockIdToString(extLockId) + " " + intLockId); } + /** + * Retry-by-caller note: + * This may be retried after dbConn.commit. 
At worst, it will create duplicate entries in + * TXN_COMPONENTS which won't affect anything. See more comments in {@link #commitTxn(CommitTxnRequest)} + */ + @Override + @RetrySemantics.Idempotent public void addDynamicPartitions(AddDynamicPartitions rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { Connection dbConn = null; @@ -1629,7 +1711,11 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) /** * Clean up corresponding records in metastore tables when corresponding object is dropped, * specifically: TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS + * Retry-by-caller note: this is only idempotent assuming it's only called by dropTable/Db/etc + * operations. */ + @Override + @RetrySemantics.Idempotent public void cleanupRecords(HiveObjectType type, Database db, Table table, Iterator partitionIterator) throws MetaException { try { @@ -2322,6 +2408,7 @@ private static boolean isValidTxn(long txnId) { * If they happen to be for the same txnid, there will be a WW conflict (in MS DB), if different txnid, * checkLock() will in the worst case keep locks in Waiting state a little longer. */ + @RetrySemantics.Idempotent("See @Idempotent in the body of the method") private LockResponse checkLock(Connection dbConn, long extLockId) throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { TxnStore.MutexAPI.LockHandle handle = null; @@ -2331,7 +2418,7 @@ private LockResponse checkLock(Connection dbConn, long extLockId) /** * todo: Longer term we should pass this from client somehow - this would be an optimization; once * that is in place make sure to build and test "writeSet" below using OperationType not LockType - * With SP we assume that the query modifies exactly the partitions it locked. (not entirely + * With Static Partitions we assume that the query modifies exactly the partitions it locked. 
(not entirely * realistic since Update/Delete may have some predicate that filters out all records out of * some partition(s), but plausible). For DP, we acquire locks very wide (all known partitions), * but for most queries only a fraction will actually be updated. #addDynamicPartitions() tells @@ -2517,6 +2604,7 @@ private LockResponse checkLock(Connection dbConn, long extLockId) // If we've found it and it's already been marked acquired, // then just look at the other locks. if (locks[index].state == LockState.ACQUIRED) { + /**this is what makes this method @Idempotent*/ continue; } @@ -2720,6 +2808,54 @@ private void heartbeatTxn(Connection dbConn, long txnid) } } + /** + * Returns the state of the transaction iff it's able to determine it. Some cases where it cannot: + * 1. txnid was Aborted/Committed and then GC'd (compacted) + * 2. txnid was committed but it didn't modify anything (nothing in COMPLETED_TXN_COMPONENTS) + */ + private TxnStatus findTxnState(long txnid, Statement stmt) throws SQLException, MetaException { + String s = "select txn_state from TXNS where txn_id = " + txnid; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + s = sqlGenerator.addLimitClause(1, "1 from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid); + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs2 = stmt.executeQuery(s); + if(rs2.next() && rs2.getInt(1) > 0) { + return TxnStatus.COMMITTED; + } + return TxnStatus.UNKNOWN; + } + char txnState = rs.getString(1).charAt(0); + if (txnState == TXN_ABORTED) { + return TxnStatus.ABORTED; + } + assert txnState == TXN_OPEN : "we found it in TXNS but it's not ABORTED, so must be OPEN"; + return TxnStatus.OPEN; + } + + /** + * Used to raise an informative error when the caller expected a txn in a particular TxnStatus + * but found it in some other status + */ + private static void raiseTxnUnexpectedState(TxnStatus actualStatus, long txnid) + throws 
NoSuchTxnException, TxnAbortedException { + switch (actualStatus) { + case ABORTED: + throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) + " already aborted"); + case COMMITTED: + throw new NoSuchTxnException("Transaction " + JavaUtils.txnIdToString(txnid) + " is already committed."); + case UNKNOWN: + throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid)); + case OPEN: + throw new NoSuchTxnException(JavaUtils.txnIdToString(txnid) + " is " + TxnStatus.OPEN); + default: + throw new IllegalArgumentException("Unknown TxnStatus " + actualStatus); + } + } + /** + * Checks that the transaction with {@code txnid} is valid and open; otherwise rolls back {@code dbConn} and throws {@link NoSuchTxnException} or {@link TxnAbortedException}. + */ private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt) throws SQLException, NoSuchTxnException, TxnAbortedException { // We need to check whether this transaction is valid and open @@ -2734,7 +2870,7 @@ private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt //possible for for multi-stmt txns boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0; LOG.debug("Going to rollback"); - dbConn.rollback(); + rollbackDBConn(dbConn); if(alreadyCommitted) { //makes the message more informative - helps to find bugs in client code throw new NoSuchTxnException("Transaction " + JavaUtils.txnIdToString(txnid) + " is already committed."); @@ -2743,7 +2879,7 @@ private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt } if (rs.getString(1).charAt(0) == TXN_ABORTED) { LOG.debug("Going to rollback"); - dbConn.rollback(); + rollbackDBConn(dbConn); throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) + " already aborted");//todo: add time of abort, which is not currently tracked. 
Requires schema change } @@ -2869,6 +3005,7 @@ private void timeOutLocks(Connection dbConn, long now) { * Will also delete locks which are not associated with a transaction and have timed out * Tries to keep transactions (against metastore db) small to reduce lock contention. */ + @RetrySemantics.Idempotent public void performTimeOuts() { Connection dbConn = null; Statement stmt = null; @@ -2939,7 +3076,8 @@ public void performTimeOuts() { close(rs, stmt, dbConn); } } - + @Override + @RetrySemantics.ReadOnly public void countOpenTxns() throws MetaException { Connection dbConn = null; Statement stmt = null; @@ -3244,6 +3382,7 @@ private void unlockInternal() { } } @Override + @RetrySemantics.Idempotent public MutexAPI getMutexAPI() { return this; } @@ -3360,6 +3499,7 @@ public void releaseLocks() { } /** * Helper class that generates SQL queries with syntax specific to target DB + * todo: why throw MetaException? */ @VisibleForTesting static final class SQLGenerator { @@ -3459,7 +3599,7 @@ String addForUpdateClause(String selectStatement) throws MetaException { } /** * Suppose you have a query "select a,b from T" and you want to limit the result set - * to the first 5 rows. The mechanism to do that differs in different DB. + * to the first 5 rows. The mechanism to do that differs in different DBs. * Make {@code noSelectsqlQuery} to be "a,b from T" and this method will return the * appropriately modified row limiting query. 
* diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 879ae55..a8e1a06 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; @@ -68,6 +69,7 @@ * @return information about open transactions * @throws MetaException */ + @RetrySemantics.ReadOnly public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException; /** @@ -75,12 +77,14 @@ * @return list of open transactions, as well as a high water mark. * @throws MetaException */ + @RetrySemantics.ReadOnly public GetOpenTxnsResponse getOpenTxns() throws MetaException; /** * Get the count for open transactions. 
* @throws MetaException */ + @RetrySemantics.ReadOnly public void countOpenTxns() throws MetaException; /** @@ -97,6 +101,7 @@ * @throws NoSuchTxnException * @throws MetaException */ + @RetrySemantics.Idempotent public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException; /** @@ -114,6 +119,7 @@ * @throws TxnAbortedException * @throws MetaException */ + @RetrySemantics.Idempotent public void commitTxn(CommitTxnRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException; @@ -151,6 +157,7 @@ public LockResponse checkLock(CheckLockRequest rqst) * @throws TxnOpenException * @throws MetaException */ + @RetrySemantics.Idempotent public void unlock(UnlockRequest rqst) throws NoSuchLockException, TxnOpenException, MetaException; @@ -160,6 +167,7 @@ public void unlock(UnlockRequest rqst) * @return lock information. * @throws MetaException */ + @RetrySemantics.ReadOnly public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException; /** @@ -179,6 +187,7 @@ public void heartbeat(HeartbeatRequest ids) * @return info on txns that were heartbeated * @throws MetaException */ + @RetrySemantics.Idempotent public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst) throws MetaException; @@ -189,6 +198,7 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst * @return id of the compaction that has been started or existing id if this resource is already scheduled * @throws MetaException */ + @RetrySemantics.Idempotent public CompactionResponse compact(CompactionRequest rqst) throws MetaException; /** @@ -197,6 +207,7 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst * @return compaction information * @throws MetaException */ + @RetrySemantics.ReadOnly public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException; /** @@ -223,6 +234,7 @@ public void cleanupRecords(HiveObjectType type, Database db, Table 
table, /** * Timeout transactions and/or locks. This should only be called by the compactor. */ + @RetrySemantics.Idempotent public void performTimeOuts(); /** @@ -234,6 +246,7 @@ public void cleanupRecords(HiveObjectType type, Database db, Table table, * @return list of CompactionInfo structs. These will not have id, type, * or runAs set since these are only potential compactions not actual ones. */ + @RetrySemantics.ReadOnly public Set findPotentialCompactions(int maxAborted) throws MetaException; /** @@ -250,6 +263,7 @@ public void cleanupRecords(HiveObjectType type, Database db, Table table, * @param workerId id of the worker calling this, will be recorded in the db * @return an info element for this compaction request, or null if there is no work to do now. */ + @RetrySemantics.ReadOnly public CompactionInfo findNextToCompact(String workerId) throws MetaException; /** @@ -264,6 +278,7 @@ public void cleanupRecords(HiveObjectType type, Database db, Table table, * be cleaned. * @return information on the entry in the queue. */ + @RetrySemantics.ReadOnly public List findReadyToClean() throws MetaException; /** @@ -323,6 +338,7 @@ public void cleanupRecords(HiveObjectType type, Database db, Table table, /** * Record the highest txn id that the {@code ci} compaction job will pay attention to. */ + @RetrySemantics.Idempotent public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException; /** @@ -333,12 +349,14 @@ public void cleanupRecords(HiveObjectType type, Database db, Table table, * it's not recent. * @throws MetaException */ + @RetrySemantics.Idempotent public void purgeCompactionHistory() throws MetaException; /** * WriteSet tracking is used to ensure proper transaction isolation. This method deletes the * transaction metadata once it becomes unnecessary. 
*/ + @RetrySemantics.Idempotent public void performWriteSetGC(); /** @@ -349,6 +367,7 @@ public void cleanupRecords(HiveObjectType type, Database db, Table table, * @return true if it is ok to compact, false if there have been too many failures. * @throws MetaException */ + @RetrySemantics.ReadOnly public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException; @VisibleForTesting @@ -393,5 +412,6 @@ public void cleanupRecords(HiveObjectType type, Database db, Table table, * it calls this to update the metadata. * @param id {@link CompactionInfo#id} */ + @RetrySemantics.Idempotent public void setHadoopJobId(String hadoopJobId, long id); }