diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index aa9e0c6..bc2de1f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -134,6 +134,10 @@ message RMStateVersionProto { optional int32 minor_version = 2; } +message RMEpochProto { + optional int32 epoch = 1; +} + ////////////////////////////////////////////////////////////////// ///////////// RM Failover related records //////////////////////// ////////////////////////////////////////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 517e680..01d5064 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -101,4 +101,6 @@ void setRMApplicationHistoryWriter( ConfigurationProvider getConfigurationProvider(); boolean isWorkPreservingRecoveryEnabled(); + + int getEpoch(); } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 1abc660..f72ef30 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -82,6 +82,7 @@ private ApplicationMasterService applicationMasterService; private RMApplicationHistoryWriter rmApplicationHistoryWriter; private ConfigurationProvider configurationProvider; + private int epoch; /** * Default constructor. To be used in conjunction with setter methods for @@ -359,4 +360,13 @@ public void setConfigurationProvider( ConfigurationProvider configurationProvider) { this.configurationProvider = configurationProvider; } + + @Override + public int getEpoch() { + return this.epoch; + } + + void setEpoch(int epoch) { + this.epoch = epoch; + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 77de209..1c692af 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -482,6 +482,8 @@ protected void serviceStart() throws Exception { if(recoveryEnabled) { try { rmStore.checkVersion(); + 
rmContext.setEpoch(rmStore.loadEpoch()); + rmStore.updateEpoch(rmContext.getEpoch() + 1); RMState state = rmStore.loadState(); recover(state); } catch (Exception e) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 7f4dad8..9ea7700 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -43,15 +43,19 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMEpochProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; + +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMEpoch; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMEpochPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -145,6 +149,31 @@ protected synchronized void storeVersion() throws Exception { writeFile(versionNodePath, data); } } + + @Override + public synchronized int loadEpoch() throws Exception { + Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE); + if (fs.exists(epochNodePath)) { + FileStatus status = fs.getFileStatus(epochNodePath); + byte[] data = readFile(epochNodePath, status.getLen()); + RMEpoch epoch = new RMEpochPBImpl(RMEpochProto.parseFrom(data)); + return epoch.getEpoch(); + } + return 0; + } + + @Override + public synchronized void updateEpoch(int epoch) throws Exception { + Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE); + RMEpochPBImpl pb = new RMEpochPBImpl(); + pb.setEpoch(epoch); + byte[] data = pb.getProto().toByteArray(); + if (fs.exists(epochNodePath)) { + updateFile(epochNodePath, data); + } else { + writeFile(epochNodePath, data); + } + } @Override public synchronized RMState loadState() throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index a43b20d..5d48cc7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java 
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -53,6 +53,15 @@ public void checkVersion() throws Exception { } @Override + public synchronized int loadEpoch() throws Exception { + return 0; + } + + @Override + public synchronized void updateEpoch(int epoch) throws Exception { + } + + @Override public synchronized RMState loadState() throws Exception { // return a copy of the state to allow for modification of the real state RMState returnState = new RMState(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 6a0426c..148893a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -48,6 +48,15 @@ protected void closeInternal() throws Exception { } @Override + public synchronized int loadEpoch() throws Exception { + return 0; + } + + @Override + public synchronized void updateEpoch(int epoch) throws Exception { + } + + @Override public RMState loadState() throws Exception { throw new UnsupportedOperationException("Cannot load state from null store"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index affc6f9..53c38f3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; @@ -84,6 +85,7 @@ protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX = "RMDTSequenceNumber_"; protected static final String VERSION_NODE = "RMVersionNode"; + protected static final String EPOCH_NODE = "RMEpochNode"; public static final Log LOG = LogFactory.getLog(RMStateStore.class); @@ -513,6 +515,17 @@ public void checkVersion() throws Exception { */ protected abstract RMStateVersion getCurrentVersion(); + + /** + * Load the epoch of RM held by the store (0 when none has been stored). + */ + public abstract int loadEpoch() throws Exception; + + /** + * Store the given epoch unchanged; the caller passes loaded epoch plus one.
+ */ + public abstract void updateEpoch(int currentEpoch) throws Exception; + /** * Blocking API * The derived class must recover state from the store and return a new diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 63ae990..8f83b70 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -44,16 +44,21 @@ import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMEpochProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; + +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMEpoch; import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMEpochPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.zookeeper.CreateMode; @@ -392,6 +397,31 @@ protected synchronized RMStateVersion loadVersion() throws Exception { } @Override + public synchronized int loadEpoch() throws Exception { + String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE); + + if (existsWithRetries(epochNodePath, true) != null) { + byte[] data = getDataWithRetries(epochNodePath, true); + RMEpoch epoch = new RMEpochPBImpl(RMEpochProto.parseFrom(data)); + return epoch.getEpoch(); + } + return 0; + } + + @Override + public synchronized void updateEpoch(int epoch) throws Exception { + String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE); + RMEpochPBImpl pb = new RMEpochPBImpl(); + pb.setEpoch(epoch); + byte[] data = pb.getProto().toByteArray(); + if (existsWithRetries(epochNodePath, true) != null) { + setDataWithRetries(epochNodePath, data, -1); + } else { + createWithRetries(epochNodePath, data, zkAcl, CreateMode.PERSISTENT); + } + } + + @Override public synchronized RMState loadState() throws Exception { RMState rmState = new RMState(); // recover DelegationTokenSecretManager diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMEpoch.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMEpoch.java 
new file mode 100644 index 0000000..7274d6e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMEpoch.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +/** + * The epoch information of RM for work-preserving restart. 
+ */ +@Private +@Unstable +public abstract class RMEpoch { + + public static RMEpoch newInstance(int sequenceNumber) { + RMEpoch epoch = Records.newRecord(RMEpoch.class); + epoch.setEpoch(sequenceNumber); + return epoch; + } + + public abstract int getEpoch(); + + public abstract void setEpoch(int sequenceNumber); + + public String toString() { + return String.valueOf(getEpoch()); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + getEpoch(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + RMEpoch other = (RMEpoch) obj; + if (this.getEpoch() == other.getEpoch()) { + return true; + } else { + return false; + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/RMEpochPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/RMEpochPBImpl.java new file mode 100644 index 0000000..e548794 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/RMEpochPBImpl.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMEpochProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMEpochProtoOrBuilder; + +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMEpoch; + +public class RMEpochPBImpl extends RMEpoch { + + RMEpochProto proto = RMEpochProto.getDefaultInstance(); + RMEpochProto.Builder builder = null; + boolean viaProto = false; + + public RMEpochPBImpl() { + builder = RMEpochProto.newBuilder(); + } + + public RMEpochPBImpl(RMEpochProto proto) { + this.proto = proto; + viaProto = true; + } + + public RMEpochProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = RMEpochProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getEpoch() { + RMEpochProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getEpoch(); + } + + @Override + public void setEpoch(int sequentialNumber) { + maybeInitBuilder(); + builder.setEpoch(sequentialNumber); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index d3d03fd..f01c929 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -57,7 +57,10 @@ private final String queueName; Queue queue; final String user; - private final AtomicInteger containerIdCounter = new AtomicInteger(0); + // TODO: widen containerIdCounter to long; top 10 bits hold the RM epoch + private final AtomicInteger containerIdCounter; + private static final int EPOCH_BIT_MASK = 0x3ff; + private static final int EPOCH_BIT_SHIFT = 22; final Set priorities = new TreeSet( new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator()); @@ -70,15 +73,19 @@ /* Allocated by scheduler */ boolean pending = true; // for app metrics - + + public AppSchedulingInfo(ApplicationAttemptId appAttemptId, - String user, Queue queue, ActiveUsersManager activeUsersManager) { + String user, Queue queue, ActiveUsersManager activeUsersManager, + int epoch) { this.applicationAttemptId = appAttemptId; this.applicationId = appAttemptId.getApplicationId(); this.queue = queue; this.queueName = queue.getQueueName(); this.user = user; this.activeUsersManager = activeUsersManager; + this.containerIdCounter = new AtomicInteger( + (epoch & EPOCH_BIT_MASK) << 
EPOCH_BIT_SHIFT); } public ApplicationId getApplicationId() { @@ -413,7 +420,7 @@ public synchronized void transferStateFromPreviousAppSchedulingInfo( } public synchronized void recoverContainer(RMContainer rmContainer) { - // ContainerIdCounter on recovery will be addressed in YARN-2052 + // ContainerId is refreshed with epoch after RM restart. this.containerIdCounter.incrementAndGet(); QueueMetrics metrics = queue.getMetrics(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 09cf52d..daa13a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -109,9 +110,10 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, this.rmContext = rmContext; + // A null rmContext is tolerated below, so fall back to epoch 0 here. this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user, queue, - activeUsersManager); + activeUsersManager, rmContext == null ? 0 : rmContext.getEpoch()); this.queue 
= queue; - + if (rmContext != null && rmContext.getRMApps() != null && rmContext.getRMApps() .containsKey(applicationAttemptId.getApplicationId())) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 507e164..aaf9e5c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -493,6 +493,19 @@ public void testCheckVersion(RMStateStoreHelper stateStoreHelper) Assert.assertTrue(t instanceof RMStateVersionIncompatibleException); } } + + public void testEpoch(RMStateStoreHelper stateStoreHelper) + throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + store.setRMDispatcher(new TestDispatcher()); + + int firstTimeEpoch = store.loadEpoch(); + Assert.assertEquals(0, firstTimeEpoch); + + store.updateEpoch(firstTimeEpoch + 1); + int secondTimeEpoch = store.loadEpoch(); + Assert.assertEquals(1, secondTimeEpoch); + } public void testAppDeletion(RMStateStoreHelper stateStoreHelper) throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index da25c5b..6ccaeae 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -158,6 +158,7 @@ public void testFSRMStateStore() throws Exception { .getFileSystem(conf).exists(tempAppAttemptFile)); testRMDTSecretManagerStateStore(fsTester); testCheckVersion(fsTester); + testEpoch(fsTester); testAppDeletion(fsTester); } finally { cluster.shutdown(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 284794b..d3a5475 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -120,6 +120,7 @@ public void testZKRMStateStoreRealZK() throws Exception { testRMAppStateStore(zkTester); testRMDTSecretManagerStateStore(zkTester); testCheckVersion(zkTester); + testEpoch(zkTester); testAppDeletion(zkTester); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java index 93fd300..20a4aa8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; @@ -61,10 +62,15 @@ public void testMove() { QueueMetrics newMetrics = newQueue.getMetrics(); ApplicationAttemptId appAttId = createAppAttemptId(0, 0); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getEpoch()).thenReturn(3); SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId, - user, oldQueue, oldQueue.getActiveUsersManager(), null); + user, oldQueue, oldQueue.getActiveUsersManager(), rmContext); oldMetrics.submitApp(user); + // confirm that containerId is calculated based on epoch. 
+ assertEquals(0x00c00001, app.getNewContainerId()); + // Resource request Resource requestedResource = Resource.newInstance(1536, 2); Priority requestedPriority = Priority.newInstance(2); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java index c651cb6..2d5a6d4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.util.Clock; import org.junit.Test; @@ -59,8 +60,11 @@ public void testDelayScheduling() { double rackLocalityThreshold = .6; ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + RMContext rmContext = Mockito.mock(RMContext.class); + Mockito.when(rmContext.getEpoch()).thenReturn(0); FSSchedulerApp schedulerApp = - new FSSchedulerApp(applicationAttemptId, "user1", queue , null, null); + new FSSchedulerApp(applicationAttemptId, "user1", queue , null, + rmContext); // Default level should be node-local assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel( @@ -118,10 +122,12 @@ public void 
testDelaySchedulingForContinuousScheduling() long nodeLocalityDelayMs = 5 * 1000L; // 5 seconds long rackLocalityDelayMs = 6 * 1000L; // 6 seconds + RMContext rmContext = Mockito.mock(RMContext.class); + Mockito.when(rmContext.getEpoch()).thenReturn(0); ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); FSSchedulerApp schedulerApp = new FSSchedulerApp(applicationAttemptId, "user1", queue, - null, null); + null, rmContext); AppSchedulable appSchedulable = Mockito.mock(AppSchedulable.class); long startTime = clock.getTime(); Mockito.when(appSchedulable.getStartTime()).thenReturn(startTime); @@ -173,9 +179,12 @@ public void testLocalityLevelWithoutDelays() { Priority prio = Mockito.mock(Priority.class); Mockito.when(prio.getPriority()).thenReturn(1); + RMContext rmContext = Mockito.mock(RMContext.class); + Mockito.when(rmContext.getEpoch()).thenReturn(0); ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); FSSchedulerApp schedulerApp = - new FSSchedulerApp(applicationAttemptId, "user1", queue , null, null); + new FSSchedulerApp(applicationAttemptId, "user1", queue , null, + rmContext); assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel( prio, 10, -1.0, -1.0)); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java index c1866f0..cc738f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.junit.Before; @@ -40,6 +41,7 @@ private MaxRunningAppsEnforcer maxAppsEnforcer; private int appNum; private TestFairScheduler.MockClock clock; + private RMContext rmContext; @Before public void setup() throws Exception { @@ -59,13 +61,16 @@ public void setup() throws Exception { userMaxApps = allocConf.userMaxApps; maxAppsEnforcer = new MaxRunningAppsEnforcer(scheduler); appNum = 0; + rmContext = mock(RMContext.class); + when(rmContext.getEpoch()).thenReturn(0); } private FSSchedulerApp addApp(FSLeafQueue queue, String user) { ApplicationId appId = ApplicationId.newInstance(0l, appNum++); ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0); boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, user); - FSSchedulerApp app = new FSSchedulerApp(attId, user, queue, null, null); + FSSchedulerApp app = new FSSchedulerApp(attId, user, queue, null, + rmContext); queue.addApp(app, runnable); if (runnable) { maxAppsEnforcer.trackRunnableApp(app);