diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index f53e6c2..e3a783d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -18,10 +18,12 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.io.DataInputStream; import java.io.IOException; import java.net.ConnectException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -39,6 +41,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -62,6 +66,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; @@ -86,12 +91,14 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.YarnVersionInfo; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; public class NodeStatusUpdaterImpl extends AbstractService implements NodeStatusUpdater { @@ -99,6 +106,9 @@ public static final String YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS = YarnConfiguration.NM_PREFIX + "duration-to-track-stopped-containers"; + public static final Text HDFS_DELEGATION_KIND = + new Text("HDFS_DELEGATION_TOKEN"); + private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class); private final Object heartbeatMonitor = new Object(); @@ -149,6 +159,8 @@ private NMNodeLabelsHandler nodeLabelsHandler; private final NodeLabelsProvider nodeLabelsProvider; + private Map credsInStateStore = new HashMap<>(); + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { this(context, dispatcher, healthChecker, metrics, null); @@ -771,7 +783,7 @@ public long getRMIdentifier() { return this.rmIdentifier; } - private static Map parseCredentials( + private static Map parseCredentialsMap( Map systemCredentials) throws IOException { Map map = new HashMap(); @@ -793,6 +805,27 @@ public long getRMIdentifier() { return map; } + private static Credentials parseCredentials(ByteBuffer input) + throws IOException { + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + if (input != null) { + dibb.reset(input); + credentials.readTokenStorageStream(dibb); + input.rewind(); + } + return credentials; + } + + private static ByteBuffer serializeCredentials(Credentials credentials) + throws IOException { + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), + 0, dob.getLength()); + return byteBuffer; + } + protected void startStatusUpdater() { statusUpdaterRunnable = new Runnable() { @@ -866,8 +899,9 @@ public void run() { Map systemCredentials = response.getSystemCredentialsForApps(); if (systemCredentials != null && !systemCredentials.isEmpty()) { + updateStateStoreIfNecessary(systemCredentials); ((NMContext) context).setSystemCrendentialsForApps( - parseCredentials(systemCredentials)); + parseCredentialsMap(systemCredentials)); } List containersToDecrease = response.getContainersToDecrease(); @@ -946,11 +980,68 @@ private void updateMasterKeys(NodeHeartbeatResponse response) { } } }; - statusUpdater = - new Thread(statusUpdaterRunnable, "Node Status Updater"); + + statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); statusUpdater.start(); } + private void updateStateStoreIfNecessary( + Map systemCredentials) throws IOException { + // update the credential to state store if it changed. + NMStateStoreService stateStore = context.getNMStateStore(); + for (Map.Entry anApp : + systemCredentials.entrySet()) { + + ByteBuffer oldCred = credsInStateStore.get(anApp.getKey()); + if (!(oldCred != null && Arrays.equals( + anApp.getValue().array(), oldCred.array()))) { + ApplicationId appId = anApp.getKey(); + + LOG.info("Received new system credentails for app " + appId + + " from RM, now update the NMStateStore."); + + // 0. mark this update + credsInStateStore.put(appId, anApp.getValue()); + + // 1. load application info from state store. + ContainerManagerApplicationProto proto = + stateStore.loadApplication(appId); + + if (proto != null) { + + // 2. load old credentials + Credentials appCred = new Credentials(); + appCred.readTokenStorageStream( + new DataInputStream(proto.getCredentials().newInput())); + + // 3. add the new credentials to the old, where the existing + // tokens that have the same alias will be replaced + Credentials newCred = parseCredentials(anApp.getValue()); + appCred.addAll(newCred); + + // 4. build new ContainerManagerApplicationProto + ContainerManagerApplicationProto.Builder builder = + ContainerManagerApplicationProto.newBuilder(); + + builder.setId(proto.getId()); + builder.setUser(proto.getUser()); + builder.setCredentials(ByteString + .copyFrom(serializeCredentials(appCred))); + if (proto.getLogAggregationContext() != null) { + builder.setLogAggregationContext(proto + .getLogAggregationContext()); + } + builder.addAllAcls(proto.getAclsList()); + + ContainerManagerApplicationProto newProto = builder.build(); + + // 5. update the application info in state store. + stateStore.storeApplication(appId, newProto); + } + } + } + } + private boolean handleShutdownOrResyncCommand( NodeHeartbeatResponse response) { if (response.getNodeAction() == NodeAction.SHUTDOWN) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 5fe2713..beda443 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -389,6 +389,31 @@ public void storeContainerCompleted(ContainerId containerId, } @Override + public ContainerManagerApplicationProto loadApplication( + ApplicationId appId) throws IOException { + String appKey = APPLICATIONS_KEY_PREFIX + appId; + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(appKey)); + while (iter.hasNext()) { + Entry entry = iter.next(); + String key = asString(entry.getKey()); + if (!key.equals(appKey)) { + break; + } + return ContainerManagerApplicationProto.parseFrom(entry.getValue()); + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + return null; + } + public void storeContainerRemainingRetryAttempts(ContainerId containerId, int remainingRetryAttempts) throws IOException { String key = CONTAINERS_KEY_PREFIX + containerId.toString() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 112095e..0569f97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -59,6 +59,12 @@ public void storeApplication(ApplicationId appId, } @Override + public ContainerManagerApplicationProto loadApplication(ApplicationId appId) + throws IOException { + throw new UnsupportedOperationException( + "Recovery not supported by this state store"); + } + public void removeApplication(ApplicationId appId) throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 57f35a4..02786de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -270,6 +270,16 @@ public boolean isNewlyCreated() { } /** + * Load the application data. + * @param appId the application ID + * @return the application proto or {@code null} if didn't find + * the application + * @throws IOException + */ + public abstract ContainerManagerApplicationProto loadApplication( + ApplicationId appId) throws IOException; + + /** * Load the state of applications * @return recovered state for applications * @throws IOException diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 3c5edc0..408bed5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -102,6 +102,11 @@ public synchronized void removeApplication(ApplicationId appId) } @Override + public ContainerManagerApplicationProto loadApplication(ApplicationId appId) + throws IOException { + return apps.get(appId); + } + public synchronized List loadContainersState() throws IOException { // return a copy so caller can't modify our state diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index d254e4b..8d2f049 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -213,6 +213,35 @@ public void testApplicationStorage() throws IOException { assertEquals(1, state.getApplications().size()); assertEquals(appProto1, state.getApplications().get(0)); } + + @Test + public void testLoadApplication() throws IOException { + // store an applications and verify recovered + ContainerManagerApplicationProto.Builder builder = + ContainerManagerApplicationProto.newBuilder(); + final ApplicationId appId1 = ApplicationId.newInstance(1234, 1); + builder.setId(((ApplicationIdPBImpl) appId1).getProto()); + builder.setUser("user1"); + ContainerManagerApplicationProto appProto1 = builder.build(); + stateStore.storeApplication(appId1, appProto1); + + final ApplicationId appId2 = ApplicationId.newInstance(1235, 2); + builder.setId(((ApplicationIdPBImpl) appId2).getProto()); + builder.setUser("user2"); + ContainerManagerApplicationProto appProto2 = builder.build(); + stateStore.storeApplication(appId2, appProto2); + + + ContainerManagerApplicationProto loadedProto1 = + stateStore.loadApplication(appId1); + assertEquals(appId1.getId(), loadedProto1.getId().getId()); + assertTrue("user1".equals(loadedProto1.getUser())); + + ContainerManagerApplicationProto loadedProto2 = + stateStore.loadApplication(appId2); + assertEquals(appId2.getId(), loadedProto2.getId().getId()); + assertTrue("user2".equals(loadedProto2.getUser())); + } @Test public void testContainerStorage() throws IOException {