diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
index 602fcd7..3b47cdd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
@@ -253,7 +253,7 @@
${basedir}/src/main/proto
- ${basedir}/src/main/proto/server
+ ${basedir}/src/main/proto
yarn_security_token.proto
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto
deleted file mode 100644
index 339e99e..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.hadoop.yarn.proto";
-option java_outer_classname = "YarnSecurityTokenProtos";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-package hadoop.yarn;
-
-import "yarn_protos.proto";
-
-// None of the following records are supposed to be exposed to users.
-
-message NMTokenIdentifierProto {
- optional ApplicationAttemptIdProto appAttemptId = 1;
- optional NodeIdProto nodeId = 2;
- optional string appSubmitter = 3;
- optional int32 keyId = 4 [default = -1];
-}
-
-message AMRMTokenIdentifierProto {
- optional ApplicationAttemptIdProto appAttemptId = 1;
- optional int32 keyId = 2 [default = -1];
-}
-
-message ContainerTokenIdentifierProto {
- optional ContainerIdProto containerId = 1;
- optional string nmHostAddr = 2;
- optional string appSubmitter = 3;
- optional ResourceProto resource = 4;
- optional int64 expiryTimeStamp =5;
- optional int32 masterKeyId = 6 [default = -1];
- optional int64 rmIdentifier = 7;
- optional PriorityProto priority = 8;
- optional int64 creationTime = 9;
- optional LogAggregationContextProto logAggregationContext = 10;
- optional string nodeLabelExpression = 11;
- optional ContainerTypeProto containerType = 12;
-}
-
-message ClientToAMTokenIdentifierProto {
- optional ApplicationAttemptIdProto appAttemptId = 1;
- optional string clientName = 2;
-}
-
-message YARNDelegationTokenIdentifierProto {
- optional string owner = 1;
- optional string renewer = 2;
- optional string realUser = 3;
- optional int64 issueDate = 4;
- optional int64 maxDate = 5;
- optional int32 sequenceNumber = 6;
- optional int32 masterKeyId = 7;
-}
-
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/yarn_security_token.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/yarn_security_token.proto
new file mode 100644
index 0000000..339e99e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/yarn_security_token.proto
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "YarnSecurityTokenProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_protos.proto";
+
+// None of the following records are supposed to be exposed to users.
+
+message NMTokenIdentifierProto {
+ optional ApplicationAttemptIdProto appAttemptId = 1;
+ optional NodeIdProto nodeId = 2;
+ optional string appSubmitter = 3;
+ optional int32 keyId = 4 [default = -1];
+}
+
+message AMRMTokenIdentifierProto {
+ optional ApplicationAttemptIdProto appAttemptId = 1;
+ optional int32 keyId = 2 [default = -1];
+}
+
+message ContainerTokenIdentifierProto {
+ optional ContainerIdProto containerId = 1;
+ optional string nmHostAddr = 2;
+ optional string appSubmitter = 3;
+ optional ResourceProto resource = 4;
+ optional int64 expiryTimeStamp =5;
+ optional int32 masterKeyId = 6 [default = -1];
+ optional int64 rmIdentifier = 7;
+ optional PriorityProto priority = 8;
+ optional int64 creationTime = 9;
+ optional LogAggregationContextProto logAggregationContext = 10;
+ optional string nodeLabelExpression = 11;
+ optional ContainerTypeProto containerType = 12;
+}
+
+message ClientToAMTokenIdentifierProto {
+ optional ApplicationAttemptIdProto appAttemptId = 1;
+ optional string clientName = 2;
+}
+
+message YARNDelegationTokenIdentifierProto {
+ optional string owner = 1;
+ optional string renewer = 2;
+ optional string realUser = 3;
+ optional int64 issueDate = 4;
+ optional int64 maxDate = 5;
+ optional int32 sequenceNumber = 6;
+ optional int32 masterKeyId = 7;
+}
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
index 983c4b8..9748374 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
@@ -205,7 +205,7 @@
${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto
${basedir}/../../hadoop-yarn-api/src/main/proto
- ${basedir}/../../hadoop-yarn-common/src/main/proto/server/
+ ${basedir}/../../hadoop-yarn-common/src/main/proto
${basedir}/../hadoop-yarn-server-common/src/main/proto
${basedir}/src/main/proto
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index 49a0bdb..9d54184 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -283,7 +283,7 @@
${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto
${basedir}/../../hadoop-yarn-api/src/main/proto
- ${basedir}/../../hadoop-yarn-common/src/main/proto/server/
+ ${basedir}/../../hadoop-yarn-common/src/main/proto
${basedir}/../hadoop-yarn-server-common/src/main/proto
${basedir}/src/main/proto
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index b972791..3a6a0b0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -26,8 +26,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -44,12 +46,14 @@
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
@@ -62,6 +66,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
import com.google.common.annotations.VisibleForTesting;
@@ -96,7 +101,7 @@
private Path rootDirPath;
@Private
@VisibleForTesting
- Path rmDTSecretManagerRoot;
+ private Path rmDTSecretManagerRoot;
private Path rmAppRoot;
private Path dtSequenceNumberPath = null;
private int fsNumRetries;
@@ -105,9 +110,11 @@
YarnConfiguration.DEFAULT_YARN_INTERMEDIATE_DATA_ENCRYPTION;
@VisibleForTesting
- Path fsWorkingPath;
+ private Path fsWorkingPath;
+
+ private Path amrmTokenSecretManagerRoot;
+ private Path reservationRoot;
- Path amrmTokenSecretManagerRoot;
@Override
public synchronized void initInternal(Configuration conf)
throws Exception{
@@ -117,6 +124,7 @@ public synchronized void initInternal(Configuration conf)
rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
amrmTokenSecretManagerRoot =
new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT);
+ reservationRoot = new Path(rootDirPath, RESERVATION_SYSTEM_ROOT);
fsNumRetries =
conf.getInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES,
YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES);
@@ -153,6 +161,7 @@ protected synchronized void startInternal() throws Exception {
mkdirsWithRetries(rmDTSecretManagerRoot);
mkdirsWithRetries(rmAppRoot);
mkdirsWithRetries(amrmTokenSecretManagerRoot);
+ mkdirsWithRetries(reservationRoot);
}
@Override
@@ -222,9 +231,24 @@ public synchronized RMState loadState() throws Exception {
loadRMAppState(rmState);
// recover AMRMTokenSecretManager
loadAMRMTokenSecretManagerState(rmState);
+ // recover reservation state
+ loadReservationSystemState(rmState);
return rmState;
}
+ private void loadReservationSystemState(RMState rmState) throws Exception {
+ try {
+ final ReservationStateFileProcessor fileProcessor = new
+ ReservationStateFileProcessor(rmState);
+ final Path rootDirectory = this.reservationRoot;
+
+ processDirectoriesOfFiles(fileProcessor, rootDirectory);
+ } catch (Exception e) {
+ LOG.error("Failed to load state.", e);
+ throw e;
+ }
+ }
+
private void loadAMRMTokenSecretManagerState(RMState rmState)
throws Exception {
checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);
@@ -248,50 +272,12 @@ private void loadAMRMTokenSecretManagerState(RMState rmState)
private void loadRMAppState(RMState rmState) throws Exception {
try {
- List attempts =
- new ArrayList();
-
- for (FileStatus appDir : listStatusWithRetries(rmAppRoot)) {
- checkAndResumeUpdateOperation(appDir.getPath());
- for (FileStatus childNodeStatus :
- listStatusWithRetries(appDir.getPath())) {
- assert childNodeStatus.isFile();
- String childNodeName = childNodeStatus.getPath().getName();
- if (checkAndRemovePartialRecordWithRetries(
- childNodeStatus.getPath())) {
- continue;
- }
- byte[] childData = readFileWithRetries(childNodeStatus.getPath(),
- childNodeStatus.getLen());
- // Set attribute if not already set
- setUnreadableBySuperuserXattrib(childNodeStatus.getPath());
- if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
- // application
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loading application from node: " + childNodeName);
- }
- ApplicationStateDataPBImpl appState =
- new ApplicationStateDataPBImpl(
- ApplicationStateDataProto.parseFrom(childData));
- ApplicationId appId =
- appState.getApplicationSubmissionContext().getApplicationId();
- rmState.appState.put(appId, appState);
- } else if (childNodeName
- .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
- // attempt
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loading application attempt from node: "
- + childNodeName);
- }
- ApplicationAttemptStateDataPBImpl attemptState =
- new ApplicationAttemptStateDataPBImpl(
- ApplicationAttemptStateDataProto.parseFrom(childData));
- attempts.add(attemptState);
- } else {
- LOG.info("Unknown child node with name: " + childNodeName);
- }
- }
- }
+ List attempts = new ArrayList<>();
+ final RMAppStateFileProcessor rmAppStateFileProcessor =
+ new RMAppStateFileProcessor(rmState, attempts);
+ final Path rootDirectory = this.rmAppRoot;
+
+ processDirectoriesOfFiles(rmAppStateFileProcessor, rootDirectory);
// go through all attempts and add them to their apps, Ideally, each
// attempt node must have a corresponding app node, because remove
@@ -309,6 +295,29 @@ private void loadRMAppState(RMState rmState) throws Exception {
}
}
+ private void processDirectoriesOfFiles(
+ RMStateFileProcessor rmAppStateFileProcessor, Path rootDirectory)
+ throws Exception {
+ for (FileStatus dir : listStatusWithRetries(rootDirectory)) {
+ checkAndResumeUpdateOperation(dir.getPath());
+ String dirName = dir.getPath().getName();
+ for (FileStatus fileNodeStatus : listStatusWithRetries(dir.getPath())) {
+ assert fileNodeStatus.isFile();
+ String fileName = fileNodeStatus.getPath().getName();
+ if (checkAndRemovePartialRecordWithRetries(fileNodeStatus.getPath())) {
+ continue;
+ }
+ byte[] fileData = readFileWithRetries(fileNodeStatus.getPath(),
+ fileNodeStatus.getLen());
+ // Set attribute if not already set
+ setUnreadableBySuperuserXattrib(fileNodeStatus.getPath());
+
+ rmAppStateFileProcessor.processChildNode(dirName, fileName,
+ fileData);
+ }
+ }
+ }
+
private boolean checkAndRemovePartialRecord(Path record) throws IOException {
// If the file ends with .tmp then it shows that it failed
// during saving state into state store. The file will be deleted as a
@@ -843,6 +852,41 @@ public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
}
}
+ @Override
+ protected void storeReservationState(
+ ReservationAllocationStateProto reservationAllocation, String planName,
+ String reservationIdName) throws Exception {
+ Path planCreatePath = getNodePath(reservationRoot, planName);
+ mkdirsWithRetries(planCreatePath);
+ Path reservationPath = getNodePath(planCreatePath, reservationIdName);
+ LOG.info("Storing state for reservation " + reservationIdName + " from " +
+ "plan " + planName + " at path " + reservationPath);
+ byte[] reservationData = reservationAllocation.toByteArray();
+ writeFileWithRetries(reservationPath, reservationData, true);
+ }
+
+ @Override
+ protected void updateReservationState(
+ ReservationAllocationStateProto reservationAllocation, String planName,
+ String reservationIdName) throws Exception {
+ Path planCreatePath = getNodePath(reservationRoot, planName);
+ Path reservationPath = getNodePath(planCreatePath, reservationIdName);
+ LOG.info("Updating state for reservation " + reservationIdName + " from " +
+ "plan " + planName + " at path " + reservationPath);
+ byte[] reservationData = reservationAllocation.toByteArray();
+ updateFile(reservationPath, reservationData, true);
+ }
+
+ @Override
+ protected void removeReservationState(
+ String planName, String reservationIdName) throws Exception {
+ Path planCreatePath = getNodePath(reservationRoot, planName);
+ Path reservationPath = getNodePath(planCreatePath, reservationIdName);
+ LOG.info("Removing state for reservation " + reservationIdName + " from " +
+ "plan " + planName + " at path " + reservationPath);
+ deleteFileWithRetries(reservationPath);
+ }
+
@VisibleForTesting
public int getNumRetries() {
return fsNumRetries;
@@ -853,8 +897,7 @@ public long getRetryInterval() {
return fsRetryInterval;
}
- private void setUnreadableBySuperuserXattrib(Path p)
- throws IOException {
+ private void setUnreadableBySuperuserXattrib(Path p) throws IOException {
if (fs.getScheme().toLowerCase().contains("hdfs")
&& intermediateEncryptionEnabled
&& !fs.getXAttrs(p).containsKey(UNREADABLE_BY_SUPERUSER_XATTRIB)) {
@@ -862,4 +905,75 @@ private void setUnreadableBySuperuserXattrib(Path p)
EnumSet.of(XAttrSetFlag.CREATE));
}
}
+
+ private class ReservationStateFileProcessor implements RMStateFileProcessor {
+ private RMState rmState;
+ public ReservationStateFileProcessor(RMState state) {
+ this.rmState = state;
+ }
+
+ @Override
+ public void processChildNode(String planName, String childNodeName,
+ byte[] childData) throws InvalidProtocolBufferException {
+ ReservationAllocationStateProto allocationState =
+ ReservationAllocationStateProto.parseFrom(childData);
+ if (!rmState.reservationState.containsKey(planName)) {
+ rmState.reservationState.put(planName,
+ new HashMap());
+ }
+ ReservationId reservationId = ReservationSystemUtil.toReservationId(
+ allocationState.getReservationId());
+ rmState.reservationState.get(planName).put(reservationId,
+ allocationState);
+ }
+ }
+
+ private class RMAppStateFileProcessor implements RMStateFileProcessor {
+ private RMState rmState;
+ private List attempts;
+
+ public RMAppStateFileProcessor(RMState rmState,
+ List attempts) {
+ this.rmState = rmState;
+ this.attempts = attempts;
+ }
+
+ @Override
+ public void processChildNode(String appDirName, String childNodeName,
+ byte[] childData)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
+ // application
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading application from node: " + childNodeName);
+ }
+ ApplicationStateDataPBImpl appState =
+ new ApplicationStateDataPBImpl(
+ ApplicationStateDataProto.parseFrom(childData));
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
+ rmState.appState.put(appId, appState);
+ } else if (childNodeName.startsWith(
+ ApplicationAttemptId.appAttemptIdStrPrefix)) {
+ // attempt
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading application attempt from node: "
+ + childNodeName);
+ }
+ ApplicationAttemptStateDataPBImpl attemptState =
+ new ApplicationAttemptStateDataPBImpl(
+ ApplicationAttemptStateDataProto.parseFrom(childData));
+ attempts.add(attemptState);
+ } else {
+ LOG.info("Unknown child node with name: " + childNodeName);
+ }
+ }
+ }
+
+ // Interface for common state processing of directory of file layout
+ private interface RMStateFileProcessor {
+ void processChildNode(String appDirName, String childNodeName,
+ byte[] childData)
+ throws com.google.protobuf.InvalidProtocolBufferException;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
index 46a3459..bee6141 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
@@ -27,6 +27,7 @@
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
@@ -39,6 +40,7 @@
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
@@ -46,6 +48,7 @@
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
@@ -57,6 +60,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.fusesource.leveldbjni.JniDBFactory;
@@ -84,6 +88,8 @@
RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + "RMDTSequentialNumber";
private static final String RM_APP_KEY_PREFIX =
RM_APP_ROOT + SEPARATOR + ApplicationId.appIdStrPrefix;
+ private static final String RM_RESERVATION_KEY_PREFIX =
+ RESERVATION_SYSTEM_ROOT + SEPARATOR;
private static final Version CURRENT_VERSION_INFO = Version
.newInstance(1, 0);
@@ -112,6 +118,12 @@ private String getRMDTTokenNodeKey(RMDelegationTokenIdentifier tokenId) {
return RM_DT_TOKEN_KEY_PREFIX + tokenId.getSequenceNumber();
}
+ private String getReservationNodeKey(String planName,
+ String reservationId) {
+ return RESERVATION_SYSTEM_ROOT + SEPARATOR + planName + SEPARATOR
+ + reservationId;
+ }
+
@Override
protected void initInternal(Configuration conf) throws Exception {
}
@@ -230,9 +242,57 @@ public RMState loadState() throws Exception {
loadRMDTSecretManagerState(rmState);
loadRMApps(rmState);
loadAMRMTokenSecretManagerState(rmState);
+ loadReservationState(rmState);
return rmState;
}
+ private void loadReservationState(RMState rmState) throws IOException {
+ int numReservations = 0;
+ LeveldbIterator iter = null;
+ try {
+ iter = new LeveldbIterator(db);
+ iter.seek(bytes(RM_RESERVATION_KEY_PREFIX));
+ while (iter.hasNext()) {
+ Entry entry = iter.next();
+ String key = asString(entry.getKey());
+
+ String planReservationString =
+ key.substring(RM_RESERVATION_KEY_PREFIX.length());
+ String[] parts = planReservationString.split(SEPARATOR);
+ if (parts.length != 2) {
+ LOG.warn("Incorrect reservation state key " + key);
+ continue;
+ }
+ String planName = parts[0];
+ String reservationName = parts[1];
+ ReservationAllocationStateProto allocationState =
+ ReservationAllocationStateProto.parseFrom(entry.getValue());
+ if (!rmState.reservationState.containsKey(planName)) {
+ rmState.reservationState.put(planName,
+ new HashMap());
+ }
+ ReservationId nodeReservationId =
+ ReservationId.parseReservationId(reservationName);
+ ReservationId reservationId = ReservationSystemUtil.toReservationId(
+ allocationState.getReservationId());
+ if (!nodeReservationId.equals(reservationId)) {
+ throw new YarnRuntimeException("The database entry for "
+ + reservationName + " contains data for " + reservationId);
+ }
+ rmState.reservationState.get(planName).put(reservationId,
+ allocationState);
+ numReservations++;
+ }
+ } catch (DBException e) {
+ throw new IOException(e);
+ } finally {
+ if (iter != null) {
+ iter.close();
+ }
+ }
+ LOG.info("Recovered " + numReservations + " reservations");
+ }
+
private void loadRMDTSecretManagerState(RMState state) throws IOException {
int numKeys = loadRMDTSecretManagerKeys(state);
LOG.info("Recovered " + numKeys + " RM delegation token master keys");
@@ -544,7 +604,53 @@ protected void removeApplicationStateInternal(ApplicationStateData appState)
throw new IOException(e);
}
}
-
+
+ @Override
+ protected void storeReservationState(
+ ReservationAllocationStateProto reservationAllocation, String planName,
+ String reservationIdName) throws Exception {
+ String key = getReservationNodeKey(planName, reservationIdName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing state for reservation " + reservationIdName
+ + " plan " + planName + " at " + key);
+ }
+ try {
+ db.put(bytes(key), reservationAllocation.toByteArray());
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ protected void updateReservationState(
+ ReservationAllocationStateProto reservationAllocation, String planName,
+ String reservationIdName) throws Exception {
+ storeReservationState(reservationAllocation, planName,
+ reservationIdName);
+ }
+
+ @Override
+ protected void removeReservationState(String planName,
+ String reservationIdName) throws Exception {
+ try {
+ WriteBatch batch = db.createWriteBatch();
+ try {
+ String reservationKey =
+ getReservationNodeKey(planName, reservationIdName);
+ batch.delete(bytes(reservationKey));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing state for reservation " + reservationIdName
+ + " plan " + planName + " at " + reservationKey);
+ }
+ db.write(batch);
+ } finally {
+ batch.close();
+ }
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
private void storeOrUpdateRMDT(RMDelegationTokenIdentifier tokenId,
Long renewDate, boolean isUpdate) throws IOException {
String tokenKey = getRMDTTokenNodeKey(tokenId);
@@ -679,7 +785,7 @@ int getNumEntriesInDatabase() throws IOException {
iter = new LeveldbIterator(db);
iter.seekToFirst();
while (iter.hasNext()) {
- Entry entry = iter.next();
+ Entry entry = iter.next();
LOG.info("entry: " + asString(entry.getKey()));
++numEntries;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index 609f403..6b90b2d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -28,7 +29,9 @@
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
@@ -224,6 +227,58 @@ public synchronized void removeRMDTMasterKeyState(DelegationKey delegationKey)
}
@Override
+ protected synchronized void storeReservationState(
+ ReservationAllocationStateProto reservationAllocation, String planName,
+ String reservationIdName) throws Exception {
+ LOG.info("Storing reservationallocation for " + reservationIdName + " " +
+ "for plan " + planName);
+ Map planState =
+ state.reservationState.get(planName);
+ if (planState == null) {
+ planState = new HashMap<>();
+ state.reservationState.put(planName, planState);
+ }
+ ReservationId reservationId =
+ ReservationId.parseReservationId(reservationIdName);
+ planState.put(reservationId, reservationAllocation);
+ }
+
+ @Override
+ protected synchronized void updateReservationState(
+ ReservationAllocationStateProto reservationAllocation, String planName,
+ String reservationIdName) throws Exception {
+ LOG.info("Updating reservationallocation for " + reservationIdName + " " +
+ "for plan " + planName);
+ Map planState =
+ state.reservationState.get(planName);
+ if (planState == null) {
+ throw new YarnRuntimeException("State for plan " + planName + " does " +
+ "not exist");
+ }
+ ReservationId reservationId =
+ ReservationId.parseReservationId(reservationIdName);
+ planState.put(reservationId, reservationAllocation);
+ }
+
+ @Override
+ protected synchronized void removeReservationState(
+ String planName, String reservationIdName) throws Exception {
+ LOG.info("Removing reservationallocation " + reservationIdName
+ + " for plan " + planName);
+
+ Map planState =
+ state.reservationState.get(planName);
+ if (planState == null) {
+ throw new YarnRuntimeException("State for plan " + planName + " does " +
+ "not exist");
+ }
+ planState.remove(reservationIdName);
+ if (planState.isEmpty()) {
+ state.reservationState.remove(planName);
+ }
+ }
+
+ @Override
protected Version loadVersion() throws Exception {
return null;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
index 92c07cd..ffb6153 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
@@ -102,6 +103,26 @@ public void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exceptio
}
@Override
+ protected void storeReservationState(
+ ReservationAllocationStateProto reservationAllocation, String planName,
+ String reservationIdName) throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ protected void removeReservationState(String planName,
+ String reservationIdName) throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ protected void updateReservationState(
+ ReservationAllocationStateProto reservationAllocation, String planName,
+ String reservationIdName) throws Exception {
+ // Do nothing
+ }
+
+ @Override
public void removeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception {
// Do nothing
}
@@ -155,4 +176,6 @@ public void removeApplication(ApplicationId removeAppId) throws Exception {
// Do nothing
}
+
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 46c2954..7bf6163 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -43,10 +43,12 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
@@ -86,6 +88,8 @@
"RMDTSequenceNumber_";
protected static final String AMRMTOKEN_SECRET_MANAGER_ROOT =
"AMRMTokenSecretManagerRoot";
+ protected static final String RESERVATION_SYSTEM_ROOT =
+ "ReservationSystemRoot";
protected static final String VERSION_NODE = "RMVersionNode";
protected static final String EPOCH_NODE = "EpochNode";
private ResourceManager resourceManager;
@@ -135,7 +139,16 @@
new UpdateRMDTTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
RMStateStoreEventType.UPDATE_AMRM_TOKEN,
- new StoreOrUpdateAMRMTokenTransition())
+ new StoreOrUpdateAMRMTokenTransition())
+ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+ RMStateStoreEventType.STORE_RESERVATION,
+ new StoreReservationAllocationTransition())
+ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+ RMStateStoreEventType.UPDATE_RESERVATION,
+ new UpdateReservationAllocationTransition())
+ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+ RMStateStoreEventType.REMOVE_RESERVATION,
+ new RemoveReservationAllocationTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED,
RMStateStoreEventType.FENCED)
.addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED,
@@ -151,7 +164,10 @@
RMStateStoreEventType.STORE_DELEGATION_TOKEN,
RMStateStoreEventType.REMOVE_DELEGATION_TOKEN,
RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
- RMStateStoreEventType.UPDATE_AMRM_TOKEN));
+ RMStateStoreEventType.UPDATE_AMRM_TOKEN,
+ RMStateStoreEventType.STORE_RESERVATION,
+ RMStateStoreEventType.UPDATE_RESERVATION,
+ RMStateStoreEventType.REMOVE_RESERVATION));
private final StateMachine {
+ @Override
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
+ if (!(event instanceof RMStateStoreStoreReservationEvent)) {
+ // should never happen
+ LOG.error("Illegal event type: " + event.getClass());
+ return;
+ }
+ RMStateStoreStoreReservationEvent reservationEvent =
+ (RMStateStoreStoreReservationEvent) event;
+ try {
+ LOG.info("Storing reservation allocation." + reservationEvent
+ .getReservationIdName());
+ store.storeReservationState(
+ reservationEvent.getReservationAllocation(),
+ reservationEvent.getPlanName(),
+ reservationEvent.getReservationIdName());
+ } catch (Exception e) {
+ LOG.error("Error while storing reservation allocation.", e);
+ store.notifyStoreOperationFailed(e);
+ }
+ }
+ }
+
+ private static class UpdateReservationAllocationTransition implements
+ SingleArcTransition {
+ @Override
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
+ if (!(event instanceof RMStateStoreStoreReservationEvent)) {
+ // should never happen
+ LOG.error("Illegal event type: " + event.getClass());
+ return;
+ }
+ RMStateStoreStoreReservationEvent reservationEvent =
+ (RMStateStoreStoreReservationEvent) event;
+ try {
+ LOG.info("Updating reservation allocation." + reservationEvent
+ .getReservationIdName());
+ store.updateReservationState(
+ reservationEvent.getReservationAllocation(),
+ reservationEvent.getPlanName(),
+ reservationEvent.getReservationIdName());
+ } catch (Exception e) {
+ LOG.error("Error while updating reservation allocation.", e);
+ store.notifyStoreOperationFailed(e);
+ }
+ }
+ }
+
+ private static class RemoveReservationAllocationTransition implements
+ SingleArcTransition {
+ @Override
+ public void transition(RMStateStore store, RMStateStoreEvent event) {
+ if (!(event instanceof RMStateStoreStoreReservationEvent)) {
+ // should never happen
+ LOG.error("Illegal event type: " + event.getClass());
+ return;
+ }
+ RMStateStoreStoreReservationEvent reservationEvent =
+ (RMStateStoreStoreReservationEvent) event;
+ try {
+ LOG.info("Removing reservation allocation." + reservationEvent
+ .getReservationIdName());
+ store.removeReservationState(
+ reservationEvent.getPlanName(),
+ reservationEvent.getReservationIdName());
+ } catch (Exception e) {
+ LOG.error("Error while removing reservation allocation.", e);
+ store.notifyStoreOperationFailed(e);
+ }
+ }
+ }
+
public RMStateStore() {
super(RMStateStore.class.getName());
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -449,12 +539,16 @@ public int getDTSequenceNumber() {
* State of the ResourceManager
*/
public static class RMState {
- Map appState =
+ private Map appState =
new TreeMap();
- RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
+ private RMDTSecretManagerState rmSecretManagerState = new
+ RMDTSecretManagerState();
- AMRMTokenSecretManagerState amrmTokenSecretManagerState = null;
+ private AMRMTokenSecretManagerState amrmTokenSecretManagerState = null;
+
+ private Map>
+ reservationState = new TreeMap<>();
public Map getApplicationState() {
return appState;
@@ -467,6 +561,11 @@ public RMDTSecretManagerState getRMDTSecretManagerState() {
public AMRMTokenSecretManagerState getAMRMTokenSecretManagerState() {
return amrmTokenSecretManagerState;
}
+
+ public Map>
+ getReservationState() {
+ return reservationState;
+ }
}
private Dispatcher rmDispatcher;
@@ -745,6 +844,57 @@ public void removeRMDTMasterKey(DelegationKey delegationKey) {
}
/**
+ * Blocking Apis to maintain reservation state.
+ */
+ public void storeNewReservation(
+ ReservationAllocationStateProto reservationAllocation, String planName,
+ String reservationIdName) {
+ handleStoreEvent(new RMStateStoreStoreReservationEvent(
+ reservationAllocation, RMStateStoreEventType.STORE_RESERVATION,
+ planName, reservationIdName));
+ }
+
+ public void updateReservation(
+ ReservationAllocationStateProto reservationAllocation,
+ String planName, String reservationIdName) {
+ handleStoreEvent(new RMStateStoreStoreReservationEvent(
+ reservationAllocation, RMStateStoreEventType.UPDATE_RESERVATION,
+ planName, reservationIdName));
+ }
+
+ public void removeReservation(String planName, String reservationIdName) {
+ handleStoreEvent(new RMStateStoreStoreReservationEvent(
+ null, RMStateStoreEventType.REMOVE_RESERVATION,
+ planName, reservationIdName));
+ }
+
+ /**
+ * Blocking API
+ * Derived classes must implement this method to store the state of
+ * a reservation allocation.
+ */
+ protected abstract void storeReservationState(
+ ReservationAllocationStateProto reservationAllocation, String planName,
+ String reservationIdName) throws Exception;
+
+ /**
+ * Blocking API
+ * Derived classes must implement this method to remove the state of
+ * a reservation allocation.
+ */
+ protected abstract void removeReservationState(String planName,
+ String reservationIdName) throws Exception;
+
+ /**
+ * Blocking API
+ * Derived classes must implement this method to update the state of
+ * a reservation allocation.
+ */
+ protected abstract void updateReservationState(
+ ReservationAllocationStateProto reservationAllocation, String planName,
+ String reservationIdName) throws Exception;
+
+ /**
* Blocking API
* Derived classes must implement this method to remove the state of
* DelegationToken Master Key
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
index beba5eb..6f8dca7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
@@ -32,5 +32,8 @@
STORE_DELEGATION_TOKEN,
REMOVE_DELEGATION_TOKEN,
UPDATE_DELEGATION_TOKEN,
- UPDATE_AMRM_TOKEN
+ UPDATE_AMRM_TOKEN,
+ STORE_RESERVATION,
+ UPDATE_RESERVATION,
+ REMOVE_RESERVATION,
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java
new file mode 100644
index 0000000..ac30910
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
+
+/**
+ * Event representing maintaining ReservationSystem state.
+ */
+public class RMStateStoreStoreReservationEvent extends RMStateStoreEvent {
+
+ private ReservationAllocationStateProto reservationAllocation;
+ private String planName;
+ private String reservationIdName;
+
+ public RMStateStoreStoreReservationEvent(RMStateStoreEventType type) {
+ super(type);
+ }
+
+ public RMStateStoreStoreReservationEvent(
+ ReservationAllocationStateProto reservationAllocationState,
+ RMStateStoreEventType type, String planName, String reservationIdName) {
+ this(type);
+ this.reservationAllocation = reservationAllocationState;
+ this.planName = planName;
+ this.reservationIdName = reservationIdName;
+ }
+
+ public ReservationAllocationStateProto getReservationAllocation() {
+ return reservationAllocation;
+ }
+
+ public String getPlanName() {
+ return planName;
+ }
+
+ public String getReservationIdName() {
+ return reservationIdName;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 8f096d8..0578428 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -26,6 +26,7 @@
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import com.google.common.base.Preconditions;
@@ -45,6 +46,7 @@
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -53,6 +55,7 @@
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
@@ -66,6 +69,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
@@ -107,6 +111,13 @@
* |----- currentMasterKey
* |----- nextMasterKey
*
+ * |-- RESERVATION_SYSTEM_ROOT
+ * |------PLAN_1
+ * | |------ RESERVATION_1
+ * | |------ RESERVATION_2
+ * | ....
+ * |------PLAN_2
+ * ....
* Note: Changes from 1.1 to 1.2 - AMRMTokenSecretManager state has been saved
* separately. The currentMasterkey and nextMasterkey have been stored.
* Also, AMRMToken has been removed from ApplicationAttemptState.
@@ -142,6 +153,7 @@
private String delegationTokensRootPath;
private String dtSequenceNumberPath;
private String amrmTokenSecretManagerRoot;
+ private String reservationRoot;
@VisibleForTesting
protected String znodeWorkingPath;
@@ -258,6 +270,7 @@ public synchronized void initInternal(Configuration conf) throws Exception {
RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
amrmTokenSecretManagerRoot =
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
+ reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
}
@Override
@@ -279,6 +292,7 @@ public synchronized void startInternal() throws Exception {
create(delegationTokensRootPath);
create(dtSequenceNumberPath);
create(amrmTokenSecretManagerRoot);
+ create(reservationRoot);
}
private void logRootNodeAcls(String prefix) throws Exception {
@@ -375,9 +389,41 @@ public synchronized RMState loadState() throws Exception {
loadRMAppState(rmState);
// recover AMRMTokenSecretManager
loadAMRMTokenSecretManagerState(rmState);
+ // recover reservation state
+ loadReservationSystemState(rmState);
return rmState;
}
+ private void loadReservationSystemState(RMState rmState) throws Exception {
+ List planNodes = getChildren(reservationRoot);
+ for (String planName : planNodes) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading plan from znode: " + planName);
+ }
+ String planNodePath = getNodePath(reservationRoot, planName);
+
+ List reservationNodes = getChildren(planNodePath);
+ for (String reservationNodeName : reservationNodes) {
+ String reservationNodePath = getNodePath(planNodePath,
+ reservationNodeName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading reservation from znode: " + reservationNodePath);
+ }
+ byte[] reservationData = getData(reservationNodePath);
+ ReservationAllocationStateProto allocationState =
+ ReservationAllocationStateProto.parseFrom(reservationData);
+ if (!rmState.reservationState.containsKey(planName)) {
+ rmState.reservationState.put(planName,
+ new HashMap());
+ }
+ ReservationId reservationId = ReservationSystemUtil.toReservationId(
+ allocationState.getReservationId());
+ rmState.reservationState.get(planName).put(reservationId,
+ allocationState);
+ }
+ }
+ }
+
private void loadAMRMTokenSecretManagerState(RMState rmState)
throws Exception {
byte[] data = getData(amrmTokenSecretManagerRoot);
@@ -763,6 +809,81 @@ public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
safeSetData(amrmTokenSecretManagerRoot, stateData, -1);
}
+ @Override
+ protected synchronized void removeReservationState(String planName,
+ String reservationIdName)
+ throws Exception {
+ String planNodePath =
+ getNodePath(reservationRoot, planName);
+ String reservationPath = getNodePath(planNodePath,
+ reservationIdName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing reservationallocation " + reservationIdName + " for" +
+ " plan " + planName);
+ }
+ safeDelete(reservationPath);
+
+ List reservationNodes = getChildren(planNodePath);
+ if (reservationNodes.isEmpty()) {
+ safeDelete(planNodePath);
+ }
+ }
+
+ @Override
+ protected synchronized void storeReservationState(
+ ReservationAllocationStateProto reservationAllocation, String planName,
+ String reservationIdName)
+ throws Exception {
+ SafeTransaction trx = new SafeTransaction();
+ addOrUpdateReservationState(
+ reservationAllocation, planName, reservationIdName, trx, false);
+ trx.commit();
+ }
+
+ @Override
+ protected synchronized void updateReservationState(
+ ReservationAllocationStateProto reservationAllocation, String planName,
+ String reservationIdName)
+ throws Exception {
+ SafeTransaction trx = new SafeTransaction();
+ addOrUpdateReservationState(
+ reservationAllocation, planName, reservationIdName, trx, true);
+ trx.commit();
+ }
+
+ private void addOrUpdateReservationState(
+ ReservationAllocationStateProto reservationAllocation, String planName,
+ String reservationIdName, SafeTransaction trx, boolean isUpdate)
+ throws Exception {
+ String planCreatePath =
+ getNodePath(reservationRoot, planName);
+ String reservationPath = getNodePath(planCreatePath,
+ reservationIdName);
+ byte[] reservationData = reservationAllocation.toByteArray();
+
+ if (!exists(planCreatePath)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating plan node: " + planName + " at: " + planCreatePath);
+ }
+ trx.create(planCreatePath, null, zkAcl, CreateMode.PERSISTENT);
+ }
+
+ if (isUpdate) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updating reservation: " + reservationIdName + " in plan:"
+ + planName + " at: " + reservationPath);
+ }
+ trx.setData(reservationPath, reservationData, -1);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing reservation: " + reservationIdName + " in plan:"
+ + planName + " at: " + reservationPath);
+ }
+ trx.create(reservationPath, reservationData, zkAcl,
+ CreateMode.PERSISTENT);
+ }
+ }
+
/**
* Utility function to ensure that the configured base znode exists.
* This recursively creates the znode as well as all of its parents.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
index a4dd23b..50eab84 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java
@@ -31,7 +31,7 @@
* {@link RLESparseResourceAllocation}
*
*/
-class InMemoryReservationAllocation implements ReservationAllocation {
+public class InMemoryReservationAllocation implements ReservationAllocation {
private final String planName;
private final ReservationId reservationID;
@@ -45,7 +45,7 @@
private RLESparseResourceAllocation resourcesOverTime;
- InMemoryReservationAllocation(ReservationId reservationID,
+ public InMemoryReservationAllocation(ReservationId reservationID,
ReservationDefinition contract, String user, String planName,
long startTime, long endTime,
Map allocations,
@@ -54,7 +54,7 @@
allocations, calculator, minAlloc, false);
}
- InMemoryReservationAllocation(ReservationId reservationID,
+ public InMemoryReservationAllocation(ReservationId reservationID,
ReservationDefinition contract, String user, String planName,
long startTime, long endTime,
Map allocations,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
index 8affae4..01defb2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java
@@ -18,14 +18,29 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnProtos.ReservationDefinitionProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ResourceAllocationRequestProto;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
-
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-final class ReservationSystemUtil {
+/**
+ * Utility class for ReservationSystem.
+ */
+public final class ReservationSystemUtil {
private ReservationSystemUtil() {
// not called
@@ -48,4 +63,101 @@ public static Resource toResource(ReservationRequest request) {
}
return resources;
}
+
+ public static ReservationAllocationStateProto buildStateProto(
+ ReservationAllocation allocation) {
+ ReservationIdProto idProto = convertToProtoFormat(
+ allocation.getReservationId());
+
+ ReservationAllocationStateProto.Builder builder =
+ ReservationAllocationStateProto.newBuilder();
+
+ builder.setAcceptanceTimestamp(allocation.getAcceptanceTime());
+ builder.setContainsGangs(allocation.containsGangs());
+ builder.setStartTime(allocation.getStartTime());
+ builder.setEndTime(allocation.getEndTime());
+ builder.setUser(allocation.getUser());
+ builder.setReservationId(idProto);
+ ReservationDefinitionProto definitionProto = convertToProtoFormat(
+ allocation.getReservationDefinition());
+ builder.setReservationDefinition(definitionProto);
+
+ for (Map.Entry entry :
+ allocation.getAllocationRequests().entrySet()) {
+ ResourceAllocationRequestProto p =
+ ResourceAllocationRequestProto.newBuilder()
+ .setStartTime(entry.getKey().getStartTime())
+ .setEndTime(entry.getKey().getEndTime())
+ .setResource(convertToProtoFormat(entry.getValue()))
+ .build();
+ builder.addAllocationRequests(p);
+ }
+
+ ReservationAllocationStateProto allocationProto = builder.build();
+ return allocationProto;
+ }
+
+ private static ReservationIdProto convertToProtoFormat(
+ ReservationId reservationId) {
+ return ((ReservationIdPBImpl)reservationId).getProto();
+ }
+
+ private static ReservationDefinitionProto convertToProtoFormat(
+ ReservationDefinition reservationDefinition) {
+ return ((ReservationDefinitionPBImpl)reservationDefinition).getProto();
+ }
+
+ public static ResourceProto convertToProtoFormat(Resource e) {
+ return YarnProtos.ResourceProto.newBuilder()
+ .setMemory(e.getMemory())
+ .setVirtualCores(e.getVirtualCores())
+ .build();
+ }
+
+ public static Map toAllocations(
+ List allocationRequestsList) {
+ Map allocations = new HashMap<>();
+ for (ResourceAllocationRequestProto proto : allocationRequestsList) {
+ allocations.put(
+ new ReservationInterval(proto.getStartTime(), proto.getEndTime()),
+ convertFromProtoFormat(proto.getResource()));
+ }
+ return allocations;
+ }
+
+ private static ResourcePBImpl convertFromProtoFormat(ResourceProto resource) {
+ return new ResourcePBImpl(resource);
+ }
+
+ public static ReservationDefinitionPBImpl convertFromProtoFormat(
+ ReservationDefinitionProto r) {
+ return new ReservationDefinitionPBImpl(r);
+ }
+
+ public static ReservationIdPBImpl convertFromProtoFormat(
+ ReservationIdProto r) {
+ return new ReservationIdPBImpl(r);
+ }
+
+ public static ReservationId toReservationId(
+ ReservationIdProto reservationId) {
+ return new ReservationIdPBImpl(reservationId);
+ }
+
+ public static InMemoryReservationAllocation toInMemoryAllocation(
+ String planName, ReservationId reservationId,
+ ReservationAllocationStateProto allocationState, Resource minAlloc,
+ ResourceCalculator planResourceCalculator) {
+ ReservationDefinition definition =
+ convertFromProtoFormat(
+ allocationState.getReservationDefinition());
+ Map allocations = toAllocations(
+ allocationState.getAllocationRequestsList());
+ InMemoryReservationAllocation allocation =
+ new InMemoryReservationAllocation(reservationId, definition,
+ allocationState.getUser(), planName, allocationState.getStartTime(),
+ allocationState.getEndTime(), allocations, planResourceCalculator,
+ minAlloc, allocationState.getContainsGangs());
+ return allocation;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
index 3c8ac34..be7551f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
@@ -96,4 +96,21 @@ message AMRMTokenSecretManagerStateProto {
message RMDelegationTokenIdentifierDataProto {
optional YARNDelegationTokenIdentifierProto token_identifier = 1;
optional int64 renewDate = 2;
-}
\ No newline at end of file
+}
+
+message ResourceAllocationRequestProto {
+ optional int64 start_time = 1;
+ optional int64 end_time = 2;
+ optional ResourceProto resource = 3;
+}
+
+message ReservationAllocationStateProto {
+ optional ReservationIdProto reservation_id = 1;
+ optional ReservationDefinitionProto reservation_definition = 2;
+ repeated ResourceAllocationRequestProto allocation_requests = 3;
+ optional int64 start_time = 4;
+ optional int64 end_time = 5;
+ optional string user = 6;
+ optional bool contains_gangs = 7;
+ optional int64 acceptance_timestamp = 8;
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 9e0d22b..6a1f2c5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -36,6 +39,11 @@
import javax.crypto.SecretKey;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -63,6 +71,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
@@ -691,4 +701,189 @@ public void testAMRMTokenSecretManagerStateStore(
store.close();
}
+
+ public void testReservationStateStore(
+ RMStateStoreHelper stateStoreHelper) throws Exception {
+ RMStateStore store = stateStoreHelper.getRMStateStore();
+ TestDispatcher dispatcher = new TestDispatcher();
+ store.setRMDispatcher(dispatcher);
+
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getStateStore()).thenReturn(store);
+
+ long ts = System.currentTimeMillis();
+ ReservationId r1 = ReservationId.newInstance(ts, 1);
+ int start = 1;
+ int[] alloc = { 10, 10, 10, 10, 10 };
+ ResourceCalculator res = new DefaultResourceCalculator();
+ Resource minAlloc = Resource.newInstance(1024, 1);
+ boolean hasGang = true;
+ String planName = "dedicated";
+ ReservationDefinition rDef =
+ ReservationSystemTestUtil.createSimpleReservationDefinition(
+ start, start + alloc.length + 1, alloc.length);
+ ReservationAllocation allocation = new InMemoryReservationAllocation(
+ r1, rDef, "u3", planName, 0, 0 + alloc.length,
+ ReservationSystemTestUtil.generateAllocation(0L, 1L, alloc), res,
+ minAlloc, hasGang);
+ ReservationAllocationStateProto allocationStateProto =
+ ReservationSystemUtil.buildStateProto(allocation);
+ assertAllocationStateEqual(allocation, allocationStateProto);
+
+ // 1. Load empty store and verify no errors
+ store = stateStoreHelper.getRMStateStore();
+ when(rmContext.getStateStore()).thenReturn(store);
+ store.setRMDispatcher(dispatcher);
+ RMState state = store.loadState();
+ Map>
+ reservationState = state.getReservationState();
+ Assert.assertNotNull(reservationState);
+
+ // 2. Store single reservation and verify
+ String reservationIdName = r1.toString();
+ rmContext.getStateStore().storeNewReservation(
+ allocationStateProto,
+ planName, reservationIdName);
+
+
+ // load state and verify new state
+ validateStoredReservation(
+ stateStoreHelper, dispatcher, rmContext, r1, planName, allocation,
+ allocationStateProto);
+
+ // 3. update state test
+ alloc = new int[]{6, 6, 6};
+ hasGang = false;
+ allocation = new InMemoryReservationAllocation(
+ r1, rDef, "u3", planName, 2, 2 + alloc.length,
+ ReservationSystemTestUtil.generateAllocation(1L, 2L, alloc), res,
+ minAlloc, hasGang);
+ allocationStateProto =
+ ReservationSystemUtil.buildStateProto(allocation);
+ rmContext.getStateStore().updateReservation(
+ allocationStateProto,
+ planName, reservationIdName);
+
+ // load state and verify updated reservation
+ validateStoredReservation(
+ stateStoreHelper, dispatcher, rmContext, r1, planName, allocation,
+ allocationStateProto);
+
+ // 4. add a second one and remove the first one
+ ReservationId r2 = ReservationId.newInstance(ts, 2);
+ ReservationAllocation allocation2 = new InMemoryReservationAllocation(
+ r2, rDef, "u3", planName, 0, 0 + alloc.length,
+ ReservationSystemTestUtil.generateAllocation(0L, 1L, alloc), res,
+ minAlloc, hasGang);
+ ReservationAllocationStateProto allocationStateProto2 =
+ ReservationSystemUtil.buildStateProto(allocation2);
+ String reservationIdName2 = r2.toString();
+
+ rmContext.getStateStore().storeNewReservation(
+ allocationStateProto2,
+ planName, reservationIdName2);
+ rmContext.getStateStore().removeReservation(planName, reservationIdName);
+
+ // load state and verify r1 is removed and r2 is still there
+ Map reservations;
+
+ store = stateStoreHelper.getRMStateStore();
+ when(rmContext.getStateStore()).thenReturn(store);
+ store.setRMDispatcher(dispatcher);
+ state = store.loadState();
+ reservationState = state.getReservationState();
+ Assert.assertNotNull(reservationState);
+ reservations = reservationState.get(planName);
+ Assert.assertNotNull(reservations);
+ ReservationAllocationStateProto storedReservationAllocation =
+ reservations.get(r1);
+ Assert.assertNull("Removed reservation should not be available in store",
+ storedReservationAllocation);
+
+ storedReservationAllocation = reservations.get(r2);
+ assertAllocationStateEqual(
+ allocationStateProto2, storedReservationAllocation);
+ assertAllocationStateEqual(allocation2, storedReservationAllocation);
+
+
+ // 5. remove last reservation removes the plan state
+ rmContext.getStateStore().removeReservation(planName, reservationIdName2);
+
+ store = stateStoreHelper.getRMStateStore();
+ when(rmContext.getStateStore()).thenReturn(store);
+ store.setRMDispatcher(dispatcher);
+ state = store.loadState();
+ reservationState = state.getReservationState();
+ Assert.assertNotNull(reservationState);
+ reservations = reservationState.get(planName);
+ Assert.assertNull(reservations);
+ }
+
+ private void validateStoredReservation(
+ RMStateStoreHelper stateStoreHelper, TestDispatcher dispatcher,
+ RMContext rmContext, ReservationId r1, String planName,
+ ReservationAllocation allocation,
+ ReservationAllocationStateProto allocationStateProto) throws Exception {
+ RMStateStore store = stateStoreHelper.getRMStateStore();
+ when(rmContext.getStateStore()).thenReturn(store);
+ store.setRMDispatcher(dispatcher);
+ RMState state = store.loadState();
+ Map>
+ reservationState = state.getReservationState();
+ Assert.assertNotNull(reservationState);
+ Map reservations =
+ reservationState.get(planName);
+ Assert.assertNotNull(reservations);
+ ReservationAllocationStateProto storedReservationAllocation =
+ reservations.get(r1);
+ Assert.assertNotNull(storedReservationAllocation);
+
+ assertAllocationStateEqual(
+ allocationStateProto, storedReservationAllocation);
+ assertAllocationStateEqual(allocation, storedReservationAllocation);
+ }
+
+ void assertAllocationStateEqual(
+ ReservationAllocationStateProto expected,
+ ReservationAllocationStateProto actual) {
+
+ Assert.assertEquals(
+ expected.getAcceptanceTimestamp(), actual.getAcceptanceTimestamp());
+ Assert.assertEquals(expected.getStartTime(), actual.getStartTime());
+ Assert.assertEquals(expected.getEndTime(), actual.getEndTime());
+ Assert.assertEquals(expected.getContainsGangs(), actual.getContainsGangs());
+ Assert.assertEquals(expected.getUser(), actual.getUser());
+ Assert.assertEquals(
+ ReservationSystemUtil.convertFromProtoFormat(
+ expected.getReservationId()),
+ ReservationSystemUtil.convertFromProtoFormat(
+ actual.getReservationId()));
+ assertEquals(
+ expected.getReservationDefinition(), actual.getReservationDefinition());
+ assertEquals(expected.getAllocationRequestsList(),
+ actual.getAllocationRequestsList());
+ }
+
+ void assertAllocationStateEqual(
+ ReservationAllocation expected,
+ ReservationAllocationStateProto actual) {
+ Assert.assertEquals(
+ expected.getAcceptanceTime(), actual.getAcceptanceTimestamp());
+ Assert.assertEquals(expected.getStartTime(), actual.getStartTime());
+ Assert.assertEquals(expected.getEndTime(), actual.getEndTime());
+ Assert.assertEquals(expected.containsGangs(), actual.getContainsGangs());
+ Assert.assertEquals(expected.getUser(), actual.getUser());
+ Assert.assertEquals(
+ expected.getReservationId(),
+ ReservationSystemUtil.convertFromProtoFormat(
+ actual.getReservationId()));
+ assertEquals(
+ expected.getReservationDefinition(),
+ ReservationSystemUtil.convertFromProtoFormat(
+ actual.getReservationDefinition()));
+ assertEquals(
+ expected.getAllocationRequests(),
+ ReservationSystemUtil.toAllocations(
+ actual.getAllocationRequestsList()));
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
index c842294..bd3b62e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
@@ -186,6 +186,7 @@ public void testFSRMStateStore() throws Exception {
testDeleteStore(fsTester);
testRemoveApplication(fsTester);
testAMRMTokenSecretManagerStateStore(fsTester);
+ testReservationStateStore(fsTester);
} finally {
cluster.shutdown();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
index 17cffa0..4666142 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
@@ -102,6 +102,12 @@ public void testAMTokens() throws Exception {
testAMRMTokenSecretManagerStateStore(tester);
}
+ @Test(timeout = 60000)
+ public void testReservation() throws Exception {
+ LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
+ testReservationStateStore(tester);
+ }
+
class LeveldbStateStoreTester implements RMStateStoreHelper {
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestProtos.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestProtos.java
new file mode 100644
index 0000000..cc96412
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestProtos.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Simple test to verify the protos generated are valid
+ */
+public class TestProtos {
+
+ @Test
+ public void testProtoCanBePrinted() throws Exception {
+ EpochProto proto = EpochProto.newBuilder().setEpoch(100).build();
+ String protoString = proto.toString();
+ Assert.assertNotNull(protoString);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 34a4492..df96653 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -174,6 +174,7 @@ public void testZKRMStateStoreRealZK() throws Exception {
testDeleteStore(zkTester);
testRemoveApplication(zkTester);
testAMRMTokenSecretManagerStateStore(zkTester);
+ testReservationStateStore(zkTester);
((TestZKRMStateStoreTester.TestZKRMStateStoreInternal)
zkTester.getRMStateStore()).testRetryingCreateRootDir();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index be1d69a..87ed938 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -184,6 +184,22 @@ public static FairScheduler setupFairScheduler(
return scheduler;
}
+ public static ReservationDefinition createSimpleReservationDefinition(
+ long arrival, long deadline, long duration) {
+ // create a request with a single atomic ask
+ ReservationRequest r =
+ ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1,
+ duration);
+ ReservationDefinition rDef = new ReservationDefinitionPBImpl();
+ ReservationRequests reqs = new ReservationRequestsPBImpl();
+ reqs.setReservationResources(Collections.singletonList(r));
+ reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+ rDef.setReservationRequests(reqs);
+ rDef.setArrival(arrival);
+ rDef.setDeadline(deadline);
+ return rDef;
+ }
+
@SuppressWarnings("unchecked")
public CapacityScheduler mockCapacityScheduler(int numContainers)
throws IOException {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
index 55224a9..9fd5113 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
@@ -17,7 +17,6 @@
*******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
@@ -25,11 +24,7 @@
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
-import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
-import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.After;
@@ -67,7 +62,8 @@ public void testBlocks() {
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
ReservationDefinition rDef =
- createSimpleReservationDefinition(start, start + alloc.length + 1,
+ ReservationSystemTestUtil.createSimpleReservationDefinition(
+ start, start + alloc.length + 1,
alloc.length);
Map allocations =
generateAllocation(start, alloc, false, false);
@@ -89,7 +85,8 @@ public void testSteps() {
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
ReservationDefinition rDef =
- createSimpleReservationDefinition(start, start + alloc.length + 1,
+ ReservationSystemTestUtil.createSimpleReservationDefinition(
+ start, start + alloc.length + 1,
alloc.length);
Map allocations =
generateAllocation(start, alloc, true, false);
@@ -112,7 +109,8 @@ public void testSkyline() {
int[] alloc = { 0, 5, 10, 10, 5, 0 };
int start = 100;
ReservationDefinition rDef =
- createSimpleReservationDefinition(start, start + alloc.length + 1,
+ ReservationSystemTestUtil.createSimpleReservationDefinition(
+ start, start + alloc.length + 1,
alloc.length);
Map allocations =
generateAllocation(start, alloc, true, false);
@@ -135,7 +133,8 @@ public void testZeroAlloaction() {
int[] alloc = {};
long start = 0;
ReservationDefinition rDef =
- createSimpleReservationDefinition(start, start + alloc.length + 1,
+ ReservationSystemTestUtil.createSimpleReservationDefinition(
+ start, start + alloc.length + 1,
alloc.length);
Map allocations =
new HashMap();
@@ -154,7 +153,8 @@ public void testGangAlloaction() {
int[] alloc = { 10, 10, 10, 10, 10, 10 };
int start = 100;
ReservationDefinition rDef =
- createSimpleReservationDefinition(start, start + alloc.length + 1,
+ ReservationSystemTestUtil.createSimpleReservationDefinition(
+ start, start + alloc.length + 1,
alloc.length);
boolean isGang = true;
Map allocations =
@@ -184,22 +184,6 @@ private void doAssertions(ReservationAllocation rAllocation,
Assert.assertEquals(start + alloc.length + 1, rAllocation.getEndTime());
}
- private ReservationDefinition createSimpleReservationDefinition(long arrival,
- long deadline, long duration) {
- // create a request with a single atomic ask
- ReservationRequest r =
- ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1,
- duration);
- ReservationDefinition rDef = new ReservationDefinitionPBImpl();
- ReservationRequests reqs = new ReservationRequestsPBImpl();
- reqs.setReservationResources(Collections.singletonList(r));
- reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
- rDef.setReservationRequests(reqs);
- rDef.setArrival(arrival);
- rDef.setDeadline(deadline);
- return rDef;
- }
-
private Map generateAllocation(
int startTime, int[] alloc, boolean isStep, boolean isGang) {
Map req =