From 23531f6cea59d7c4fc6548c1e3f4af02c8782a8c Mon Sep 17 00:00:00 2001 From: "lavkesh.lahngir" Date: Thu, 26 May 2016 15:07:01 +0530 Subject: [PATCH] SQL Based RM state store --- hadoop-project/pom.xml | 16 +- .../apache/hadoop/yarn/conf/YarnConfiguration.java | 18 + .../hadoop-yarn-server-resourcemanager/pom.xml | 15 + .../resourcemanager/recovery/SqlRMStateStore.java | 638 +++++++++++++++++++++ .../resourcemanager/recovery/sql/Credential.java | 47 ++ .../recovery/sql/LocalResultSet.java | 52 ++ .../resourcemanager/recovery/sql/Operation.java | 45 ++ .../recovery/sql/PSQLOperations.java | 471 +++++++++++++++ .../resourcemanager/recovery/sql/SQLConstants.java | 32 ++ .../recovery/sql/SQLOperations.java | 56 ++ .../recovery/TestSqlStateStore.java | 357 ++++++++++++ 11 files changed, 1746 insertions(+), 1 deletion(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/SqlRMStateStore.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/Credential.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/LocalResultSet.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/Operation.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/PSQLOperations.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/SQLConstants.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/SQLOperations.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestSqlStateStore.java diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index bee2e58..88a1413 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -1016,7 +1016,21 @@ jsonassert 1.3.0 - + + org.postgresql + postgresql + 9.4-1201-jdbc41 + + + com.h2database + h2 + 1.4.188 + + + com.tngtech.java + junit-dataprovider + 1.10.1 + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 17fbbc3..f84ce35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -562,6 +562,24 @@ public static boolean isAclEnabled(Configuration conf) { public static final String ZK_RM_STATE_STORE_ROOT_NODE_ACL = ZK_STATE_STORE_PREFIX + "root-node.acl"; + ////////////////////////////// + // SqlStore related Configurations + /////////////////////////////// + + public static final String 
DB_PRFIX = RM_PREFIX + "db."; + public static final String RM_DB_TYPE = DB_PRFIX + "type"; + public static final String RM_DB_HOST = DB_PRFIX + "hostname"; + public static final String RM_DB_DBNAME = DB_PRFIX + "dbname"; + public static final String RM_DB_USERNAME = DB_PRFIX + "username"; + public static final String RM_DB_PASSWORD = DB_PRFIX + "password"; + public static final String RM_DB_PASSWORD_FILE = DB_PRFIX + "password.file"; + public static final String RM_DB_RETRIES = DB_PRFIX + "retries"; + public static final int DEFAULT_RM_DB_RETRIES = 0; + public static final String RM_DB_RETRIES_INTERVAL = RM_DB_RETRIES + ".interval.seconds"; + public static final int DEFAULT_RM_DB_RETRIES_INTERVAL = 10; + public static final String RM_DB_VERIFICATION_TIMEOUT = DB_PRFIX + "verification.timeout.millis"; + public static final int DEFAULT_RM_DB_VERIFICATION_TIMEOUT = 10 * 1000; + /** HA related configs */ public static final String RM_HA_PREFIX = RM_PREFIX + "ha."; public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index c50c269..20c1cd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -33,6 +33,21 @@ + + org.postgresql + postgresql + + + com.h2database + h2 + test + + + com.tngtech.java + junit-dataprovider + test + + javax.servlet servlet-api diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/SqlRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/SqlRMStateStore.java new file mode 100644 index 0000000..ea7254b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/SqlRMStateStore.java @@ -0,0 +1,638 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.*; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.*; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.log4j.Logger; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.sql.SQLException; +import java.util.*; +import java.util.concurrent.TimeUnit; + +public class SqlRMStateStore extends RMStateStore { + protected static final String CREDENTIAL_TABLE = "credentialtable"; + protected static final String APPLICATION_TABLE = "applicationtable"; + protected static final String ATTEMPT_TABLE = "attempttable"; + protected static final String RMDELEGATAION_TABLE = "rmdelegationtable"; + protected static final String RMMASTERKEYS_TABLE = "rmmasterkeystable"; + protected static final String RESERVATION_PLANS_TABLE = "reservationtable"; + protected static final String DelegtaionTokenRoot = "DelegtaionTokenRoot"; + protected static final String RMDTSequentialNumber = "RMDTSequentialNumber"; + protected static final String SEPARATOR = "/"; + protected static final String VERSION = "version"; + protected static final String EPOCH = "epoch"; + protected static final String MASTERCRED = "mastercred"; + protected static final Version CURRENT_VERSION_INFO = Version + .newInstance(1, 3); + private static final Logger LOG = Logger.getLogger(SqlRMStateStore.class); + protected Credential credential = null; + protected String uuid = UUID.randomUUID().toString(); + protected int verificationTimeOut = 10 * 1000; + protected SQLOperations sqlop = null; + private Thread verifyActiveStatusThread; + // Retry policy + private int maxRetries = 0; + private int retryIntervalSeconds = 0; + + protected SQLOperations createSQLOperationsWithRetry() { + SQLOperations operations = new PSQLOperations(); + RetryPolicy basePolicy = RetryPolicies + .retryUpToMaximumCountWithFixedSleep(maxRetries, retryIntervalSeconds, + 
+            TimeUnit.SECONDS);
+    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
+        new HashMap<>();
+    exceptionToPolicyMap.put(SQLException.class, basePolicy);
+    RetryPolicy methodPolicy = RetryPolicies
+        .retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+            exceptionToPolicyMap);
+    Map<String, RetryPolicy> methodNameToPolicyMap =
+        new HashMap<String, RetryPolicy>();
+    methodNameToPolicyMap.put("doUpdateOrInsert", methodPolicy);
+    methodNameToPolicyMap.put("doDelete", methodPolicy);
+    methodNameToPolicyMap.put("fence", methodPolicy);
+    methodNameToPolicyMap.put("getFromTable", methodPolicy);
+    methodNameToPolicyMap.put("getFromTableRange", methodPolicy);
+    methodNameToPolicyMap.put("doDropTable", methodPolicy);
+    methodNameToPolicyMap.put("doFencingCheck", methodPolicy);
+    methodNameToPolicyMap.put("doMultipleOp", methodPolicy);
+    return (SQLOperations) RetryProxy
+        .create(SQLOperations.class, operations, methodNameToPolicyMap);
+  }
+
+  @Override
+  protected void initInternal(Configuration conf) throws Exception {
+    maxRetries = conf.getInt(YarnConfiguration.RM_DB_RETRIES,
+        YarnConfiguration.DEFAULT_RM_DB_RETRIES);
+    retryIntervalSeconds = conf.getInt(YarnConfiguration.RM_DB_RETRIES_INTERVAL,
+        YarnConfiguration.DEFAULT_RM_DB_RETRIES_INTERVAL);
+    verificationTimeOut = conf
+        .getInt(YarnConfiguration.RM_DB_VERIFICATION_TIMEOUT,
+            YarnConfiguration.DEFAULT_RM_DB_VERIFICATION_TIMEOUT);
+
+    sqlop = createSQLOperationsWithRetry();
+    sqlop.init(conf);
+    credential = new Credential(CREDENTIAL_TABLE, MASTERCRED, uuid);
+    LOG.debug("Max Retries = " + maxRetries + " Retry Interval Seconds = " +
+        retryIntervalSeconds + " Verification Timeout " +
+        verificationTimeOut);
+    LOG.debug("Credential " + credential);
+  }
+
+  @Override
+  protected void startInternal() throws Exception {
+    // Create the various tables if they do not already exist.
+    sqlop.doCreateTable(CREDENTIAL_TABLE);
+    sqlop.doCreateTable(APPLICATION_TABLE);
+    sqlop.doCreateTable(ATTEMPT_TABLE);
+    sqlop.doCreateTable(RMDELEGATAION_TABLE);
+    sqlop.doCreateTable(RMMASTERKEYS_TABLE);
+    sqlop.doCreateTable(RESERVATION_PLANS_TABLE);
+    sqlop.fence(credential);
+    if (HAUtil.isHAEnabled(getConfig())) {
+      // Start the checker thread if HA is enabled.
+ verifyActiveStatusThread = new VerifyActiveStatusThread(); + verifyActiveStatusThread.start(); + } + } + + @Override + protected void closeInternal() throws Exception { + if (verifyActiveStatusThread != null) { + verifyActiveStatusThread.interrupt(); + verifyActiveStatusThread.join(1000); + } + sqlop.disconnect(); + } + + @Override + protected Version loadVersion() throws Exception { + LocalResultSet lrs = sqlop.getFromTable(CREDENTIAL_TABLE, VERSION); + try { + if (!lrs.set.next()) { + // No result + return null; + } + return new VersionPBImpl(YarnServerCommonProtos.VersionProto + .parseFrom(lrs.set.getBytes(SQLConstants.VALUE_COLUMN))); + } finally { + lrs.close(); + } + } + + @Override + protected void storeVersion() throws Exception { + byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto() + .toByteArray(); + sqlop.doUpdateOrInsert(CREDENTIAL_TABLE, VERSION, data, credential); + } + + @Override + protected Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + @Override + public long getAndIncrementEpoch() throws Exception { + long currentEpoch = 0; + LocalResultSet lrs = sqlop.getFromTable(CREDENTIAL_TABLE, EPOCH); + try { + if (!lrs.set.next()) { + byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + .toByteArray(); + sqlop.doUpdateOrInsert(CREDENTIAL_TABLE, EPOCH, storeData, credential); + } else { + byte[] data = lrs.set.getBytes(SQLConstants.VALUE_COLUMN); + Epoch epoch = new EpochPBImpl( + YarnServerResourceManagerRecoveryProtos.EpochProto.parseFrom(data)); + currentEpoch = epoch.getEpoch(); + // increment epoch and store it + byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + .toByteArray(); + sqlop.doUpdateOrInsert(CREDENTIAL_TABLE, EPOCH, storeData, credential); + } + return currentEpoch; + } finally { + lrs.close(); + } + } + + @Override + public RMState loadState() throws Exception { + RMState rmState = new RMState(); + // recover DelegationTokenSecretManager + loadRMDTSecretManagerState(rmState); + // recover RM applications + loadRMAppState(rmState); + // recover AMRMTokenSecretManager + loadAMRMTokenSecretManagerState(rmState); + // recover reservation state + loadReservationSystemState(rmState); + return rmState; + } + + private void loadReservationSystemState(RMState rmState) throws Exception { + LocalResultSet lrs = sqlop.getFromTableRange(RESERVATION_PLANS_TABLE, + RESERVATION_SYSTEM_ROOT + SEPARATOR, RESERVATION_SYSTEM_ROOT + "z"); + int reservationCount = 0; + try { + while (lrs.set.next()) { + // reservation is stored as + // RESERVATION_SYSTEM_ROOT + SEPARATOR + plan_name + SEPARATOR + reservation_id; + String key = lrs.set.getString(SQLConstants.KEY_COLUMN); + String planReservationString = key + .substring((RESERVATION_SYSTEM_ROOT + SEPARATOR).length()); + String[] parts = planReservationString.split(SEPARATOR); + if (parts.length != 2) { + LOG.warn("Incorrect reservation state key " + key); + continue; + } + String planName = parts[0]; + String reservationName = parts[1]; + ReservationAllocationStateProto allocationState = ReservationAllocationStateProto + .parseFrom(lrs.set.getBytes(SQLConstants.VALUE_COLUMN)); + if (!rmState.getReservationState().containsKey(planName)) { + rmState.getReservationState().put(planName, new HashMap<>()); + } + ReservationId reservationId = ReservationId + .parseReservationId(reservationName); + rmState.getReservationState().get(planName) + .put(reservationId, allocationState); + reservationCount++; + } + } finally { + LOG.debug("Total reservation count " + 
reservationCount); + lrs.close(); + } + } + + private void loadRMDTSecretManagerState(RMState rmState) throws Exception { + loadRMDelegationKeyState(rmState); + loadRMSequentialNumberState(rmState); + loadRMDelegationTokenState(rmState); + + } + + private void loadRMDelegationKeyState(RMState rmState) throws Exception { + String keyIn = + RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_KEY_PREFIX; + LocalResultSet lrs = sqlop + .getFromTableRange(RMMASTERKEYS_TABLE, keyIn, keyIn + "z"); + int delegationKeyCounter = 0; + try { + while (lrs.set.next()) { + byte[] childData = lrs.set.getBytes(SQLConstants.VALUE_COLUMN); + ByteArrayInputStream is = new ByteArrayInputStream(childData); + DataInputStream fsIn = new DataInputStream(is); + try { + DelegationKey key = new DelegationKey(); + key.readFields(fsIn); + rmState.rmSecretManagerState.masterKeyState.add(key); + delegationKeyCounter++; + } finally { + is.close(); + } + } + } finally { + LOG.debug("Total RMDelegationKeys loaded " + delegationKeyCounter); + lrs.close(); + } + } + + private void loadRMSequentialNumberState(RMState rmState) throws Exception { + LocalResultSet lrs = sqlop + .getFromTable(RMDELEGATAION_TABLE, RMDTSequentialNumber); + try { + if (lrs.set.next()) { + byte[] seqData = lrs.set.getBytes(SQLConstants.VALUE_COLUMN); + if (seqData != null) { + ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData); + DataInputStream seqIn = new DataInputStream(seqIs); + try { + rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt(); + } finally { + seqIn.close(); + } + } + } + } finally { + lrs.close(); + } + } + + private void loadRMDelegationTokenState(RMState rmState) throws Exception { + String keyIn = DelegtaionTokenRoot + SEPARATOR + DELEGATION_TOKEN_PREFIX; + LocalResultSet lrs = sqlop + .getFromTableRange(RMDELEGATAION_TABLE, keyIn, keyIn + "z"); + int delegationTokenCounter = 0; + try { + while (lrs.set.next()) { + byte[] childData = lrs.set.getBytes(SQLConstants.VALUE_COLUMN); + if (childData == null) { + LOG.warn("Content of " + lrs.set.getString(SQLConstants.KEY_COLUMN) + + " is broken."); + continue; + } + ByteArrayInputStream is = new ByteArrayInputStream(childData); + DataInputStream fsIn = new DataInputStream(is); + try { + RMDelegationTokenIdentifierData identifierData = new RMDelegationTokenIdentifierData(); + identifierData.readFields(fsIn); + RMDelegationTokenIdentifier identifier = identifierData + .getTokenIdentifier(); + long renewDate = identifierData.getRenewDate(); + rmState.rmSecretManagerState.delegationTokenState + .put(identifier, renewDate); + delegationTokenCounter++; + LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier + + " renewDate=" + renewDate); + } finally { + is.close(); + } + } + } finally { + LOG.debug("Total Delegation Tokens loaded " + delegationTokenCounter); + lrs.close(); + + } + } + + private void loadRMAppState(RMState rmState) throws Exception { + // Get all applications. 
+ LocalResultSet lrs = sqlop + .getFromTableRange(APPLICATION_TABLE, ApplicationId.appIdStrPrefix, + ApplicationId.appIdStrPrefix + "z"); + int appCounter = 0; + try { + while (lrs.set.next()) { + ApplicationId appId = ConverterUtils + .toApplicationId(lrs.set.getString(SQLConstants.KEY_COLUMN)); + ApplicationStateDataPBImpl appStateData = new ApplicationStateDataPBImpl( + YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto + .parseFrom(lrs.set.getBytes(SQLConstants.VALUE_COLUMN))); + if (!appId.equals(appStateData.getApplicationSubmissionContext() + .getApplicationId())) { + throw new YarnRuntimeException( + "The key is different from the application id"); + } + rmState.appState.put(appId, appStateData); + appCounter++; + // For each application fetch attempts + int attemptCounter = loadApplicationAttemptState(appStateData, appId); + LOG.debug("App loaded " + appId + " with attempts " + attemptCounter); + } + } finally { + LOG.debug("Total apps loaded " + appCounter); + lrs.close(); + } + } + + private int loadApplicationAttemptState(ApplicationStateData appState, + ApplicationId appId) throws Exception { + LocalResultSet lrs = sqlop + .getFromTableRange(ATTEMPT_TABLE, appId.toString(), + appId.toString() + "z"); + int attemptCounter = 0; + try { + while (lrs.set.next()) { + byte[] attemptData = lrs.set.getBytes(SQLConstants.VALUE_COLUMN); + ApplicationAttemptStateDataPBImpl attemptState = new ApplicationAttemptStateDataPBImpl( + YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto + .parseFrom(attemptData)); + appState.attempts.put(attemptState.getAttemptId(), attemptState); + attemptCounter++; + } + } finally { + lrs.close(); + } + return attemptCounter; + } + + private void loadAMRMTokenSecretManagerState(RMState rmState) + throws Exception { + LocalResultSet lrs = sqlop + .getFromTable(RMMASTERKEYS_TABLE, AMRMTOKEN_SECRET_MANAGER_ROOT); + try { + byte[] data; + if (lrs.set.next()) { + data = lrs.set.getBytes(SQLConstants.VALUE_COLUMN); + } else { + LOG.warn("There is no data saved"); + return; + } + AMRMTokenSecretManagerStatePBImpl stateData = new AMRMTokenSecretManagerStatePBImpl( + YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto + .parseFrom(data)); + rmState.amrmTokenSecretManagerState = AMRMTokenSecretManagerState + .newInstance(stateData.getCurrentMasterKey(), + stateData.getNextMasterKey()); + } finally { + lrs.close(); + } + } + + @Override + protected void storeApplicationStateInternal(ApplicationId appId, + ApplicationStateData appStateData) throws Exception { + updateApplicationStateInternal(appId, appStateData); + } + + @Override + protected void updateApplicationStateInternal(ApplicationId appId, + ApplicationStateData appStateDataPB) throws Exception { + byte[] appStateData = appStateDataPB.getProto().toByteArray(); + sqlop.doUpdateOrInsert(APPLICATION_TABLE, appId.toString(), appStateData, + credential); + } + + @Override + protected void storeApplicationAttemptStateInternal( + ApplicationAttemptId attemptId, + ApplicationAttemptStateData attemptStateDataPB) throws Exception { + updateApplicationAttemptStateInternal(attemptId, attemptStateDataPB); + } + + @Override + protected void updateApplicationAttemptStateInternal( + ApplicationAttemptId attemptId, + ApplicationAttemptStateData attemptStateDataPB) throws Exception { + String key = + attemptId.getApplicationId().toString() + SEPARATOR + attemptId; + byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); + sqlop.doUpdateOrInsert(ATTEMPT_TABLE, 
+        key, attemptStateData, credential);
+  }
+
+  @Override
+  protected void storeRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
+      throws Exception {
+    storeOrUpdateRMDelegationTokenState(rmDTIdentifier, renewDate, false);
+  }
+
+  @Override
+  protected void removeRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
+    String key = DelegtaionTokenRoot + SEPARATOR + DELEGATION_TOKEN_PREFIX +
+        rmDTIdentifier.getSequenceNumber();
+    sqlop.doDelete(RMDELEGATAION_TABLE, key, credential);
+  }
+
+  @Override
+  protected void updateRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
+      throws Exception {
+    storeOrUpdateRMDelegationTokenState(rmDTIdentifier, renewDate, true);
+  }
+
+  private void storeOrUpdateRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
+      boolean isUpdate) throws Exception {
+    String key = DelegtaionTokenRoot + SEPARATOR + DELEGATION_TOKEN_PREFIX +
+        rmDTIdentifier.getSequenceNumber();
+    RMDelegationTokenIdentifierData identifierData = new RMDelegationTokenIdentifierData(
+        rmDTIdentifier, renewDate);
+    Operation delegationArgMap = new Operation();
+    List<Object> delegationArguments = new ArrayList<>();
+    delegationArguments.add(RMDELEGATAION_TABLE);
+    delegationArguments.add(key);
+    delegationArguments.add(identifierData.toByteArray());
+    delegationArgMap.put(SQLConstants.UPDATE_KEY, delegationArguments);
+    delegationArgMap
+        .put(SQLConstants.INSERT_IF_NOT_EXIST_KEY, delegationArguments);
+    List<Operation> opList = new ArrayList<>();
+    opList.add(delegationArgMap);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug((isUpdate ? "Updating " : "Storing ") + "RMDelegationToken_" +
+          rmDTIdentifier.getSequenceNumber());
+    }
+    if (!isUpdate) {
+      // Add the sequence number only when storing a new token.
+ ByteArrayOutputStream seqOs = new ByteArrayOutputStream(); + try (DataOutputStream seqOut = new DataOutputStream(seqOs)) { + seqOut.writeInt(rmDTIdentifier.getSequenceNumber()); + } + Operation seqArgMap = new Operation(); + List seqArguments = new ArrayList<>(); + seqArguments.add(RMDELEGATAION_TABLE); + seqArguments.add(RMDTSequentialNumber); + seqArguments.add(seqOs.toByteArray()); + seqArgMap.put(SQLConstants.UPDATE_KEY, seqArguments); + seqArgMap.put(SQLConstants.INSERT_IF_NOT_EXIST_KEY, seqArguments); + opList.add(seqArgMap); + } + sqlop.doMultipleOp(opList, credential); + } + + @Override + protected void storeRMDTMasterKeyState(DelegationKey delegationKey) + throws Exception { + String key = RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_KEY_PREFIX + + delegationKey.getKeyId(); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream fsOut = new DataOutputStream(os); + delegationKey.write(fsOut); + try { + sqlop.doUpdateOrInsert(RMMASTERKEYS_TABLE, key, os.toByteArray(), + credential); + } finally { + os.close(); + } + + } + + @Override + protected void storeReservationState( + ReservationAllocationStateProto reservationAllocation, String planName, + String reservationIdName) throws Exception { + String key = RESERVATION_SYSTEM_ROOT + SEPARATOR + planName + SEPARATOR + + reservationIdName; + byte[] value = reservationAllocation.toByteArray(); + sqlop.doUpdateOrInsert(RESERVATION_PLANS_TABLE, key, value, credential); + } + + @Override + protected void removeReservationState(String planName, + String reservationIdName) throws Exception { + String key = RESERVATION_SYSTEM_ROOT + SEPARATOR + planName + SEPARATOR + + reservationIdName; + sqlop.doDelete(RESERVATION_PLANS_TABLE, key, credential); + } + + @Override + protected void removeRMDTMasterKeyState(DelegationKey delegationKey) + throws Exception { + String key = RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_KEY_PREFIX + + delegationKey.getKeyId(); + sqlop.doDelete(RMMASTERKEYS_TABLE, key, credential); + } + + @Override + protected void storeOrUpdateAMRMTokenSecretManagerState( + AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate) + throws Exception { + AMRMTokenSecretManagerState data = AMRMTokenSecretManagerState + .newInstance(amrmTokenSecretManagerState); + byte[] stateData = data.getProto().toByteArray(); + try { + sqlop.doUpdateOrInsert(RMMASTERKEYS_TABLE, AMRMTOKEN_SECRET_MANAGER_ROOT, + stateData, credential); + } catch (Exception ex) { + LOG.info("Error storing info for AMRMTokenSecretManager", ex); + notifyStoreOperationFailed(ex); + } + } + + @Override + protected void removeApplicationStateInternal(ApplicationStateData appState) + throws Exception { + + String appId = appState.getApplicationSubmissionContext().getApplicationId() + .toString(); + + Operation applicationDelete = new Operation(); + List deleteApplicationArgs = new ArrayList<>(); + deleteApplicationArgs.add(APPLICATION_TABLE); + deleteApplicationArgs.add(appId); + deleteApplicationArgs.add(appId + "z"); + applicationDelete.put(SQLConstants.DELETE_RANGE_KEY, deleteApplicationArgs); + + Operation attemptsDelete = new Operation(); + List deleteAttemptsArgs = new ArrayList<>(); + deleteAttemptsArgs.add(ATTEMPT_TABLE); + deleteAttemptsArgs.add(appId); + deleteAttemptsArgs.add(appId + "z"); + attemptsDelete.put(SQLConstants.DELETE_RANGE_KEY, deleteAttemptsArgs); + + List toDelete = new ArrayList<>(); + toDelete.add(applicationDelete); + toDelete.add(attemptsDelete); + + sqlop.doMultipleOp(toDelete, credential); 
+ } + + @Override + protected void removeApplicationAttemptInternal( + ApplicationAttemptId attemptId) throws Exception { + String key = + attemptId.getApplicationId().toString() + SEPARATOR + attemptId; + sqlop.doDelete(ATTEMPT_TABLE, key, credential); + } + + @Override + public void deleteStore() throws Exception { + // Drop All the tables; + sqlop.doDropTable(CREDENTIAL_TABLE); + sqlop.doDropTable(APPLICATION_TABLE); + sqlop.doDropTable(ATTEMPT_TABLE); + sqlop.doDropTable(RMDELEGATAION_TABLE); + sqlop.doDropTable(RMMASTERKEYS_TABLE); + sqlop.doDropTable(RESERVATION_PLANS_TABLE); + } + + @Override + public void removeApplication(ApplicationId removeAppId) throws Exception { + sqlop.doDelete(APPLICATION_TABLE, removeAppId.toString(), credential); + } + + private class VerifyActiveStatusThread extends Thread { + VerifyActiveStatusThread() { + super(VerifyActiveStatusThread.class.getName()); + } + + @Override + public void run() { + try { + while (true) { + LOG.debug("Verifying master check"); + sqlop.doFencingCheck(credential); + Thread.sleep(verificationTimeOut); + } + } catch (InterruptedException ie) { + LOG.info(VerifyActiveStatusThread.class.getName() + " thread " + + "interrupted! Exiting!"); + } catch (Exception e) { + notifyStoreOperationFailed(new StoreFencedException()); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/Credential.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/Credential.java new file mode 100644 index 0000000..678d61a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/Credential.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.sql; + +public class Credential { + private String credentialTable; + private String key; + private String secret; + + public Credential(String credentialTable, String key, String secret) { + this.credentialTable = credentialTable; + this.key = key; + this.secret = secret; + } + + public String getCredentialTable() { + return this.credentialTable; + } + + public String getKey() { + return this.key; + } + + public String getSecret() { + return this.secret; + } + + public String toString() { + return "Table=" + credentialTable + " Key=" + key + " Secret=" + secret; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/LocalResultSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/LocalResultSet.java new file mode 100644 index 0000000..9bff44b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/LocalResultSet.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.sql; + +import org.apache.log4j.Logger; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; + +public class LocalResultSet { + + public ResultSet set = null; + public PreparedStatement stmt = null; + + public LocalResultSet(ResultSet set, PreparedStatement stmt) { + this.set = set; + this.stmt = stmt; + } + + public void close() { + if (set != null) { + try { + set.close(); + } catch (Exception e) { + Logger.getLogger(getClass()).warn("Error closing LocalResultSet " + e); + } + } + if (stmt != null) { + try { + stmt.close(); + } catch (Exception e) { + Logger.getLogger(getClass()).warn("Error closing LocalResultSet " + e); + } + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/Operation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/Operation.java new file mode 100644 index 0000000..add8fae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/Operation.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.sql; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class Operation { + Map> operationArgumentsMap; + + public Operation() { + operationArgumentsMap = new HashMap>(); + } + + public void put(String key, List argList) { + operationArgumentsMap.put(key, argList); + } + + public List get(String key) { + return operationArgumentsMap.get(key); + } + + public Set keySet() { + return operationArgumentsMap.keySet(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/PSQLOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/PSQLOperations.java new file mode 100644 index 0000000..e3fc058 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/PSQLOperations.java @@ -0,0 +1,471 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.sql; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFencedException; +import org.apache.log4j.Logger; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This class contains Postgresql based SQL operations for SqlRMStateStore. + * + * DB operations normally throw PSQLException, We are catching it and + * rethrowing SQLException so that Retry policy can work. + * + */ +public class PSQLOperations implements SQLOperations { + private static Logger LOG = Logger.getLogger(PSQLOperations.class); + + static { + LOG.info("LOADED CLASS PSQLOperations modified on 27 oct 2015"); + } + + protected Connection connection = null; + private String dbName = null; + private String dbHostName = null; + private String userName = null; + private String password = null; + private Map methodsMap = new HashMap(); + + @Override + public void init(Configuration conf) { + userName = conf.get(YarnConfiguration.RM_DB_USERNAME); + dbName = conf.get(YarnConfiguration.RM_DB_DBNAME); + dbHostName = "jdbc:" + conf.get(YarnConfiguration.RM_DB_TYPE) + "://" + conf + .get(YarnConfiguration.RM_DB_HOST) + "/" + dbName; + String passFile = ""; + if ((passFile = conf.get(YarnConfiguration.RM_DB_PASSWORD_FILE, passFile)) + .isEmpty()) { + password = conf.get(YarnConfiguration.RM_DB_PASSWORD); + } else { + BufferedReader br = null; + try { + br = new BufferedReader(new FileReader(passFile)); + password = br.readLine(); + } catch (Exception e) { + LOG.warn("Exception occured while reading password file " + e); + // If Exception then read conf from the yarn-site + password = conf.get(YarnConfiguration.RM_DB_PASSWORD); + } finally { + if (br != null) { + try { + br.close(); + } catch (Exception e) { + + } + } + + } + } + // Cache methods for doMultipleOp() + try { + methodsMap.put(SQLConstants.UPDATE_KEY, this.getClass() + .getMethod("prepareUpdate", String.class, String.class, + byte[].class)); + methodsMap.put(SQLConstants.INSERT_IF_NOT_EXIST_KEY, this.getClass() + .getMethod("prepareInsertIfNotExists", String.class, String.class, + byte[].class)); + methodsMap.put(SQLConstants.DELETE_RANGE_KEY, this.getClass() + .getMethod("prepareDeleteRange", String.class, String.class, + String.class)); + } catch 
(NoSuchMethodException | SecurityException e) { + throw new YarnRuntimeException( + "One or more methods initilisation failed " + e); + } + } + + @VisibleForTesting + protected synchronized void ensureConnection() throws SQLException { + if (connection != null && !connection.isClosed() && connection.isValid(0)) { + return; + } + connection = DriverManager.getConnection(dbHostName, userName, password); + connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ); + } + + @VisibleForTesting + public synchronized void disconnect() throws SQLException { + if (connection != null) + connection.close(); + } + + @Override + public synchronized void doCreateTable(String table) throws SQLException { + try { + Connection conn = DriverManager + .getConnection(dbHostName, userName, password); + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(createTable(table)); + stmt.execute(); + } finally { + try { + stmt.close(); + conn.close(); + } catch (Exception e) { + LOG.warn("Exception occured while closing connetion in doCreateTable " + + e); + } + } + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + @Override + public synchronized void doDropTable(String table) throws SQLException { + try { + Connection conn = DriverManager + .getConnection(dbHostName, userName, password); + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(dropTable(table)); + stmt.execute(); + } finally { + try { + stmt.close(); + conn.close(); + } catch (Exception e) { + LOG.warn( + "Exception occured while closing connetion in doDropTable " + e); + } + } + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + @Override + public synchronized LocalResultSet getFromTable(String table, String key) + throws SQLException { + try { + ensureConnection(); + PreparedStatement getKey = connection.prepareStatement(select(table)); + getKey.setString(1, key); + return new LocalResultSet(getKey.executeQuery(), getKey); + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + public synchronized LocalResultSet getFromTableForShare(String table, + String key) throws SQLException { + try { + ensureConnection(); + PreparedStatement getKey = connection + .prepareStatement(selectForShare(table)); + getKey.setString(1, key); + return new LocalResultSet(getKey.executeQuery(), getKey); + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + @Override + public synchronized LocalResultSet getFromTableRange(String table, + String from, String to) throws SQLException { + try { + ensureConnection(); + PreparedStatement getKey = connection + .prepareStatement(selectRange(table)); + getKey.setString(1, from); + getKey.setString(2, to); + return new LocalResultSet(getKey.executeQuery(), getKey); + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + public synchronized PreparedStatement prepareCreateTable(String table) + throws SQLException { + ensureConnection(); + PreparedStatement preparedStatement = connection + .prepareStatement(createTable(table)); + return preparedStatement; + } + + public synchronized PreparedStatement prepareDeleteRange(String table, + String from, String to) throws SQLException { + ensureConnection(); + PreparedStatement preparedStatement = connection + .prepareStatement(deleteRange(table)); + preparedStatement.setString(1, from); + preparedStatement.setString(2, to); + return preparedStatement; + 
} + + public synchronized PreparedStatement prepareDelete(String table, String key) + throws SQLException { + ensureConnection(); + PreparedStatement preparedStatement = connection + .prepareStatement(delete(table)); + preparedStatement.setString(1, key); + return preparedStatement; + + } + + public synchronized PreparedStatement prepareUpdate(String table, String key, + byte[] value) throws SQLException { + ensureConnection(); + PreparedStatement preparedStatement = connection + .prepareStatement(update(table)); + preparedStatement.setString(2, key); + preparedStatement.setBytes(1, value); + return preparedStatement; + } + + public synchronized PreparedStatement prepareInsertIfNotExists(String table, + String key, byte[] value) throws SQLException { + ensureConnection(); + PreparedStatement preparedStatement = connection + .prepareStatement(insertIfNotExists(table)); + preparedStatement.setString(1, key); + preparedStatement.setBytes(2, value); + preparedStatement.setString(3, key); + return preparedStatement; + } + + protected String selectRange(String table) { + return "SELECT key, value from " + table + " WHERE key >= ? and key <= ?"; + } + + protected String select(String table) { + return "SELECT key, value from " + table + " WHERE key = ?"; + } + + protected String selectForShare(String table) { + return "SELECT key, value from " + table + " WHERE key = ? FOR SHARE"; + } + + protected String insertIfNotExists(String table) { + return "INSERT INTO " + table + + " (key, value) SELECT ?,? WHERE NOT EXISTS ( SELECT value FROM " + + table + " WHERE key = ?)"; + } + + protected String deleteRange(String table) { + return "DELETE FROM " + table + " WHERE key >= ? and key <= ?"; + } + + protected String delete(String table) { + return "DELETE FROM " + table + " WHERE key = ?"; + } + + protected String createTable(String table) { + return "CREATE TABLE IF NOT EXISTS " + table + + " (key text PRIMARY KEY, value bytea)"; + } + + protected String update(String table) { + return "UPDATE " + table + " SET value = ? WHERE key = ?"; + } + + protected String dropTable(String table) { + return "DROP TABLE IF EXISTS " + table; + } + + /** + * 1. Begin transaction. + * 2. Update stuff + * 3. Check the master key. + * 4. If the master key is not same as the digest + * then abort + * else + * 5. Commit. + * @throws StoreFencedException + * @throws SQLException + */ + protected synchronized void doTransaction(List operations, + Credential credential) throws StoreFencedException, SQLException { + LocalResultSet lrs = null; + try { + ensureConnection(); + connection.setAutoCommit(false); + for (PreparedStatement st : operations) { + st.executeUpdate(); + } + lrs = getFromTableForShare(credential.getCredentialTable(), + credential.getKey()); + if (!lrs.set.next()) { + LOG.warn("Credential set is null"); + throw new StoreFencedException(); + } + String cred = new String(lrs.set.getBytes("value")); + if (!cred.equals(credential.getSecret())) { + LOG.warn("Stored credentials are not equal to RM's credentials"); + throw new StoreFencedException(); + } + connection.commit(); + } catch (StoreFencedException | SQLException e) { + try { + connection.rollback(); + } catch (SQLException e1) { + LOG.error("Error occurred while Connnection rollback " + e1); + } + e.printStackTrace(); + throw e; + } finally { + // This should not throw error. If it does current transaction will be retried. 
+ ensureConnection(); + connection.setAutoCommit(true); + if (lrs != null) + lrs.close(); + for (PreparedStatement st : operations) { + try { + st.close(); + } catch (Exception e) { + LOG.warn("Exception occured while closing prepared statements."); + } + } + } + } + + @Override + public synchronized void fence(Credential credential) throws SQLException { + // Put the credential in the master table. + try { + byte[] value = credential.getSecret().getBytes(); + PreparedStatement update = prepareUpdate(credential.getCredentialTable(), + credential.getKey(), value); + PreparedStatement insertIfNotThere = prepareInsertIfNotExists( + credential.getCredentialTable(), credential.getKey(), value); + try { + ensureConnection(); + connection.setAutoCommit(false); + update.executeUpdate(); + insertIfNotThere.executeUpdate(); + connection.commit(); + } catch (SQLException e) { + try { + connection.rollback(); + } catch (SQLException e1) { + LOG.warn("Exception occured while rollback in fence() " + e); + } + throw e; + } finally { + try { + connection.setAutoCommit(true); + } catch (SQLException e1) { + LOG.warn("Exception occured while setting auto commit fence() " + e1); + } + try { + update.close(); + insertIfNotThere.close(); + } catch (Exception e) { + LOG.warn( + "Exception occured while closing prepared statements in fence() " + + e); + } + } + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + @Override + public void doUpdateOrInsert(String table, String key, byte[] value, + Credential credential) throws SQLException, StoreFencedException { + try { + List opList = new ArrayList(); + opList.add(prepareUpdate(table, key, value)); + opList.add(prepareInsertIfNotExists(table, key, value)); + doTransaction(opList, credential); + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + @Override + public void doDelete(String table, String key, Credential credential) + throws SQLException, StoreFencedException { + try { + List opList = new ArrayList(); + opList.add(prepareDelete(table, key)); + doTransaction(opList, credential); + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + @Override + public void doFencingCheck(Credential credential) + throws StoreFencedException, SQLException { + try { + doTransaction(new ArrayList(), credential); + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } + + @SuppressWarnings("rawtypes") + @Override + public void doMultipleOp(List opList, Credential credential) + throws StoreFencedException, SQLException { + try { + List listOfOperations = new ArrayList<>(); + for (Operation m : opList) { + for (String key : m.keySet()) { + try { + Method method = methodsMap.get(key); + List params = m.get(key); + // For now all the methods have 3 parameters. + listOfOperations.add((PreparedStatement) method + .invoke(this, params.get(0), params.get(1), params.get(2))); + } catch (SecurityException | IllegalAccessException e) { + throw new YarnRuntimeException("Method invokation failed " + e); + } catch (InvocationTargetException e) { + // Handle InvocationTargetExeception separately. 
+ if (e.getCause().getClass().isAssignableFrom(SQLException.class)) { + throw new SQLException(e.getCause()); + } + } + } + } + doTransaction(listOfOperations, credential); + } catch (SQLException e) { + e.printStackTrace(); + throw new SQLException(e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/SQLConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/SQLConstants.java new file mode 100644 index 0000000..f26424a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/SQLConstants.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.sql; + +public class SQLConstants { + + public static final String DELETE_RANGE_KEY = "deleteRange"; + public static final String UPDATE_KEY = "update"; + public static final String INSERT_IF_NOT_EXIST_KEY = "insertIfNotExists"; + public static final String KEY_COLUMN = "key"; + public static final String VALUE_COLUMN = "value"; + + private SQLConstants() { + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/SQLOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/SQLOperations.java new file mode 100644 index 0000000..fa79949 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/sql/SQLOperations.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.sql; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFencedException; + +import java.sql.SQLException; +import java.util.List; + +public interface SQLOperations { + + void init(Configuration conf); + + void doCreateTable(String applicationTable) throws SQLException; + + void fence(Credential credential) throws SQLException; + + void disconnect() throws SQLException; + + LocalResultSet getFromTable(String table, String key) throws SQLException; + + void doUpdateOrInsert(String table, String key, byte[] value, + Credential credential) throws SQLException, StoreFencedException; + + LocalResultSet getFromTableRange(String table, String from, String key) + throws SQLException; + + void doMultipleOp(List opList, Credential credential) + throws SQLException, StoreFencedException; + + void doDelete(String table, String key, Credential credential) + throws SQLException, StoreFencedException; + + void doDropTable(String table) throws SQLException; + + void doFencingCheck(Credential credential) + throws SQLException, StoreFencedException; + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestSqlStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestSqlStateStore.java new file mode 100644 index 0000000..3919a3c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestSqlStateStore.java @@ -0,0 +1,357 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.sql.*; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.h2.tools.DeleteDbFiles; +import org.junit.Test; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; + +public class TestSqlStateStore extends RMStateStoreTestBase { + + public static final String H2_LOCATION = "h2db"; + + @Test + public void testSqlStateStore() throws Exception { + try { + TestSqlStateStoreTests tester = new TestSqlStateStoreTests(); + testRMAppStateStore(tester); + testRMDTSecretManagerStateStore(tester); + testCheckVersion(tester); + testEpoch(tester); + testAppDeletion(tester); + testAMRMTokenSecretManagerStateStore(tester); + testReservationStateStore(tester); + testDeleteStore(tester); + tester.store.sqlop.disconnect(); + } finally { + DeleteDbFiles.execute("./target", H2_LOCATION, true); + } + } + + private Configuration createHARMConf(String rmIds, String rmId, + int adminPort) { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, rmIds); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, TestSqlStateInternal.class.getName()); + conf.set(YarnConfiguration.RM_HA_ID, rmId); + conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0"); + + for (String rpcAddress : YarnConfiguration + .getServiceAddressConfKeys(conf)) { + for (String id : HAUtil.getRMHAIds(conf)) { + conf.set(HAUtil.addSuffix(rpcAddress, id), "localhost:0"); + } + } + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId), + "localhost:" + adminPort); + return conf; + } + + @Test + public void testSqlStateStoreFencing() throws Exception { + try { + StateChangeRequestInfo req = new StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + Configuration conf1 = createHARMConf("rm1,rm2", "rm1", 1234); + conf1.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + ResourceManager rm1 = new ResourceManager(); + rm1.init(conf1); + rm1.start(); + rm1.getRMContext().getRMAdminService().transitionToActive(req); + assertEquals("RM with SQL Store didn't start", Service.STATE.STARTED, + rm1.getServiceState()); + assertEquals("RM should be Active", + HAServiceProtocol.HAServiceState.ACTIVE, + rm1.getRMContext().getRMAdminService().getServiceStatus().getState()); + Configuration conf2 = createHARMConf("rm1,rm2", "rm2", 5678); + conf2.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + 
+      ResourceManager rm2 = new ResourceManager();
+      rm2.init(conf2);
+      rm2.start();
+      rm2.getRMContext().getRMAdminService().transitionToActive(req);
+      assertEquals("RM with SQL Store didn't start", Service.STATE.STARTED,
+          rm2.getServiceState());
+      assertEquals("RM should be Active",
+          HAServiceProtocol.HAServiceState.ACTIVE,
+          rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+      Thread.sleep(YarnConfiguration.DEFAULT_RM_DB_VERIFICATION_TIMEOUT);
+      assertEquals("RM should have been fenced",
+          HAServiceProtocol.HAServiceState.STANDBY,
+          rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
+      assertEquals("RM should be Active",
+          HAServiceProtocol.HAServiceState.ACTIVE,
+          rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+    } finally {
+      DeleteDbFiles.execute("./target", H2_LOCATION, true);
+    }
+  }
+
+  static class TestSqlStateInternal extends SqlRMStateStore {
+
+    public TestSqlStateInternal(Configuration conf, String uuid) {
+      this.uuid = uuid;
+      init(conf);
+      start();
+    }
+
+    public TestSqlStateInternal() {
+    }
+
+    @Override
+    protected SQLOperations createSQLOperationsWithRetry() {
+      return new TestSQLOperations();
+    }
+
+  }
+
+  static class TestSQLOperations extends PSQLOperations {
+    @Override
+    protected void ensureConnection() throws SQLException {
+      if (connection != null && !connection.isClosed() && connection
+          .isValid(0)) {
+        return;
+      }
+      connection = DriverManager
+          .getConnection("jdbc:h2:./target/" + H2_LOCATION, "test", "");
+      connection
+          .setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+    }
+
+    @Override
+    public void doCreateTable(String table) throws SQLException {
+      // Open a dedicated connection for the DDL and close it when done.
+      try (Connection conn = DriverManager
+              .getConnection("jdbc:h2:./target/" + H2_LOCATION, "test", "");
+          PreparedStatement stmt = conn.prepareStatement(createTable(table))) {
+        stmt.execute();
+      }
+    }
+
+    @Override
+    public void doDropTable(String table) throws SQLException {
+      // Open a dedicated connection for the DDL and close it when done.
+      try (Connection conn = DriverManager
+              .getConnection("jdbc:h2:./target/" + H2_LOCATION, "test", "");
+          PreparedStatement stmt = conn.prepareStatement(dropTable(table))) {
+        stmt.execute();
+      }
+    }
+
+    @Override
+    protected String createTable(String table) {
+      return "CREATE TABLE IF NOT EXISTS " + table
+          + " (key varchar(1024) PRIMARY KEY, value blob)";
+    }
+
+    @Override
+    public void doUpdateOrInsert(String table, String key, byte[] value,
+        Credential credential) throws SQLException, StoreFencedException {
+      List<PreparedStatement> opList = new ArrayList<>();
+      String sql =
+          "MERGE INTO " + table + " (key, value) KEY (key) VALUES (?,?)";
+      ensureConnection();
+      PreparedStatement stmt = connection.prepareStatement(sql);
+      stmt.setString(1, key);
+      stmt.setObject(2, value);
+      opList.add(stmt);
+      doTransaction(opList, credential);
+    }
+
+    @Override
+    public PreparedStatement prepareInsertIfNotExists(String table, String key,
+        byte[] value) throws SQLException {
+      ensureConnection();
+      PreparedStatement preparedStatement = connection
+          .prepareStatement(insertIfNotExists(table));
+      preparedStatement.setString(1, key);
+      preparedStatement.setBytes(2, value);
+      return preparedStatement;
+    }
+
+    @Override
+    protected String selectForShare(String table) {
+      // Override to SELECT ... FOR UPDATE, because SELECT ... FOR SHARE is not
+      // available in H2.
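+      // FOR UPDATE takes an exclusive row lock, which is stricter than FOR
+      // SHARE, so the fencing check only locks more aggressively here.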
+      return selectForUpdate(table);
+    }
+
+    protected String selectForUpdate(String table) {
+      return "SELECT key, value from " + table + " WHERE key = ? FOR UPDATE";
+    }
+
+    @Override
+    protected String insertIfNotExists(String table) {
+      return "MERGE INTO " + table + " (key, value) KEY (key) VALUES (?,?)";
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void doMultipleOp(List opList, Credential credential)
+        throws StoreFencedException, SQLException {
+      try {
+        List<PreparedStatement> listOfOperations = new ArrayList<>();
+        for (Object op : opList) {
+          Operation m = (Operation) op;
+          for (String key : m.keySet()) {
+            try {
+              switch (key) {
+                case "update":
+                  Method method = this.getClass()
+                      .getMethod("prepareUpdate", String.class, String.class,
+                          byte[].class);
+                  List params = m.get(key);
+                  listOfOperations.add((PreparedStatement) method
+                      .invoke(this, params.get(0), params.get(1), params.get(2)));
+                  break;
+                case "insertIfNotExists":
+                  method = this.getClass()
+                      .getMethod("prepareInsertIfNotExists", String.class,
+                          String.class, byte[].class);
+                  params = m.get(key);
+                  listOfOperations.add((PreparedStatement) method
+                      .invoke(this, params.get(0), params.get(1), params.get(2)));
+                  break;
+                case "deleteRange":
+                  method = this.getClass()
+                      .getMethod("prepareDeleteRange", String.class, String.class,
+                          String.class);
+                  params = m.get(key);
+                  listOfOperations.add((PreparedStatement) method
+                      .invoke(this, params.get(0), params.get(1), params.get(2)));
+                  break;
+                default:
+                  break;
+              }
+            } catch (NoSuchMethodException | SecurityException
+                | IllegalAccessException | IllegalArgumentException
+                | InvocationTargetException e) {
+              throw new YarnRuntimeException("Method invocation failed " + e);
+            }
+          }
+        }
+        doTransaction(listOfOperations, credential);
+      } catch (SQLException e) {
+        throw new SQLException(e);
+      }
+    }
+  }
+
+  static class TestSqlStateStoreTests implements RMStateStoreHelper {
+    TestSqlStateInternal store;
+    String uuid = UUID.randomUUID().toString();
+
+    @Override
+    public RMStateStore getRMStateStore() throws Exception {
+      YarnConfiguration conf = new YarnConfiguration();
+      this.store = new TestSqlStateInternal(conf, uuid);
+      return this.store;
+    }
+
+    @Override
+    public boolean isFinalStateValid() throws Exception {
+      return true;
+    }
+
+    @Override
+    public void writeVersion(Version version) throws Exception {
+      byte[] data = ((VersionPBImpl) version).getProto().toByteArray();
+      store.sqlop.doUpdateOrInsert(TestSqlStateInternal.CREDENTIAL_TABLE,
+          TestSqlStateInternal.VERSION, data, store.credential);
+    }
+
+    @Override
+    public Version getCurrentVersion() throws Exception {
+      return store.getCurrentVersion();
+    }
+
+    @Override
+    public boolean appExists(RMApp app) throws Exception {
+      LocalResultSet lrs = null;
+      try {
+        lrs = store.sqlop.getFromTable(TestSqlStateInternal.APPLICATION_TABLE,
+            app.getApplicationId().toString());
+        return lrs.set.next();
+      } catch (Exception e) {
+        return false;
+      } finally {
+        try {
+          lrs.close();
+        } catch (Exception e) {
+          // best-effort close; ignore
+        }
+      }
+    }
+
+    @Override
+    public boolean attemptExists(RMAppAttempt attempt) throws Exception {
+      LocalResultSet lrs = null;
+      try {
+        lrs = store.sqlop.getFromTable(TestSqlStateInternal.ATTEMPT_TABLE,
+            attempt.getSubmissionContext().getApplicationId() + store.SEPARATOR
+                + attempt.getAppAttemptId());
+        return lrs.set.next();
+      } catch (Exception e) {
+        return false;
+      } finally {
+        try {
+          lrs.close();
+        } catch (Exception e) {
+          // best-effort close; ignore
+        }
+      }
+    }
+  }
+}
-- 
1.9.5 (Apple Git-50.3)
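The H2-backed overrides in the test boil down to a simple key/value pattern: a two-column table (key varchar PRIMARY KEY, value blob) plus H2's "MERGE INTO ... KEY (key)" upsert. The standalone sketch below illustrates that pattern outside the test harness. It is not part of the patch; the class name H2KeyValueSketch, the table name demo_store, and the sample key are illustrative assumptions only, and it assumes the embedded H2 driver from the test dependency is on the classpath.

import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class H2KeyValueSketch {

  public static void main(String[] args) throws Exception {
    // Embedded H2 database file under ./target, mirroring the test's URL style.
    try (Connection conn = DriverManager
        .getConnection("jdbc:h2:./target/h2db-sketch", "test", "")) {

      // Same shape as the table created by the test overrides:
      // a string key and a blob value.
      try (PreparedStatement create = conn.prepareStatement(
          "CREATE TABLE IF NOT EXISTS demo_store"
              + " (key varchar(1024) PRIMARY KEY, value blob)")) {
        create.execute();
      }

      // H2's MERGE acts as an insert-or-update keyed on "key"; the test uses
      // this one statement for both doUpdateOrInsert and insertIfNotExists.
      try (PreparedStatement merge = conn.prepareStatement(
          "MERGE INTO demo_store (key, value) KEY (key) VALUES (?,?)")) {
        merge.setString(1, "application_1464241621123_0001");
        merge.setBytes(2,
            "serialized-application-state".getBytes(StandardCharsets.UTF_8));
        merge.execute();
      }

      // Read the value back by key, roughly what getFromTable() does.
      try (PreparedStatement select = conn.prepareStatement(
          "SELECT key, value FROM demo_store WHERE key = ?")) {
        select.setString(1, "application_1464241621123_0001");
        try (ResultSet rs = select.executeQuery()) {
          while (rs.next()) {
            System.out.println(rs.getString(1) + " -> "
                + new String(rs.getBytes(2), StandardCharsets.UTF_8));
          }
        }
      }
    }
  }
}

Because the test overrides route both doUpdateOrInsert and insertIfNotExists through the same MERGE statement, a plain upsert like the one above is enough to model the store's writes when experimenting with the schema locally.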