diff --git shims/common/pom.xml shims/common/pom.xml index 700bd22..fdfb264 100644 --- shims/common/pom.xml +++ shims/common/pom.xml @@ -86,5 +86,10 @@ + + javax.jdo + jdo-api + ${jdo-api.version} + diff --git shims/common/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java shims/common/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java index d6dc079..76c576e 100644 --- shims/common/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java +++ shims/common/src/main/java/org/apache/hadoop/hive/thrift/DBTokenStore.java @@ -31,6 +31,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jdo.JDOCanRetryException; + public class DBTokenStore implements DelegationTokenStore { private static final Logger LOG = LoggerFactory.getLogger(DBTokenStore.class); private Configuration conf; @@ -167,7 +169,12 @@ private Object invokeOnTokenStore(String methName, Object[] params, Class ... } catch (IllegalAccessException e) { throw new TokenStoreException(e); } catch (InvocationTargetException e) { - throw new TokenStoreException(e.getCause()); + Throwable cause = e.getCause(); + if (cause instanceof JDOCanRetryException) { + throw new RecoverableTokenStoreException(cause); + } else { + throw new TokenStoreException(cause); + } } catch (NoSuchMethodException e) { throw new TokenStoreException(e); } diff --git shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java index 867b4ed..604aa85 100644 --- shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java +++ shims/common/src/main/java/org/apache/hadoop/hive/thrift/DelegationTokenStore.java @@ -46,6 +46,18 @@ public TokenStoreException(String message, Throwable cause) { } } + public static class RecoverableTokenStoreException extends TokenStoreException { + private static final long serialVersionUID = -3285573610483682037L; + + public RecoverableTokenStoreException(Throwable cause) { + super(cause); + } + + public RecoverableTokenStoreException(String message, Throwable cause) { + super(message, cause); + } + } + /** * Add new master key. The token store assigns and returns the sequence number. * Caller needs to use the identifier to update the key (since it is embedded in the key). diff --git shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java index 4d910d8..b4e3fbb 100644 --- shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java +++ shims/common/src/main/java/org/apache/hadoop/hive/thrift/TokenStoreDelegationTokenSecretManager.java @@ -317,15 +317,19 @@ public void run() { + ie); } } catch (Throwable t) { - LOGGER.error("ExpiredTokenRemover thread received unexpected exception. " - + t, t); - // Wait 5 seconds too in case of an exception, so we do not end up in busy waiting for - // the solution for this exception - try { - Thread.sleep(5000); // 5 seconds - } catch (InterruptedException ie) { - LOGGER.error("InterruptedException received for ExpiredTokenRemover thread during " + + LOGGER.error("ExpiredTokenRemover thread received unexpected exception. " + t, t); + if (t instanceof DelegationTokenStore.RecoverableTokenStoreException) { + // Wait 5 seconds too in case of an exception, so we do not end up in busy waiting for + // the solution for this exception + try { + Thread.sleep(5000); // 5 seconds + } catch (InterruptedException ie) { + LOGGER.error("InterruptedException received for ExpiredTokenRemover thread during " + "wait in exception sleep " + ie); + } + } else { + LOGGER.error("Unrecoverable error. Exiting..."); + Runtime.getRuntime().exit(-1); } } }