diff --git a/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8828499..988f41d 100644 --- a/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -18,10 +18,12 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.io.DataInputStream; import java.io.IOException; import java.net.ConnectException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -39,6 +41,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -57,6 +61,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ServerRMProxy; @@ -76,10 +81,12 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; public class NodeStatusUpdaterImpl extends AbstractService implements NodeStatusUpdater { @@ -87,6 +94,9 @@ public static final String YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS = YarnConfiguration.NM_PREFIX + "duration-to-track-stopped-containers"; + public static final Text HDFS_DELEGATION_KIND = + new Text("HDFS_DELEGATION_TOKEN"); + private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class); private final Object heartbeatMonitor = new Object(); @@ -125,6 +135,8 @@ private boolean logAggregationEnabled; + private boolean hasProxyUserPrivileges; + private final List logAggregationReportForAppsTempList; private final NodeHealthCheckerService healthChecker; @@ -229,6 +241,10 @@ protected void serviceInit(Configuration conf) throws Exception { this.logAggregationEnabled = conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + + this.hasProxyUserPrivileges = + conf.getBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, + YarnConfiguration.DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED); } @Override @@ -657,6 +673,8 @@ protected void startStatusUpdater() { statusUpdaterRunnable = new Runnable() { + Map credsInStateStore = new HashMap<>(); + private void waitHeartbeatInterval(long minInterval, long maxInterval) throws InterruptedException { long start = System.currentTimeMillis(); long waitInterval = maxInterval; @@ -797,9 +815,13 @@ public void run() { Map systemCredentials = 
response.getSystemCredentialsForApps();
+
+            updateStateStoreIfNecessary(systemCredentials);
+
             if (systemCredentials != null && !systemCredentials.isEmpty()) {
               ((NMContext) context)
-                .setSystemCrendentialsForApps(parseCredentials(systemCredentials));
+                  .setSystemCrendentialsForApps(NodeStatusUpdaterImpl
+                      .this.parseCredentials(systemCredentials));
             }
           } catch (ConnectException e) {
             //catch and throw the exception if tried MAX wait time to connect RM
@@ -843,9 +865,116 @@ private void updateMasterKeys(NodeHeartbeatResponse response) {
           context.getNMTokenSecretManager().setMasterKey(updatedMasterKey);
         }
       }
+
+      /**
+       * Merges credentials received from the RM into the application records
+       * kept in the NM state store, rewriting a record only when the
+       * credentials sent for that application differ from the ones last
+       * persisted. Does nothing unless proxy-user privileges are enabled.
+       *
+       * NOTE(review): entries of credsInStateStore are never evicted here;
+       * consider dropping them once an application finishes.
+       *
+       * @param systemCredentials serialized credentials per application as
+       *          returned by the heartbeat response, may be {@code null}
+       * @throws IOException if the state store cannot be read or written, or
+       *           if stored or received credentials cannot be parsed
+       */
+      private void updateStateStoreIfNecessary(
+          Map<ApplicationId, ByteBuffer> systemCredentials)
+          throws IOException {
+        if (!hasProxyUserPrivileges || systemCredentials == null) {
+          return;
+        }
+        NMStateStoreService stateStore = context.getNMStateStore();
+        for (Map.Entry<ApplicationId, ByteBuffer> anApp :
+            systemCredentials.entrySet()) {
+          ApplicationId appId = anApp.getKey();
+          ByteBuffer latestCred = anApp.getValue();
+
+          // ByteBuffer#equals compares the remaining bytes only, so spare
+          // capacity in a backing array can no longer defeat the check.
+          if (latestCred == null
+              || latestCred.equals(credsInStateStore.get(appId))) {
+            continue;
+          }
+
+          // 1. load application info from state store.
+          ContainerManagerApplicationProto proto =
+              stateStore.loadApplication(appId);
+          if (proto == null) {
+            // not stored (yet): leave the cache alone so that a later
+            // heartbeat tries again.
+            continue;
+          }
+
+          // 2. load old credentials; a record stored without any has an
+          // empty field, which cannot be parsed as token storage.
+          Credentials appCred = new Credentials();
+          if (proto.hasCredentials() && !proto.getCredentials().isEmpty()) {
+            appCred.readTokenStorageStream(
+                new DataInputStream(proto.getCredentials().newInput()));
+          }
+
+          // 3. add the new credentials to the old, where
+          // the existing tokens will be replaced
+          appCred.addAll(parseCredentials(latestCred));
+
+          // 4. copy the stored record and swap in the merged credentials;
+          // every other field is carried over as is.
+          ContainerManagerApplicationProto newProto = proto.toBuilder()
+              .setCredentials(
+                  ByteString.copyFrom(serializeCredentials(appCred)))
+              .build();
+
+          // 5. update the application info in state store.
+          stateStore.storeApplication(appId, newProto);
+
+          // 6. remember what the RM sent, but only once it is persisted.
+          ByteBuffer snapshot = ByteBuffer.allocate(latestCred.remaining());
+          snapshot.put(latestCred.duplicate());
+          snapshot.flip();
+          credsInStateStore.put(appId, snapshot);
+        }
+      }
+
+      /**
+       * Deserializes a token-storage buffer without disturbing it.
+       *
+       * @param input serialized credentials, may be {@code null}
+       * @return the parsed credentials; empty when input is {@code null}
+       * @throws IOException if the buffer cannot be parsed
+       */
+      private Credentials parseCredentials(ByteBuffer input)
+          throws IOException {
+        Credentials credentials = new Credentials();
+        if (input != null) {
+          // read through a duplicate so the caller's position and limit
+          // stay exactly as they were.
+          DataInputByteBuffer dibb = new DataInputByteBuffer();
+          dibb.reset(input.duplicate());
+          credentials.readTokenStorageStream(dibb);
+        }
+        return credentials;
+      }
+
+      /**
+       * Serializes credentials into token-storage format.
+       *
+       * @param credentials the credentials to write
+       * @return a buffer whose remaining bytes are the serialized form
+       * @throws IOException if writing the credentials fails
+       */
+      private ByteBuffer serializeCredentials(Credentials credentials)
+          throws IOException {
+        DataOutputBuffer dob = new DataOutputBuffer();
+        credentials.writeTokenStorageToStream(dob);
+        // the backing array may be longer than the data; bound the view.
+        return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+      }
     };
-    statusUpdater =
-        new Thread(statusUpdaterRunnable, "Node Status Updater");
+
+    statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
     statusUpdater.start();
   }
 
diff --git a/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 93fdf12..3687ae7 100644
---
a/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -376,6 +376,23 @@ public void storeApplication(ApplicationId appId,
   }
 
   @Override
+  public ContainerManagerApplicationProto loadApplication(
+      ApplicationId appId) throws IOException {
+    String appKey = APPLICATIONS_KEY_PREFIX + appId;
+    try {
+      // Only an exact key match counts as a hit, so a point lookup is
+      // enough; db.get returns null when the key is absent.
+      byte[] data = db.get(bytes(appKey));
+      if (data == null) {
+        return null;
+      }
+      return ContainerManagerApplicationProto.parseFrom(data);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
   public void storeFinishedApplication(ApplicationId appId)
       throws IOException {
     String key = FINISHED_APPS_KEY_PREFIX + appId;
diff --git a/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index ab49543..ef5d6ef 100644
--- a/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++
b/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -58,6 +58,12 @@ public void storeApplication(ApplicationId appId,
   }
 
   @Override
+  public ContainerManagerApplicationProto loadApplication(ApplicationId appId)
+      throws IOException {
+    return null;
+  }
+
+  @Override
   public void storeFinishedApplication(ApplicationId appId) {
   }
 
diff --git a/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index fa66349..2ccc9b1 100644
--- a/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -242,6 +242,16 @@ public abstract void storeApplication(ApplicationId appId,
       ContainerManagerApplicationProto p) throws IOException;
 
   /**
+   * Load the stored data of an application.
+   * @param appId the application ID
+   * @return the application proto, or {@code null} if the application
+   *         was not found
+   * @throws IOException if the application data cannot be loaded
+   */
+  public abstract ContainerManagerApplicationProto loadApplication(
+      ApplicationId appId) throws IOException;
+
+  /**
    * Record that an application has finished
    * @param appId the application ID
    * @throws IOException
diff --git a/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index e0487e7..c232428 100644
--- a/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -98,6 +98,12 @@ public synchronized void storeApplication(ApplicationId appId,
   }
 
   @Override
+  public synchronized ContainerManagerApplicationProto loadApplication(
+      ApplicationId appId) throws IOException {
+    return apps.get(appId);
+  }
+
+  @Override
   public synchronized void storeFinishedApplication(ApplicationId appId) {
     finishedApps.add(appId);
   }
diff --git a/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 1804424..71c6a0c 100644
---
a/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/source/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -216,6 +216,32 @@ public void testApplicationStorage() throws IOException {
     assertEquals(1, state.getFinishedApplications().size());
     assertEquals(appId1, state.getFinishedApplications().get(0));
   }
+
+  @Test
+  public void testLoadApplication() throws IOException {
+    // store two applications and verify each one loads back by its ID
+    ContainerManagerApplicationProto.Builder builder =
+        ContainerManagerApplicationProto.newBuilder();
+    final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
+    builder.setId(((ApplicationIdPBImpl) appId1).getProto());
+    builder.setUser("user1");
+    ContainerManagerApplicationProto appProto1 = builder.build();
+    stateStore.storeApplication(appId1, appProto1);
+
+    final ApplicationId appId2 = ApplicationId.newInstance(1235, 2);
+    builder.setId(((ApplicationIdPBImpl) appId2).getProto());
+    builder.setUser("user2");
+    ContainerManagerApplicationProto appProto2 = builder.build();
+    stateStore.storeApplication(appId2, appProto2);
+
+    // each application must load back exactly as it was stored
+    assertEquals(appProto1, stateStore.loadApplication(appId1));
+    assertEquals(appProto2, stateStore.loadApplication(appId2));
+
+    // an application that was never stored loads as null
+    final ApplicationId appId3 = ApplicationId.newInstance(1236, 3);
+    assertTrue(stateStore.loadApplication(appId3) == null);
+  }
 
   @Test
   public void testContainerStorage() throws IOException {