diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index e56653f..ec32da7 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -933,8 +933,22 @@
     <dependency>
       <groupId>com.microsoft.azure</groupId>
       <artifactId>azure-storage</artifactId>
       <version>2.0.0</version>
     </dependency>
-
-
+
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+      <version>9.4-1201-jdbc41</version>
+    </dependency>
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <version>1.4.188</version>
+    </dependency>
+    <dependency>
+      <groupId>com.tngtech.java</groupId>
+      <artifactId>junit-dataprovider</artifactId>
+      <version>1.10.1</version>
+    </dependency>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 38c0eed..6fcafce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -569,6 +568,24 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION =
"NONE";
+ ////////////////////////////////
+ // SQL store related configurations
+ ////////////////////////////////
+
+ public static final String DB_PREFIX = RM_PREFIX + "db.";
+ public static final String RM_DB_TYPE = DB_PREFIX + "type";
+ public static final String RM_DB_HOST = DB_PREFIX + "hostname";
+ public static final String RM_DB_DBNAME = DB_PREFIX + "dbname";
+ public static final String RM_DB_USERNAME = DB_PREFIX + "username";
+ public static final String RM_DB_PASSWORD = DB_PREFIX + "password";
+ public static final String RM_DB_PASSWORD_FILE = DB_PREFIX + "password.file";
+ public static final String RM_DB_RETRIES = DB_PREFIX + "retries";
+ public static final int DEFAULT_RM_DB_RETRIES = 0;
+ public static final String RM_DB_RETRIES_INTERVAL = RM_DB_RETRIES + ".interval.seconds";
+ public static final int DEFAULT_RM_DB_RETRIES_INTERVAL = 10;
+ public static final String RM_DB_VERIFICATION_TIMEOUT = DB_PREFIX + "verification.timeout.millis";
+ public static final int DEFAULT_RM_DB_VERIFICATION_TIMEOUT = 10 * 1000;
+
/**
* RM proxy users' prefix
*/
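
For reference, a minimal sketch of how the new keys could be set on a YarnConfiguration. This snippet is not part of the patch; the host, database name, password file path and retry values are illustrative assumptions only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class SqlStateStoreConfigExample {
      public static void main(String[] args) {
        Configuration conf = new YarnConfiguration();
        conf.set(YarnConfiguration.RM_DB_TYPE, "postgresql");        // yarn.resourcemanager.db.type
        conf.set(YarnConfiguration.RM_DB_HOST, "db.example.com");    // yarn.resourcemanager.db.hostname
        conf.set(YarnConfiguration.RM_DB_DBNAME, "rmstore");         // yarn.resourcemanager.db.dbname
        conf.set(YarnConfiguration.RM_DB_USERNAME, "yarn");          // yarn.resourcemanager.db.username
        conf.set(YarnConfiguration.RM_DB_PASSWORD_FILE, "/etc/yarn/db.password"); // yarn.resourcemanager.db.password.file
        conf.setInt(YarnConfiguration.RM_DB_RETRIES, 3);             // yarn.resourcemanager.db.retries
        conf.setInt(YarnConfiguration.RM_DB_RETRIES_INTERVAL, 10);   // yarn.resourcemanager.db.retries.interval.seconds
        conf.setInt(YarnConfiguration.RM_DB_VERIFICATION_TIMEOUT, 10 * 1000); // yarn.resourcemanager.db.verification.timeout.millis
        System.out.println("SQL store type: " + conf.get(YarnConfiguration.RM_DB_TYPE));
      }
    }
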
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index 5161ed4..57907e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -34,6 +34,20 @@
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.tngtech.java</groupId>
+      <artifactId>junit-dataprovider</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>javax.servlet</groupId>
       <artifactId>servlet-api</artifactId>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/SqlRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/SqlRMStateStore.java
new file mode 100644
index 0000000..4fc5621
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/SqlRMStateStore.java
@@ -0,0 +1,556 @@
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.Credential;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.LocalResultSet;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.Operation;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.PSQLOperations;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.SQLConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.SQLOperations;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.log4j.Logger;
+import org.mortbay.log.Log;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A SQL-database-backed implementation of {@link RMStateStore}. State is stored
+ * as key/value rows in a set of tables accessed through {@link SQLOperations}.
+ */
+public class SqlRMStateStore extends RMStateStore {
+
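+ // Random identity for this store instance; it is embedded in the fencing credential built in initInternal().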
+ @VisibleForTesting
+ protected String uuid = UUID.randomUUID().toString();
+ protected static final String CREDENTIAL_TABLE = "credentialtable";
+ protected static final String APPLICATION_TABLE = "applicationtable";
+ protected static final String ATTEMPT_TABLE = "attempttable";
+ protected static final String RMDELEGATAION_TABLE = "rmdelegationtable";
+ protected static final String RMMASTERKEYS_TABLE = "rmmasterkeystable";
+
+ protected static final String DelegtaionTokenRoot = "DelegtaionTokenRoot";
+ protected static final String RMDTSequentialNumber = "RMDTSequentialNumber";
+
+ protected static final String SEPARATOR = "/";
+ protected static final String VERSION = "version";
+ protected static final String EPOCH = "epoch";
+ protected static final String MASTERCRED = "mastercred";
+ protected static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 0);
+
+
+ public Credential credential = null;
+
+ private static final Logger LOG = Logger.getLogger(SqlRMStateStore.class);
+
+ static {
+ LOG.info("LOADED CLASS SqlRMStateStore modified on 27 oct 2015");
+ }
+
+ protected int verificationTimeOut = 10 * 1000;
+ private Thread verifyActiveStatusThread;
+
+ protected SQLOperations sqlop = null;
+
+ // Retry policy
+ private int maxRetries = 0;
+ private int retryIntervalSeconds = 0;
+
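+ // Wrap the PostgreSQL SQLOperations implementation in a RetryProxy so that
+ // SQLExceptions thrown by the listed methods are retried up to maxRetries
+ // times, sleeping retryIntervalSeconds between attempts.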
+ protected SQLOperations createSQLOperationsWithRetry() {
+ SQLOperations operations = new PSQLOperations();
+ RetryPolicy basePolicy =
+ RetryPolicies.retryUpToMaximumCountWithFixedSleep(maxRetries, retryIntervalSeconds, TimeUnit.SECONDS);
+ Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
+ new HashMap<Class<? extends Exception>, RetryPolicy>();
+ exceptionToPolicyMap.put(SQLException.class, basePolicy);
+ RetryPolicy methodPolicy =
+ RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+ Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
+ methodNameToPolicyMap.put("doUpdateOrInsert", methodPolicy);
+ methodNameToPolicyMap.put("doDelete", methodPolicy);
+ methodNameToPolicyMap.put("fence", methodPolicy);
+ methodNameToPolicyMap.put("getFromTable", methodPolicy);
+ methodNameToPolicyMap.put("getFromTableRange", methodPolicy);
+ methodNameToPolicyMap.put("doDropTable", methodPolicy);
+ methodNameToPolicyMap.put("doFencingCheck", methodPolicy);
+ methodNameToPolicyMap.put("doMultipleOp", methodPolicy);
+ return (SQLOperations) RetryProxy.create(SQLOperations.class, operations, methodNameToPolicyMap);
+ }
+
+ @Override
+ protected synchronized void initInternal(Configuration conf) throws SQLException {
+ maxRetries = conf.getInt(YarnConfiguration.RM_DB_RETRIES, YarnConfiguration.DEFAULT_RM_DB_RETRIES);
+ retryIntervalSeconds =
+ conf.getInt(YarnConfiguration.RM_DB_RETRIES_INTERVAL, YarnConfiguration.DEFAULT_RM_DB_RETRIES_INTERVAL);
+ verificationTimeOut =
+ conf.getInt(YarnConfiguration.RM_DB_VERIFICATION_TIMEOUT,
+ YarnConfiguration.DEFAULT_RM_DB_VERIFICATION_TIMEOUT);
+
+ sqlop = createSQLOperationsWithRetry();
+ sqlop.init(conf);
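+ // The credential (table, key, uuid) identifies this store instance to fence() and the fencing checks.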
+ credential = new Credential(CREDENTIAL_TABLE, MASTERCRED, uuid);
+ LOG.debug("Max Retries = " + maxRetries + " Retry Inverval Seconds = " + retryIntervalSeconds
+ + " Verification Timeout " + verificationTimeOut);
+ LOG.debug("Credential " + credential);
+ }
+
+ @Override
+ protected synchronized void startInternal() throws Exception {
+ // Create the state tables if they do not already exist.
+ sqlop.doCreateTable(CREDENTIAL_TABLE);
+ sqlop.doCreateTable(APPLICATION_TABLE);
+ sqlop.doCreateTable(ATTEMPT_TABLE);
+ sqlop.doCreateTable(RMDELEGATAION_TABLE);
+ sqlop.doCreateTable(RMMASTERKEYS_TABLE);
+ sqlop.fence(credential);
+ if (HAUtil.isHAEnabled(getConfig())) {
+ // Start the checker thread if HA is enabled.
+ verifyActiveStatusThread = new VerifyActiveStatusThread();
+ verifyActiveStatusThread.start();
+ }
+ }
+
+ @Override
+ protected synchronized void closeInternal() throws Exception {
+ if (verifyActiveStatusThread != null) {
+ verifyActiveStatusThread.interrupt();
+ verifyActiveStatusThread.join(1000);
+ }
+ sqlop.disconnect();
+ }
+
+ @Override
+ protected synchronized Version loadVersion() throws Exception {
+ LocalResultSet lrs = sqlop.getFromTable(CREDENTIAL_TABLE, VERSION);
+ try {
+ if (!lrs.set.next()) {
+ // No result
+ return null;
+ }
+ return new VersionPBImpl(VersionProto.parseFrom(lrs.set.getBytes(SQLConstants.VALUE_COLUMN)));
+ } finally {
+ lrs.close();
+ }
+ }
+
+ @Override
+ protected synchronized void storeVersion() throws Exception {
+ byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
+ sqlop.doUpdateOrInsert(CREDENTIAL_TABLE, VERSION, data, credential);
+ }
+
+ @Override
+ protected Version getCurrentVersion() {
+ return CURRENT_VERSION_INFO;
+ }
+
+ @Override
+ public synchronized long getAndIncrementEpoch() throws Exception {
+ long currentEpoch = 0;
+ LocalResultSet lrs = sqlop.getFromTable(CREDENTIAL_TABLE, EPOCH);
+ try {
+ if (!lrs.set.next()) {
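+ // No epoch stored yet: report 0 and persist 1 as the next epoch.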
+ byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto().toByteArray();
+ sqlop.doUpdateOrInsert(CREDENTIAL_TABLE, EPOCH, storeData, credential);
+ } else {
+ byte[] data = lrs.set.getBytes(SQLConstants.VALUE_COLUMN);
+ Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
+ currentEpoch = epoch.getEpoch();
+ // increment epoch and store it
+ byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto().toByteArray();
+ sqlop.doUpdateOrInsert(CREDENTIAL_TABLE, EPOCH, storeData, credential);
+ }
+ return currentEpoch;
+ } finally {
+ lrs.close();
+ }
+ }
+
+ @Override
+ public synchronized RMState loadState() throws Exception {
+ RMState rmState = new RMState();
+ // recover DelegationTokenSecretManager
+ loadRMDTSecretManagerState(rmState);
+ // recover RM applications
+ loadRMAppState(rmState);
+ // recover AMRMTokenSecretManager
+ loadAMRMTokenSecretManagerState(rmState);
+ return rmState;
+ }
+
+ private void loadAMRMTokenSecretManagerState(RMState rmState) throws Exception {
+ LocalResultSet lrs = sqlop.getFromTable(RMMASTERKEYS_TABLE, AMRMTOKEN_SECRET_MANAGER_ROOT);
+ try {
+ byte[] data = null;
+ if (lrs.set.next()) {
+ data = lrs.set.getBytes(SQLConstants.VALUE_COLUMN);
+ } else {
+ LOG.warn("There is no data saved");
+ return;
+ }
+ AMRMTokenSecretManagerStatePBImpl stateData =
+ new AMRMTokenSecretManagerStatePBImpl(AMRMTokenSecretManagerStateProto.parseFrom(data));
+ rmState.amrmTokenSecretManagerState =
+ AMRMTokenSecretManagerState.newInstance(stateData.getCurrentMasterKey(),
+ stateData.getNextMasterKey());
+ } finally {
+ lrs.close();
+ }
+ }
+
+ private void loadRMAppState(RMState rmState) throws Exception {
+ // Get all applications.
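+ // Application keys start with the "application_" prefix, so a range query
+ // from appIdStrPrefix to appIdStrPrefix + "z" returns all of them.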
+ LocalResultSet lrs =
+ sqlop.getFromTableRange(APPLICATION_TABLE, ApplicationId.appIdStrPrefix, ApplicationId.appIdStrPrefix
+ + "z");
+ int appCounter = 0;
+ try {
+ while (lrs.set.next()) {
+ ApplicationId appId = ConverterUtils.toApplicationId(lrs.set.getString(SQLConstants.KEY_COLUMN));
+ ApplicationStateDataPBImpl appStateData =
+ new ApplicationStateDataPBImpl(ApplicationStateDataProto.parseFrom(lrs.set
+ .getBytes(SQLConstants.VALUE_COLUMN)));
+ ApplicationState appState =
+ new ApplicationState(appStateData.getSubmitTime(), appStateData.getStartTime(),
+ appStateData.getApplicationSubmissionContext(), appStateData.getUser(),
+ appStateData.getState(), appStateData.getDiagnostics(), appStateData.getFinishTime());
+ if (!appId.equals(appState.context.getApplicationId())) {
+ throw new YarnRuntimeException("The key is different from the application id");
+ }
+ rmState.appState.put(appId, appState);
+ appCounter++;
+ // For each application fetch attempts
+ int attemptCounter = loadApplicationAttemptState(appState, appId);
+ LOG.debug("App loaded " + appId + " with attempts " + attemptCounter);
+ }
+ } finally {
+ LOG.debug("Total apps loaded " + appCounter);
+ lrs.close();
+ }
+ }
+
+ private int loadApplicationAttemptState(ApplicationState appState, ApplicationId appId) throws Exception {
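+ // Attempt keys are stored as "<appId>/<attemptId>" (see updateApplicationAttemptStateInternal),
+ // so a range query on the application id prefix returns every attempt of this application.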
+ LocalResultSet lrs = sqlop.getFromTableRange(ATTEMPT_TABLE, appId.toString(), appId.toString() + "z");
+ int attemptCounter = 0;
+ try {
+ while (lrs.set.next()) {
+ byte[] attemptData = lrs.set.getBytes(SQLConstants.VALUE_COLUMN);
+ ApplicationAttemptId attemptId =
+ ConverterUtils.toApplicationAttemptId(lrs.set.getString(SQLConstants.KEY_COLUMN).split(
+ SEPARATOR)[1]);
+ ApplicationAttemptStateDataPBImpl attemptStateData =
+ new ApplicationAttemptStateDataPBImpl(ApplicationAttemptStateDataProto.parseFrom(attemptData));
+ Credentials credentials = null;
+ if (attemptStateData.getAppAttemptTokens() != null) {
+ credentials = new Credentials();
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ dibb.reset(attemptStateData.getAppAttemptTokens());
+ credentials.readTokenStorageStream(dibb);
+ }
+ ApplicationAttemptState attemptState =
+ new ApplicationAttemptState(attemptId, attemptStateData.getMasterContainer(), credentials,
+ attemptStateData.getStartTime(), attemptStateData.getState(),
+ attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(),
+ attemptStateData.getFinalApplicationStatus(),
+ attemptStateData.getAMContainerExitStatus(), attemptStateData.getFinishTime(),
+ attemptStateData.getMemorySeconds(), attemptStateData.getVcoreSeconds());
+ appState.attempts.put(attemptState.getAttemptId(), attemptState);
+ attemptCounter++;
+ }
+ } finally {
+ lrs.close();
+ }
+ return attemptCounter;
+ }
+
+ private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
+ loadRMDelegationKeyState(rmState);
+ loadRMSequentialNumberState(rmState);
+ loadRMDelegationTokenState(rmState);
+ }
+
+ private void loadRMDelegationKeyState(RMState rmState) throws Exception {
+ String keyIn = RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_KEY_PREFIX;
+ LocalResultSet lrs = sqlop.getFromTableRange(RMMASTERKEYS_TABLE, keyIn, keyIn + "z");
+ int delegationKeyCounter = 0;
+ try {
+ while (lrs.set.next()) {
+ byte[] childData = lrs.set.getBytes(SQLConstants.VALUE_COLUMN);
+ ByteArrayInputStream is = new ByteArrayInputStream(childData);
+ DataInputStream fsIn = new DataInputStream(is);
+ try {
+ DelegationKey key = new DelegationKey();
+ key.readFields(fsIn);
+ rmState.rmSecretManagerState.masterKeyState.add(key);
+ delegationKeyCounter++;
+ } finally {
+ is.close();
+ }
+ }
+ } finally {
+ LOG.debug("Total RMDelegationKeys loaded " + delegationKeyCounter);
+ lrs.close();
+ }
+ }
+
+ private void loadRMSequentialNumberState(RMState rmState) throws Exception {
+ LocalResultSet lrs = sqlop.getFromTable(RMDELEGATAION_TABLE, RMDTSequentialNumber);
+ try {
+ if (lrs.set.next()) {
+ byte[] seqData = lrs.set.getBytes(SQLConstants.VALUE_COLUMN);
+ if (seqData != null) {
+ ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData);
+ DataInputStream seqIn = new DataInputStream(seqIs);
+ try {
+ rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt();
+ } finally {
+ seqIn.close();
+ }
+ }
+ }
+ } finally {
+ lrs.close();
+ }
+ }
+
+ private void loadRMDelegationTokenState(RMState rmState) throws Exception {
+ String keyIn = DelegtaionTokenRoot + SEPARATOR + DELEGATION_TOKEN_PREFIX;
+ LocalResultSet lrs = sqlop.getFromTableRange(RMDELEGATAION_TABLE, keyIn, keyIn + "z");
+ int delegationTokenCounter = 0;
+ try {
+ while (lrs.set.next()) {
+ byte[] childData = lrs.set.getBytes(SQLConstants.VALUE_COLUMN);
+ if (childData == null) {
+ LOG.warn("Content of " + lrs.set.getString(SQLConstants.KEY_COLUMN) + " is broken.");
+ continue;
+ }
+ ByteArrayInputStream is = new ByteArrayInputStream(childData);
+ DataInputStream fsIn = new DataInputStream(is);
+ try {
+ RMDelegationTokenIdentifierData identifierData = new RMDelegationTokenIdentifierData();
+ identifierData.readFields(fsIn);
+ RMDelegationTokenIdentifier identifier = identifierData.getTokenIdentifier();
+ long renewDate = identifierData.getRenewDate();
+ rmState.rmSecretManagerState.delegationTokenState.put(identifier, renewDate);
+ delegationTokenCounter++;
+ LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier + " renewDate=" + renewDate);
+ } finally {
+ is.close();
+ }
+ }
+ } finally {
+ LOG.debug("Total Delegation Tokens loaded " + delegationTokenCounter);
+ lrs.close();
+ }
+ }
+
+ @Override
+ protected synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB)
+ throws Exception {
+ updateApplicationStateInternal(appId, appStateDataPB);
+ }
+
+ @Override
+ protected synchronized void updateApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB)
+ throws Exception {
+ byte[] appStateData = appStateDataPB.getProto().toByteArray();
+ sqlop.doUpdateOrInsert(APPLICATION_TABLE, appId.toString(), appStateData, credential);
+ }
+
+ @Override
+ protected synchronized void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
+ ApplicationAttemptStateData attemptStateDataPB) throws Exception {
+ updateApplicationAttemptStateInternal(attemptId, attemptStateDataPB);
+ }
+
+ @Override
+ protected synchronized void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
+ ApplicationAttemptStateData attemptStateDataPB) throws Exception {
+ String key = attemptId.getApplicationId().toString() + SEPARATOR + attemptId;
+ byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
+ sqlop.doUpdateOrInsert(ATTEMPT_TABLE, key, attemptStateData, credential);
+ }
+
+ @Override
+ protected synchronized void storeRMDelegationTokenAndSequenceNumberState(
+ RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, int latestSequenceNumber) throws Exception {
+ LOG.info("STORING RMDELGATION TOKEN " + rmDTIdentifier);
+ String key = DelegtaionTokenRoot + SEPARATOR + DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber();
+ ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
+ DataOutputStream seqOut = new DataOutputStream(seqOs);
+ RMDelegationTokenIdentifierData identifierData = new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
+ seqOut.writeInt(latestSequenceNumber);
+
+ Operation delegationArgMap = new Operation();
+ List