diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index e56653f..ec32da7 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -933,8 +933,22 @@ com.microsoft.azure azure-storage 2.0.0 - - + + + org.postgresql + postgresql + 9.4-1201-jdbc41 + + + com.h2database + h2 + 1.4.188 + + + com.tngtech.java + junit-dataprovider + 1.10.1 + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 38c0eed..6fcafce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -569,6 +568,24 @@ public class YarnConfiguration extends Configuration { public static final String DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION = "NONE"; + //////////////////////////////// + // SqlStore related Configurations + /////////////////////////////// + + public static final String DB_PRFIX = RM_PREFIX + "db."; + public static final String RM_DB_TYPE = DB_PRFIX + "type"; + public static final String RM_DB_HOST = DB_PRFIX + "hostname"; + public static final String RM_DB_DBNAME = DB_PRFIX + "dbname"; + public static final String RM_DB_USERNAME = DB_PRFIX + "username"; + public static final String RM_DB_PASSWORD = DB_PRFIX + "password"; + public static final String RM_DB_PASSWORD_FILE = DB_PRFIX + "password.file"; + public static final String RM_DB_RETRIES = DB_PRFIX + "retries"; + public static final int DEFAULT_RM_DB_RETRIES = 0; + public static final String RM_DB_RETRIES_INTERVAL = RM_DB_RETRIES + ".interval.seconds"; + public static final int DEFAULT_RM_DB_RETRIES_INTERVAL = 10; + public static final String RM_DB_VERIFICATION_TIMEOUT = DB_PRFIX + "verification.timeout.millis"; + public static final int DEFAULT_RM_DB_VERIFICATION_TIMEOUT = 10 * 1000; + /** * RM proxy users' prefix */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index 5161ed4..57907e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -34,6 +34,20 @@ + org.postgresql + postgresql + + + com.h2database + h2 + test + + + com.tngtech.java + junit-dataprovider + test + + javax.servlet servlet-api diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/SqlRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/SqlRMStateStore.java new file mode 100644 index 0000000..4fc5621 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/SqlRMStateStore.java @@ -0,0 +1,556 @@ +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.Credential; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.LocalResultSet; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.Operation; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.PSQLOperations; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.SQLConstants; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.SQLOperations; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.log4j.Logger; +import org.mortbay.log.Log; + +import sun.util.logging.resources.logging; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Sql based RMStateStore + * + */ +public class SqlRMStateStore extends RMStateStore { + + @VisibleForTesting + protected String uuid = UUID.randomUUID().toString(); + protected static final String CREDENTIAL_TABLE = "credentialtable"; + protected static final String APPLICATION_TABLE = "applicationtable"; + protected static final String ATTEMPT_TABLE = "attempttable"; + protected static final String RMDELEGATAION_TABLE = "rmdelegationtable"; + protected static final String RMMASTERKEYS_TABLE = "rmmasterkeystable"; + + protected static final String DelegtaionTokenRoot = "DelegtaionTokenRoot"; + protected static final String RMDTSequentialNumber = "RMDTSequentialNumber"; + + protected static final String SEPARATOR = "/"; + protected static final String VERSION = "version"; + protected static final String EPOCH = "epoch"; + protected static final String MASTERCRED = "mastercred"; + protected static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 0); + + + public Credential credential = null; + + private static final Logger LOG = Logger.getLogger(SqlRMStateStore.class); + + static { + LOG.info("LOADED CLASS SqlRMStateStore modified on 27 oct 2015"); + } + + protected int verificationTimeOut = 10 * 1000; + private Thread verifyActiveStatusThread; + + protected SQLOperations sqlop = null; + + // Retry policy + private int maxRetries = 0; + private int retryIntervalSeconds = 0; + + protected SQLOperations createSQLOperationsWithRetry() { + SQLOperations operations = new PSQLOperations(); + RetryPolicy basePolicy = + RetryPolicies.retryUpToMaximumCountWithFixedSleep(maxRetries, retryIntervalSeconds, TimeUnit.SECONDS); + Map, RetryPolicy> exceptionToPolicyMap = + new HashMap, RetryPolicy>(); + exceptionToPolicyMap.put(SQLException.class, basePolicy); + RetryPolicy methodPolicy = + RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); + Map methodNameToPolicyMap = new HashMap(); + methodNameToPolicyMap.put("doUpdateOrInsert", methodPolicy); + methodNameToPolicyMap.put("doDelete", methodPolicy); + methodNameToPolicyMap.put("fence", methodPolicy); + methodNameToPolicyMap.put("getFromTable", methodPolicy); + methodNameToPolicyMap.put("getFromTableRange", methodPolicy); + methodNameToPolicyMap.put("doDropTable", methodPolicy); + methodNameToPolicyMap.put("doFencingCheck", methodPolicy); + methodNameToPolicyMap.put("doMultipleOp", methodPolicy); + return (SQLOperations) RetryProxy.create(SQLOperations.class, operations, methodNameToPolicyMap); + } + + @Override + protected synchronized void initInternal(Configuration conf) throws SQLException { + maxRetries = conf.getInt(YarnConfiguration.RM_DB_RETRIES, YarnConfiguration.DEFAULT_RM_DB_RETRIES); + retryIntervalSeconds = + conf.getInt(YarnConfiguration.RM_DB_RETRIES_INTERVAL, YarnConfiguration.DEFAULT_RM_DB_RETRIES_INTERVAL); + verificationTimeOut = + conf.getInt(YarnConfiguration.RM_DB_VERIFICATION_TIMEOUT, + YarnConfiguration.DEFAULT_RM_DB_VERIFICATION_TIMEOUT); + + sqlop = createSQLOperationsWithRetry(); + sqlop.init(conf); + credential = new Credential(CREDENTIAL_TABLE, MASTERCRED, uuid); + LOG.debug("Max Retries = " + maxRetries + " Retry Inverval Seconds = " + retryIntervalSeconds + + " Verification Timeout " + verificationTimeOut); + LOG.debug("Credential " + credential); + } + + @Override + protected synchronized void startInternal() throws Exception { + // Create Various tables if non existing. + sqlop.doCreateTable(CREDENTIAL_TABLE); + sqlop.doCreateTable(APPLICATION_TABLE); + sqlop.doCreateTable(ATTEMPT_TABLE); + sqlop.doCreateTable(RMDELEGATAION_TABLE); + sqlop.doCreateTable(RMMASTERKEYS_TABLE); + sqlop.fence(credential); + if (HAUtil.isHAEnabled(getConfig())) { + // Start the checker thread if HA is enabled. + verifyActiveStatusThread = new VerifyActiveStatusThread(); + verifyActiveStatusThread.start(); + } + } + + @Override + protected synchronized void closeInternal() throws Exception { + if (verifyActiveStatusThread != null) { + verifyActiveStatusThread.interrupt(); + verifyActiveStatusThread.join(1000); + } + sqlop.disconnect(); + } + + @Override + protected synchronized Version loadVersion() throws Exception { + LocalResultSet lrs = sqlop.getFromTable(CREDENTIAL_TABLE, VERSION); + try { + if (!lrs.set.next()) { + // No result + return null; + } + return new VersionPBImpl(VersionProto.parseFrom(lrs.set.getBytes(SQLConstants.VALUE_COLUMN))); + } finally { + lrs.close(); + } + } + + @Override + protected synchronized void storeVersion() throws Exception { + byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); + sqlop.doUpdateOrInsert(CREDENTIAL_TABLE, VERSION, data, credential); + } + + @Override + protected Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + @Override + public synchronized long getAndIncrementEpoch() throws Exception { + long currentEpoch = 0; + LocalResultSet lrs = sqlop.getFromTable(CREDENTIAL_TABLE, EPOCH); + try { + if (!lrs.set.next()) { + byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto().toByteArray(); + sqlop.doUpdateOrInsert(CREDENTIAL_TABLE, EPOCH, storeData, credential); + } else { + byte[] data = lrs.set.getBytes(SQLConstants.VALUE_COLUMN); + Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); + currentEpoch = epoch.getEpoch(); + // increment epoch and store it + byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto().toByteArray(); + sqlop.doUpdateOrInsert(CREDENTIAL_TABLE, EPOCH, storeData, credential); + } + return currentEpoch; + } finally { + lrs.close(); + } + } + + @Override + public synchronized RMState loadState() throws Exception { + RMState rmState = new RMState(); + // recover DelegationTokenSecretManager + loadRMDTSecretManagerState(rmState); + // recover RM applications + loadRMAppState(rmState); + // recover AMRMTokenSecretManager + loadAMRMTokenSecretManagerState(rmState); + return rmState; + } + + private void loadAMRMTokenSecretManagerState(RMState rmState) throws Exception { + LocalResultSet lrs = sqlop.getFromTable(RMMASTERKEYS_TABLE, AMRMTOKEN_SECRET_MANAGER_ROOT); + try { + byte[] data = null; + if (lrs.set.next()) { + data = lrs.set.getBytes(SQLConstants.VALUE_COLUMN); + } else { + LOG.warn("There is no data saved"); + return; + } + AMRMTokenSecretManagerStatePBImpl stateData = + new AMRMTokenSecretManagerStatePBImpl(AMRMTokenSecretManagerStateProto.parseFrom(data)); + rmState.amrmTokenSecretManagerState = + AMRMTokenSecretManagerState.newInstance(stateData.getCurrentMasterKey(), + stateData.getNextMasterKey()); + } finally { + lrs.close(); + } + } + + private void loadRMAppState(RMState rmState) throws Exception { + // Get all applications. + LocalResultSet lrs = + sqlop.getFromTableRange(APPLICATION_TABLE, ApplicationId.appIdStrPrefix, ApplicationId.appIdStrPrefix + + "z"); + int appCounter = 0; + try { + while (lrs.set.next()) { + ApplicationId appId = ConverterUtils.toApplicationId(lrs.set.getString(SQLConstants.KEY_COLUMN)); + ApplicationStateDataPBImpl appStateData = + new ApplicationStateDataPBImpl(ApplicationStateDataProto.parseFrom(lrs.set + .getBytes(SQLConstants.VALUE_COLUMN))); + ApplicationState appState = + new ApplicationState(appStateData.getSubmitTime(), appStateData.getStartTime(), + appStateData.getApplicationSubmissionContext(), appStateData.getUser(), + appStateData.getState(), appStateData.getDiagnostics(), appStateData.getFinishTime()); + if (!appId.equals(appState.context.getApplicationId())) { + throw new YarnRuntimeException("The key is different from the application id"); + } + rmState.appState.put(appId, appState); + appCounter++; + // For each application fetch attempts + int attemptCounter = loadApplicationAttemptState(appState, appId); + LOG.debug("App loaded " + appId + " with attempts " + attemptCounter); + } + } finally { + LOG.debug("Total apps loaded " + appCounter); + lrs.close(); + } + } + + private int loadApplicationAttemptState(ApplicationState appState, ApplicationId appId) throws Exception { + LocalResultSet lrs = sqlop.getFromTableRange(ATTEMPT_TABLE, appId.toString(), appId.toString() + "z"); + int attemptCounter = 0; + try { + while (lrs.set.next()) { + byte[] attemptData = lrs.set.getBytes(SQLConstants.VALUE_COLUMN); + ApplicationAttemptId attemptId = + ConverterUtils.toApplicationAttemptId(lrs.set.getString(SQLConstants.KEY_COLUMN).split( + SEPARATOR)[1]); + ApplicationAttemptStateDataPBImpl attemptStateData = + new ApplicationAttemptStateDataPBImpl(ApplicationAttemptStateDataProto.parseFrom(attemptData)); + Credentials credentials = null; + if (attemptStateData.getAppAttemptTokens() != null) { + credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(attemptStateData.getAppAttemptTokens()); + credentials.readTokenStorageStream(dibb); + } + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, attemptStateData.getMasterContainer(), credentials, + attemptStateData.getStartTime(), attemptStateData.getState(), + attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), + attemptStateData.getFinalApplicationStatus(), + attemptStateData.getAMContainerExitStatus(), attemptStateData.getFinishTime(), + attemptStateData.getMemorySeconds(), attemptStateData.getVcoreSeconds()); + appState.attempts.put(attemptState.getAttemptId(), attemptState); + attemptCounter++; + } + } finally { + lrs.close(); + } + return attemptCounter; + } + + private void loadRMDTSecretManagerState(RMState rmState) throws Exception { + loadRMDelegationKeyState(rmState); + loadRMSequentialNumberState(rmState); + loadRMDelegationTokenState(rmState); + + } + + private void loadRMDelegationKeyState(RMState rmState) throws Exception { + String keyIn = RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_KEY_PREFIX; + LocalResultSet lrs = sqlop.getFromTableRange(RMMASTERKEYS_TABLE, keyIn, keyIn + "z"); + int delegationKeyCounter = 0; + try { + while (lrs.set.next()) { + byte[] childData = lrs.set.getBytes(SQLConstants.VALUE_COLUMN); + ByteArrayInputStream is = new ByteArrayInputStream(childData); + DataInputStream fsIn = new DataInputStream(is); + try { + DelegationKey key = new DelegationKey(); + key.readFields(fsIn); + rmState.rmSecretManagerState.masterKeyState.add(key); + delegationKeyCounter++; + } finally { + is.close(); + } + } + } finally { + LOG.debug("Total RMDelegationKeys loaded " + delegationKeyCounter); + lrs.close(); + } + } + + private void loadRMSequentialNumberState(RMState rmState) throws Exception { + LocalResultSet lrs = sqlop.getFromTable(RMDELEGATAION_TABLE, RMDTSequentialNumber); + try { + if (lrs.set.next()) { + byte[] seqData = lrs.set.getBytes(SQLConstants.VALUE_COLUMN); + if (seqData != null) { + ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData); + DataInputStream seqIn = new DataInputStream(seqIs); + try { + rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt(); + } finally { + seqIn.close(); + } + } + } + } finally { + lrs.close(); + } + } + + private void loadRMDelegationTokenState(RMState rmState) throws Exception { + String keyIn = DelegtaionTokenRoot + SEPARATOR + DELEGATION_TOKEN_PREFIX; + LocalResultSet lrs = sqlop.getFromTableRange(RMDELEGATAION_TABLE, keyIn, keyIn + "z"); + int delegationTokenCounter = 0; + try { + while (lrs.set.next()) { + byte[] childData = lrs.set.getBytes(SQLConstants.VALUE_COLUMN); + if (childData == null) { + LOG.warn("Content of " + lrs.set.getString(SQLConstants.KEY_COLUMN) + " is broken."); + continue; + } + ByteArrayInputStream is = new ByteArrayInputStream(childData); + DataInputStream fsIn = new DataInputStream(is); + try { + RMDelegationTokenIdentifierData identifierData = new RMDelegationTokenIdentifierData(); + identifierData.readFields(fsIn); + RMDelegationTokenIdentifier identifier = identifierData.getTokenIdentifier(); + long renewDate = identifierData.getRenewDate(); + rmState.rmSecretManagerState.delegationTokenState.put(identifier, renewDate); + delegationTokenCounter++; + LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier + " renewDate=" + renewDate); + } finally { + is.close(); + } + } + } finally { + LOG.debug("Total Delegation Tokens loaded " + delegationTokenCounter); + lrs.close(); + + } + } + + @Override + protected synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB) + throws Exception { + updateApplicationStateInternal(appId, appStateDataPB); + } + + @Override + protected synchronized void updateApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB) + throws Exception { + byte[] appStateData = appStateDataPB.getProto().toByteArray(); + sqlop.doUpdateOrInsert(APPLICATION_TABLE, appId.toString(), appStateData, credential); + } + + @Override + protected synchronized void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId, + ApplicationAttemptStateData attemptStateDataPB) throws Exception { + updateApplicationAttemptStateInternal(attemptId, attemptStateDataPB); + } + + @Override + protected synchronized void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId, + ApplicationAttemptStateData attemptStateDataPB) throws Exception { + String key = attemptId.getApplicationId().toString() + SEPARATOR + attemptId; + byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); + sqlop.doUpdateOrInsert(ATTEMPT_TABLE, key, attemptStateData, credential); + } + + @Override + protected synchronized void storeRMDelegationTokenAndSequenceNumberState( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, int latestSequenceNumber) throws Exception { + LOG.info("STORING RMDELGATION TOKEN " + rmDTIdentifier); + String key = DelegtaionTokenRoot + SEPARATOR + DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber(); + ByteArrayOutputStream seqOs = new ByteArrayOutputStream(); + DataOutputStream seqOut = new DataOutputStream(seqOs); + RMDelegationTokenIdentifierData identifierData = new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate); + seqOut.writeInt(latestSequenceNumber); + + Operation delegationArgMap = new Operation(); + List delegationArguments = new ArrayList<>(); + delegationArguments.add(RMDELEGATAION_TABLE); + delegationArguments.add(key); + delegationArguments.add(identifierData.toByteArray()); + delegationArgMap.put(SQLConstants.UPDATE_KEY, delegationArguments); + delegationArgMap.put(SQLConstants.INSERT_IF_NOT_EXIST_KEY, delegationArguments); + + Operation seqArgMap = new Operation(); + List seqArguments = new ArrayList<>(); + seqArguments.add(RMDELEGATAION_TABLE); + seqArguments.add(RMDTSequentialNumber); + seqArguments.add(seqOs.toByteArray()); + seqArgMap.put(SQLConstants.UPDATE_KEY, seqArguments); + seqArgMap.put(SQLConstants.INSERT_IF_NOT_EXIST_KEY, seqArguments); + + List opList = new ArrayList<>(); + opList.add(delegationArgMap); + opList.add(seqArgMap); + + try { + sqlop.doMultipleOp(opList, credential); + } finally { + seqOs.close(); + } + } + + @Override + protected synchronized void removeRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentifier) + throws Exception { + String key = DelegtaionTokenRoot + SEPARATOR + DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber(); + sqlop.doDelete(RMDELEGATAION_TABLE, key, credential); + } + + @Override + protected synchronized void updateRMDelegationTokenAndSequenceNumberInternal( + RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, int latestSequenceNumber) throws Exception { + storeRMDelegationTokenAndSequenceNumber(rmDTIdentifier, renewDate, latestSequenceNumber); + } + + @Override + protected synchronized void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception { + String key = RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_KEY_PREFIX + delegationKey.getKeyId(); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream fsOut = new DataOutputStream(os); + delegationKey.write(fsOut); + try { + sqlop.doUpdateOrInsert(RMMASTERKEYS_TABLE, key, os.toByteArray(), credential); + } finally { + os.close(); + } + } + + @Override + protected synchronized void removeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception { + String key = RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_KEY_PREFIX + delegationKey.getKeyId(); + sqlop.doDelete(RMMASTERKEYS_TABLE, key, credential); + } + + @Override + public synchronized void storeOrUpdateAMRMTokenSecretManagerState( + AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate) { + AMRMTokenSecretManagerState data = AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState); + byte[] stateData = data.getProto().toByteArray(); + try { + sqlop.doUpdateOrInsert(RMMASTERKEYS_TABLE, AMRMTOKEN_SECRET_MANAGER_ROOT, stateData, credential); + } catch (Exception ex) { + LOG.info("Error storing info for AMRMTokenSecretManager", ex); + notifyStoreOperationFailed(ex); + } + } + + /** + * Delete from apptable then delete from attempttable. + */ + @Override + protected synchronized void removeApplicationStateInternal(ApplicationState appState) throws Exception { + String appId = appState.getAppId().toString(); + + Operation applicationDelete = new Operation(); + List deleteApplicationArgs = new ArrayList<>(); + deleteApplicationArgs.add(APPLICATION_TABLE); + deleteApplicationArgs.add(appId); + deleteApplicationArgs.add(appId + "z"); + applicationDelete.put(SQLConstants.DELETE_RANGE_KEY, deleteApplicationArgs); + + Operation attemptsDelete = new Operation(); + List deleteAttemptsArgs = new ArrayList<>(); + deleteAttemptsArgs.add(ATTEMPT_TABLE); + deleteAttemptsArgs.add(appId); + deleteAttemptsArgs.add(appId + "z"); + attemptsDelete.put(SQLConstants.DELETE_RANGE_KEY, deleteAttemptsArgs); + + List toDelete = new ArrayList<>(); + toDelete.add(applicationDelete); + toDelete.add(attemptsDelete); + + sqlop.doMultipleOp(toDelete, credential); + } + + @Override + public synchronized void deleteStore() throws Exception { + // Drop All the tables; + sqlop.doDropTable(CREDENTIAL_TABLE); + sqlop.doDropTable(APPLICATION_TABLE); + sqlop.doDropTable(ATTEMPT_TABLE); + sqlop.doDropTable(RMDELEGATAION_TABLE); + sqlop.doDropTable(RMMASTERKEYS_TABLE); + } + + private class VerifyActiveStatusThread extends Thread { + VerifyActiveStatusThread() { + super(VerifyActiveStatusThread.class.getName()); + } + + @Override + public void run() { + try { + while (true) { + LOG.debug("Verifying master check"); + sqlop.doFencingCheck(credential); + Thread.sleep(verificationTimeOut); + } + } catch (InterruptedException ie) { + LOG.info(VerifyActiveStatusThread.class.getName() + " thread " + "interrupted! Exiting!"); + } catch (Exception e) { + notifyStoreOperationFailed(new StoreFencedException()); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/Credential.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/Credential.java new file mode 100644 index 0000000..31a683a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/Credential.java @@ -0,0 +1,29 @@ +package org.apache.hadoop.yarn.server.resourcemanager.recovery.sql; + +public class Credential { + private String credentialTable; + private String key; + private String secret; + + public Credential(String credentialTable, String key, String secret) { + this.credentialTable = credentialTable; + this.key = key; + this.secret = secret; + } + + public String getCredentialTable() { + return this.credentialTable; + } + + public String getKey() { + return this.key; + } + + public String getSecret() { + return this.secret; + } + + public String toString() { + return "Table=" + credentialTable + " Key=" + key + " Secret=" + secret; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/LocalResultSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/LocalResultSet.java new file mode 100644 index 0000000..5ee257f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/LocalResultSet.java @@ -0,0 +1,34 @@ +package org.apache.hadoop.yarn.server.resourcemanager.recovery.sql; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; + +import org.apache.log4j.Logger; + +public class LocalResultSet { + + public ResultSet set = null; + public PreparedStatement stmt = null; + + public LocalResultSet(ResultSet set, PreparedStatement stmt) { + this.set = set; + this.stmt = stmt; + } + + public void close() { + if (set != null) { + try { + set.close(); + } catch (Exception e) { + Logger.getLogger(getClass()).warn("Error closing LocalResultSet " + e); + } + } + if (stmt != null) { + try { + stmt.close(); + } catch (Exception e) { + Logger.getLogger(getClass()).warn("Error closing LocalResultSet " + e); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/Operation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/Operation.java new file mode 100644 index 0000000..69af47e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/Operation.java @@ -0,0 +1,28 @@ +package org.apache.hadoop.yarn.server.resourcemanager.recovery.sql; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + + +public class Operation { + Map> operationArgumentsMap; + + public Operation() { + operationArgumentsMap = new HashMap>(); + } + + public void put(String key, List argList) { + operationArgumentsMap.put(key, argList); + } + + public List get(String key) { + return operationArgumentsMap.get(key); + } + + public Set keySet() { + return operationArgumentsMap.keySet(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/PSQLOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/PSQLOperations.java new file mode 100644 index 0000000..c8a5cb7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/PSQLOperations.java @@ -0,0 +1,423 @@ +package org.apache.hadoop.yarn.server.resourcemanager.recovery.sql; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFencedException; +import org.apache.log4j.Logger; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This class contains Postgresql based SQL operations for SqlRMStateStore. + * + * DB operations normally throw PSQLException, We are catching it and + * rethrowing SQLException so that Retry policy can work. + * + */ +public class PSQLOperations implements SQLOperations { + protected Connection connection = null; + private String dbName = null; + private String dbHostName = null; + private String userName = null; + private String password = null; + private static Logger LOG = Logger.getLogger(PSQLOperations.class); + + private Map methodsMap = new HashMap(); + + static { + LOG.info("LOADED CLASS PSQLOperations modified on 27 oct 2015"); + } + + @Override + public void init(Configuration conf) { + userName = conf.get(YarnConfiguration.RM_DB_USERNAME); + dbName = conf.get(YarnConfiguration.RM_DB_DBNAME); + dbHostName = + "jdbc:" + conf.get(YarnConfiguration.RM_DB_TYPE) + "://" + conf.get(YarnConfiguration.RM_DB_HOST) + "/" + + dbName; + String passFile = ""; + if ((passFile = conf.get(YarnConfiguration.RM_DB_PASSWORD_FILE, passFile)).isEmpty()) { + password = conf.get(YarnConfiguration.RM_DB_PASSWORD); + } else { + BufferedReader br = null; + try { + br = new BufferedReader(new FileReader(passFile)); + password = br.readLine(); + } catch (Exception e) { + LOG.warn("Exception occured while reading password file " + e); + // If Exception then read conf from the yarn-site + password = conf.get(YarnConfiguration.RM_DB_PASSWORD); + } finally { + if (br != null) { + try { + br.close(); + } catch (Exception e) { + + } + } + + } + } + // Cache methods for doMultipleOp() + try { + methodsMap.put(SQLConstants.UPDATE_KEY, + this.getClass().getMethod("prepareUpdate", String.class, String.class, byte[].class)); + methodsMap.put(SQLConstants.INSERT_IF_NOT_EXIST_KEY, + this.getClass().getMethod("prepareInsertIfNotExists", String.class, String.class, byte[].class)); + methodsMap.put(SQLConstants.DELETE_RANGE_KEY, + this.getClass().getMethod("prepareDeleteRange", String.class, String.class, String.class)); + } catch (NoSuchMethodException | SecurityException e) { + throw new YarnRuntimeException("One or more methods initilisation failed " + e); + } + } + + @VisibleForTesting + protected synchronized void ensureConnection() throws SQLException { + if (connection != null && !connection.isClosed() && connection.isValid(0)) { + return; + } + connection = DriverManager.getConnection(dbHostName, userName, password); + connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ); + } + + @VisibleForTesting + public synchronized void disconnect() throws SQLException { + if (connection != null) + connection.close(); + } + + @Override + public synchronized void doCreateTable(String table) throws SQLException { + try { + Connection conn = DriverManager.getConnection(dbHostName, userName, password); + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(createTable(table)); + stmt.execute(); + } finally { + try { + stmt.close(); + conn.close(); + } catch (Exception e) { + LOG.warn("Exception occured while closing connetion in doCreateTable " + e); + } + } + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + @Override + public synchronized void doDropTable(String table) throws SQLException { + try { + Connection conn = DriverManager.getConnection(dbHostName, userName, password); + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(dropTable(table)); + stmt.execute(); + } finally { + try { + stmt.close(); + conn.close(); + } catch (Exception e) { + LOG.warn("Exception occured while closing connetion in doDropTable " + e); + } + } + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + @Override + public synchronized LocalResultSet getFromTable(String table, String key) throws SQLException { + try { + ensureConnection(); + PreparedStatement getKey = connection.prepareStatement(select(table)); + getKey.setString(1, key); + return new LocalResultSet(getKey.executeQuery(), getKey); + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + public synchronized LocalResultSet getFromTableForShare(String table, String key) throws SQLException { + try { + ensureConnection(); + PreparedStatement getKey = connection.prepareStatement(selectForShare(table)); + getKey.setString(1, key); + return new LocalResultSet(getKey.executeQuery(), getKey); + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + @Override + public synchronized LocalResultSet getFromTableRange(String table, String from, String to) throws SQLException { + try { + ensureConnection(); + PreparedStatement getKey = connection.prepareStatement(selectRange(table)); + getKey.setString(1, from); + getKey.setString(2, to); + return new LocalResultSet(getKey.executeQuery(), getKey); + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + public synchronized PreparedStatement prepareCreateTable(String table) throws SQLException { + ensureConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(createTable(table)); + return preparedStatement; + } + + public synchronized PreparedStatement prepareDeleteRange(String table, String from, String to) throws SQLException { + ensureConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(deleteRange(table)); + preparedStatement.setString(1, from); + preparedStatement.setString(2, to); + return preparedStatement; + } + + public synchronized PreparedStatement prepareDelete(String table, String key) throws SQLException { + ensureConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(delete(table)); + preparedStatement.setString(1, key); + return preparedStatement; + + } + + public synchronized PreparedStatement prepareUpdate(String table, String key, byte[] value) throws SQLException { + ensureConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(update(table)); + preparedStatement.setString(2, key); + preparedStatement.setBytes(1, value); + return preparedStatement; + } + + public synchronized PreparedStatement prepareInsertIfNotExists(String table, String key, byte[] value) + throws SQLException { + ensureConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(insertIfNotExists(table)); + preparedStatement.setString(1, key); + preparedStatement.setBytes(2, value); + preparedStatement.setString(3, key); + return preparedStatement; + } + + protected String selectRange(String table) { + return "SELECT key, value from " + table + " WHERE key >= ? and key <= ?"; + } + + protected String select(String table) { + return "SELECT key, value from " + table + " WHERE key = ?"; + } + + protected String selectForShare(String table) { + return "SELECT key, value from " + table + " WHERE key = ? FOR SHARE"; + } + + protected String insertIfNotExists(String table) { + return "INSERT INTO " + table + " (key, value) SELECT ?,? WHERE NOT EXISTS ( SELECT value FROM " + table + + " WHERE key = ?)"; + } + + protected String deleteRange(String table) { + return "DELETE FROM " + table + " WHERE key >= ? and key <= ?"; + } + + protected String delete(String table) { + return "DELETE FROM " + table + " WHERE key = ?"; + } + + protected String createTable(String table) { + return "CREATE TABLE IF NOT EXISTS " + table + " (key text PRIMARY KEY, value bytea)"; + } + + protected String update(String table) { + return "UPDATE " + table + " SET value = ? WHERE key = ?"; + } + + protected String dropTable(String table) { + return "DROP TABLE IF EXISTS " + table; + } + + /** + * 1. Begin transaction. + * 2. Update stuff + * 3. Check the master key. + * 4. If the master key is not same as the digest + * then abort + * else + * 5. Commit. + * @throws StoreFencedException + * @throws SQLException + */ + protected synchronized void doTransaction(List operations, Credential credential) + throws StoreFencedException, SQLException { + LocalResultSet lrs = null; + try { + ensureConnection(); + connection.setAutoCommit(false); + for (PreparedStatement st : operations) { + st.executeUpdate(); + } + lrs = getFromTableForShare(credential.getCredentialTable(), credential.getKey()); + if (!lrs.set.next()) { + LOG.warn("Credential set is null"); + throw new StoreFencedException(); + } + String cred = new String(lrs.set.getBytes("value")); + if (!cred.equals(credential.getSecret())) { + LOG.warn("Stored credentials are not equal to RM's credentials"); + throw new StoreFencedException(); + } + connection.commit(); + } catch (StoreFencedException | SQLException e) { + try { + connection.rollback(); + } catch (SQLException e1) { + LOG.error("Error occurred while Connnection rollback " + e1); + } + e.printStackTrace(); + throw e; + } finally { + // This should not throw error. If it does current transaction will be retried. + ensureConnection(); + connection.setAutoCommit(true); + if (lrs != null) + lrs.close(); + for (PreparedStatement st : operations) { + try { + st.close(); + } catch (Exception e) { + LOG.warn("Exception occured while closing prepared statements."); + } + } + } + } + + @Override + public synchronized void fence(Credential credential) throws SQLException { + // Put the credential in the master table. + try { + byte[] value = credential.getSecret().getBytes(); + PreparedStatement update = prepareUpdate(credential.getCredentialTable(), credential.getKey(), value); + PreparedStatement insertIfNotThere = + prepareInsertIfNotExists(credential.getCredentialTable(), credential.getKey(), value); + try { + ensureConnection(); + connection.setAutoCommit(false); + update.executeUpdate(); + insertIfNotThere.executeUpdate(); + connection.commit(); + } catch (SQLException e) { + try { + connection.rollback(); + } catch (SQLException e1) { + LOG.warn("Exception occured while rollback in fence() " + e); + } + throw e; + } finally { + try { + connection.setAutoCommit(true); + } catch (SQLException e1) { + LOG.warn("Exception occured while setting auto commit fence() " + e1); + } + try { + update.close(); + insertIfNotThere.close(); + } catch (Exception e) { + LOG.warn("Exception occured while closing prepared statements in fence() " + e); + } + } + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + @Override + public void doUpdateOrInsert(String table, String key, byte[] value, Credential credential) throws SQLException, + StoreFencedException { + try { + List opList = new ArrayList(); + opList.add(prepareUpdate(table, key, value)); + opList.add(prepareInsertIfNotExists(table, key, value)); + doTransaction(opList, credential); + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + @Override + public void doDelete(String table, String key, Credential credential) throws SQLException, StoreFencedException { + try { + List opList = new ArrayList(); + opList.add(prepareDelete(table, key)); + doTransaction(opList, credential); + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + @Override + public void doFencingCheck(Credential credential) throws StoreFencedException, SQLException { + try { + doTransaction(new ArrayList(), credential); + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + @SuppressWarnings("rawtypes") + @Override + public void doMultipleOp(List opList, Credential credential) throws StoreFencedException, SQLException { + try { + List listOfOperations = new ArrayList<>(); + for (Operation m : opList) { + for (String key : m.keySet()) { + try { + Method method = methodsMap.get(key); + List params = m.get(key); + // For now all the methods have 3 parameters. + listOfOperations.add((PreparedStatement) method.invoke(this, params.get(0), params.get(1), + params.get(2))); + } catch (SecurityException | IllegalAccessException e) { + throw new YarnRuntimeException("Method invokation failed " + e); + } catch (InvocationTargetException e) { + // Handle InvocationTargetExeception separately. + if (e.getCause().getClass().isAssignableFrom(SQLException.class)) { + throw new SQLException(e.getCause()); + } + } + } + } + doTransaction(listOfOperations, credential); + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/SQLConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/SQLConstants.java new file mode 100644 index 0000000..27a4ef8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/SQLConstants.java @@ -0,0 +1,14 @@ +package org.apache.hadoop.yarn.server.resourcemanager.recovery.sql; + +public class SQLConstants { + + public static final String DELETE_RANGE_KEY = "deleteRange"; + public static final String UPDATE_KEY = "update"; + public static final String INSERT_IF_NOT_EXIST_KEY = "insertIfNotExists"; + public static final String KEY_COLUMN = "key"; + public static final String VALUE_COLUMN = "value"; + + private SQLConstants() { + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/SQLOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/SQLOperations.java new file mode 100644 index 0000000..70ad921 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/SQLOperations.java @@ -0,0 +1,34 @@ +package org.apache.hadoop.yarn.server.resourcemanager.recovery.sql; + +import java.sql.SQLException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFencedException; + +public interface SQLOperations { + + void init(Configuration conf); + + void doCreateTable(String applicationTable) throws SQLException; + + void fence(Credential credential) throws SQLException; + + void disconnect() throws SQLException; + + LocalResultSet getFromTable(String table , String key) throws SQLException; + + void doUpdateOrInsert(String table, String key, byte[] value, Credential credential) throws SQLException, StoreFencedException; + + LocalResultSet getFromTableRange(String table , String from , String key) throws SQLException; + + void doMultipleOp(List opList, Credential credential) throws SQLException, StoreFencedException; + + void doDelete(String table, String key, Credential credential) throws SQLException, StoreFencedException; + + void doDropTable(String table) throws SQLException; + + void doFencingCheck(Credential credential) throws SQLException, StoreFencedException ; + + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestSqlStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestSqlStateStore.java new file mode 100644 index 0000000..484b29c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestSqlStateStore.java @@ -0,0 +1,302 @@ +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import static org.junit.Assert.assertEquals; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.Credential; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.LocalResultSet; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.Operation; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.PSQLOperations; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.SQLOperations; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.h2.tools.DeleteDbFiles; +import org.junit.Test; + +public class TestSqlStateStore extends RMStateStoreTestBase { + + public static final String H2_LOCATION = "h2db"; + + static class TestSqlStateInternal extends SqlRMStateStore { + + @Override + protected SQLOperations createSQLOperationsWithRetry() { + return new TestSQlOperations(); + } + + public TestSqlStateInternal(Configuration conf, String uuid) { + this.uuid = uuid; + init(conf); + start(); + } + + public TestSqlStateInternal() {} + + } + static class TestSQlOperations extends PSQLOperations { + @Override + protected void ensureConnection() throws SQLException { + if (connection != null && !connection.isClosed() && connection.isValid(0)) { + return; + } + connection = DriverManager.getConnection("jdbc:h2:./target/" + H2_LOCATION, "test", ""); + connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ); + } + + @Override + public void doCreateTable(String table) throws SQLException { + Connection conn = DriverManager.getConnection("jdbc:h2:./target/" + H2_LOCATION, "test", ""); + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(createTable(table)); + stmt.execute(); + } finally { + try { + stmt.close(); + } catch (Exception e) { + + } + } + } + + @Override + public void doDropTable(String table) throws SQLException { + Connection conn = DriverManager.getConnection("jdbc:h2:./target/" + H2_LOCATION, "test", ""); + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(dropTable(table)); + stmt.execute(); + } finally { + try { + stmt.close(); + } catch (Exception e) { + + } + } + } + + @Override + protected String createTable(String table) { + return "CREATE TABLE IF NOT EXISTS " + table + " (key varchar(1024) PRIMARY KEY, value blob)"; + } + + @Override + public void doUpdateOrInsert(String table, String key, byte[] value, Credential credential) + throws SQLException, StoreFencedException { + List opList = new ArrayList(); + String sql = "MERGE INTO " + table + " (key, value) KEY (key) VALUES (?,?)"; + ensureConnection(); + PreparedStatement stmt = connection.prepareStatement(sql); + stmt.setString(1, key); + stmt.setObject(2, value); + opList.add(stmt); + doTransaction(opList, credential); + } + + @Override + public PreparedStatement prepareInsertIfNotExists(String table, String key, byte[] value) throws SQLException { + ensureConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(insertIfNotExists(table)); + preparedStatement.setString(1, key); + preparedStatement.setBytes(2, value); + return preparedStatement; + } + + @Override + protected String selectForShare(String table) { + // Overriding it and changing it to select for update because select for share is not available in H2. + return selectForUpdate(table); + } + + protected String selectForUpdate(String table) { + return "SELECT key, value from " + table + " WHERE key = ? FOR UPDATE"; + } + + @Override + protected String insertIfNotExists(String table) { + return "MERGE INTO " + table + " (key, value) KEY (key) VALUES (?,?)"; + } + + @SuppressWarnings("rawtypes") + @Override + public void doMultipleOp(List opList, Credential credential) throws StoreFencedException, + SQLException { + try { + List listOfOperations = new ArrayList<>(); + for (Operation m : opList) { + for (String key : m.keySet()) { + try { + switch (key.toString()) { + case "update": + Method method = + this.getClass().getMethod("prepareUpdate", String.class, String.class, + byte[].class); + List params = m.get(key); + listOfOperations.add((PreparedStatement) method.invoke(this, params.get(0), + params.get(1), params.get(2))); + break; + case "insertIfNotExists": + method = + this.getClass().getMethod("prepareInsertIfNotExists", String.class, + String.class, byte[].class); + params = m.get(key); + listOfOperations.add((PreparedStatement) method.invoke(this, params.get(0), + params.get(1), params.get(2))); + break; + case "deleteRange": + method = + this.getClass().getMethod("prepareDeleteRange", String.class, String.class, + String.class); + params = m.get(key); + listOfOperations.add((PreparedStatement) method.invoke(this, params.get(0), + params.get(1), params.get(2))); + break; + default: + break; + } + } catch (NoSuchMethodException | SecurityException | IllegalAccessException + | IllegalArgumentException | InvocationTargetException e) { + throw new YarnRuntimeException("Method invokation failed " + e); + } + } + } + doTransaction(listOfOperations, credential); + } catch (SQLException e) { + throw new SQLException(e); + } + } + } + + @Test + public void testSqlStateStore() throws Exception { + try { + TestSqlStateStoreTests tester = new TestSqlStateStoreTests(); + testRMAppStateStore(tester); + testRMDTSecretManagerStateStore(tester); + testCheckVersion(tester); + testEpoch(tester); + testAppDeletion(tester); + testAMRMTokenSecretManagerStateStore(tester); + testDeleteStore(tester); + tester.store.sqlop.disconnect(); + } finally { + DeleteDbFiles.execute("./target", H2_LOCATION, true); + } + } + + private Configuration createHARMConf(String rmIds, String rmId, int adminPort) { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, rmIds); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, TestSqlStateInternal.class.getName()); + conf.set(YarnConfiguration.RM_HA_ID, rmId); + conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0"); + + for (String rpcAddress : YarnConfiguration.getServiceAddressConfKeys(conf)) { + for (String id : HAUtil.getRMHAIds(conf)) { + conf.set(HAUtil.addSuffix(rpcAddress, id), "localhost:0"); + } + } + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId), "localhost:" + adminPort); + return conf; + } + + @Test + public void testSqlStateStoreFencing() throws Exception { + try { + StateChangeRequestInfo req = new StateChangeRequestInfo(HAServiceProtocol.RequestSource.REQUEST_BY_USER); + Configuration conf1 = createHARMConf("rm1,rm2", "rm1", 1234); + conf1.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + ResourceManager rm1 = new ResourceManager(); + rm1.init(conf1); + rm1.start(); + rm1.getRMContext().getRMAdminService().transitionToActive(req); + assertEquals("RM with SQL Store didn't start", Service.STATE.STARTED, rm1.getServiceState()); + assertEquals("RM should be Active", HAServiceProtocol.HAServiceState.ACTIVE, rm1.getRMContext() + .getRMAdminService().getServiceStatus().getState()); + Configuration conf2 = createHARMConf("rm1,rm2", "rm2", 5678); + conf2.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + ResourceManager rm2 = new ResourceManager(); + rm2.init(conf2); + rm2.start(); + rm2.getRMContext().getRMAdminService().transitionToActive(req); + assertEquals("RM with SQL Store didn't start", Service.STATE.STARTED, rm2.getServiceState()); + assertEquals("RM should be Active", HAServiceProtocol.HAServiceState.ACTIVE, rm2.getRMContext() + .getRMAdminService().getServiceStatus().getState()); + Thread.sleep(YarnConfiguration.DEFAULT_RM_DB_VERIFICATION_TIMEOUT); + assertEquals("RM should have been fenced", HAServiceProtocol.HAServiceState.STANDBY, rm1.getRMContext() + .getRMAdminService().getServiceStatus().getState()); + assertEquals("RM should be Active", HAServiceProtocol.HAServiceState.ACTIVE, rm2.getRMContext() + .getRMAdminService().getServiceStatus().getState()); + } finally { + DeleteDbFiles.execute("./target", H2_LOCATION, true); + } + } + + static class TestSqlStateStoreTests implements RMStateStoreHelper { + TestSqlStateInternal store; + String uuid = UUID.randomUUID().toString(); + + @Override + public RMStateStore getRMStateStore() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + this.store = new TestSqlStateInternal(conf, uuid); + return this.store; + } + + @Override + public boolean isFinalStateValid() throws Exception { + return true; + } + + @Override + public void writeVersion(Version version) throws Exception { + byte[] data = ((VersionPBImpl) version).getProto().toByteArray(); + store.sqlop.doUpdateOrInsert(TestSqlStateInternal.CREDENTIAL_TABLE, TestSqlStateInternal.VERSION, data, + store.credential); + } + + @Override + public Version getCurrentVersion() throws Exception { + return store.getCurrentVersion(); + } + + @Override + public boolean appExists(RMApp app) throws Exception { + LocalResultSet lrs = null; + try { + lrs = + store.sqlop.getFromTable(TestSqlStateInternal.APPLICATION_TABLE, app.getApplicationId() + .toString()); + return lrs.set.next(); + } catch (Exception e) { + return false; + } finally { + try { + lrs.close(); + } catch (Exception e) { + + } + } + } + } +}